Source code for clu.legacy.tron

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-05-10
# @Filename: tron.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import logging

from typing import Any, Callable, List, Optional

from clu.base import BaseClient
from clu.command import Command, CommandStatus
from clu.model import BaseModel, Property
from clu.protocol import open_connection

from .types.keys import Key, KeysDictionary
from .types.messages import Keyword
from .types.parser import ParseError, ReplyParser


__all__ = ["TronConnection", "TronModel", "TronKey"]


[docs]class TronKey(Property): """A Tron model key with callbacks. Similar to `.Property` but stores the original key with the keyword datamodel. """ def __init__( self, name: str, key: Key, keyword: Optional[Keyword] = None, model: Optional[TronModel] = None, callback: Optional[Callable[[TronKey], Any]] = None, ): initial_value = [None] * len(key.typedValues.vtypes) super().__init__(name, value=initial_value, model=model, callback=callback) self.key = key self.keyword = None self.update_keyword(keyword)
[docs] def update_keyword(self, keyword: Optional[Keyword]): """Updates the keyword and value.""" if keyword is None: return self.keyword = keyword self.value = [value.native for value in keyword.values]
def __getitem__(self, sl): return self.value.__getitem__(sl)
[docs]class TronModel(BaseModel[TronKey]): """A JSON-compliant model for actor keywords. Parameters ---------- keydict A dictionary of keys that define the datamodel. callback A function or coroutine to call when the datamodel changes. The function is called with the instance of `.TronModel` and the modified keyword. If the callback is a coroutine, it is scheduled as a task. log Where to log messages. """ def __init__( self, keydict: KeysDictionary, callback: Callable[[TronModel], Any] = None, log: Optional[logging.Logger] = None, ): super().__init__(keydict.name, callback=callback, log=log) self.keydict = keydict for key in self.keydict.keys: key = self.keydict.keys[key] self[key.name] = TronKey(key.name, key, model=self)
[docs] def reload(self): """Reloads the model. Clears callbacks.""" model = self.keydict.name keydict = KeysDictionary.load(model) self.__init__(keydict)
[docs] def parse_reply(self, reply): """Parses a reply and updates the datamodel.""" for reply_key in reply.keywords: key_name = reply_key.name.lower() if key_name not in self.keydict: raise ParseError( f"Cannot parse unknown keyword {self.name}.{reply_key.name}." ) # When parsed the values in reply_key are string. After consuming # it with the Key, the values become typed values. result = self.keydict.keys[key_name].consume(reply_key) if not result: raise ParseError( f"Failed parsing keyword {self.name}.{reply_key.name}." ) self[key_name].update_keyword(reply_key) self.notify(self, self[key_name])
class TronLoggingFilter(logging.Filter): """Logs issues with the Tron parser only to the file logger.""" def filter(self, record): return not record.getMessage().startswith("Failed parsing reply")
[docs]class TronConnection(BaseClient): """Allows to send commands to Tron and manages the feed of replies. Parameters ---------- name The name of the client. host The host on which Tron is running. port The port on which Tron is running. models A list of strings with the actors whose models will be tracked. kwargs Arguments to be passed to `.BaseClient`. """ def __init__( self, host: str, port: int = 6093, name: str = "tron", models: List[str] = [], **kwargs, ): super().__init__(name, **kwargs) self.host = host self.port = port self._mid = 1 models = models or [] #: dict: The `KeysDictionary` associated with each actor to track. self.keyword_dicts = {model: KeysDictionary.load(model) for model in models} #: dict: The model and values of each actor being tracked. self.models = {model: TronModel(self.keyword_dicts[model]) for model in models} self._parser = None self.rparser: Any = ReplyParser() self._client = None self.running_commands = {} # We want to log problems with the Tron parser, but not to the console. if self.log.sh: self.log.sh.addFilter(TronLoggingFilter())
[docs] async def start(self, get_keys=True): """Starts the connection to Tron. Parameters ---------- get_keys : bool If `True`, gets all the keys in the models. """ self._client = await open_connection(self.host, self.port) self._parser = asyncio.create_task(self._handle_reply()) if get_keys: asyncio.create_task(self.get_keys()) return self
[docs] def stop(self): """Closes the connection.""" self._client.close() self._parser.cancel()
async def run_forever(self): # Keep alive until the connection is closed. await self._client.writer.wait_closed()
[docs] def send_command(self, target, command_string, commander="tron.tron", mid=None): """Sends a command through the hub. Parameters ---------- target The actor to command. command_string The command to send. commander The actor or client sending the command. The format for Tron is "commander message_id target command" where commander needs to start with a letter and have a program and a user joined by a dot. Otherwise the command will be accepted but the reply will fail to parse. mid The message id. If `None`, a sequentially increasing value will be used. You should not specify a ``mid`` unless you really know what you're doing. """ mid = mid or self._mid # The mid must be a 32-bit unsigned number. if mid >= 2 ** 32: self._mid = mid = mid % 2 ** 32 command_string = f"{commander} {mid} {target} {command_string}\n" command = Command(command_string=command_string) command.set_status("RUNNING") self.running_commands[mid] = command self._client.writer.write(command_string.encode()) self._mid += 1 return command
[docs] async def get_keys(self): """Gets all the keys for the models being tracked.""" # Number of keys to be requested at once n_keys = 10 for model in self.models.values(): actor = model.name keys = [key.lower() for key in model] for ii in range(0, len(keys), n_keys): keys_to_request = keys[ii : ii + n_keys] if len(keys_to_request) == 0: break keys_joined = " ".join(keys_to_request) command_string = f"getFor={actor} {keys_joined}" self.send_command("keys", command_string)
async def _handle_reply(self): """Tracks new replies from Tron and updates the model.""" while True: line = await self._client.reader.readline() if self._client.reader.at_eof(): self.log.error( "Client received EOF. This usually means that " "Tron is not responding. Closing the connection." ) self.stop() return try: # Do not strip here or that will cause parsing problems. line = line.decode() reply = self.rparser.parse(line) except ParseError: self.log.warning(f"Failed parsing reply '{line.strip()}'.") continue actor = reply.header.actor # The keys command returns keywords as if from the actor # keys_<actor> (e.g. keys_tcc). if actor.startswith("keys_"): actor = actor.split("_")[1] if actor in self.models: try: self.models[actor].parse_reply(reply) except ParseError as ee: self.log.warning( f"Failed parsing reply {reply!r} with error: {ee!s}" ) mid = reply.header.commandId status = CommandStatus.code_to_status(reply.header.code.lower()) if mid in self.running_commands: if status.is_done: self.running_commands[mid].replies.append(reply) command = self.running_commands.pop(mid) command.set_status(status)