API Reference
This library provides statically typed classes for all message types
used by Tenta: MeasurementMessage
, LogMessage
, AcknowledgmentMessage
,
ConfigurationMessage
, and TLSParameters
.
The TentaClient
class provides a simple interface for sending messages
to the Tenta server using Paho MQTT.
Module tenta.types
ConfigurationMessage Objects
class ConfigurationMessage()
A configuration message published by the server. Used by the TentaClient
class to produce a typed object for received configs.
__init__
def __init__(revision: int, configuration: Any) -> None
Create a configuration message.
Arguments:
revision
- Config revision number.configuration
- The configuration object.
LogMessage Objects
class LogMessage()
A log message published by the client.
__init__
def __init__(severity: Literal["info", "warning", "error"],
message: str,
revision: Optional[int] = None,
timestamp: Optional[float] = None) -> None
Create a log message.
Arguments:
severity
- Severity of the log message.message
- The log message content.revision
- Config revision of the log.timestamp
- Timestamp of the measurement. If not provided, theTentaClient
uses the current time.
MeasurementMessage Objects
class MeasurementMessage()
A measurement message published by the client.
__init__
def __init__(value: Dict[str, Union[float, int]],
revision: Optional[int] = None,
timestamp: Optional[float] = None) -> None
Create a measurement message.
Arguments:
value
- Value of the measurement. E.g.{"temperature": 20.0, "humidity": 45.4}
.revision
- Config revision of the measurement.timestamp
- Timestamp of the measurement. If not provided, theTentaClient
uses the current time.
AcknowledgmentMessage Objects
class AcknowledgmentMessage()
An acknowledgment message published by the client.
__init__
def __init__(revision: int,
success: bool,
timestamp: Optional[float] = None) -> None
Create an acknowledgment message.
Arguments:
revision
- The config revision to be acknowledged.success
- Whether the config was processed/accepted successfully.timestamp
- Timestamp of the acceptance. If not provided, theTentaClient
uses the current time.
TLSParameters Objects
class TLSParameters()
TLS parameters for the MQTT connection. Passed as
is to paho.mqtt.client.Client.tls_set
.
__init__
def __init__(ca_certs: Optional[str] = None,
certfile: Optional[str] = None,
keyfile: Optional[str] = None,
cert_reqs: Optional[Any] = None,
tls_version: Optional[int] = None,
ciphers: Optional[str] = None,
keyfile_password: Optional[str] = None,
alpn_protocols: Optional[List[str]] = None) -> None
Create a new TLS parameters object.
Module tenta.client
TentaClient Objects
class TentaClient()
__init__
def __init__(mqtt_host: str,
mqtt_port: int,
mqtt_identifier: str,
mqtt_password: str,
mqtt_client_id: Optional[str] = "",
connection_timeout: int = 8,
sensor_identifier: Optional[str] = None,
receive_configs: bool = True,
on_config_message: Optional[Callable[[ConfigurationMessage],
None]] = None,
on_publish: Optional[Callable[[int], None]] = None,
tls_context: Optional[ssl.SSLContext] = None,
tls_parameters: Optional[TLSParameters] = None,
tls_insecure: Optional[bool] = None) -> None
Create a new Tenta client. Prevents creating multiple instances.
You can look at the advanced example in the documentation to see how to pass the TLS parameters.
Arguments:
mqtt_host
- The host of the MQTT broker.mqtt_port
- The port of the MQTT broker.mqtt_identifier
- The MQTT identifier.mqtt_password
- The MQTT password.connection_timeout
- How many seconds to wait for the initial connection to the MQTT broker until aTimeoutError
is raised.sensor_identifier
- A sensor identifier. If this isNone
, the client will expect a sensor identifier with each individual message, you publish. It will raise aValueError
if you try to publish a message without a sensor identifier.receive_configs
- Whether to subscribe to the configuration topic. If this is set toTrue
but no sensor identifier is specified, aValueError
is raised.on_config_message
- A callback that is called when a new configuration message is received. The function receives theConfigurationMessage
as an argument.on_publish
- A callback that is called when a message is published. The function receives themessage_id
of the published message as an argument.tls_context
- The TLS context to use for the connection. This will be passed as is topaho.mqtt.client.Client.tls_set_context
.tls_parameters
- The TLS parameters to use for the connection. This will be passed as is topaho.mqtt.client.Client.tls_set
.tls_insecure
- Whether to disable TLS verification. This will be passed as is topaho.mqtt.client.Client.tls_insecure_set
.
Raises:
RuntimeError
- If there is already a Tenta client instance.ConnectionError
- If the client could not connect to the MQTT broker.
publish
def publish(messages: Union[
LogMessage,
MeasurementMessage,
AcknowledgmentMessage,
List[LogMessage],
List[MeasurementMessage],
List[AcknowledgmentMessage],
],
sensor_identifier: Optional[str] = None,
wait_for_publish: bool = False,
wait_for_publish_timeout: int = 60) -> int
Publish a list of messages to the MQTT broker. All messages must be of the same type
(LogMessage
, MeasurementMessage
or AcknowledgmentMessage
). They will be published
in a single MQTT message.
Arguments:
messages
- A list of messages to publish.sensor_identifier
- A sensor identifier. If this isNone
, the client will use the sensor identifier that was specified when creating the client. If no sensor identifier was specified when creating the client, aValueError
is raised.wait_for_publish
- Whether to wait for the message to be published.wait_for_publish_timeout
- How many seconds to wait for the message to be published
Returns:
The message_id
of the MQTT message.
was_message_published
def was_message_published(message_id: int) -> bool
Check if a message with a given id was published.
Arguments:
message_id
- Themessage_id
of the message.
Returns:
Whether the message was published.
get_active_message_count
def get_active_message_count() -> int
Get how many messages have not yet been published.
Returns:
The number of messages that have not yet been published.
wait_for_publish
def wait_for_publish(timeout: Optional[int] = 60) -> None
Wait until all messages have been published.
Arguments:
timeout
- How many seconds to wait until aTimeoutError
is raised.
get_latest_received_config_message
def get_latest_received_config_message() -> Optional[ConfigurationMessage]
Return the latest received configuration.
Returns:
The latest received configuration or None
if no configuration has been received yet.
teardown
def teardown() -> None
Disconnect from the MQTT broker and stop the client loop.