Add more Sources tests #309

Open · wants to merge 21 commits into base: main
1 change: 1 addition & 0 deletions lumen/sources/base.py
@@ -587,6 +587,7 @@ class JSONSource(FileSource):
    source_type = 'json'

    def _resolve_template_vars(self, template):
        template = str(template)
        template_vars = self._template_re.findall(template)
        template_values = []
        for m in template_vars:
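The added str() coercion matters because re.Pattern.findall only accepts strings, so a table value that is not already a str (for example a pathlib.Path) would otherwise raise a TypeError before the template variables could be resolved. A minimal illustration; the pattern below is hypothetical and only stands in for Lumen's actual _template_re:

import re
from pathlib import Path

template_re = re.compile(r'\$\{[^}]+\}')   # hypothetical placeholder pattern, for illustration only

template = Path('./test.json')             # a table value that is not already a str
print(template_re.findall(str(template)))  # works: [] (no template vars found)
# template_re.findall(template)            # raises TypeError: expected string or bytes-like object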
18 changes: 17 additions & 1 deletion lumen/tests/conftest.py
@@ -10,7 +10,7 @@
from bokeh.document import Document

from lumen.config import config
from lumen.sources import FileSource, Source
from lumen.sources import FileSource, JSONSource, Source
from lumen.state import state
from lumen.variables import Variables

@@ -37,6 +37,22 @@ def create(root, **kwargs):
        source.clear_cache()
    state.global_sources.clear()


@pytest.fixture
def make_jsonsource():
    root = config._root
    def create(root, **kwargs):
        config._root = root
        source = JSONSource(tables={'local': './test.json'}, **kwargs)
        state.sources['original'] = source
        return source
    yield create
    config._root = root
    for source in state.global_sources.values():
        source.clear_cache()
    state.global_sources.clear()


@pytest.fixture
def make_variable_filesource():
    root = config._root
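For reference, a sketch of how a test consumes the new fixture (the test name here is illustrative; the real usage is in lumen/tests/sources/test_json.py added later in this PR):

import os

def test_jsonsource_tables(make_jsonsource):
    root = os.path.dirname(__file__)       # directory containing test.json
    source = make_jsonsource(root)
    assert source.get_tables() == ['local']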
8 changes: 8 additions & 0 deletions lumen/tests/sources/catalog_intake.yml
@@ -0,0 +1,8 @@
sources:
  test:
    description: Generated using pandas.util.testing.makeMixedDataFrame()
    driver: csv
    args:
      urlpath: '{{ CATALOG_DIR }}/test.csv'
      csv_kwargs:
        parse_dates: ['D']
17 changes: 17 additions & 0 deletions lumen/tests/sources/catalog_intake_sql.yml
@@ -0,0 +1,17 @@
sources:
  test_sql:
    description: Generated using pandas.util.testing.makeMixedDataFrame()
    driver: sql
    args:
      uri: 'sqlite:///{{ CATALOG_DIR }}test.db'
      sql_expr: 'SELECT * FROM mixed'
      sql_kwargs:
        parse_dates: ['D']
  test_sql_with_none:
    description: Generated using pandas.util.testing.makeMixedDataFrame()
    driver: sql
    args:
      uri: 'sqlite:///{{ CATALOG_DIR }}test.db'
      sql_expr: 'SELECT * FROM mixed_none'
      sql_kwargs:
        parse_dates: ['D']
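This catalog expects a SQLite file test.db next to the YAML containing tables named mixed and mixed_none. A hedged sketch of how an equivalent database could be generated; the PR does not state how the committed file was produced:

import sqlite3

import pandas as pd

df = pd._testing.makeMixedDataFrame()
df_none = df.copy()
df_none['C'] = ['foo1', None, 'foo3', None, 'foo5']  # matches the test_sql_with_none expectations

with sqlite3.connect('test.db') as conn:
    df.to_sql('mixed', conn, index=False)
    df_none.to_sql('mixed_none', conn, index=False)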
1 change: 1 addition & 0 deletions lumen/tests/sources/test.json
@@ -0,0 +1 @@
{"A":{"0":0.0,"1":1.0,"2":2.0,"3":3.0,"4":4.0},"B":{"0":0.0,"1":1.0,"2":0.0,"3":1.0,"4":0.0},"C":{"0":"foo1","1":"foo2","2":"foo3","3":"foo4","4":"foo5"},"D":{"0":"2009-01-01","1":"2009-01-02","2":"2009-01-05","3":"2009-01-06","4":"2009-01-07"}}
194 changes: 24 additions & 170 deletions lumen/tests/sources/test_base.py
@@ -1,180 +1,34 @@
import datetime as dt
import os

from pathlib import Path

import pandas as pd
import pytest

from lumen.sources import Source
from lumen.config import config
from lumen.sources import FileSource, Source
from lumen.state import state
from lumen.transforms.sql import SQLLimit


@pytest.fixture
def source(make_filesource):
root = os.path.dirname(__file__)
return make_filesource(root)


@pytest.fixture
def expected_df(column_value_type):
df = pd._testing.makeMixedDataFrame()
column, value, type = column_value_type

if type == 'single_value':
return df[df[column] == value]

elif type == 'range':
begin, end = value
return df[(df[column] >= begin) & (df[column] <= end)]

elif type == 'range_list':
conditions = False
for range in value:
begin, end = range
conditions |= ((df[column] >= begin) & (df[column] <= end))
return df[conditions]

elif type == 'list':
return df[df[column].isin(value)]

elif type == 'date':
return df[df[column] == pd.to_datetime(value)]

elif type == 'date_range':
begin, end = value
return df[(df[column] >= pd.to_datetime(begin)) & (df[column] <= pd.to_datetime(end))]

return df


def test_source_resolve_module_type():
assert Source._get_type('lumen.sources.base.Source') is Source
assert Source.source_type is None


@pytest.mark.parametrize("filter_col_A", [314, (13, 314), ['A', 314, 'def']])
@pytest.mark.parametrize("filter_col_B", [(3, 15.9)])
@pytest.mark.parametrize("filter_col_C", [[1, 'A', 'def']])
@pytest.mark.parametrize("sql_transforms", [(None, None), (SQLLimit(limit=100), SQLLimit(limit=100))])
def test_file_source_table_cache_key(source, filter_col_A, filter_col_B, filter_col_C, sql_transforms):
t1, t2 = sql_transforms
kwargs1 = {}
kwargs2 = {}
if t1 is not None:
kwargs1['sql_transforms'] = [t1]
if t2 is not None:
kwargs2['sql_transforms'] = [t2]
key1 = source._get_key('test', A=filter_col_A, B=filter_col_B, C=filter_col_C, **kwargs1)
key2 = source._get_key('test', A=filter_col_A, B=filter_col_B, C=filter_col_C, **kwargs2)
assert key1 == key2


@pytest.mark.parametrize(
"column_value_type", [
('A', 1, 'single_value'),
('A', (1, 3), 'range'),
('A', (1, 2), 'range'),
('A', [(0, 1), (3, 4)], 'range_list'),
('C', 'foo2', 'single_value'),
('C', ['foo1', 'foo3'], 'list'),
('D', dt.datetime(2009, 1, 2), 'single_value'),
('D', (dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)), 'range'),
('D', [dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)], 'list'),
('D', dt.datetime(2009, 1, 2), 'date'),
('D', (dt.date(2009, 1, 2), dt.date(2009, 1, 5)), 'date_range'),
]
)
@pytest.mark.parametrize("dask", [True, False])
def test_file_source_filter(source, column_value_type, dask, expected_df):
column, value, _ = column_value_type
kwargs = {column: value, '__dask': dask}
filtered = source.get('test', **kwargs)
pd.testing.assert_frame_equal(filtered, expected_df)


@pytest.mark.parametrize(
"column_value_type", [
('A', 1, 'single_value'),
('A', (1, 3), 'range'),
('A', (1, 2), 'range'),
('A', [(0, 1), (3, 4)], 'range_list'),
('C', 'foo2', 'single_value'),
('C', ['foo1', 'foo3'], 'list'),
('D', dt.datetime(2009, 1, 2), 'single_value'),
('D', (dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)), 'range'),
('D', [dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)], 'list'),
('D', dt.datetime(2009, 1, 2), 'date'),
('D', (dt.date(2009, 1, 2), dt.date(2009, 1, 5)), 'date_range'),
]
)
@pytest.mark.parametrize("dask", [True, False])
def test_file_source_get_query_cache(source, column_value_type, dask, expected_df):
column, value, _ = column_value_type
kwargs = {column: value}
source.get('test', __dask=dask, **kwargs)
cache_key = source._get_key('test', **kwargs)
assert cache_key in source._cache
cached_df = source._cache[cache_key]
if dask:
cached_df = cached_df.compute()
pd.testing.assert_frame_equal(cached_df, expected_df)
cache_key = source._get_key('test', **kwargs)
assert cache_key in source._cache


@pytest.mark.parametrize(
"column_value_type", [
('A', 1, 'single_value'),
('A', [(0, 1), (3, 4)], 'range_list'),
('C', ['foo1', 'foo3'], 'list'),
('D', (dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)), 'range'),
('D', dt.datetime(2009, 1, 2), 'date'),
]
)
@pytest.mark.parametrize("dask", [True, False])
def test_file_source_clear_cache(source, column_value_type, dask):
column, value, _ = column_value_type
kwargs = {column: value}
source.get('test', __dask=dask, **kwargs)
cache_key = source._get_key('test', **kwargs)
assert cache_key in source._cache
source.clear_cache()
assert len(source._cache) == 0


def test_file_source_get_query_cache_to_file(make_filesource, cachedir):
def test_source_from_spec_in_state():
config_root = config._root
root = os.path.dirname(__file__)
source = make_filesource(root, cache_dir=cachedir)
source.get('test', A=(1, 2))

cache_key = source._get_key('test', A=(1, 2))
cache_path = Path(cachedir) / f'{cache_key}_test.parq'
df = pd.read_parquet(cache_path, engine='fastparquet')

# Patch index names due to https://github.com/dask/fastparquet/issues/732
df.index.names = [None]
pd.testing.assert_frame_equal(
df,
pd._testing.makeMixedDataFrame().iloc[1:3]
)


def test_file_source_get_tables(source):
tables = source.get_tables()
assert tables == ['test']


def test_file_source_variable(make_variable_filesource):
root = os.path.dirname(__file__)
source = make_variable_filesource(root)
state.variables.tables = {'test': 'test2.csv'}
df = source.get('test')
expected = pd._testing.makeMixedDataFrame().iloc[::-1].reset_index(drop=True)
pd.testing.assert_frame_equal(df, expected)


def test_extension_of_comlicated_url(source):
url = "https://api.tfl.gov.uk/Occupancy/BikePoints/@{stations.stations.id}?app_key=random_numbers"
source.tables["test"] = url
assert source._named_files["test"][1] is None
config._root = root
original_source = FileSource(tables={'test': 'test.csv'}, kwargs={'parse_dates': ['D']})
state.sources['original'] = original_source
from_spec_source = Source.from_spec('original')

# source loaded from spec is identical to the original source
assert type(from_spec_source) == type(original_source)
assert from_spec_source.name == original_source.name
assert from_spec_source.cache_dir == original_source.cache_dir
assert from_spec_source.shared == original_source.shared
assert from_spec_source.root == original_source.root
assert from_spec_source.source_type == original_source.source_type
assert from_spec_source._supports_sql == original_source._supports_sql

# clean up cache memory
config._root = config_root
for source in state.global_sources.values():
source.clear_cache()
state.global_sources.clear()
61 changes: 36 additions & 25 deletions lumen/tests/sources/test_derived.py
@@ -5,6 +5,11 @@

from lumen.sources.base import DerivedSource

from .utils import (
source_clear_cache, source_get_cache_no_query,
source_get_schema_update_cache, source_get_tables,
)


@pytest.fixture
def original(make_filesource):
@@ -52,11 +57,22 @@ def tables_mode_spec():
return spec


@pytest.fixture
def source(original, tables_mode_spec):
return DerivedSource.from_spec(tables_mode_spec)


@pytest.fixture
def source_tables():
return {'derived': pd._testing.makeMixedDataFrame()}


def test_derived_source_resolve_module_type():
assert DerivedSource._get_type('lumen.sources.base.DerivedSource') is DerivedSource
assert DerivedSource.source_type == 'derived'


def test_derived_mirror_source(original, mirror_mode_spec):
def test_derived_source_mirror_mode(original, mirror_mode_spec):
derived = DerivedSource.from_spec(mirror_mode_spec)
assert derived.get_tables() == original.get_tables()
assert derived.get_schema() == original.get_schema()
@@ -68,7 +84,7 @@ def test_derived_mirror_source(original, mirror_mode_spec):
('transforms', [{'type': 'iloc', 'end': 3}]),
('filters', [{'type': 'constant', 'field': 'A', 'value': (0, 2)}]),
])
def test_derived_mirror_source_apply(
def test_derived_source_mirror_mode_apply(
original, mirror_mode_spec, additional_spec, expected_table, expected_schema
):
spec_key, spec_value = additional_spec
@@ -79,19 +95,11 @@ def test_derived_mirror_source_apply(
pd.testing.assert_frame_equal(derived.get('test'), expected_table)


def test_derived_tables_source(original, tables_mode_spec):
derived = DerivedSource.from_spec(tables_mode_spec)
assert derived.get_tables() == ['derived']
df = pd._testing.makeMixedDataFrame()
pd.testing.assert_frame_equal(derived.get('derived'), df)
assert original.get_schema('test') == derived.get_schema('derived')


@pytest.mark.parametrize("additional_spec", [
('transforms', [{'type': 'iloc', 'end': 3}]),
('filters', [{'type': 'constant', 'field': 'A', 'value': (0, 2)}]),
])
def test_derived_tables_source_apply(
def test_derived_source_tables_mode_apply(
original, tables_mode_spec, additional_spec, expected_table, expected_schema
):
spec_key, spec_value = additional_spec
@@ -102,19 +110,22 @@ def test_derived_tables_source_apply(
assert derived.get_schema('derived') == expected_schema


def test_derived_get_query_cache(original, mirror_mode_spec):
derived = DerivedSource.from_spec(mirror_mode_spec)
df = derived.get('test')
cache_key = derived._get_key('test')
assert cache_key in derived._cache
cached_df = derived._cache[cache_key]
pd.testing.assert_frame_equal(cached_df, df)
def test_derived_source_get_tables(source, source_tables):
assert source_get_tables(source, source_tables)


def test_derived_clear_cache(original, mirror_mode_spec):
derived = DerivedSource.from_spec(mirror_mode_spec)
derived.get('test')
cache_key = derived._get_key('test')
assert cache_key in derived._cache
derived.clear_cache()
assert len(derived._cache) == 0
@pytest.mark.parametrize("dask", [True, False])
def test_derived_source_get_cache_no_query(source, dask, source_tables):
for table in source_tables:
expected_table = source_tables[table]
assert source_get_cache_no_query(
source, table, expected_table, dask, use_dask=False
)


def test_derived_source_get_schema_update_cache(source):
assert source_get_schema_update_cache(source, table='derived')


def test_derived_source_clear_cache(source):
assert source_clear_cache(source, table='derived')
144 changes: 144 additions & 0 deletions lumen/tests/sources/test_file.py
@@ -0,0 +1,144 @@
import datetime as dt
import os

from pathlib import Path

import pandas as pd
import pytest

from lumen.sources import FileSource
from lumen.state import state
from lumen.transforms.sql import SQLLimit

from .utils import (
    source_clear_cache, source_filter, source_get_cache_query,
    source_get_schema_update_cache, source_get_tables, source_table_cache_key,
)


@pytest.fixture
def source(make_filesource):
    root = os.path.dirname(__file__)
    return make_filesource(root)


@pytest.fixture
def source_tables():
    return {'test': pd._testing.makeMixedDataFrame()}


def test_source_resolve_module_type():
    assert FileSource._get_type('lumen.sources.FileSource') is FileSource
    assert FileSource.source_type == 'file'


def test_file_source_get_tables(source, source_tables):
    assert source_get_tables(source, source_tables)


@pytest.mark.parametrize("table", ['test'])
@pytest.mark.parametrize("filter_col_A", [314, (13, 314), ['A', 314, 'def']])
@pytest.mark.parametrize("filter_col_B", [(3, 15.9)])
@pytest.mark.parametrize("filter_col_C", [[1, 'A', 'def']])
@pytest.mark.parametrize("sql_transforms", [(None, None), (SQLLimit(limit=100), SQLLimit(limit=100))])
def test_file_source_table_cache_key(source, table, filter_col_A, filter_col_B, filter_col_C, sql_transforms):
    t1, t2 = sql_transforms
    kwargs1 = {'A': filter_col_A, 'B': filter_col_B, 'C': filter_col_C}
    kwargs2 = {'A': filter_col_A, 'B': filter_col_B, 'C': filter_col_C}
    if t1 is not None:
        kwargs1['sql_transforms'] = [t1]
    if t2 is not None:
        kwargs2['sql_transforms'] = [t2]
    assert source_table_cache_key(source, table, kwargs1, kwargs2)


@pytest.mark.parametrize(
    "table_column_value_type", [
        ('test', 'A', 1, 'single_value'),
        ('test', 'A', (1, 3), 'range'),
        ('test', 'A', [(0, 1), (3, 4)], 'range_list'),
        ('test', 'C', 'foo2', 'single_value'),
        ('test', 'C', ['foo1', 'foo3'], 'list'),
        ('test', 'D', dt.datetime(2009, 1, 2), 'single_value'),
        ('test', 'D', (dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)), 'range'),
        ('test', 'D', [dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)], 'list'),
        ('test', 'D', dt.datetime(2009, 1, 2), 'date'),
        ('test', 'D', (dt.date(2009, 1, 2), dt.date(2009, 1, 5)), 'date_range'),
    ]
)
@pytest.mark.parametrize("dask", [True, False])
def test_file_source_filter(source, table_column_value_type, dask, expected_filtered_df):
    assert source_filter(source, table_column_value_type, dask, expected_filtered_df)


@pytest.mark.parametrize(
    "table_column_value_type", [
        ('test', 'A', 1, 'single_value'),
        ('test', 'A', (1, 3), 'range'),
        ('test', 'A', [(0, 1), (3, 4)], 'range_list'),
        ('test', 'C', 'foo2', 'single_value'),
        ('test', 'C', ['foo1', 'foo3'], 'list'),
        ('test', 'D', dt.datetime(2009, 1, 2), 'single_value'),
        ('test', 'D', (dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)), 'range'),
        ('test', 'D', [dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)], 'list'),
        ('test', 'D', dt.datetime(2009, 1, 2), 'date'),
        ('test', 'D', (dt.date(2009, 1, 2), dt.date(2009, 1, 5)), 'date_range'),
    ]
)
@pytest.mark.parametrize("dask", [True, False])
def test_file_source_get_cache_query(source, table_column_value_type, dask, expected_filtered_df):
    assert source_get_cache_query(source, table_column_value_type, dask, expected_filtered_df)


def test_file_source_get_schema_update_cache(source):
    assert source_get_schema_update_cache(source, table='test')


@pytest.mark.parametrize(
    "table_column_value_type", [
        ('test', 'A', 1, 'single_value'),
        ('test', 'A', [(0, 1), (3, 4)], 'range_list'),
        ('test', 'C', ['foo1', 'foo3'], 'list'),
        ('test', 'D', (dt.datetime(2009, 1, 2), dt.datetime(2009, 1, 5)), 'range'),
        ('test', 'D', dt.datetime(2009, 1, 2), 'date'),
    ]
)
@pytest.mark.parametrize("dask", [True])
def test_file_source_clear_cache(source, table_column_value_type, dask):
    table, column, value, _ = table_column_value_type
    kwargs = {column: value}
    if dask is not None:
        kwargs['__dask'] = dask
    assert source_clear_cache(source, table, kwargs)


def test_file_source_get_cache_query_to_file(make_filesource, cachedir):
    root = os.path.dirname(__file__)
    source = make_filesource(root, cache_dir=cachedir)
    source.get('test', A=(1, 2))

    cache_key = source._get_key('test', A=(1, 2))
    cache_path = Path(cachedir) / f'{cache_key}_test.parq'
    df = pd.read_parquet(cache_path, engine='fastparquet')

    # Patch index names due to https://github.com/dask/fastparquet/issues/732
    df.index.names = [None]
    pd.testing.assert_frame_equal(
        df,
        pd._testing.makeMixedDataFrame().iloc[1:3]
    )


def test_file_source_variable(make_variable_filesource):
    root = os.path.dirname(__file__)
    source = make_variable_filesource(root)
    state.variables.tables = {'test': 'test2.csv'}
    df = source.get('test')
    expected = pd._testing.makeMixedDataFrame().iloc[::-1].reset_index(drop=True)
    pd.testing.assert_frame_equal(df, expected)


def test_file_source_extension_of_comlicated_url(source):
    url = "https://api.tfl.gov.uk/Occupancy/BikePoints/@{stations.stations.id}?app_key=random_numbers"
    source.tables["test"] = url
    assert source._named_files["test"][1] is None
73 changes: 48 additions & 25 deletions lumen/tests/sources/test_intake.py
@@ -7,46 +7,56 @@

from lumen.sources.intake import IntakeSource

from .utils import (
source_filter, source_get_cache_no_query, source_get_schema_update_cache,
source_get_tables,
)


@pytest.fixture
def source():
root = os.path.dirname(__file__)
return IntakeSource(
uri=os.path.join(root, 'catalog.yml'), root=root
)
return IntakeSource(uri=os.path.join(root, 'catalog_intake.yml'), root=root)


@pytest.fixture
def source_tables():
df_test = pd._testing.makeMixedDataFrame()
df_test_sql = pd._testing.makeMixedDataFrame()
df_test_sql_none = pd._testing.makeMixedDataFrame()
df_test_sql_none['C'] = ['foo1', None, 'foo3', None, 'foo5']
tables = {
'test': df_test,
'test_sql': df_test_sql,
'test_sql_with_none': df_test_sql_none,
return {'test': pd._testing.makeMixedDataFrame()}


@pytest.fixture
def source_schemas():
schemas = {
'test': {
'A': {'inclusiveMaximum': 4.0, 'inclusiveMinimum': 0.0, 'type': 'number'},
'B': {'inclusiveMaximum': 1.0, 'inclusiveMinimum': 0.0, 'type': 'number'},
'C': {'enum': ['foo1', 'foo2', 'foo3', 'foo4', 'foo5'], 'type': 'string'},
'D': {
'format': 'datetime',
'inclusiveMaximum': '2009-01-07T00:00:00',
'inclusiveMinimum': '2009-01-01T00:00:00',
'type': 'string'
}
}
}
return tables
return schemas


def test_intake_resolve_module_type():
def test_intake_source_resolve_module_type():
assert IntakeSource._get_type('lumen.sources.intake_sql.IntakeSource') is IntakeSource
assert IntakeSource.source_type == 'intake'


def test_intake_source_from_file(source):
df = pd._testing.makeMixedDataFrame()
pd.testing.assert_frame_equal(source.get('test'), df)
def test_intake_source_from_file(source, source_tables):
assert source_get_tables(source, source_tables)


def test_intake_source_from_dict():
def test_intake_source_from_dict(source_tables):
root = os.path.dirname(__file__)
with open(os.path.join(root, 'catalog.yml')) as f:
with open(os.path.join(root, 'catalog_intake.yml')) as f:
catalog = yaml.load(f, Loader=yaml.Loader)
source = IntakeSource(catalog=catalog, root=root)
df = pd._testing.makeMixedDataFrame()
pd.testing.assert_frame_equal(source.get('test'), df)
assert source_get_tables(source, source_tables)


@pytest.mark.parametrize(
@@ -63,8 +73,21 @@ def test_intake_source_from_dict():
]
)
@pytest.mark.parametrize("dask", [True, False])
def test_intake_filter(source, table_column_value_type, dask, expected_filtered_df):
table, column, value, _ = table_column_value_type
kwargs = {column: value}
filtered = source.get(table, __dask=dask, **kwargs)
pd.testing.assert_frame_equal(filtered, expected_filtered_df)
def test_intake_source_filter(source, table_column_value_type, dask, expected_filtered_df):
assert source_filter(source, table_column_value_type, dask, expected_filtered_df)


@pytest.mark.parametrize("dask", [True, False])
def test_intake_source_get_cache_no_query(source, dask, source_tables):
for table in source_tables:
expected_table = source_tables[table]
assert source_get_cache_no_query(source, table, expected_table, dask, use_dask=True)


def test_intake_source_get_schema(source, source_schemas):
assert source.get_schema('test') == source_schemas['test']


def test_intake_source_get_schema_update_cache(source, source_tables):
for table in source_tables:
assert source_get_schema_update_cache(source, table)
85 changes: 37 additions & 48 deletions lumen/tests/sources/test_intake_sql.py
@@ -7,44 +7,36 @@
from lumen.sources.intake_sql import IntakeSQLSource
from lumen.transforms.sql import SQLGroupBy

from .utils import (
source_filter, source_get_schema_not_update_cache, source_get_tables,
source_table_cache_key,
)


@pytest.fixture
def source():
root = os.path.dirname(__file__)
intake_sql_source = IntakeSQLSource(
uri=os.path.join(root, 'catalog.yml'), root=root
uri=os.path.join(root, 'catalog_intake_sql.yml'), root=root
)
return intake_sql_source


@pytest.fixture
def source_tables():
df_test = pd._testing.makeMixedDataFrame()
df_test_sql = pd._testing.makeMixedDataFrame()
df_test_sql_none = pd._testing.makeMixedDataFrame()
df_test_sql_none['C'] = ['foo1', None, 'foo3', None, 'foo5']
tables = {
'test': df_test,
'test_sql': df_test_sql,
'test_sql_with_none': df_test_sql_none,
}
return tables


def test_intake_sql_resolve_module_type():
assert IntakeSQLSource._get_type('lumen.sources.intake_sql.IntakeSQLSource') is IntakeSQLSource
assert IntakeSQLSource.source_type == 'intake_sql'


def test_intake_sql_get_tables(source, source_tables):
tables = source.get_tables()
assert tables == list(source_tables.keys())
for table in tables:
pd.testing.assert_frame_equal(source.get(table), source_tables[table])


def test_intake_sql_get_schema(source):
expected_sql = {
@pytest.fixture
def source_schemas():
test_sql_schema = {
'A': {'inclusiveMaximum': 4.0, 'inclusiveMinimum': 0.0, 'type': 'number'},
'B': {'inclusiveMaximum': 1.0, 'inclusiveMinimum': 0.0, 'type': 'number'},
'C': {'enum': ['foo1', 'foo2', 'foo3', 'foo4', 'foo5'], 'type': 'string'},
@@ -55,20 +47,7 @@ def test_intake_sql_get_schema(source):
'type': 'string'
}
}
expected_csv = dict(expected_sql, D={
'format': 'datetime',
'inclusiveMaximum': '2009-01-07T00:00:00',
'inclusiveMinimum': '2009-01-01T00:00:00',
'type': 'string'
})
assert source.get_schema('test_sql') == expected_sql
assert list(source._schema_cache.keys()) == ['test_sql']
assert source.get_schema('test') == expected_csv
assert list(source._schema_cache.keys()) == ['test_sql', 'test']


def test_intake_sql_get_schema_with_none(source):
expected_sql = {
test_sql_with_none_schema = {
'A': {'inclusiveMaximum': 4.0, 'inclusiveMinimum': 0.0, 'type': 'number'},
'B': {'inclusiveMaximum': 1.0, 'inclusiveMinimum': 0.0, 'type': 'number'},
'C': {'enum': ['foo1', None, 'foo3', 'foo5'], 'type': 'string'},
@@ -79,13 +58,30 @@ def test_intake_sql_get_schema_with_none(source):
'type': 'string'
}
}
assert source.get_schema('test_sql_with_none') == expected_sql
assert list(source._schema_cache.keys()) == ['test_sql_with_none']
schemas = {
'test_sql': test_sql_schema,
'test_sql_with_none': test_sql_with_none_schema,
}
return schemas


def test_intake_sql_get_schema_cache(source):
source.get_schema('test_sql')
assert 'test_sql' in source._schema_cache
def test_intake_sql_resolve_module_type():
assert IntakeSQLSource._get_type('lumen.sources.intake_sql.IntakeSQLSource') is IntakeSQLSource
assert IntakeSQLSource.source_type == 'intake_sql'


def test_intake_sql_get_tables(source, source_tables):
assert source_get_tables(source, source_tables)


def test_intake_sql_get_schema(source, source_schemas):
for table in source_schemas:
assert source.get_schema(table) == source_schemas[table]


def test_intake_sql_cache_key(source, source_tables):
for table in source_tables:
assert source_table_cache_key(source, table)


@pytest.mark.parametrize(
@@ -105,10 +101,9 @@ def test_intake_sql_get_schema_cache(source):
)
@pytest.mark.parametrize("dask", [True, False])
def test_intake_sql_filter(source, table_column_value_type, dask, expected_filtered_df):
table, column, value, _ = table_column_value_type
kwargs = {column: value}
filtered = source.get(table, __dask=dask, **kwargs)
pd.testing.assert_frame_equal(filtered, expected_filtered_df.reset_index(drop=True))
assert source_filter(
source, table_column_value_type, dask, expected_filtered_df.reset_index(drop=True)
)


def test_intake_sql_transforms(source, source_tables):
@@ -134,11 +129,5 @@ def test_intake_sql_transforms_cache(source, source_tables):
assert cache_key in source._cache


def test_intake_sql_clear_cache(source):
source.get('test_sql')
source.get_schema('test_sql')
assert len(source._cache) == 1
assert len(source._schema_cache) == 1
source.clear_cache()
assert len(source._cache) == 0
assert len(source._schema_cache) == 0
def test_intake_sql_get_schema_not_update_cache(source):
assert source_get_schema_not_update_cache(source, table='test_sql')
149 changes: 149 additions & 0 deletions lumen/tests/sources/test_joined.py
@@ -0,0 +1,149 @@
import datetime as dt
import os

import numpy as np
import pandas as pd
import pytest

from lumen.sources import JoinedSource
from lumen.sources.intake_sql import IntakeSQLSource

from .utils import source_get_tables


@pytest.fixture
def file_source(make_filesource):
    root = os.path.dirname(__file__)
    return make_filesource(root)


@pytest.fixture
def intake_sql_source():
    root = os.path.dirname(__file__)
    intake_sql_source = IntakeSQLSource(
        uri=os.path.join(root, 'catalog_intake_sql.yml'), root=root
    )
    return intake_sql_source


@pytest.fixture
def source(file_source, intake_sql_source, index):
    sources = {
        'file_source': file_source,
        'intake_sql_source': intake_sql_source,
    }
    tables = {
        'joined_table': [
            {'source': 'file_source',
             'table': 'test',
            },
            {'source': 'intake_sql_source',
             'table': 'test_sql_with_none',
            }
        ]
    }
    if index is not None:
        tables['joined_table'][0]['index'] = index
        tables['joined_table'][1]['index'] = index

    intake_sql_source = JoinedSource(sources=sources, tables=tables)
    return intake_sql_source


@pytest.fixture
def source_tables_default_index():
    format = '%Y-%m-%dT%H:%M:%S'
    table = pd.DataFrame({
        'A': [0., 1., 2., 3., 4., 1., 3.],
        'B': [0., 1., 0., 1., 0., 1., 1.],
        'C': ['foo1', 'foo2', 'foo3', 'foo4', 'foo5', None, None],
        'D': [
            dt.datetime.strptime('2009-01-01T00:00:00', format),
            dt.datetime.strptime('2009-01-02T00:00:00', format),
            dt.datetime.strptime('2009-01-05T00:00:00', format),
            dt.datetime.strptime('2009-01-06T00:00:00', format),
            dt.datetime.strptime('2009-01-07T00:00:00', format),
            dt.datetime.strptime('2009-01-02T00:00:00', format),
            dt.datetime.strptime('2009-01-06T00:00:00', format),
        ],
    })
    return {'joined_table': table}


@pytest.fixture
def source_tables_index_A():
    format = '%Y-%m-%dT%H:%M:%S'
    table = pd.DataFrame({
        'A': [0., 1., 2., 3., 4.],
        'B_x': [0., 1., 0., 1., 0.],
        'C_x': ['foo1', 'foo2', 'foo3', 'foo4', 'foo5'],
        'D_x': [
            dt.datetime.strptime('2009-01-01T00:00:00', format),
            dt.datetime.strptime('2009-01-02T00:00:00', format),
            dt.datetime.strptime('2009-01-05T00:00:00', format),
            dt.datetime.strptime('2009-01-06T00:00:00', format),
            dt.datetime.strptime('2009-01-07T00:00:00', format),
        ],
        'B_y': [0., 1., 0., 1., 0.],
        'C_y': ['foo1', None, 'foo3', None, 'foo5'],
        'D_y': [
            dt.datetime.strptime('2009-01-01T00:00:00', format),
            dt.datetime.strptime('2009-01-02T00:00:00', format),
            dt.datetime.strptime('2009-01-05T00:00:00', format),
            dt.datetime.strptime('2009-01-06T00:00:00', format),
            dt.datetime.strptime('2009-01-07T00:00:00', format),
        ],
    })
    return {'joined_table': table}


@pytest.fixture
def source_tables_index_C():
    format = '%Y-%m-%dT%H:%M:%S'
    table = pd.DataFrame({
        'A_x': [0., 1., 2., 3., 4.],
        'B_x': [0., 1., 0., 1., 0.],
        'C': ['foo1', 'foo2', 'foo3', 'foo4', 'foo5'],
        'D_x': [
            dt.datetime.strptime('2009-01-01T00:00:00', format),
            dt.datetime.strptime('2009-01-02T00:00:00', format),
            dt.datetime.strptime('2009-01-05T00:00:00', format),
            dt.datetime.strptime('2009-01-06T00:00:00', format),
            dt.datetime.strptime('2009-01-07T00:00:00', format),
        ],
        'A_y': [0., np.nan, 2., np.nan, 4.],
        'B_y': [0., np.nan, 0., np.nan, 0.],
        'D_y': [
            dt.datetime.strptime('2009-01-01T00:00:00', format),
            pd.NaT,
            dt.datetime.strptime('2009-01-05T00:00:00', format),
            pd.NaT,
            dt.datetime.strptime('2009-01-07T00:00:00', format),
        ],
    })
    return {'joined_table': table}


@pytest.fixture
def source_tables(
    index,
    source_tables_default_index,
    source_tables_index_A,
    source_tables_index_C,
):
    tables_dict = {
        None: source_tables_default_index,
        'A': source_tables_index_A,
        'C': source_tables_index_C,
    }
    return tables_dict[index]


def test_source_resolve_module_type():
    assert JoinedSource._get_type('lumen.sources.JoinedSource') is JoinedSource
    assert JoinedSource.source_type == 'join'


@pytest.mark.parametrize("index", [None, 'A', 'C'])
def test_source(source, source_tables, index):
    assert source_get_tables(source, source_tables)
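The _x/_y column names in the index='A' and index='C' fixtures follow standard pandas merge suffixing: when the two joined frames share columns other than the join key, the overlapping names are suffixed. A plain pandas illustration, assuming JoinedSource merges roughly along these lines:

import pandas as pd

left = pd.DataFrame({'A': [0., 1.], 'B': [0., 1.], 'C': ['foo1', 'foo2']})
right = pd.DataFrame({'A': [0., 1.], 'B': [0., 1.], 'C': ['foo1', None]})
merged = pd.merge(left, right, on='A')
print(list(merged.columns))  # ['A', 'B_x', 'C_x', 'B_y', 'C_y']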
50 changes: 50 additions & 0 deletions lumen/tests/sources/test_json.py
@@ -0,0 +1,50 @@
import os

import pandas as pd
import pytest

from lumen.sources import JSONSource

from .utils import source_filter, source_get_cache_no_query


@pytest.fixture
def source(make_jsonsource):
    root = os.path.dirname(__file__)
    return make_jsonsource(root)


@pytest.fixture
def source_tables():
    df = pd._testing.makeMixedDataFrame()
    df['D'] = pd.to_datetime(df['D']).dt.date.astype(str)
    df = df.astype({'A': int, 'B': int, 'D': 'str'})
    return {'local': df}


def test_json_source_resolve_module_type():
    assert JSONSource._get_type('lumen.sources.JSONSource') is JSONSource
    assert JSONSource.source_type == 'json'


@pytest.mark.parametrize(
    "table_column_value_type", [
        ('local', 'A', 1.0, 'single_value'),
        ('local', 'A', (1, 3), 'range'),
        ('local', 'A', [(0, 1), (3, 4)], 'range_list'),
        ('local', 'C', 'foo2', 'single_value'),
        ('local', 'C', ['foo1', 'foo3'], 'list'),
    ]
)
@pytest.mark.parametrize("dask", [True, False])
def test_json_source_filter(source, table_column_value_type, dask, expected_filtered_df):
    assert source_filter(
        source, table_column_value_type, dask, expected_filtered_df, check_dtype=False
    )


@pytest.mark.parametrize("dask", [True, False])
def test_json_source_get_cache_no_query(source, dask, source_tables):
    assert source_get_cache_no_query(
        source, 'local', source_tables['local'], dask, use_dask=False, check_dtype=False,
    )
101 changes: 101 additions & 0 deletions lumen/tests/sources/utils.py
@@ -0,0 +1,101 @@
import pandas as pd


def source_get_tables(source, source_tables, check_dtype=True):
    assert source.get_tables() == list(source_tables.keys())
    for table in source_tables:
        pd.testing.assert_frame_equal(
            source.get(table), source_tables[table], check_dtype=check_dtype
        )
    return True


def source_table_cache_key(source, table, kwargs1={}, kwargs2={}):
    key1 = source._get_key(table, **kwargs1)
    key2 = source._get_key(table, **kwargs2)
    assert key1 == key2
    return True


def source_filter(source, table_column_value_type, dask, expected_df, check_dtype=True):
    table, column, value, _ = table_column_value_type
    kwargs = {column: value}
    if dask is not None:
        kwargs['__dask'] = dask
    filtered = source.get(table, **kwargs)
    pd.testing.assert_frame_equal(filtered, expected_df, check_dtype=check_dtype)
    return True


def source_get_cache_query(source, table_column_value_type, dask, expected_df, check_dtype=True):
    table, column, value, _ = table_column_value_type
    kwargs = {column: value}
    if dask:
        source.get(table, __dask=dask, **kwargs)
    else:
        source.get(table, **kwargs)
    cache_key = source._get_key(table, **kwargs)
    assert cache_key in source._cache
    cached_df = source._cache[cache_key]
    if dask:
        cached_df = cached_df.compute()
    pd.testing.assert_frame_equal(cached_df, expected_df, check_dtype=check_dtype)
    cache_key = source._get_key(table, **kwargs)
    assert cache_key in source._cache
    return True


def source_get_cache_no_query(source, table, expected_df, dask=None, use_dask=False, check_dtype=True):
    source.get(table, __dask=dask)
    cache_key = source._get_key(table)
    assert cache_key in source._cache
    assert len(source._cache) == 1
    cached_df = source._cache[cache_key]
    if use_dask:
        # If use_dask is True, the cached object is always a dask DataFrame,
        # regardless of the value of __dask in the query.
        cached_df = cached_df.compute()
    pd.testing.assert_frame_equal(cached_df, expected_df, check_dtype=check_dtype)
    cache_key = source._get_key(table)
    assert cache_key in source._cache
    assert len(source._cache) == 1
    return True


def source_get_schema_update_cache(source, table):
    # For some source types, source.get_schema() updates both
    # source._schema_cache and source._cache.
    source.get_schema(table)
    assert table in source._schema_cache
    assert len(source._schema_cache) == 1
    assert len(source._cache) == 1
    # The schema for this table is now cached; calling source.get_schema() again does not update the cache.
    source.get_schema(table)
    assert len(source._schema_cache) == 1
    assert table in source._schema_cache
    assert len(source._cache) == 1
    return True


def source_get_schema_not_update_cache(source, table):
    # For some source types, source.get_schema() only updates
    # source._schema_cache; source._cache remains unchanged.
    source.get_schema(table)
    assert len(source._schema_cache) == 1
    assert table in source._schema_cache
    assert len(source._cache) == 0
    # The schema for this table is now cached; calling source.get_schema() again does not update the cache.
    source.get_schema(table)
    assert len(source._schema_cache) == 1
    assert table in source._schema_cache
    assert len(source._cache) == 0
    return True


def source_clear_cache(source, table, kwargs={}):
    source.get(table, **kwargs)
    source.get_schema(table)
    source.clear_cache()
    assert len(source._cache) == 0
    assert len(source._schema_cache) == 0
    return True
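Each helper asserts internally and returns True so the calling test can wrap it in a plain assert, keeping the failure point visible at the call site. A typical call pattern (the test name here is illustrative):

def test_my_source_get_tables(source, source_tables):
    # source_get_tables raises via its internal asserts on any mismatch,
    # and returns True so the outer assert reads naturally.
    assert source_get_tables(source, source_tables)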