#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# @Author: José Sánchez-Gallego (gallegoj@uw.edu)
# @Date: 2019-05-17
# @Filename: model.py
# @License: BSD 3-clause (http://www.opensource.org/licenses/BSD-3-Clause)
from __future__ import annotations
import json
import pathlib
import warnings
from os import PathLike
from typing import Any, Callable, Dict, List, Optional, TypeVar, Union, cast
import jsonschema
import jsonschema.exceptions
import jsonschema.validators
import clu.base
from .exceptions import CluError, CluWarning
from .tools import CallbackMixIn, CaseInsensitiveDict
__all__ = ["Property", "BaseModel", "Model", "ModelSet"]
SchemaType = Union[Dict[str, Any], PathLike, str]
DEFAULT_SCHEMA = {
"text": {"type": "string"},
"schema": {"type": "string"},
"version": {"type": "string"},
"help": {
"oneOf": [
{"type": "array", "items": {"type": "string"}},
{"type": "string"},
]
},
"error": {
"oneOf": [
{"type": "array", "items": {"type": "string"}},
{"type": "string"},
]
},
"yourUserID": {"type": "integer"},
"UserInfo": {
"type": "array",
"items": [{"type": "integer"}, {"type": "string"}],
},
"num_users": {"type": "integer"},
}
[docs]class Property(CallbackMixIn):
"""A model property with callbacks.
Parameters
----------
name
The name of the property.
value
The value of the property.
model
The parent model.
callback
The function or coroutine that will be called if the value of the key
if updated. The callback is called with the instance of `Property`
as the only argument. Note that the callback will be scheduled even
if the value does not change.
"""
def __init__(
self,
name: str,
value: Optional[Any] = None,
model: Optional[Any] = None,
callback: Optional[Callable[[Any], Any]] = None,
):
self.name = name
self._value = value
self.model = model
CallbackMixIn.__init__(self, [callback] if callback else [])
def __repr__(self):
return f"<{self.__class__.__name__!s} ({self.name}): {self.value}>"
def __str__(self):
return str(self.value)
@property
def value(self) -> Any:
"""The value associated to the key."""
return self._value
@value.setter
def value(self, new_value: Any):
"""Sets the value of the key and schedules the callback."""
self._value = new_value
self.notify(self)
[docs] def flatten(self) -> Dict[str, Any]:
"""Returns a dictionary with the name and value of the property."""
return {self.name: self.value}
T = TypeVar("T", bound=Property)
[docs]class BaseModel(CaseInsensitiveDict[T], CallbackMixIn):
"""A JSON-compliant model.
Parameters
----------
name
The name of the model.
callback
A function or coroutine to call when the datamodel changes. The
function is called with the instance of `.BaseModel` and the key that
changed.
"""
def __init__(
self,
name: str,
callback: Optional[Callable[[Any], Any]] = None,
**kwargs,
):
self.name = name
CaseInsensitiveDict.__init__(self, {})
CallbackMixIn.__init__(self, [callback] if callback else [])
def __repr__(self):
return f"<Model ({self.name})>"
def __str__(self):
return str(self.flatten())
[docs] def flatten(self) -> Dict[str, Any]:
"""Returns a dictionary of values.
Return a dictionary in which the `Property` instances are replaced
with their values.
"""
return {key: prop.value for key, prop in self.items()}
[docs] def jsonify(self) -> str:
"""Returns a JSON string with the model."""
return json.dumps(self.flatten())
[docs]class Model(BaseModel[Property]):
"""A model with JSON validation.
In addition to the parameters in `.BaseModel`, the following parameters
are accepted:
Parameters
----------
schema
A valid JSON schema, to be used for validation.
is_file
Whether the input schema is a filepath or a dictionary.
"""
VALIDATOR = jsonschema.Draft7Validator
def __init__(self, name: str, schema: SchemaType, is_file: bool = False, **kwargs):
if is_file:
schema = cast(PathLike, schema)
schema = open(pathlib.Path(schema).expanduser(), "r").read()
if isinstance(schema, str):
try:
schema = json.loads(schema)
except json.JSONDecodeError:
raise ValueError("cannot parse input schema.")
self.schema = cast("Dict[str, Any]", schema)
if not self.check_schema(self.schema):
raise ValueError(f"schema {name!r} is invalid.")
type_checker = self.VALIDATOR.TYPE_CHECKER.redefine(
"array", lambda checker, instance: isinstance(instance, (list, tuple))
)
self.VALIDATOR = jsonschema.validators.extend(
self.VALIDATOR,
type_checker=type_checker,
)
self.validator = self.VALIDATOR(self.schema)
if (
"type" not in self.schema
or self.schema["type"] != "object"
or "properties" not in self.schema
):
raise ValueError("Schema must be of type object.")
# All models must have these keys.
for prop in DEFAULT_SCHEMA:
if prop not in self.schema["properties"]:
self.schema["properties"][prop] = DEFAULT_SCHEMA[prop]
super().__init__(name, **kwargs)
for name in self.schema["properties"]:
self[name] = Property(name, model=self)
[docs] @staticmethod
def check_schema(schema: Dict[str, Any]) -> bool:
"""Checks whether a JSON schema is valid.
Parameters
----------
schema
The schema to check as a dictionary.
Returns
-------
result
Returns `True` if the schema is a valid JSON schema, `False`
otherwise.
"""
try:
Model.VALIDATOR.check_schema(schema)
return True
except jsonschema.SchemaError:
return False
[docs] def update_model(self, instance: Dict[str, Any]):
"""Validates a new instance and updates the model."""
try:
self.validator.validate(instance)
except jsonschema.exceptions.ValidationError as err:
return False, err
for key, value in instance.items():
if key in self:
if isinstance(self[key].value, dict) and isinstance(value, dict):
# Copy previous value and update it but then assign it to
# force the callback in the property.
new_value = self[key].value.copy()
new_value.update(value)
self[key].value = new_value
else:
self[key].value = value
self.notify(self, self[key])
return True, None
[docs]class ModelSet(dict):
"""A dictionary of `.Model` instances.
Given a list of ``actors``, queries each of the actors to return their
own schemas, which are then parsed and loaded as `.Model` instances.
Since obtaining the schema require sending a command to the actor, that
process happens when the coroutine `.load_schemas` is awaited, which
should usually occur when the client is started.
Parameters
----------
client
A client with a connection to the actors to monitor.
actors
A list of actor models whose schemas will be loaded.
get_schema_command
The command to send to the actor to get it to return its own schema.
raise_exception
Whether to raise an exception if any of the models cannot be loaded.
kwargs
Keyword arguments to be passed to `Model`.
Example
-------
>>> model_set = ModelSet(client, actors=['sop', 'guider'])
>>> model_set['sop']
<Model (name='sop')>
"""
def __init__(
self,
client: clu.base.BaseClient,
actors: List[str] = [],
get_schema_command: str = "get_schema",
raise_exception: bool = True,
**kwargs,
):
dict.__init__(self, {})
self.client = client
self.actors = actors
self.__raise_exception = raise_exception
self.__get_schema_command = get_schema_command
self.__kwargs = kwargs
[docs] async def load_schemas(self, actors: Optional[List[str]] = None):
"""Loads the actor schames."""
actors = actors or self.actors or []
schema = None
for actor in actors:
try:
cmd = await self.client.send_command(actor, self.__get_schema_command)
await cmd
if cmd.status.did_fail:
raise CluError(f"Failed getting schema for {actor}.")
else:
for reply in cmd.replies:
if "schema" in reply.body:
schema = json.loads(reply.body["schema"])
break
if schema is None:
raise CluError(f"{actor} did not reply with a model.")
self[actor] = Model(actor, schema, **self.__kwargs)
except Exception as err:
if not self.__raise_exception:
warnings.warn(f"Cannot load model {actor!r}. {err}", CluWarning)
continue
raise