#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-01-17
# @Filename: command.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
import re
import time
from contextlib import suppress
from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, TypeVar, Union
import clu
import clu.base
from clu.tools import CommandStatus, StatusMixIn
__all__ = [
"BaseCommand",
"Command",
"parse_legacy_command",
"TimedCommand",
"TimedCommandList",
]
Actor_co = TypeVar("Actor_co", bound="clu.base.BaseActor", covariant=True)
[docs]class BaseCommand(asyncio.Future, StatusMixIn[CommandStatus], Generic[Actor_co]):
"""Base class for commands of all types (user and device).
A `BaseCommand` instance is a `~asyncio.Future` whose result gets set
when the status is done (either successfully or not).
Parameters
----------
commander_id
The ID of the commander issuing this command. Can be a string or an
integer. Normally the former is used for new-style actor and the latter
for legacy actors.
command_id
The ID associated to this command. As with the commander_id, it can be
a string or an integer
consumer_id
The actor that is consuming this command. Normally this is our own
actor but if we are commanding another actor ``consumer_id`` will
be the destination actor.
actor
The actor instance associated to this command.
parent
Another `.BaseCommand` object that is issuing this subcommand.
Messages emitted by the command will use the parent ``command_id``.
status_callback
A function to call when the status changes.
call_now
Whether to call ``status_callback`` when initialising the command.
default_keyword
The keyword to use when writing a message that does not specify a
keyword.
loop
The event loop.
"""
def __init__(
self,
commander_id: Union[int, str] = 0,
command_id: Union[int, str] = 0,
consumer_id: Union[int, str] = 0,
actor: Optional[Actor_co] = None,
parent: Optional[BaseCommand[Actor_co]] = None,
status_callback: Optional[Callable[[CommandStatus], Any]] = None,
call_now: bool = False,
default_keyword: str = "text",
loop: Optional[asyncio.AbstractEventLoop] = None,
):
self.commander_id = commander_id
self.consumer_id = consumer_id
self.command_id = command_id
self.actor = actor
self.parent = parent
self.default_keyword = default_keyword
self.loop = loop or asyncio.get_event_loop()
#: .Reply: A list of replies this command has received.
self.replies: List[clu.base.Reply] = []
asyncio.Future.__init__(self, loop=self.loop)
self._status: CommandStatus
StatusMixIn.__init__(
self,
CommandStatus,
initial_status=CommandStatus.READY,
callback_func=status_callback,
call_now=call_now,
)
@property
def status(self) -> Union[CommandStatus, None]:
"""Returns the status."""
return self._status
@status.setter
def status(self, status: CommandStatus):
"""Sets the status. A message is output to the users.
This setter calls `.set_status` with an empty message to the users.
Parameters
----------
status
The status to set, either as a `CommandStatus` value or the
integer associated with the maskbit. If ``value`` is a string,
loops over the bits in `CommandStatus` and assigns the one whose
name matches.
"""
self.set_status(status)
[docs] def set_status(
self,
status: Union[CommandStatus, str],
message: Dict[str, Any] = None,
silent: bool = False,
**kwargs,
) -> BaseCommand:
"""Same as `.status` but allows to specify a message to the users."""
if self.status.is_done:
raise RuntimeError("cannot modify a done command.")
if isinstance(status, str):
for bit in self.flags:
if status.lower() == bit.name.lower():
status = bit
break
status = self.flags(status)
try:
is_flag = status in self.flags
if not is_flag:
raise TypeError()
except TypeError:
raise TypeError(f"Status {status!r} is not a valid command status.")
if status != self._status:
status_code = status.code
if status_code is None:
raise ValueError(f"Invalid status code {status_code!r}.")
if self.actor and not silent:
self.write(status_code, message, **kwargs)
self._status = status
self.do_callbacks()
# If the command is done, set the result of the future.
if self._status.is_done:
self.set_result(self)
# Set the status watcher
if self.watcher is not None:
self.watcher.set()
return self
[docs] def finish(self, *args, **kwargs):
"""Convenience method to mark a command `~.CommandStatus.DONE`."""
self.set_status(CommandStatus.DONE, *args, **kwargs)
return self
[docs] def fail(self, *args, **kwargs):
"""Convenience method to mark a command `~.CommandStatus.FAILED`."""
self.set_status(CommandStatus.FAILED, *args, **kwargs)
return self
[docs] def debug(self, *args, **kwargs):
"""Writes a debug-level message."""
self.write("d", *args, **kwargs)
[docs] def info(self, *args, **kwargs):
"""Writes an info-level message."""
self.write("i", *args, **kwargs)
[docs] def warning(self, *args, **kwargs):
"""Writes a warning-level message."""
self.write("w", *args, **kwargs)
[docs] def error(self, *args, **kwargs):
"""Writes an error-level message (does not fail the command)."""
self.write("e", *args, **kwargs)
[docs] def write(
self,
message_code: str = "i",
message: Optional[Union[Dict[str, Any], str]] = None,
broadcast: bool = False,
**kwargs,
):
"""Writes to the user(s).
Parameters
----------
message_code
The message code (e.g., ``'i'`` or ``':'``).
message
The text to be output. If `None`, only the code will be written.
"""
if message is None:
message = {}
elif isinstance(message, dict):
pass
elif isinstance(message, str):
message = {self.default_keyword: message}
else:
raise ValueError(f"invalid message {message!r}")
if not self.actor:
raise clu.CommandError(
"An actor has not been defined for "
"this command. Cannot write to users."
)
command = self if not self.parent else self.parent
self.actor.write(
message_code,
message=message,
command=command,
broadcast=broadcast,
**kwargs,
)
[docs]class Command(BaseCommand[Actor_co]):
"""A command from a user.
Parameters
----------
command_string
The string that defines the body of the command.
transport
The TCP transport associated with this command (only relevant
for `.LegacyActor` commands).
"""
def __init__(
self,
command_string: str = "",
transport: Optional[Any] = None,
**kwargs,
):
BaseCommand.__init__(self, **kwargs)
#: The raw command string.
self.raw_command_string = command_string
#: The body of the command.
self.body = command_string
if not self.actor and kwargs.get("parent", None):
self.actor = self.parent.actor
#: The `~click.Context` running this command. Only relevant if
#: using the built-in click-based parser.
self.ctx = None
self.transport = transport
[docs] def parse(self) -> Command:
"""Parses the command."""
if not isinstance(self.actor, clu.base.BaseActor):
raise clu.CluError("the actor is not defined. Cannot parse command.")
self.actor.parse_command(self)
return self
def __str__(self):
return (
f"<Command (commander_id={self.commander_id!r}, "
f"command_id={self.command_id!r}, body={self.body!r})>"
)
[docs]def parse_legacy_command(command_string: str) -> Tuple[int, str]:
"""Parses a command received by a legacy actor.
Parameters
----------
command_string
The command to parse, including an optional header.
Returns
-------
command_id, command_body
The command ID, and the command body parsed from the command
string.
"""
_HEADER_BODY_RE = re.compile(
r"((?P<cmdID>\d+)(?:\s+\d+)?\s+)?((?P<cmdBody>[A-Za-z_].*))?$"
)
command_match = _HEADER_BODY_RE.match(command_string)
if not command_match:
raise clu.CommandError(f"Could not parse command {command_string!r}")
command_dict = command_match.groupdict("")
command_id_str = command_dict["cmdID"]
if command_id_str:
command_id = int(command_id_str)
else:
command_id = 0
command_body = command_dict.get("cmdBody", "").strip()
return command_id, command_body
[docs]class TimedCommandList(list):
"""A list of `.TimedCommand` objects that will be executed on a loop.
Parameters
----------
actor
The actor in which the commands are to be run.
resolution
In seconds, how frequently to check if any of the `.TimedCommand` must
be executed.
"""
def __init__(
self,
actor: clu.base.BaseActor,
resolution=0.5,
loop: Optional[asyncio.AbstractEventLoop] = None,
):
self.resolution = resolution
self.actor = actor
self.loop = loop or asyncio.get_event_loop()
self._task: Optional[asyncio.Task] = None
list.__init__(self, [])
[docs] def add_command(self, command_string: str, **kwargs):
"""Adds a new `.TimedCommand`."""
self.append(TimedCommand(command_string, **kwargs))
[docs] async def poller(self):
"""The polling loop."""
current_time = time.time()
while True:
for timed_command in self:
elapsed = current_time - timed_command.last_run
if elapsed > timed_command.delay:
timed_command_task = self.loop.create_task(
timed_command.run(self.actor)
)
timed_command_task.add_done_callback(timed_command.done)
self._sleep_task = self.loop.create_task(asyncio.sleep(self.resolution))
await self._sleep_task
current_time += self.resolution
[docs] def start(self) -> TimedCommandList:
"""Starts the loop."""
if self.running:
raise RuntimeError("poller is already running.")
self._task = self.loop.create_task(self.poller())
return self
[docs] async def stop(self):
"""Cancel the poller."""
if not self.running:
return
if self._task:
self._task.cancel()
with suppress(asyncio.CancelledError):
await self._task
@property
def running(self) -> bool:
"""Returns `True` if the poller is running."""
if self._task and not self._task.cancelled():
return True
return False
[docs]class TimedCommand(object):
"""A command to be executed on a loop.
Parameters
----------
command_string
The command string to run.
delay
How many seconds to wait between repeated calls.
"""
def __init__(self, command_string: str, delay: float = 1):
self.command_string = command_string
self.delay = delay
self.last_run = 0.0
[docs] async def run(self, actor: clu.base.BaseActor):
"""Run the command."""
await Command(self.command_string, actor=actor, commander_id=actor.name).parse()
[docs] def done(self, task):
"""Marks the execution of a command."""
self.last_run = time.time()