API Reference#

Base classes#

class clu.base.BaseClient(name, version=None, loop=None, log_dir=None, log=None, verbose=False, validate=True, config={})[source]#

Bases: object

A base client that can be used for listening or for an actor.

This class defines a new client. Clients differ from actors in that they do not receive commands or issue replies, but do send commands to other actors and listen to the keyword-value flow. All actors are also clients and any actor should subclass from BaseClient.

Normally, a new client or actor instance is created by passing a configuration file path to from_config, which defines how the client must be started.

Parameters:
  • name (str) – The name of the client.

  • version (Optional[str]) – The version of the client.

  • loop (Optional[AbstractEventLoop]) – The event loop. If None, the current event loop will be used.

  • log_dir (Union[Path, str, None]) – The directory where to store the file logs.

  • log (Optional[SDSSLogger]) – A Logger instance to be used for logging instead of creating a new one.

  • verbose (Union[bool, int]) – Whether to log to stdout. Can be an integer logging level.

  • validate (bool) – Whether the actor should validate its own messages against its model (if it has one). This is a global parameter that can be overridden when calling write.

  • config (dict) – A dictionary of configuration parameters that will be accessible to the client.

classmethod from_config(config, *args, loader=<class 'yaml.loader.FullLoader'>, **kwargs)[source]#

Parses a configuration file.

Parameters:

config (Union[Dict[str, Any], Path, str]) – A configuration dictionary or the path to a YAML configuration file. If the file contains a section called 'actor' or 'client', that section will be used instead of the whole file.
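
The section-selection rule described above can be sketched as follows. This is a minimal illustration and not CLU's implementation: select_section is a hypothetical helper, and checking 'actor' before 'client' when both are present is an assumption the text does not state.

```python
from typing import Any, Dict


def select_section(config: Dict[str, Any]) -> Dict[str, Any]:
    """Return the 'actor' or 'client' section if present, else the whole dict."""
    # Assumption: 'actor' takes precedence over 'client' if both exist.
    for section in ("actor", "client"):
        if section in config:
            return config[section]
    return config


# A configuration with an 'actor' section and one without any section.
full = {"actor": {"name": "my_actor", "host": "localhost"}, "other": 1}
flat = {"name": "my_client"}

print(select_section(full))
print(select_section(flat))
```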

proxy(actor) ProxyClient[source]#
Return type:

ProxyClient

Creates a proxy for an actor.

Returns a ProxyClient that simplifies running a command multiple times. For example

await client.send_command("focus_stage", "moverelative -1000 UM")

can be replaced with

focus_stage = client.proxy("focus_stage")
await focus_stage.send_command("moverelative", "-1000", "UM")
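
The idea behind the proxy can be illustrated with a self-contained sketch. SketchClient and SketchProxy are hypothetical stand-ins, not CLU classes, and joining the arguments with single spaces is an assumption made for the illustration.

```python
import asyncio


class SketchClient:
    """Stand-in for a client; records the commands it is asked to send."""

    def __init__(self):
        self.sent = []

    async def send_command(self, actor, *args):
        # Assumption: the pieces are joined with spaces into one string.
        self.sent.append((actor, " ".join(args)))

    def proxy(self, actor):
        return SketchProxy(self, actor)


class SketchProxy:
    """Remembers the actor name so callers only pass the command."""

    def __init__(self, client, actor):
        self.client = client
        self.actor = actor

    async def send_command(self, *args):
        await self.client.send_command(self.actor, *args)


async def main():
    client = SketchClient()
    # Direct call: the actor name is given every time.
    await client.send_command("focus_stage", "moverelative -1000 UM")
    # Proxy call: the actor name is bound once.
    focus_stage = client.proxy("focus_stage")
    await focus_stage.send_command("moverelative", "-1000", "UM")
    return client.sent


sent = asyncio.run(main())
```

Both calls end up recording the same actor and command string, which is the convenience the proxy is meant to provide.
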
async send_command(actor, *args, **kwargs) Command[source]#
Return type:

Command

Sends a command to an actor and returns a Command instance.

set_loop_exception_handler()[source]#

Sets the loop exception handler to be handled by the logger.

setup_logger(log, log_dir, verbose=False)[source]#

Starts the file logger.

abstract async start() T[source]#
Return type:

TypeVar(T, bound= BaseClient)

Runs the client.

async stop()[source]#

Shuts down all the remaining tasks.

class clu.base.BaseActor(*args, schema=None, store=False, additional_properties=False, **kwargs)[source]#

Bases: BaseClient

An actor based on asyncio.

This class expands BaseClient with a parsing system for new commands and placeholders for methods for handling new commands and writing replies, which should be overridden by the specific actors.

Parameters:
  • schema (Union[Dict[str, Any], Path, str, None]) – The schema for the actor replies, as a JSONschema dictionary or path to a JSON file. If None, defaults to the internal basic schema.

  • store (bool | list[str]) – Whether to store the output keywords in a KeywordStore. False (the default), disables the feature. True will store a record of all the output keywords. A list of keyword names to store can also be passed.

  • additional_properties (bool) – Whether to allow additional properties in the schema, other than the ones defined by the schema. This parameter is only used if schema=None or if additionalProperties is not defined in the schema.

invoke_mock_command(command_str, command_id=0) Command[source]#
Return type:

Command

Send a new command to an actor for testing.

Requires calling setup_test_actor.

load_schema(schema, is_file=True, additional_properties=False) Model | None[source]#
Return type:

Optional[Model]

Loads and validates the actor schema.

abstract new_command()[source]#

Handles a new command.

Must be overridden by the subclass and call parse_command with a Command object.

abstract parse_command(command)[source]#

Parses and executes a Command. Must be overridden.

set_message_processor(processor)[source]#

Sets the message processor.

Parameters:

processor (Optional[Callable[[dict], dict]]) – A function that receives the messages to output to the users, as a dictionary, and reformats them, returning a new dictionary. If None, no processing is done.
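
As an illustration, a processor is any callable that takes the outgoing message dictionary and returns a new one. The round_floats function below is a hypothetical example, not part of CLU.

```python
from typing import Any, Dict


def round_floats(message: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of the message with float values rounded to 2 decimals."""
    return {
        key: round(value, 2) if isinstance(value, float) else value
        for key, value in message.items()
    }


# The input dictionary is left untouched; a new one is returned.
processed = round_floats({"temperature": 25.04567, "text": "Hi!"})
print(processed)
```

Given the signature above, such a function would be registered with actor.set_message_processor(round_floats).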

write(message_code=MessageCode.INFO, message=None, command=None, broadcast=False, validate=None, expand_exceptions=True, traceback_frame=0, silent=False, internal=False, emit=True, write_to_log=True, **kwargs) Reply[source]#

Writes a message to user(s).

The reply to the users will be formatted by the actor class into a message specific to the communication channel. Additional keywords passed to this method will be used to complete the message (as long as they don’t overlap with named parameters). For example

actor.write('i', message={'text': 'Hi!', 'value1': 1})

and

actor.write('i', text='Hi!', value1=1)

are equivalent and

actor.write('i', message={'text': 'Hi!'}, value1=1)

is equivalent to

actor.write('i', message={'text': 'Hi!', 'value1': 1})

This method generates a Reply object that is passed to the _write_internal method in the class, which processes it and outputs the message to the users using the appropriate transport. If command is passed, the reply is added to command.replies.

Parameters:
  • message_code (MessageCode | str) – The message code (e.g., 'i' or ':').

  • message (Union[Dict[str, Any], str, None]) – The keywords to be output. Must be a dictionary of pairs {keyword: value}.

  • command (Optional[BaseCommand]) – The command to which we are replying. If not set, it is assumed that this is a broadcast.

  • broadcast (bool) – Whether to broadcast the message to all the users or only to the commander.

  • validate (bool | None) – Validate the reply against the actor schema. This is ignored if the actor was not started with knowledge of its own schema. If None, defaults to the actor global behaviour.

  • expand_exceptions (bool) – If the value of one of the keywords is an exception object and expand_exceptions=True, the exception information and traceback will be expanded. Otherwise the exception will just be stringified.

  • traceback_frame (int) – The frame from the traceback to use if expand_exceptions=True. By default, reports the file and line of the last traceback frame.

  • silent (bool) – When True does not output the message to the users. This can be used to issue internal commands that update the internal model but that don’t clutter the output.

  • internal (bool) – Marks the Reply instance as internal. Internal replies are expected to be emitted to the users, but with a header indicating that the message is not for broader consumption. This can be useful when one wants to reply with verbose information that can be skipped from CLI or logs.

  • emit (bool) – Whether to call the actor internal write method. Should be True but it’s sometimes useful to call write with emit=False when one is overriding the method and wants to control when to call the internal method. In that case, a Reply object is returned but nothing is output to the users.

  • write_to_log (bool) – Whether to write the reply to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log.

  • kwargs – Keyword arguments that will be used to update the message.

Return type:

Reply
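
The equivalences listed for message and the extra keyword arguments can be sketched with a small helper. merge_message is hypothetical and only covers the dictionary (or None) case; letting keyword arguments win when a key appears in both places is an assumption of this sketch.

```python
from typing import Any, Dict, Optional


def merge_message(
    message: Optional[Dict[str, Any]] = None, **kwargs: Any
) -> Dict[str, Any]:
    """Combine the message dictionary with extra keyword arguments."""
    merged = dict(message or {})
    # Assumption: on overlap, the keyword argument replaces the message value.
    merged.update(kwargs)
    return merged


a = merge_message({"text": "Hi!", "value1": 1})
b = merge_message(text="Hi!", value1=1)
c = merge_message({"text": "Hi!"}, value1=1)
print(a == b == c)
```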

class clu.base.Reply(message_code, message, command=None, broadcast=False, use_validation=False, validated=False, internal=False, keywords=None)[source]#

Bases: object

A reply from a command or actor to be sent to the users.

property body#

Alias to message.

Client#

class clu.client.AMQPClient(name=None, url=None, user='guest', password='guest', host='localhost', port=5672, virtualhost='/', ssl=False, version=None, loop=None, log_dir=None, log=None, models=[], **kwargs)[source]#

Bases: BaseClient

Defines a new client based on the AMQP standard.

To start a new client, first instantiate the class and then run start as a coroutine. Note that start does not block, so you will need to use asyncio’s run_forever or a similar system:

>>> loop = asyncio.get_event_loop()
>>> client = await AMQPClient('my_client', host='localhost').start()
>>> loop.run_forever()
Parameters:
  • name (str | None) – The name of the client.

  • url (Optional[str]) – RFC3986 formatted broker address. When used, the other connection keyword arguments are ignored.

  • user (str) – The user to connect to the AMQP broker. Defaults to guest.

  • password (str) – The password for the user. Defaults to guest.

  • host (str) – The host where the AMQP message broker runs. Defaults to localhost.

  • virtualhost (str) – Virtualhost parameter. '/' by default.

  • port (int) – The port on which the AMQP broker is running. Defaults to 5672.

  • ssl (bool) – Whether to use TLS/SSL connection.

  • version (Optional[str]) – The version of the client.

  • loop (Optional[AbstractEventLoop]) – The event loop. If None, the current event loop will be used.

  • log_dir (Union[Path, str, None]) – The directory where to store the logs. Defaults to $HOME/logs/<name> where <name> is the name of the actor.

  • log (Optional[SDSSLogger]) – A Logger instance to be used for logging instead of creating a new one.

  • parser – A click command parser that is a subclass of CluGroup. If None, the active parser will be used.

  • models (List[str]) – A list of actor models whose schemas will be monitored.

add_reply_callback(callback_func)[source]#

Adds a callback that is called when a new reply is received.

async handle_reply(message) AMQPReply[source]#

Handles a reply received from the exchange.

Creates a new instance of AMQPReply from the message. If the reply is valid it updates any running command.

Parameters:

message (AbstractIncomingMessage) – The message received.

Returns:

The AMQPReply object created from the message.

Return type:

AMQPReply

is_connected()[source]#

Is the client connected to the exchange?

remove_reply_callback(callback_func)[source]#

Removes a reply callback.

async run_forever()[source]#

Runs the event loop forever.

async send_command(consumer, command_string, *args, command_id=None, callback=None, internal=False, command=None, time_limit=None, await_command=True)[source]#

Commands another actor over its RPC queue.

Parameters:
  • consumer (str) – The actor we are commanding.

  • command_string (str) – The command string that will be parsed by the remote actor.

  • args – Arguments to concatenate to the command string.

  • command_id (str | None) – The command ID associated with this command. If empty, a unique identifier will be attached.

  • callback (Optional[Callable[[AMQPReply], None]]) – A callback to invoke with each reply received from the actor.

  • internal (bool) – Whether to mark the command as internal, in which case replies will also be considered internal.

  • command (Optional[Command]) – The Command that initiated the new command. Only relevant for actors.

  • time_limit (Optional[float]) – A delay after which the command is marked as timed out and done.

  • await_command (bool) – If True, awaits the command until it finishes.
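
How args extend command_string can be sketched as below. build_command_string is a hypothetical helper; joining with a single space is an assumption inferred from the description of args, not taken from the CLU source.

```python
def build_command_string(command_string: str, *args: str) -> str:
    """Append the extra arguments to the command string."""
    # Assumption: pieces are separated by a single space.
    return " ".join([command_string, *map(str, args)])


one_piece = build_command_string("do_something --now")
two_pieces = build_command_string("do_something", "--now")
print(one_piece == two_pieces)
```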

Examples

These two are equivalent:

>>> client.send_command('my_actor', 'do_something --now')
>>> client.send_command('my_actor', 'do_something', '--now')
async send_task(consumer, task_name, payload={}, **kwargs)[source]#
Parameters:
  • consumer (str) – The actor we are commanding.

  • task_name (str) – The task to execute in the remote actor.

  • payload (dict[str, Any]) – A serialisable dictionary with the payload to pass to the task.

  • kwargs – Additional arguments used to update the payload dictionary.

async start(exchange_name='sdss_exchange')[source]#

Starts the connection to the AMQP broker.

async stop()[source]#

Cancels queues and closes the connection.

running_commands: Dict[str, Command]#

External commands currently running.

Type:

dict

class clu.client.AMQPReply(message, log=None)[source]#

Bases: object

Wrapper for an IncomingMessage that expands and decodes it.

Parameters:
  • message (AbstractIncomingMessage) – The message that contains the reply.

  • log (Optional[Logger]) – A message logger.

is_valid#

Whether the message is valid and correctly parsed.

body#

The body of the message, as a JSON dictionary.

info#

The info dictionary.

headers#

The headers of the message, decoded if they are bytes.

message_code#

The message code.

internal#

Whether this reply was marked internal.

sender#

The name of the actor that sends the reply.

command_id#

The command ID.

Actor#

class clu.actor.AMQPBaseActor(*args, **kwargs)[source]#

Bases: AMQPClient, BaseActor

An actor class that uses AMQP message brokering.

This class differs from LegacyActor in that it uses an AMQP messaging broker (typically RabbitMQ) to communicate with other actors in the system, instead of standard TCP sockets. Although the internals and protocols are different the entry points and behaviour for both classes should be almost identical.

This class needs to be subclassed with a command parser.

See the documentation for AMQPActor and AMQPClient for additional parameter information.

async new_command(message, ack=True)[source]#

Handles a new command received by the actor.

async start(**kwargs)[source]#

Starts the connection to the AMQP broker.

class clu.actor.AMQPActor(*args, **kwargs)[source]#

Bases: ClickParser, AMQPBaseActor

An AMQP actor that uses a click parser.

add_task_handler(name, callback)[source]#

Adds a task handler.

Parameters:
async new_command(message, ack=True)[source]#

Handles a new command received by the actor.

class clu.actor.TCPBaseActor(name, host=None, port=None, *args, **kwargs)[source]#

Bases: BaseActor

A TCP base actor that replies using JSON.

This implementation of BaseActor uses TCP as command/reply channel and replies to the user by sending a JSON-valid string. This makes it useful as a “device” actor that is not connected to the central message parsing system but that we still want to accept commands and reply with easily parseable messages.

Commands received by this actor must be in the format [<uid>] <command string>, where <uid> is any integer unique identifier that will be used as command_id and appended to any reply.
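
A possible way to split such a line into its identifier and command string is sketched below. parse_tcp_command is hypothetical; reading the square brackets as "optional" and falling back to 0 when no identifier is given are assumptions of this sketch.

```python
import re
from typing import Tuple

# Optional integer uid, whitespace, then the command body.
_COMMAND_RE = re.compile(r"^\s*(?:(?P<uid>\d+)\s+)?(?P<body>\S.*?)\s*$")


def parse_tcp_command(line: str) -> Tuple[int, str]:
    """Return (command_id, command_string) parsed from a raw line."""
    match = _COMMAND_RE.match(line)
    if match is None:
        raise ValueError(f"Cannot parse command: {line!r}")
    # Assumption: a missing uid defaults to 0.
    uid = int(match.group("uid")) if match.group("uid") else 0
    return uid, match.group("body")


with_uid = parse_tcp_command("12 status")
without_uid = parse_tcp_command("status")
padded = parse_tcp_command("  7   expose 2.0 ")
print(with_uid, without_uid, padded)
```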

This is a base actor that does not include a parser. It must be subclassed with a concrete parser that overrides parse_command.

Parameters:
  • name (str) – The actor name.

  • host (Optional[str]) – The host where the TCP server will run.

  • port (Optional[int]) – The port of the TCP server.

  • args – Arguments to be passed to BaseActor.

  • kwargs – Arguments to be passed to BaseActor.

new_command(transport, command_str)[source]#

Handles a new command received by the actor.

new_user(transport)[source]#

Assigns userID to new client connection.

async run_forever()[source]#

Runs the actor forever, keeping the loop alive.

async start() TCPBaseActor_co[source]#
Return type:

TypeVar(TCPBaseActor_co, bound= TCPBaseActor)

Starts the TCP server.

async stop()[source]#

Stops the client connection and running tasks.

write(*args, **kwargs)[source]#

Writes a message to user(s) as a JSON.

A header keyword with the commander_id (i.e., the user id of the transport that sent the command), command_id, message_code, and sender is added to each message. The payload of the message is written to data. An example of a valid message is

{
    "header": {
        "command_id": 0,
        "commander_id": 1,
        "message_code": "i",
        "sender": "test_camera"
    },
    "message": {
        "camera": {
            "name": "test_camera",
            "uid": "DEV_12345"
        },
        "status": {
            "temperature": 25.0,
            "cooler": 10.0
        }
    }
}

Although the message is displayed here in multiple lines, it is written as a single line to the TCP clients to facilitate parsing. For a multiline output, which is more human-readable, use the multiline command.

See write for details on the allowed parameters.
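
The single-line encoding can be sketched with the standard json module. build_reply is a hypothetical helper, and the key layout simply mirrors the example message shown above; it is not taken from the CLU source.

```python
import json
from typing import Any, Dict


def build_reply(
    command_id: int,
    commander_id: int,
    message_code: str,
    sender: str,
    message: Dict[str, Any],
) -> str:
    """Serialise a reply as one line of JSON."""
    payload = {
        "header": {
            "command_id": command_id,
            "commander_id": commander_id,
            "message_code": message_code,
            "sender": sender,
        },
        "message": message,
    }
    # json.dumps without indent produces a single line.
    return json.dumps(payload)


line = build_reply(0, 1, "i", "test_camera", {"status": {"temperature": 25.0}})
print(line)
```

A client can then read one line per reply and decode it with json.loads.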

server#

The server to talk to this actor.

Type:

TCPStreamServer

transports#

Mapping of commander_id to transport.

class clu.actor.JSONActor(*args, **kwargs)[source]#

Bases: ClickParser, TCPBaseActor

An implementation of TCPBaseActor that uses a Click command parser.

send_command(*args, **kwargs)[source]#

Not implemented for JSONActor.

Command#

class clu.command.BaseCommand(commander_id=None, command_id=0, consumer_id=0, actor=None, parent=None, reply_callback=None, status_callback=None, call_now=False, default_keyword='text', silent=False, internal=False, write_to_log=True, time_limit=None, loop=None)[source]#

Bases: Future[Future_co], StatusMixIn[CommandStatus], Generic[Actor_co, Future_co]

Base class for commands of all types (user and device).

A BaseCommand instance is a Future whose result gets set when the status is done (either successfully or not).
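
The Future behaviour can be illustrated with a self-contained sketch. MiniStatus and SketchCommand are hypothetical and much simpler than the real classes; using the command itself as the Future result is an assumption of this sketch.

```python
import asyncio
import enum


class MiniStatus(enum.Flag):
    DONE = 1
    FAILED = 4
    RUNNING = 32


class SketchCommand(asyncio.Future):
    """A Future that resolves once its status reaches a final state."""

    def __init__(self):
        super().__init__(loop=asyncio.get_running_loop())
        self.status = MiniStatus.RUNNING

    def set_status(self, status):
        self.status = status
        # Assumption: the result of the Future is the command itself.
        if status in (MiniStatus.DONE | MiniStatus.FAILED) and not self.done():
            self.set_result(self)
        return self


async def main():
    command = SketchCommand()
    # Mark the command as done shortly after we start awaiting it.
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, command.set_status, MiniStatus.DONE)
    finished = await command
    return finished.status


final_status = asyncio.run(main())
print(final_status)
```

Awaiting the command therefore suspends the caller until the status becomes final, whether the command succeeded or failed.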

Parameters:
  • commander_id (Union[int, str, None]) – The ID of the commander issuing this command. Can be a string or an integer. Normally the former is used for new-style actor and the latter for legacy actors.

  • command_id (Union[int, str]) – The ID associated to this command. As with the commander_id, it can be a string or an integer.

  • consumer_id (Union[int, str]) – The actor that is consuming this command. Normally this is our own actor but if we are commanding another actor consumer_id will be the destination actor.

  • actor (Optional[TypeVar(Actor_co, bound= clu.base.BaseActor)]) – The actor instance associated to this command.

  • parent (Optional[BaseCommand[TypeVar(Actor_co, bound= clu.base.BaseActor), TypeVar(Future_co, bound= BaseCommand)]]) – Another BaseCommand object that is issuing this subcommand. Messages emitted by the command will use the parent command_id.

  • reply_callback (Optional[Callable[[Any], None]]) – A callback that gets called when a client command receives a reply from the actor.

  • status_callback (Optional[Callable[[CommandStatus], Any]]) – A function to call when the status changes.

  • call_now (bool) – Whether to call status_callback when initialising the command.

  • default_keyword (str) – The keyword to use when writing a message that does not specify a keyword.

  • silent (bool) – A silent command will call the actor write method with silent=True, which will update the internal model and record all the output replies but will not write them to the users.

  • internal (bool) – An internal command will call the actor write method with internal=True, which will instruct the actor to add the internal flag to the header of the reply.

  • write_to_log (bool) – Whether to write replies to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log.

  • time_limit (float | None) – Time out the command if it has been running for this long.

  • loop (Optional[AbstractEventLoop]) – The event loop.

debug(*args, **kwargs)[source]#

Writes a debug-level message.

error(*args, **kwargs)[source]#

Writes an error-level message (does not fail the command).

fail(*args, **kwargs)[source]#

Convenience method to mark a command FAILED.

finish(*args, **kwargs)[source]#

Convenience method to mark a command DONE.

info(*args, **kwargs)[source]#

Writes an info-level message.

send_command(target, command_string, new_command=True, *args, **kwargs) BaseCommand[Actor_co, BaseCommand] | Awaitable[BaseCommand][source]#
Return type:

Union[BaseCommand[TypeVar(Actor_co, bound= clu.base.BaseActor), BaseCommand], Awaitable[BaseCommand]]

Sends a command to an actor using the commander ID of this command.

set_status(status, message=None, silent=False, **kwargs) BaseCommand[source]#
Return type:

BaseCommand

Same as status but allows specifying a message to the users.

warning(*args, **kwargs)[source]#

Writes a warning-level message.

write(message_code=MessageCode.INFO, message=None, broadcast=False, **kwargs)[source]#

Writes to the user(s).

Parameters:
  • message_code (MessageCode | str | int) – The message code (e.g., 'i' or ':').

  • message (Union[Dict[str, Any], str, None]) – The text to be output. If None, only the code will be written.

context: click.Context | None#

The click context, if the click parser is used.

replies#

A list of replies this command has received. The type of reply object depends on the actor or client issuing the command.

property status: CommandStatus#

Returns the status.

class clu.command.Command(command_string='', transport=None, **kwargs)[source]#

Bases: BaseCommand[Actor_co, Command]

A command from a user.

Parameters:
  • command_string (str) – The string that defines the body of the command.

  • transport (Optional[Any]) – The TCP transport associated with this command (only relevant for LegacyActor commands).

child_command(command_string)[source]#

Starts a sub-command on the actor currently running this command.

The practical effect is to run another of the same actor’s commands as if it were part of the current Command.

parse() Command[source]#
Return type:

Command

Parses the command.

body#

The body of the command.

ctx#

The Context running this command. Only relevant if using the built-in click-based parser.

raw_command_string#

The raw command string.

class clu.command.FakeCommand(log, actor=None)[source]#

Bases: BaseCommand

A fake command that outputs to a logger.

write(message_code='i', message=None, **kwargs)[source]#

Writes to the user(s).

Parameters:
  • message_code (str) – The message code (e.g., 'i' or ':').

  • message (Union[Dict[str, Any], str, None]) – The text to be output. If None, only the code will be written.

class clu.command.TimedCommand(command_string, delay=1, first_silent=False)[source]#

Bases: object

A command to be executed on a loop.

Parameters:
  • command_string (str) – The command string to run.

  • delay (float) – How many seconds to wait between repeated calls.

  • first_silent – Runs the command in silent mode the first time (useful to internally update the model).

done(task)[source]#

Marks the execution of a command.

async run(actor)[source]#

Run the command.

class clu.command.TimedCommandList(actor, resolution=0.5, loop=None)[source]#

Bases: list

A list of TimedCommand objects that will be executed on a loop.

Parameters:
  • actor (BaseActor) – The actor in which the commands are to be run.

  • resolution – In seconds, how frequently to check whether any of the TimedCommand objects must be executed.

add_command(command_string, **kwargs)[source]#

Adds a new TimedCommand.

async poller()[source]#

The polling loop.

start() TimedCommandList[source]#
Return type:

TimedCommandList

Starts the loop.

async stop()[source]#

Cancel the poller.

property running: bool#

Returns True if the poller is running.

clu.command.parse_legacy_command(command_string) Tuple[str | None, int, str][source]#

Parses a command received by a legacy actor.

Parameters:

command_string (str) – The command to parse, including an optional header.

Returns:

The command ID, and the command body parsed from the command string.

Return type:

command_id, command_body

Legacy#

class clu.legacy.actor.BaseLegacyActor(name, host, port, tron_host=None, tron_port=None, models=[], version=None, loop=None, log_dir=None, log=None, verbose=False, schema=None, store=False, additional_properties=False, config={})[source]#

Bases: BaseActor

An actor that provides compatibility with the SDSS opscore protocol.

The TCP servers need to be started by awaiting the coroutine start. Note that start does not block, so you will need to use asyncio’s run_forever or a similar system:

>>> loop = asyncio.get_event_loop()
>>> my_actor = LegacyActor('my_actor', '127.0.0.1', 9999, loop=loop)
>>> await my_actor.start()
>>> loop.run_forever()
Parameters:
  • name (str) – The name of the actor.

  • host (str) – The host where the TCP server will run.

  • port (int) – The port of the TCP server.

  • tron_host (Optional[str]) – The host on which Tron is running.

  • tron_port (Optional[int]) – The port on which Tron is running.

  • models (List[str]) – A list of strings with the actors whose models will be tracked.

  • version (Optional[str]) – The version of the actor.

  • loop (Optional[AbstractEventLoop]) – The event loop. If None, the current event loop will be used.

  • log_dir (Union[Path, str, None]) – The directory where to store the logs. Defaults to $HOME/logs/<name> where <name> is the name of the actor.

  • log (Optional[Logger]) – A Logger instance to be used for logging instead of creating a new one.

  • schema (Union[Path, str, None]) – The path to the datamodel schema for the actor, in JSON Schema format. If the schema is provided all replies will be validated against it. An invalid reply will fail and not be emitted. The schema can also be set when subclassing by setting the class schema attribute.

  • store (bool | list[str]) – Whether to store the output keywords in a KeywordStore. False (the default), disables the feature. True will store a record of all the output keywords. A list of keyword names to store can also be passed.

  • config (Dict[str, Any]) – A dictionary of configuration parameters that will be accessible to the actor.

static format_user_output(user_id, command_id, message_code, msg_str=None) str[source]#
Return type:

str

Formats a string to send to users.

static get_user_command_id(command=None, user_id=0, command_id=0) Tuple[int, int][source]#

Returns commander_id, command_id based on user-supplied information.

Parameters:
  • command (Optional[Command]) – User command; used as a default for user_id and command_id.

  • user_id (int) – If None then use command.user_id.

  • command_id (int) – If None then use command.command_id.

Returns:

The commander ID and the command ID, parsed from the inputs. If they cannot be determined, returns zeros.

Return type:

user_id, command_id

new_command(transport, command_str)[source]#

Handles a new command received by the actor.

new_user(transport)[source]#

Assigns userID to new client connection.

async run_forever()[source]#

Runs the actor forever, keeping the loop alive.

send_command(target, command_string, *args, commander=None, command_id=None, command=None, callback=None, time_limit=None)[source]#

Sends a command through the hub.

Parameters:
  • target (str) – The actor to command.

  • command_string (str) – The command to send.

  • args – Arguments to concatenate to the command string.

  • commander (Optional[str]) – The commander string to send to Tron. If not provided, a valid string is built using the name of the actor and the target.

  • command_id (Optional[int]) – The command id. If None, a sequentially increasing value will be used. You should not specify a command_id unless you really know what you’re doing.

  • callback (Optional[Callable[[Reply], None]]) – A callback to invoke with each reply received from the actor.

  • time_limit (Optional[float]) – A delay after which the command is marked as timed out and done.

Examples

These two are equivalent:

>>> actor.send_command('my_actor', 'do_something --now')
>>> actor.send_command('my_actor', 'do_something', '--now')
show_new_user_info(user_id)[source]#

Shows information for new users. Called when a new user connects.

show_user_info(user_id)[source]#

Shows user information including your user_id.

show_user_list()[source]#

Shows a list of connected users. Broadcast to all users.

show_version(user_id=0)[source]#

Shows actor version.

async start(get_keys=True, start_nubs=True) T[source]#

Starts the server and the Tron client connection.

Parameters:
  • get_keys (bool) – Whether to issue keys getFor commands against the hub to retrieve the current values for the model keys.

  • start_nubs (bool) – If True, and a TronConnection has been created, sends a hub startNubs <name> where <name> is the name of the actor to attempt to automatically connect it to the hub.

Return type:

TypeVar(T, bound= BaseLegacyActor)

async stop()[source]#

Stops the client connection and running tasks.

write(message_code='i', message=None, command=None, user_id=0, command_id=0, concatenate=True, broadcast=False, validate=True, write_to_log=True, **kwargs)[source]#

Writes a message to user(s).

Parameters:
  • message_code (MessageCode | str) – The message code (e.g., 'i' or ':').

  • message (Optional[Dict[str, Any]]) – The keywords to be output. Must be a dictionary of pairs {keyword: value}. If value is a list it will be converted into a comma-separated string. To prevent unexpected casting, it is recommended for value to always be a string.

  • command (Optional[Command]) – User command; used as a default for user_id and command_id. If the command is done, it is ignored.

  • user_id (int) – The user (transport) to which to write. None defaults to 0.

  • command_id (int) – If None then use command.command_id.

  • concatenate (bool) – Concatenates all the keywords to be output in a single reply with the keyword-values joined with semicolons. Otherwise each keyword will be output as a different message.

  • broadcast (bool) – Whether to broadcast the reply. Equivalent to user_id=0.

  • validate (bool) – Validate the reply against the actor model. This is ignored if the actor was not started with knowledge of its own schema.

  • write_to_log (bool) – Whether to write the reply to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log.

  • kwargs – Keyword arguments that will be added to the message. If a keyword is both in message and in kwargs, the value in kwargs supersedes message.
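
The effect of concatenate and the list conversion can be sketched as follows. format_keywords is a hypothetical helper; the keyword=value notation and the "; " separator are assumptions chosen for the illustration and may differ from the exact strings the actor emits.

```python
from typing import Any, Dict, List


def format_keywords(message: Dict[str, Any], concatenate: bool = True) -> List[str]:
    """Return the reply strings that would be written for a message."""
    pairs = []
    for keyword, value in message.items():
        # Lists become comma-separated strings.
        if isinstance(value, (list, tuple)):
            value = ",".join(str(item) for item in value)
        # Assumption: keywords are written as keyword=value.
        pairs.append(f"{keyword}={value}")
    # One joined reply, or one reply per keyword.
    return ["; ".join(pairs)] if concatenate else pairs


joined = format_keywords({"text": "Hi!", "position": [1, 2]})
separate = format_keywords({"text": "Hi!", "position": [1, 2]}, concatenate=False)
print(joined, separate)
```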

models#

Actor models.

Type:

dict

transports#

Mapping of user_id to transport.

tron#

The client connection to Tron.

Type:

TronConnection

class clu.legacy.actor.LegacyActor(*args, **kwargs)[source]#

Bases: ClickParser, BaseLegacyActor

A legacy actor that uses the ClickParser.

class clu.legacy.tron.TronConnection(commander, host, port=6093, name='tron', models=[], **kwargs)[source]#

Bases: BaseClient

Sends commands to Tron and manages the feed of replies.

Parameters:
  • commander (str) – The name of the commander that will send commands to Tron. Must be in the form program.user, for example foo.bar.

  • host (str) – The host on which Tron is running.

  • port (int) – The port on which Tron is running.

  • models (List[str]) – A list of strings with the actors whose models will be tracked.

  • kwargs – Arguments to be passed to BaseClient.

connected()[source]#

Checks whether the client is connected.

async get_keys()[source]#

Gets all the keys for the models being tracked.

send_command(target, command_string, *args, commander=None, mid=None, callback=None, time_limit=None)[source]#

Sends a command through the hub.

Parameters:
  • target – The actor to command.

  • command_string – The command to send.

  • args – Arguments to concatenate to the command string.

  • commander – The actor or client sending the command. The format for Tron is “commander message_id target command” where commander needs to start with a letter and have a program and a user joined by a dot. Otherwise the command will be accepted but the reply will fail to parse. If commander=None, the instance commander value will be used.

  • mid – The message id. If None, a sequentially increasing value will be used. You should not specify a mid unless you really know what you’re doing.

  • callback (Optional[Callable[[Reply], None]]) – A callback to invoke with each reply received from the actor.

  • time_limit (Optional[float]) – A delay after which the command is marked as timed out and done.
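
The line format described for commander can be sketched as below. format_tron_command is a hypothetical helper, the regular expression is one possible reading of "starts with a letter and has a program and a user joined by a dot", and starting the message id counter at 1 is an assumption.

```python
import itertools
import re
from typing import Optional

# A letter first, then program and user separated by a single dot.
_COMMANDER_RE = re.compile(r"^[A-Za-z][^\s.]*\.[^\s.]+$")
# Assumption: sequential message ids start at 1.
_mid_counter = itertools.count(1)


def format_tron_command(
    commander: str,
    target: str,
    command_string: str,
    *args: str,
    mid: Optional[int] = None,
) -> str:
    """Build a 'commander message_id target command' line."""
    if not _COMMANDER_RE.match(commander):
        raise ValueError(f"Invalid commander: {commander!r}")
    if mid is None:
        mid = next(_mid_counter)
    command = " ".join([command_string, *args])
    return f"{commander} {mid} {target} {command}"


first = format_tron_command("foo.bar", "my_actor", "do_something --now")
second = format_tron_command("foo.bar", "my_actor", "do_something", "--now")
print(first)
print(second)
```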

Examples

These two are equivalent:

>>> tron.send_command('my_actor', 'do_something --now')
>>> tron.send_command('my_actor', 'do_something', '--now')
async start(get_keys=True)[source]#

Starts the connection to Tron.

Parameters:

get_keys (bool) – If True, gets all the keys in the models.

stop()[source]#

Closes the connection.

keyword_dicts#

The KeysDictionary associated with each actor to track.

Type:

dict

models#

The model and values of each actor being tracked.

Type:

dict

class clu.legacy.tron.TronKey(name, key, keyword=None, model=None, callback=None)[source]#

Bases: Property

A Tron model key with callbacks.

Similar to Property but stores the original key with the keyword datamodel.

update_keyword(keyword)[source]#

Updates the keyword and value.

class clu.legacy.tron.TronModel(keydict, callback=None)[source]#

Bases: BaseModel[TronKey]

A JSON-compliant model for actor keywords.

Parameters:
  • keydict (KeysDictionary) – A dictionary of keys that define the datamodel.

  • callback (Optional[Callable[[TronModel], Any]]) – A function or coroutine to call when the datamodel changes. The function is called with the instance of TronModel and the modified keyword. If the callback is a coroutine, it is scheduled as a task.

parse_reply(reply)[source]#

Parses a reply and updates the datamodel.

reload()[source]#

Reloads the model. Clears callbacks.

Maskbits#

class clu.tools.Maskbit(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: Flag

A maskbit enumeration. Intended for subclassing.

property active_bits: List[Maskbit]#

Returns a list of non-combination flags that match the value.

class clu.tools.CommandStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: Maskbit

DONE = 1#
CANCELLED = 2#
FAILED = 4#
TIMEDOUT = 8#
READY = 16#
RUNNING = 32#
CANCELLING = 64#
FAILING = 128#
DEBUG = 256#
ACTIVE_STATES = 224#
FAILED_STATES = 14#
FAILING_STATES = 192#
DONE_STATES = 15#
ALL_STATES = 255#

property is_combination: bool#

Returns True if a flag is a combination.

property did_fail: bool#

Command failed or was cancelled.

property did_succeed: bool#

Command finished with DONE status.

property is_active: bool#

Command is running, cancelling or failing.

property is_done: bool#

Command is done (whether successfully or not).

property is_failing: bool#

Command is being cancelled or is failing.

static code_to_status(code, default=None) CommandStatus[source]#
Return type:

CommandStatus

Returns the status associated with a code.

If the code doesn’t have an associated status, returns default. default defaults to CommandStatus.RUNNING.
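The combination values listed for CommandStatus are bitwise ORs of the single-bit flags. The sketch below reproduces that arithmetic with a plain enum.Flag stand-in. Status, single_bits and is_active are illustrative names, not part of clu, and the helpers only approximate what active_bits and is_active are described as doing.

```python
import enum


class Status(enum.Flag):
    """Stand-in for CommandStatus, using the values listed above."""

    DONE = 1
    CANCELLED = 2
    FAILED = 4
    TIMEDOUT = 8
    READY = 16
    RUNNING = 32
    CANCELLING = 64
    FAILING = 128
    DEBUG = 256
    # Combination flags are bitwise ORs of the single-bit flags.
    ACTIVE_STATES = RUNNING | CANCELLING | FAILING  # 224
    FAILED_STATES = CANCELLED | FAILED | TIMEDOUT  # 14
    FAILING_STATES = CANCELLING | FAILING  # 192
    DONE_STATES = DONE | FAILED_STATES  # 15
    ALL_STATES = DONE_STATES | READY | ACTIVE_STATES  # 255


def single_bits(flag):
    """Returns the non-combination members contained in ``flag``."""
    return [
        member
        for member in type(flag).__members__.values()
        if (member.value & (member.value - 1)) == 0 and (member.value & flag.value)
    ]


def is_active(flag):
    """True if the flag is running, cancelling or failing."""
    return bool(flag & Status.ACTIVE_STATES)
```

Note that ALL_STATES does not include DEBUG, which is why its value is 255 rather than 511.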

MixIns#

class clu.tools.StatusMixIn(maskbit_flags, initial_status=None, callback_func=None, call_now=False)[source]#

Bases: Generic[MaskbitType]

A mixin that provides status tracking with callbacks.

Provides a status property that executes a list of callbacks when the status changes.

Parameters:
  • maskbit_flags (Type[TypeVar(MaskbitType, bound= Maskbit)]) – A class containing the available statuses as a series of maskbit flags. Usually a subclass of enum.Flag.

  • initial_status (Optional[TypeVar(MaskbitType, bound= Maskbit)]) – The initial status.

  • callback_func (Optional[Callable[[TypeVar(MaskbitType, bound= Maskbit)], Any]]) – The function to call if the status changes. It receives the status.

  • call_now (bool) – Whether the callback function should be called when initialising.

callbacks#

A list of the callback functions to call.

do_callbacks()[source]#

Calls functions in callbacks.

async wait_for_status(value)[source]#

Awaits until the status matches value.

property status#

Returns the status.
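A status property that runs callbacks on change, combined with a coroutine that waits for a given status, could look roughly like the sketch below. StatusTracker is a hypothetical stand-in and not the StatusMixIn implementation. It assumes plain equality for "matches" and uses an asyncio.Event to wake waiters.

```python
import asyncio


class StatusTracker:
    """Hypothetical status holder with change callbacks."""

    def __init__(self, initial_status=None, callback_func=None):
        self._status = initial_status
        self.callbacks = [callback_func] if callback_func else []
        self._changed = None  # asyncio.Event, created lazily inside the loop

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, value):
        if value != self._status:
            self._status = value
            # Each callback receives the new status.
            for callback in self.callbacks:
                callback(value)
            if self._changed is not None:
                self._changed.set()

    async def wait_for_status(self, value):
        """Waits until the status equals ``value``."""
        if self._changed is None:
            self._changed = asyncio.Event()
        while self._status != value:
            self._changed.clear()
            await self._changed.wait()


async def demo():
    seen = []
    tracker = StatusTracker(initial_status="READY", callback_func=seen.append)
    loop = asyncio.get_running_loop()
    # Change the status twice while demo() is waiting.
    loop.call_later(0.01, setattr, tracker, "status", "RUNNING")
    loop.call_later(0.02, setattr, tracker, "status", "DONE")
    await tracker.wait_for_status("DONE")
    return seen
```

Running asyncio.run(demo()) returns the statuses seen by the callback, in the order they were set.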

class clu.tools.CaseInsensitiveDict(values)[source]#

Bases: Dict[str, T]

A dictionary that performs case-insensitive operations.
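One common way to get case-insensitive behaviour is to normalise keys to lower case on every access. The sketch below illustrates that idea. LowerKeyDict is an illustrative name and the real class may normalise keys differently or cover more methods.

```python
class LowerKeyDict(dict):
    """A dict sketch that lower-cases string keys on set, get and lookup."""

    def __init__(self, values=()):
        super().__init__()
        for key, value in dict(values).items():
            self[key] = value

    def __setitem__(self, key, value):
        super().__setitem__(key.lower(), value)

    def __getitem__(self, key):
        return super().__getitem__(key.lower())

    def __contains__(self, key):
        return super().__contains__(key.lower())

    def get(self, key, default=None):
        return super().get(key.lower(), default)


keywords = LowerKeyDict({"Temperature": 20.5})
keywords["STATUS"] = "ok"
```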

Model#

class clu.model.BaseModel(name, callback=None)[source]#

Bases: CaseInsensitiveDict[T], CallbackMixIn

A JSON-compliant model.

Parameters:
  • name (str) – The name of the model.

  • callback (Optional[Callable[[Any], Any]]) – A function or coroutine to call when the datamodel changes. The function is called with the flattened instance of BaseModel and the key that changed.

flatten() Dict[str, Any][source]#
Return type:

Dict[str, Any]

Returns a dictionary of values.

Return a dictionary in which the Property instances are replaced with their values.

jsonify() str[source]#
Return type:

str

Returns a JSON string with the model.
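As a minimal sketch of what flattening and serialising a model amounts to, the example below replaces property objects with their values and dumps the result with json. Prop, flatten and jsonify are simplified stand-ins written for this illustration, not the clu classes.

```python
import json


class Prop:
    """Simplified stand-in for a model property holding a name and a value."""

    def __init__(self, name, value=None):
        self.name = name
        self.value = value


def flatten(model):
    """Returns a dict in which each property is replaced with its value."""
    return {key: prop.value for key, prop in model.items()}


def jsonify(model):
    """Returns the flattened model as a JSON string."""
    return json.dumps(flatten(model))


model = {
    "temperature": Prop("temperature", 20.5),
    "status": Prop("status", "ok"),
}
flat = flatten(model)
serialised = jsonify(model)
```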

class clu.model.Model(name, schema, is_file=False, additional_properties=False, **kwargs)[source]#

Bases: BaseModel[Property]

A model with JSON validation.

In addition to the parameters in BaseModel, the following parameters are accepted:

Parameters:
  • schema (Union[Dict[str, Any], PathLike, str]) – A valid JSON schema, to be used for validation.

  • is_file (bool) – Whether the input schema is a filepath or a dictionary.

  • additional_properties (bool) – Whether to allow additional properties in the schema, other than the ones defined by the schema. This parameter is only used if schema=None or if additionalProperties is not defined in the schema.

  • kwargs – Additional parameters to pass to BaseModel on initialisation.

static check_schema(schema) bool[source]#

Checks whether a JSON schema is valid.

Parameters:

schema (Dict[str, Any]) – The schema to check as a dictionary.

Returns:

result – True if the schema is a valid JSON schema, False otherwise.

Return type:

bool

update_model(instance)[source]#

Validates a new instance and updates the model.

validate(instance, update_model=True)[source]#

Validates a new instance.

class clu.model.ModelSet(client, actors=[], get_schema_command='get_schema', raise_exception=True, **kwargs)[source]#

Bases: dict

A dictionary of Model instances.

Given a list of actors, queries each of the actors to return their own schemas, which are then parsed and loaded as Model instances. Since obtaining the schema requires sending a command to the actor, that process happens when the coroutine load_schemas is awaited, which should usually occur when the client is started.

Parameters:
  • client (BaseClient) – A client with a connection to the actors to monitor.

  • actors (List[str]) – A list of actor models whose schemas will be loaded.

  • get_schema_command (str) – The command to send to the actor to get it to return its own schema.

  • raise_exception (bool) – Whether to raise an exception if any of the models cannot be loaded.

  • kwargs – Keyword arguments to be passed to Model.

Example

>>> model_set = ModelSet(client, actors=['sop', 'guider'])
>>> model_set['sop']
<Model (name='sop')>

async add_actor(actor)[source]#

Adds an actor schema.

async load_schemas(actors=None)[source]#

Loads the actor schemas.

class clu.model.Property(name, value=None, model=None, callback=None)[source]#

Bases: CallbackMixIn

A model property with callbacks.

Parameters:
  • name (str) – The name of the property.

  • value (Optional[Any]) – The value of the property.

  • model (Optional[Any]) – The parent model.

  • callback (Optional[Callable[[Any], Any]]) – The function or coroutine that will be called if the value of the key is updated. The callback is called with the instance of Property as the only argument. Note that the callback will be scheduled even if the value does not change.

copy()[source]#

Returns a copy of self.

flatten() Dict[str, Any][source]#
Return type:

Dict[str, Any]

Returns a dictionary with the name and value of the property.

property value: Any#

The value associated with the key.

Store#

class clu.store.KeywordOutput(name, message_code, date, value) None[source]#

Bases: object

Records a single output of a keyword.

Parameters:
  • name (str) – The name of the keyword.

  • message_code (Any) – The message code with which the keyword was output.

  • date (datetime) – A datetime object with the date-time at which the keyword was output this time.

  • value (Any) – The value of the keyword when it was output.

class clu.store.KeywordStore(actor, filter=None)[source]#

Bases: defaultdict

Stores the keywords output by an actor.

Parameters:
  • actor (BaseActor) – The actor to which this store is attached.

  • filter (list[str] | None) – A list of keyword names to filter. If provided, only those keywords will be tracked.

add_reply(reply)[source]#

Processes a reply and adds new entries to the store.

Parameters:

reply (Reply) – The Reply object containing the keywords output in a message from the actor.

head(keyword, n=1)[source]#

Returns the first N output values of a keyword.

Parameters:
  • keyword (str) – The name of the keyword to search for.

  • n (int) – Return the first n times the keyword was output.

tail(keyword, n=1)[source]#

Returns the last N output values of a keyword.

Parameters:
  • keyword (str) – The name of the keyword to search for.

  • n (int) – Return the last n times the keyword was output.
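The head and tail behaviour can be pictured with a defaultdict of lists, one list per keyword, appended to in output order. The sketch below is an assumption-laden illustration: Output, Store and the add method are hypothetical and do not reproduce how KeywordStore processes Reply objects.

```python
from collections import defaultdict, namedtuple
from datetime import datetime, timezone

# Mirrors the fields documented for a single keyword output.
Output = namedtuple("Output", ["name", "message_code", "date", "value"])


class Store(defaultdict):
    """Keeps a list of outputs for each keyword name."""

    def __init__(self, filter=None):
        super().__init__(list)
        self.filter = filter

    def add(self, message_code, keywords):
        """Records each keyword of a message, honouring the filter."""
        now = datetime.now(timezone.utc)
        for name, value in keywords.items():
            if self.filter is not None and name not in self.filter:
                continue
            self[name].append(Output(name, message_code, now, value))

    def head(self, keyword, n=1):
        """Returns the first n outputs of a keyword."""
        return self[keyword][:n]

    def tail(self, keyword, n=1):
        """Returns the last n outputs of a keyword."""
        items = self[keyword]
        return items[max(len(items) - n, 0):]


store = Store()
for temperature in (1, 2, 3):
    store.add("i", {"temp": temperature})

filtered = Store(filter=["temp"])
filtered.add("i", {"temp": 1, "humidity": 50})
```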

Parser#

class clu.parsers.click.ClickParser[source]#

Bases: object

A command parser that uses Click at its base.

parse_command(command) T[source]#
Return type:

TypeVar(T, bound= Command)

Parses a user command using the Click internals.

context_obj = {}#

Parameters to be set in the context object.

Type:

dict

parser_args: List[Any] = []#

Arguments to be passed to each command in the parser. Note that the command is always passed first.

Type:

list

class clu.parsers.click.CluCommand(*args, context_settings=None, **kwargs)[source]#

Bases: Command

Override click.Command to pass the actor and command.

done_callback(task, exception_handler=None)[source]#

Checks if the command task has been successfully done.

invoke(ctx)[source]#

As click.Command.invoke but passes the actor and command.

class clu.parsers.click.CluGroup(name=None, commands=None, **attrs) None[source]#

Bases: Group

Override click.Group.

Makes all child commands instances of CluCommand.

command(*args, **kwargs)[source]#

Override click.Group to use CluCommand by default.

group(*args, **kwargs)[source]#

Creates a new group inheriting from this class.

parse_args(ctx, args)[source]#

Given a context and a list of arguments this creates the parser and parses the arguments, then modifies the context as necessary. This is automatically invoked by make_context().

clu.parsers.click.timeout(seconds)[source]#

A decorator to timeout the command after a number of seconds.

clu.parsers.click.cancel_command(name=None, ctx=None, ok_no_exists=True, keep_last=False)[source]#

Cancels any running instance of a command callback.

Parameters:
  • name (str | None) – The name of the command. If None, calls get_current_command_name.

  • ctx (Context | None) – The click context. If None, uses the current context.

  • ok_no_exists (bool) – Exits silently if no command with that name is running.

  • keep_last (bool) – Does not cancel the last instance of the command. Useful when a command wants to cancel other instances of itself, but not itself.

Returns:

result – True if a command was cancelled.

Return type:

bool

clu.parsers.click.get_running_tasks(cmd_name) list[Task] | None[source]#
Return type:

list[Task] | None

Returns the list of tasks for a given command name, sorted by start date.

clu.parsers.click.get_current_command_name()[source]#

Returns the name of the current click command.

Must be called from inside the command callback.

class clu.parsers.json.JSONParser[source]#

Bases: object

A parser that receives commands and arguments as a JSON string.

See JSON parser for details on implementation and use-cases.

parse_command(command) T[source]#
Return type:

TypeVar(T, bound= Command)

Parses a user command.

The command string must be a serialised JSON-like string that contains at least a keyword command with the name of the callback, and any number of additional arguments which will be passed to it.

callbacks: Dict[str, Callable[..., Coroutine]] = {}#

Mapping of command verb to callback coroutine.

Type:

dict

parser_args: List[Any] = []#

Additional arguments to be passed to each command in the parser. Note that the command is always passed first.

Type:

list
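The parsing step described for JSONParser (a JSON string with a command keyword plus arbitrary extra arguments, dispatched to a callback coroutine) can be sketched as below. CALLBACKS, set_speed and dispatch are hypothetical names, and the string "cmd-1" stands in for the command object that the real parser passes first.

```python
import asyncio
import json


async def set_speed(command, value, units="rpm"):
    """Example callback; the command is always the first argument."""
    return f"{command}: speed set to {value} {units}"


# Mapping of command verb to callback coroutine.
CALLBACKS = {"set-speed": set_speed}


async def dispatch(command_string, command="cmd-1"):
    """Parses a JSON command string and awaits the matching callback."""
    payload = json.loads(command_string)
    if "command" not in payload:
        raise ValueError("The JSON payload must contain a 'command' keyword")
    verb = payload.pop("command")
    if verb not in CALLBACKS:
        raise ValueError(f"Unknown command {verb!r}")
    # Remaining keys are passed to the callback as keyword arguments.
    return await CALLBACKS[verb](command, **payload)


result = asyncio.run(dispatch('{"command": "set-speed", "value": 10}'))
```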

Sockets#

class clu.protocol.TCPProtocol(loop=None, connection_callback=None, data_received_callback=None, max_connections=None)[source]#

Bases: Protocol

A TCP server/client based on asyncio protocols.

This is a high-level implementation of the client and server asyncio protocols. See asyncio protocol for details.

Parameters:
  • loop (AbstractEventLoop | None) – The event loop. The current event loop is used by default.

  • connection_callback (Optional[Callable[[Any], Any]]) – Callback to call when a new client connects.

  • data_received_callback (Optional[Callable[[str], Any]]) – Callback to call when new data is received.

  • max_connections (Optional[int]) – How many clients the server accepts. If None, unlimited connections are allowed.

async classmethod create_server(host, port, **kwargs)[source]#

Returns a Server connection.

async classmethod create_client(host, port, **kwargs)[source]#

Returns a Transport and Protocol.

connection_made(transport)[source]#

Receives a connection and calls the connection callback.

data_received(data)[source]#

Decodes the received data.

connection_lost(exc)[source]#

Called when connection is lost.

class clu.protocol.PeriodicTCPServer(periodic_callback=None, sleep_time=1, **kwargs)[source]#

Bases: TCPProtocol

A TCP server that runs a callback periodically.

Parameters:
  • periodic_callback – Callback to run every iteration.

  • sleep_time (float) – The delay between two calls to periodic_callback.

  • kwargs – Parameters to pass to TCPProtocol

async classmethod create_client(*args, **kwargs)[source]#

Returns a Transport and Protocol.

async classmethod create_server(host, port, *args, **kwargs)[source]#

Returns a Server connection.

property periodic_callback: Callable[[Any], Any] | None#

Returns the periodic callback.

class clu.protocol.TCPStreamServer(host, port, connection_callback=None, data_received_callback=None, loop=None, max_connections=None)[source]#

Bases: object

A TCP server based on asyncio streams.

This is a high-level implementation of the asyncio server using streams. See asyncio streams for details.

Parameters:
  • host (str) – The server host.

  • port (int) – The server port.

  • connection_callback (Optional[Callable[[Any], Any]]) – Callback to call when a new client connects or disconnects.

  • data_received_callback (Optional[Callable[[Any, bytes], Any]]) – Callback to call when new data is received.

  • loop (Optional[AbstractEventLoop]) – The event loop. The current event loop is used by default.

  • max_connections (Optional[int]) – How many clients the server accepts. If None, unlimited connections are allowed.

async start() AbstractServer[source]#
Return type:

AbstractServer

Starts the server and returns a Server connection.

stop()[source]#

Stops the server.

serve_forever()[source]#

Exposes TCPStreamServer.server.serve_forever.

async connection_made(reader, writer)[source]#

Called when a new client connects to the server.

Stores the writer protocol in transports, calls the connection callback, if any, and starts a loop to read any incoming data.

class clu.protocol.TCPStreamPeriodicServer(host, port, periodic_callback=None, sleep_time=1, **kwargs)[source]#

Bases: TCPStreamServer

A TCP server that calls a function periodically.

Parameters:
  • host (str) – The server host.

  • port (int) – The server port.

  • periodic_callback – Callback to run every iteration. It is called for each transport that is connected to the server and receives the transport object.

  • sleep_time (float) – The delay between two calls to periodic_callback.

  • kwargs – Parameters to pass to TCPStreamServer

async start() AbstractServer[source]#
Return type:

AbstractServer

Starts the server and returns a Server connection.

stop()[source]#

Stops the server.

property periodic_callback#

Returns the periodic callback.
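The periodic behaviour (call the callback once per connected transport, then sleep) can be sketched with a plain asyncio loop. run_periodically and its iterations argument are hypothetical and exist only so the example terminates. The real server keeps running until it is stopped.

```python
import asyncio
import inspect


async def run_periodically(transports, periodic_callback, sleep_time=1.0, iterations=None):
    """Calls periodic_callback for each transport, sleeping between rounds."""
    count = 0
    while iterations is None or count < iterations:
        # Copy the collection so clients can connect or drop during a round.
        for transport in list(transports):
            result = periodic_callback(transport)
            if inspect.isawaitable(result):
                await result
        count += 1
        await asyncio.sleep(sleep_time)


calls = []
# Strings stand in for the transport objects of connected clients.
asyncio.run(run_periodically(["client-a", "client-b"], calls.append, sleep_time=0, iterations=2))
```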

class clu.protocol.TCPStreamClient(host, port)[source]#

Bases: object

An object containing a writer and reader stream to a TCP server.

async open_connection()[source]#

Creates the connection.

close()[source]#

Closes the stream.

async clu.protocol.open_connection(host, port) TCPStreamClient[source]#

Returns a TCP stream connection with a writer and reader.

This function is equivalent to doing

>>> client = TCPStreamClient('127.0.0.1', 5555)
>>> await client.open_connection()

Instead just do

>>> client = await open_connection('127.0.0.1', 5555)
>>> client.writer.write('Hi!\n'.encode())

Parameters:
  • host (str) – The host of the TCP server.

  • port (int) – The port of the TCP server.

Returns:

client – A container for the stream reader and writer.

Return type:

TCPStreamClient

class clu.protocol.TopicListener(url=None, user='guest', password='guest', host='localhost', virtualhost='/', port=5672, ssl=False)[source]#

Bases: object

A class to declare and listen to AMQP queues with topic conditions.

Parameters:
  • url (str | None) – RFC3986 formatted broker address. When used, the other keyword arguments are ignored.

  • user (str) – The user to connect to the RabbitMQ broker.

  • password (str) – The password for the user.

  • host (str) – The host where the RabbitMQ message broker runs.

  • virtualhost (str) – Virtualhost parameter. '/' by default.

  • port (int) – The port on which the RabbitMQ message broker is running.

  • ssl (bool) – Whether to use TLS/SSL connection.

async connect(exchange_name, exchange_type=ExchangeType.TOPIC, on_return_raises=True) TopicListener[source]#

Initialise the connection.

Parameters:
  • exchange_name (str) – The name of the exchange to create.

  • exchange_type (ExchangeType) – The type of exchange to create.

Return type:

TopicListener

async add_queue(queue_name, callback=None, bindings='*') AbstractQueue[source]#

Adds a queue with bindings.

Parameters:
  • queue_name (str) – The name of the queue to create.

  • callback (Optional[Callable[[AbstractIncomingMessage], Any]]) – A callable that will be called when a new message is received in the queue. Can be a coroutine.

  • bindings (Union[str, List[str]]) – The list of bindings for the queue. Can be a list of strings or a single string in which the bindings are comma-separated.

Return type:

AbstractQueue
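Since bindings may be given either as a list or as a comma-separated string, some normalisation has to happen before the queue is bound. A minimal sketch of that step follows. normalise_bindings is a hypothetical helper, not a clu function.

```python
def normalise_bindings(bindings="*"):
    """Returns the bindings as a list of routing-key patterns."""
    if isinstance(bindings, str):
        # Split a comma-separated string and drop empty entries.
        return [item.strip() for item in bindings.split(",") if item.strip()]
    return list(bindings)


default = normalise_bindings()
from_string = normalise_bindings("reply.#, command.my_actor.#")
from_list = normalise_bindings(["reply.#", "command.my_actor.#"])
```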

async stop()[source]#

Cancels queues and closes the connection.

class clu.protocol.ReconnectingTCPClientProtocol(*args, **kwargs)[source]#

Bases: Protocol

A reconnecting client modelled after Twisted ReconnectingClientFactory.

Taken from https://bit.ly/3yn6MWa.

connection_lost(exc)[source]#

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

class clu.device.Device(host, port, callback=None)[source]#

Bases: CallbackMixIn

A class that handles the TCP connection to a device.

There are two ways to create a new device. You can create a subclass from Device and override the process_message method which handles how you react to a new line being received

class MyDevice(Device):

    async def process_message(self, line):
        print(line)

my_device = MyDevice('192.168.1.10', 4444)
await my_device.start()

Note that process_message must be a coroutine. Alternatively you can pass a callback that will be called instead of process_message when a new message arrives. The callback must also be a coroutine

async def printer(line):
    print(line)

my_device = Device('192.168.1.10', 4444, callback=printer)
await my_device.start()

Parameters:
  • host (str) – The host of the device.

  • port (int) – The port on which the device is serving.

  • callback (Optional[Callable[[str], Any]]) – The callback to call with each new message received from the client (after decoding into a string). If no callback is specified, process_message is called. If the callback is not a coroutine, it will be converted to one.

is_connected() bool[source]#
Return type:

bool

Returns True if the connection is open.

async process_message(line)[source]#

Processes a newly received message.

async start() T[source]#
Return type:

TypeVar(T, bound= Device)

Opens the connection and starts the listener.

async stop()[source]#

Closes the connection and stops the listener.

write(message, newline='\\n')[source]#

Write to the device. The message is encoded and a new line added.
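The listener logic described for Device (read lines, decode them, and hand each one to the callback or to process_message) can be sketched without a real socket by feeding an asyncio.StreamReader by hand. LineDevice is a hypothetical stand-in and the actual Device class manages the connection itself.

```python
import asyncio


class LineDevice:
    """Reads lines from a stream and dispatches each one to a handler."""

    def __init__(self, reader, callback=None):
        self.reader = reader
        self.callback = callback

    async def process_message(self, line):
        """Default handler, used when no callback is given."""
        print(line)

    async def listen(self):
        handler = self.callback or self.process_message
        while True:
            data = await self.reader.readline()
            if not data:  # An empty read means the stream has closed.
                break
            await handler(data.decode().strip())


async def demo():
    # Simulate two lines arriving from the device, then EOF.
    reader = asyncio.StreamReader()
    reader.feed_data(b"temp=20.1\nstatus=ok\n")
    reader.feed_eof()

    received = []

    async def collect(line):
        received.append(line)

    await LineDevice(reader, callback=collect).listen()
    return received
```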

Tools#

clu.escape(value)[source]#

Escapes a text using json.dumps.

class clu.tools.ActorHandler(actor, level=40, keyword='text', code_mapping=None, filter_warnings=None)[source]#

Bases: Handler

A handler that outputs log messages as actor keywords.

Parameters:
  • actor – The actor instance.

  • level (int) – The level above which records will be output in the actor.

  • keyword (str) – The keyword around which the messages will be output.

  • code_mapping (Optional[Dict[int, str]]) – A mapping of logging levels to actor codes. The values provided override the default mapping. For example, to make input log messages with info level be output as debug, code_mapping={logging.INFO: 'd'}.

  • filter_warnings (Optional[List[Type[Warning]]]) – A list of warning classes that will be issued to the actor. Subclasses of the listed warnings are also accepted; any other warnings are ignored.

emit(record)[source]#

Emits the record.
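To illustrate how a code_mapping override might interact with a default level-to-code mapping, here is a small logging.Handler sketch. KeywordHandler and DEFAULT_CODES are hypothetical, the default codes shown are assumptions rather than clu's actual mapping, and records are appended to a list instead of being written to an actor.

```python
import logging

# Assumed default mapping of logging levels to message codes (hypothetical).
DEFAULT_CODES = {
    logging.DEBUG: "d",
    logging.INFO: "i",
    logging.WARNING: "w",
    logging.ERROR: "e",
}


class KeywordHandler(logging.Handler):
    """Collects log records as (code, {keyword: message}) tuples."""

    def __init__(self, sink, level=logging.ERROR, keyword="text", code_mapping=None):
        super().__init__(level=level)
        self.sink = sink
        self.keyword = keyword
        # Values in code_mapping override the defaults.
        self.code_mapping = {**DEFAULT_CODES, **(code_mapping or {})}

    def emit(self, record):
        code = self.code_mapping.get(record.levelno, "i")
        self.sink.append((code, {self.keyword: record.getMessage()}))


sink = []
logger = logging.getLogger("keyword_handler_sketch")
logger.setLevel(logging.DEBUG)
logger.propagate = False
logger.addHandler(KeywordHandler(sink, level=logging.INFO, code_mapping={logging.INFO: "d"}))

logger.debug("below the handler level, so not emitted")
logger.info("hello")
logger.error("boom")
```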

class clu.tools.CallbackMixIn(callbacks=[], loop=None)[source]#

Bases: object

A mixin for executing callbacks.

Parameters:

callbacks (List[Callable[[Any], Any]]) – A list of functions or coroutines to be called.

notify(*args)[source]#

Calls the callback functions with some arguments.

Coroutine callbacks are scheduled as a task. Synchronous callbacks are called immediately.

register_callback(callback_func)[source]#

Adds a callback function or coroutine function.

remove_callback(callback_func)[source]#

Removes a callback function.

async stop_callbacks()[source]#

Cancels any running callback task.
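The difference between how synchronous and coroutine callbacks are handled on notify can be sketched as follows. Notifier is a simplified, hypothetical stand-in for the mixin. It only shows the scheduling distinction and not task cancellation.

```python
import asyncio
import inspect


class Notifier:
    """Runs sync callbacks immediately and schedules coroutines as tasks."""

    def __init__(self, callbacks=None):
        self.callbacks = list(callbacks or [])
        self._tasks = []

    def register_callback(self, callback_func):
        self.callbacks.append(callback_func)

    def notify(self, *args):
        for callback in self.callbacks:
            if inspect.iscoroutinefunction(callback):
                # Scheduled on the running loop; runs once control is yielded.
                self._tasks.append(asyncio.create_task(callback(*args)))
            else:
                callback(*args)

    async def wait(self):
        await asyncio.gather(*self._tasks)


async def demo():
    order = []

    def sync_callback(value):
        order.append(("sync", value))

    async def async_callback(value):
        order.append(("async", value))

    notifier = Notifier([async_callback, sync_callback])
    notifier.notify(1)
    snapshot = list(order)  # Only the synchronous callback has run so far.
    await notifier.wait()
    return snapshot, order
```

Even though the coroutine callback is registered first, it only runs after notify returns and the event loop regains control.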

Testing#

class clu.testing.MockReply(user_id, command_id, flag, data={})[source]#

Bases: dict

Stores a reply written to a transport.

The data of the message is stored as part of the dictionary.

Parameters:
  • user_id (int | str | None) – The user ID of the client to which the reply was sent.

  • command_id (int | str | None) – The command ID of the command that produced this reply.

  • flag (str) – The message type flag.

  • data (Dict[str, Any]) – The payload of the message.

class clu.testing.MockReplyList(actor)[source]#

Bases: list

Stores replies as MockReply objects.

clear()[source]#

Remove all items from list.

parse_reply(reply, routing_key=None)[source]#

Parses a reply and constructs a MockReply, which is appended to the list.

async clu.testing.setup_test_actor(actor, user_id=666) T[source]#

Sets up an actor for testing, mocking the client transport.

Takes an actor and modifies it in two ways:

  • Adds an invoke_mock_command method that allows submitting a command string as if it had been received from a transport.

  • Mocks a client transport with user_id that is connected to the actor. Messages written to the transport are stored as MockReply in a MockReplyList that is accessible via a new actor.mock_replies attribute.

The actor is modified in place and returned.

Return type:

TypeVar(T, bound= MockedActor)

Websocket#