API Reference#
Base classes#
- class clu.base.BaseClient(name, version=None, loop=None, log_dir=None, log=None, verbose=False, validate=True, config={})[source]#
Bases:
object
A base client that can be used for listening or for an actor.
This class defines a new client. Clients differ from actors in that they do not receive commands or issue replies, but do send commands to other actors and listen to the keyword-value flow. All actors are also clients and any actor should subclass from
BaseClient
.Normally a new instance of a client or actor is created by passing a configuration file path to
from_config
which defines how the client must be started.- Parameters:
name (
str
) – The name of the client.loop (
Optional
[AbstractEventLoop
]) – The event loop. IfNone
, the current event loop will be used.log_dir (
Union
[Path
,str
,None
]) – The directory where to store the file logs.log (
Optional
[SDSSLogger
]) – ALogger
instance to be used for logging instead of creating a new one.verbose (
Union
[bool
,int
]) – Whether to log to stdout. Can be an integer logging level.validate (
bool
) – Whether to actor should validate its own messages against its model (if it has one). This is a global parameter that can be overridden when callingwrite
.config (
dict
) – A dictionary of configuration parameters that will be accessible to the client.
- classmethod from_config(config, *args, loader=<class 'yaml.loader.FullLoader'>, **kwargs)[source]#
Parses a configuration file.
- proxy(actor) ProxyClient [source]#
- Return type:
ProxyClient
Creates a proxy for an actor.
Returns a
ProxyClient
that simplifies running a command multiple times. For exampleawait client.send_command("focus_state", "moverelative -1000 UM")
can be replaced with
focus_stage = client.proxy("focus_stage") await focus_stage.send_command("moverelative", "-1000", "UM")
- class clu.base.BaseActor(*args, schema=None, store=False, additional_properties=False, **kwargs)[source]#
Bases:
BaseClient
An actor based on
asyncio
.This class expands
BaseClient
with a parsing system for new commands and placeholders for methods for handling new commands and writing replies, which should be overridden by the specific actors.- Parameters:
schema (
Union
[Dict
[str
,Any
],Path
,str
,None
]) – The schema for the actor replies, as a JSONschema dictionary or path to a JSON file. IfNone
, defaults to the internal basic schema.store (
bool
|list
[str
]) – Whether to store the output keywords in aKeywordStore
.False
(the default), disables the feature.True
will store a record of all the output keywords. A list of keyword names to store can also be passed.additional_properties (
bool
) – Whether to allow additional properties in the schema, other than the ones defined by the schema. This parameter only is used ifschema=None
or ifadditionalProperties
is not defined in the schema.
- invoke_mock_command(command_str, command_id=0) Command [source]#
- Return type:
Send a new command to an actor for testing.
Requires calling
setup_test_actor
.
- load_schema(schema, is_file=True, additional_properties=False) Model | None [source]#
-
Loads and validates the actor schema.
- abstract new_command()[source]#
Handles a new command.
Must be overridden by the subclass and call
parse_command
with aCommand
object.
- write(message_code=MessageCode.INFO, message=None, command=None, broadcast=False, validate=None, expand_exceptions=True, traceback_frame=0, silent=False, internal=False, emit=True, write_to_log=True, **kwargs) Reply [source]#
Writes a message to user(s).
The reply to the users will be formatted by the actor class into message specific to the communication channel. Additional keywords passed to this method will be used to complete the message (as long as they don’t overlap with named parameters). For example
actor.write('i', message={'text': 'Hi!', 'value1': 1})
and
actor.write('i', text='Hi!', value1=1)
are equivalent and
actor.write('i', message={'text': 'Hi!'}, value1=1)
is equivalent to
actor.write('i', message={'text': 'Hi!', 'value1': 1})
This method generates a
Reply
object that is passed to the_write_internal
method in the class, which processes it and outputs the message to the users using the appropriate transport. Ifcommand
is passed, the reply is added tocommand.replies
.- Parameters:
message_code (
MessageCode
|str
) – The message code (e.g.,'i'
or':'
).message (
Union
[Dict
[str
,Any
],str
,None
]) – The keywords to be output. Must be a dictionary of pairs{keyword: value}
.command (
Optional
[BaseCommand
]) – The command to which we are replying. If not set, it is assumed that this is a broadcast.broadcast (
bool
) – Whether to broadcast the message to all the users or only to the commander.validate (
bool
|None
) – Validate the reply against the actor schema. This is ignored if the actor was not started with knowledge of its own schema. IfNone
, defaults to the actor global behaviour.expand_exceptions (
bool
) – If the value of one of the keywords is an exception object andexpand_exception=True
, the exception information and traceback will be expanded. Otherwise the exception will just be stringified.traceback_frame (
int
) – The frame from the traceback to use ifexpand_exception=True
. By default reports the file and line of the last traceback frame.silent (
bool
) – WhenTrue
does not output the message to the users. This can be used to issue internal commands that update the internal model but that don’t clutter the output.internal (
bool
) – Marks theReply
instance as internal. Internal replies are expected to be emitted to the users, but with a header indicating that the message is not for broader consumption. This can be useful when one wants to reply with verbose information that can be skipped from CLI or logs.emit (
bool
) – Whether to call the actor internal write method. Should beTrue
but it’s sometimes useful to callwrite
withemit=False
when one is overriding the method and wants to control when to call the internal method. In that case, aReply
object is returned but nothing is output to the users.write_to_log (
bool
) – Whether to write the reply to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log.kwargs – Keyword arguments that will used to update the message.
- Return type:
Client#
- class clu.client.AMQPClient(name=None, url=None, user='guest', password='guest', host='localhost', port=5672, virtualhost='/', ssl=False, version=None, loop=None, log_dir=None, log=None, models=[], **kwargs)[source]#
Bases:
BaseClient
Defines a new client based on the AMQP standard.
To start a new client first instantiate the class and then run
start
as a coroutine. Note thatstart
does not block so you will need to use asyncio’srun_forever
or a similar system>>> loop = asyncio.get_event_loop() >>> client = await AMQPClient('my_client', host='localhost').start() >>> loop.run_forever()
- Parameters:
url (
Optional
[str
]) – RFC3986 formatted broker address. When used, the other connection keyword arguments are ignored.user (
str
) – The user to connect to the AMQP broker. Defaults toguest
.password (
str
) – The password for the user. Defaults toguest
.host (
str
) – The host where the AMQP message broker runs. Defaults tolocalhost
.virtualhost (
str
) – Virtualhost parameter.'/'
by default.port (
int
) – The port on which the AMQP broker is running. Defaults to 5672.ssl (
bool
) – Whether to use TLS/SSL connection.loop (
Optional
[AbstractEventLoop
]) – The event loop. IfNone
, the current event loop will be used.log_dir (
Union
[Path
,str
,None
]) – The directory where to store the logs. Defaults to$HOME/logs/<name>
where<name>
is the name of the actor.log (
Optional
[SDSSLogger
]) – ALogger
instance to be used for logging instead of creating a new one.parser – A click command parser that is a subclass of
CluGroup
. IfNone
, the active parser will be used.models (
List
[str
]) – A list of actor models whose schemas will be monitored.
- add_reply_callback(callback_func)[source]#
Adds a callback that is called when a new reply is received.
- async handle_reply(message) AMQPReply [source]#
Handles a reply received from the exchange.
Creates a new instance of
AMQPReply
from themessage
. If the reply is valid it updates any running command.- Parameters:
message (
AbstractIncomingMessage
) – The message received.- Returns:
The
AMQPReply
object created from the message.- Return type:
reply
- async send_command(consumer, command_string, *args, command_id=None, callback=None, internal=False, command=None, time_limit=None, await_command=True)[source]#
Commands another actor over its RCP queue.
- Parameters:
consumer (
str
) – The actor we are commanding.command_string (
str
) – The command string that will be parsed by the remote actor.args – Arguments to concatenate to the command string.
command_id (
str
|None
) – The command ID associated with this command. If empty, an unique identifier will be attached.callback (
Optional
[Callable
[[AMQPReply
],None
]]) – A callback to invoke with each reply received from the actor.internal (
bool
) – Whether to mark the command as internal, in which case replies will also be considered internal.command (
Optional
[Command
]) – TheCommand
that initiated the new command. Only relevant for actors.time_limit (
Optional
[float
]) – A delay after which the command is marked as timed out and done.await_command (
bool
) – IfTrue
, awaits the command until it finishes.
Examples
These two are equivalent
>>> client.send_command('my_actor', 'do_something --now') >>> client.send_command('my_actor', 'do_something', '--now')
- class clu.client.AMQPReply(message, log=None)[source]#
Bases:
object
Wrapper for an
IncomingMessage
that expands and decodes it.- Parameters:
- is_valid#
Whether the message is valid and correctly parsed.
- body#
The body of the message, as a JSON dictionary.
- info#
The info dictionary.
- headers#
The headers of the message, decoded if they are bytes.
- message_code#
The message code.
- internal#
Whether this reply was marked internal.
- sender#
The name of the actor that sends the reply.
- command_id#
The command ID.
Actor#
- class clu.actor.AMQPBaseActor(*args, **kwargs)[source]#
Bases:
AMQPClient
,BaseActor
An actor class that uses AMQP message brokering.
This class differs from
LegacyActor
in that it uses an AMQP messaging broker (typically RabbitMQ) to communicate with other actors in the system, instead of standard TCP sockets. Although the internals and protocols are different the entry points and behaviour for both classes should be almost identical.This class needs to be subclassed with a command parser.
See the documentation for
AMQPActor
andAMQPClient
for additional parameter information.
- class clu.actor.AMQPActor(*args, **kwargs)[source]#
Bases:
ClickParser
,AMQPBaseActor
An
AMQP actor
that uses aclick parser
.
- class clu.actor.TCPBaseActor(name, host=None, port=None, *args, **kwargs)[source]#
Bases:
BaseActor
A TCP base actor that replies using JSON.
This implementation of
BaseActor
uses TCP as command/reply channel and replies to the user by sending a JSON-valid string. This makes it useful as a “device” actor that is not connected to the central message parsing system but that we still want to accept commands and reply with easily parseable messages.Commands received by this actor must be in the format
[<uid>] <command string>
, where<uid>
is any integer unique identifier that will be used ascommand_id
and appended to any reply.This is a base actor that does not include a parser. It must be subclassed with a concrete parser that overrides
parse_command
.- Parameters:
- async start() TCPBaseActor_co [source]#
- Return type:
TypeVar
(TCPBaseActor_co
, bound= TCPBaseActor)
Starts the TCP server.
- write(*args, **kwargs)[source]#
Writes a message to user(s) as a JSON.
A
header
keyword with thecommander_id
(i.e., the user id of the transport that sent the command),command_id
,message_code
, andsender
is added to each message. The payload of the message is written todata
. An example of a valid message is{ "header": { "command_id": 0, "commander_id": 1, "message_code": "i", "sender": "test_camera" }, "message": { "camera": { "name": "test_camera", "uid": "DEV_12345" }, "status": { "temperature": 25.0, "cooler": 10.0 } } }
Although the messsage is displayed here in multiple lines, it is written as a single line to the TCP clients to facilitate parsing. For a multiline output, which is more human-readable, use the
multiline
command.See
write
for details on the allowed parameters.
- server#
The server to talk to this actor.
- Type:
- transports#
Mapping of commander_id to transport
- class clu.actor.JSONActor(*args, **kwargs)[source]#
Bases:
ClickParser
,TCPBaseActor
An implementation of
TCPBaseActor
that uses a Click command parser.
Command#
- class clu.command.BaseCommand(commander_id=None, command_id=0, consumer_id=0, actor=None, parent=None, reply_callback=None, status_callback=None, call_now=False, default_keyword='text', silent=False, internal=False, write_to_log=True, time_limit=None, loop=None)[source]#
Bases:
Future
[Future_co
],StatusMixIn
[CommandStatus
],Generic
[Actor_co
,Future_co
]Base class for commands of all types (user and device).
A
BaseCommand
instance is aFuture
whose result gets set when the status is done (either successfully or not).- Parameters:
commander_id (
Union
[int
,str
,None
]) – The ID of the commander issuing this command. Can be a string or an integer. Normally the former is used for new-style actor and the latter for legacy actors.command_id (
Union
[int
,str
]) – The ID associated to this command. As with the commander_id, it can be a string or an integerconsumer_id (
Union
[int
,str
]) – The actor that is consuming this command. Normally this is our own actor but if we are commanding another actorconsumer_id
will be the destination actor.actor (
Optional
[TypeVar
(Actor_co
, bound= clu.base.BaseActor)]) – The actor instance associated to this command.parent (
Optional
[BaseCommand
[TypeVar
(Actor_co
, bound= clu.base.BaseActor),TypeVar
(Future_co
, bound= BaseCommand)]]) – AnotherBaseCommand
object that is issuing this subcommand. Messages emitted by the command will use the parentcommand_id
.reply_callback (
Optional
[Callable
[[Any
],None
]]) – A callback that gets called when a client command receives a reply from the actor.status_callback (
Optional
[Callable
[[CommandStatus
],Any
]]) – A function to call when the status changes.call_now (
bool
) – Whether to callstatus_callback
when initialising the command.default_keyword (
str
) – The keyword to use when writing a message that does not specify a keyword.silent (
bool
) – A silent command will call the actorwrite
method withsilent=True
, which will update the internal model and record all the output replies but will not write them to the users.internal (
bool
) – A silent command will call the actorwrite
method withinternal=True
, which will instruct the actor to add the internal flag to the header of the reply.write_to_log (
bool
) – Whether to write replies to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log.time_limit (
float
|None
) – Time out the command if it has been running for this long.loop (
Optional
[AbstractEventLoop
]) – The event loop.
- send_command(target, command_string, new_command=True, *args, **kwargs) BaseCommand[Actor_co, BaseCommand] | Awaitable[BaseCommand] [source]#
- Return type:
Union
[BaseCommand
[TypeVar
(Actor_co
, bound= clu.base.BaseActor),BaseCommand
],Awaitable
[BaseCommand
]]
Sends a command to an actor using the commander ID of this command.
- set_status(status, message=None, silent=False, **kwargs) BaseCommand [source]#
- Return type:
Same as
status
but allows to specify a message to the users.
- write(message_code=MessageCode.INFO, message=None, broadcast=False, **kwargs)[source]#
Writes to the user(s).
- context: click.Context | None#
The click context, if the click parser is used.
- replies#
A list of replies this command has received. The type of reply object depends on the actor or client issuing the command.
- property status: CommandStatus#
Returns the status.
- class clu.command.Command(command_string='', transport=None, **kwargs)[source]#
Bases:
BaseCommand
[Actor_co
, Command]A command from a user.
- Parameters:
command_string (
str
) – The string that defines the body of the command.transport (
Optional
[Any
]) – The TCP transport associated with this command (only relevant forLegacyActor
commands).
- child_command(command_string)[source]#
Starts a sub-command on the actor currently running this command.
The practical effect is to run another of the same actor’s commands as if it were part of the current
Command
.
- body#
The body of the command.
- raw_command_string#
The raw command string.
- class clu.command.FakeCommand(log, actor=None)[source]#
Bases:
BaseCommand
A fake command that output to a logger.
- class clu.command.TimedCommand(command_string, delay=1, first_silent=False)[source]#
Bases:
object
A command to be executed on a loop.
- Parameters:
- class clu.command.TimedCommandList(actor, resolution=0.5, loop=None)[source]#
Bases:
list
A list of
TimedCommand
objects that will be executed on a loop.- Parameters:
actor (
BaseActor
) – The actor in which the commands are to be run.resolution – In seconds, how frequently to check if any of the
TimedCommand
must be executed.
- add_command(command_string, **kwargs)[source]#
Adds a new
TimedCommand
.
- start() TimedCommandList [source]#
- Return type:
Starts the loop.
- clu.command.parse_legacy_command(command_string) Tuple[str | None, int, str] [source]#
Parses a command received by a legacy actor.
- Parameters:
command_string (
str
) – The command to parse, including an optional header.- Returns:
The command ID, and the command body parsed from the command string.
- Return type:
command_id, command_body
Legacy#
- class clu.legacy.actor.BaseLegacyActor(name, host, port, tron_host=None, tron_port=None, models=[], version=None, loop=None, log_dir=None, log=None, verbose=False, schema=None, store=False, additional_properties=False, config={})[source]#
Bases:
BaseActor
An actor that provides compatibility with the SDSS opscore protocol.
The TCP servers need to be started by awaiting the coroutine
start
. Note thatstart
does not block so you will need to use asyncio’srun_forever
or a similar system>>> loop = asyncio.get_event_loop() >>> my_actor = await LegacyActor('my_actor', '127.0.0.1', 9999, loop=loop) >>> my_actor.start() >>> loop.run_forever()
- Parameters:
name (
str
) – The name of the actor.host (
str
) – The host where the TCP server will run.port (
int
) – The port of the TCP server.tron_host (
Optional
[str
]) – The host on which Tron is running.tron_port (
Optional
[int
]) – The port on which Tron is running.models (
List
[str
]) – A list of strings with the actors whose models will be tracked.loop (
Optional
[AbstractEventLoop
]) – The event loop. IfNone
, the current event loop will be used.log_dir (
Union
[Path
,str
,None
]) – The directory where to store the logs. Defaults to$HOME/logs/<name>
where<name>
is the name of the actor.log (
Optional
[Logger
]) – ALogger
instance to be used for logging instead of creating a new one.schema (
Union
[Path
,str
,None
]) – The path to the datamodel schema for the actor, in JSON Schema format. If the schema is provided all replies will be validated against it. An invalid reply will fail and not be emitted. The schema can also be set when subclassing by setting the classschema
attribute.store (
bool
|list
[str
]) – Whether to store the output keywords in aKeywordStore
.False
(the default), disables the feature.True
will store a record of all the output keywords. A list of keyword names to store can also be passed.config (
Dict
[str
,Any
]) – A dictionary of configuration parameters that will be accessible to the actor.
- static format_user_output(user_id, command_id, message_code, msg_str=None) str [source]#
- Return type:
Formats a string to send to users.
- static get_user_command_id(command=None, user_id=0, command_id=0) Tuple[int, int] [source]#
Returns commander_id, command_id based on user-supplied information.
- Parameters:
- Returns:
The commander ID and the command ID, parsed from the inputs. If they cannot be determined, returns zeros.
- Return type:
user_id, command_id
- send_command(target, command_string, *args, commander=None, command_id=None, command=None, callback=None, time_limit=None)[source]#
Sends a command through the hub.
- Parameters:
target (
str
) – The actor to command.command_string (
str
) – The command to send.args – Arguments to concatenate to the command string.
commander (
Optional
[str
]) – The commander string to send to Tron. If not provided, a valid string is built using the name of the actor and the target.command_id (
Optional
[int
]) – The command id. IfNone
, a sequentially increasing value will be used. You should not specify acommand_id
unless you really know what you’re doing.callback (
Optional
[Callable
[[Reply
],None
]]) – A callback to invoke with each reply received from the actor.time_limit (
Optional
[float
]) – A delay after which the command is marked as timed out and done.
Examples
These two are equivalent
>>> actor.send_command('my_actor', 'do_something --now') >>> actor.send_command('my_actor', 'do_something', '--now')
- show_new_user_info(user_id)[source]#
Shows information for new users. Called when a new user connects.
- async start(get_keys=True, start_nubs=True) T [source]#
Starts the server and the Tron client connection.
- Parameters:
get_keys (
bool
) – Whether to issuekeys getFor
commands against the hub to retrieve the current values for the model keys.start_nubs (
bool
) – IfTrue
, and aTronConnection
has been created, sends ahub startNubs <name>
where<name>
is the name of the actor to attempt to automatically connect it to the hub.
- Return type:
TypeVar
(T
, bound= BaseLegacyActor)
- write(message_code='i', message=None, command=None, user_id=0, command_id=0, concatenate=True, broadcast=False, validate=True, write_to_log=True, **kwargs)[source]#
Writes a message to user(s).
- Parameters:
message_code (
MessageCode
|str
) – The message code (e.g.,'i'
or':'
).message (
Optional
[Dict
[str
,Any
]]) – The keywords to be output. Must be a dictionary of pairs{keyword: value}
. Ifvalue
is a list it will be converted into a comma-separated string. To prevent unexpected casting, it is recommended forvalue
to always be a string.command (
Optional
[Command
]) – User command; used as a default foruser_id
andcommand_id
. If the command is done, it is ignored.user_id (
int
) – The user (transport) to which to write.None
defaults to 0.concatenate (
bool
) – Concatenates all the keywords to be output in a single reply with the keyword-values joined with semicolons. Otherwise each keyword will be output as a different message.broadcast (
bool
) – Whether to broadcast the reply. Equivalent touser_id=0
.validate (
bool
) – Validate the reply against the actor model. This is ignored if the actor was not started with knowledge of its own schema.write_to_log (
bool
) – Whether to write the reply to the log. Defaults to yes but it may be useful to prevent large repetitive replies cluttering the log.kwargs – Keyword arguments that will be added to the message. If a keyword is both in
message
and inkwargs
, the value inkwargs
supersedesmessage
.
- transports#
Mapping of user_id to transport
- tron#
The client connection to Tron.
- Type:
- class clu.legacy.actor.LegacyActor(*args, **kwargs)[source]#
Bases:
ClickParser
,BaseLegacyActor
A legacy actor that uses the
ClickParser
.
- class clu.legacy.tron.TronConnection(commander, host, port=6093, name='tron', models=[], **kwargs)[source]#
Bases:
BaseClient
Allows to send commands to Tron and manages the feed of replies.
- Parameters:
commander (
str
) – The name of the commander that will send commands to Tron. Must be in the formprogram.user
, for examplefoo.bar
.host (
str
) – The host on which Tron is running.port (
int
) – The port on which Tron is running.models (
List
[str
]) – A list of strings with the actors whose models will be tracked.kwargs – Arguments to be passed to
BaseClient
.
- send_command(target, command_string, *args, commander=None, mid=None, callback=None, time_limit=None)[source]#
Sends a command through the hub.
- Parameters:
target – The actor to command.
command_string – The command to send.
args – Arguments to concatenate to the command string.
commander – The actor or client sending the command. The format for Tron is “commander message_id target command” where commander needs to start with a letter and have a program and a user joined by a dot. Otherwise the command will be accepted but the reply will fail to parse. If
commander=None
, the instancecommander
value will be used.mid – The message id. If
None
, a sequentially increasing value will be used. You should not specify amid
unless you really know what you’re doing.callback (
Optional
[Callable
[[Reply
],None
]]) – A callback to invoke with each reply received from the actor.time_limit (
Optional
[float
]) – A delay after which the command is marked as timed out and done.
Examples
These two are equivalent
>>> tron.send_command('my_actor', 'do_something --now') >>> tron.send_command('my_actor', 'do_something', '--now')
- class clu.legacy.tron.TronKey(name, key, keyword=None, model=None, callback=None)[source]#
Bases:
Property
A Tron model key with callbacks.
Similar to
Property
but stores the original key with the keyword datamodel.
- class clu.legacy.tron.TronModel(keydict, callback=None)[source]#
-
A JSON-compliant model for actor keywords.
- Parameters:
keydict (
KeysDictionary
) – A dictionary of keys that define the datamodel.callback (
Optional
[Callable
[[TronModel
],Any
]]) – A function or coroutine to call when the datamodel changes. The function is called with the instance ofTronModel
and the modified keyword. If the callback is a coroutine, it is scheduled as a task.
Maskbits#
- class clu.tools.Maskbit(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
Flag
A maskbit enumeration. Intended for subclassing.
- class clu.tools.CommandStatus(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#
Bases:
Maskbit
- DONE = 1#
- CANCELLED = 2#
- FAILED = 4#
- TIMEDOUT = 8#
- READY = 16#
- RUNNING = 32#
- CANCELLING = 64#
- FAILING = 128#
- DEBUG = 256#
- ACTIVE_STATES = 224#
- FAILED_STATES = 14#
- FAILING_STATES = 192#
- DONE_STATES = 15#
- ALL_STATES = 255#
- static code_to_status(code, default=None) CommandStatus [source]#
- Return type:
Returns the status associated with a code.
If the code doesn’t have an associated status, returns
default
.default
defaults toCommandStatus.RUNNING
.
MixIns#
- class clu.tools.StatusMixIn(maskbit_flags, initial_status=None, callback_func=None, call_now=False)[source]#
Bases:
Generic
[MaskbitType
]A mixin that provides status tracking with callbacks.
Provides a status property that executes a list of callbacks when the status changes.
- Parameters:
maskbit_flags (
Type
[TypeVar
(MaskbitType
, bound=Maskbit
)]) – A class containing the available statuses as a series of maskbit flags. Usually as subclass ofenum.Flag
.initial_status (
Optional
[TypeVar
(MaskbitType
, bound=Maskbit
)]) – The initial status.callback_func (
Optional
[Callable
[[TypeVar
(MaskbitType
, bound=Maskbit
)],Any
]]) – The function to call if the status changes. It receives the status.call_now (
bool
) – Whether the callback function should be called when initialising.
- callbacks#
A list of the callback functions to call.
- property status#
Returns the status.
Model#
- class clu.model.BaseModel(name, callback=None)[source]#
Bases:
CaseInsensitiveDict
[T
],CallbackMixIn
A JSON-compliant model.
- Parameters:
- class clu.model.Model(name, schema, is_file=False, additional_properties=False, **kwargs)[source]#
-
A model with JSON validation.
In addition to the parameters in
BaseModel
, the following parameters are accepted:- Parameters:
schema (
Union
[Dict
[str
,Any
],PathLike
,str
]) – A valid JSON schema, to be used for validation.is_file (
bool
) – Whether the input schema is a filepath or a dictionary.additional_properties (
bool
) – Whether to allow additional properties in the schema, other than the ones defined by the schema. This parameter only is used ifschema=None
or ifadditionalProperties
is not defined in the schema.kwargs – Additional parameters to pass to
BaseModel
on initialisation.
- class clu.model.ModelSet(client, actors=[], get_schema_command='get_schema', raise_exception=True, **kwargs)[source]#
Bases:
dict
A dictionary of
Model
instances.Given a list of
actors
, queries each of the actors to return their own schemas, which are then parsed and loaded asModel
instances. Since obtaining the schema require sending a command to the actor, that process happens when the coroutineload_schemas
is awaited, which should usually occur when the client is started.- Parameters:
client (
BaseClient
) – A client with a connection to the actors to monitor.actors (
List
[str
]) – A list of actor models whose schemas will be loaded.get_schema_command (
str
) – The command to send to the actor to get it to return its own schema.raise_exception (
bool
) – Whether to raise an exception if any of the models cannot be loaded.kwargs – Keyword arguments to be passed to
Model
.
Example
>>> model_set = ModelSet(client, actors=['sop', 'guider']) >>> model_set['sop'] <Model (name='sop')>
- class clu.model.Property(name, value=None, model=None, callback=None)[source]#
Bases:
CallbackMixIn
A model property with callbacks.
- Parameters:
name (
str
) – The name of the property.callback (
Optional
[Callable
[[Any
],Any
]]) – The function or coroutine that will be called if the value of the key if updated. The callback is called with the instance ofProperty
as the only argument. Note that the callback will be scheduled even if the value does not change.
Store#
- class clu.store.KeywordOutput(name, message_code, date, value) None [source]#
Bases:
object
Records a single output of a keyword.
Parser#
- class clu.parsers.click.ClickParser[source]#
Bases:
object
A command parser that uses Click at its base.
- class clu.parsers.click.CluCommand(*args, context_settings=None, **kwargs)[source]#
Bases:
Command
Override
click.Command
to pass the actor and command.
- class clu.parsers.click.CluGroup(name=None, commands=None, **attrs) None [source]#
Bases:
Group
Override
click.Group
.Makes all child commands instances of
CluCommand
.- command(*args, **kwargs)[source]#
Override
click.Group
to useCluCommand
by default.
- clu.parsers.click.timeout(seconds)[source]#
A decorator to timeout the command after a number of
seconds
.
- clu.parsers.click.cancel_command(name=None, ctx=None, ok_no_exists=True, keep_last=False)[source]#
Cancels any running instance of a command callback.
- Parameters:
name (
str
|None
) – The name of the command. IfNone
, callsget_current_command_name
.ctx (
Context
|None
) – The click context. IfNone
, uses the current context.ok_no_exists (
bool
) – Exits silently if no command with that name is running.keep_last (
bool
) – Does not cancel the last instance of the command. Useful when a command wants to cancel other instances of itself, but not itself.
- Returns:
True
if a command was cancelled.- Return type:
result
- clu.parsers.click.get_running_tasks(cmd_name) list[Task] | None [source]#
-
Returns the list of tasks for a given command name, sorted by start date.
- clu.parsers.click.get_current_command_name()[source]#
Returns the name of the current click command.
Must be called from inside the command callback.
- class clu.parsers.json.JSONParser[source]#
Bases:
object
A parser that receives commands and arguments as a JSON string.
See JSON parser for details on implementation and use-cases.
- parse_command(command) T [source]#
-
Parses a user command.
The command string must be a serialised JSON-like string that contains at least a keyword
command
with the name of the callback, and any number of additional arguments which will be passed to it.
Sockets#
- class clu.protocol.TCPProtocol(loop=None, connection_callback=None, data_received_callback=None, max_connections=None)[source]#
Bases:
Protocol
A TCP server/client based on asyncio protocols.
This is a high-level implementation of the client and server asyncio protocols. See asyncio protocol for details.
- Parameters:
loop (
AbstractEventLoop
|None
) – The event loop. The current event loop is used by default.connection_callback (
Optional
[Callable
[[Any
],Any
]]) – Callback to call when a new client connects.data_received_callback (
Optional
[Callable
[[str
],Any
]]) – Callback to call when a new data is received.max_connections (
Optional
[int
]) – How many clients the server accepts. IfNone
, unlimited connections are allowed.
- class clu.protocol.PeriodicTCPServer(periodic_callback=None, sleep_time=1, **kwargs)[source]#
Bases:
TCPProtocol
A TCP server that runs a callback periodically.
- Parameters:
period_callback – Callback to run every iteration.
sleep_time (
float
) – The delay between two calls toperiodic_callback
.kwargs – Parameters to pass to
TCPProtocol
- class clu.protocol.TCPStreamServer(host, port, connection_callback=None, data_received_callback=None, loop=None, max_connections=None)[source]#
Bases:
object
A TCP server based on asyncio streams.
This is a high-level implementation of the asyncio server using streams. See asyncio streams for details.
- Parameters:
host (
str
) – The server host.port (
int
) – The server port.connection_callback (
Optional
[Callable
[[Any
],Any
]]) – Callback to call when a new client connects or disconnects.data_received_callback (
Optional
[Callable
[[Any
,bytes
],Any
]]) – Callback to call when a new data is received.loop (
Optional
[AbstractEventLoop
]) – The event loop. The current event loop is used by default.max_connections (
Optional
[int
]) – How many clients the server accepts. IfNone
, unlimited connections are allowed.
- class clu.protocol.TCPStreamPeriodicServer(host, port, periodic_callback=None, sleep_time=1, **kwargs)[source]#
Bases:
TCPStreamServer
A TCP server that calls a function periodically.
- Parameters:
host (
str
) – The server host.port (
int
) – The server port.period_callback – Callback to run every iteration. It is called for each transport that is connected to the server and receives the transport object.
sleep_time (
float
) – The delay between two calls toperiodic_callback
.kwargs – Parameters to pass to
TCPStreamServer
- async start() AbstractServer [source]#
- Return type:
AbstractServer
Starts the server and returns a
Server
connection.
- property periodic_callback#
Returns the periodic callback.
- class clu.protocol.TCPStreamClient(host, port)[source]#
Bases:
object
An object containing a writer and reader stream to a TCP server.
- async clu.protocol.open_connection(host, port) TCPStreamClient [source]#
Returns a TCP stream connection with a writer and reader.
This function is equivalent to doing
>>> client = TCPStreamClient('127.0.0.1', 5555) >>> await client.open_connection()
Instead just do
>>> client = await TCPStreamClient('127.0.0.1', 5555) >>> client.writer.write('Hi!\n'.encode())
- Parameters:
- Returns:
client – A container for the stream reader and writer.
- Return type:
- class clu.protocol.TopicListener(url=None, user='guest', password='guest', host='localhost', virtualhost='/', port=5672, ssl=False)[source]#
Bases:
object
A class to declare and listen to AMQP queues with topic conditions.
- Parameters:
url (
str
|None
) – RFC3986 formatted broker address. When used, the other keyword arguments are ignored.user (
str
) – The user to connect to the RabbitMQ broker.password (
str
) – The password for the user.host (
str
) – The host where the RabbitMQ message broker runs.virtualhost (
str
) – Virtualhost parameter.'/'
by default.port (
int
) – The port on which the RabbitMQ message broker is running.ssl (
bool
) – Whether to use TLS/SSL connection.
- async connect(exchange_name, exchange_type=ExchangeType.TOPIC, on_return_raises=True) TopicListener [source]#
Initialise the connection.
- Parameters:
exchange_name (
str
) – The name of the exchange to create.exchange_type (
ExchangeType
) – The type of exchange to create.
- Return type:
- async add_queue(queue_name, callback=None, bindings='*') AbstractQueue [source]#
Adds a queue with bindings.
- Parameters:
queue_name (
str
) – The name of the queue to create.callback (
Optional
[Callable
[[AbstractIncomingMessage
],Any
]]) – A callable that will be called when a new message is received in the queue. Can be a coroutine.bindings (
Union
[str
,List
[str
]]) – The list of bindings for the queue. Can be a list of string or a single string in which the bindings are comma-separated.
- Return type:
AbstractQueue
- class clu.protocol.ReconnectingTCPClientProtocol(*args, **kwargs)[source]#
Bases:
Protocol
A reconnecting client modelled after Twisted
ReconnectingClientFactory
.Taken from https://bit.ly/3yn6MWa.
- class clu.device.Device(host, port, callback=None)[source]#
Bases:
CallbackMixIn
A class that handles the TCP connection to a device.
There are two ways to create a new device. You can create a subclass from
Device
and override theprocess_message
method which handles how you react to a new line being receivedclass MyDevice(Device): async def process_message(self, line): print(line) my_device = MyDevice('192.168.1.10', 4444) await my_device.start()
Note that
process_message
must be a coroutine. Alternatively you can pass a callback that will be called instead ofprocess_message
when a new message arrives. The callback must also be a coroutineasync def printer(line): print(line) my_device = MyDevice('192.168.1.10', 4444, callback=printer) await my_device.start()
- Parameters:
host (
str
) – The host of the device.port (
int
) – The port on which the device is serving.callback (
Optional
[Callable
[[str
],Any
]]) – The callback to call with each new message received from the client (after decoding into a string). If no callback is specified,process_message
is called. If the callback is not a coroutine, it will be converted to one.
Tools#
- clu.escape(value)[source]#
Escapes a text using
json.dumps
.
- class clu.tools.ActorHandler(actor, level=40, keyword='text', code_mapping=None, filter_warnings=None)[source]#
Bases:
Handler
A handler that outputs log messages as actor keywords.
- Parameters:
actor – The actor instance.
level (
int
) – The level above which records will be output in the actor.keyword (
str
) – The keyword around which the messages will be output.code_mapping (
Optional
[Dict
[int
,str
]]) – A mapping of logging levels to actor codes. The values provided override the default mapping. For example, to make input log messages with info level be output as debug,code_mapping={logging.INFO: 'd'}
.filter_warnings (
Optional
[List
[Type
[Warning
]]]) – A list of warning classes that will be issued to the actor. Subclasses of the filter warning are accepted, any other warnings will be ignored.
- class clu.tools.CallbackMixIn(callbacks=[], loop=None)[source]#
Bases:
object
A mixin for executing callbacks.
- Parameters:
callbacks (
List
[Callable
[[Any
],Any
]]) – A list of functions or coroutines to be called.
Testing#
- class clu.testing.MockReply(user_id, command_id, flag, data={})[source]#
Bases:
dict
Stores a reply written to a transport.
The data of the message is stored as part of the dictionary.
- async clu.testing.setup_test_actor(actor, user_id=666) T [source]#
Setups an actor for testing, mocking the client transport.
Takes an
actor
and modifies it in two ways: :rtype:TypeVar
(T
, bound=MockedActor
)Adds a
invoke_mock_command
method to it that allows to submit a command string as if it had been received from a transport.Mocks a client transport with
user_id
that is connected to the actor. Messages written to the transport are stored asMockReply
in aMockReplyList
that is accessible via a newactor.mock_replies
attribute.
The actor is modified in place and returned.