#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-05-06
# @Filename: parser.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
import functools
import inspect
import json
import re
import shlex
import time
from typing import Any, List, TypeVar
import click
from click.decorators import group, pass_obj
from unclick.core import command_to_json
from sdsstools.logger import SDSSLogger
from clu.command import Command
from .. import actor
__all__ = [
"CluCommand",
"CluGroup",
"command_parser",
"ClickParser",
"timeout",
"unique",
"get_running_tasks",
"cancel_command",
"get_current_command_name",
]
def coroutine(fn):
"""Create a coroutine. Avoids deprecation of asyncio.coroutine in 3.10."""
if inspect.iscoroutinefunction(fn):
return fn
@functools.wraps(fn)
async def _wrapper(*args, **kwargs):
return fn(*args, **kwargs)
return _wrapper
[docs]
def get_running_tasks(cmd_name) -> list[asyncio.Task] | None:
"""Returns the list of tasks for a given command name, sorted by start date."""
all_tasks = [(t, getattr(t, "_command_name", None)) for t in asyncio.all_tasks()]
# Sort by date but exclude potential tasks without our added _date
matching = [t[0] for t in all_tasks if t[1] == cmd_name]
matching_dated = [(m, getattr(m, "_date")) for m in matching if hasattr(m, "_date")]
if len(matching) == 0:
return None
return [m[0] for m in sorted(matching_dated, key=lambda x: x[1])]
[docs]
def cancel_command(
name: str | None = None,
ctx: click.Context | None = None,
ok_no_exists: bool = True,
keep_last: bool = False,
):
"""Cancels any running instance of a command callback.
Parameters
----------
name
The name of the command. If `None`, calls `.get_current_command_name`.
ctx
The click context. If `None`, uses the current context.
ok_no_exists
Exits silently if no command with that name is running.
keep_last
Does not cancel the last instance of the command. Useful when
a command wants to cancel other instances of itself, but not
itself.
Returns
-------
result
`True` if a command was cancelled.
"""
ctx = ctx or click.get_current_context()
name = name or get_current_command_name()
tasks = get_running_tasks(name)
if tasks and keep_last:
tasks = tasks[:-1]
if tasks is None or len(tasks) == 0:
if ok_no_exists:
return False
else:
raise ValueError(f"Cannot find any running command {name!r}.")
for task in tasks:
task.cancel()
return True
[docs]
def get_current_command_name():
"""Returns the name of the current click command.
Must be called from inside the command callback.
"""
ctx = click.get_current_context()
subcmd = ctx.invoked_subcommand or ""
return (ctx.command_path + " " + subcmd).replace(" ", "_")
[docs]
class CluCommand(click.Command):
"""Override :py:class:`click.Command` to pass the actor and command."""
def __init__(self, *args, context_settings=None, **kwargs):
# Unless told otherwise, set ignore_unknown_options=True to prevent
# negative numbers to be considered as options. See #40.
if context_settings is None or "ignore_unknown_options" not in context_settings:
context_settings = context_settings or {}
context_settings["ignore_unknown_options"] = True
self.cancellable = kwargs.pop("cancellable", False)
self.internal = kwargs.pop("internal", None)
if self.cancellable is True:
kwargs["params"].append(
click.Option(
["--stop"],
is_flag=True,
help="Stops the execution of the command.",
)
)
self.full_path = None
super().__init__(
*args,
context_settings=context_settings,
**kwargs,
)
[docs]
def done_callback(self, task, exception_handler=None):
"""Checks if the command task has been successfully done."""
ctx = task.ctx
command = ctx.obj["parser_args"][0]
if exception_handler and task.exception():
exception_handler(command, task.exception())
async def _schedule_callback(self, ctx, timeout=None):
"""Schedules the callback as a task with a timeout."""
parser_args = ctx.obj.get("parser_args", [])
command = parser_args[0] if len(parser_args) > 0 else None
if command is not None and self.internal is not None:
command.internal = self.internal
callback_task = asyncio.create_task(
ctx.invoke(self.callback, *parser_args, **ctx.params)
)
try:
await asyncio.wait_for(callback_task, timeout=timeout)
except asyncio.TimeoutError:
if command:
command.set_status(
command.status.TIMEDOUT,
f"Command timed out after {timeout} seconds.",
)
return False
except asyncio.CancelledError:
if command:
command.set_status(
command.status.CANCELLED,
"This command has been cancelled.",
)
return False
return True
[docs]
def invoke(self, ctx):
"""As :py:class:`click.Command.invoke` but passes the actor and command."""
if self.callback is not None:
with ctx:
loop = asyncio.get_event_loop()
ctx.obj["parser_args"][0].context = ctx
self.full_path = ctx.command_path.replace(" ", "_")
if self.cancellable and self._cancel_command(ctx):
return
# Makes sure the callback is a coroutine
if not asyncio.iscoroutinefunction(self.callback):
self.callback = coroutine(self.callback)
# Check to see if there is a timeout value in the callback.
# If so, schedules a task to be cancelled after timeout.
timeout = getattr(self.callback, "timeout", None)
log = ctx.obj.pop("log", None)
# Defines the done callback function.
exception_handler = ctx.obj.pop("exception_handler", None)
done_callback = functools.partial(
self.done_callback, exception_handler=exception_handler
)
# Launches callback scheduler and adds the done callback
ctx.task = loop.create_task( # type: ignore
self._schedule_callback(ctx, timeout=timeout)
)
ctx.task._command_name = self.full_path # type: ignore For PY<38
ctx.task._date = time.time() # type: ignore
ctx.task.add_done_callback(done_callback) # type: ignore
# Add some attributes to the task because it's
# what will be passed to done_callback
ctx.task.ctx = ctx # type: ignore
ctx.task.log = log # type: ignore
return ctx
def _cancel_command(self, ctx):
"""Stops a cancellable command."""
parser_args = ctx.obj.get("parser_args", [])
if len(parser_args) == 0: # The Command should always be there.
return
command = parser_args[0]
stopping = "stop" in ctx.params and ctx.params.pop("stop") is True
running_tasks = get_running_tasks(self.full_path)
if not stopping:
if running_tasks is not None:
command.fail(f"Another command {self.full_path} is already running.")
return True
else:
return False
if running_tasks is None:
command.fail(error=f"Cannot find running command {self.full_path}.")
return False
else:
# Cancel the oldest running one (although there should only be one)
running_tasks[0].cancel()
command.finish(text="Command has been scheduled for cancellation.")
return True
[docs]
class CluGroup(click.Group):
"""Override :py:class:`click.Group`.
Makes all child commands instances of `.CluCommand`.
"""
[docs]
def command(self, *args, **kwargs):
"""Override :py:class:`click.Group` to use `.CluCommand` by default."""
if "cls" in kwargs:
pass
else:
kwargs["cls"] = CluCommand
def decorator(f):
cmd = click.decorators.command(*args, **kwargs)(f)
self.add_command(cmd)
return cmd
return decorator
[docs]
def parse_args(self, ctx, args): # pragma: no cover
# Copy this method so that we can turn off the printing of the
# usage before ctx.exit()
if not args and self.no_args_is_help and not ctx.resilient_parsing:
ctx.exit()
rest = click.Command.parse_args(self, ctx, args)
if self.chain:
ctx.protected_args = rest
ctx.args = []
elif rest:
ctx.protected_args, ctx.args = rest[:1], rest[1:]
return ctx.args
[docs]
def group(self, *args, **kwargs):
"""Creates a new group inheriting from this class."""
if "cls" not in kwargs:
kwargs["cls"] = self.__class__
def decorator(f):
assert not asyncio.iscoroutinefunction(f), "groups cannot be coroutines."
cmd = group(*args, **kwargs)(f)
self.add_command(cmd)
return cmd
return decorator
async def coro_helper(f, *args, **kwargs):
if asyncio.iscoroutinefunction(f):
return await f(*args, **kwargs)
else:
return f(*args, **kwargs)
[docs]
def timeout(seconds: float):
"""A decorator to timeout the command after a number of ``seconds``."""
def decorator(f):
# This is a bit of a hack but we cannot access the context here so
# we add the timeout directly to the callback function.
f.timeout = seconds
@functools.wraps(f)
async def wrapper(*args, **kwargs):
return await coro_helper(f, *args, **kwargs)
return functools.update_wrapper(wrapper, f)
return decorator
def unique():
"""Allow the execution of only one of these commands at a time."""
def decorator(f):
@functools.wraps(f)
async def new_func(command, *args, **kwargs):
ctx = click.get_current_context()
subcmd = ctx.invoked_subcommand or ""
name = (ctx.command_path + " " + subcmd).replace(" ", "_")
tasks = get_running_tasks(name)
# Fails if there are two tasks with the same name, the current one and
# an already running one.
if tasks is not None and len(tasks) > 1:
return command.fail(
error=f"Another command with name {name} is already running."
)
return await f(command, *args, **kwargs)
return functools.update_wrapper(new_func, f)
return decorator
def pass_args():
"""Thing wrapper around pass_obj to pass the command and parser_args."""
def decorator(f):
@functools.wraps(f)
@pass_obj
def new_func(obj, *args, **kwargs):
return f(*obj["parser_args"], *args, **kwargs)
return functools.update_wrapper(new_func, f)
return decorator
@click.group(cls=CluGroup)
def command_parser(*args):
pass
@command_parser.command()
def ping(*args):
"""Pings the actor."""
command = args[0]
command.set_status(command.status.DONE, "Pong.")
return
@command_parser.command()
def version(*args):
"""Reports the version."""
command = args[0]
command.set_status(command.status.DONE, version=command.actor.version)
return
@command_parser.command(cls=CluCommand, name="get_schema")
def get_schema(*args):
"""Returns the schema of the actor as a JSON schema."""
command = args[0]
if command.actor.model is None:
return command.fail(error="The actor does not know its own schema.")
return command.finish(schema=json.dumps(command.actor.model.schema))
@command_parser.command(name="help")
@click.argument("PARSER-COMMAND", type=str, required=False)
@click.pass_context
def help_(ctx, *args, parser_command):
"""Shows the help."""
command = args[0]
# The parser_command arrives wrapped in quotes to make sure is a single
# value. Strip it and unpack it in as many groups and commands as needed.
parser_command = parser_command.strip('"').split()
help_lines = ""
command_name = args[0].actor.name # Actor name
# Gets the help lines for the command group or for a specific command.
if len(parser_command) > 0:
ctx_commands = ctx.command.commands
for ii in range(len(parser_command)):
ctx_command_name = parser_command[ii]
command_name += f" {ctx_command_name}"
if ctx_command_name not in ctx_commands:
return command.fail(error=f"command {ctx_command_name} not found.")
ctx_command = ctx_commands[ctx_command_name]
if ii == len(parser_command) - 1:
# This is the last element in the command list
# so we want to actually output this help lines.
help_lines = ctx_command.get_help(ctx)
else:
ctx_commands = ctx_command.commands
else:
help_lines: str = ctx.get_help()
message = []
for line in help_lines.splitlines():
# Remove the parser name.
match = re.match(r"^Usage: ([A-Za-z-_]+)", line)
if match:
line = line.replace(match.groups()[0], command_name)
message.append(line)
if isinstance(command.actor, (actor.AMQPActor, actor.JSONActor)):
return command.finish(help=message, write_to_log=False)
else:
for line in message:
command.warning(help=line, write_to_log=False)
return command.finish()
@command_parser.command(internal=True)
@click.argument("COMMAND-NAME", type=str, required=False)
@click.pass_context
def get_command_model(
ctx: click.Context,
command: Command,
*args,
command_name: str | None = None,
):
"""Returns a dictionary representation of the command using ``unclick``."""
from ..legacy import LegacyActor
click_command = ctx.command
assert isinstance(click_command, CluGroup)
if command_name is not None:
click_command = click_command.commands.get(command_name, None)
if not click_command:
return command.fail(f"Cannot find command {command_name}.")
model_str = command_to_json(click_command)
model_dict = json.loads(model_str)
if isinstance(command.actor, LegacyActor):
return command.finish(command_model=model_str, write_to_log=False)
return command.finish(command_model=model_dict, write_to_log=False)
@command_parser.command(name="keyword")
@click.argument("KEYWORD", type=str, required=True)
def keyword(command, *args, keyword):
"""Prints human-readable information about a keyword."""
model = command.actor.model
if model is None or model.schema is None:
return command.fail(error="Actor does not have a data model.")
if keyword not in model.schema["properties"]:
return command.fail(error=f"Keyword {keyword!r} is not part of the data model.")
schema = model.schema["properties"][keyword]
lines = json.dumps(schema, indent=2).splitlines()[1:]
max_length = max([len(line) for line in lines])
command.info(text=f"{keyword} = {{".ljust(max_length, " "), write_to_log=False)
for line in lines:
command.info(
text=line.replace('"', "").ljust(max_length, " "),
write_to_log=False,
)
command.finish()
T = TypeVar("T", bound=Command)
[docs]
class ClickParser:
"""A command parser that uses Click at its base."""
#: list: Arguments to be passed to each command in the parser.
#: Note that the command is always passed first.
parser_args: List[Any] = []
parser = command_parser
#: dict: Parameters to be set in the context object.
context_obj = {}
# For type hints
log: SDSSLogger
name: str
[docs]
def parse_command(self, command: T) -> T:
"""Parses a user command using the Click internals."""
# This will pass the command as the first argument for each command.
# If self.parser_args is defined, those arguments will be passed next.
parser_args = [command]
parser_args += self.parser_args
# Empty command. Just finish the command.
if not command.body:
command.done()
return command
assert command.status
command.set_status(command.status.RUNNING, internal=True)
# If the command contains the --help flag,
# redirects it to the help command.
if "--help" in command.body and command.body != "--help":
command.body = "help " + command.body
command.body = command.body.replace(" --help", "")
if command.body != "--help" and not command.body.startswith("help"):
command_args = shlex.split(command.body)
elif command.body == "--help":
command_args = ["help", '""']
else:
command_args = ["help", '"{}"'.format(command.body[5:])]
# We call the command with a custom context to get around
# the default handling of exceptions in Click. This will force
# exceptions to be raised instead of redirected to the stdout.
# See http://click.palletsprojects.com/en/7.x/exceptions/
obj = {
"parser_args": parser_args,
"log": self.log,
"exception_handler": self._handle_command_exception,
}
obj.update(self.context_obj)
ctx = self.parser.make_context(
f"{self.name}-command-parser",
command_args,
obj=obj,
)
# Makes sure this is the global context. This solves problems when
# the actor have been started from inside an existing context,
# for example when it's called from a CLI click application.
click.globals.push_context(ctx)
try:
self.parser.invoke(ctx)
except Exception as exc:
self._handle_command_exception(command, exc)
return command
def _handle_command_exception(self, command, exception):
"""Handles an exception during parsing or execution of a command."""
try:
raise exception
except click.ClickException as ee:
ctx = command.ctx
message = ""
# If this is a command that cannot be parsed.
if not hasattr(ee, "message") and ctx:
message = f"{ee.__class__.__name__}:\n{ctx.get_help()}"
else:
message = f"{ee.__class__.__name__}: {ee.format_message()}"
if isinstance(command.actor, (actor.AMQPActor, actor.JSONActor)):
command.warning(help=message.splitlines())
else:
lines = message.splitlines()
for line in lines:
command.warning(help=line)
msg = f"Command {command.body!r} failed."
if not command.status.is_done:
command.fail(error=msg)
else:
command.write("e", error=msg)
except (click.exceptions.Exit, click.exceptions.Abort):
if not command.status.is_done:
command.fail(error=f"Command {command.body!r} was aborted.")
except Exception as err:
msg = (
f"Command {command.body!r} failed because of an uncaught "
f"error '{err.__class__.__name__}: {str(err)}'. See "
f"traceback in the log for more information."
)
if command.status.is_done:
command.write("i", text=msg)
else:
command.fail(error=msg)
log = self.log or command.ctx.obj.get("log", None)
if log:
log.exception(f"Command {command.body!r} failed with error:")