#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-05-20
# @Filename: base.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import abc
import asyncio
import inspect
import logging
import pathlib
import time
from datetime import datetime
from typing import Any, Dict, Optional, TypeVar, Union, cast
from sdsstools import get_logger, read_yaml_file
from sdsstools.logger import SDSSLogger
from clu.command import BaseCommand
from .model import Model
from .tools import REPLY
# Names exported by ``from <module> import *``.
__all__ = ["BaseClient", "BaseActor", "Reply"]
# Accepted forms for a schema argument: an in-memory dictionary, or a
# `pathlib.Path` / string (presumably a path to a schema file, since
# ``BaseActor.load_schema`` defaults to ``is_file=True`` -- confirm against
# ``Model``).
SchemaType = Union[Dict[str, Any], pathlib.Path, str]
# Type variable bound to `BaseClient`; used to annotate ``BaseClient.start``
# so that it is typed as returning the same type as ``self``.
T = TypeVar("T", bound="BaseClient")
class BaseClient(metaclass=abc.ABCMeta):
    """A base client that can be used for listening or for an actor.

    This class defines a new client. Clients differ from actors in that
    they do not receive commands or issue replies, but do send commands to
    other actors and listen to the keyword-value flow. All actors are also
    clients and any actor should subclass from `.BaseClient`.

    Normally a new instance of a client or actor is created by passing a
    configuration file path to `.from_config` which defines how the
    client must be started.

    Parameters
    ----------
    name
        The name of the client.
    version
        The version of the client.
    loop
        The event loop. If `None`, the current event loop will be used.
    log_dir
        The directory where to store the file logs.
    log
        A `~logging.Logger` instance to be used for logging instead of creating
        a new one.
    verbose
        Whether to log to stdout. Can be an integer logging level.

    """

    name: str

    def __init__(
        self,
        name: str,
        version: Optional[str] = None,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        log_dir: Optional[Union[pathlib.Path, str]] = None,
        log: Optional[SDSSLogger] = None,
        verbose: Union[bool, int] = False,
    ):
        self.loop = loop or asyncio.get_event_loop()

        self.name = name
        assert self.name, "name cannot be empty."

        self.log: SDSSLogger
        self.setup_logger(log, log_dir, verbose=verbose)

        # An unset (or empty) version is reported as "?".
        self.version = version or "?"

        # Internally store the original configuration used to start the client.
        self.config: Dict[str, Any] = {}

    def __repr__(self):
        return f"<{str(self)} (name={self.name!r})>"

    def __str__(self):
        return self.__class__.__name__

    @abc.abstractmethod
    async def start(self: T) -> T:
        """Runs the client."""
        pass

    async def stop(self):
        """Shuts down all the remaining tasks.

        Cancels every task in the client's loop except the one running this
        coroutine, waits for them to finish, and then stops the loop.

        """

        self.log.info("cancelling all pending tasks and shutting down.")

        # The current task must be excluded or stop() would cancel itself.
        current = asyncio.current_task(loop=self.loop)
        tasks = [
            task for task in asyncio.all_tasks(loop=self.loop) if task is not current
        ]

        for task in tasks:
            task.cancel()

        # return_exceptions=True prevents the CancelledError of the cancelled
        # tasks from propagating here.
        await asyncio.gather(*tasks, return_exceptions=True)

        self.loop.stop()

    @staticmethod
    def _parse_config(
        input: Union[Dict[str, Any], pathlib.Path, str]
    ) -> Dict[str, Any]:
        """Returns a configuration dictionary from a dictionary or a YAML file.

        A dictionary is returned as is (the same object, not a copy). Anything
        else is treated as a path that must exist and is read with
        ``read_yaml_file``.

        """

        if isinstance(input, dict):
            return cast("Dict[str, Any]", input)

        input = pathlib.Path(input)
        assert input.exists(), "configuration path does not exist."

        return cast("Dict[str, Any]", read_yaml_file(str(input)))

    @classmethod
    def from_config(
        cls,
        config: Union[Dict[str, Any], pathlib.Path, str],
        *args,
        **kwargs,
    ):
        """Parses a configuration file.

        The input configuration is never modified: the values passed to the
        class initialiser are collected in a new dictionary.

        Parameters
        ----------
        config
            A configuration dictionary or the path to a YAML configuration
            file. If the file contains a section called ``'actor'`` or
            ``'client'``, that section will be used instead of the whole
            file.
        args
            Positional arguments passed to the class initialiser.
        kwargs
            Keyword arguments that override the values in the configuration.

        Returns
        -------
        client
            A new instance of the class. The parsed, original configuration
            is stored in its ``config`` attribute.

        """

        orig_config_dict = cls._parse_config(config)

        if "actor" in orig_config_dict:
            section = orig_config_dict["actor"]
        elif "client" in orig_config_dict:
            section = orig_config_dict["client"]
        else:
            section = orig_config_dict

        # Work on a copy of the selected section. Updating the section itself
        # would modify the caller's dictionary (and the original configuration
        # stored in the new client).
        config_dict = dict(section or {})
        config_dict.update(kwargs)

        # Decide what to do with the rest of the keyword arguments. If there is
        # a catch-all kw variable we send everything and let the subclass
        # handle it. Otherwise only the keys that match a named parameter
        # (positional-or-keyword or keyword-only) of the class are passed.
        args_inspect = inspect.getfullargspec(cls)

        if args_inspect.varkw is None:
            accepted = set(args_inspect.args) | set(args_inspect.kwonlyargs)
            accepted.discard("self")

            config_dict = {
                key: value for key, value in config_dict.items() if key in accepted
            }

        # We also pass *args in case the actor has been subclassed
        # and the subclass' __init__ accepts different arguments.
        new_client = cls(*args, **config_dict)

        # Store original config. This may not be complete since from_config
        # may have been super'd from somewhere else.
        new_client.config = orig_config_dict

        return new_client

    def setup_logger(
        self,
        log: Any,
        log_dir: Optional[Union[pathlib.Path, str]],
        verbose: Union[bool, int] = False,
    ):
        """Starts the file logger.

        Parameters
        ----------
        log
            An existing ``SDSSLogger`` to use. If provided it is stored in
            ``self.log`` and returned without any further configuration. If
            not, a new logger named ``clu:<name>`` is created and configured.
        log_dir
            The directory in which to write ``<name>.log``. If not set, no
            file logger is started.
        verbose
            `True` sets the stream handler level to ``DEBUG``; an integer sets
            it to that level; otherwise it stays at ``WARNING``.

        Returns
        -------
        log
            The logger in use, which is also stored in ``self.log``.

        """

        if log:
            assert isinstance(log, SDSSLogger), "Logger must be sdsstools.SDSSLogger"
            self.log = log
            return log

        log = get_logger("clu:" + self.name)
        log.setLevel(REPLY)

        if log_dir:
            log_dir = pathlib.Path(log_dir).expanduser()

            log.start_file_logger(
                str(log_dir / f"{self.name}.log"),
                rotating=True,
                rollover=True,
            )

            if log.fh:  # In case starting the file logger fails.
                # File records are timestamped in UTC.
                log.fh.formatter.converter = time.gmtime
                log.fh.setLevel(REPLY)

        log.sh.setLevel(logging.WARNING)
        if verbose is True:
            log.sh.setLevel(logging.DEBUG)
        elif verbose is not False and isinstance(verbose, int):
            log.sh.setLevel(verbose)

        self.log = log
        self.log.debug(f"{self.name}: logging system initiated.")

        # Set the loop exception handler to be handled by the logger.
        self.loop.set_exception_handler(self.log.asyncio_exception_handler)

        return log

    def send_command(self, actor: str, *args, **kwargs):  # pragma: no cover
        """Sends a command to an actor and returns a `.Command` instance.

        Raises
        ------
        NotImplementedError
            Always, in this base class.

        """

        raise NotImplementedError(
            "Sending commands is not implemented for this client."
        )
class BaseActor(BaseClient):
    """An actor based on `asyncio`.

    This class expands `.BaseClient` with a parsing system for new commands
    and placeholders for methods for handling new commands and writing replies,
    which should be overridden by the specific actors.

    Parameters
    ----------
    args
        Positional arguments passed to `.BaseClient`.
    schema
        The schema of the actor replies, passed to `.load_schema`. If `None`,
        a permissive schema that accepts any property is used.
    kwargs
        Keyword arguments passed to `.BaseClient`.

    """

    model: Union[Model, None] = None

    def __init__(self, *args, schema: Optional[SchemaType] = None, **kwargs):
        super().__init__(*args, **kwargs)

        # Strong references to the tasks scheduled by write() when the
        # internal write method is a coroutine function.
        self._write_tasks: set = set()

        self.load_schema(schema)

    def load_schema(
        self,
        schema: Union[SchemaType, None],
        is_file=True,
    ) -> Union[Model, None]:
        """Loads and validates the actor schema.

        Parameters
        ----------
        schema
            The schema, forwarded to `.Model`. If `None`, a default schema
            that allows any additional property is used.
        is_file
            Forwarded to `.Model`. Forced to `False` when the default schema
            is used, since that schema is a dictionary.

        Returns
        -------
        model
            The new `.Model`, which is also stored in ``self.model``.

        """

        if schema is None:
            schema = {
                "$schema": "http://json-schema.org/draft-07/schema#",
                "type": "object",
                "properties": {},
                "additionalProperties": True,
            }
            is_file = False

        self.model = Model(self.name, schema, is_file=is_file, log=self.log)

        return self.model

    @abc.abstractmethod
    def new_command(self):
        """Handles a new command.

        Must be overridden by the subclass and call `.parse_command`
        with a `.Command` object.

        """
        pass

    @abc.abstractmethod
    def parse_command(self, command: BaseCommand):
        """Parses and executes a `.Command`. Must be overridden."""
        pass

    def send_command(self, *args, **kwargs):
        """Sends a command to another actor.

        Accepts any arguments so that calls written against the
        `.BaseClient.send_command` signature reach the error below instead
        of failing with a `TypeError`.

        Raises
        ------
        NotImplementedError
            Always, in this base class.

        """

        raise NotImplementedError("Sending commands is not implemented for this actor.")

    @abc.abstractmethod
    def _write_internal(self, reply: Reply):
        """Internally handle the reply and output it to the users.

        Must handle converting the general `.Reply` to the specific format of the
        actor transport. Must also handle logging the reply.

        """
        pass

    def write(
        self,
        message_code: str = "i",
        message: Optional[Dict[str, Any] | str] = None,
        command: Optional[BaseCommand] = None,
        broadcast: bool = False,
        validate: bool = True,
        call_internal: bool = True,
        **kwargs,
    ) -> Reply:
        """Writes a message to user(s).

        The reply to the users will be formatted by the actor class into message
        specific to the communication channel. Additional keywords passed to this
        method will be used to complete the message (as long as they don't overlap
        with named parameters). For example ::

            actor.write('i', message={'text': 'Hi!', 'value1': 1})

        and ::

            actor.write('i', text='Hi!', value1=1)

        are equivalent and ::

            actor.write('i', message={'text': 'Hi!'}, value1=1)

        is equivalent to ::

            actor.write('i', message={'text': 'Hi!', 'value1': 1})

        This method generates a `.Reply` object that is passed to the
        ``_write_internal`` method in the class, which processes it and outputs the
        message to the users using the appropriate transport. If ``command`` is passed,
        the reply is added to ``command.replies``.

        Parameters
        ----------
        message_code
            The message code (e.g., ``'i'`` or ``':'``).
        message
            The keywords to be output. Must be a dictionary of pairs
            ``{keyword: value}``. A string is output as ``{'text': message}``.
            The dictionary is not modified; the reply holds a copy.
        command
            The command to which we are replying. If not set, it is assumed
            that this is a broadcast.
        broadcast
            Whether to broadcast the message to all the users or only to the
            commander.
        validate
            Validate the reply against the actor schema. This is ignored if the actor
            was not started with knowledge of its own schema.
        call_internal
            Whether to call the actor internal write method. Should be `True` but
            it's sometimes useful to call `.write` with ``call_internal=False`` when
            one is overriding the method and wants to control when to call the internal
            method.
        kwargs
            Keyword arguments that will used to update the message.

        Returns
        -------
        reply
            The `.Reply` that was generated. If the validation fails, its
            message code is ``'e'`` and its message contains only an
            ``error`` keyword.

        Raises
        ------
        TypeError
            If ``message`` is not a string, a dictionary, or empty.

        """

        if isinstance(message, str):
            message = {"text": message}
        elif not message:
            message = {}
        elif isinstance(message, dict):
            # Copy so that adding the extra keywords does not modify the
            # dictionary owned by the caller.
            message = message.copy()
        else:
            raise TypeError("message must be a dictionary")

        message.update(kwargs)

        reply = Reply(message_code, message, command=command, broadcast=broadcast)

        if validate and self.model is not None:
            reply.use_validation = True
            result, err = self.model.update_model(message)
            if result is False:
                # Replace the invalid message with an error reply.
                message = {"error": f"Failed validating the reply: {err}"}
                reply.message_code = "e"
                reply.message = message
                reply.validated = False
            else:
                reply.validated = True

        if command:
            command.replies.append(reply)

        if call_internal:
            if asyncio.iscoroutinefunction(self._write_internal):
                task = asyncio.create_task(self._write_internal(reply))  # type: ignore

                # The event loop only keeps weak references to tasks. Hold a
                # strong one until the task is done so that it cannot be
                # garbage collected while it is still running.
                pending = getattr(self, "_write_tasks", None)
                if pending is None:
                    # Subclass that did not call BaseActor.__init__.
                    pending = self._write_tasks = set()
                pending.add(task)
                task.add_done_callback(pending.discard)
            else:
                self._write_internal(reply)

        return reply
class Reply:
    """A reply from a command or actor to be sent to the users.

    Parameters
    ----------
    message_code
        The message code of the reply.
    message
        The keywords of the reply as a ``{keyword: value}`` dictionary.
    command
        The command that is being replied to, if any.
    broadcast
        Whether the reply is to be broadcast.
    use_validation
        Whether the message was checked against a schema.
    validated
        Whether the message passed the validation.

    Attributes
    ----------
    date
        The naive UTC `~datetime.datetime` at which the reply was created.

    """

    def __init__(
        self,
        message_code: str,
        message: Dict[str, Any],
        command: Optional[BaseCommand] = None,
        broadcast: bool = False,
        use_validation: bool = False,
        validated: bool = False,
    ):
        # Creation timestamp.
        self.date = datetime.utcnow()

        # Content of the reply.
        self.message_code, self.message = message_code, message

        # Addressing: the command replied to and whether to broadcast.
        self.command, self.broadcast = command, broadcast

        # Validation state.
        self.use_validation, self.validated = use_validation, validated