Source code for clu.actor

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-01-16
# @Filename: actor.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import json
import pathlib
import re
import uuid
import warnings
from datetime import datetime

from typing import Any, Awaitable, Callable, Dict, Optional, TypeVar, Union, cast

import aio_pika as apika
import click

from .base import BaseActor, MessageCode, Reply
from .client import AMQPClient
from .command import Command, TimedCommandList
from .exceptions import CluWarning, CommandError
from .parsers import ClickParser, CluCommand
from .protocol import TCPStreamServer
from .tools import log_reply


__all__ = ["AMQPActor", "JSONActor", "AMQPBaseActor", "TCPBaseActor"]


T = TypeVar("T")
PathLike = Union[str, pathlib.Path]
SchemaType = Union[Dict[str, Any], PathLike]
TaskCallbackType = Callable[[dict], Awaitable[Union[Command, None]]]


class CustomTransportType(asyncio.Transport):
    user_id: Union[str, int]
    multiline: bool


[docs] class AMQPBaseActor(AMQPClient, BaseActor): """An actor class that uses AMQP message brokering. This class differs from `~clu.legacy.actor.LegacyActor` in that it uses an AMQP messaging broker (typically RabbitMQ) to communicate with other actors in the system, instead of standard TCP sockets. Although the internals and protocols are different the entry points and behaviour for both classes should be almost identical. This class needs to be subclassed with a command parser. See the documentation for `.AMQPActor` and `.AMQPClient` for additional parameter information. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.commands_queue = None self.timed_commands = TimedCommandList(self)
[docs] async def start(self, **kwargs): """Starts the connection to the AMQP broker.""" self.set_loop_exception_handler() # This sets the replies queue but not a commands one. await AMQPClient.start(self, **kwargs) assert self.connection.connection assert isinstance(self.connection.connection, apika.Connection) # Binds the commands queue. self.commands_queue = await self.connection.add_queue( f"{self.name}_commands", callback=self.new_command, bindings=[f"command.{self.name}.#"], ) self.log.debug( f"Commands queue {self.commands_queue.name!r} " f"bound to {self.connection.connection.url!s}" ) self.timed_commands.start() return self
[docs] async def new_command(self, message: apika.abc.AbstractIncomingMessage, ack=True): """Handles a new command received by the actor.""" if ack: async with message.process(): headers = message.info()["headers"] command_body = json.loads(message.body.decode()) else: headers = message.info().get("headers", {}) command_body = json.loads(message.body.decode()) commander_id = headers.get("commander_id", None) command_id = headers.get("command_id", None) internal = headers.get("internal", False) command_string = command_body.get("command_string", "") full_command_string = f"{self.name} {command_string}" try: command = Command( command_string, command_id=command_id, commander_id=commander_id, consumer_id=self.name, actor=self, internal=internal, ) command.actor = self # Assign the actor except CommandError as ee: self.write( "f", { "error": "Could not parse the following as a command: " f"{full_command_string!r}: {ee}" }, ) return self.log.info( f"New command received: {full_command_string!r} " f"(commander_id={commander_id!r}, command_id={command_id!r})" ) return self.parse_command(command)
async def _write_internal(self, reply: Reply, write_to_log: bool = True): """Writes a message to user(s). Parameters ---------- reply The reply object to output to users. write_to_log Whether to write the reply to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log. """ assert self.connection message = reply.message message_json = json.dumps(message) command = reply.command if command is None or reply.broadcast: routing_key = "reply.broadcast" else: routing_key = f"reply.{command.commander_id}" commander_id = command.commander_id if command else None command_id = command.command_id if command else None headers = { "message_code": reply.message_code.value, "commander_id": commander_id, "command_id": command_id, "sender": self.name, "internal": reply.internal, } if self.log and write_to_log: log_dict = {"headers": headers, "message": message} log_reply(self.log, reply.message_code, json.dumps(log_dict)) if hasattr(self.connection, "exchange"): await self.connection.exchange.publish( apika.Message( message_json.encode(), content_type="text/json", headers=headers, correlation_id=str(command_id) if command_id is not None else None, timestamp=datetime.utcnow(), ), routing_key=routing_key, ) else: warnings.warn( f"Exchange is not ready to output message: {message}", CluWarning, )
[docs] class AMQPActor(ClickParser, AMQPBaseActor): """An `AMQP actor <.AMQPBaseActor>` that uses a `click parser <.ClickParser>`.""" def __init__(self, *args, **kwargs): AMQPBaseActor.__init__(self, *args, **kwargs) self.task_handlers: dict[str, TaskCallbackType] = {}
[docs] def add_task_handler(self, name: str, callback: TaskCallbackType): """Adds a task handler. Parameters ---------- name The name of the task handler. callback A coroutine to call with the task payload as the only argument. """ self.task_handlers[name] = callback
[docs] async def new_command(self, message: apika.abc.AbstractIncomingMessage, ack=True): """Handles a new command received by the actor.""" headers = message.info().get("headers", {}) is_task = headers.get("task", False) if not is_task: return await AMQPBaseActor.new_command(self, message, ack=ack) body = json.loads(message.body.decode()) task_name = body.pop("task", None) if task_name not in self.task_handlers: self.write(MessageCode.ERROR, {"error": f"Unknown task {task_name!r}"}) return # Add actor to payload. body["__actor__"] = self asyncio.gather(self.task_handlers[task_name](body))
TCPBaseActor_co = TypeVar("TCPBaseActor_co", bound="TCPBaseActor")
[docs] class TCPBaseActor(BaseActor): """A TCP base actor that replies using JSON. This implementation of `.BaseActor` uses TCP as command/reply channel and replies to the user by sending a JSON-valid string. This makes it useful as a "device" actor that is not connected to the central message parsing system but that we still want to accept commands and reply with easily parseable messages. Commands received by this actor must be in the format ``[<uid>] <command string>``, where ``<uid>`` is any integer unique identifier that will be used as ``command_id`` and appended to any reply. This is a base actor that does not include a parser. It must be subclassed with a concrete parser that overrides ``parse_command``. Parameters ---------- name The actor name. host The host where the TCP server will run. port The port of the TCP server. args,kwargs Arguments to be passed to `.BaseActor`. """ def __init__( self, name: str, host: Optional[str] = None, port: Optional[int] = None, *args, **kwargs, ): super().__init__(name, *args, **kwargs) self.host = host or "localhost" self.port = port if self.port is None: raise ValueError("port needs to be specified.") #: Mapping of commander_id to transport self.transports = dict() #: TCPStreamServer: The server to talk to this actor. self.server = TCPStreamServer( self.host, self.port, connection_callback=self.new_user, data_received_callback=self.new_command, ) self.timed_commands = TimedCommandList(self)
[docs] async def start(self: TCPBaseActor_co) -> TCPBaseActor_co: """Starts the TCP server.""" if self.log: # Set the loop exception handler to be handled by the logger. loop = asyncio.get_running_loop() loop.set_exception_handler(self.log.asyncio_exception_handler) await self.server.start() self.log.info(f"running TCP server on {self.host}:{self.port}") self.timed_commands.start() return self
[docs] async def stop(self): """Stops the client connection and running tasks.""" if self.server.is_serving(): self.server.stop() await self.timed_commands.stop()
[docs] async def run_forever(self): """Runs the actor forever, keeping the loop alive.""" await self.server.serve_forever()
[docs] def new_user(self, transport: CustomTransportType): """Assigns userID to new client connection.""" if transport.is_closing(): if hasattr(transport, "user_id"): self.log.debug(f"user {transport.user_id} disconnected.") return self.transports.pop(transport.user_id) user_id = str(uuid.uuid4()) transport.user_id = user_id transport.multiline = False self.transports[user_id] = transport sock = transport.get_extra_info("socket") if sock is not None: peername = sock.getpeername()[0] self.log.debug(f"user {user_id} connected from {peername}.")
[docs] def new_command(self, transport: CustomTransportType, command_str: bytes): """Handles a new command received by the actor.""" print("hellow") commander_id: Optional[int] = getattr(transport, "user_id", None) message: str = command_str.decode().strip() if not message: return match = re.match(r"([0-9]*)\s*(.+)", message) if match is None: self.write("f", {"error": "Cannot match command string text."}) return command_id, command_string = match.groups() if command_id == "": command_id = 0 else: command_id = int(command_id) command_string = command_string.strip() full_command_string = f"{self.name} {command_string}" if not command_string: return try: command = Command( command_string=command_string, commander_id=commander_id, command_id=command_id, consumer_id=self.name, actor=self, transport=transport, ) except CommandError as ee: self.write( "f", { "error": "Could not parse the following as a command: " f"{full_command_string!r}: {ee}" }, ) return self.log.info( f"New command received: {full_command_string!r} " f"(commander_id={commander_id!r}, command_id={command_id!r})" ) return self.parse_command(command)
[docs] def write(self, *args, **kwargs): """Writes a message to user(s) as a JSON. A ``header`` keyword with the ``commander_id`` (i.e., the user id of the transport that sent the command), ``command_id``, ``message_code``, and ``sender`` is added to each message. The payload of the message is written to ``data``. An example of a valid message is .. code-block:: yaml { "header": { "command_id": 0, "commander_id": 1, "message_code": "i", "sender": "test_camera" }, "message": { "camera": { "name": "test_camera", "uid": "DEV_12345" }, "status": { "temperature": 25.0, "cooler": 10.0 } } } Although the messsage is displayed here in multiple lines, it is written as a single line to the TCP clients to facilitate parsing. For a multiline output, which is more human-readable, use the ``multiline`` command. See `~.BaseActor.write` for details on the allowed parameters. """ BaseActor.write(self, *args, **kwargs)
def _write_internal(self, reply: Reply, write_to_log: bool = True): """Write a reply to the users. Parameters ---------- reply The reply object to output to users. write_to_log Whether to write the reply to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log. """ def send_to_transport(transport, message): if getattr(transport, "multiline", False): message_json = json.dumps(message, sort_keys=False, indent=4) + "\n" else: message_json = json.dumps(message, sort_keys=False) + "\n" transport.write(message_json.encode()) message = reply.message command = cast(Command, reply.command) commander_id = command.commander_id if command else None command_id = command.command_id if command else None transport = command.transport if command else None message_full = {} header = { "header": { "command_id": command_id, "commander_id": commander_id, "message_code": reply.message_code.value, "internal": reply.internal, "sender": self.name, } } message_full.update(header) message_full.update({"data": message}) if reply.broadcast or commander_id is None or transport is None: for transport in self.transports.values(): send_to_transport(transport, message_full) else: send_to_transport(transport, message_full) message_json = json.dumps(message_full, sort_keys=False) + "\n" if self.log and write_to_log: log_reply(self.log, reply.message_code, message_json.strip())
[docs] class JSONActor(ClickParser, TCPBaseActor): """An implementation of `.TCPBaseActor` that uses a Click command parser.""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # Add the multiline command self.parser.add_command(multiline)
[docs] def send_command(self, *args, **kwargs): """Not implemented for `.JSONActor`.""" raise NotImplementedError("JSONActor cannot send commands to other actors.")
@click.command(cls=CluCommand) @click.option("--on/--off", default=True, help="Turn multiline on/off.") async def multiline(command: Command, on: bool): """Set multiline mode for the transport.""" transport = getattr(command, "transport", None) if not transport: return command.fail("The command has no transport.") transport.multiline = on return command.finish("Multiline mode is {}".format("on" if on else "off"))