API Reference¶
Base classes¶
- class clu.base.BaseClient(name, version=None, loop=None, log_dir=None, log=None, verbose=False)[source]¶
Bases:
objectA base client that can be used for listening or for an actor.
This class defines a new client. Clients differ from actors in that they do not receive commands or issue replies, but do send commands to other actors and listen to the keyword-value flow. All actors are also clients and any actor should subclass from
BaseClient.Normally a new instance of a client or actor is created by passing a configuration file path to
from_configwhich defines how the client must be started.- Parameters
name (str) – The name of the client.
version (Optional[str]) – The version of the client.
loop (Optional[asyncio.AbstractEventLoop]) – The event loop. If
None, the current event loop will be used.log_dir (Optional[Union[pathlib.Path, str]]) – The directory where to store the file logs.
log (Optional[SDSSLogger]) – A
Loggerinstance to be used for logging instead of creating a new one.verbose (Union[bool, int]) – Whether to log to stdout. Can be an integer logging level.
- classmethod from_config(config, *args, **kwargs)[source]¶
Parses a configuration file.
- Parameters
config (Union[Dict[str, Any], pathlib.Path, str]) – A configuration dictionary or the path to a YAML configuration file. If the file contains a section called
'actor'or'client', that section will be used instead of the whole file.
- send_command(actor, *args, **kwargs)[source]¶
Sends a command to an actor and returns a
Commandinstance.- Parameters
actor (str) –
- setup_logger(log, log_dir, verbose=False)[source]¶
Starts the file logger.
- Parameters
log (Any) –
log_dir (Optional[Union[pathlib.Path, str]]) –
- class clu.base.BaseActor(*args, schema=None, **kwargs)[source]¶
Bases:
clu.base.BaseClientAn actor based on
asyncio.This class expands
BaseClientwith a parsing system for new commands and placeholders for methods for handling new commands and writing replies, which should be overridden by the specific actors.- Parameters
schema (SchemaType) –
- load_schema(schema, is_file=True)[source]¶
Loads and validates the actor schema.
- Parameters
schema (Optional[Union[Dict[str, Any], pathlib.Path, str]]) –
- Return type
Optional[clu.model.Model]
- abstract new_command()[source]¶
Handles a new command.
Must be overridden by the subclass and call
parse_commandwith aCommandobject.
- abstract parse_command(command)[source]¶
Parses and executes a
Command. Must be overridden.- Parameters
command (clu.command.BaseCommand) –
- write(message_code='i', message=None, command=None, broadcast=False, validate=True, call_internal=True, **kwargs)[source]¶
Writes a message to user(s).
The reply to the users will be formatted by the actor class into message specific to the communication channel. Additional keywords passed to this method will be used to complete the message (as long as they don’t overlap with named parameters). For example
actor.write('i', message={'text': 'Hi!', 'value1': 1})
and
actor.write('i', text='Hi!', value1=1)
are equivalent and
actor.write('i', message={'text': 'Hi!'}, value1=1)
is equivalent to
actor.write('i', message={'text': 'Hi!', 'value1': 1})
This method generates a
Replyobject that is passed to the_write_internalmethod in the class, which processes it and outputs the message to the users using the appropriate transport. Ifcommandis passed, the reply is added tocommand.replies.- Parameters
message_code (str) – The message code (e.g.,
'i'or':').message (Optional[Dict[str, Any] | str]) – The keywords to be output. Must be a dictionary of pairs
{keyword: value}.command (Optional[BaseCommand]) – The command to which we are replying. If not set, it is assumed that this is a broadcast.
broadcast (bool) – Whether to broadcast the message to all the users or only to the commander.
validate (bool) – Validate the reply against the actor schema. This is ignored if the actor was not started with knowledge of its own schema.
call_internal (bool) – Whether to call the actor internal write method. Should be
Truebut it’s sometimes useful to callwritewithcall_internal=Falsewhen one is overriding the method and wants to control when to call the internal method.kwargs – Keyword arguments that will used to update the message.
- Return type
Client¶
- class clu.client.AMQPClient(name, url=None, user='guest', password='guest', host='localhost', port=5672, virtualhost='/', ssl=False, version=None, loop=None, log_dir=None, log=None, models=[])[source]¶
Bases:
clu.base.BaseClientDefines a new client based on the AMQP standard.
To start a new client first instantiate the class and then run
startas a coroutine. Note thatstartdoes not block so you will need to use asyncio’srun_foreveror a similar system>>> loop = asyncio.get_event_loop() >>> client = await AMQPClient('my_client', host='localhost').start() >>> loop.run_forever()
- Parameters
name (str) – The name of the client.
url (Optional[str]) – RFC3986 formatted broker address. When used, the other connection keyword arguments are ignored.
user (str) – The user to connect to the AMQP broker. Defaults to
guest.password (str) – The password for the user. Defaults to
guest.host (str) – The host where the AMQP message broker runs. Defaults to
localhost.virtualhost (str) – Virtualhost parameter.
'/'by default.port (int) – The port on which the AMQP broker is running. Defaults to 5672.
ssl (bool) – Whether to use TLS/SSL connection.
version (Optional[str]) – The version of the client.
loop (Optional[asyncio.AbstractEventLoop]) – The event loop. If
None, the current event loop will be used.log_dir (Optional[PathLike]) – The directory where to store the logs. Defaults to
$HOME/logs/<name>where<name>is the name of the actor.log (Optional[SDSSLogger]) – A
Loggerinstance to be used for logging instead of creating a new one.parser – A click command parser that is a subclass of
CluGroup. IfNone, the active parser will be used.models (List[str]) – A list of actor models whose schemas will be monitored.
- async handle_reply(message)[source]¶
Handles a reply received from the exchange.
Creates a new instance of
AMQPReplyfrom themessage. If the reply is valid it updates any running command.- Parameters
message (aio_pika.message.IncomingMessage) – The message received.
- Returns
reply – The
AMQPReplyobject created from the message.- Return type
- async send_command(consumer, command_string, command_id=None)[source]¶
Commands another actor over its RCP queue.
- class clu.client.AMQPReply(message, log=None)[source]¶
Bases:
objectWrapper for an
IncomingMessagethat expands and decodes it.- Parameters
message (apika.IncomingMessage) – The message that contains the reply.
log (Optional[logging.Logger]) – A message logger.
- Variables
is_valid – Whether the message is valid and correctly parsed.
body – The body of the message, as a JSON dictionary.
info – The info dictionary.
headers – The headers of the message, decoded if they are bytes.
message_code – The message code.
sender – The name of the actor that sends the reply.
command_id – The command ID.
Actor¶
- class clu.actor.AMQPBaseActor(*args, schema=None, **kwargs)[source]¶
Bases:
clu.client.AMQPClient,clu.base.BaseActorAn actor class that uses AMQP message brokering.
This class differs from
LegacyActorin that it uses an AMQP messaging broker (typically RabbitMQ) to communicate with other actors in the system, instead of standard TCP sockets. Although the internals and protocols are different the entry points and behaviour for both classes should be almost identical.This class needs to be subclassed with a command parser.
See the documentation for
AMQPActorandAMQPClientfor additional parameter information.- Parameters
schema (Optional[SchemaType]) – The path to the datamodel schema for the actor, in JSON Schema format. If the schema is provided all replies will be validated against it. An invalid reply will fail and not be emitted. The schema can also be set when subclassing by setting the class
schemaattribute.
- class clu.actor.AMQPActor(*args, schema=None, **kwargs)[source]¶
Bases:
clu.parsers.click.ClickParser,clu.actor.AMQPBaseActorAn
AMQP actorthat uses aclick parser.- Parameters
schema (Optional[SchemaType]) –
- class clu.actor.TCPBaseActor(name, host=None, port=None, *args, **kwargs)[source]¶
Bases:
clu.base.BaseActorA TCP base actor that replies using JSON.
This implementation of
BaseActoruses TCP as command/reply channel and replies to the user by sending a JSON-valid string. This makes it useful as a “device” actor that is not connected to the central message parsing system but that we still want to accept commands and reply with easily parseable messages.Commands received by this actor must be in the format
[<uid>] <command string>, where<uid>is any integer unique identifier that will be used ascommand_idand appended to any reply.This is a base actor that does not include a parser. It must be subclassed with a concrete parser that overrides
parse_command.- Parameters
- new_command(transport, command_str)[source]¶
Handles a new command received by the actor.
- Parameters
transport (clu.actor.CustomTransportType) –
command_str (bytes) –
- new_user(transport)[source]¶
Assigns userID to new client connection.
- Parameters
transport (clu.actor.CustomTransportType) –
- async start()[source]¶
Starts the TCP server.
- Parameters
self (clu.actor.TCPBaseActor_co) –
- Return type
clu.actor.TCPBaseActor_co
- write(*args, **kwargs)[source]¶
Writes a message to user(s) as a JSON.
A
headerkeyword with thecommander_id(i.e., the user id of the transport that sent the command),command_id,message_code, andsenderis added to each message. The payload of the message is written todata. An example of a valid message is{ "header": { "command_id": 0, "commander_id": 1, "message_code": "i", "sender": "test_camera" }, "message": { "camera": { "name": "test_camera", "uid": "DEV_12345" }, "status": { "temperature": 25.0, "cooler": 10.0 } } }
Although the messsage is displayed here in multiple lines, it is written as a single line to the TCP clients to facilitate parsing. For a multiline output, which is more human-readable, use the
multilinecommand.See
writefor details on the allowed parameters.
- server¶
The server to talk to this actor.
- Type
- transports¶
Mapping of commander_id to transport
- class clu.actor.JSONActor(*args, **kwargs)[source]¶
Bases:
clu.parsers.click.ClickParser,clu.actor.TCPBaseActorAn implementation of
TCPBaseActorthat uses a Click command parser.
Command¶
- class clu.command.BaseCommand(commander_id=0, command_id=0, consumer_id=0, actor=None, parent=None, status_callback=None, call_now=False, default_keyword='text', loop=None)[source]¶
Bases:
_asyncio.Future,clu.tools.StatusMixIn[clu.tools.CommandStatus],Generic[clu.command.Actor_co]Base class for commands of all types (user and device).
A
BaseCommandinstance is aFuturewhose result gets set when the status is done (either successfully or not).- Parameters
commander_id (Union[int, str]) – The ID of the commander issuing this command. Can be a string or an integer. Normally the former is used for new-style actor and the latter for legacy actors.
command_id (Union[int, str]) – The ID associated to this command. As with the commander_id, it can be a string or an integer
consumer_id (Union[int, str]) – The actor that is consuming this command. Normally this is our own actor but if we are commanding another actor
consumer_idwill be the destination actor.actor (Optional[Actor_co]) – The actor instance associated to this command.
parent (Optional[BaseCommand[Actor_co]]) – Another
BaseCommandobject that is issuing this subcommand. Messages emitted by the command will use the parentcommand_id.status_callback (Optional[Callable[[CommandStatus], Any]]) – A function to call when the status changes.
call_now (bool) – Whether to call
status_callbackwhen initialising the command.default_keyword (str) – The keyword to use when writing a message that does not specify a keyword.
loop (Optional[asyncio.AbstractEventLoop]) – The event loop.
- set_status(status, message=None, silent=False, **kwargs)[source]¶
Same as
statusbut allows to specify a message to the users.- Parameters
status (Union[clu.tools.CommandStatus, str]) –
message (Optional[Dict[str, Any]]) –
silent (bool) –
- Return type
- replies: List[clu.base.Reply]¶
A list of replies this command has received.
- Type
- property status: Optional[clu.tools.CommandStatus]¶
Returns the status.
- class clu.command.Command(command_string='', transport=None, **kwargs)[source]¶
Bases:
clu.command.BaseCommand[clu.command.Actor_co]A command from a user.
- Parameters
command_string (str) – The string that defines the body of the command.
transport (Optional[Any]) – The TCP transport associated with this command (only relevant for
LegacyActorcommands).
- body¶
The body of the command.
- raw_command_string¶
The raw command string.
- class clu.command.TimedCommand(command_string, delay=1)[source]¶
Bases:
objectA command to be executed on a loop.
- Parameters
- async run(actor)[source]¶
Run the command.
- Parameters
actor (clu.base.BaseActor) –
- class clu.command.TimedCommandList(actor, resolution=0.5, loop=None)[source]¶
Bases:
listA list of
TimedCommandobjects that will be executed on a loop.- Parameters
actor (clu.base.BaseActor) – The actor in which the commands are to be run.
resolution – In seconds, how frequently to check if any of the
TimedCommandmust be executed.loop (Optional[asyncio.AbstractEventLoop]) –
- add_command(command_string, **kwargs)[source]¶
Adds a new
TimedCommand.- Parameters
command_string (str) –
Legacy¶
- class clu.legacy.actor.BaseLegacyActor(name, host, port, tron_host=None, tron_port=None, models=[], version=None, loop=None, log_dir=None, log=None, verbose=False, schema=None)[source]¶
Bases:
clu.base.BaseActorAn actor that provides compatibility with the SDSS opscore protocol.
The TCP servers need to be started by awaiting the coroutine
start. Note thatstartdoes not block so you will need to use asyncio’srun_foreveror a similar system>>> loop = asyncio.get_event_loop() >>> my_actor = await LegacyActor('my_actor', '127.0.0.1', 9999, loop=loop) >>> my_actor.start() >>> loop.run_forever()
- Parameters
name (str) – The name of the actor.
host (str) – The host where the TCP server will run.
port (int) – The port of the TCP server.
tron_host (Optional[str]) – The host on which Tron is running.
tron_port (Optional[int]) – The port on which Tron is running.
models (List[str]) – A list of strings with the actors whose models will be tracked.
version (Optional[str]) – The version of the actor.
loop (Optional[asyncio.AbstractEventLoop]) – The event loop. If
None, the current event loop will be used.log_dir (Optional[PathLike]) – The directory where to store the logs. Defaults to
$HOME/logs/<name>where<name>is the name of the actor.log (Optional[logging.Logger]) – A
Loggerinstance to be used for logging instead of creating a new one.schema (Optional[PathLike]) – The path to the datamodel schema for the actor, in JSON Schema format. If the schema is provided all replies will be validated against it. An invalid reply will fail and not be emitted. The schema can also be set when subclassing by setting the class
schemaattribute.verbose (bool) –
- static format_user_output(user_id, command_id, message_code, msg_str=None)[source]¶
Formats a string to send to users.
- static get_user_command_id(command=None, user_id=0, command_id=0)[source]¶
Returns commander_id, command_id based on user-supplied information.
- Parameters
command (Optional[clu.command.Command]) – User command; used as a default for
user_idandcommand_id.command_id (Optional[int]) – If
Nonethen usecommand.command_id.
- Returns
user_id, command_id – The commander ID and the command ID, parsed from the inputs. If they cannot be determined, returns zeros.
- Return type
- new_command(transport, command_str)[source]¶
Handles a new command received by the actor.
- Parameters
transport (clu.actor.CustomTransportType) –
command_str (bytes) –
- new_user(transport)[source]¶
Assigns userID to new client connection.
- Parameters
transport (clu.actor.CustomTransportType) –
- show_new_user_info(user_id)[source]¶
Shows information for new users. Called when a new user connects.
- Parameters
user_id (int) –
- show_user_info(user_id)[source]¶
Shows user information including your user_id.
- Parameters
user_id (int) –
- async start()[source]¶
Starts the server and the Tron client connection.
- Parameters
self (clu.legacy.actor.T) –
- Return type
clu.legacy.actor.T
- write(message_code='i', message=None, command=None, user_id=None, command_id=None, concatenate=True, broadcast=False, validate=True, **kwargs)[source]¶
Writes a message to user(s).
- Parameters
message_code (str) – The message code (e.g.,
'i'or':').message (Optional[Dict[str, Any]]) – The keywords to be output. Must be a dictionary of pairs
{keyword: value}. Ifvalueis a list it will be converted into a comma-separated string. To prevent unexpected casting, it is recommended forvalueto always be a string.command (Optional[clu.command.Command]) – User command; used as a default for
user_idandcommand_id. If the command is done, it is ignored.user_id (Optional[int]) – The user (transport) to which to write.
Nonedefaults to 0.command_id (Optional[int]) – If
Nonethen usecommand.command_id.concatenate (bool) – Concatenates all the keywords to be output in a single reply with the keyword-values joined with semicolons. Otherwise each keyword will be output as a different message.
broadcast (bool) – Whether to broadcast the reply. Equivalent to
user_id=0.validate (bool) – Validate the reply against the actor model. This is ignored if the actor was not started with knowledge of its own schema.
kwargs – Keyword arguments that will be added to the message. If a keyword is both in
messageand inkwargs, the value inkwargssupersedesmessage.
- transports¶
Mapping of user_id to transport
- tron¶
The client connection to Tron.
- Type
- class clu.legacy.actor.LegacyActor(name, host, port, tron_host=None, tron_port=None, models=[], version=None, loop=None, log_dir=None, log=None, verbose=False, schema=None)[source]¶
Bases:
clu.parsers.click.ClickParser,clu.legacy.actor.BaseLegacyActorA legacy actor that uses the
ClickParser.- Parameters
name (str) –
host (str) –
port (int) –
tron_host (Optional[str]) –
tron_port (Optional[int]) –
models (List[str]) –
version (Optional[str]) –
loop (Optional[asyncio.AbstractEventLoop]) –
log_dir (Optional[PathLike]) –
log (Optional[logging.Logger]) –
verbose (bool) –
schema (Optional[PathLike]) –
- class clu.legacy.tron.TronConnection(host, port=6093, name='tron', models=[], **kwargs)[source]¶
Bases:
clu.base.BaseClientAllows to send commands to Tron and manages the feed of replies.
- Parameters
name (str) – The name of the client.
host (str) – The host on which Tron is running.
port (int) – The port on which Tron is running.
models (List[str]) – A list of strings with the actors whose models will be tracked.
kwargs – Arguments to be passed to
BaseClient.
- send_command(target, command_string, commander='tron.tron', mid=None)[source]¶
Sends a command through the hub.
- Parameters
target – The actor to command.
command_string – The command to send.
commander – The actor or client sending the command. The format for Tron is “commander message_id target command” where commander needs to start with a letter and have a program and a user joined by a dot. Otherwise the command will be accepted but the reply will fail to parse.
mid – The message id. If
None, a sequentially increasing value will be used. You should not specify amidunless you really know what you’re doing.
- class clu.legacy.tron.TronKey(name, key, keyword=None, model=None, callback=None)[source]¶
Bases:
clu.model.PropertyA Tron model key with callbacks.
Similar to
Propertybut stores the original key with the keyword datamodel.- Parameters
- class clu.legacy.tron.TronModel(keydict, callback=None, log=None)[source]¶
Bases:
clu.model.BaseModel[clu.legacy.tron.TronKey]A JSON-compliant model for actor keywords.
- Parameters
keydict (KeysDictionary) – A dictionary of keys that define the datamodel.
callback (Callable[[TronModel], Any]) – A function or coroutine to call when the datamodel changes. The function is called with the instance of
TronModeland the modified keyword. If the callback is a coroutine, it is scheduled as a task.log (Optional[logging.Logger]) – Where to log messages.
Maskbits¶
- class clu.tools.Maskbit(value)[source]¶
Bases:
enum.FlagA maskbit enumeration. Intended for subclassing.
- property active_bits: List[clu.tools.Maskbit]¶
Returns a list of non-combination flags that match the value.
- class clu.tools.CommandStatus(value)[source]¶
Bases:
clu.tools.MaskbitAn enumeration.
- DONE = 1¶
- CANCELLED = 2¶
- FAILED = 4¶
- TIMEDOUT = 8¶
- READY = 16¶
- RUNNING = 32¶
- CANCELLING = 64¶
- FAILING = 128¶
- DEBUG = 256¶
- ACTIVE_STATES = 224¶
- FAILED_STATES = 14¶
- FAILING_STATES = 192¶
- DONE_STATES = 15¶
- ALL_STATES = 255¶
- property is_combination: bool¶
Returns True if a flag is a combination.
- property did_fail: bool¶
Command failed or was cancelled.
- property did_succeed: bool¶
Command finished with DONE status.
- property is_active: bool¶
Command is running, cancelling or failing.
- property is_done: bool¶
Command is done (whether successfully or not).
- property is_failing: bool¶
Command is being cancelled or is failing.
- static code_to_status(code, default=None)[source]¶
Returns the status associated with a code.
If the code doesn’t have an associated status, returns
default.defaultdefaults toCommandStatus.RUNNING.- Parameters
default (Optional[clu.tools.CommandStatus]) –
- Return type
MixIns¶
- class clu.tools.StatusMixIn(maskbit_flags, initial_status=None, callback_func=None, call_now=False)[source]¶
Bases:
Generic[clu.tools.MaskbitType]A mixin that provides status tracking with callbacks.
Provides a status property that executes a list of callbacks when the status changes.
- Parameters
maskbit_flags – A class containing the available statuses as a series of maskbit flags. Usually as subclass of
enum.Flag.initial_status – The initial status.
callback_func – The function to call if the status changes.
call_now – Whether the callback function should be called when initialising.
- Variables
callbacks – A list of the callback functions to call.
- property status¶
Returns the status.
Model¶
- class clu.model.BaseModel(name, callback=None, **kwargs)[source]¶
Bases:
clu.tools.CaseInsensitiveDict[clu.model.T],clu.tools.CallbackMixInA JSON-compliant model.
- Parameters
- class clu.model.Model(name, schema, is_file=False, **kwargs)[source]¶
Bases:
clu.model.BaseModel[clu.model.Property]A model with JSON validation.
In addition to the parameters in
BaseModel, the following parameters are accepted:- Parameters
- class clu.model.ModelSet(client, actors=[], get_schema_command='get_schema', raise_exception=True, **kwargs)[source]¶
Bases:
dictA dictionary of
Modelinstances.Given a list of
actors, queries each of the actors to return their own schemas, which are then parsed and loaded asModelinstances. Since obtaining the schema require sending a command to the actor, that process happens when the coroutineload_schemasis awaited, which should usually occur when the client is started.- Parameters
client (clu.base.BaseClient) – A client with a connection to the actors to monitor.
actors (List[str]) – A list of actor models whose schemas will be loaded.
get_schema_command (str) – The command to send to the actor to get it to return its own schema.
raise_exception (bool) – Whether to raise an exception if any of the models cannot be loaded.
kwargs – Keyword arguments to be passed to
Model.
Example
>>> model_set = ModelSet(client, actors=['sop', 'guider']) >>> model_set['sop'] <Model (name='sop')>
- class clu.model.Property(name, value=None, model=None, callback=None)[source]¶
Bases:
clu.tools.CallbackMixInA model property with callbacks.
- Parameters
name (str) – The name of the property.
value (Optional[Any]) – The value of the property.
model (Optional[Any]) – The parent model.
callback (Optional[Callable[[Any], Any]]) – The function or coroutine that will be called if the value of the key if updated. The callback is called with the instance of
Propertyas the only argument. Note that the callback will be scheduled even if the value does not change.
- flatten()[source]¶
Returns a dictionary with the name and value of the property.
- Return type
Dict[str, Any]
- property value: Any¶
The value associated to the key.
Parser¶
- class clu.parsers.click.ClickParser[source]¶
Bases:
objectA command parser that uses Click at its base.
- class clu.parsers.click.CluCommand(*args, context_settings=None, **kwargs)[source]¶
Bases:
click.core.CommandOverride
click.Commandto pass the actor and command.
- class clu.parsers.click.CluGroup(name=None, commands=None, **attrs)[source]¶
Bases:
click.core.GroupOverride
click.Group.Makes all child commands instances of
CluCommand.- command(*args, **kwargs)[source]¶
Override
click.Groupto useCluCommandby default.
- class clu.parsers.json.JSONParser[source]¶
Bases:
objectA parser that receives commands and arguments as a JSON string.
See JSON parser for details on implementation and use-cases.
- parse_command(command)[source]¶
Parses a user command.
The command string must be a serialised JSON-like string that contains at least a keyword
commandwith the name of the callback, and any number of additional arguments which will be passed to it.- Parameters
command (clu.parsers.json.T) –
- Return type
clu.parsers.json.T
Sockets¶
- class clu.protocol.TCPProtocol(loop=None, connection_callback=None, data_received_callback=None, max_connections=None)[source]¶
Bases:
asyncio.protocols.ProtocolA TCP server/client based on asyncio protocols.
This is a high-level implementation of the client and server asyncio protocols. See asyncio protocol for details.
- Parameters
loop (asyncio.AbstractEventLoop) – The event loop. The current event loop is used by default.
connection_callback (Optional[ConnectionCallbackType]) – Callback to call when a new client connects.
data_received_callback (Optional[Callable[[str], Any]]) – Callback to call when a new data is received.
max_connections (Optional[int]) – How many clients the server accepts. If
None, unlimited connections are allowed.
- class clu.protocol.PeriodicTCPServer(periodic_callback=None, sleep_time=1, **kwargs)[source]¶
Bases:
clu.protocol.TCPProtocolA TCP server that runs a callback periodically.
- Parameters
period_callback – Callback to run every iteration.
sleep_time (float) – The delay between two calls to
periodic_callback.kwargs – Parameters to pass to
TCPProtocolperiodic_callback (Optional[ConnectionCallbackType]) –
- property periodic_callback: Optional[Callable[[Any], Any]]¶
Returns the periodic callback.
- class clu.protocol.TCPStreamServer(host, port, connection_callback=None, data_received_callback=None, loop=None, max_connections=None)[source]¶
Bases:
objectA TCP server based on asyncio streams.
This is a high-level implementation of the asyncio server using streams. See asyncio streams for details.
- Parameters
host (str) – The server host.
port (int) – The server port.
connection_callback (Optional[ConnectionCallbackType]) – Callback to call when a new client connects or disconnects.
data_received_callback (Optional[DataReceivedCallbackType]) – Callback to call when a new data is received.
loop (Optional[asyncio.AbstractEventLoop]) – The event loop. The current event loop is used by default.
max_connections (Optional[int]) – How many clients the server accepts. If
None, unlimited connections are allowed.
- async start()[source]¶
Starts the server and returns a
Serverconnection.- Return type
asyncio.events.AbstractServer
- async connection_made(reader, writer)[source]¶
Called when a new client connects to the server.
Stores the writer protocol in
transports, calls the connection callback, if any, and starts a loop to read any incoming data.- Parameters
reader (asyncio.streams.StreamReader) –
writer (asyncio.streams.StreamWriter) –
- class clu.protocol.TCPStreamPeriodicServer(host, port, periodic_callback=None, sleep_time=1, **kwargs)[source]¶
Bases:
clu.protocol.TCPStreamServerA TCP server that calls a function periodically.
- Parameters
host (str) – The server host.
port (int) – The server port.
period_callback – Callback to run every iteration. It is called for each transport that is connected to the server and receives the transport object.
sleep_time (float) – The delay between two calls to
periodic_callback.kwargs – Parameters to pass to
TCPStreamServerperiodic_callback (Optional[Callable[[asyncio.Transport], Any]]) –
- async start()[source]¶
Starts the server and returns a
Serverconnection.- Return type
asyncio.events.AbstractServer
- property periodic_callback¶
Returns the periodic callback.
- class clu.protocol.TCPStreamClient(host, port)[source]¶
Bases:
objectAn object containing a writer and reader stream to a TCP server.
- async clu.protocol.open_connection(host, port)[source]¶
Returns a TCP stream connection with a writer and reader.
This function is equivalent to doing
>>> client = TCPStreamClient('127.0.0.1', 5555) >>> await client.open_connection()
Instead just do
>>> client = await TCPStreamClient('127.0.0.1', 5555) >>> client.writer.write('Hi!\n'.encode())
- Parameters
- Returns
client (
TCPStreamClient) – A container for the stream reader and writer.- Return type
- class clu.protocol.TopicListener(url=None, user='guest', password='guest', host='localhost', virtualhost='/', port=5672, ssl=False)[source]¶
Bases:
objectA class to declare and listen to AMQP queues with topic conditions.
- Parameters
url (str) – RFC3986 formatted broker address. When used, the other keyword arguments are ignored.
user (str) – The user to connect to the RabbitMQ broker.
password (str) – The password for the user.
host (str) – The host where the RabbitMQ message broker runs.
virtualhost (str) – Virtualhost parameter.
'/'by default.port (int) – The port on which the RabbitMQ message broker is running.
ssl (bool) – Whether to use TLS/SSL connection.
- async connect(exchange_name, exchange_type=<ExchangeType.TOPIC: 'topic'>, on_return_raises=True)[source]¶
Initialise the connection.
- Parameters
exchange_name (str) – The name of the exchange to create.
exchange_type (aio_pika.exchange.ExchangeType) – The type of exchange to create.
- Return type
- async add_queue(queue_name, callback=None, bindings='*')[source]¶
Adds a queue with bindings.
- Parameters
queue_name (str) – The name of the queue to create.
callback (Optional[Callable[[aio_pika.message.IncomingMessage], Any]]) – A callable that will be called when a new message is received in the queue. Can be a coroutine.
bindings (Union[str, List[str]]) – The list of bindings for the queue. Can be a list of string or a single string in which the bindings are comma-separated.
- Return type
aio_pika.queue.Queue
- class clu.device.Device(host, port, callback=None)[source]¶
Bases:
clu.tools.CallbackMixInA class that handles the TCP connection to a device.
There are two ways to create a new device. You can create a subclass from
Deviceand override theprocess_messagemethod which handles how you react to a new line being receivedclass MyDevice(Device): async def process_message(self, line): print(line) my_device = MyDevice('192.168.1.10', 4444) await my_device.start()
Note that
process_messagemust be a coroutine. Alternatively you can pass a callback that will be called instead ofprocess_messagewhen a new message arrives. The callback must also be a coroutineasync def printer(line): print(line) my_device = MyDevice('192.168.1.10', 4444, callback=printer) await my_device.start()
- Parameters
host (str) – The host of the device.
port (int) – The port on which the device is serving.
callback (Optional[Callable[[str], Any]]) – The callback to call with each new message received from the client (after decoding into a string). If no callback is specified,
process_messageis called. If the callback is not a coroutine, it will be converted to one.
Tools¶
- clu.escape(value)[source]¶
Escapes a text using
json.dumps.- Parameters
value (Any) –
- class clu.tools.ActorHandler(actor, level=40, keyword='text', code_mapping=None, filter_warnings=None)[source]¶
Bases:
logging.HandlerA handler that outputs log messages as actor keywords.
- Parameters
actor – The actor instance.
level (int) – The level above which records will be output in the actor.
keyword (str) – The keyword around which the messages will be output.
code_mapping (Optional[Dict[int, str]]) – A mapping of logging levels to actor codes. The values provided override the default mapping. For example, to make input log messages with info level be output as debug,
code_mapping={logging.INFO: 'd'}.filter_warnings (Optional[List[Type[Warning]]]) – A list of warning classes that will be issued to the actor. Subclasses of the filter warning are accepted, any other warnings will be ignored.
- emit(record)[source]¶
Emits the record.
- Parameters
record (logging.LogRecord) –
- class clu.tools.CallbackMixIn(callbacks=[], loop=None)[source]¶
Bases:
objectA mixin for executing callbacks.
- Parameters
callbacks (List[Callable[[Any], Any]]) – A list of functions or coroutines to be called.
loop (Optional[asyncio.AbstractEventLoop]) –
- notify(*args)[source]¶
Calls the callback functions with some arguments.
Coroutine callbacks are scheduled as a task. Synchronous callbacks are scheduled with
call_soon.
- register_callback(callback_func)[source]¶
Adds a callback function or coroutine function.
- Parameters
callback_func (Callable[[...], Any]) –
Testing¶
- class clu.testing.MockReply(command_id, user_id, flag, data={})[source]¶
Bases:
dictStores a reply written to a transport.
The data of the message is stored as part of the dictionary.
- async clu.testing.setup_test_actor(actor, user_id=1)[source]¶
Setups an actor for testing, mocking the client transport.
Takes an
actorand modifies it in two ways:Adds a
invoke_mock_commandmethod to it that allows to submit a command string as if it had been received from a transport.Mocks a client transport with
user_idthat is connected to the actor. Messages written to the transport are stored asMockReplyin aMockReplyListthat is accessible via a newactor.mock_repliesattribute.
The actor is modified in place and returned.
- Parameters
actor (clu.testing.T) –
user_id (int) –
- Return type
clu.testing.T