Examples#

hello_actor.py#

A legacy-style actor with a simple command that says hello to the user. To test it, run the actor code, open a telnet terminal as telnet localhost 9999, and write say-hello John.

import asyncio
import click
from clu import LegacyActor, command_parser

@command_parser.command()
@click.argument('NAME', type=str)
def say_hello(command, name):
    command.write('i', text=f'Hi {name}!')
    command.finish()

class HelloActor(LegacyActor):
    def __init__(self):
        super().__init__(name='hello_actor',
                         host='localhost', port=9999,
                         version='0.1.0')

async def run_actor():
    hello_actor = await HelloActor().start()
    await hello_actor.run_forever()

asyncio.run(run_actor())

hello_actor_amqp.py#

The same actor that greets you but using the AMQPActor. Note that the code is identical except the subclassing and the parameters needed to start the connection to RabbitMQ. This allows to transition an actor from legacy to AMQP with minimal changes.

import asyncio
import click
from clu import AMQPActor, command_parser

@command_parser.command()
@click.argument('NAME', type=str)
def say_hello(command, name):
    command.write('i', text=f'Hi {name}!')
    command.finish()

class HelloActor(AMQPActor):
    def __init__(self):
        super().__init__(name='hello_actor',
                         user='guest', password='guest',
                         host='localhost', port=5672,
                         version='0.1.0')

async def run_actor():
    hello_actor = await HelloActor().start()
    await hello_actor.run_forever()

asyncio.run(run_actor())

To test this code you can use the clu CLI command from a shell terminal. This creates a very simple interactive interface. In this case you need to indicate the name of the actor before the command string, so that the client knows to what AMQP queue to direct it.

$ clu
hello_actor say-hello Lucy
hello_actor >
hello_actor i {
   "text": "Hi Lucy!"
}
hello_actor :
hello_actor ping
hello_actor >
hello_actor : {
   "text": "Pong."
}

hello_actor_schema.py#

Let’s continue expanding our example. Now we define the same greeter actor but we provide it with a schema. We also add a command, say-goodbye, that outputs a keyword that is not in the actor schema.

import asyncio
import os
import click
from clu import AMQPActor, command_parser

@command_parser.command()
@click.argument('NAME', type=str)
def say_hello(command, name):
    command.write('i', text=f'Hi {name}!')
    command.finish()

@command_parser.command()
def say_goodbye(command):
    command.write('i', invalid_key=f'Bye!')
    command.finish()

class HelloActor(AMQPActor):

    schema = os.path.join(os.path.dirname(__file__), 'hello_actor.json')

    def __init__(self):
        super().__init__(name='hello_actor',
                         user='guest', password='guest',
                         host='localhost', port=5672)

async def run_actor():
    hello_actor = await HelloActor().start()
    await hello_actor.run_forever()

asyncio.run(run_actor())

The schema file is hello_actor.json

{
    "type": "object",
    "properties": {
        "text": {"type": "string"}
    },
    "additionalProperties": false
}

If you run the code and try to invoke the say-goodbye command, you’ll get an error because it’s trying to use an invalid keyword

hello_actor say-goodbye
hello_actor >
hello_actor i {
"error": "Failed validating the reply: message {'invalid_key': 'Bye!'} does not match the schema."
}
hello_actor :

Note that in the JSON schema we included the parameter "additionalProperties": false. That is the line that actually enforces that outputting undefined keywords breaks the validation. Otherwise the schema will accept other undefined keywords but will still validate the type and format of those specified in the schema.

hello_actor_json.py#

The same example, now using JSONActor.

import asyncio
import click
from clu import JSONActor, command_parser

@command_parser.command()
@click.argument('NAME', type=str)
def say_hello(command, name):
    command.write('i', text=f'Hi {name}!')
    command.finish()

class HelloActor(JSONActor):
    def __init__(self):
        super().__init__(name='hello_actor',
                         host='localhost', port=9999,
                         version='0.1.0')

async def run_actor():
    hello_actor = await HelloActor().start()
    await hello_actor.run_forever()

asyncio.run(run_actor())

tron_connection.py#

A connection to tron that listens to the keywords from the guider actor and reports the FWHM.

import asyncio
from clu.legacy import TronConnection

def report_fwhm(fwhm_key):
    # Get the value for the trimmed mean
    tmean = fwhm_key.value[1]
    print(f'The FWHM is {tmean} arcsec.')

async def main():
    tron = TronConnection(host='localhost', port=6093, models=['guider'])
    tron.models['guider']['fwhm'].register_callback(report_fwhm)
    await tron.start()
    await tron.run_forever()

asyncio.run(main())

jaeger_actor.py#

The following code shows the real implementation of the jaeger actor. There are a few interesting things to note.

  • First, this actor receives an fps object when it gets initialised, which allows the actor to command the associated hardware. The fps gets added to self.parser_args and it’s passed to each command callback along with the command (and any arguments and options defined for the command).

  • Second, this actor sets its version to match that of the library that is wrapping. This is a good design choice to make sure that the version of the actor reports the library tag used.

  • Third, the actor adds an ActorHandler to the jaeger library log. Any log message above level INFO will be output to the actor as a reply. If, for example, there were an exception in the library, the traceback would be reported to the user that is connected to the actor.

  • Finally, we implement a TimedCommand, which executes the command ieb status every 60 seconds without input from the user.

import logging

import clu
from clu import ActorHandler

from jaeger import __version__, log

class JaegerActor(clu.LegacyActor):
    """The jaeger SDSS-style actor."""

    def __init__(self, fps, *args, **kwargs):

        self.fps = fps

        # Pass the FPS instance as the second argument to each parser
        # command (the first argument is always the actor command).
        self.parser_args = [fps]

        super().__init__(*args, **kwargs)

        self.version = __version__

        # Add ActorHandler to log
        self.actor_handler = ActorHandler(self, code_mapping={logging.INFO: 'd'})
        log.addHandler(self.actor_handler)
        self.actor_handler.setLevel(logging.INFO)

        if fps.ieb and not fps.ieb.disabled:
            self.timed_commands.add_command('ieb status', delay=60)

Communicating with other actors#

Let’s expand a bit the example that we saw in Actor communication. We had two actors, one of them CameraActor manages a camera (integration, buffer fetching) while the other, ShutterActor handles the external shutter and allows is to open/close it. We want CameraActor to command ShutterActor at the beginning and the end of an exposure, and read the status reported by ShutterActor. For that, we write two actors in two different files. We use AMQPActor but the same code, with small modifications, could be written for LegacyActor using a TronConnection.

First we write the code for ShutterActor (click here to download):

import asyncio

from clu import AMQPActor, command_parser


@command_parser.command()
async def open(command):
    """Open the shutter."""

    command.info(text="Opening the shutter!")
    # Here we would implement the actual communication
    # with the shutter hardware.
    command.finish(shutter="open")

    return


@command_parser.command()
async def close(command):
    """Close the shutter."""

    command.info(text="Closing the shutter!")
    # Here we would implement the actual communication
    # with the shutter hardware.
    command.finish(shutter="closed")

    return


class ShutterActor(AMQPActor):
    def __init__(self):
        super().__init__(
            name="shutter_actor",
            user="guest",
            password="guest",
            host="localhost",
            port=5672,
            version="0.1.0",
        )


async def run_actor():
    actor = await ShutterActor().start()
    await actor.run_forever()


asyncio.run(run_actor())

And we do the same for CameraActor (click here to download):

import asyncio

import click

from clu import AMQPActor, command_parser


@command_parser.command()
@click.argument("EXPTIME", type=float)
async def expose(command, exptime):
    """Exposes the camera."""

    command.info(text="Starting the exposure.")

    # Here we talk to the camera to initiate the exposure.

    # Use command to access the actor and command the shutter
    shutter_cmd = await command.actor.send_command("shutter_actor", "open")

    await shutter_cmd  # Block until the command is done (finished or failed)
    if shutter_cmd.status.did_fail:
        # Do cleanup
        return command.fail(error="Shutter failed to open")

    # Report status of the shutter
    replies = shutter_cmd.replies
    shutter_status = replies[-1].body["shutter"]
    if shutter_status not in ["open", "closed"]:
        return command.fail(error=f"Unknown shutter status {shutter_status!r}.")

    command.info(f"Shutter is now {shutter_status!r}.")

    # Sleep until the exposure is complete.
    await asyncio.sleep(exptime)

    # Close the shutter. Note the double await.
    await (await command.actor.send_command("shutter_actor", "close"))

    # Finish exposure, read buffer, etc.

    return command.finish(text="Exposure done!")


class CameraActor(AMQPActor):
    def __init__(self):
        super().__init__(
            name="camera_actor",
            user="guest",
            password="guest",
            host="localhost",
            port=5672,
            version="0.1.0",
        )


async def run_actor():
    actor = await CameraActor().start()
    await actor.run_forever()


asyncio.run(run_actor())

To run this code we need to execute python camera_actor.py and python shutter_actor.py on different terminals. Then, on a third terminal, we use clu to open a CLI to the actors.

$ clu
camera_actor expose 1
17:18:56.382 camera_actor >
17:18:56.394 camera_actor i {
   "text": "Starting the exposure."
}
17:18:56.402 shutter_actor >
17:18:56.410 shutter_actor i {
   "text": "Opening the shutter!"
}
17:18:56.416 shutter_actor : {
   "shutter": "open"
}
17:18:56.424 camera_actor i {
   "text": "Shutter is now 'open'."
}
17:18:57.386 shutter_actor >
17:18:57.399 shutter_actor i {
   "text": "Closing the shutter!"
}
17:18:57.409 shutter_actor : {
   "shutter": "closed"
}
17:18:57.417 camera_actor : {
   "text": "Exposure done!"
}

Note how we are receiving the replies from both actors, and how we use the reply from shutter_actor to output the status of the shutter in camera_actor.

Running an actor#

See the detailed explanation here.

import asyncio
from functools import wraps

import click
from daemonocle import DaemonCLI

from clu import AMQPActor
from clu.parsers.click import command_parser


def cli_coro(f):
    """Decorator function that allows defining coroutines with click."""

    @wraps(f)
    def wrapper(*args, **kwargs):
        loop = asyncio.new_event_loop()
        return loop.run_until_complete(f(*args, **kwargs))

    return wrapper


@command_parser.command()
async def hello(command):
    return command.finish("Hello!")


@click.command(cls=DaemonCLI, daemon_params={"pid_file": "/var/tmp/myactor.pid"})
@cli_coro
async def main():
    actor = AMQPActor("myactor")  # Assuming RabbitMQ runs on localhost
    await actor.start()
    await actor.run_forever()


if __name__ == "__main__":
    main()