Source code for clu.client

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2020-07-30
# @Filename: client.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import json
import logging
import pathlib
import uuid

from typing import Any, Dict, List, Optional, Union

import aio_pika as apika

from sdsstools.logger import SDSSLogger

from .base import BaseClient
from .command import Command
from .model import ModelSet
from .protocol import TopicListener
from .tools import CommandStatus


__all__ = ["AMQPClient", "AMQPReply"]


PathLike = Union[str, pathlib.Path]


[docs]class AMQPReply(object): """Wrapper for an `~aio_pika.IncomingMessage` that expands and decodes it. Parameters ---------- message The message that contains the reply. log A message logger. Attributes ---------- is_valid Whether the message is valid and correctly parsed. body The body of the message, as a JSON dictionary. info The info dictionary. headers The headers of the message, decoded if they are bytes. message_code The message code. sender The name of the actor that sends the reply. command_id The command ID. """ def __init__( self, message: apika.IncomingMessage, log: Optional[logging.Logger] = None, ): self.message = message self.log = log self.is_valid = True self.body = None # Acknowledges receipt of message message.ack() self.info: Dict[Any, Any] = message.info() self.headers = self.info["headers"] for key in self.headers: if isinstance(self.headers[key], bytes): self.headers[key] = self.headers[key].decode() self.message_code = self.headers.get("message_code", None) if self.message_code is None: self.is_valid = False if self.log: self.log.warning(f"received message without message_code: {message}") return self.sender = self.headers.get("sender", None) if self.sender is None and self.log: self.log.warning(f"received message without sender: {message}") self.command_id = message.correlation_id command_id_header = self.headers.get("command_id", None) if command_id_header and command_id_header != self.command_id: if self.log: self.log.error( f"mismatch between message " f"correlation_id={self.command_id} " f"and header command_id={command_id_header} " f"in message {message}" ) self.is_valid = False return self.body = json.loads(self.message.body.decode())
[docs]class AMQPClient(BaseClient): """Defines a new client based on the AMQP standard. To start a new client first instantiate the class and then run `.start` as a coroutine. Note that `.start` does not block so you will need to use asyncio's ``run_forever`` or a similar system :: >>> loop = asyncio.get_event_loop() >>> client = await AMQPClient('my_client', host='localhost').start() >>> loop.run_forever() Parameters ---------- name The name of the client. url RFC3986 formatted broker address. When used, the other connection keyword arguments are ignored. user The user to connect to the AMQP broker. Defaults to ``guest``. password The password for the user. Defaults to ``guest``. host The host where the AMQP message broker runs. Defaults to ``localhost``. virtualhost Virtualhost parameter. ``'/'`` by default. port The port on which the AMQP broker is running. Defaults to 5672. ssl Whether to use TLS/SSL connection. version The version of the client. loop The event loop. If `None`, the current event loop will be used. log_dir The directory where to store the logs. Defaults to ``$HOME/logs/<name>`` where ``<name>`` is the name of the actor. log A `~logging.Logger` instance to be used for logging instead of creating a new one. parser A click command parser that is a subclass of `~clu.parser.CluGroup`. If `None`, the active parser will be used. models A list of actor models whose schemas will be monitored. """ __EXCHANGE_NAME__ = "sdss_exchange" connection = None def __init__( self, name: str, url: Optional[str] = None, user: str = "guest", password: str = "guest", host: str = "localhost", port: int = 5672, virtualhost: str = "/", ssl: bool = False, version: Optional[str] = None, loop: Optional[asyncio.AbstractEventLoop] = None, log_dir: Optional[PathLike] = None, log: Optional[SDSSLogger] = None, models: List[str] = [], ): super().__init__(name, version=version, loop=loop, log_dir=log_dir, log=log) self.replies_queue = None # Creates the connection to the AMQP broker self.connection = TopicListener( url=url, user=user, password=password, host=host, port=port, ssl=ssl, virtualhost=virtualhost, ) #: dict: External commands currently running. self.running_commands = {} self.models = ModelSet(self, actors=models, raise_exception=False, log=self.log) def __repr__(self): if not self.connection or self.connection.connection is None: url = "disconnected" else: url = str(self.connection.connection.url) return f"<{str(self)} (name={self.name!r}, {url}>"
[docs] async def start(self, exchange_name: str = __EXCHANGE_NAME__): """Starts the connection to the AMQP broker.""" # Starts the connection and creates the exchange await self.connection.connect(exchange_name) # Binds the replies queue. self.replies_queue = await self.connection.add_queue( f"{self.name}_replies", callback=self.handle_reply, bindings=["reply.#"], ) self.log.info( f"replies queue {self.replies_queue.name!r} " f"bound to {self.connection.connection.url!s}" ) # Initialises the models. await self.models.load_schemas() return self
[docs] async def stop(self): """Cancels queues and closes the connection.""" await self.connection.stop()
[docs] async def run_forever(self): """Runs the event loop forever.""" while not self.connection.connection.is_closed: await asyncio.sleep(1)
[docs] async def handle_reply(self, message: apika.IncomingMessage) -> AMQPReply: """Handles a reply received from the exchange. Creates a new instance of `.AMQPReply` from the ``message``. If the reply is valid it updates any running command. Parameters ---------- message The message received. Returns ------- reply The `.AMQPReply` object created from the message. """ reply = AMQPReply(message, log=self.log) if not reply.is_valid: self.log.error("Invalid message received.") return reply # Ignores message from self, because actors are also clients and they # receive their own messages. The exception is when the commander is also the # actor (an actor sent a command to itself). commander_id = reply.headers["commander_id"] if reply.sender and self.name == reply.sender: if commander_id and commander_id != self.name: return reply # Update the models if self.models and reply.sender in self.models: self.models[reply.sender].update_model(reply.body) # If the command is running we check if the message code indicates # the command is done and, if so, sets the result in the Future. # Also, add the reply to the command list of replies. if reply.command_id in self.running_commands: self.running_commands[reply.command_id].replies.append(reply) status = CommandStatus.code_to_status(reply.message_code) if status.is_done: command = self.running_commands.pop(reply.command_id) command.set_status(status) if not command.done(): command.set_result(command) return reply
[docs] async def send_command( self, consumer: str, command_string: str, command_id: Optional[str] = None, ): """Commands another actor over its RCP queue. Parameters ---------- consumer The actor we are commanding. command_string The command string that will be parsed by the remote actor. command_id The command ID associated with this command. If empty, an unique identifier will be attached. """ command_id = command_id or str(uuid.uuid4()) # Creates and registers a command. command = Command( command_string=command_string, command_id=command_id, commander_id=self.name, consumer_id=consumer, actor=None, loop=self.loop, ) self.running_commands[command_id] = command headers = {"command_id": command_id, "commander_id": self.name} # The routing key has the topic command and the name of # the commanded actor. routing_key = f"command.{consumer}" message_body = {"command_string": command_string} try: await self.connection.exchange.publish( apika.Message( json.dumps(message_body).encode(), content_type="text/json", headers=headers, correlation_id=command_id, reply_to=self.replies_queue.name, ), routing_key=routing_key, ) except (apika.exceptions.DeliveryError, apika.exceptions.PublishError): # The consumer (actor) did not reply. This usually means that the actor # is not connected. We fake a reply from that actor saying so. That will # be received by handle_reply which will fail the current command. error_msg = dict(error=f"Failed routing message to consumer {consumer!r}.") headers.update({"message_code": "f", "sender": consumer}) await self.connection.exchange.publish( apika.Message( json.dumps(error_msg).encode(), content_type="text/json", headers=headers, correlation_id=command_id, ), routing_key=f"reply.{command.commander_id}", ) return command