API Reference¶
Base classes¶
- class clu.base.BaseClient(name, version=None, log_dir=None, log=None, verbose=False, validate=True, config={})[source]¶
Bases:
objectA base client that can be used for listening or for an actor.
This class defines a new client. Clients differ from actors in that they do not receive commands or issue replies, but do send commands to other actors and listen to the keyword-value flow. All actors are also clients and any actor should subclass from
BaseClient.Normally a new instance of a client or actor is created by passing a configuration file path to
from_configwhich defines how the client must be started.- Parameters:
name (
str) – The name of the client.log_dir (
Union[Path,str,None]) – The directory where to store the file logs.log (
Optional[SDSSLogger]) – ALoggerinstance to be used for logging instead of creating a new one.verbose (
Union[bool,int]) – Whether to log to stdout. Can be an integer logging level.validate (
bool) – Whether to actor should validate its own messages against its model (if it has one). This is a global parameter that can be overridden when callingwrite.config (
dict) – A dictionary of configuration parameters that will be accessible to the client.
- classmethod from_config(config, *args, loader=<class 'yaml.loader.FullLoader'>, **kwargs)[source]¶
Parses a configuration file.
- proxy(actor) ProxyClient[source]¶
Creates a proxy for an actor.
Returns a
ProxyClientthat simplifies running a command multiple times. For exampleawait client.send_command("focus_state", "moverelative -1000 UM")
can be replaced with
focus_stage = client.proxy("focus_stage") await focus_stage.send_command("moverelative", "-1000", "UM")
- Return type:
ProxyClient
- class clu.base.BaseActor(*args, schema=None, store=False, additional_properties=False, **kwargs)[source]¶
Bases:
BaseClientAn actor based on
asyncio.This class expands
BaseClientwith a parsing system for new commands and placeholders for methods for handling new commands and writing replies, which should be overridden by the specific actors.- Parameters:
schema (
Union[Type[BaseModel],Dict[str,Any],Path,str,None]) – The schema for the actor replies, as a JSONschema dictionary or path to a JSON file. IfNone, defaults to the internal basic schema.store (
bool|list[str]) – Whether to store the output keywords in aKeywordStore.False(the default), disables the feature.Truewill store a record of all the output keywords. A list of keyword names to store can also be passed.additional_properties (
bool) – Whether to allow additional properties in the schema, other than the ones defined by the schema. This parameter only is used ifschema=Noneor ifadditionalPropertiesis not defined in the schema.
- invoke_mock_command(command_str, command_id=0) Command[source]¶
Send a new command to an actor for testing.
Requires calling
setup_test_actor.- Return type:
- load_schema(schema, is_file=None, additional_properties=False) Model | None[source]¶
Loads and validates the actor schema.
- abstract new_command()[source]¶
Handles a new command.
Must be overridden by the subclass and call
parse_commandwith aCommandobject.
- write(message_code=MessageCode.INFO, message=None, command=None, broadcast=False, validate=None, expand_exceptions=True, traceback_frame=0, silent=False, internal=False, emit=True, write_to_log=True, **kwargs) Reply[source]¶
Writes a message to user(s).
The reply to the users will be formatted by the actor class into message specific to the communication channel. Additional keywords passed to this method will be used to complete the message (as long as they don’t overlap with named parameters). For example
actor.write('i', message={'text': 'Hi!', 'value1': 1})
and
actor.write('i', text='Hi!', value1=1)
are equivalent and
actor.write('i', message={'text': 'Hi!'}, value1=1)
is equivalent to
actor.write('i', message={'text': 'Hi!', 'value1': 1})
This method generates a
Replyobject that is passed to the_write_internalmethod in the class, which processes it and outputs the message to the users using the appropriate transport. Ifcommandis passed, the reply is added tocommand.replies.- Parameters:
message_code (
MessageCode|str) – The message code (e.g.,'i'or':').message (
Union[Dict[str,Any],str,None]) – The keywords to be output. Must be a dictionary of pairs{keyword: value}.command (
Optional[BaseCommand]) – The command to which we are replying. If not set, it is assumed that this is a broadcast.broadcast (
bool) – Whether to broadcast the message to all the users or only to the commander.validate (
bool|None) – Validate the reply against the actor schema. This is ignored if the actor was not started with knowledge of its own schema. IfNone, defaults to the actor global behaviour.expand_exceptions (
bool) – If the value of one of the keywords is an exception object andexpand_exception=True, the exception information and traceback will be expanded. Otherwise the exception will just be stringified.traceback_frame (
int) – The frame from the traceback to use ifexpand_exception=True. By default reports the file and line of the last traceback frame.silent (
bool) – WhenTruedoes not output the message to the users. This can be used to issue internal commands that update the internal model but that don’t clutter the output.internal (
bool) – Marks theReplyinstance as internal. Internal replies are expected to be emitted to the users, but with a header indicating that the message is not for broader consumption. This can be useful when one wants to reply with verbose information that can be skipped from CLI or logs.emit (
bool) – Whether to call the actor internal write method. Should beTruebut it’s sometimes useful to callwritewithemit=Falsewhen one is overriding the method and wants to control when to call the internal method. In that case, aReplyobject is returned but nothing is output to the users.write_to_log (
bool) – Whether to write the reply to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log.kwargs – Keyword arguments that will used to update the message.
- Return type:
Client¶
- class clu.client.AMQPClient(name=None, url=None, user='guest', password='guest', host='localhost', port=5672, virtualhost='/', ssl=False, version=None, loop=None, log_dir=None, log=None, models=[], **kwargs)[source]¶
Bases:
BaseClientDefines a new client based on the AMQP standard.
To start a new client first instantiate the class and then run
startas a coroutine. Note thatstartdoes not block so you will need to use asyncio’srun_foreveror a similar system>>> loop = asyncio.get_event_loop() >>> client = await AMQPClient('my_client', host='localhost').start() >>> loop.run_forever()
- Parameters:
url (
Optional[str]) – RFC3986 formatted broker address. When used, the other connection keyword arguments are ignored.user (
str) – The user to connect to the AMQP broker. Defaults toguest.password (
str) – The password for the user. Defaults toguest.host (
str) – The host where the AMQP message broker runs. Defaults tolocalhost.virtualhost (
str) – Virtualhost parameter.'/'by default.port (
int) – The port on which the AMQP broker is running. Defaults to 5672.ssl (
bool) – Whether to use TLS/SSL connection.loop (
Optional[AbstractEventLoop]) – The event loop. IfNone, the current event loop will be used.log_dir (
Union[Path,str,None]) – The directory where to store the logs. Defaults to$HOME/logs/<name>where<name>is the name of the actor.log (
Optional[SDSSLogger]) – ALoggerinstance to be used for logging instead of creating a new one.parser – A click command parser that is a subclass of
CluGroup. IfNone, the active parser will be used.models (
List[str]) – A list of actor models whose schemas will be monitored.
- add_reply_callback(callback_func)[source]¶
Adds a callback that is called when a new reply is received.
- async handle_reply(message) AMQPReply[source]¶
Handles a reply received from the exchange.
Creates a new instance of
AMQPReplyfrom themessage. If the reply is valid it updates any running command.- Parameters:
message (
AbstractIncomingMessage) – The message received.- Returns:
The
AMQPReplyobject created from the message.- Return type:
reply
- async send_command(consumer, command_string, *args, command_id=None, callback=None, internal=False, command=None, time_limit=None, await_command=True)[source]¶
Commands another actor over its RCP queue.
- Parameters:
consumer (
str) – The actor we are commanding.command_string (
str) – The command string that will be parsed by the remote actor.args – Arguments to concatenate to the command string.
command_id (
str|None) – The command ID associated with this command. If empty, an unique identifier will be attached.callback (
Optional[Callable[[AMQPReply],None]]) – A callback to invoke with each reply received from the actor.internal (
bool) – Whether to mark the command as internal, in which case replies will also be considered internal.command (
Optional[Command]) – TheCommandthat initiated the new command. Only relevant for actors.time_limit (
Optional[float]) – A delay after which the command is marked as timed out and done.await_command (
bool) – IfTrue, awaits the command until it finishes.
Examples
These two are equivalent
>>> client.send_command('my_actor', 'do_something --now') >>> client.send_command('my_actor', 'do_something', '--now')
- class clu.client.AMQPReply(message, log=None)[source]¶
Bases:
objectWrapper for an
IncomingMessagethat expands and decodes it.- Parameters:
- is_valid¶
Whether the message is valid and correctly parsed.
- body¶
The body of the message, as a JSON dictionary.
- info¶
The info dictionary.
- headers¶
The headers of the message, decoded if they are bytes.
- message_code¶
The message code.
- internal¶
Whether this reply was marked internal.
- sender¶
The name of the actor that sends the reply.
- command_id¶
The command ID.
Actor¶
- class clu.actor.AMQPBaseActor(*args, **kwargs)[source]¶
Bases:
AMQPClient,BaseActorAn actor class that uses AMQP message brokering.
This class differs from
LegacyActorin that it uses an AMQP messaging broker (typically RabbitMQ) to communicate with other actors in the system, instead of standard TCP sockets. Although the internals and protocols are different the entry points and behaviour for both classes should be almost identical.This class needs to be subclassed with a command parser.
See the documentation for
AMQPActorandAMQPClientfor additional parameter information.
- class clu.actor.AMQPActor(*args, **kwargs)[source]¶
Bases:
ClickParser,AMQPBaseActorAn
AMQP actorthat uses aclick parser.
- class clu.actor.TCPBaseActor(name, host=None, port=None, *args, **kwargs)[source]¶
Bases:
BaseActorA TCP base actor that replies using JSON.
This implementation of
BaseActoruses TCP as command/reply channel and replies to the user by sending a JSON-valid string. This makes it useful as a “device” actor that is not connected to the central message parsing system but that we still want to accept commands and reply with easily parseable messages.Commands received by this actor must be in the format
[<uid>] <command string>, where<uid>is any integer unique identifier that will be used ascommand_idand appended to any reply.This is a base actor that does not include a parser. It must be subclassed with a concrete parser that overrides
parse_command.- Parameters:
- async start() TCPBaseActor_co[source]¶
Starts the TCP server.
- Return type:
TypeVar(TCPBaseActor_co, bound= TCPBaseActor)
- write(*args, **kwargs)[source]¶
Writes a message to user(s) as a JSON.
A
headerkeyword with thecommander_id(i.e., the user id of the transport that sent the command),command_id,message_code, andsenderis added to each message. The payload of the message is written todata. An example of a valid message is{ "header": { "command_id": 0, "commander_id": 1, "message_code": "i", "sender": "test_camera" }, "message": { "camera": { "name": "test_camera", "uid": "DEV_12345" }, "status": { "temperature": 25.0, "cooler": 10.0 } } }
Although the messsage is displayed here in multiple lines, it is written as a single line to the TCP clients to facilitate parsing. For a multiline output, which is more human-readable, use the
multilinecommand.See
writefor details on the allowed parameters.
- server¶
The server to talk to this actor.
- Type:
- transports¶
Mapping of commander_id to transport
- class clu.actor.JSONActor(*args, **kwargs)[source]¶
Bases:
ClickParser,TCPBaseActorAn implementation of
TCPBaseActorthat uses a Click command parser.
Command¶
- class clu.command.BaseCommand(commander_id=None, command_id=0, consumer_id=0, actor=None, parent=None, reply_callback=None, status_callback=None, call_now=False, default_keyword='text', silent=False, internal=False, write_to_log=True, time_limit=None, loop=None)[source]¶
Bases:
Future[Future_co],StatusMixIn[CommandStatus],Generic[Actor_co,Future_co]Base class for commands of all types (user and device).
A
BaseCommandinstance is aFuturewhose result gets set when the status is done (either successfully or not).- Parameters:
commander_id (
Union[int,str,None]) – The ID of the commander issuing this command. Can be a string or an integer. Normally the former is used for new-style actor and the latter for legacy actors.command_id (
Union[int,str]) – The ID associated to this command. As with the commander_id, it can be a string or an integerconsumer_id (
Union[int,str]) – The actor that is consuming this command. Normally this is our own actor but if we are commanding another actorconsumer_idwill be the destination actor.actor (
Optional[TypeVar(Actor_co, bound= clu.base.BaseActor)]) – The actor instance associated to this command.parent (
Optional[BaseCommand[TypeVar(Actor_co, bound= clu.base.BaseActor),TypeVar(Future_co, bound= BaseCommand)]]) – AnotherBaseCommandobject that is issuing this subcommand. Messages emitted by the command will use the parentcommand_id.reply_callback (
Optional[Callable[[Any],None]]) – A callback that gets called when a client command receives a reply from the actor.status_callback (
Optional[Callable[[CommandStatus],Any]]) – A function to call when the status changes.call_now (
bool) – Whether to callstatus_callbackwhen initialising the command.default_keyword (
str) – The keyword to use when writing a message that does not specify a keyword.silent (
bool) – A silent command will call the actorwritemethod withsilent=True, which will update the internal model and record all the output replies but will not write them to the users.internal (
bool) – A silent command will call the actorwritemethod withinternal=True, which will instruct the actor to add the internal flag to the header of the reply.write_to_log (
bool) – Whether to write replies to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log.time_limit (
float|None) – Time out the command if it has been running for this long.loop (
Optional[AbstractEventLoop]) – The event loop.
- send_command(target, command_string, new_command=True, *args, **kwargs) BaseCommand[Actor_co, BaseCommand] | Awaitable[BaseCommand][source]¶
Sends a command to an actor using the commander ID of this command.
- Return type:
Union[BaseCommand[TypeVar(Actor_co, bound= clu.base.BaseActor),BaseCommand],Awaitable[BaseCommand]]
- set_status(status, message=None, silent=False, **kwargs) BaseCommand[source]¶
Same as
statusbut allows to specify a message to the users.- Return type:
- write(message_code=MessageCode.INFO, message=None, broadcast=False, **kwargs)[source]¶
Writes to the user(s).
- context: click.Context | None¶
The click context, if the click parser is used.
- replies¶
A list of replies this command has received. The type of reply object depends on the actor or client issuing the command.
- property status: CommandStatus¶
Returns the status.
- class clu.command.Command(command_string='', transport=None, **kwargs)[source]¶
Bases:
BaseCommand[Actor_co, Command]A command from a user.
- Parameters:
command_string (
str) – The string that defines the body of the command.transport (
Optional[Any]) – The TCP transport associated with this command (only relevant forLegacyActorcommands).
- child_command(command_string)[source]¶
Starts a sub-command on the actor currently running this command.
The practical effect is to run another of the same actor’s commands as if it were part of the current
Command.
- body¶
The body of the command.
- raw_command_string¶
The raw command string.
- class clu.command.FakeCommand(log, actor=None)[source]¶
Bases:
BaseCommandA fake command that output to a logger.
- class clu.command.TimedCommand(command_string, delay=1, first_silent=False)[source]¶
Bases:
objectA command to be executed on a loop.
- Parameters:
- class clu.command.TimedCommandList(actor, resolution=0.5, loop=None)[source]¶
Bases:
listA list of
TimedCommandobjects that will be executed on a loop.- Parameters:
actor (
BaseActor) – The actor in which the commands are to be run.resolution – In seconds, how frequently to check if any of the
TimedCommandmust be executed.
- add_command(command_string, **kwargs)[source]¶
Adds a new
TimedCommand.
- start() TimedCommandList[source]¶
Starts the loop.
- Return type:
- clu.command.parse_legacy_command(command_string) Tuple[str | None, int, str][source]¶
Parses a command received by a legacy actor.
- Parameters:
command_string (
str) – The command to parse, including an optional header.- Returns:
The command ID, and the command body parsed from the command string.
- Return type:
command_id, command_body
Legacy¶
- class clu.legacy.actor.BaseLegacyActor(name, host, port, tron_host=None, tron_port=None, models=[], version=None, loop=None, log_dir=None, log=None, verbose=False, schema=None, store=False, additional_properties=False, config={})[source]¶
Bases:
BaseActorAn actor that provides compatibility with the SDSS opscore protocol.
The TCP servers need to be started by awaiting the coroutine
start. Note thatstartdoes not block so you will need to use asyncio’srun_foreveror a similar system>>> loop = asyncio.get_event_loop() >>> my_actor = await LegacyActor('my_actor', '127.0.0.1', 9999, loop=loop) >>> my_actor.start() >>> loop.run_forever()
- Parameters:
name (
str) – The name of the actor.host (
str) – The host where the TCP server will run.port (
int) – The port of the TCP server.tron_host (
Optional[str]) – The host on which Tron is running.tron_port (
Optional[int]) – The port on which Tron is running.models (
List[str]) – A list of strings with the actors whose models will be tracked.loop (
Optional[AbstractEventLoop]) – The event loop. IfNone, the current event loop will be used.log_dir (
Union[Path,str,None]) – The directory where to store the logs. Defaults to$HOME/logs/<name>where<name>is the name of the actor.log (
Optional[Logger]) – ALoggerinstance to be used for logging instead of creating a new one.schema (
Union[Path,str,None]) – The path to the datamodel schema for the actor, in JSON Schema format. If the schema is provided all replies will be validated against it. An invalid reply will fail and not be emitted. The schema can also be set when subclassing by setting the classschemaattribute.store (
bool|list[str]) – Whether to store the output keywords in aKeywordStore.False(the default), disables the feature.Truewill store a record of all the output keywords. A list of keyword names to store can also be passed.config (
Dict[str,Any]) – A dictionary of configuration parameters that will be accessible to the actor.
- static format_user_output(user_id, command_id, message_code, msg_str=None) str[source]¶
Formats a string to send to users.
- Return type:
- static get_user_command_id(command=None, user_id=0, command_id=0) Tuple[int, int][source]¶
Returns commander_id, command_id based on user-supplied information.
- Parameters:
- Returns:
The commander ID and the command ID, parsed from the inputs. If they cannot be determined, returns zeros.
- Return type:
user_id, command_id
- send_command(target, command_string, *args, commander=None, command_id=None, command=None, callback=None, time_limit=None)[source]¶
Sends a command through the hub.
- Parameters:
target (
str) – The actor to command.command_string (
str) – The command to send.args – Arguments to concatenate to the command string.
commander (
Optional[str]) – The commander string to send to Tron. If not provided, a valid string is built using the name of the actor and the target.command_id (
Optional[int]) – The command id. IfNone, a sequentially increasing value will be used. You should not specify acommand_idunless you really know what you’re doing.callback (
Optional[Callable[[Reply],None]]) – A callback to invoke with each reply received from the actor.time_limit (
Optional[float]) – A delay after which the command is marked as timed out and done.
Examples
These two are equivalent
>>> actor.send_command('my_actor', 'do_something --now') >>> actor.send_command('my_actor', 'do_something', '--now')
- show_new_user_info(user_id)[source]¶
Shows information for new users. Called when a new user connects.
- async start(get_keys=True, start_nubs=True) T[source]¶
Starts the server and the Tron client connection.
- Parameters:
get_keys (
bool) – Whether to issuekeys getForcommands against the hub to retrieve the current values for the model keys.start_nubs (
bool) – IfTrue, and aTronConnectionhas been created, sends ahub startNubs <name>where<name>is the name of the actor to attempt to automatically connect it to the hub.
- Return type:
TypeVar(T, bound= BaseLegacyActor)
- write(message_code='i', message=None, command=None, user_id=0, command_id=0, concatenate=True, broadcast=False, validate=True, write_to_log=True, **kwargs)[source]¶
Writes a message to user(s).
- Parameters:
message_code (
MessageCode|str) – The message code (e.g.,'i'or':').message (
Optional[Dict[str,Any]]) – The keywords to be output. Must be a dictionary of pairs{keyword: value}. Ifvalueis a list it will be converted into a comma-separated string. To prevent unexpected casting, it is recommended forvalueto always be a string.command (
Optional[Command]) – User command; used as a default foruser_idandcommand_id. If the command is done, it is ignored.user_id (
int) – The user (transport) to which to write.Nonedefaults to 0.concatenate (
bool) – Concatenates all the keywords to be output in a single reply with the keyword-values joined with semicolons. Otherwise each keyword will be output as a different message.broadcast (
bool) – Whether to broadcast the reply. Equivalent touser_id=0.validate (
bool) – Validate the reply against the actor model. This is ignored if the actor was not started with knowledge of its own schema.write_to_log (
bool) – Whether to write the reply to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log.kwargs – Keyword arguments that will be added to the message. If a keyword is both in
messageand inkwargs, the value inkwargssupersedesmessage.
- transports¶
Mapping of user_id to transport
- tron¶
The client connection to Tron.
- Type:
- class clu.legacy.actor.LegacyActor(*args, **kwargs)[source]¶
Bases:
ClickParser,BaseLegacyActorA legacy actor that uses the
ClickParser.
- class clu.legacy.tron.TronConnection(commander, host, port=6093, name='tron', models=[], **kwargs)[source]¶
Bases:
BaseClientAllows to send commands to Tron and manages the feed of replies.
- Parameters:
commander (
str) – The name of the commander that will send commands to Tron. Must be in the formprogram.user, for examplefoo.bar.host (
str) – The host on which Tron is running.port (
int) – The port on which Tron is running.models (
List[str]) – A list of strings with the actors whose models will be tracked.kwargs – Arguments to be passed to
BaseClient.
- send_command(target, command_string, *args, commander=None, mid=None, callback=None, time_limit=None)[source]¶
Sends a command through the hub.
- Parameters:
target – The actor to command.
command_string – The command to send.
args – Arguments to concatenate to the command string.
commander – The actor or client sending the command. The format for Tron is “commander message_id target command” where commander needs to start with a letter and have a program and a user joined by a dot. Otherwise the command will be accepted but the reply will fail to parse. If
commander=None, the instancecommandervalue will be used.mid – The message id. If
None, a sequentially increasing value will be used. You should not specify amidunless you really know what you’re doing.callback (
Optional[Callable[[Reply],None]]) – A callback to invoke with each reply received from the actor.time_limit (
Optional[float]) – A delay after which the command is marked as timed out and done.
Examples
These two are equivalent
>>> tron.send_command('my_actor', 'do_something --now') >>> tron.send_command('my_actor', 'do_something', '--now')
- class clu.legacy.tron.TronKey(name, key, keyword=None, model=None, callback=None)[source]¶
Bases:
PropertyA Tron model key with callbacks.
Similar to
Propertybut stores the original key with the keyword datamodel.
- class clu.legacy.tron.TronModel(keydict, callback=None)[source]¶
-
A JSON-compliant model for actor keywords.
- Parameters:
keydict (
KeysDictionary) – A dictionary of keys that define the datamodel.callback (
Optional[Callable[[TronModel],Any]]) – A function or coroutine to call when the datamodel changes. The function is called with the instance ofTronModeland the modified keyword. If the callback is a coroutine, it is scheduled as a task.
Maskbits¶
- class clu.tools.Maskbit(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases:
FlagA maskbit enumeration. Intended for subclassing.
- class clu.tools.CommandStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]¶
Bases:
Maskbit- DONE = 1¶
- CANCELLED = 2¶
- FAILED = 4¶
- TIMEDOUT = 8¶
- READY = 16¶
- RUNNING = 32¶
- CANCELLING = 64¶
- FAILING = 128¶
- DEBUG = 256¶
- ACTIVE_STATES = 224¶
- FAILED_STATES = 14¶
- FAILING_STATES = 192¶
- DONE_STATES = 15¶
- ALL_STATES = 255¶
- static code_to_status(code, default=None) CommandStatus[source]¶
Returns the status associated with a code.
If the code doesn’t have an associated status, returns
default.defaultdefaults toCommandStatus.RUNNING.- Return type:
MixIns¶
- class clu.tools.StatusMixIn(maskbit_flags, initial_status=None, callback_func=None, call_now=False)[source]¶
Bases:
Generic[MaskbitType]A mixin that provides status tracking with callbacks.
Provides a status property that executes a list of callbacks when the status changes.
- Parameters:
maskbit_flags (
Type[TypeVar(MaskbitType, bound=Maskbit)]) – A class containing the available statuses as a series of maskbit flags. Usually as subclass ofenum.Flag.initial_status (
Optional[TypeVar(MaskbitType, bound=Maskbit)]) – The initial status.callback_func (
Optional[Callable[[TypeVar(MaskbitType, bound=Maskbit)],Any]]) – The function to call if the status changes. It receives the status.call_now (
bool) – Whether the callback function should be called when initialising.
- callbacks¶
A list of the callback functions to call.
- property status¶
Returns the status.
Model¶
- class clu.model.CluModel(name, callback=None)[source]¶
Bases:
CaseInsensitiveDict[T],CallbackMixInA JSON-compliant model.
- Parameters:
- class clu.model.Model(name, schema, is_file=None, additional_properties=False, **kwargs)[source]¶
-
A model with JSON validation.
In addition to the parameters in
CluModel, the following parameters are accepted:- Parameters:
schema (
Union[Type[BaseModel],Dict[str,Any],PathLike,str]) – A valid JSON schema, to be used for validation.is_file (
bool|None) – Whether the input schema is a filepath or a dictionary. IfNone, tries to guess from the type of the input.additional_properties (
bool) – Whether to allow additional properties in the schema, other than the ones defined by the schema. This parameter only is used ifschema=Noneor ifadditionalPropertiesis not defined in the schema.kwargs – Additional parameters to pass to
CluModelon initialisation.
- class clu.model.ModelSet(client, actors=[], get_schema_command='get_schema', raise_exception=True, **kwargs)[source]¶
Bases:
dictA dictionary of
Modelinstances.Given a list of
actors, queries each of the actors to return their own schemas, which are then parsed and loaded asModelinstances. Since obtaining the schema require sending a command to the actor, that process happens when the coroutineload_schemasis awaited, which should usually occur when the client is started.- Parameters:
client (
BaseClient) – A client with a connection to the actors to monitor.actors (
List[str]) – A list of actor models whose schemas will be loaded.get_schema_command (
str) – The command to send to the actor to get it to return its own schema.raise_exception (
bool) – Whether to raise an exception if any of the models cannot be loaded.kwargs – Keyword arguments to be passed to
Model.
Example
>>> model_set = ModelSet(client, actors=['sop', 'guider']) >>> model_set['sop'] <Model (name='sop')>
- class clu.model.Property(name, value=None, model=None, callback=None)[source]¶
Bases:
CallbackMixInA model property with callbacks.
- Parameters:
name (
str) – The name of the property.callback (
Optional[Callable[[Any],Any]]) – The function or coroutine that will be called if the value of the key if updated. The callback is called with the instance ofPropertyas the only argument. Note that the callback will be scheduled even if the value does not change.
Store¶
- class clu.store.KeywordOutput(name, message_code, date, value) None[source]¶
Bases:
objectRecords a single output of a keyword.
Parser¶
- class clu.parsers.click.ClickParser[source]¶
Bases:
objectA command parser that uses Click at its base.
- class clu.parsers.click.CluCommand(*args, context_settings=None, **kwargs)[source]¶
Bases:
CommandOverride
click.Commandto pass the actor and command.
- class clu.parsers.click.CluGroup(*args, **kwargs) None[source]¶
Bases:
ClickAliasedGroupOverride
click.Group.Makes all child commands instances of
CluCommand.- command(*args, **kwargs)[source]¶
Override
click.Groupto useCluCommandby default.
- clu.parsers.click.timeout(seconds)[source]¶
A decorator to timeout the command after a number of
seconds.
- clu.parsers.click.cancel_command(name=None, ctx=None, ok_no_exists=True, keep_last=False)[source]¶
Cancels any running instance of a command callback.
- Parameters:
name (
str|None) – The name of the command. IfNone, callsget_current_command_name.ctx (
Context|None) – The click context. IfNone, uses the current context.ok_no_exists (
bool) – Exits silently if no command with that name is running.keep_last (
bool) – Does not cancel the last instance of the command. Useful when a command wants to cancel other instances of itself, but not itself.
- Returns:
Trueif a command was cancelled.- Return type:
result
- clu.parsers.click.get_running_tasks(cmd_name) list[Task] | None[source]¶
Returns the list of tasks for a given command name, sorted by start date.
- clu.parsers.click.get_current_command_name()[source]¶
Returns the name of the current click command.
Must be called from inside the command callback.
- class clu.parsers.json.JSONParser[source]¶
Bases:
objectA parser that receives commands and arguments as a JSON string.
See JSON parser for details on implementation and use-cases.
- parse_command(command) T[source]¶
Parses a user command.
The command string must be a serialised JSON-like string that contains at least a keyword
commandwith the name of the callback, and any number of additional arguments which will be passed to it.
Sockets¶
- class clu.protocol.TCPProtocol(loop=None, connection_callback=None, data_received_callback=None, max_connections=None)[source]¶
Bases:
ProtocolA TCP server/client based on asyncio protocols.
This is a high-level implementation of the client and server asyncio protocols. See asyncio protocol for details.
- Parameters:
loop (
AbstractEventLoop|None) – The event loop. The current event loop is used by default.connection_callback (
Optional[Callable[[Any],Any]]) – Callback to call when a new client connects.data_received_callback (
Optional[Callable[[str],Any]]) – Callback to call when a new data is received.max_connections (
Optional[int]) – How many clients the server accepts. IfNone, unlimited connections are allowed.
- class clu.protocol.PeriodicTCPServer(periodic_callback=None, sleep_time=1, **kwargs)[source]¶
Bases:
TCPProtocolA TCP server that runs a callback periodically.
- Parameters:
period_callback – Callback to run every iteration.
sleep_time (
float) – The delay between two calls toperiodic_callback.kwargs – Parameters to pass to
TCPProtocol
- class clu.protocol.TCPStreamServer(host, port, connection_callback=None, data_received_callback=None, loop=None, max_connections=None)[source]¶
Bases:
objectA TCP server based on asyncio streams.
This is a high-level implementation of the asyncio server using streams. See asyncio streams for details.
- Parameters:
host (
str) – The server host.port (
int) – The server port.connection_callback (
Optional[Callable[[Any],Any]]) – Callback to call when a new client connects or disconnects.data_received_callback (
Optional[Callable[[Any,bytes],Any]]) – Callback to call when a new data is received.loop (
Optional[AbstractEventLoop]) – The event loop. The current event loop is used by default.max_connections (
Optional[int]) – How many clients the server accepts. IfNone, unlimited connections are allowed.
- class clu.protocol.TCPStreamPeriodicServer(host, port, periodic_callback=None, sleep_time=1, **kwargs)[source]¶
Bases:
TCPStreamServerA TCP server that calls a function periodically.
- Parameters:
host (
str) – The server host.port (
int) – The server port.period_callback – Callback to run every iteration. It is called for each transport that is connected to the server and receives the transport object.
sleep_time (
float) – The delay between two calls toperiodic_callback.kwargs – Parameters to pass to
TCPStreamServer
- async start() AbstractServer[source]¶
Starts the server and returns a
Serverconnection.- Return type:
AbstractServer
- property periodic_callback¶
Returns the periodic callback.
- class clu.protocol.TCPStreamClient(host, port)[source]¶
Bases:
objectAn object containing a writer and reader stream to a TCP server.
- async clu.protocol.open_connection(host, port) TCPStreamClient[source]¶
Returns a TCP stream connection with a writer and reader.
This function is equivalent to doing
>>> client = TCPStreamClient('127.0.0.1', 5555) >>> await client.open_connection()
Instead just do
>>> client = await TCPStreamClient('127.0.0.1', 5555) >>> client.writer.write('Hi!\n'.encode())
- Parameters:
- Returns:
client – A container for the stream reader and writer.
- Return type:
- class clu.protocol.TopicListener(url=None, user='guest', password='guest', host='localhost', virtualhost='/', port=5672, ssl=False)[source]¶
Bases:
objectA class to declare and listen to AMQP queues with topic conditions.
- Parameters:
url (
str|None) – RFC3986 formatted broker address. When used, the other keyword arguments are ignored.user (
str) – The user to connect to the RabbitMQ broker.password (
str) – The password for the user.host (
str) – The host where the RabbitMQ message broker runs.virtualhost (
str) – Virtualhost parameter.'/'by default.port (
int) – The port on which the RabbitMQ message broker is running.ssl (
bool) – Whether to use TLS/SSL connection.
- async connect(exchange_name, exchange_type=ExchangeType.TOPIC, on_return_raises=True) TopicListener[source]¶
Initialise the connection.
- Parameters:
exchange_name (
str) – The name of the exchange to create.exchange_type (
ExchangeType) – The type of exchange to create.
- Return type:
- async add_queue(queue_name, callback=None, bindings='*') AbstractQueue[source]¶
Adds a queue with bindings.
- Parameters:
queue_name (
str) – The name of the queue to create.callback (
Optional[Callable[[AbstractIncomingMessage],Any]]) – A callable that will be called when a new message is received in the queue. Can be a coroutine.bindings (
Union[str,List[str]]) – The list of bindings for the queue. Can be a list of string or a single string in which the bindings are comma-separated.
- Return type:
AbstractQueue
- class clu.protocol.ReconnectingTCPClientProtocol(*args, **kwargs)[source]¶
Bases:
ProtocolA reconnecting client modelled after Twisted
ReconnectingClientFactory.Taken from https://bit.ly/3yn6MWa.
- class clu.device.Device(host, port, callback=None)[source]¶
Bases:
CallbackMixInA class that handles the TCP connection to a device.
There are two ways to create a new device. You can create a subclass from
Deviceand override theprocess_messagemethod which handles how you react to a new line being receivedclass MyDevice(Device): async def process_message(self, line): print(line) my_device = MyDevice('192.168.1.10', 4444) await my_device.start()
Note that
process_messagemust be a coroutine. Alternatively you can pass a callback that will be called instead ofprocess_messagewhen a new message arrives. The callback must also be a coroutineasync def printer(line): print(line) my_device = MyDevice('192.168.1.10', 4444, callback=printer) await my_device.start()
- Parameters:
host (
str) – The host of the device.port (
int) – The port on which the device is serving.callback (
Optional[Callable[[str],Any]]) – The callback to call with each new message received from the client (after decoding into a string). If no callback is specified,process_messageis called. If the callback is not a coroutine, it will be converted to one.
Tools¶
- clu.escape(value)[source]¶
Escapes a text using
json.dumps.
- class clu.tools.ActorHandler(actor, level=40, keyword='text', code_mapping=None, filter_warnings=None)[source]¶
Bases:
HandlerA handler that outputs log messages as actor keywords.
- Parameters:
actor – The actor instance.
level (
int) – The level above which records will be output in the actor.keyword (
str) – The keyword around which the messages will be output.code_mapping (
Optional[Dict[int,str]]) – A mapping of logging levels to actor codes. The values provided override the default mapping. For example, to make input log messages with info level be output as debug,code_mapping={logging.INFO: 'd'}.filter_warnings (
Optional[List[Type[Warning]]]) – A list of warning classes that will be issued to the actor. Subclasses of the filter warning are accepted, any other warnings will be ignored.
- class clu.tools.CallbackMixIn(callbacks=[], loop=None)[source]¶
Bases:
objectA mixin for executing callbacks.
- Parameters:
callbacks (
List[Callable[[Any],Any]]) – A list of functions or coroutines to be called.
Testing¶
- class clu.testing.MockReply(user_id, command_id, flag, data={})[source]¶
Bases:
dictStores a reply written to a transport.
The data of the message is stored as part of the dictionary.
- async clu.testing.setup_test_actor(actor, user_id=666) T[source]¶
Setups an actor for testing, mocking the client transport.
Takes an
actorand modifies it in two ways: :rtype:TypeVar(T, bound=MockedActor)Adds a
invoke_mock_commandmethod to it that allows to submit a command string as if it had been received from a transport.Mocks a client transport with
user_idthat is connected to the actor. Messages written to the transport are stored asMockReplyin aMockReplyListthat is accessible via a newactor.mock_repliesattribute.
The actor is modified in place and returned.