Source code for clu.parsers.click

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-05-06
# @Filename: parser.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import functools
import inspect
import json
import re
import shlex

from typing import Any, List, TypeVar

import click
from click.decorators import group, pass_obj

from sdsstools.logger import SDSSLogger

from clu.command import Command

from .. import actor


__all__ = ["CluCommand", "CluGroup", "command_parser", "ClickParser", "timeout"]


def coroutine(fn):
    """Create a coroutine. Avoids deprecation of asyncio.coroutine in 3.10."""

    if inspect.iscoroutinefunction(fn):
        return fn

    @functools.wraps(fn)
    async def _wrapper(*args, **kwargs):
        return fn(*args, **kwargs)

    return _wrapper


[docs]class CluCommand(click.Command): """Override :py:class:`click.Command` to pass the actor and command.""" def __init__(self, *args, context_settings=None, **kwargs): # Unless told otherwise, set ignore_unknown_options=True to prevent # negative numbers to be considered as options. See #40. if context_settings is None or "ignore_unknown_options" not in context_settings: context_settings = context_settings or {} context_settings["ignore_unknown_options"] = True super().__init__( *args, context_settings=context_settings, # type: ignore **kwargs, )
[docs] def done_callback(self, task, exception_handler=None): """Checks if the command task has been successfully done.""" ctx = task.ctx command = ctx.obj["parser_args"][0] if exception_handler and task.exception(): exception_handler(command, task.exception())
async def _schedule_callback(self, ctx, timeout=None): """Schedules the callback as a task with a timeout.""" parser_args = ctx.obj.get("parser_args", []) command = parser_args[0] if len(parser_args) > 0 else None callback_task = asyncio.create_task( ctx.invoke(self.callback, *parser_args, **ctx.params) ) try: await asyncio.wait_for(callback_task, timeout=timeout) except asyncio.TimeoutError: if command: command.set_status( command.status.TIMEDOUT, f"command timed out after {timeout} seconds.", ) return False return True
[docs] def invoke(self, ctx): """As :py:class:`click.Command.invoke` but passes the actor and command.""" click.core._maybe_show_deprecated_notice(self) # type: ignore if self.callback is not None: with ctx: loop = asyncio.get_event_loop() # Makes sure the callback is a coroutine if not asyncio.iscoroutinefunction(self.callback): self.callback = coroutine(self.callback) # Check to see if there is a timeout value in the callback. # If so, schedules a task to be cancelled after timeout. timeout = getattr(self.callback, "timeout", None) log = ctx.obj.pop("log", None) # Defines the done callback function. exception_handler = ctx.obj.pop("exception_handler", None) done_callback = functools.partial( self.done_callback, exception_handler=exception_handler ) # Launches callback scheduler and adds the done callback ctx.task = loop.create_task( self._schedule_callback(ctx, timeout=timeout) ) ctx.task.add_done_callback(done_callback) # Add some attributes to the task because it's # what will be passed to done_callback ctx.task.ctx = ctx # type: ignore ctx.task.log = log # type: ignore return ctx
[docs]class CluGroup(click.Group): """Override :py:class:`click.Group`. Makes all child commands instances of `.CluCommand`. """
[docs] def command(self, *args, **kwargs): """Override :py:class:`click.Group` to use `.CluCommand` by default.""" if "cls" in kwargs: pass else: kwargs["cls"] = CluCommand def decorator(f): cmd = click.decorators.command(*args, **kwargs)(f) self.add_command(cmd) return cmd return decorator
[docs] def parse_args(self, ctx, args): # pragma: no cover # Copy this method so that we can turn off the printing of the # usage before ctx.exit() if not args and self.no_args_is_help and not ctx.resilient_parsing: ctx.exit() rest = click.Command.parse_args(self, ctx, args) if self.chain: ctx.protected_args = rest ctx.args = [] elif rest: ctx.protected_args, ctx.args = rest[:1], rest[1:] return ctx.args
[docs] def group(self, *args, **kwargs): """Creates a new group inheriting from this class.""" if "cls" not in kwargs: kwargs["cls"] = self.__class__ def decorator(f): assert not asyncio.iscoroutinefunction(f), "groups cannot be coroutines." cmd = group(*args, **kwargs)(f) self.add_command(cmd) return cmd return decorator
[docs]def timeout(seconds: float): """A decorator to timeout the command after a number of ``seconds``.""" def decorator(f): # This is a bit of a hack but we cannot access the context here so # we add the timeout directly to the callback function. f.timeout = seconds async def helper(f, *args, **kwargs): if asyncio.iscoroutinefunction(f): return await f(*args, **kwargs) else: return f(*args, **kwargs) @functools.wraps(f) async def wrapper(*args, **kwargs): return await helper(f, *args, **kwargs) return functools.update_wrapper(wrapper, f) return decorator
def pass_args(): """Thing wrapper around pass_obj to pass the command and parser_args.""" def decorator(f): @functools.wraps(f) @pass_obj def new_func(obj, *args, **kwargs): return f(*obj["parser_args"], *args, **kwargs) return functools.update_wrapper(new_func, f) return decorator @click.group(cls=CluGroup) def command_parser(*args): pass @command_parser.command() def ping(*args): """Pings the actor.""" command = args[0] command.set_status(command.status.DONE, "Pong.") return @command_parser.command() def version(*args): """Reports the version.""" command = args[0] command.set_status(command.status.DONE, version=command.actor.version) return @command_parser.command(cls=CluCommand, name="get_schema") def get_schema(*args): """Returns the schema of the actor as a JSON schema.""" command = args[0] if command.actor.model is None: return command.fail(error="The actor does not know its own schema.") return command.finish(schema=json.dumps(command.actor.model.schema)) @command_parser.command(name="help") @click.argument("PARSER-COMMAND", type=str, required=False) @click.pass_context def help_(ctx, *args, parser_command): """Shows the help.""" command = args[0] # The parser_command arrives wrapped in quotes to make sure is a single # value. Strip it and unpack it in as many groups and commands as needed. parser_command = parser_command.strip('"').split() help_lines = "" command_name = args[0].actor.name # Actor name # Gets the help lines for the command group or for a specific command. if len(parser_command) > 0: ctx_commands = ctx.command.commands for ii in range(len(parser_command)): ctx_command_name = parser_command[ii].lower() command_name += f" {ctx_command_name}" if ctx_command_name not in ctx_commands: return command.fail(error=f"command {ctx_command_name} not found.") ctx_command = ctx_commands[ctx_command_name] if ii == len(parser_command) - 1: # This is the last element in the command list # so we want to actually output this help lines. help_lines = ctx_command.get_help(ctx) else: ctx_commands = ctx_command.commands else: help_lines: str = ctx.get_help() message = [] for line in help_lines.splitlines(): # Remove the parser name. match = re.match(r"^Usage: ([A-Za-z-_]+)", line) if match: line = line.replace(match.groups()[0], command_name) message.append(line) if isinstance(command.actor, (actor.AMQPActor, actor.JSONActor)): return command.finish(help=message) else: for line in message: command.warning(help=line) return command.finish() @command_parser.command(name="keyword") @click.argument("KEYWORD", type=str, required=True) def keyword(command, *args, keyword): """Prints human-readable information about a keyword.""" model = command.actor.model if model is None or model.schema is None: return command.fail(error="Actor does not have a data model.") if keyword not in model.schema["properties"]: return command.fail(error=f"Keyword {keyword!r} is not part of the data model.") schema = model.schema["properties"][keyword] lines = json.dumps(schema, indent=2).splitlines()[1:] max_length = max([len(line) for line in lines]) command.info(text=f"{keyword} = {{".ljust(max_length, " ")) for line in lines: command.info(text=line.replace('"', "").ljust(max_length, " ")) command.finish() T = TypeVar("T", bound=Command)
[docs]class ClickParser: """A command parser that uses Click at its base.""" #: list: Arguments to be passed to each command in the parser. #: Note that the command is always passed first. parser_args: List[Any] = [] parser = command_parser #: dict: Parameters to be set in the context object. context_obj = {} # For type hints log: SDSSLogger name: str
[docs] def parse_command(self, command: T) -> T: """Parses a user command using the Click internals.""" # This will pass the command as the first argument for each command. # If self.parser_args is defined, those arguments will be passed next. parser_args = [command] parser_args += self.parser_args # Empty command. Just finish the command. if not command.body: command.done() return command command.set_status(command.status.RUNNING) # If the command contains the --help flag, # redirects it to the help command. if "--help" in command.body: command.body = "help " + command.body command.body = command.body.replace(" --help", "") if not command.body.startswith("help"): command_args = shlex.split(command.body) else: command_args = ["help", '"{}"'.format(command.body[5:])] # We call the command with a custom context to get around # the default handling of exceptions in Click. This will force # exceptions to be raised instead of redirected to the stdout. # See http://click.palletsprojects.com/en/7.x/exceptions/ obj = { "parser_args": parser_args, "log": self.log, "exception_handler": self._handle_command_exception, } obj.update(self.context_obj) ctx = self.parser.make_context( f"{self.name}-command-parser", command_args, obj=obj, ) # Makes sure this is the global context. This solves problems when # the actor have been started from inside an existing context, # for example when it's called from a CLI click application. click.globals.push_context(ctx) try: self.parser.invoke(ctx) except Exception as exc: self._handle_command_exception(command, exc) return command
def _handle_command_exception(self, command, exception): """Handles an exception during parsing or execution of a command.""" try: raise exception except (click.ClickException) as ee: ctx = command.ctx message = "" # If this is a command that cannot be parsed. if not hasattr(ee, "message") and ctx: message = f"{ee.__class__.__name__}:\n{ctx.get_help()}" else: message = f"{ee.__class__.__name__}: {ee.format_message()}" if isinstance(command.actor, (actor.AMQPActor, actor.JSONActor)): command.warning(help=message.splitlines()) else: lines = message.splitlines() for line in lines: command.warning(help=line) msg = f"Command {command.body!r} failed." if not command.status.is_done: command.fail(error=msg) else: command.write("e", error=msg) except (click.exceptions.Exit, click.exceptions.Abort): if not command.status.is_done: command.fail(error=f"Command {command.body!r} was aborted.") except Exception as err: msg = ( f"Command {command.body!r} failed because of an uncaught " f"error '{err.__class__.__name__}: {str(err)}'. See " f"traceback in the log for more information." ) if command.status.is_done: command.write("i", text=msg) else: command.fail(error=msg) log = self.log or command.ctx.obj.get("log", None) if log: log.exception(f"Command {command.body!r} failed with error:")