Getting started with CLU#

Introduction#

In this section we provide a quick introduction to CLU. We assume that you are comfortable with asynchronous programming and asyncio. Here we won’t dive into the internal details of how messages are routed and the messaging protocol but we make use of standard messaging jargon such as consumer or exchanges. If you are not familiar with those concepts, the RabbitMQ tutorial is a good starting place.

We’ll begin by defining what an actor is: and actor is a piece of software that performs a well defined task (control a CCD camera, interface with a database) and is a server that receives commands and replies with a series of keywords. Let’s go over each one of those concepts:

  • A server means that the actor provides a certain functionality for a series of clients, which can be users or other actors. In this case, server also means that the actor is part of a network and is able to communicate with the clients via a well-defined protocol. We often refer to the actor as a consumer and the clients that command it as commanders or producers.

  • A command is an instruction that the actor receives to do a specific task. It must comply with a previously defined schema. In general, commands are formed by a verb, which indicates the task to perform, and series of mandatory arguments and optional parameters. Before it can be understood by the actor, a commands needs to be parsed.

  • In most cases, when an actor carries out a command, it needs to output some information back to commander. This is done via pairs of keywords with an associated values. The keywords and values need to conform with a model for the given actor.

We will expand on these concepts in following sections.

Running an actor#

A new actor can be created by simply instantiating the AMQPActor class

from clu import AMQPActor
my_actor = AMQPActor('my_actor', 'guest', 'localhost', version='0.1.0')

This will create the instance but will not start the actor yet. For that you need to await the coroutine start

await my_actor.start()

which will create the connection to RabbitMQ, set up the exchanges and queues, and get the actor ready to receive commands. Note that awaiting start does not block the event loop so you will need to run the loop forever. A simple implementation is

import asyncio
from clu import AMQPActor

def main(loop):
    # run() returns the actor so we can declare and run the actor more compactly.
    my_actor = await AMQPActor('my_actor', user='guest', password='guest',
                               host='localhost', version='0.1.0', loop=loop).start()

loop = asyncio.get_event_loop()
loop.create_task(main(loop))
loop.run_forever()

In these examples we have used the new-style AMQPActor class, but it’s trivial to replace it with the legacy actor class LegacyActor. The parameters to start a LegacyActor are the same with the exception that we pass the hostname and port where the actor will be serving, and we can provide the tron_host and tron_port to connect to an instance of Tron. We’ll talk more about legacy actor in a following section.

Normally an actor will run on the background as a daemon or service. There are multiple ways to accomplish this, from setting up the process as a systemd service to using nohup or one of the multiple solutions to create a Unix daemon.

CLU doesn’t provide a specific way to run an actor as a daemon, but a simple solution is to use daemonocle for this purpose. Here’s a very simple example

import asyncio
from functools import wraps
import click
from clu import AMQPActor
from daemonocle import DaemonCLI

def cli_coro(f):
    """Decorator function that allows defining coroutines with click."""

    @wraps(f)
    def wrapper(*args, **kwargs):
        loop = asyncio.get_event_loop()
        return loop.run_until_complete(f(*args, **kwargs))

    return wrapper


@click.command(cls=DaemonCLI, daemon_params={'pid_file': '/var/tmp/myactor.pid'})
@cli_coro
async def main():

    actor = AMQPActor('myactor')  # Assuming RabbitMQ runs on localhost
    await actor.start()
    await actor.run_forever()


if __name__ == '__main__':
    main()

Note that the only difference with respect to the standard daemonocle example is that we need to make the click command a coroutine. This is achieved with the custom cli_coro wrapper.

For more complex CLI, especially if you are implementing click groups with multiple commands, check the sdsstools DaemonGroup class.

Configuration files#

In general the parameters to start a new actor are stored in a configuration file. We can instantiate a new actor from it with the from_config classmethod

actor = Actor.from_config('~/config_files/actor.yaml')

The parameter passed to from_config must be a YAML file with the configuration. If the configuration file has a section called actor, that subsection will be used. Alternatively, a dictionary with the configuration already parsed can be passed to from_config. The parameter names in the configuration files must be the same as those of the arguments and keyword arguments used to instantiate AMQPActor. The following is an example of a valid configuration file

actor:
    name: 'jaeger'
    user: 'guest'
    host: '127.0.0.1'
    version: '0.2.0dev'
    log_dir: '/data/logs/actors/jaeger'

The behaviour for LegacyActor is the same but note that the parameters for tron must be grouped under its own subsection

actor:
    name: 'jaeger'
    host: '127.0.0.1'
    port: 19990
    version: '0.2.0dev'
    tron:
        host: '127.0.0.1'
        port: 6093
        models: ['tcc']
    log_dir: '/data/logs/actors/jaeger'

Overriding from_config when subclassing the actor can be a bit tricky if you have added new parameters. Here is an example of how to correctly do so

class JaegerActor(clu.LegacyActor):

    def __init__(self, fps, *args, **kwargs):

        self.fps = fps

        super().__init__(*args, **kwargs)

    @classmethod
    def from_config(cls, config, fps):

        return super().from_config(config, fps)

Note that the new argument fps must be the first argument in __init__.

The logger#

When an actor gets instantiated, a new logger is attached. The path to the file logger defaults to /data/logs/actors/<name>/<name>.log where <name> is the actor name, although this can be changed via the log_dir parameter. The file log rotates at midnight UTC or when a new instance of the logger is created. The logger name is actor:<name>.

The logger provides a few niceties, such as coloured console output and exception traceback formatting. It also captures the warnings issues with the warnings module.

It is possible to pass your own Logger instance to the actor via the log parameter, or set log=False to disable logging.

Defining commands#

When the actor receives a new command via a queue (new-style actor) or socket (legacy actor), it is parsed and a Command object is created to track its status and completion. Then the command function that matches the parsed command is called with the Command instance and the appropriate parameters. It may sound a bit confusing that a command can be the string received from commander, the instance of Command used to keep track of its completion, and the function that executes the command task, but there are historical reasons to keep this nomenclature and in most cases it’s obvious from the context to which one we are referring.

Ultimately the whole process of receiving a command string, parsing it, creating a Command instance, and calling the command function happens internally and the user does not need to worry about it unless you’re planning to create your own parser. Let’s see a very simple example of a command that is always available, ping

@command_parser.command()
def ping(command):
    """Pings the actor."""

    command.write(text='Pong')
    command.set_status(command.status.DONE)

    return

We’ll worry about what @command_parser.command() means later. For now lets focus on the function. ping() gets called when the parser receives the ping string. The function always receives a Command instance as the first argument, followed by other arguments or parameters the command accepts (none for ping). In this case the command function simply replies with the keyword text set to 'Pong' and then marks the status as DONE. This is an easy way of knowing if the actor is running and alive.

The command parser#

So, what was that weird decorator wrapping the command function? CLU uses click as its default command parser. If you’re not familiar with that package you should go and read their documentation since you’ll need it to define new commands.

The entry point for all commands is the command_parser group. Any command added to command_parser will become an actor command. Let’s add a simple status command that accepts an optional flag --verbose

import click
from clu import command_parser

@command_parser.command()
@click.option('--verbose', is_flag=True, help='outputs extra information')
def status(command, verbose=False):
    """Returns the status."""

    command.write(text='Everything is ok.')

    if verbose:
        command.write(text='Some extra information.')

    command.set_status(command.status.DONE)

    return

We’ll talk about some advanced features of the parser in Command parsing.

The help command#

By default, the command set comes with a help command that outputs the usage of the available commands. As long as you document your commands and options correctly (see the relevant section in the click documentation) the usage is autogenerated. For example, in a legacy style actor, if you send the command help the output will be something like

0 1 w text="Usage: COMMAND [ARGS]..."
0 1 w text=""
0 1 w text="Options:"
0 1 w text="  --help  Show this message and exit."
0 1 w text=""
0 1 w text="Commands:"
0 1 w text="  goto   Sends a positioner to a given (alpha, beta) position."
0 1 w text="  help   Shows the help."
0 1 w text="  ping   Pings the actor."

Timing out commands#

Sometimes you want your command to timeout after a certain amount of time if it has not completed. You can achieve that with the timeout decorator

from clu.parser import timeout

@command_parser.command()
@timeout(10)
def my_command(command):
    """A command that timeouts after 10 seconds."""

    ...

The command status#

You can access and modify the status of a Command instance via the status property. Statuses must be values of the CommandStatus enumeration. They can also be set as a string. You can change the status of a command by doing

command.status = CommandStatus.DONE

or via the set_status method, which also allows you to set a message

command.set_status('FAILED', message={'text': 'this command failed'})

When a command string is parsed and the command function called, the command is set to RUNNING. Any time a command status changed, a reply is send to the command with the message code associated with the status. A command should always successfully be DONE or set to one of the various FAILED_STATES. Command instances are also Futures and their result is set when the command is done (successfully or not).

Sometimes it’s necessary to wait until a command has reached a certain status before doing something else. This can be accomplished with the wait_for_status method

# Wait until command has been cancelled
await command.wait_for_status(CommandStatus.CANCELLED)

Replying to the commander#

One of the most frequent tasks the command needs to do is to reply to the commander with a series of keywords and values. This is done by using the write method

command.write(message_code='i', message={'lamp_on': True, 'ffs': 'closed'})

In this case we are outputting two keywords, lamp_on and ffs, the first with a boolean value and the second with a string. The first parameter, mesage_code, indicates the typo of message and must be:

  • d for a debug message.

  • i for an info message.

  • w for a warning message.

  • e for an error message (does not fail the command).

  • f for a message that accompanies to a failed command.

  • : for a message that accompanies to a successfully done command.

All the commands are output in the same way regardless of the message code. We will talk more about the reply format in following sections.

It is also possible to call write with keywords in the form of parameters. The following command is equivalent to the previous example

command.write('i', lamp_on=True, ffs='closed')

By default the command will reply only to the commander, but in some cases we want to broadcast a message to all the clients in the actor network. This is useful for status commands or internal periodic commands. In that case with can pass a broadcast=True to write.

Commanding other actors#

Frequently one of our commands requires commanding a different actor and waiting for it to complete

external_command = my_command.actor.send_command('actor2', 'goto ra=100 dec=20')

In this case our command, my_command, is commanding actor2 and sending it the command string goto ra=100 dec=20. Note that the returned external_command is itself a Command instance and as such a Future. We can wait until the command is done

# Block until external_command is done
await external_command

# Do something else
...

The keyword model#

CLU uses the JSON Schema Draft 7 specification to define and validate data models for the actors. Each actor must be accompanied by a JSON Schema-compatible file with a definition of the actor model. An example of a model definition file for an actor with two keywords, text and temperature, the first having to be a string and the second a float, would look something like

{
"type" : "object",
"properties" : {
    "text" : {"type" : "string"},
    "temperature" : {"type" : "number"}
    }
}

The name of the file must be <actor>.json with <actor> being the name of the actor. To load a series of models when the actor begins you need to do something like

my_actor = await AMQPActor('my_actor', 'guest', 'localhost',
                           schema='my_actor.json', models=['sop', 'guider'],
                           version='0.1.0', loop=loop).start()

This will load and keep track of the models for the sop and guider actors. The model for the own actor, my_actor, is loaded from the JSON Schema my_actor.json. If one or more of the model schemas cannot be found, a warning will be issued.

Models are accessible as a ModelSet object via the models attribute. A ModelSet is just a dictionary of Model instances, one for each of the models being tracked. When a new reply is received from an actor, the body of the reply is automatically parsed and validated against the model schema, and the model itself is updated.

>>> my_actor.models['guider']
{
    "text": "Pong.",
    "guideState": "on",
    "axisError": [0.1, 0.04, 1.2]
    ...
}
>>> type(my_actor.models['guider']['guideState'])
clu.model.Property
>>> my_actor.models['guider']['guideState']
<Property (guideState): 'on'>
>>> my_actor.models['guider']['guideState'].value
'on'

It is possible to set callbacks that will be invoked when the model is updated or when a specific property changes.

Validating schemas#

To check whether the actor schema you are writing is JSON Schema-compliant you can use the Model.check_schema staticmethod

>>> from clu.model import Model
>>> Model.check_schema('~/my_models/my_actor.json', is_file=True)
True

Legacy actors#

Actors that derive from LegacyActor track their models via the TronConnection instance. In this case the model schema needs to be defined as part of the actorkeys and the parsing and validation of the keys is done using the opscore machinery that has been integrated into CLU. That said, the bahviour of the TronModel instances that can be accessed via actor.models is the same as the one described above for Model, including the access format and the ability to set callbacks.

Devices#

A Device provides a TCP socket to a remote server and a way of handling messages from it. Devices are usually small pieces of hardware that do not need a dedicated actor and that have a limited command set. For example, a telescope control actor can have multiple devices (mirror actuators, lamps, flat field screens), each one of them behind a terminal server.

Devices are usually instantiated and started with the actor by subclassing AMQPActor or LegacyActor, which is quite straightforward to do

from clu import AMQPActor, Device

class MyActor(AMQPActor):

    def __init__(self, *args, device_host, device_port, **kwargs):

        super().__init__(*args, **kwargs)

        self.device = Device(device_host, device_port,
                             callback=self.process_device)

    async def run():

        await self.device.start()
        await super().start()

    async def process_device(self, line):

        # Here we do something with the line received
        # from the device.

        return

We can write to the device via the Device.write method. The callback passed to the Device must be a coroutine that handles each line received from the actor.

It is possible, in principle, to connect directly to another legacy actor using a device (as long as the actor accepts multiple connections) and handle the commands and replies directly. This is strongly discouraged since it contravenes the legacy protocol; all communication to and from other legacy actors must happen through Tron.