Source code for clu.command

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-01-17
# @Filename: command.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import logging
import re
import sys
import time
import warnings
from contextlib import suppress

from typing import (
    Any,
    Awaitable,
    Callable,
    Dict,
    Generic,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
    cast,
)

import click

import clu
import clu.base
from clu.exceptions import CluWarning, CommandError
from clu.tools import CommandStatus, StatusMixIn


__all__ = [
    "BaseCommand",
    "Command",
    "parse_legacy_command",
    "TimedCommand",
    "TimedCommandList",
    "FakeCommand",
]


Actor_co = TypeVar("Actor_co", bound="clu.base.BaseActor")
Future_co = TypeVar("Future_co", bound="BaseCommand")
Reply_co = TypeVar("Reply_co", bound="clu.base.Reply")

if sys.version_info >= (3, 9, 0):
    Future = asyncio.Future
else:

    class Future(asyncio.Future, Generic[Future_co]):
        pass


class ReplyList(List[Reply_co]):
    """A list of replies to a command."""

    def get(self, keyword: str):
        """Return the value of the reply that matches the key."""

        for reply in self:
            if keyword in reply.message:
                return reply.message[keyword]

        raise KeyError(f"Keyword {keyword} not found.")


[docs] class BaseCommand( Future[Future_co], StatusMixIn[CommandStatus], Generic[Actor_co, Future_co], ): """Base class for commands of all types (user and device). A `BaseCommand` instance is a `~asyncio.Future` whose result gets set when the status is done (either successfully or not). Parameters ---------- commander_id The ID of the commander issuing this command. Can be a string or an integer. Normally the former is used for new-style actor and the latter for legacy actors. command_id The ID associated to this command. As with the commander_id, it can be a string or an integer consumer_id The actor that is consuming this command. Normally this is our own actor but if we are commanding another actor ``consumer_id`` will be the destination actor. actor The actor instance associated to this command. parent Another `.BaseCommand` object that is issuing this subcommand. Messages emitted by the command will use the parent ``command_id``. reply_callback A callback that gets called when a client command receives a reply from the actor. status_callback A function to call when the status changes. call_now Whether to call ``status_callback`` when initialising the command. default_keyword The keyword to use when writing a message that does not specify a keyword. silent A silent command will call the actor ``write`` method with ``silent=True``, which will update the internal model and record all the output replies but will not write them to the users. internal A silent command will call the actor ``write`` method with ``internal=True``, which will instruct the actor to add the internal flag to the header of the reply. write_to_log Whether to write replies to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log. time_limit Time out the command if it has been running for this long. loop The event loop. """ def __init__( self, commander_id: Union[int, str, None] = None, command_id: Union[int, str] = 0, consumer_id: Union[int, str] = 0, actor: Optional[Actor_co] = None, parent: Optional[BaseCommand[Actor_co, Future_co]] = None, reply_callback: Optional[Callable[[Any], None]] = None, status_callback: Optional[Callable[[CommandStatus], Any]] = None, call_now: bool = False, default_keyword: str = "text", silent: bool = False, internal: bool = False, write_to_log: bool = True, time_limit: float | None = None, loop: Optional[asyncio.AbstractEventLoop] = None, ): self.commander_id = commander_id self.consumer_id = consumer_id self.command_id = command_id # Casting here so that we can type Command[SomeActor] and not have to # assert command.actor every time. self.actor = cast(Actor_co, actor) self.parent = parent self.silent = silent self.internal = internal self.write_to_log = write_to_log self._reply_callback = reply_callback self.default_keyword = default_keyword self.loop = loop or asyncio.get_event_loop() #: The click context, if the click parser is used. self.context: click.Context | None = None #: A list of replies this command has received. The type of #: reply object depends on the actor or client issuing the command. self.replies = ReplyList([]) asyncio.Future.__init__(self) self._status: CommandStatus StatusMixIn.__init__( self, CommandStatus, initial_status=CommandStatus.READY, callback_func=status_callback, call_now=call_now, ) self._timer_handler = None if time_limit: self._timer_handler = asyncio.get_running_loop().call_later( time_limit, self.set_status, CommandStatus.TIMEDOUT, ) @property def status(self) -> CommandStatus: """Returns the status.""" return self._status @status.setter def status(self, status: CommandStatus): """Sets the status. A message is output to the users. This setter calls `.set_status` with an empty message to the users. Parameters ---------- status The status to set, either as a `CommandStatus` value or the integer associated with the maskbit. If ``value`` is a string, loops over the bits in `CommandStatus` and assigns the one whose name matches. """ self.set_status(status)
[docs] def set_status( self, status: Union[CommandStatus, str], message: Dict[str, Any] | str | None = None, silent: bool = False, **kwargs, ) -> BaseCommand: """Same as `.status` but allows to specify a message to the users.""" assert self.status # Don't do anything if the command is done. This means the command # finished before the timeout happened. if status == CommandStatus.TIMEDOUT and self.done(): return self if self.status.is_done: raw_command_string = getattr(self, "raw_command_string", "NA") warnings.warn( f"{raw_command_string}: cannot modify a " f"done command with status {status!r}.", CluWarning, ) return self if isinstance(status, str): for bit in self.flags: assert bit.name if status.lower() == bit.name.lower(): status = bit break status = self.flags(status) try: is_flag = status in self.flags if not is_flag: raise TypeError() except TypeError: raise TypeError(f"Status {status!r} is not a valid command status.") if status != self._status: status_code = status.code if status_code is None: raise ValueError(f"Invalid status code {status_code!r}.") if isinstance(message, str) and status_code in ["f", "e"]: message = {"error": message} if self.actor and not silent: self.write(status_code, message, **kwargs) self._status = status self.do_callbacks() # If the command is done, set the result of the future. if self._status.is_done and not self.done(): self.set_result(self) # type: ignore if self._timer_handler: self._timer_handler.cancel() # Set the status watcher if self.watcher is not None: self.watcher.set() return self
[docs] def finish(self, *args, **kwargs): """Convenience method to mark a command `~.CommandStatus.DONE`.""" self.set_status(CommandStatus.DONE, *args, **kwargs) return self
[docs] def fail(self, *args, **kwargs): """Convenience method to mark a command `~.CommandStatus.FAILED`.""" self.set_status(CommandStatus.FAILED, *args, **kwargs) return self
[docs] def debug(self, *args, **kwargs): """Writes a debug-level message.""" self.write("d", *args, **kwargs)
[docs] def info(self, *args, **kwargs): """Writes an info-level message.""" self.write("i", *args, **kwargs)
[docs] def warning(self, *args, **kwargs): """Writes a warning-level message.""" self.write("w", *args, **kwargs)
[docs] def error(self, *args, **kwargs): """Writes an error-level message (does not fail the command).""" self.write("e", *args, **kwargs)
[docs] def write( self, message_code: clu.base.MessageCode | str | int = clu.base.MessageCode.INFO, message: Optional[Union[Dict[str, Any], str]] = None, broadcast: bool = False, **kwargs, ): """Writes to the user(s). Parameters ---------- message_code The message code (e.g., ``'i'`` or ``':'``). message The text to be output. If `None`, only the code will be written. """ if message is None: message = {} elif isinstance(message, dict): pass elif isinstance(message, str): keyword = "error" if message_code in ["f", "e"] else self.default_keyword message = {keyword: message} elif isinstance(message, Exception): message = {"error": message} else: raise ValueError(f"invalid message {message!r}") if not self.actor: raise clu.CommandError( "An actor has not been defined for " "this command. Cannot write to users." ) command = self if not self.parent else self.parent # If the message code is an integer, interpret that as if it's a logging # level and translate it to SDSS codes. if isinstance(message_code, int): if message_code == logging.DEBUG: message_code = "d" elif message_code == logging.INFO: message_code = "i" elif message_code == logging.WARNING: message_code = "w" elif message_code == logging.ERROR: message_code = "e" else: raise ValueError(f"Invalid message code {message_code}.") message_code = clu.base.MessageCode(message_code) # If the parent has a command, do not output : or f since it would # confuse the stream and potentially Tron. if self.parent: if message_code == clu.base.MessageCode.STARTED: # The parent is already running and > never includes a message. return if message_code == clu.base.MessageCode.DONE: message_code = clu.base.MessageCode.INFO if kwargs == {} and (message == {} or not message): return elif message_code == clu.base.MessageCode.FAILED: message_code = clu.base.MessageCode.ERROR internal = kwargs.pop("internal", self.internal) write_to_log = kwargs.pop("write_to_log", self.write_to_log) self.actor.write( message_code, message=message, command=command, broadcast=broadcast, silent=self.silent, internal=internal, write_to_log=write_to_log, **kwargs, )
[docs] def send_command( self, target: str, command_string: str, new_command: bool = True, *args, **kwargs, ) -> BaseCommand[Actor_co, BaseCommand] | Awaitable[BaseCommand]: """Sends a command to an actor using the commander ID of this command.""" if self.actor is None: raise CommandError("The actor need to be defined to send commands.") return self.actor.send_command( target, command_string, *args, command=self if new_command is False else None, **kwargs, )
[docs] class Command(BaseCommand[Actor_co, "Command"]): """A command from a user. Parameters ---------- command_string The string that defines the body of the command. transport The TCP transport associated with this command (only relevant for `.LegacyActor` commands). """ def __init__( self, command_string: str = "", transport: Optional[Any] = None, **kwargs, ): BaseCommand.__init__(self, **kwargs) #: The raw command string. self.raw_command_string = command_string #: The body of the command. self.body = command_string if not self.actor and self.parent: self.actor = self.parent.actor #: The `~click.Context` running this command. Only relevant if #: using the built-in click-based parser. self.ctx = None self.transport = transport
[docs] def child_command(self, command_string): """Starts a sub-command on the actor currently running this command. The practical effect is to run another of the same actor's commands as if it were part of the current `.Command`. """ return Command( command_string, actor=self.actor, commander_id=self.actor.name, parent=self, ).parse()
[docs] def parse(self) -> Command: """Parses the command.""" if not isinstance(self.actor, clu.base.BaseActor): raise clu.CluError("the actor is not defined. Cannot parse command.") self.actor.parse_command(self) return self
def __str__(self): return ( f"<Command (commander_id={self.commander_id!r}, " f"command_id={self.command_id!r}, body={self.body!r})>" )
[docs] def parse_legacy_command(command_string: str) -> Tuple[Union[str, None], int, str]: """Parses a command received by a legacy actor. Parameters ---------- command_string The command to parse, including an optional header. Returns ------- command_id, command_body The command ID, and the command body parsed from the command string. """ _HEADER_BODY_RE = re.compile( r"(?:([a-z0-9]*\.[a-z0-9_\.]+)\s+)?" r"(?:(\d+)(?:\s+\d+)?\s+)?" r"(?:([a-z_].*|--help))?$", re.IGNORECASE, ) command_match = _HEADER_BODY_RE.match(command_string) if not command_match: raise clu.CommandError(f"Could not parse command {command_string!r}") commander, command_id_str, command_body = command_match.groups() if command_id_str: command_id = int(command_id_str) else: command_id = 0 command_body = command_body.strip() return commander, command_id, command_body
[docs] class TimedCommandList(list): """A list of `.TimedCommand` objects that will be executed on a loop. Parameters ---------- actor The actor in which the commands are to be run. resolution In seconds, how frequently to check if any of the `.TimedCommand` must be executed. """ def __init__( self, actor: clu.base.BaseActor, resolution=0.5, loop: Optional[asyncio.AbstractEventLoop] = None, ): self.resolution = resolution self.actor = actor self.loop = loop or asyncio.get_event_loop() self._task: Optional[asyncio.Task] = None list.__init__(self, [])
[docs] def add_command(self, command_string: str, **kwargs): """Adds a new `.TimedCommand`.""" self.append(TimedCommand(command_string, **kwargs))
[docs] async def poller(self): """The polling loop.""" current_time = time.time() first_time = True while True: for timed_command in self: elapsed = current_time - timed_command.last_run if first_time or elapsed > timed_command.delay: timed_command_task = self.loop.create_task( timed_command.run(self.actor) ) timed_command_task.add_done_callback(timed_command.done) first_time = False self._sleep_task = self.loop.create_task(asyncio.sleep(self.resolution)) await self._sleep_task current_time += self.resolution
[docs] def start(self) -> TimedCommandList: """Starts the loop.""" if self.running: raise RuntimeError("poller is already running.") self._task = self.loop.create_task(self.poller()) return self
[docs] async def stop(self): """Cancel the poller.""" if not self.running: return if self._task: self._task.cancel() with suppress(asyncio.CancelledError): await self._task
@property def running(self) -> bool: """Returns `True` if the poller is running.""" if self._task and not self._task.cancelled(): return True return False
[docs] class TimedCommand(object): """A command to be executed on a loop. Parameters ---------- command_string The command string to run. delay How many seconds to wait between repeated calls. first_silent Runs the command in silent mode the first one (useful to internally update the model). """ def __init__(self, command_string: str, delay: float = 1, first_silent=False): self.command_string = command_string self.delay = delay self.last_run = 0.0 self.is_running = False self.first_silent = first_silent
[docs] async def run(self, actor: clu.base.BaseActor): """Run the command.""" if self.is_running: return silent = True if self.first_silent and self.last_run == 0.0 else False self.is_running = True await Command( self.command_string, actor=actor, commander_id=f".{actor.name}", silent=silent, ).parse()
[docs] def done(self, task): """Marks the execution of a command.""" self.last_run = time.time() self.is_running = False
[docs] class FakeCommand(BaseCommand): """A fake command that output to a logger.""" def __init__(self, log: logging.Logger, actor=None): self.log = log super().__init__(actor=actor)
[docs] def write( self, message_code: str = "i", message: Optional[Union[Dict[str, Any], str]] = None, **kwargs, ): if message_code == "d": level = logging.DEBUG elif message_code == "i": level = logging.INFO elif message_code == "w": level = logging.WARNING elif message_code in ["f", "e"]: level = logging.ERROR else: return if message is None: message = {} elif isinstance(message, dict): pass elif isinstance(message, str): keyword = "error" if message_code in ["f", "e"] else self.default_keyword message = {keyword: message} elif isinstance(message, Exception): message = {"error": message} else: raise ValueError(f"invalid message {message!r}") message.update(kwargs) self.log.log(level, str(message))