Skip to content

Commit

Permalink
Split daily validation posting on missing periods and week (#435)
Browse files Browse the repository at this point in the history
* make sure data is sorted before slicing

* sort data in pre-post too

* split daily validation post split by uneven frequency and week

* update whatsnew

* flake8

* fix test to make sure frame assertion happens
  • Loading branch information
alorenzo175 authored May 5, 2020
1 parent 78c4f59 commit 642fad0
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 18 deletions.
4 changes: 4 additions & 0 deletions docs/source/whatsnew/1.0.0rc1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ Enhancements
~~~~~~~~~~~~
* Reference net load forecasts provided using week-ahead persistence. (:issue:`55`) (:pull:`392`)
* Datamodel now supports ``'net_load'`` as an allowed variable. (:issue:`55`) (:pull:`392`)
* Posting of daily validation now splits requests to avoid missing periods and
limit each request to one week of data (:issue:`424`) (:pull:`435`)


Bug fixes
~~~~~~~~~
* Fix incorrect ordering of months and weekdays in metrics plots.
(:issue:`428`) (:pull:`430`)
* Ensure data is sorted from reference data sources before slicing and
posting to the API (:pull:`435`)


Contributors
Expand Down
6 changes: 5 additions & 1 deletion solarforecastarbiter/io/reference_observations/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ def update_site_observations(api, fetch_func, site, observations,
start = get_last_site_timestamp(api, site_observations, end)
logger.debug('Fetching data for %s from %s to %s', site.name, start, end)
obs_df = fetch_func(api, site, start, end)
data_in_range = obs_df[start:end]
# must be sorted for proper inexact start:end slicing
data_in_range = obs_df.sort_index()[start:end]
if data_in_range.empty:
return
for obs in site_observations:
Expand All @@ -295,6 +296,9 @@ def _prepare_data_to_post(data, variable, observation, start, end):
to prepare for posting"""
data = data[[variable]]
data = data.rename(columns={variable: 'value'})
# ensure data is sorted before slicing and for optimal order in the
# database
data = data.sort_index()
# remove all future values, some files have forward filled nightly data
data = data[start:min(end, _utcnow())]
# we assume any reference data is given at the proper intervals
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,24 @@ def test_update_site_observations_no_data(
mock_api.assert_not_called()


def test_update_site_observations_out_of_order(
mock_api, site_objects_param, mocker,
observation_objects_param, fake_ghi_data):
start = pd.Timestamp('20190101T1200Z')
end = pd.Timestamp('20190101T1230Z')
fetch = mocker.MagicMock()
fetch.return_value = fake_ghi_data.sample(frac=1)
common.update_site_observations(
mock_api, fetch, site_objects[1], observation_objects_param,
start, end)
args, _ = mock_api.post_observation_values.call_args
assert args[0] == ''
pd.testing.assert_frame_equal(
args[1], fake_ghi_data.rename(
columns={'ghi': 'value'})[start:end].resample(
args[1].index.freq).first())


@pytest.fixture()
def template_fx(mock_api, mocker):
mock_api.create_forecast = mocker.MagicMock(side_effect=lambda x: x)
Expand Down
29 changes: 24 additions & 5 deletions solarforecastarbiter/validation/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from solarforecastarbiter import pvmodel
from solarforecastarbiter.io.api import APISession
from solarforecastarbiter.validation import validator
from solarforecastarbiter.validation import validator, quality_mapping


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -391,7 +391,7 @@ def _daily_validation(session, observation, start, end, base_url):
logger.info('Validating data for %s from %s to %s',
observation.name, start, end)
observation_values = session.get_observation_values(
observation.observation_id, start, end)
observation.observation_id, start, end).sort_index()
value_series = observation_values['value'].astype(float)
if len(value_series.dropna()) < 10:
raise IndexError(
Expand All @@ -410,9 +410,28 @@ def _daily_validation(session, observation, start, end, base_url):

quality_flags.name = 'quality_flag'
observation_values.update(quality_flags)
session.post_observation_values(observation.observation_id,
observation_values,
params='donotvalidate')
return _group_continuous_week_post(
session, observation, observation_values)


def _group_continuous_week_post(session, observation, observation_values):
# observation_values expected to be sorted
# observation values already have uneven frequency checked
gid = quality_mapping.check_if_series_flagged(
observation_values['quality_flag'], 'UNEVEN FREQUENCY').cumsum()
# make series of week + year integers to further
# split data to post at most one week at a time
# ~10,000 pts of 1min data
week_int = (gid.index.week + gid.index.year).values
# combine the continuous groups with groups of weeks
# gid is unique for each group since week_int and cumsum
# increase monotonically and are positive
gid += week_int
observation_values['gid'] = gid
for _, group in observation_values.groupby('gid'):
session.post_observation_values(observation.observation_id,
group[['value', 'quality_flag']],
params='donotvalidate')


def daily_single_observation_validation(access_token, observation_id, start,
Expand Down
78 changes: 66 additions & 12 deletions solarforecastarbiter/validation/tests/test_validation_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,8 +632,9 @@ def test_daily_observation_validation_ghi(mocker, make_observation,
DESCRIPTION_MASK_MAPPING['UNEVEN FREQUENCY'] |
LATEST_VERSION_FLAG
]
assert post_mock.called_once
assert_frame_equal(post_mock.call_args[0][1], out)
assert post_mock.called
posted_df = pd.concat([cal[0][1] for cal in post_mock.call_args_list])
assert_frame_equal(posted_df, out)


def test_daily_observation_validation_ghi_zeros(mocker, make_observation,
Expand Down Expand Up @@ -677,8 +678,9 @@ def test_daily_observation_validation_ghi_zeros(mocker, make_observation,
base | DESCRIPTION_MASK_MAPPING['NIGHTTIME'] |
DESCRIPTION_MASK_MAPPING['UNEVEN FREQUENCY']
]
assert post_mock.called_once
assert_frame_equal(post_mock.call_args[0][1], out)
assert post_mock.called
posted_df = pd.concat([cal[0][1] for cal in post_mock.call_args_list])
assert_frame_equal(posted_df, out)


def test_validate_daily_dc_power(mocker, make_observation, daily_index):
Expand Down Expand Up @@ -767,8 +769,9 @@ def test_daily_observation_validation_dc_power(mocker, make_observation,
DESCRIPTION_MASK_MAPPING['NIGHTTIME'] |
LATEST_VERSION_FLAG
]
assert post_mock.called_once
assert_frame_equal(post_mock.call_args[0][1], out)
assert post_mock.called
posted_df = pd.concat([cal[0][1] for cal in post_mock.call_args_list])
assert_frame_equal(posted_df, out)


def test_validate_daily_ac_power(mocker, make_observation, daily_index):
Expand Down Expand Up @@ -863,8 +866,9 @@ def test_daily_observation_validation_ac_power(mocker, make_observation,
DESCRIPTION_MASK_MAPPING['NIGHTTIME'] |
LATEST_VERSION_FLAG
]
assert post_mock.called_once
assert_frame_equal(post_mock.call_args[0][1], out)
assert post_mock.called
posted_df = pd.concat([cal[0][1] for cal in post_mock.call_args_list])
assert_frame_equal(posted_df, out)


@pytest.mark.parametrize('var', ['air_temperature', 'wind_speed', 'dni', 'dhi',
Expand All @@ -885,13 +889,14 @@ def test_daily_observation_validation_other(var, mocker, make_observation,
return_value=data)
post_mock = mocker.patch(
'solarforecastarbiter.io.api.APISession.post_observation_values')
validate_mock = mocker.MagicMock()
validated = pd.Series(2, index=daily_index)
validate_mock = mocker.MagicMock(return_value=validated)
mocker.patch.dict(
'solarforecastarbiter.validation.tasks.IMMEDIATE_VALIDATION_FUNCS',
{var: validate_mock})
tasks.daily_single_observation_validation(
'', obs.observation_id, data.index[0], data.index[-1])
assert post_mock.called_once
assert post_mock.called
assert validate_mock.called


Expand All @@ -914,13 +919,14 @@ def test_daily_observation_validation_many(mocker, make_observation,
return_value=data)
post_mock = mocker.patch(
'solarforecastarbiter.io.api.APISession.post_observation_values')
validate_mock = mocker.MagicMock()
validated = pd.Series(2, index=daily_index)
validate_mock = mocker.MagicMock(return_value=validated)
mocker.patch.dict(
'solarforecastarbiter.validation.tasks.IMMEDIATE_VALIDATION_FUNCS',
{'dhi': validate_mock, 'dni': validate_mock})
tasks.daily_observation_validation(
'', data.index[0], data.index[-1])
assert post_mock.called_once
assert post_mock.called
assert validate_mock.call_count == 2


Expand Down Expand Up @@ -967,3 +973,51 @@ def test_daily_observation_validation_not_enough(mocker, make_observation):
'', data.index[0], data.index[-1])
assert out is None
assert log.called


def test__group_continuous_week_post(mocker, make_observation):
split_dfs = [
pd.DataFrame([(0, LATEST_VERSION_FLAG)],
columns=['value', 'quality_flag'],
index=pd.date_range(
start='2020-05-03T00:00',
end='2020-05-03T23:59',
tz='UTC',
freq='1h')),
# new week split
pd.DataFrame([(0, LATEST_VERSION_FLAG)],
columns=['value', 'quality_flag'],
index=pd.date_range(
start='2020-05-04T00:00',
end='2020-05-04T11:59',
tz='UTC',
freq='1h')),
# missing 12
pd.DataFrame(
[(0, LATEST_VERSION_FLAG | DESCRIPTION_MASK_MAPPING['UNEVEN FREQUENCY'])] + # NOQA
[(1, LATEST_VERSION_FLAG)] * 7,
columns=['value', 'quality_flag'],
index=pd.date_range(
start='2020-05-04T13:00',
end='2020-05-04T20:00',
tz='UTC',
freq='1h')),
# missing a week+
pd.DataFrame(
[(9, LATEST_VERSION_FLAG | DESCRIPTION_MASK_MAPPING['UNEVEN FREQUENCY'])] + # NOQA
[(3, LATEST_VERSION_FLAG)] * 7,
columns=['value', 'quality_flag'],
index=pd.date_range(
start='2020-05-13T09:00',
end='2020-05-13T16:59',
tz='UTC',
freq='1h')),
]
ov = pd.concat(split_dfs, axis=0)
obs = make_observation('ghi')
session = mocker.MagicMock()
tasks._group_continuous_week_post(session, obs, ov)
call_list = session.post_observation_values.call_args_list
assert len(call_list) == 4
for i, cal in enumerate(call_list):
assert_frame_equal(split_dfs[i], cal[0][1])

0 comments on commit 642fad0

Please sign in to comment.