Skip to content

Implement concurrent reading in connections

Reading nodes can be sped up significantly by running all read operations in threads, especially when there is a large number of nodes.

This could be implemented using the concurrent.futures.ThreadPoolExecutor class for Modbus, OPC UA and EnEffCo connections.

Example in EnEffCoConnection:

def read_series(
    self, from_time: datetime, to_time: datetime, nodes: Optional[Nodes] = None, interval: TimeStep = 1
) -> pd.DataFrame:
    """Read a time series for several nodes, one request per node, in parallel threads.

    :param from_time: Start of the requested time range.
    :param to_time: End of the requested time range.
    :param nodes: Nodes to read; passed through ``self._validate_nodes`` before use.
    :param interval: Sampling interval, either a ``timedelta`` or a number of seconds.
    :return: DataFrame with one float64 column per node (named after ``node.name``) and a
        timezone-aware index named "Time (with timezone)".
    """
    nodes = self._validate_nodes(nodes)
    if not isinstance(interval, timedelta):
        interval = timedelta(seconds=interval)

    def fetch_column(node):
        """Request the values of a single node and return them as a one-column DataFrame."""
        url = (
            f"datapoint/{self.id_from_code(node.eneffco_code)}/value"
            f"?from={self.timestr_from_datetime(from_time)}"
            f"&to={self.timestr_from_datetime(to_time)}"
            f"&timeInterval={str(int(interval.total_seconds()))}"
            "&includeNanValues=True"
        )
        records = self._raw_request("GET", url).json()

        # Timestamps are parsed as UTC and then shifted to the connection's local timezone.
        timestamps = pd.to_datetime(
            [record["From"] for record in records], utc=True, format="%Y-%m-%dT%H:%M:%SZ"
        )
        column = pd.DataFrame(
            data=[record["Value"] for record in records],
            index=timestamps.tz_convert(self._local_tz),
            columns=[node.name],
            dtype="float64",
        )
        column.index.name = "Time (with timezone)"
        return column

    # Submit every node up front; leaving the ``with`` block waits for all requests to finish.
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = [pool.submit(fetch_column, node) for node in nodes]

    # Collect in submission order so the columns follow the order of ``nodes``;
    # ``result()`` re-raises the first exception a worker hit.
    columns = [future.result() for future in pending]
    return pd.concat(columns, axis=1, sort=False)