Skip to content

Commit

Permalink
convert observation data to netcdf in reader (#196)
Browse files Browse the repository at this point in the history
* convert observation data to netcdf in reader

* fix lint error

* add station_history to select_related

* fix tests

* bump app version
  • Loading branch information
danangmassandy authored Oct 21, 2024
1 parent 8bef960 commit f78c3fa
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 23 deletions.
2 changes: 1 addition & 1 deletion django_project/_version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.0.6
0.0.7
2 changes: 1 addition & 1 deletion django_project/gap/providers/airborne_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def get_measurements(self, start_date: datetime, end_date: datetime):

return Measurement.objects.select_related(
'dataset_attribute', 'dataset_attribute__attribute',
'station'
'station', 'station_history'
).filter(
date_time__gte=start_date,
date_time__lte=end_date,
Expand Down
103 changes: 83 additions & 20 deletions django_project/gap/providers/observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
"""

from datetime import datetime
from typing import List

from django.contrib.gis.db.models.functions import Distance
import numpy as np
import pandas as pd
import xarray as xr
import tempfile
from django.contrib.gis.geos import Polygon, Point
from rest_framework.exceptions import ValidationError
from django.contrib.gis.db.models.functions import Distance
from typing import List, Tuple

from gap.models import (
Dataset,
Expand Down Expand Up @@ -45,7 +47,8 @@ def __init__(
location_input: DatasetReaderInput,
attributes: List[DatasetAttribute],
start_date: datetime,
end_date: datetime) -> None:
end_date: datetime,
nearest_stations) -> None:
"""Initialize ObservationReaderValue class.
:param val: value that has been read
Expand All @@ -58,6 +61,7 @@ def __init__(
super().__init__(val, location_input, attributes)
self.start_date = start_date
self.end_date = end_date
self.nearest_stations = nearest_stations

def to_csv_stream(self, suffix='.csv', separator=','):
"""Generate csv bytes stream.
Expand Down Expand Up @@ -95,16 +99,75 @@ def to_csv_stream(self, suffix='.csv', separator=','):
yield bytes(','.join(data) + '\n', 'utf-8')

def to_netcdf_stream(self):
    """Generate a NetCDF4 byte stream from the observation values.

    Builds an ``xarray.Dataset`` on a (date, lat, lon) grid spanning the
    query date range and the coordinates of ``self.nearest_stations``,
    fills in the measured values, writes it to a temporary ``.nc`` file
    and yields that file back in chunks of ``self.chunk_size_in_bytes``.

    :yields: bytes chunks of the NetCDF file
    """
    # one grid step per day across the full query range
    date_array = pd.date_range(
        self.start_date.date().isoformat(),
        self.end_date.date().isoformat()
    )

    # collect unique station coordinates; round to 5 decimal places so
    # the same rounding can be used to locate values again below
    lat_set = set()
    lon_set = set()
    for station in self.nearest_stations:
        lon_set.add(round(station.geometry.x, 5))
        lat_set.add(round(station.geometry.y, 5))
    lat_array = pd.Index(sorted(lat_set), dtype='float64')
    lon_array = pd.Index(sorted(lon_set), dtype='float64')

    # one data variable per requested attribute; initialise with NaN so
    # grid cells without a measurement are explicitly missing instead of
    # uninitialised memory (np.empty would leak arbitrary values)
    empty_shape = (len(date_array), len(lat_array), len(lon_array))
    data_vars = {
        attr.attribute.variable_name: (
            ['date', 'lat', 'lon'],
            np.full(empty_shape, np.nan)
        )
        for attr in self.attributes
    }

    # create the dataset
    ds = xr.Dataset(
        data_vars=data_vars,
        coords={
            'date': ('date', date_array),
            'lat': ('lat', lat_array),
            'lon': ('lon', lon_array)
        }
    )

    # assign measured values into the grid
    for val in self.values:
        date_idx = date_array.get_loc(val.get_datetime_repr('%Y-%m-%d'))
        loc = val.location
        # must match the rounding applied when building the coord axes
        lat_idx = lat_array.get_loc(round(loc.y, 5))
        lon_idx = lon_array.get_loc(round(loc.x, 5))

        for attr in self.attributes:
            var_name = attr.attribute.variable_name
            if var_name not in val.values:
                continue
            ds[var_name][date_idx, lat_idx, lon_idx] = (
                val.values[var_name]
            )

    # write to a temp file, then stream it back in chunks
    # NOTE(review): delete_on_close requires Python 3.12+ — confirm
    # the deployment runtime supports it
    with (
        tempfile.NamedTemporaryFile(
            suffix=".nc", delete=True, delete_on_close=False)
    ) as tmp_file:
        ds.to_netcdf(
            tmp_file.name, format='NETCDF4', engine='netcdf4')
        with open(tmp_file.name, 'rb') as f:
            while True:
                chunk = f.read(self.chunk_size_in_bytes)
                if not chunk:
                    break
                yield chunk


class ObservationDatasetReader(BaseDatasetReader):
Expand All @@ -114,8 +177,7 @@ def __init__(
self, dataset: Dataset, attributes: List[DatasetAttribute],
location_input: DatasetReaderInput, start_date: datetime,
end_date: datetime,
altitudes: (float, float) = None

altitudes: Tuple[float, float] = None
) -> None:
"""Initialize ObservationDatasetReader class.
Expand All @@ -135,6 +197,7 @@ def __init__(
altitudes=altitudes
)
self.results: List[DatasetTimelineValue] = []
self.nearest_stations = None

def query_by_altitude(self, qs):
"""Query by altitude."""
Expand Down Expand Up @@ -209,17 +272,17 @@ def get_nearest_stations(self):

def get_measurements(self, start_date: datetime, end_date: datetime):
    """Return measurements data for the nearest stations.

    The resolved stations are cached on ``self.nearest_stations`` so
    later steps (e.g. NetCDF export via the reader value) can reuse
    them without re-querying.

    :param start_date: inclusive lower bound for ``date_time``
    :param end_date: inclusive upper bound for ``date_time``
    :return: ordered ``Measurement`` queryset, or ``None`` when no
        nearby station exists
    """
    self.nearest_stations = self.get_nearest_stations()
    if self.nearest_stations is None:
        # no station near the requested location; nothing to read
        return
    # select_related pulls attribute and station rows in the same
    # query to avoid per-measurement lookups while iterating
    return Measurement.objects.select_related(
        'dataset_attribute', 'dataset_attribute__attribute',
        'station', 'station_history'
    ).filter(
        date_time__gte=start_date,
        date_time__lte=end_date,
        dataset_attribute__in=self.attributes,
        station__in=self.nearest_stations
    ).order_by('date_time', 'station', 'dataset_attribute')

def read_historical_data(self, start_date: datetime, end_date: datetime):
Expand Down Expand Up @@ -287,5 +350,5 @@ def get_data_values(self) -> DatasetReaderValue:
"""
return ObservationReaderValue(
self.results, self.location_input, self.attributes,
self.start_date, self.end_date
self.start_date, self.end_date, self.nearest_stations
)
20 changes: 19 additions & 1 deletion django_project/gap/tests/providers/test_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from gap.providers import (
ObservationDatasetReader
)
from gap.providers.observation import ObservationReaderValue
from gap.factories import (
ProviderFactory,
DatasetFactory,
Expand All @@ -24,7 +25,8 @@
)
from gap.utils.reader import (
DatasetReaderInput,
LocationInputType
LocationInputType,
DatasetTimelineValue
)


Expand Down Expand Up @@ -140,3 +142,19 @@ def test_read_historical_data_multiple_locations(self):
reader.read_historical_data(dt1, dt2)
data_value = reader.get_data_values()
self.assertEqual(len(data_value._val), 3)

def test_observation_to_netcdf_stream(self):
    """Test convert observation value to netcdf stream."""
    val = DatasetTimelineValue(
        self.start_date,
        {
            self.dataset_attr.attribute.variable_name: 20
        },
        self.location_input.point
    )
    reader_value = ObservationReaderValue(
        [val], self.location_input, [self.dataset_attr],
        self.start_date, self.end_date, [self.station])
    d = reader_value.to_netcdf_stream()
    res = list(d)
    # list() never returns None, so assertIsNotNone would pass even
    # for an empty stream; assert at least one chunk was yielded
    self.assertTrue(len(res) > 0)

0 comments on commit f78c3fa

Please sign in to comment.