The AMQP actor#

The new-style actor that CLU introduces uses the AMQP protocol, of which RabbitMQ is the best known implementation, to communicate with other actors and clients. CLU supports AMQP 0-9-1; AMQP 1.0 is not supported at the moment.

In the following sections we’ll delve deeper into how the protocol is implemented and some of the code gritty details.

Why a new messaging protocol for SDSS?#

Say whatever you want about it, the current SDSS message passing protocol based on Tron, opscore, and actorcore is stable and robust. So, why should we replace it? Here is a list of reasons:

  • It reinvents the wheel. Ok, in all honesty Tron and opscore were written when wheel were still not completely circular, but the truth is that nowadays there are more robust, standard, and better documented technologies out there for message passing.

  • We can remove the need for a central hub product by relying in open-source message brokers such as RabbitMQ.

  • Tron and opscore are Python 2 and it’s unclear the amount of effort that would be needed to convert them to Python 3.

  • While there is some documentation for Tron and opscore, and the code is well written, it’s also cumbersome and difficult to modify by people that didn’t write it. It’s ultimately non-maintainable.

  • The opsctore/actorkeys datamodel is custom-built and extremely difficult to maintain. Standard solutions such as JSON with a JSON schema validator should be preferred.

  • asyncio provides an asynchronous API that is cleaner and easier to code than using threads. It is also more readable and less convoluted than twisted and it’s a Python core library with very active development.

  • CLU uses click for parsing commands, providing a well-defined, easy to use parser.

How does the new actor work?#

To completely understand how the code works you need a basic grasp of the AMQP protocol. The RabbitMQ tutorial is a good starting point, but let’s quickly go over some of the main concepts:

  • A program that sends messages is a producer. A message is any form of communication that can be encoded as bytes, for example a plaintext string or a JSON object.

  • A program that receives a message is a consumer. Actors are both producers and consumers.

  • Messages are sent to exchanges. Exchanges can be of different types, which enables various message brokering behaviours.

  • A queue is a buffer that holds messages. Queues subscribe to exchanges to get messages from them.

With all of that in mind, here is a conceptual diagram of how communication happens between two actors:


_images/AMQP_Diagram.png

This may look daunting so let’s break it into smaller pieces:

  • We have two actors that are both producers (i.e., can send messages) and consumers (can receive messages). Messages can either be commands or replies.

  • There is a global exchange, sdss_exchange used both to send commands and replies. The exchange is of type topic, which allow queues to subscribe to them and receive only messages that match a certain routing key.

  • Each actor has a queue for commands and another for replies. The command queue (<actor>_commands) only gets messages whose routing key contains the key command and the name of the actor, and ignores all other command messages. The replies queue, <actor>_replies, receives all messages with topic reply in the routing key.

More actors can be added to this picture, but they all share the same basic components. This enables us to create a slightly modified RCP pattern in which the callback queue is fixed and we use a topic exchange instead of a direct one.

The protocol#

Here we will formalise the AMQP actor protocol. This serves two purposes: document how CLU works internally, and enable other base actor classes to be developed (e.g., a thread-based actor).

Message brokering#

We use the AMQP standard for message queueing. The preferred implementation of the standard is RabbitMQ but any other broker that complies with the AMQP standard is acceptable.

Communication happens through a single topic exchange called sdss_exchange. Actors and clients are both consumers and producers. There is not formal difference between an actor and a client (e.g., a command line interpreter or a GUI) although the latter is not expected to receive commands, only emit them.

Each actor receives messages from two queues:

  • The commands queue must be called <actor>_commands, where <actor> is the name of the actor. It subscribes to the exchange and listens to messages with command and <actor> in their routing key. Clients that are not actors do not need to have a commands queue.

  • The replies queue is called <actor>_replies. It subscribes to exchange and listens to message with a reply routing key.

Queues must be exclusive and both exchanges and queues must auto-delete when the connection is closed.

Commands#

A command is a message that must conform to the following parameters:

  • Its routing key is command.<consumer> where <consumer> is the name of the actor that will receive and execute the command. For example, if actor1 is commanding actor2, the routing key would be command.actor2.

  • The header of the message must contain two keys: commander_id set to the name of the actor sending the command, and command_id set to a unique identifier for the command. UUID4 is recommended for command IDs.

  • The correlation_id of the message must be set to the same value as command_id.

  • The content type of the message must be text/json.

  • The body of the message must be a JSON string with a simple key called command_string whose value is the command string to be parsed by the remote actor.

Let’s guess that actor1 wants to send the command status --verbose to actor2 and that we create a unique identifier command_id='7b93d8d5-11c1-4c08-82a8-56842e1a86c4' for it. The information to be sent is

{headers: {command_id: '7b93d8d5-11c1-4c08-82a8-56842e1a86c4',
           commander_id: 'actor1'},
 payload: {command_string: 'status --verbose'},
 content-type: 'text/json',
 correlation_id: '7b93d8d5-11c1-4c08-82a8-56842e1a86c4',
 routing_key: 'command.actor2'}

Replies#

The format for a reply to a command is as follows:

  • The routing key must be reply.<producer> where <producer> is the actor or client that sent the command we are replying to. If an actor wants to reply with a broadcast to all the actors connected to the exchanged, it may do so with the routing key reply.broadcast. This is technically not necessary since all actors and clients receive all message with topic reply, regardless of whether they have a secondary topic, but it allows finer filtering.

  • The header of the message must contain the keywords command_id set to the command ID of the command we are replying to, commander_id set to the name of the commander actor, sender set to the name of the actor replying, and message_code with the message type code, which must be one of these. commander_id and command_id can be null, if the reply is unrequested (e.g., a status command that runs on a timer).

  • The correlation_id must be the same as the command_id (None for broadcasts).

  • The content type of the message must be text/json.

  • The body of the message must be a JSON string with a series of keyword-value pairs that conform to the keyword model of the actor that is replying.

In our example above, we commanded actor2 with status --verbose. Let’s imagine that the actor run that command and now wants to reply indicating that the lamps are on and the flat field screen is closed. The reply would look like

{headers: {message_code: ':',
           command_id: '7b93d8d5-11c1-4c08-82a8-56842e1a86c4',
           commander_id: 'actor1',
           sender: 'actor2'},
 payload: {lamps_on: true,
           ffs: 'closed'},
 content-type: 'text/json',
 correlation_id: '7b93d8d5-11c1-4c08-82a8-56842e1a86c4',
 routing_key: 'reply.actor1'}

The internals#

In this section we’ll have a quick look at how CLU implements the protocol we just defined. This is probably only useful to know if you are planning to override the AMQPActor class significantly.

For handling the connection to the RabbitMQ server while allowing asynchronous programming we use aio_pika, a wrapper around the Pika with support for asyncio.

When an AMQPActor is instantiated and we await start, a new instance of TopicListener is created, connects to the RabbitMQ server, and creates a channel and the topic exchange (or connects to it, if it already exists). We then add the commands_<actor> and replies_<actor> queues. When a new message is received by the command queue a callback to new_command is scheduled. When a new reply is received, we deal with it in handle_reply and use it to update the internal keyword model.

When a new message is received by new_command it unpacks the command ID and command string and creates a new instance of Command which is then passed to parse_command. This method is the one that invokes the click parser with the command string and calls the command function.

Internally, the top-level class in BaseClient, which defines the basic functionality for an actor/client (configuration parsing, logging, etc). The AMQPClient class implements a client based on an AMQP connection. The client provides a connection to the exchange, a replies queue, and methods to handle replies and issue commands. BaseActor expands BaseClient with a command parser and placeholders for handling incoming commands and writing but to the users. Both AMQPActor and LegacyActor subclass from BaseActor and implement their own specific protocols. AMQPActor also subclasses from AMQPClient given that reply parsing and command issuing are similar. The following diagram shows the inheritance tree:

digraph inheritance5771e858f1 { bgcolor=transparent; rankdir=LR; size="8.0, 12.0"; "asyncio.transports.BaseTransport" [fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",tooltip="Base class for transports."]; "asyncio.transports.ReadTransport" [fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",tooltip="Interface for read-only transports."]; "asyncio.transports.BaseTransport" -> "asyncio.transports.ReadTransport" [arrowsize=0.5,style="setlinewidth(0.5)"]; "asyncio.transports.Transport" [fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",tooltip="Interface representing a bidirectional transport."]; "asyncio.transports.ReadTransport" -> "asyncio.transports.Transport" [arrowsize=0.5,style="setlinewidth(0.5)"]; "asyncio.transports.WriteTransport" -> "asyncio.transports.Transport" [arrowsize=0.5,style="setlinewidth(0.5)"]; "asyncio.transports.WriteTransport" [fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",tooltip="Interface for write-only transports."]; "asyncio.transports.BaseTransport" -> "asyncio.transports.WriteTransport" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.actor.AMQPActor" [URL="api.html#clu.actor.AMQPActor",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="An `AMQP actor <.AMQPBaseActor>` that uses a `click parser <.ClickParser>`."]; "clu.parsers.click.ClickParser" -> "clu.actor.AMQPActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.actor.AMQPBaseActor" -> "clu.actor.AMQPActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.actor.AMQPBaseActor" [URL="api.html#clu.actor.AMQPBaseActor",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="An actor class that uses AMQP message brokering."]; "clu.client.AMQPClient" -> "clu.actor.AMQPBaseActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.base.BaseActor" -> "clu.actor.AMQPBaseActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.actor.CustomTransportType" [fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled"]; "asyncio.transports.Transport" -> "clu.actor.CustomTransportType" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.actor.JSONActor" [URL="api.html#clu.actor.JSONActor",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="An implementation of `.TCPBaseActor` that uses a Click command parser."]; "clu.parsers.click.ClickParser" -> "clu.actor.JSONActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.actor.TCPBaseActor" -> "clu.actor.JSONActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.actor.TCPBaseActor" [URL="api.html#clu.actor.TCPBaseActor",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="A TCP base actor that replies using JSON."]; "clu.base.BaseActor" -> "clu.actor.TCPBaseActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.base.BaseActor" [URL="api.html#clu.base.BaseActor",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="An actor based on `asyncio`."]; "clu.base.BaseClient" -> "clu.base.BaseActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.base.BaseClient" [URL="api.html#clu.base.BaseClient",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="A base client that can be used for listening or for an actor."]; "clu.client.AMQPClient" [URL="api.html#clu.client.AMQPClient",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="Defines a new client based on the AMQP standard."]; "clu.base.BaseClient" -> "clu.client.AMQPClient" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.client.AMQPReply" [URL="api.html#clu.client.AMQPReply",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="Wrapper for an `~aio_pika.IncomingMessage` that expands and decodes it."]; "clu.legacy.actor.BaseLegacyActor" [URL="api.html#clu.legacy.actor.BaseLegacyActor",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="An actor that provides compatibility with the SDSS opscore protocol."]; "clu.base.BaseActor" -> "clu.legacy.actor.BaseLegacyActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.legacy.actor.LegacyActor" [URL="api.html#clu.legacy.actor.LegacyActor",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="A legacy actor that uses the `.ClickParser`."]; "clu.parsers.click.ClickParser" -> "clu.legacy.actor.LegacyActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.legacy.actor.BaseLegacyActor" -> "clu.legacy.actor.LegacyActor" [arrowsize=0.5,style="setlinewidth(0.5)"]; "clu.parsers.click.ClickParser" [URL="api.html#clu.parsers.click.ClickParser",fillcolor=white,fontname="Vera Sans, DejaVu Sans, Liberation Sans, Arial, Helvetica, sans",fontsize=10,height=0.25,shape=box,style="setlinewidth(0.5),filled",target="_top",tooltip="A command parser that uses Click at its base."]; }