#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2018-01-16
# @Filename: actor.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import asyncio
import json
import pathlib
import re
import uuid
from typing import Any, Dict, Optional, TypeVar, Union, cast
import aio_pika as apika
import click
from .base import BaseActor, Reply
from .client import AMQPClient
from .command import Command, TimedCommandList
from .exceptions import CommandError
from .parsers import ClickParser, CluCommand
from .protocol import TCPStreamServer
from .tools import log_reply
# Public names exported by this module.
__all__ = [
    "AMQPActor",
    "JSONActor",
    "AMQPBaseActor",
    "TCPBaseActor",
]

# Generic, unconstrained type variable.
T = TypeVar("T")

# A filesystem path given either as a plain string or as a ``pathlib.Path``.
PathLike = Union[str, pathlib.Path]

# A schema given either as an already-loaded dictionary or as a path.
SchemaType = Union[Dict[str, Any], PathLike]
class CustomTransportType(asyncio.Transport):
    """An `asyncio.Transport` annotated with the per-connection attributes
    that the TCP actor in this module sets on each client transport.
    """

    # Identifier assigned to the connection; used as the commander ID.
    user_id: Union[str, int]

    # Whether replies to this transport are written as indented JSON.
    multiline: bool
class AMQPBaseActor(AMQPClient, BaseActor):
    """Base actor that exchanges commands and replies through an AMQP broker.

    Unlike `~clu.legacy.actor.LegacyActor`, which relies on standard TCP
    sockets, this actor talks to the other actors in the system through an
    AMQP messaging broker (typically RabbitMQ). The internals and protocols
    differ, but the entry points and behaviour of both classes should be
    almost identical.

    This class does not provide a command parser and needs to be subclassed
    with one.

    See `.AMQPActor` and `.AMQPClient` for information on additional
    parameters.

    Parameters
    ----------
    schema
        The path to the datamodel schema for the actor, in JSON Schema format.
        When a schema is provided all replies are validated against it and an
        invalid reply fails and is not emitted. The schema can also be set
        when subclassing by defining the class ``schema`` attribute.

    """

    def __init__(self, *args, schema: Optional[SchemaType] = None, **kwargs):
        # Only AMQPClient.__init__() is invoked explicitly;
        # BaseClient.__init__() is not called again here because
        # AMQPClient.__init__() has already been called.
        AMQPClient.__init__(self, *args, **kwargs)

        # Set by start() once the commands queue has been added.
        self.commands_queue = None

        self.timed_commands = TimedCommandList(self)

        self.load_schema(schema)

    async def start(self, **kwargs):
        """Starts the connection to the AMQP broker.

        Adds the commands queue for this actor, starts the timed commands
        and returns the actor itself.

        """

        # AMQPClient.start() sets the replies queue but not a commands one.
        await AMQPClient.start(self, **kwargs)

        queue_name = f"{self.name}_commands"
        command_binding = f"command.{self.name}.#"

        self.commands_queue = await self.connection.add_queue(
            queue_name,
            callback=self.new_command,
            bindings=[command_binding],
        )

        self.log.info(
            f"commands queue {self.commands_queue.name!r} "
            f"bound to {self.connection.connection.url!s}"
        )

        self.timed_commands.start()

        return self

    async def new_command(self, message: apika.IncomingMessage, ack=True):
        """Handles a new command received by the actor.

        Parameters
        ----------
        message
            The incoming AMQP message. Its headers must include
            ``commander_id`` and ``command_id`` and its body must be a JSON
            document with a ``command_string`` key.
        ack
            If `True`, the headers and body are read inside the
            ``message.process()`` context manager.

        Returns
        -------
        The result of ``parse_command`` for the new command, or `None` if
        creating the `.Command` raised `.CommandError`.

        """

        def unpack():
            # Returns the message headers and the JSON-decoded body.
            return message.info()["headers"], json.loads(message.body.decode())

        if ack:
            async with message.process():
                headers, payload = unpack()
        else:
            headers, payload = unpack()

        # The header values are decoded, i.e. they are handled as bytes.
        commander_id = headers["commander_id"].decode()
        command_id = headers["command_id"].decode()
        command_string = payload["command_string"]

        try:
            command = Command(
                command_string,
                command_id=command_id,
                commander_id=commander_id,
                consumer_id=self.name,
                actor=self,
                loop=self.loop,
            )
            command.actor = self  # Assign the actor
        except CommandError as ee:
            error_reply = {
                "error": "Could not parse the "
                "following as a command: "
                f"{command_string!r}. {ee!r}"
            }
            self.write("f", error_reply)
            return
        else:
            return self.parse_command(command)

    async def _write_internal(self, reply: Reply):
        """Publishes a reply to the exchange and logs it.

        Replies without a command, or flagged as broadcast, are published
        with the ``reply.broadcast`` routing key. Any other reply is routed
        to ``reply.<commander_id>``.

        """

        message_json = json.dumps(reply.message)
        command = reply.command

        is_broadcast = command is None or reply.broadcast
        if is_broadcast:
            routing_key = "reply.broadcast"
        else:
            routing_key = f"reply.{command.commander_id}"

        if command:
            commander_id = command.commander_id
            command_id = command.command_id
        else:
            commander_id = None
            command_id = None

        headers = {
            "message_code": reply.message_code,
            "commander_id": commander_id,
            "command_id": command_id,
            "sender": self.name,
        }

        amqp_message = apika.Message(
            message_json.encode(),
            content_type="text/json",
            headers=headers,
            correlation_id=command_id,
        )

        await self.connection.exchange.publish(amqp_message, routing_key=routing_key)

        if self.log:
            log_reply(self.log, reply.message_code, message_json)
class AMQPActor(ClickParser, AMQPBaseActor):
    """An `.AMQPBaseActor` whose commands are parsed with a `.ClickParser`."""
TCPBaseActor_co = TypeVar("TCPBaseActor_co", bound="TCPBaseActor")


class TCPBaseActor(BaseActor):
    """A TCP base actor that replies using JSON.

    This implementation of `.BaseActor` uses TCP as command/reply channel and
    replies to the user by sending a JSON-valid string. This makes it useful
    as a "device" actor that is not connected to the central message parsing
    system but that we still want to accept commands and reply with easily
    parseable messages.

    Commands received by this actor must be in the format
    ``[<uid>] <command string>``, where ``<uid>`` is any integer unique
    identifier that will be used as ``command_id`` and appended to any reply.
    If ``<uid>`` is omitted, ``command_id`` is set to zero.

    This is a base actor that does not include a parser. It must be subclassed
    with a concrete parser that overrides ``parse_command``.

    Parameters
    ----------
    name
        The actor name.
    host
        The host where the TCP server will run. Defaults to ``"localhost"``.
    port
        The port of the TCP server.
    args,kwargs
        Arguments to be passed to `.BaseActor`.

    Raises
    ------
    ValueError
        If ``port`` is not specified.

    """

    def __init__(
        self,
        name: str,
        host: Optional[str] = None,
        port: Optional[int] = None,
        *args,
        **kwargs,
    ):
        super().__init__(name, *args, **kwargs)

        self.host = host or "localhost"
        self.port = port

        if self.port is None:
            raise ValueError("port needs to be specified.")

        #: Mapping of commander_id to transport
        self.transports = dict()

        #: TCPStreamServer: The server to talk to this actor.
        self.server = TCPStreamServer(
            self.host,
            self.port,
            loop=self.loop,
            connection_callback=self.new_user,
            data_received_callback=self.new_command,
        )

        self.timed_commands = TimedCommandList(self)

    async def start(self: TCPBaseActor_co) -> TCPBaseActor_co:
        """Starts the TCP server and the timed commands. Returns the actor."""

        await self.server.start()
        self.log.info(f"running TCP server on {self.host}:{self.port}")

        self.timed_commands.start()

        return self

    async def stop(self):
        """Stops the client connection and running tasks."""

        if self.server.is_serving():
            self.server.stop()

        await self.timed_commands.stop()

    async def run_forever(self):
        """Runs the actor forever, keeping the loop alive."""

        await self.server.serve_forever()

    def new_user(self, transport: CustomTransportType):
        """Registers a new client connection or forgets a closing one.

        Parameters
        ----------
        transport
            The transport of the connection. If it is closing, it is removed
            from ``transports`` (when it had been registered). Otherwise it
            is assigned a new UUID4 string as ``user_id``, its ``multiline``
            mode is disabled, and it is stored in ``transports``.

        Returns
        -------
        The transport removed from ``transports`` when a registered
        connection closes; `None` in every other case.

        """

        if transport.is_closing():
            # A closing transport must never be registered: it would stay in
            # ``transports`` forever and every broadcast would try to write
            # to it. If it was never assigned a user_id there is nothing to do.
            closing_id = getattr(transport, "user_id", None)
            if closing_id is None:
                return None

            self.log.debug(f"user {closing_id} disconnected.")

            # Default to None so a repeated notification for the same
            # transport does not raise KeyError.
            return self.transports.pop(closing_id, None)

        user_id = str(uuid.uuid4())
        transport.user_id = user_id
        transport.multiline = False

        self.transports[user_id] = transport

        sock = transport.get_extra_info("socket")
        if sock is not None:
            peername = sock.getpeername()[0]
            self.log.debug(f"user {user_id} connected from {peername}.")

        return None

    def new_command(self, transport: CustomTransportType, command_str: bytes):
        """Handles a new command received by the actor.

        Parameters
        ----------
        transport
            The transport that received the data. Its ``user_id``, if set,
            is used as the ``commander_id`` of the command.
        command_str
            The raw bytes received, in the format
            ``[<uid>] <command string>``.

        Returns
        -------
        The result of ``parse_command`` for the new command, or `None` if
        the message is empty or creating the `.Command` raised
        `.CommandError`.

        """

        # user_id is the UUID string assigned in new_user().
        commander_id: Optional[str] = getattr(transport, "user_id", None)
        message: str = command_str.decode().strip()

        if not message:
            return

        # The message is stripped and not empty, so its first character is
        # never a newline and the pattern always matches.
        command_id, command_string = re.match(r"([0-9]*)\s*(.+)", message).groups()

        # Without a leading integer the command ID defaults to zero.
        command_id = int(command_id) if command_id != "" else 0

        command_string = command_string.strip()

        if not command_string:
            return

        try:
            command = Command(
                command_string=command_string,
                commander_id=commander_id,
                command_id=command_id,
                consumer_id=self.name,
                actor=self,
                loop=self.loop,
                transport=transport,
            )
        except CommandError as ee:
            self.write(
                "f", {"text": f"Could not parse the following as a command: {ee!r}"}
            )
            return

        return self.parse_command(command)

    def write(self, *args, **kwargs):
        """Writes a message to user(s) as a JSON.

        A ``header`` keyword with the ``commander_id`` (i.e., the user id of
        the transport that sent the command), ``command_id``, ``message_code``,
        and ``sender`` is added to each message. The payload of the message
        is written to ``data``. An example of a valid message is

        .. code-block:: yaml

            {
                "header": {
                    "command_id": 0,
                    "commander_id": "7b0a7a0e-8c44-4f5a-9d0b-2d3c4e5f6a7b",
                    "message_code": "i",
                    "sender": "test_camera"
                },
                "data": {
                    "camera": {
                        "name": "test_camera",
                        "uid": "DEV_12345"
                    },
                    "status": {
                        "temperature": 25.0,
                        "cooler": 10.0
                    }
                }
            }

        Although the message is displayed here in multiple lines, it is
        written as a single line to the TCP clients to facilitate parsing.
        For a multiline output, which is more human-readable, use the
        ``multiline`` command.

        See `~.BaseActor.write` for details on the allowed parameters.

        """

        BaseActor.write(self, *args, **kwargs)

    def _write_internal(self, reply: Reply):
        """Writes a reply to the users and logs it.

        The reply is sent to every registered transport if it is a
        broadcast, or if the command has no commander ID or no transport.
        Otherwise it is sent only to the transport of the command.

        """

        def send_to_transport(target, payload):
            # Transports in multiline mode receive indented JSON.
            if getattr(target, "multiline", False):
                payload_json = json.dumps(payload, sort_keys=False, indent=4) + "\n"
            else:
                payload_json = json.dumps(payload, sort_keys=False) + "\n"

            target.write(payload_json.encode())

        command = cast(Command, reply.command)

        commander_id = command.commander_id if command else None
        command_id = command.command_id if command else None
        transport = command.transport if command else None

        message_full = {
            "header": {
                "command_id": command_id,
                "commander_id": commander_id,
                "message_code": reply.message_code,
                "sender": self.name,
            },
            "data": reply.message,
        }

        if reply.broadcast or commander_id is None or transport is None:
            for user_transport in self.transports.values():
                send_to_transport(user_transport, message_full)
        else:
            send_to_transport(transport, message_full)

        if self.log:
            # The log always gets the single-line form of the message.
            message_json = json.dumps(message_full, sort_keys=False)
            log_reply(self.log, reply.message_code, message_json)
class JSONActor(ClickParser, TCPBaseActor):
    """A `.TCPBaseActor` combined with a Click command parser."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Register the ``multiline`` command with the actor parser.
        self.parser.add_command(multiline)

    def send_command(self, *args, **kwargs):
        """Not implemented for `.JSONActor`.

        Raises
        ------
        NotImplementedError
            Always.

        """

        raise NotImplementedError("JSONActor cannot send commands to other actors.")
@click.command(cls=CluCommand)
@click.option("--on/--off", default=True, help="Turn multiline on/off.")
async def multiline(command: Command, on: bool):
    """Set multiline mode for the transport."""

    # Fails the command if it has no transport; otherwise stores the
    # requested mode on the transport and finishes the command.
    transport: CustomTransportType = getattr(command, "transport", None)

    if transport:
        transport.multiline = on
        state = "on" if on else "off"
        return command.finish("Multiline mode is {}".format(state))

    return command.fail("The command has no transport.")