Implement concurrent reading in connections
Reading nodes can be sped up significantly by using threads for all read operations, especially when a large number of nodes is read.
This could be implemented with the `concurrent.futures.ThreadPoolExecutor`
class for the Modbus, OPC UA and EnEffCo connections.
Example in EnEffCoConnection:
def read_series(
    self, from_time: datetime, to_time: datetime, nodes: Optional[Nodes] = None, interval: TimeStep = 1
) -> pd.DataFrame:
    """Read a time series for each node, issuing the requests concurrently.

    One GET request per node is sent through ``self._raw_request`` from a
    thread pool; the per-node results are joined column-wise.

    NOTE(review): this assumes ``self._raw_request`` may be called from
    several threads at the same time -- confirm before merging.

    :param from_time: Start of the requested time window.
    :param to_time: End of the requested time window.
    :param nodes: Nodes to read; passed through ``self._validate_nodes``.
    :param interval: Resolution of the series, either a ``timedelta`` or a
        number that is interpreted as seconds.
    :return: DataFrame with one ``float64`` column per node (named after
        ``node.name``) and an index named "Time (with timezone)" that is
        converted to ``self._local_tz``.
    """
    nodes = self._validate_nodes(nodes)
    interval = interval if isinstance(interval, timedelta) else timedelta(seconds=interval)

    # The time window and resolution are the same for every node, so this
    # part of the query string is formatted once instead of once per node.
    query = "from={}&to={}&timeInterval={}&includeNanValues=True".format(
        self.timestr_from_datetime(from_time),
        self.timestr_from_datetime(to_time),
        str(int(interval.total_seconds())),
    )

    def read_node(node):
        """Request the series of a single node and return it as a one-column DataFrame."""
        request_url = "datapoint/{}/value?{}".format(self.id_from_code(node.eneffco_code), query)
        response = self._raw_request("GET", request_url).json()

        data = pd.DataFrame(
            data=(r["Value"] for r in response),
            index=pd.to_datetime([r["From"] for r in response], utc=True, format="%Y-%m-%dT%H:%M:%SZ").tz_convert(
                self._local_tz
            ),
            columns=[node.name],
            dtype="float64",
        )
        data.index.name = "Time (with timezone)"
        return data

    with concurrent.futures.ThreadPoolExecutor() as executor:
        # Executor.map yields results in the order of ``nodes``, so the column
        # order does not depend on which request finishes first. Consuming the
        # iterator here re-raises any exception from a worker thread.
        results = list(executor.map(read_node, nodes))

    values = pd.concat(results, axis=1, sort=False)
    return values