Source code for clu.base

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-05-20
# @Filename: base.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import abc
import asyncio
import enum
import inspect
import logging
import pathlib
import time
from datetime import datetime

from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, TypeVar, Union, cast

import jsonschema.exceptions
import yaml

from sdsstools import get_logger, read_yaml_file
from sdsstools.logger import SDSSLogger

from .model import Model
from .store import KeywordStore
from .tools import REPLY


if TYPE_CHECKING:
    from clu.command import BaseCommand, Command
    from clu.legacy.types.messages import Keywords


__all__ = ["BaseClient", "BaseActor", "Reply", "MessageCode"]


SchemaType = Union[Dict[str, Any], pathlib.Path, str]
T = TypeVar("T", bound="BaseClient")


class MessageCode(enum.Enum):
    """Flags for message codes."""

    STARTED = ">"
    DONE = ":"
    FAILED = "f"
    ERROR = "e"
    CRITICAL = "!"
    INFO = "i"
    WARNING = "w"
    DEBUG = "d"

    def __eq__(self, __o: object) -> bool:
        if isinstance(__o, MessageCode):
            return super().__eq__(__o)

        return self.value == __o


[docs] class BaseClient(metaclass=abc.ABCMeta): """A base client that can be used for listening or for an actor. This class defines a new client. Clients differ from actors in that they do not receive commands or issue replies, but do send commands to other actors and listen to the keyword-value flow. All actors are also clients and any actor should subclass from `.BaseClient`. Normally a new instance of a client or actor is created by passing a configuration file path to `.from_config` which defines how the client must be started. Parameters ---------- name The name of the client. version The version of the client. loop The event loop. If `None`, the current event loop will be used. log_dir The directory where to store the file logs. log A `~logging.Logger` instance to be used for logging instead of creating a new one. verbose Whether to log to stdout. Can be an integer logging level. validate Whether to actor should validate its own messages against its model (if it has one). This is a global parameter that can be overridden when calling `~.BaseClient.write`. config A dictionary of configuration parameters that will be accessible to the client. """ name: str command_models: dict[str, dict] = {} def __init__( self, name: str, version: Optional[str] = None, loop: Optional[asyncio.AbstractEventLoop] = None, log_dir: Optional[Union[pathlib.Path, str]] = None, log: Optional[SDSSLogger] = None, verbose: Union[bool, int] = False, validate: bool = True, config: dict = {}, ): self.name = name assert self.name, "name cannot be empty." self.log: SDSSLogger self.setup_logger(log, log_dir, verbose=verbose) self.version = version or "?" # Internally store the original configuration used to start the client. self.config: Dict[str, Any] = {} self.validate = validate self.config = config def __repr__(self): return f"<{str(self)} (name={self.name!r})>" def __str__(self): return self.__class__.__name__
[docs] @abc.abstractmethod async def start(self: T) -> T: """Runs the client.""" pass
[docs] async def stop(self): """Shuts down all the remaining tasks.""" self.log.info("cancelling all pending tasks and shutting down.") tasks = [ task for task in asyncio.all_tasks() if task is not asyncio.current_task() ] list(map(lambda task: task.cancel(), tasks)) await asyncio.gather(*tasks, return_exceptions=True) asyncio.get_running_loop().stop()
@staticmethod def _parse_config( input: Union[Dict[str, Any], pathlib.Path, str], loader=yaml.FullLoader, ) -> Dict[str, Any]: if not isinstance(input, dict): input = pathlib.Path(input) assert input.exists(), "configuration path does not exist." config = read_yaml_file(str(input), loader=loader) else: config = input return cast("Dict[str, Any]", config)
[docs] @classmethod def from_config( cls, config: Union[Dict[str, Any], pathlib.Path, str], *args, loader=yaml.FullLoader, **kwargs, ): """Parses a configuration file. Parameters ---------- config A configuration dictionary or the path to a YAML configuration file. If the file contains a section called ``'actor'`` or ``'client'``, that section will be used instead of the whole file. """ orig_config_dict = cls._parse_config(config, loader=loader) config_dict = orig_config_dict.copy() if "actor" in config_dict: config_dict = config_dict["actor"] elif "client" in config_dict: config_dict = config_dict["client"] config_dict.update(kwargs) # Decide what to do with the rest of the keyword arguments: args_inspect = inspect.getfullargspec(cls) if args_inspect.varkw is not None: # If there is a catch-all kw variable, send everything and let the # subclass handle it. config_dict.update(kwargs) else: # Check the kw arguments in the subclass and pass only # values from config_dict that match them. class_kwargs = args_inspect.args class_kwargs.remove("self") # Remove keys that are not in the signature. config_dict = { key: value for key, value in config_dict.items() if key in class_kwargs } # We also pass *args in case the actor has been subclassed # and the subclass' __init__ accepts different arguments. new_client = cls(*args, config=orig_config_dict, **config_dict) return new_client
[docs] def setup_logger( self, log: Any, log_dir: Optional[Union[pathlib.Path, str]], verbose: Union[bool, int] = False, ): """Starts the file logger.""" if not log: log = get_logger("clu:" + self.name) else: assert isinstance(log, SDSSLogger), "Logger must be sdsstools.SDSSLogger" log.setLevel(REPLY) if log_dir: log_dir = pathlib.Path(log_dir).expanduser() log.start_file_logger( str(log_dir / f"{self.name}.log"), rotating=True, rollover=True, ) if log.fh: # In case starting the file logger fails. if log.fh.formatter: log.fh.formatter.converter = time.gmtime log.fh.setLevel(REPLY) log.sh.setLevel(logging.WARNING) if verbose is True: log.sh.setLevel(logging.DEBUG) elif verbose is not False and isinstance(verbose, int): log.sh.setLevel(verbose) self.log = log self.log.debug(f"{self.name}: logging system initiated.") return log
[docs] def set_loop_exception_handler(self): """Sets the lopp exception handler to be handled by the logger.""" if self.log: # Set the loop exception handler to be handled by the logger. loop = asyncio.get_running_loop() loop.set_exception_handler(self.log.asyncio_exception_handler)
[docs] async def send_command(self, actor: str, *args, **kwargs) -> Command: """Sends a command to an actor and returns a `.Command` instance.""" raise NotImplementedError( "Sending commands is not implemented for this client." )
[docs] def proxy(self, actor: str) -> ProxyClient: """Creates a proxy for an actor. Returns a `.ProxyClient` that simplifies running a command multiple times. For example :: await client.send_command("focus_state", "moverelative -1000 UM") can be replaced with :: focus_stage = client.proxy("focus_stage") await focus_stage.send_command("moverelative", "-1000", "UM") """ return ProxyClient(self, actor)
class ProxyClient: """A proxy representing an actor. Parameters ---------- client The client used to command the actor. actor The actor to command. """ def __init__(self, client: BaseClient, actor: str): self.client = client self.actor = actor def send_command(self, *args): """Sends a command to the actor. Returns the result of calling the client ``send_command()`` method with the actor and concatenated arguments as parameters. Note that in some cases the client ``send_command()`` method may be a coroutine function, in which case the returned coroutine needs to be awaited. Parameters ---------- args Arguments to pass to the actor. They will be concatenated using spaces. """ command = " ".join(map(str, args)) return self.client.send_command(self.actor, command)
[docs] class BaseActor(BaseClient): """An actor based on `asyncio`. This class expands `.BaseClient` with a parsing system for new commands and placeholders for methods for handling new commands and writing replies, which should be overridden by the specific actors. Parameters ---------- schema The schema for the actor replies, as a JSONschema dictionary or path to a JSON file. If `None`, defaults to the internal basic schema. store Whether to store the output keywords in a `.KeywordStore`. `False` (the default), disables the feature. `True` will store a record of all the output keywords. A list of keyword names to store can also be passed. additional_properties Whether to allow additional properties in the schema, other than the ones defined by the schema. This parameter only is used if ``schema=None`` or if ``additionalProperties`` is not defined in the schema. """ model: Union[Model, None] = None _message_processor: Callable[[dict], dict] | None = None def __init__( self, *args, schema: SchemaType | None = None, store: bool | list[str] = False, additional_properties: bool = False, **kwargs, ): super().__init__(*args, **kwargs) self.load_schema(schema, additional_properties=additional_properties) if store is False: self.store = None else: self.store = KeywordStore(self, filter=None if store is True else store)
[docs] def load_schema( self, schema: Union[SchemaType, None], is_file=True, additional_properties=False, ) -> Union[Model, None]: """Loads and validates the actor schema.""" if schema is None: schema = { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", "properties": {}, "additionalProperties": additional_properties, } is_file = False if isinstance(schema, dict): is_file = False self.model = Model( self.name, schema, is_file=is_file, additional_properties=additional_properties, ) return self.model
[docs] def set_message_processor(self, processor: Callable[[dict], dict] | None): """Sets the message processor. Parameters ---------- processor A function that receives the messsages to output to the users, as a dictionary, and reformats them, returning a new dictionary. If `None`, no processing is done. """ self._message_processor = processor
[docs] @abc.abstractmethod def new_command(self): """Handles a new command. Must be overridden by the subclass and call `.parse_command` with a `.Command` object. """ pass
[docs] @abc.abstractmethod def parse_command(self, command: BaseCommand): """Parses and executes a `.Command`. Must be overridden.""" pass
@abc.abstractmethod def _write_internal(self, reply: Reply, write_to_log: bool = True): """Internally handle the reply and output it to the users. Must handle converting the general `.Reply` to the specific format of the actor transport. Must also handle logging the reply. Parameters ---------- reply The reply object to output to users. write_to_log Whether to write the reply to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log. """ pass
[docs] def write( self, message_code: MessageCode | str = MessageCode.INFO, message: Optional[Dict[str, Any] | str] = None, command: Optional[BaseCommand] = None, broadcast: bool = False, validate: bool | None = None, expand_exceptions: bool = True, traceback_frame: int = 0, silent: bool = False, internal: bool = False, emit: bool = True, write_to_log: bool = True, **kwargs, ) -> Reply: """Writes a message to user(s). The reply to the users will be formatted by the actor class into message specific to the communication channel. Additional keywords passed to this method will be used to complete the message (as long as they don't overlap with named parameters). For example :: actor.write('i', message={'text': 'Hi!', 'value1': 1}) and :: actor.write('i', text='Hi!', value1=1) are equivalent and :: actor.write('i', message={'text': 'Hi!'}, value1=1) is equivalent to :: actor.write('i', message={'text': 'Hi!', 'value1': 1}) This method generates a `.Reply` object that is passed to the ``_write_internal`` method in the class, which processes it and outputs the message to the users using the appropriate transport. If ``command`` is passed, the reply is added to ``command.replies``. Parameters ---------- message_code The message code (e.g., ``'i'`` or ``':'``). message The keywords to be output. Must be a dictionary of pairs ``{keyword: value}``. command The command to which we are replying. If not set, it is assumed that this is a broadcast. broadcast Whether to broadcast the message to all the users or only to the commander. validate Validate the reply against the actor schema. This is ignored if the actor was not started with knowledge of its own schema. If `None`, defaults to the actor global behaviour. expand_exceptions If the value of one of the keywords is an exception object and `expand_exception=True`, the exception information and traceback will be expanded. Otherwise the exception will just be stringified. traceback_frame The frame from the traceback to use if ``expand_exception=True``. By default reports the file and line of the last traceback frame. silent When `True` does not output the message to the users. This can be used to issue internal commands that update the internal model but that don't clutter the output. internal Marks the `.Reply` instance as internal. Internal replies are expected to be emitted to the users, but with a header indicating that the message is not for broader consumption. This can be useful when one wants to reply with verbose information that can be skipped from CLI or logs. emit Whether to call the actor internal write method. Should be `True` but it's sometimes useful to call `.write` with ``emit=False`` when one is overriding the method and wants to control when to call the internal method. In that case, a `.Reply` object is returned but nothing is output to the users. write_to_log Whether to write the reply to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log. kwargs Keyword arguments that will used to update the message. """ message_code = MessageCode(message_code) if isinstance(message, str): message = {"text": message} else: message = message or {} if not isinstance(message, dict): raise TypeError("message must be a string or a dictionary.") message.update(kwargs) if self._message_processor: message = self._message_processor(message) for key, value in message.items(): if isinstance(value, Exception): if expand_exceptions is True: filename: str | None = None lineno: int | None = None if value.__traceback__ is not None: tb = value.__traceback__ for _ in range(traceback_frame): t_next = tb.tb_next if t_next is None: break tb = t_next filename = tb.tb_frame.f_code.co_filename if tb else None lineno = tb.tb_lineno if tb else None message[key] = { "module": value.__class__.__module__, "type": value.__class__.__name__, "message": str(value), "filename": filename, "lineno": lineno, } else: message[key] = str(value) reply = Reply( message_code, message, command=command, broadcast=broadcast, internal=internal, ) do_validate = validate if validate is not None else self.validate if do_validate and self.model is not None: reply.use_validation = True result, err = self.model.validate(message, update_model=True) if result is False: if isinstance(err, jsonschema.exceptions.ValidationError): message = { "error": f"Failed validating the reply: message {message} " "does not match the schema." } else: message = {"error": f"Failed validating the reply: {err}"} reply.message_code = MessageCode.ERROR reply.message = message reply.validated = False else: reply.validated = True if command: command.replies.append(reply) if emit and silent is False: if asyncio.iscoroutinefunction(self._write_internal): asyncio.create_task( self._write_internal( reply, write_to_log=write_to_log, ) ) else: self._write_internal(reply, write_to_log=write_to_log) if self.store is not None: self.store.add_reply(reply) return reply
[docs] def invoke_mock_command(self, command_str, command_id=0) -> Command: """Send a new command to an actor for testing. Requires calling `.setup_test_actor`.""" raise NotImplementedError("setup_test_actor() has not been called.")
[docs] class Reply: """A reply from a command or actor to be sent to the users.""" def __init__( self, message_code: MessageCode | str, message: Dict[str, Any], command: Optional[BaseCommand] = None, broadcast: bool = False, use_validation: bool = False, validated: bool = False, internal: bool = False, keywords: Optional[Keywords] = None, ): self.date = datetime.utcnow() self.message_code = MessageCode(message_code) self.message = message self.command = command self.broadcast = broadcast self.use_validation = use_validation self.validated = validated self.keywords = keywords self.internal = internal @property def body(self): """Alias to ``message``.""" return self.message