Source code for clu.legacy.tron

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-05-10
# @Filename: tron.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import logging
import re
import time
import warnings
from threading import Lock

from typing import Any, Callable, List, Optional

import clu.base
from clu.command import Command, CommandStatus
from clu.exceptions import CluWarning
from clu.model import BaseModel, Property
from clu.protocol import ReconnectingTCPClientProtocol

from .types.keys import Key, KeysDictionary
from .types.messages import Keyword, Reply
from .types.parser import ParseError, ReplyParser


__all__ = ["TronConnection", "TronModel", "TronKey"]


[docs] class TronKey(Property): """A Tron model key with callbacks. Similar to `.Property` but stores the original key with the keyword datamodel. """ def __init__( self, name: str, key: Key, keyword: Optional[Keyword] = None, model: Optional[TronModel] = None, callback: Optional[Callable[[TronKey], Any]] = None, ): initial_value = [None] * len(key.typedValues.vtypes) super().__init__(name, value=initial_value, model=model, callback=callback) self.key = key self.keyword = None self.update_keyword(keyword)
[docs] def update_keyword(self, keyword: Optional[Keyword]): """Updates the keyword and value.""" if keyword is None: return self.keyword = keyword self.value = [value.native for value in keyword.values]
def __getitem__(self, sl): return self.value.__getitem__(sl) def __len__(self): return len(self.value)
[docs] class TronModel(BaseModel[TronKey]): """A JSON-compliant model for actor keywords. Parameters ---------- keydict A dictionary of keys that define the datamodel. callback A function or coroutine to call when the datamodel changes. The function is called with the instance of `.TronModel` and the modified keyword. If the callback is a coroutine, it is scheduled as a task. """ def __init__( self, keydict: KeysDictionary, callback: Optional[Callable[[TronModel], Any]] = None, ): super().__init__(keydict.name, callback=callback) self.keydict = keydict for key in self.keydict.keys: key = self.keydict.keys[key] self[key.name] = TronKey(key.name, key, model=self) self._lock = Lock()
[docs] def reload(self): """Reloads the model. Clears callbacks.""" model = self.keydict.name keydict = KeysDictionary.load(model) self.__init__(keydict)
[docs] def parse_reply(self, reply): """Parses a reply and updates the datamodel.""" parsed: dict[str, Any] = {} with self._lock: for reply_key in reply.keywords: self.last_seen = time.time() key_name = reply_key.name.lower() if key_name not in self.keydict: warnings.warn( f"Cannot parse unknown keyword {self.name}.{reply_key.name}.", CluWarning, ) continue # When parsed the values in reply_key are string. After consuming # it with the Key, the values become typed values. result = self.keydict.keys[key_name].consume(reply_key) if not result: warnings.warn( f"Failed parsing keyword {self.name}.{reply_key.name}.", CluWarning, ) continue self[key_name].update_keyword(reply_key) parsed[key_name] = self[key_name].value.copy() self.notify(self.flatten(), self[key_name].copy()) return parsed
class TronLoggingFilter(logging.Filter): """Logs issues with the Tron parser only to the file logger.""" def filter(self, record): return not record.getMessage().startswith("Failed parsing reply") class TronClientProtocol(ReconnectingTCPClientProtocol): """A reconnecting protocol for the Tron connection.""" def __init__(self, on_received, *args, **kwargs): super().__init__(*args, **kwargs) self._on_received = on_received self.transport: asyncio.Transport | None = None def data_received(self, data): asyncio.get_event_loop().call_soon(self._on_received, data) def connection_made(self, transport: asyncio.Transport): self.transport = transport
[docs] class TronConnection(clu.base.BaseClient): """Allows to send commands to Tron and manages the feed of replies. Parameters ---------- commander The name of the commander that will send commands to Tron. Must be in the form ``program.user``, for example ``foo.bar``. host The host on which Tron is running. port The port on which Tron is running. models A list of strings with the actors whose models will be tracked. kwargs Arguments to be passed to `.BaseClient`. """ def __init__( self, commander: str, host: str, port: int = 6093, name: str = "tron", models: List[str] = [], **kwargs, ): super().__init__(name, **kwargs) self.commander = commander if not re.match(r"[A-Za-z_]+\.[A-Za-z_]+", self.commander): raise ValueError("Invalid commander format.") self.host = host self.port = port self._mid = 1 models = models or [] #: dict: The `KeysDictionary` associated with each actor to track. self.keyword_dicts = {model: KeysDictionary.load(model) for model in models} #: dict: The model and values of each actor being tracked. self.models = {model: TronModel(self.keyword_dicts[model]) for model in models} self.rparser: Any = ReplyParser() self.protocol: TronClientProtocol | None = None self.running_commands = {} self.buffer = b"" # We want to log problems with the Tron parser, but not to the console. if self.log.sh: self.log.sh.addFilter(TronLoggingFilter())
[docs] async def start(self, get_keys=True): """Starts the connection to Tron. Parameters ---------- get_keys : bool If `True`, gets all the keys in the models. """ self.set_loop_exception_handler() self.protocol = TronClientProtocol( self._handle_reply, self.host, self.port, ) await self.protocol._connect() if not self.protocol.connected: self.log.error(f"Failed connecting to Tron on ({self.host}, {self.port})") return if get_keys: asyncio.create_task(self.get_keys()) return self
[docs] def stop(self): """Closes the connection.""" if self.protocol: self.protocol.stop_trying() if self.protocol and self.protocol.transport: self.protocol.transport.close() self.protocol = None
[docs] def connected(self): """Checks whether the client is connected.""" if self.protocol and self.protocol.connected: return True return False
async def run_forever(self): # pragma: no cover assert self.protocol and self.connected() # Keep alive until the connection is closed. while True: await asyncio.sleep(1) if self.protocol is None: return
[docs] def send_command( self, target, command_string, *args, commander=None, mid=None, callback: Optional[Callable[[Reply], None]] = None, time_limit: Optional[float] = None, ): """Sends a command through the hub. Parameters ---------- target The actor to command. command_string The command to send. args Arguments to concatenate to the command string. commander The actor or client sending the command. The format for Tron is "commander message_id target command" where commander needs to start with a letter and have a program and a user joined by a dot. Otherwise the command will be accepted but the reply will fail to parse. If ``commander=None``, the instance ``commander`` value will be used. mid The message id. If `None`, a sequentially increasing value will be used. You should not specify a ``mid`` unless you really know what you're doing. callback A callback to invoke with each reply received from the actor. time_limit A delay after which the command is marked as timed out and done. Examples -------- These two are equivalent :: >>> tron.send_command('my_actor', 'do_something --now') >>> tron.send_command('my_actor', 'do_something', '--now') """ assert self.protocol and self.protocol.transport and self.connected() mid = mid or self._mid # The mid must be a 32-bit unsigned number. if mid >= 2**32: self._mid = mid = mid % 2**32 if len(args) > 0: command_string += " " + " ".join(map(str, args)) commander = commander or self.commander command_string = f"{commander} {mid} {target} {command_string}\n" command = Command( command_string=command_string, reply_callback=callback, time_limit=time_limit, commander_id=commander, ) self.running_commands[mid] = command self.protocol.transport.write(command_string.encode()) self._mid += 1 return command
[docs] async def get_keys(self): """Gets all the keys for the models being tracked.""" # Number of keys to be requested at once n_keys = 10 for model in self.models.values(): actor = model.name keys = [key.lower() for key in model] for ii in range(0, len(keys), n_keys): keys_to_request = keys[ii : ii + n_keys] if len(keys_to_request) == 0: break keys_joined = " ".join(keys_to_request) command_string = f"getFor={actor} {keys_joined}" self.send_command("keys", command_string)
def _handle_reply(self, data: bytes): """Tracks new replies from Tron and updates the model.""" self.buffer += data lines = self.buffer.splitlines() if not self.buffer.endswith(b"\n"): self.buffer = lines.pop() else: self.buffer = b"" for line in lines: try: # Do not strip here or that will cause parsing problems. line = line.decode() reply: Reply = self.rparser.parse(line) except ParseError: self.log.warning(f"Failed parsing reply '{line.strip()}'.") continue reply_actor = reply.header.actor reply_commander_id = reply.header.cmdrName # The keys command returns keywords as if from the actor # keys_<actor> (e.g. keys_tcc). if reply_actor.startswith("keys_"): reply_actor = reply_actor.split("_")[1] parsed_data = {} if reply_actor in self.models: try: parsed_data = self.models[reply_actor].parse_reply(reply) except ParseError as ee: self.log.warning( f"Failed parsing reply {reply!r} with error: {ee!s}" ) else: # Fallback in case the actor of the reply is not in the models. # In this case the values will be strings. parsed_data = {kw.name: kw.values for kw in reply.keywords} mid = reply.header.commandId status = CommandStatus.code_to_status(reply.header.code.lower()) if mid in self.running_commands: # We may be receiving messages from a command with the same MID # that's not managed by this instance of the tron client, so we # also check the commander. if self.running_commands[mid].commander_id == reply_commander_id: self.running_commands[mid].replies.append( clu.base.Reply( message={k: v for k, v in parsed_data.items()}, message_code=reply.header.code.lower(), command=self.running_commands[mid], validated=True, keywords=reply.keywords, ) ) self.running_commands[mid].set_status(status) if self.running_commands[mid]._reply_callback is not None: self.running_commands[mid]._reply_callback(reply) if status.is_done: self.running_commands.pop(mid)