Source code for diva.data.data_downloader_local.data_retriever

# Copyright 2024 Mews
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#     http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.

""" Service that retrieve and process data from cmwf """
import os
import sys

from cqpro.download_data import Downloader

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from diva import parameters as pr
import xarray as xr


class ServiceDataRetriever:
    def __init__(self, params):
        """
        Initializes the ServiceDataRetriever class with the specified parameters.

        Parameters:
        ----------
        params : tuple
            A tuple containing:
            - output_path (str): The path where the data will be saved.
            - list_data (list): A list of variables to retrieve (e.g., temperature, precipitation).
            - list_years (list): A list of years for which data is required.
            - dataset (str): The dataset from which data is to be retrieved.
            - box_lat_lon (dict): A dictionary containing the geographical boundaries (latitude and longitude).
        """
        (
            self.__output_path,
            self.__list_data,
            self.__list_years,
            self.__dataset,
            self.__box_lat_lon,
        ) = params
    def get_data(self):
        """
        Retrieves data from the specified dataset based on the provided parameters.

        This function downloads data for the variables listed in `self.__list_data`
        over the years specified in `self.__list_years`. The data is fetched for the
        geographic area defined by `self.__box_lat_lon`, with a fixed resolution of
        0.1 degrees. The data is stored at the path specified by `self.__output_path`.

        The data is retrieved for:
        - Every 6 hours, at 00:00, 06:00, 12:00, and 18:00.
        - All days of the month.
        - All months of the year.
        - The specified geographic bounding box.

        Returns:
        -------
        None
        """
        for name_data in self.__list_data:
            DATASET = self.__dataset
            VARS = [name_data]
            YEARS = self.__list_years
            HOURS = ["%02d:00" % (e,) for e in [0, 6, 12, 18]]  # Every 6h
            DAYS = ["%02d" % (e,) for e in range(1, 32)]  # All days
            MONTHS = ["%02d" % (e,) for e in range(1, 13)]  # All months
            RESOL = 0.1
            d = Downloader(
                variables=VARS,
                years=YEARS,
                months=MONTHS,
                days=DAYS,
                hours=HOURS,
                output_path=self.__output_path,
                LON_MIN=self.__box_lat_lon["lon_min"],
                LAT_MIN=self.__box_lat_lon["lat_min"],
                LON_MAX=self.__box_lat_lon["lon_max"],
                LAT_MAX=self.__box_lat_lon["lat_max"],
                country=None,
                dataset=DATASET,
                resolution=RESOL,
            )
            d.download()
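    # Minimal usage sketch for the retrieval step. The values below are
    # hypothetical, chosen only to illustrate the shape of the `params` tuple;
    # the bounding-box keys match those read in get_data().
    #
    #     params = (
    #         "/tmp/era5/",                               # output_path (hypothetical)
    #         ["2m_temperature", "total_precipitation"],  # list_data (hypothetical)
    #         ["2020", "2021"],                           # list_years (hypothetical)
    #         "reanalysis-era5-single-levels",            # dataset (hypothetical)
    #         {"lat_min": 35, "lat_max": 60,
    #          "lon_min": -10, "lon_max": 30},            # box_lat_lon
    #     )
    #     retriever = ServiceDataRetriever(params)
    #     retriever.get_data()  # one Downloader call per variable in list_data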
    def combine_data(self):
        """
        Combines downloaded data for each variable into a single file.

        This function merges data files stored in subfolders corresponding to
        different variables (e.g., temperature, precipitation, wind, pressure).
        The combined data for each variable is saved as a single NetCDF file.

        The function processes the following data types:
        - Temperature: Converted from Kelvin to Celsius.
        - Precipitation: Multiplied by 100 to convert to the desired units.
        - Wind
        - Pressure
        - [NOT USED CURRENTLY] Sea Surface Temperature: Converted from Kelvin to
          Celsius and filtered for times at 00:00 and 12:00.

        The combined data is saved with a filename indicating the data type and
        the time period.

        Returns:
        -------
        None
        """
        # ----- combine temperature data ------
        path_temperature = os.path.join(pr.path_data, "input/temperature/")
        list_temperatures = [
            elm for elm in os.listdir(path_temperature) if "proj" not in elm
        ]
        list_temperatures.sort()
        if list_temperatures:
            all_temperatures = []
            for file_temperature in list_temperatures:
                path = path_temperature + file_temperature
                temperature = xr.open_dataset(
                    path,
                    chunks=1000,
                )
                temperature = temperature[["t2m"]]
                all_temperatures.append(temperature)
            temperature_all = xr.concat(all_temperatures, dim="time")
            temperature_all = temperature_all.sel(expver=1)
            temperature_all = temperature_all - 273.15  # Kelvin to Celsius
            temperature_all = temperature_all.astype("float32")
            path = pr.path_data + "temperature_europe_1950_2024" + ".nc"
            temperature_all.chunk({"time": 1000}).to_netcdf(path)

        # ----- combine precipitation data ------
        path_precipitation = os.path.join(pr.path_data, "input/total_precipitation/")
        list_precipitation = [
            elm for elm in os.listdir(path_precipitation) if "proj" not in elm
        ]
        list_precipitation.sort()
        if list_precipitation:
            all_precipitations = []
            for file_precipitation in list_precipitation:
                path = path_precipitation + file_precipitation
                precipitation = xr.open_dataset(
                    path,
                    chunks=1000,
                )
                all_precipitations.append(precipitation)
            rainfall_all = xr.concat(all_precipitations, dim="time")
            rainfall_all = rainfall_all * 100
            rainfall_all = rainfall_all.astype("float32")
            path = pr.path_data + "total_precipitation_all" + ".nc"
            rainfall_all.to_netcdf(path)

        # ----- combine wind data ------
        path_wind = os.path.join(pr.path_data, "input/wind/")
        list_wind = [elm for elm in os.listdir(path_wind) if "proj" not in elm]
        list_wind.sort()
        if list_wind:
            all_wind = []
            for file_wind in list_wind:
                path = path_wind + file_wind
                wind = xr.open_dataset(
                    path,
                    chunks=1000,
                )
                all_wind.append(wind)
            wind_all = xr.concat(all_wind, dim="time")
            wind_all = wind_all.astype("float32")
            path = pr.path_data + "total_wind_all" + ".nc"
            wind_all.to_netcdf(path)

        # ----- combine pressure data ------
        path_pressure = os.path.join(pr.path_data, "input/pressure/")
        list_pressure = [elm for elm in os.listdir(path_pressure) if "proj" not in elm]
        list_pressure.sort()
        if list_pressure:
            all_pressure = []
            for file_pressure in list_pressure:
                path = path_pressure + file_pressure
                pressure = xr.open_dataset(
                    path,
                    chunks=1000,
                )
                all_pressure.append(pressure)
            pressure_all = xr.concat(all_pressure, dim="time")
            pressure_all = pressure_all.astype("float32")
            path = pr.path_data + "total_pressure_all" + ".nc"
            pressure_all.to_netcdf(path)

        # ----- combine temp_sea data ------
        path_temp_sea = os.path.join(pr.path_data, "input/pressure/")
        # NOTE: the assignment above is overridden by the hardcoded path below
        path_temp_sea = "/home/userml/said_workspace/destine/src/data/data/input/"
        list_temp_sea = [elm for elm in os.listdir(path_temp_sea) if "proj" not in elm]
        list_temp_sea.sort()
        if list_temp_sea:
            all_temp_sea = []
            for file_temp_sea in list_temp_sea:
                path = path_temp_sea + file_temp_sea
                temp_sea = xr.open_dataset(
                    path,
                    chunks=1000,
                )
                all_temp_sea.append(temp_sea)
            all_temp_sea = xr.concat(all_temp_sea, dim="time")
            all_temp_sea = all_temp_sea.astype("float32").sel(expver=1)
            all_temp_sea = all_temp_sea - 273.15  # Kelvin to Celsius
            all_temp_sea_subset = all_temp_sea.where(
                (all_temp_sea.time.dt.hour == 12) | (all_temp_sea.time.dt.hour == 0),
                drop=True,
            )
            path = path_temp_sea + "all_temp_sea" + ".nc"
            all_temp_sea_subset.to_netcdf(path)
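
# Combining step, run after the downloads have completed. A minimal sketch,
# assuming `pr.path_data` already contains the "input/<variable>/" subfolders
# that combine_data() expects; the `params` tuple is the hypothetical one
# sketched above get_data():
#
#     retriever = ServiceDataRetriever(params)
#     retriever.combine_data()  # writes one merged NetCDF per variable under pr.path_data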