# Copyright 2024 Mews
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and limitations under the License.
""" Service that retrieve and process data from cmwf """
import os
import sys

import xarray as xr

from cqpro.download_data import Downloader

# Make the parent directory importable so the local `diva` package resolves.
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
from diva import parameters as pr  # noqa: E402  (import after sys.path tweak)
class ServiceDataRetriever:
def __init__(self, params):
"""
        Initializes the ServiceDataRetriever with the specified parameters.
Parameters:
----------
params : tuple
A tuple containing:
- output_path (str): The path where the data will be saved.
- list_data (list): A list of variables to retrieve (e.g., temperature, precipitation).
- list_years (list): A list of years for which data is required.
- dataset (str): The dataset from which data is to be retrieved.
- box_lat_lon (dict): A dictionary containing the geographical boundaries (latitude and longitude).
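        Example:
        -------
        Illustrative construction; every value below is hypothetical and not a
        project default (the dataset name in particular is an assumption):
            params = (
                "/tmp/era5/",                              # output_path
                ["temperature", "total_precipitation"],    # list_data
                ["2020", "2021"],                          # list_years
                "reanalysis-era5-single-levels",           # dataset (assumed name)
                {"lat_min": 35.0, "lat_max": 60.0,
                 "lon_min": -10.0, "lon_max": 30.0},       # box_lat_lon keys used by get_data
            )
            retriever = ServiceDataRetriever(params)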
"""
(
self.__output_path,
self.__list_data,
self.__list_years,
self.__dataset,
self.__box_lat_lon,
) = params
def get_data(self):
"""
Retrieves data from the specified dataset based on the provided parameters.
        Downloads each variable listed in `self.__list_data` for every year in `self.__list_years`,
        over the geographic area defined by `self.__box_lat_lon`, at a fixed resolution of 0.1 degrees.
        The downloaded files are written to `self.__output_path`.
The data is retrieved for:
        - Four time steps per day: 00:00, 06:00, 12:00, and 18:00.
- All days of the month.
- All months of the year.
- The specified geographic bounding box.
Returns:
-------
None
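        Example:
        -------
        Illustrative call, assuming the instance was built with a params tuple
        as sketched in the __init__ docstring:
            retriever = ServiceDataRetriever(params)
            retriever.get_data()  # one Downloader per variable in list_data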
"""
for name_data in self.__list_data:
DATASET = self.__dataset
VARS = [name_data]
YEARS = self.__list_years
HOURS = ["%02d:00" % (e,) for e in [0, 6, 12, 18]] # Every 6h
DAYS = ["%02d" % (e,) for e in range(1, 32)] # All days
MONTHS = ["%02d" % (e,) for e in range(1, 13)] # All months
RESOL = 0.1
d = Downloader(
variables=VARS,
years=YEARS,
months=MONTHS,
days=DAYS,
hours=HOURS,
output_path=self.__output_path,
LON_MIN=self.__box_lat_lon["lon_min"],
LAT_MIN=self.__box_lat_lon["lat_min"],
LON_MAX=self.__box_lat_lon["lon_max"],
LAT_MAX=self.__box_lat_lon["lat_max"],
country=None,
dataset=DATASET,
resolution=RESOL,
)
d.download()
    def combine_data(self):
"""
Combines downloaded data for each variable into a single file.
This function merges data files stored in subfolders corresponding to different variables (e.g., temperature,
precipitation, wind, pressure). The combined data for each variable is saved as a single NetCDF file.
The function processes the following data types:
- Temperature: Converted from Kelvin to Celsius.
        - Precipitation: multiplied by 100; for ERA5 total precipitation, which is stored in metres, this yields centimetres.
- Wind
- Pressure
        - [NOT CURRENTLY USED] Sea surface temperature: converted from Kelvin to Celsius and restricted to the 00:00 and 12:00 time steps.
The combined data is saved with a filename indicating the data type and the time period.
Returns:
-------
None
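        Example:
        -------
        Minimal sketch of the unit handling applied below, using hypothetical
        in-memory values rather than real downloads:
            import xarray as xr
            t2m = xr.DataArray([273.15, 293.15], dims="time")  # 2 m temperature, Kelvin
            t2m_degc = t2m - 273.15                            # -> 0.0 and 20.0 deg C
            tp = xr.DataArray([0.001, 0.020], dims="time")     # total precipitation, metres
            tp_scaled = tp * 100                               # same x100 scaling as below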
"""
# ----- combine temperature data ------
path_temperature = os.path.join(pr.path_data, "input/temperature/")
list_temperatures = [
elm for elm in os.listdir(path_temperature) if "proj" not in elm
]
list_temperatures.sort()
        if list_temperatures:
all_temperatures = []
for file_temperature in list_temperatures:
path = path_temperature + file_temperature
temperature = xr.open_dataset(
path,
chunks=1000,
)
temperature = temperature[["t2m"]]
all_temperatures.append(temperature)
temperature_all = xr.concat(all_temperatures, dim="time")
temperature_all = temperature_all.sel(expver=1)
temperature_all = temperature_all - 273.15
temperature_all = temperature_all.astype("float32")
path = pr.path_data + "temperature_europe_1950_2024" + ".nc"
temperature_all.chunk({"time": 1000}).to_netcdf(path)
# ----- combine precipitation data ------
path_precipitation = os.path.join(pr.path_data, "input/total_precipitation/")
list_precipitation = [
elm for elm in os.listdir(path_precipitation) if "proj" not in elm
]
list_precipitation.sort()
        if list_precipitation:
all_precipitations = []
for file_precipitation in list_precipitation:
                path = path_precipitation + file_precipitation
precipitation = xr.open_dataset(
path,
chunks=1000,
)
all_precipitations.append(precipitation)
rainfall_all = xr.concat(all_precipitations, dim="time")
rainfall_all = rainfall_all * 100
rainfall_all = rainfall_all.astype("float32")
path = pr.path_data + "total_precipitation_all" + ".nc"
rainfall_all.to_netcdf(path)
# ----- combine wind data ------
path_wind = os.path.join(pr.path_data, "input/wind/")
list_wind = [elm for elm in os.listdir(path_wind) if "proj" not in elm]
list_wind.sort()
        if list_wind:
all_wind = []
for file_wind in list_wind:
                path = path_wind + file_wind
wind = xr.open_dataset(
path,
chunks=1000,
)
all_wind.append(wind)
wind_all = xr.concat(all_wind, dim="time")
wind_all = wind_all.astype("float32")
path = pr.path_data + "total_wind_all" + ".nc"
wind_all.to_netcdf(path)
# ----- combine pressure data ------
path_pressure = os.path.join(pr.path_data, "input/pressure/")
list_pressure = [elm for elm in os.listdir(path_pressure) if "proj" not in elm]
list_pressure.sort()
        if list_pressure:
all_pressure = []
for file_pressure in list_pressure:
                path = path_pressure + file_pressure
pressure = xr.open_dataset(
path,
chunks=1000,
)
all_pressure.append(pressure)
pressure_all = xr.concat(all_pressure, dim="time")
pressure_all = pressure_all.astype("float32")
path = pr.path_data + "total_pressure_all" + ".nc"
pressure_all.to_netcdf(path)
# ----- combine temp_sea data ------
        # NOTE: sea surface temperature is not currently used. The input
        # sub-folder below is an assumed location; adjust it to wherever the
        # SST files are actually downloaded.
        path_temp_sea = os.path.join(pr.path_data, "input/sea_surface_temperature/")
list_temp_sea = [elm for elm in os.listdir(path_temp_sea) if "proj" not in elm]
list_temp_sea.sort()
        if list_temp_sea:
all_temp_sea = []
for file_temp_sea in list_temp_sea:
path = path_temp_sea + file_temp_sea
temp_sea = xr.open_dataset(
path,
chunks=1000,
)
all_temp_sea.append(temp_sea)
            temp_sea_all = xr.concat(all_temp_sea, dim="time")
            temp_sea_all = temp_sea_all.astype("float32").sel(expver=1)
            temp_sea_all = temp_sea_all - 273.15
            temp_sea_all_subset = temp_sea_all.where(
                (temp_sea_all.time.dt.hour == 12) | (temp_sea_all.time.dt.hour == 0),
                drop=True,
            )
            path = os.path.join(path_temp_sea, "all_temp_sea.nc")
            temp_sea_all_subset.to_netcdf(path)
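

# ---------------------------------------------------------------------------
# Minimal usage sketch, only run when the module is executed directly. The
# output path, variable names, years, dataset name, and bounding box are
# illustrative assumptions (the same hypothetical values as in the __init__
# docstring), not values taken from the project configuration.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    example_params = (
        "/tmp/era5/",                              # output_path (assumed)
        ["temperature", "total_precipitation"],    # list_data (assumed names)
        ["2020", "2021"],                          # list_years
        "reanalysis-era5-single-levels",           # dataset (assumed CDS name)
        {"lat_min": 35.0, "lat_max": 60.0,
         "lon_min": -10.0, "lon_max": 30.0},       # box_lat_lon
    )
    retriever = ServiceDataRetriever(example_params)
    retriever.get_data()       # download raw data files via cqpro's Downloader
    retriever.combine_data()   # merge files found under pr.path_data into per-variable NetCDFs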