[docs]classDevice(CallbackMixIn):"""A class that handles the TCP connection to a device. There are two ways to create a new device. You can create a subclass from `.Device` and override the `.process_message` method which handles how you react to a new line being received :: class MyDevice(Device): async def process_message(self, line): print(line) my_device = MyDevice('192.168.1.10', 4444) await my_device.start() Note that `.process_message` must be a coroutine. Alternatively you can pass a callback that will be called instead of `.process_message` when a new message arrives. The callback must also be a coroutine :: async def printer(line): print(line) my_device = MyDevice('192.168.1.10', 4444, callback=printer) await my_device.start() Parameters ---------- host The host of the device. port The port on which the device is serving. callback The callback to call with each new message received from the client (after decoding into a string). If no callback is specified, `.process_message` is called. If the callback is not a coroutine, it will be converted to one. """def__init__(self,host:str,port:int,callback:Optional[Callable[[str],Any]]=None,):self.host=hostself.port=port# TCPStreamClient: the connection to the device.self._client=Noneself.listener=Nonecallback=callbackorself.process_messageCallbackMixIn.__init__(self,callbacks=[callback])
[docs]asyncdefstart(self:T)->T:"""Opens the connection and starts the listener."""ifself.is_connected():raiseRuntimeError("connection is already running.")self._client=awaitopen_connection(self.host,self.port)self.listener=asyncio.create_task(self._listen())returnself
[docs]asyncdefstop(self):"""Closes the connection and stops the listener."""ifself._client:assertself._client.writerself._client.writer.close()awaitself._client.writer.wait_closed()withcontextlib.suppress(asyncio.CancelledError):ifself.listener:self.listener.cancel()awaitself.listener
[docs]defis_connected(self)->bool:"""Returns `True` if the connection is open."""ifself._clientisNone:returnFalseassertself._client.writerreturnnotself._client.writer.is_closing()
[docs]defwrite(self,message:str,newline="\n"):"""Write to the device. The message is encoded and a new line added."""assertself._clientandself._client.writerassertself.is_connected()andself._client.writer,"device is not connected"ifnotmessage.endswith(newline):message=message+newlineself._client.writer.write(message.encode())
asyncdef_listen(self):"""Listens to the reader stream and callbacks on message received."""ifnotself._clientornotself._client.reader:raiseRuntimeError("connection is not open.")whileTrue:line=awaitself._client.reader.readline()ifline==b""orself._client.reader.at_eof():breakline=line.decode().strip()self.notify(line)
[docs]asyncdefprocess_message(self,line:str):# pragma: no cover"""Processes a newly received message."""pass