Implement Deutscher Wetterdienst API request
Implement the following two wrapper funcions as Connectors:
import pandas as pd
from wetterdienst import Parameter, Resolution, Settings, Period
from wetterdienst.provider.dwd.observation.api import (
DwdObservationRequest,
DwdObservationDataset,
DwdObservationResolution,
DwdObservationParameter,
DwdObservationPeriod,
)
from wetterdienst.provider.dwd.mosmix.api import (
DwdMosmixRequest,
DwdMosmixType,
DwdMosmixParameter,
DwdMosmixStationGroup,
DwdForecastDate,
)
from typing import List, Union, Optional, Tuple
import datetime
def get_observation_data_for_stations(
parameter: Union[
Union[str, DwdObservationDataset, DwdObservationParameter],
List[Union[str, DwdObservationDataset, DwdObservationParameter]],
],
resolution: Union[str, Resolution, DwdObservationResolution],
station_id: Union[str, Tuple[str, ...], List[str]] = None,
latlon: Tuple[float, float] = None,
number_of_stations: int = 1,
period: Optional[Union[str, Period, DwdObservationPeriod, List[Union[str, Period, DwdObservationPeriod]]]] = None,
start_date: Optional[Union[str, datetime.datetime]] = None,
end_date: Optional[Union[str, datetime.datetime]] = None,
settings: Optional[Settings] = None,
skip_empty_stations: bool = True,
) -> pd.DataFrame:
"""
Retrieves historical or current weather observations for specific stations or nearest stations as pandas dataframe.
:param parameter: The parameter(s) to retrieve observations for. It can be a single parameter or a list of parameters.
:param resolution: The resolution of the observations.
:param station_id: The ID(s) of the station(s) to retrieve observations for. It can be a single station ID or a list of station IDs.
:param latlon: The latitude and longitude coordinates of the location to retrieve observations for.
:param number_of_stations: The number of stations to retrieve observations for when using latlon parameter. Defaults to 1.
:param period: The period of the observations. It can be a single period or a list of periods. Defaults to None.
:param start_date: The start date for filtering returned dataframe. Defaults to None.
:param end_date: The end date for filtering returned dataframe. Defaults to None.
:param settings: Additional settings for the request. Defaults to None.
:param skip_empty_stations: Whether to skip empty stations in the result dataframe. Defaults to True.
:return: The retrieved historical or current data in a pandas DataFrame format.
:raises ValueError: If neither station_id nor latlon is provided.
"""
# Create settings object, skip empty stations
_settings = Settings(settings)
_settings.ts_skip_empty = skip_empty_stations
request = DwdObservationRequest(
parameter=parameter,
resolution=resolution,
period=period,
start_date=start_date,
end_date=end_date,
settings=_settings,
)
# Retrieve stations. If station_id is provided, use it, otherwise use latlon to get nearest stations
if station_id is None and latlon is None:
raise ValueError('One of `station_id` or `latlon` must be provided')
elif station_id is not None:
stations = request.filter_by_station_id(station_id)
else:
stations = request.filter_by_rank(ETA_LOCATION, rank=number_of_stations)
# Convert to pandas and pivot values so date is the index and stations and datapoints are the columns
result_df = stations.values.all().df.to_pandas()
result_df = result_df.pivot(values="value", columns=("station_id", "parameter"), index="date")
# Drop the first level of the column index if there is only one station
if len(result_df.columns.levels[0]) == 1:
result_df.columns = result_df.columns.droplevel(0)
return result_df
def get_mosmix_data_for_stations(
parameter: Optional[List[Union[str, DwdMosmixParameter, Parameter]]],
mosmix_type: Union[str, DwdMosmixType],
station_id: Union[str, Tuple[str, ...], List[str]] = None,
latlon: Tuple[float, float] = None,
number_of_stations: int = 1,
start_issue: Optional[Union[str, datetime.datetime, DwdForecastDate]] = DwdForecastDate.LATEST,
end_issue: Optional[Union[str, datetime.datetime]] = None,
start_date: Optional[Union[str, datetime.datetime]] = None,
end_date: Optional[Union[str, datetime.datetime]] = None,
station_group: Optional[DwdMosmixStationGroup] = None,
settings: Optional[Settings] = None,
skip_empty_stations: bool = True,
) -> pd.DataFrame:
"""
Retrieves MOSMIX data for a specific stations or nearest stations as pandas dataframe.
:param parameter: The parameter(s) to retrieve observations for. It can be a single parameter or a list of parameters.
:param mosmix_type: The MOSMIX model to retrieve ("large" or "small").
:param station_id: The ID(s) of the station(s) to retrieve observations for. It can be a single station ID or a list of station IDs.
:param latlon: The latitude and longitude coordinates of the location to retrieve observations for.
:param number_of_stations: The number of stations to retrieve observations for when using latlon parameter. Defaults to 1.
:param start_issue: Start of issue of mosmix which should be caught.
:param end_issue: The end issue date.
:param start_date: The start date for filtering returned dataframe.
:param end_date: The end date.
:param station_group: The station group.
:param settings: Additional settings for the request. Defaults to None.
:param skip_empty_stations: Whether to skip empty stations in the result dataframe. Defaults to True.
:return: The retrieved MOSMIX data in a pandas DataFrame format.
:raises ValueError: If neither station_id nor latlon is provided.
"""
# Create settings object, skip empty stations
_settings = Settings(settings)
_settings.ts_skip_empty = skip_empty_stations
request = DwdMosmixRequest(
parameter=parameter,
mosmix_type=mosmix_type,
start_issue=start_issue,
end_issue=end_issue,
start_date=start_date,
end_date=end_date,
station_group=station_group,
settings=settings,
)
# Retrieve stations. If station_id is provided, use it, otherwise use latlon to get nearest stations
if station_id is None and latlon is None:
raise ValueError('One of `station_id` or `latlon` must be provided')
elif station_id is not None:
stations = request.filter_by_station_id(station_id)
else:
stations = request.filter_by_rank(ETA_LOCATION, rank=number_of_stations)
# Convert to pandas and pivot values so date is the index and stations and datapoints are the columns
result_df = stations.values.all().df.to_pandas()
result_df = result_df.pivot(values="value", columns=("station_id", "parameter"), index="date")
# Drop the first level of the column index if there is only one station
if len(result_df.columns.levels[0]) == 1:
result_df.columns = result_df.columns.droplevel(0)
return result_df
Edited by Julius Balzer