Source code for clu.model

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-05-17
# @Filename: model.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)

from __future__ import annotations

import asyncio
import json
import pathlib
import warnings
from copy import copy
from os import PathLike
from time import time

from typing import Any, Callable, Dict, List, Optional, TypeVar, Union, cast

import jsonschema
import jsonschema.exceptions
import jsonschema.validators

import clu.base

from .exceptions import CluError, CluWarning
from .tools import CallbackMixIn, CaseInsensitiveDict


# Public names exported by this module.
__all__ = ["Property", "BaseModel", "Model", "ModelSet"]


# Accepted forms for a schema argument: an already-parsed dictionary, a
# path-like object, or a string (`Model` treats a string either as a file
# path, when ``is_file=True``, or as a serialised JSON document).
SchemaType = Union[Dict[str, Any], PathLike, str]

# JSON-schema fragments for the properties that every `Model` must define.
# `Model.__init__` adds each of these entries to the ``properties`` section
# of the user-provided schema when the schema does not already define a
# property with the same name.
DEFAULT_SCHEMA = {
    "text": {"type": "string"},
    "schema": {"type": "string"},
    "version": {"type": "string"},
    # ``help`` may be either a single string or a list of strings.
    "help": {
        "oneOf": [
            {"type": "array", "items": {"type": "string"}},
            {"type": "string"},
        ]
    },
    # ``error`` may be a list of strings, a single string, or an object
    # with the optional ``module``, ``type``, ``message``, ``filename`` and
    # ``lineno`` entries (the last two may also be null).
    "error": {
        "oneOf": [
            {"type": "array", "items": {"type": "string"}},
            {"type": "string"},
            {
                "type": "object",
                "properties": {
                    "module": {"type": "string"},
                    "type": {"type": "string"},
                    "message": {"type": "string"},
                    "filename": {"oneOf": [{"type": "string"}, {"type": "null"}]},
                    "lineno": {"oneOf": [{"type": "integer"}, {"type": "null"}]},
                },
            },
        ]
    },
    "yourUserID": {"type": "integer"},
    # ``items`` is given as a list of schemas: one schema per array position
    # (an integer first, then a string).
    "UserInfo": {
        "type": "array",
        "items": [{"type": "integer"}, {"type": "string"}],
    },
    "num_users": {"type": "integer"},
    "command_model": {"oneOf": [{"type": "array"}, {"type": "object"}]},
}


class Property(CallbackMixIn):
    """A named model value that notifies its callbacks on assignment.

    Parameters
    ----------
    name
        The name of the property.
    value
        The initial value of the property.
    model
        The parent model.
    callback
        A function or coroutine registered when the property is created.
        Every time `value` is assigned, ``notify`` is invoked with a copy
        of this `Property` as the only argument. This happens on every
        assignment, even if the new value is equal to the previous one.

    """

    def __init__(
        self,
        name: str,
        value: Optional[Any] = None,
        model: Optional[Any] = None,
        callback: Optional[Callable[[Any], Any]] = None,
    ):
        self.name = name
        self.model = model

        self._value = value

        # Timestamp of the last assignment to ``value``; ``None`` until the
        # value is set through the property setter for the first time.
        self.last_seen: float | None = None

        self.in_schema: bool = True

        # Only register the callback if one was actually provided.
        initial_callbacks = []
        if callback:
            initial_callbacks.append(callback)

        CallbackMixIn.__init__(self, initial_callbacks)

    def __repr__(self):
        return f"<{self.__class__.__name__!s} ({self.name}): {self.value}>"

    def __str__(self):
        current = self.value
        return str(current)

    @property
    def value(self) -> Any:
        """The value currently stored in the property."""

        return self._value

    @value.setter
    def value(self, new_value: Any):
        """Stores ``new_value``, records the time, and notifies callbacks."""

        self._value = new_value
        self.last_seen = time()

        # Callbacks receive a copy so that they see the state of the
        # property at the moment of this assignment.
        snapshot = self.copy()
        self.notify(snapshot)

    def copy(self):
        """Returns a shallow copy of this property."""

        duplicate = copy(self)
        return duplicate

    def flatten(self) -> Dict[str, Any]:
        """Returns a single-entry dictionary mapping the name to the value."""

        flat: Dict[str, Any] = {}
        flat[self.name] = self.value
        return flat
# Type of the values stored in a `BaseModel`; must be a `Property` subclass.
T = TypeVar("T", bound=Property)


class BaseModel(CaseInsensitiveDict[T], CallbackMixIn):
    """A JSON-compliant dictionary of properties.

    Parameters
    ----------
    name
        The name of the model.
    callback
        A function or coroutine to register as a callback for changes in
        the model.

    """

    def __init__(self, name: str, callback: Optional[Callable[[Any], Any]] = None):
        self.name = name
        self.last_seen = None

        # The model always starts empty.
        CaseInsensitiveDict.__init__(self, {})

        # Only register the callback if one was actually provided.
        initial_callbacks = []
        if callback:
            initial_callbacks.append(callback)

        CallbackMixIn.__init__(self, initial_callbacks)

    def __repr__(self):
        return f"<Model ({self.name})>"

    def __str__(self):
        flat = self.flatten()
        return str(flat)

    def flatten(self) -> Dict[str, Any]:
        """Returns the model as a plain dictionary of values.

        Each entry of the model is replaced with the ``value`` attribute
        of the object stored under that key.

        """

        flat: Dict[str, Any] = {}
        for key, prop in self.items():
            flat[key] = prop.value

        return flat

    def jsonify(self) -> str:
        """Returns the flattened model serialised as a JSON string."""

        flat = self.flatten()
        return json.dumps(flat)
class Model(BaseModel[Property]):
    """A model with JSON validation.

    In addition to the parameters in `.BaseModel`, the following
    parameters are accepted:

    Parameters
    ----------
    schema
        A valid JSON schema, to be used for validation. It can be a
        dictionary, a string with the serialised JSON schema or, if
        ``is_file=True``, the path to a file containing the JSON schema.
        If a dictionary is passed it is stored and modified in place (the
        default properties and ``additionalProperties`` are added to it).
    is_file
        Whether ``schema`` is the path to a file to read the schema from.
    additional_properties
        Whether to allow additional properties in the schema, other than the
        ones defined by the schema. This parameter is only used if
        ``additionalProperties`` is not defined in the schema.
    kwargs
        Additional parameters to pass to `.BaseModel` on initialisation.

    Raises
    ------
    ValueError
        If the schema cannot be parsed as JSON, is not a valid JSON schema,
        or is not of type ``object`` with a ``properties`` section.

    """

    VALIDATOR = jsonschema.Draft7Validator

    def __init__(
        self,
        name: str,
        schema: SchemaType,
        is_file: bool = False,
        additional_properties: bool = False,
        **kwargs,
    ):
        if is_file:
            schema = cast(PathLike, schema)
            # Read through a context manager so that the file handle is
            # closed deterministically instead of being left to the GC.
            with open(pathlib.Path(schema).expanduser(), "r") as schema_file:
                schema = schema_file.read()

        if isinstance(schema, str):
            try:
                schema = json.loads(schema)
            except json.JSONDecodeError as err:
                # Chain the original error so the parse location is not lost.
                raise ValueError("cannot parse input schema.") from err

        self.schema = cast("Dict[str, Any]", schema)

        if not self.check_schema(self.schema):
            raise ValueError(f"schema {name!r} is invalid.")

        if (
            "type" not in self.schema
            or self.schema["type"] != "object"
            or "properties" not in self.schema
        ):
            raise ValueError("Schema must be of type object.")

        # All models must have these keys.
        for prop in DEFAULT_SCHEMA:
            if prop not in self.schema["properties"]:
                self.schema["properties"][prop] = DEFAULT_SCHEMA[prop]

        if "additionalProperties" not in self.schema:
            self.schema["additionalProperties"] = additional_properties

        # Accept tuples, in addition to lists, as valid JSON arrays.
        type_checker = self.VALIDATOR.TYPE_CHECKER.redefine(
            "array",
            lambda checker, instance: isinstance(instance, (list, tuple)),
        )

        # This sets an instance attribute; the class-level ``VALIDATOR``
        # used by ``check_schema`` is left untouched.
        self.VALIDATOR = jsonschema.validators.extend(
            self.VALIDATOR,
            type_checker=type_checker,
        )
        self.validator = self.VALIDATOR(self.schema)

        self._lock = asyncio.Lock()

        super().__init__(name, **kwargs)

        # Create an empty property for each key defined in the schema. The
        # loop variable is deliberately not called ``name`` to avoid
        # shadowing the model name argument.
        for property_name in self.schema["properties"]:
            self[property_name] = Property(property_name, model=self)

    @staticmethod
    def check_schema(schema: Dict[str, Any]) -> bool:
        """Checks whether a JSON schema is valid.

        Parameters
        ----------
        schema
            The schema to check as a dictionary.

        Returns
        -------
        result
            Returns `True` if the schema is a valid JSON schema, `False`
            otherwise.

        """

        try:
            Model.VALIDATOR.check_schema(schema)
        except jsonschema.SchemaError:
            return False

        return True

    def validate(
        self,
        instance: Dict[str, Any],
        update_model: bool = True,
    ) -> tuple[bool, jsonschema.exceptions.ValidationError | None]:
        """Validates a new instance against the schema.

        Parameters
        ----------
        instance
            The dictionary of keys and values to validate.
        update_model
            If `True` and the instance is valid, the model is updated with
            the values in ``instance`` by calling `.update_model`.

        Returns
        -------
        result
            A tuple ``(True, None)`` if the validation succeeds, or
            ``(False, error)`` where ``error`` is the validation error
            raised by the validator. The model is not modified if the
            validation fails.

        """

        try:
            self.validator.validate(instance)
        except jsonschema.exceptions.ValidationError as err:
            return False, err

        if update_model:
            self.update_model(instance)

        return True, None

    def update_model(self, instance: Dict[str, Any]):
        """Updates the model with the values in ``instance``.

        No validation is performed here; use `.validate` to validate the
        instance before updating the model. After each key is updated,
        ``notify`` is called with a copy of the flattened model and a copy
        of the updated `Property`.

        Parameters
        ----------
        instance
            The dictionary of keys and new values.

        """

        self.last_seen = time()

        for key, value in instance.items():
            if (
                key in self
                and isinstance(self[key].value, dict)
                and isinstance(value, dict)
            ):
                # Copy previous value and update it but then assign it to
                # force the callback in the property.
                new_value = self[key].value.copy()
                new_value.update(value)
                self[key].value = new_value
            else:
                # The enforcement of the schema is on the actor side. In
                # addition, there may be legal properties that we have not
                # considered, e.g., patternProperties. If the key is not
                # in the Model, we add it as a new property.
                if key not in self:
                    self[key] = Property(key, model=self)
                    self[key].in_schema = False
                self[key].value = value

            self.notify(self.flatten().copy(), self[key].copy())
class ModelSet(dict):
    """A dictionary of `.Model` instances.

    Given a list of ``actors``, queries each of the actors to return their
    own schemas, which are then parsed and loaded as `.Model` instances.
    Since obtaining the schema requires sending a command to the actor, that
    process happens when the coroutine `.load_schemas` is awaited, which
    should usually occur when the client is started.

    Parameters
    ----------
    client
        A client with a connection to the actors to monitor.
    actors
        A list of actor models whose schemas will be loaded. If not
        provided, the set starts with a new, empty list of actors.
    get_schema_command
        The command to send to the actor to get it to return its own schema.
    raise_exception
        Whether to raise an exception if any of the models cannot be loaded.
        If `False`, a `.CluWarning` is issued instead.
    kwargs
        Keyword arguments to be passed to `Model`.

    Example
    -------
    >>> model_set = ModelSet(client, actors=['sop', 'guider'])
    >>> await model_set.load_schemas()
    >>> model_set['sop']
    <Model (sop)>

    """

    def __init__(
        self,
        client: "clu.base.BaseClient",
        actors: Optional[List[str]] = None,
        get_schema_command: str = "get_schema",
        raise_exception: bool = True,
        **kwargs,
    ):
        dict.__init__(self, {})

        self.client = client

        # Create a new list per instance when no actors are given. A mutable
        # default argument would be shared by every ModelSet created without
        # an explicit list, so appending to one would affect all of them.
        self.actors = actors if actors is not None else []

        self.__raise_exception = raise_exception
        self.__get_schema_command = get_schema_command
        self.__kwargs = kwargs

    async def add_actor(self, actor: str):
        """Requests the schema of an actor and adds it as a `Model`.

        Parameters
        ----------
        actor
            The name of the actor to which the get-schema command is sent.
            The resulting `Model` is stored under this key.

        Raises
        ------
        CluError
            If the command fails or none of the replies includes a
            ``schema`` keyword. Any exception is re-raised only if
            ``raise_exception=True``; otherwise a `.CluWarning` is issued
            and the actor is not added.

        """

        schema = None

        try:
            cmd = await self.client.send_command(
                actor,
                self.__get_schema_command,
                internal=True,
            )
            await cmd

            if cmd.status.did_fail:
                raise CluError(f"Failed getting schema for {actor}.")
            else:
                # Use the first reply that contains the schema keyword.
                for reply in cmd.replies:
                    if "schema" in reply.message:
                        schema = json.loads(reply.message["schema"])
                        break
                if schema is None:
                    raise CluError(f"{actor} did not reply with a model.")

            self[actor] = Model(actor, schema, **self.__kwargs)

        except Exception as err:
            # Deliberately broad: any failure while loading one model can be
            # downgraded to a warning when raise_exception=False.
            if not self.__raise_exception:
                warnings.warn(f"Cannot load model {actor!r}. {err}", CluWarning)
                return
            raise

    async def load_schemas(self, actors: Optional[List[str]] = None):
        """Loads the actor schemas.

        Parameters
        ----------
        actors
            The actors whose schemas will be loaded, one at a time. If not
            provided (or empty), the list of actors with which the
            `ModelSet` was created is used.

        """

        actors = actors or self.actors or []

        for actor in actors:
            await self.add_actor(actor)