Skip to content

Implement Deutscher Wetterdienst API request

Implement the following two wrapper functions as Connectors:

(Source: https://git.ptw.maschinenbau.tu-darmstadt.de/eta-fabrik/projekte/eta-gekko-framework/-/blob/backup/examples/API_wetterdienst.ipynb?ref_type=heads)

import pandas as pd
from wetterdienst import Parameter, Resolution, Settings, Period
from wetterdienst.provider.dwd.observation.api import (
    DwdObservationRequest,
    DwdObservationDataset,
    DwdObservationResolution,
    DwdObservationParameter,
    DwdObservationPeriod,
)
from wetterdienst.provider.dwd.mosmix.api import (
    DwdMosmixRequest,
    DwdMosmixType,
    DwdMosmixParameter,
    DwdMosmixStationGroup,
    DwdForecastDate,
)
from typing import List, Union, Optional, Tuple
import datetime


def get_observation_data_for_stations(
    parameter: Union[
        Union[str, DwdObservationDataset, DwdObservationParameter],
        List[Union[str, DwdObservationDataset, DwdObservationParameter]],
    ],
    resolution: Union[str, Resolution, DwdObservationResolution],
    station_id: Optional[Union[str, Tuple[str, ...], List[str]]] = None,
    latlon: Optional[Tuple[float, float]] = None,
    number_of_stations: int = 1,
    period: Optional[Union[str, Period, DwdObservationPeriod, List[Union[str, Period, DwdObservationPeriod]]]] = None,
    start_date: Optional[Union[str, datetime.datetime]] = None,
    end_date: Optional[Union[str, datetime.datetime]] = None,
    settings: Optional[Settings] = None,
    skip_empty_stations: bool = True,
) -> pd.DataFrame:
    """
    Retrieve DWD weather observations for given stations, or for the stations nearest to a location.

    The stations are selected either explicitly via ``station_id`` or, if no station ID is given,
    by ranking stations around ``latlon``. If both are given, ``station_id`` takes precedence.

    :param parameter: The parameter(s) to retrieve observations for. It can be a single parameter or a list of parameters.
    :param resolution: The resolution of the observations.
    :param station_id: The ID(s) of the station(s) to retrieve observations for. It can be a single station ID or
        a list/tuple of station IDs. Defaults to None.
    :param latlon: The (latitude, longitude) coordinates used to select stations when ``station_id`` is not given.
        Defaults to None.
    :param number_of_stations: The number of stations to select when using the ``latlon`` parameter. Defaults to 1.
    :param period: The period of the observations. It can be a single period or a list of periods. Defaults to None.
    :param start_date: The start date passed to the observation request. Defaults to None.
    :param end_date: The end date passed to the observation request. Defaults to None.
    :param settings: Additional settings for the request. The passed object is not modified. Defaults to None.
    :param skip_empty_stations: Value assigned to the ``ts_skip_empty`` setting of the request. Defaults to True.
    :return: A pandas DataFrame indexed by date. Columns are a (station_id, parameter) MultiIndex, or just the
        parameter if only a single station is contained in the result.
    :raises ValueError: If neither station_id nor latlon is provided.
    """
    import copy

    # Fail fast on invalid input before any request object is created.
    if station_id is None and latlon is None:
        raise ValueError('One of `station_id` or `latlon` must be provided')

    # Work on a private settings object so the caller's instance is left untouched.
    # NOTE(review): the previous `Settings(settings)` handed the settings object to the constructor
    # as a positional argument, which does not look like a copy operation -- confirm against the
    # installed wetterdienst version.
    _settings = Settings() if settings is None else copy.copy(settings)
    _settings.ts_skip_empty = skip_empty_stations

    request = DwdObservationRequest(
        parameter=parameter,
        resolution=resolution,
        period=period,
        start_date=start_date,
        end_date=end_date,
        settings=_settings,
    )

    # Select stations: explicit IDs win, otherwise rank stations around the given coordinates.
    if station_id is not None:
        stations = request.filter_by_station_id(station_id)
    else:
        stations = request.filter_by_rank(latlon, rank=number_of_stations)

    # Convert to pandas and pivot values so date is the index and stations and datapoints are the columns
    result_df = stations.values.all().df.to_pandas()
    result_df = result_df.pivot(values="value", columns=["station_id", "parameter"], index="date")

    # Drop the first level of the column index if there is only one station
    if len(result_df.columns.levels[0]) == 1:
        result_df.columns = result_df.columns.droplevel(0)

    return result_df


def get_mosmix_data_for_stations(
    parameter: Optional[List[Union[str, DwdMosmixParameter, Parameter]]],
    mosmix_type: Union[str, DwdMosmixType],
    station_id: Optional[Union[str, Tuple[str, ...], List[str]]] = None,
    latlon: Optional[Tuple[float, float]] = None,
    number_of_stations: int = 1,
    start_issue: Optional[Union[str, datetime.datetime, DwdForecastDate]] = DwdForecastDate.LATEST,
    end_issue: Optional[Union[str, datetime.datetime]] = None,
    start_date: Optional[Union[str, datetime.datetime]] = None,
    end_date: Optional[Union[str, datetime.datetime]] = None,
    station_group: Optional[DwdMosmixStationGroup] = None,
    settings: Optional[Settings] = None,
    skip_empty_stations: bool = True,
) -> pd.DataFrame:
    """
    Retrieve DWD MOSMIX forecast data for given stations, or for the stations nearest to a location.

    The stations are selected either explicitly via ``station_id`` or, if no station ID is given,
    by ranking stations around ``latlon``. If both are given, ``station_id`` takes precedence.

    :param parameter: The parameter(s) to retrieve forecasts for.
    :param mosmix_type: The MOSMIX model to retrieve ("large" or "small").
    :param station_id: The ID(s) of the station(s) to retrieve forecasts for. It can be a single station ID or
        a list/tuple of station IDs. Defaults to None.
    :param latlon: The (latitude, longitude) coordinates used to select stations when ``station_id`` is not given.
        Defaults to None.
    :param number_of_stations: The number of stations to select when using the ``latlon`` parameter. Defaults to 1.
    :param start_issue: Start of issue of mosmix which should be caught. Defaults to the latest issue.
    :param end_issue: The end issue date. Defaults to None.
    :param start_date: The start date passed to the MOSMIX request. Defaults to None.
    :param end_date: The end date passed to the MOSMIX request. Defaults to None.
    :param station_group: The station group. Defaults to None.
    :param settings: Additional settings for the request. The passed object is not modified. Defaults to None.
    :param skip_empty_stations: Value assigned to the ``ts_skip_empty`` setting of the request. Defaults to True.
    :return: A pandas DataFrame indexed by date. Columns are a (station_id, parameter) MultiIndex, or just the
        parameter if only a single station is contained in the result.
    :raises ValueError: If neither station_id nor latlon is provided.
    """
    import copy

    # Fail fast on invalid input before any request object is created.
    if station_id is None and latlon is None:
        raise ValueError('One of `station_id` or `latlon` must be provided')

    # Work on a private settings object so the caller's instance is left untouched.
    # NOTE(review): the previous `Settings(settings)` handed the settings object to the constructor
    # as a positional argument, which does not look like a copy operation -- confirm against the
    # installed wetterdienst version.
    _settings = Settings() if settings is None else copy.copy(settings)
    _settings.ts_skip_empty = skip_empty_stations

    request = DwdMosmixRequest(
        parameter=parameter,
        mosmix_type=mosmix_type,
        start_issue=start_issue,
        end_issue=end_issue,
        start_date=start_date,
        end_date=end_date,
        station_group=station_group,
        # Pass the prepared settings; the raw `settings` argument would drop `skip_empty_stations`.
        settings=_settings,
    )

    # Select stations: explicit IDs win, otherwise rank stations around the given coordinates.
    if station_id is not None:
        stations = request.filter_by_station_id(station_id)
    else:
        stations = request.filter_by_rank(latlon, rank=number_of_stations)

    # Convert to pandas and pivot values so date is the index and stations and datapoints are the columns
    result_df = stations.values.all().df.to_pandas()
    result_df = result_df.pivot(values="value", columns=["station_id", "parameter"], index="date")

    # Drop the first level of the column index if there is only one station
    if len(result_df.columns.levels[0]) == 1:
        result_df.columns = result_df.columns.droplevel(0)

    return result_df
Edited by Julius Balzer