Add more Sources tests #309

Open · wants to merge 21 commits into main
1 change: 1 addition & 0 deletions lumen/sources/base.py
@@ -587,6 +587,7 @@ class JSONSource(FileSource):
     source_type = 'json'
 
     def _resolve_template_vars(self, template):
+        template = str(template)
         template_vars = self._template_re.findall(template)
         template_values = []
         for m in template_vars:
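
Context for the one-line fix: `re.findall` raises `TypeError: expected string or bytes-like object` on non-string input, so coercing the template first lets table specs that are not already strings (numbers, path objects, nested specs) flow through variable resolution unchanged. A minimal sketch with a stand-in pattern (illustrative only, not lumen's actual `_template_re`):

import re

_template_re = re.compile(r'\$variables\.(\w+)')  # stand-in pattern, not lumen's real one

value = 3.14  # a table spec that is not a string
assert _template_re.findall(str(value)) == []  # works: nothing to resolve
# _template_re.findall(value)  # would raise TypeError: expected string or bytes-like object
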
18 changes: 17 additions & 1 deletion lumen/tests/conftest.py
@@ -10,7 +10,7 @@
 from bokeh.document import Document
 
 from lumen.config import config
-from lumen.sources import FileSource, Source
+from lumen.sources import FileSource, JSONSource, Source
 from lumen.state import state
 from lumen.variables import Variables
 
@@ -37,6 +37,22 @@ def create(root, **kwargs):
         source.clear_cache()
     state.global_sources.clear()
+
+
+@pytest.fixture
+def make_jsonsource():
+    root = config._root
+    def create(root, **kwargs):
+        config._root = root
+        source = JSONSource(tables={'local': './test.json'}, **kwargs)
+        state.sources['original'] = source
+        return source
+    yield create
+    config._root = root
+    for source in state.global_sources.values():
+        source.clear_cache()
+    state.global_sources.clear()
+
 
 @pytest.fixture
 def make_variable_filesource():
     root = config._root
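
A hypothetical consumer of the new fixture (the test below is illustrative, not part of this PR), assuming `./test.json` resolves relative to the root passed in:

import os

def test_json_source_get(make_jsonsource):
    source = make_jsonsource(os.path.dirname(__file__))
    df = source.get('local')
    assert list(df.columns) == ['A', 'B', 'C', 'D']
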
8 changes: 8 additions & 0 deletions lumen/tests/sources/catalog_intake.yml
@@ -0,0 +1,8 @@
+sources:
+  test:
+    description: Generated using pandas.util.testing.makeMixedDataFrame()
+    driver: csv
+    args:
+      urlpath: '{{ CATALOG_DIR }}/test.csv'
+      csv_kwargs:
+        parse_dates: ['D']
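
To sanity-check the catalog outside the test suite, it should open directly with intake, which expands `{{ CATALOG_DIR }}` to the directory containing the YAML file (the path below is an assumption about where the file lands):

import intake

catalog = intake.open_catalog('lumen/tests/sources/catalog_intake.yml')
df = catalog.test.read()  # reads test.csv with parse_dates=['D']
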
17 changes: 17 additions & 0 deletions lumen/tests/sources/catalog_intake_sql.yml
@@ -0,0 +1,17 @@
+sources:
+  test_sql:
+    description: Generated using pandas.util.testing.makeMixedDataFrame()
+    driver: sql
+    args:
+      uri: 'sqlite:///{{ CATALOG_DIR }}test.db'
+      sql_expr: 'SELECT * FROM mixed'
+      sql_kwargs:
+        parse_dates: ['D']
+  test_sql_with_none:
+    description: Generated using pandas.util.testing.makeMixedDataFrame()
+    driver: sql
+    args:
+      uri: 'sqlite:///{{ CATALOG_DIR }}test.db'
+      sql_expr: 'SELECT * FROM mixed_none'
+      sql_kwargs:
+        parse_dates: ['D']
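
This catalog expects a `test.db` SQLite file next to it, holding `mixed` and `mixed_none` tables. A sketch of how such a fixture could be generated (the exact contents of `mixed_none` are an assumption; the PR may build it differently):

import sqlite3

import pandas as pd

df = pd._testing.makeMixedDataFrame()
with sqlite3.connect('lumen/tests/sources/test.db') as conn:
    df.to_sql('mixed', conn, index=False)
    # assumed variant with missing values, backing the *_with_none catalog entry
    df.assign(C=None).to_sql('mixed_none', conn, index=False)
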
1 change: 1 addition & 0 deletions lumen/tests/sources/test.json
@@ -0,0 +1 @@
+{"A":{"0":0.0,"1":1.0,"2":2.0,"3":3.0,"4":4.0},"B":{"0":0.0,"1":1.0,"2":0.0,"3":1.0,"4":0.0},"C":{"0":"foo1","1":"foo2","2":"foo3","3":"foo4","4":"foo5"},"D":{"0":"2009-01-01","1":"2009-01-02","2":"2009-01-05","3":"2009-01-06","4":"2009-01-07"}}
194 changes: 24 additions & 170 deletions lumen/tests/sources/test_base.py
@@ -1,180 +1,34 @@
-import datetime as dt
 import os
 
-from pathlib import Path
-
-import pandas as pd
-import pytest
-
-from lumen.sources import Source
+from lumen.config import config
+from lumen.sources import FileSource, Source
 from lumen.state import state
-from lumen.transforms.sql import SQLLimit
 
-
-@pytest.fixture
-def source(make_filesource):
-    root = os.path.dirname(__file__)
-    return make_filesource(root)
-
-
-@pytest.fixture
-def expected_df(column_value_type):
-    df = pd._testing.makeMixedDataFrame()
-    column, value, type = column_value_type
-
-    if type == 'single_value':
-        return df[df[column] == value]
-
-    elif type == 'range':
-        begin, end = value
-        return df[(df[column] >= begin) & (df[column] <= end)]
-
-    elif type == 'range_list':
-        conditions = False
-        for range in value:
-            begin, end = range
-            conditions |= ((df[column] >= begin) & (df[column] <= end))
-        return df[conditions]
-
-    elif type == 'list':
-        return df[df[column].isin(value)]
-
-    elif type == 'date':
-        return df[df[column] == pd.to_datetime(value)]
-
-    elif type == 'date_range':
-        begin, end = value
-        return df[(df[column] >= pd.to_datetime(begin)) & (df[column] <= pd.to_datetime(end))]
-
-    return df
-
-
 def test_source_resolve_module_type():
     assert Source._get_type('lumen.sources.base.Source') is Source
     assert Source.source_type is None
 
 
-@pytest.mark.parametrize("filter_col_A", [314, (13, 314), ['A', 314, 'def']])
-@pytest.mark.parametrize("filter_col_B", [(3, 15.9)])
-@pytest.mark.parametrize("filter_col_C", [[1, 'A', 'def']])
-@pytest.mark.parametrize("sql_transforms", [(None, None), (SQLLimit(limit=100), SQLLimit(limit=100))])
-def test_file_source_table_cache_key(source, filter_col_A, filter_col_B, filter_col_C, sql_transforms):
-    t1, t2 = sql_transforms
-    kwargs1 = {}
-    kwargs2 = {}
-    if t1 is not None:
-        kwargs1['sql_transforms'] = [t1]
-    if t2 is not None:
-        kwargs2['sql_transforms'] = [t2]
-    key1 = source._get_key('test', A=filter_col_A, B=filter_col_B, C=filter_col_C, **kwargs1)
-    key2 = source._get_key('test', A=filter_col_A, B=filter_col_B, C=filter_col_C, **kwargs2)
-    assert key1 == key2
-
-
-@pytest.mark.parametrize(
-    "column_value_type", [
-        ('A', 1, 'single_value'),
-        ('A', (1, 3), 'range'),
-        ('A', (1, 2), 'range'),
-        ('A', [(0, 1), (3, 4)], 'range_list'),
-        ('C', 'foo2', 'single_value'),
-        ('C', ['foo1', 'foo3'], 'list'),
-        ('D', dt.datetime(2009, 1, 2), 'single_value'),
-        ('D', (dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)), 'range'),
-        ('D', [dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)], 'list'),
-        ('D', dt.datetime(2009, 1, 2), 'date'),
-        ('D', (dt.date(2009, 1, 2), dt.date(2009, 1, 5)), 'date_range'),
-    ]
-)
-@pytest.mark.parametrize("dask", [True, False])
-def test_file_source_filter(source, column_value_type, dask, expected_df):
-    column, value, _ = column_value_type
-    kwargs = {column: value, '__dask': dask}
-    filtered = source.get('test', **kwargs)
-    pd.testing.assert_frame_equal(filtered, expected_df)
-
-
-@pytest.mark.parametrize(
-    "column_value_type", [
-        ('A', 1, 'single_value'),
-        ('A', (1, 3), 'range'),
-        ('A', (1, 2), 'range'),
-        ('A', [(0, 1), (3, 4)], 'range_list'),
-        ('C', 'foo2', 'single_value'),
-        ('C', ['foo1', 'foo3'], 'list'),
-        ('D', dt.datetime(2009, 1, 2), 'single_value'),
-        ('D', (dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)), 'range'),
-        ('D', [dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)], 'list'),
-        ('D', dt.datetime(2009, 1, 2), 'date'),
-        ('D', (dt.date(2009, 1, 2), dt.date(2009, 1, 5)), 'date_range'),
-    ]
-)
-@pytest.mark.parametrize("dask", [True, False])
-def test_file_source_get_query_cache(source, column_value_type, dask, expected_df):
-    column, value, _ = column_value_type
-    kwargs = {column: value}
-    source.get('test', __dask=dask, **kwargs)
-    cache_key = source._get_key('test', **kwargs)
-    assert cache_key in source._cache
-    cached_df = source._cache[cache_key]
-    if dask:
-        cached_df = cached_df.compute()
-    pd.testing.assert_frame_equal(cached_df, expected_df)
-    cache_key = source._get_key('test', **kwargs)
-    assert cache_key in source._cache
-
-
-@pytest.mark.parametrize(
-    "column_value_type", [
-        ('A', 1, 'single_value'),
-        ('A', [(0, 1), (3, 4)], 'range_list'),
-        ('C', ['foo1', 'foo3'], 'list'),
-        ('D', (dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)), 'range'),
-        ('D', dt.datetime(2009, 1, 2), 'date'),
-    ]
-)
-@pytest.mark.parametrize("dask", [True, False])
-def test_file_source_clear_cache(source, column_value_type, dask):
-    column, value, _ = column_value_type
-    kwargs = {column: value}
-    source.get('test', __dask=dask, **kwargs)
-    cache_key = source._get_key('test', **kwargs)
-    assert cache_key in source._cache
-    source.clear_cache()
-    assert len(source._cache) == 0
-
-
-def test_file_source_get_query_cache_to_file(make_filesource, cachedir):
+def test_source_from_spec_in_state():
+    config_root = config._root
     root = os.path.dirname(__file__)
-    source = make_filesource(root, cache_dir=cachedir)
-    source.get('test', A=(1, 2))
-
-    cache_key = source._get_key('test', A=(1, 2))
-    cache_path = Path(cachedir) / f'{cache_key}_test.parq'
-    df = pd.read_parquet(cache_path, engine='fastparquet')
-
-    # Patch index names due to https://github.com/dask/fastparquet/issues/732
-    df.index.names = [None]
-    pd.testing.assert_frame_equal(
-        df,
-        pd._testing.makeMixedDataFrame().iloc[1:3]
-    )
-
-
-def test_file_source_get_tables(source):
-    tables = source.get_tables()
-    assert tables == ['test']
-
-
-def test_file_source_variable(make_variable_filesource):
-    root = os.path.dirname(__file__)
-    source = make_variable_filesource(root)
-    state.variables.tables = {'test': 'test2.csv'}
-    df = source.get('test')
-    expected = pd._testing.makeMixedDataFrame().iloc[::-1].reset_index(drop=True)
-    pd.testing.assert_frame_equal(df, expected)
-
-
-def test_extension_of_comlicated_url(source):
-    url = "https://api.tfl.gov.uk/Occupancy/BikePoints/@{stations.stations.id}?app_key=random_numbers"
-    source.tables["test"] = url
-    assert source._named_files["test"][1] is None
+    config._root = root
+    original_source = FileSource(tables={'test': 'test.csv'}, kwargs={'parse_dates': ['D']})
+    state.sources['original'] = original_source
+    from_spec_source = Source.from_spec('original')
+
+    # source loaded from spec is identical with the original source
+    assert type(from_spec_source) == type(original_source)
+    assert from_spec_source.name == original_source.name
+    assert from_spec_source.cache_dir == original_source.cache_dir
+    assert from_spec_source.shared == original_source.shared
+    assert from_spec_source.root == original_source.root
+    assert from_spec_source.source_type == original_source.source_type
+    assert from_spec_source._supports_sql == original_source._supports_sql
+
+    # clean up cache memory
+    config._root = config_root
+    for source in state.global_sources.values():
+        source.clear_cache()
+    state.global_sources.clear()
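
The new test exercises `Source.from_spec` with a string spec: the name is resolved against the sources registered in `state`, and the result is compared attribute by attribute rather than by identity, since `from_spec` may construct a fresh instance. Roughly, the string case amounts to the following (a simplified sketch, not lumen's actual implementation):

def from_spec_by_name(spec):
    # simplified: a string spec is a lookup into the session's registered sources
    if spec in state.sources:
        return state.sources[spec]
    raise KeyError(f'Source {spec!r} was not found in the session state.')
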
61 changes: 36 additions & 25 deletions lumen/tests/sources/test_derived.py
@@ -5,6 +5,11 @@
 
 from lumen.sources.base import DerivedSource
 
+from .utils import (
+    source_clear_cache, source_get_cache_no_query,
+    source_get_schema_update_cache, source_get_tables,
+)
+
 
 @pytest.fixture
 def original(make_filesource):
@@ -52,11 +57,22 @@ def tables_mode_spec():
     return spec
 
+
+@pytest.fixture
+def source(original, tables_mode_spec):
+    return DerivedSource.from_spec(tables_mode_spec)
+
+
+@pytest.fixture
+def source_tables():
+    return {'derived': pd._testing.makeMixedDataFrame()}
+
 
 def test_derived_source_resolve_module_type():
     assert DerivedSource._get_type('lumen.sources.base.DerivedSource') is DerivedSource
     assert DerivedSource.source_type == 'derived'
 
 
-def test_derived_mirror_source(original, mirror_mode_spec):
+def test_derived_source_mirror_mode(original, mirror_mode_spec):
     derived = DerivedSource.from_spec(mirror_mode_spec)
     assert derived.get_tables() == original.get_tables()
     assert derived.get_schema() == original.get_schema()
@@ -68,7 +84,7 @@ def test_derived_mirror_source(original, mirror_mode_spec):
     ('transforms', [{'type': 'iloc', 'end': 3}]),
     ('filters', [{'type': 'constant', 'field': 'A', 'value': (0, 2)}]),
 ])
-def test_derived_mirror_source_apply(
+def test_derived_source_mirror_mode_apply(
     original, mirror_mode_spec, additional_spec, expected_table, expected_schema
 ):
     spec_key, spec_value = additional_spec
@@ -79,19 +95,11 @@ def test_derived_mirror_source_apply(
     pd.testing.assert_frame_equal(derived.get('test'), expected_table)
 
 
-def test_derived_tables_source(original, tables_mode_spec):
-    derived = DerivedSource.from_spec(tables_mode_spec)
-    assert derived.get_tables() == ['derived']
-    df = pd._testing.makeMixedDataFrame()
-    pd.testing.assert_frame_equal(derived.get('derived'), df)
-    assert original.get_schema('test') == derived.get_schema('derived')
-
-
 @pytest.mark.parametrize("additional_spec", [
     ('transforms', [{'type': 'iloc', 'end': 3}]),
     ('filters', [{'type': 'constant', 'field': 'A', 'value': (0, 2)}]),
 ])
-def test_derived_tables_source_apply(
+def test_derived_source_tables_mode_apply(
     original, tables_mode_spec, additional_spec, expected_table, expected_schema
 ):
@@ -102,19 +110,22 @@ def test_derived_tables_source_apply(
     assert derived.get_schema('derived') == expected_schema
 
 
-def test_derived_get_query_cache(original, mirror_mode_spec):
-    derived = DerivedSource.from_spec(mirror_mode_spec)
-    df = derived.get('test')
-    cache_key = derived._get_key('test')
-    assert cache_key in derived._cache
-    cached_df = derived._cache[cache_key]
-    pd.testing.assert_frame_equal(cached_df, df)
+def test_derived_source_get_tables(source, source_tables):
+    assert source_get_tables(source, source_tables)
 
 
-def test_derived_clear_cache(original, mirror_mode_spec):
-    derived = DerivedSource.from_spec(mirror_mode_spec)
-    derived.get('test')
-    cache_key = derived._get_key('test')
-    assert cache_key in derived._cache
-    derived.clear_cache()
-    assert len(derived._cache) == 0
+@pytest.mark.parametrize("dask", [True, False])
+def test_derived_source_get_cache_no_query(source, dask, source_tables):
+    for table in source_tables:
+        expected_table = source_tables[table]
+        assert source_get_cache_no_query(
+            source, table, expected_table, dask, use_dask=False
+        )
+
+
+def test_derived_source_get_schema_update_cache(source):
+    assert source_get_schema_update_cache(source, table='derived')
+
+
+def test_derived_source_clear_cache(source):
+    assert source_clear_cache(source, table='derived')
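
The `source_*` helpers live in the new shared `lumen/tests/sources/utils.py` module, which this view does not show. Their shape can be inferred from the call sites above; `source_get_tables`, for instance, presumably does something like:

import pandas as pd

def source_get_tables(source, source_tables):
    # check the advertised tables and each table's contents against expectations
    assert source.get_tables() == list(source_tables)
    for table, expected in source_tables.items():
        pd.testing.assert_frame_equal(source.get(table), expected)
    return True
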