Skip to content

Commit

Permalink
Better management of ConfigDictionaries (#255)
Browse files Browse the repository at this point in the history
 Create methods for easily constructed nested configs on solids and
contexts

Right now, constructing nested dictionaries for configurations on solids
and contexts is a big pain. The user must make up a throwaway name for
type and then do multiple layers of nesting, coming up with more
arbitrary names a long the way. The syntax is also verbose.

This PR address that issue.

What was before:

   ConfigDefinition(
        types.ConfigDictionary(
            'PipelineName.Solid.SolidName.ConfigDict',
            {
                'foo' : types.Field(types.String),
                'nested_dict' : types.Field(
                    types.ConfigDictionary(
                        'PipelineName.Solid.SolidName.NestedDict.ConfigDict',
                        {
                            'bar' : types.Field(types.String),
                        },
                    ),
                ),
            },
        ),
    )

becomes

    ConfigDefinition.solid_config_def_dict(
        'pipeline_name',
        'solid_name',
        {
            'foo': types.Field(types.String),
            'nested_dict': {
                'bar': types.Field(types.String),
            },
        },
    )

This should make it far less painful to construct configuration types
that are custom to a context or solid, reducing lots of boilerplate.
  • Loading branch information
schrockn authored Nov 1, 2018
1 parent fb070cc commit 6b0263c
Show file tree
Hide file tree
Showing 7 changed files with 508 additions and 31 deletions.
21 changes: 4 additions & 17 deletions python_modules/dagster/dagster/core/config_types.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import re
from dagster import check

from dagster.utils import camelcase

from .config import (
Context,
Environment,
Expand Down Expand Up @@ -296,9 +297,7 @@ def all_optional_type(dagster_type):
check.inst_param(dagster_type, 'dagster_type', DagsterType)

if isinstance(dagster_type, DagsterCompositeType):
for field in dagster_type.field_dict.values():
if not field.is_optional:
return False
return dagster_type.all_fields_optional
return True


Expand All @@ -318,7 +317,7 @@ def __init__(self, name, pipeline_def):
if solid.definition.config_def:
solid_name = camelcase(solid.name)
solid_config_type = SolidConfigType(
'{pipeline_name}.{solid_name}.SolidConfig'.format(
'{pipeline_name}.SolidConfig.{solid_name}'.format(
pipeline_name=pipeline_name,
solid_name=solid_name,
),
Expand Down Expand Up @@ -347,15 +346,3 @@ def __init__(self, name):

def evaluate_value(self, value):
return process_incoming_composite_value(self, value, lambda val: Execution(**val))


# Adapted from https://github.com/okunishinishi/python-stringcase/blob/master/stringcase.py
def camelcase(string):
string = re.sub(r'^[\-_\.]', '', str(string))
if not string:
return string
return str(string[0]).upper() + re.sub(
r'[\-_\.\s]([a-z])',
lambda matched: str(matched.group(1)).upper(),
string[1:],
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,21 @@

from dagster import (
ConfigDefinition,
types,
Field,
DagsterEvaluateValueError,
DagsterInvalidDefinitionError,
Field,
PipelineContextDefinition,
PipelineDefinition,
SolidDefinition,
types,
)

from dagster.core.types import process_incoming_composite_value
from dagster.core.types import (
process_incoming_composite_value,
ScopedConfigInfo,
)

from dagster.core.definitions import build_config_dict_type


def test_noop_config():
Expand Down Expand Up @@ -382,3 +391,243 @@ def test_custom_composite_type():
'foo': 'some_string',
'bar': 'not_an_int',
})


def single_elem(ddict):
return list(ddict.items())[0]


def test_build_config_dict_type():
single_cd_type = build_config_dict_type(
['PipelineName', 'SingleField'],
{'foo': types.Field(types.String)},
ScopedConfigInfo(
pipeline_def_name='pipeline_name',
solid_def_name='single_field',
),
)
assert isinstance(single_cd_type, types.ConfigDictionary)
assert single_cd_type.name == 'PipelineName.SingleField.ConfigDict'
assert len(single_cd_type.field_dict) == 1
foo_name, foo_field = single_elem(single_cd_type.field_dict)
assert foo_name == 'foo'
assert foo_field.dagster_type is types.String


def test_build_single_nested():
def _assert_facts(single_nested):
assert single_nested.name == 'PipelineName.Solid.SolidName.ConfigDict'
assert set(single_nested.field_dict.keys()) == set(['foo', 'nested_dict'])

assert single_nested.field_dict['nested_dict'].is_optional is False
nested_field_type = single_nested.field_dict['nested_dict'].dagster_type

assert isinstance(nested_field_type, types.ConfigDictionary)
assert nested_field_type.name == 'PipelineName.Solid.SolidName.NestedDict.ConfigDict'
assert nested_field_type.field_name_set == set(['bar'])

old_style_config_def = ConfigDefinition(
types.ConfigDictionary(
'PipelineName.Solid.SolidName.ConfigDict',
{
'foo':
types.Field(types.String),
'nested_dict':
types.Field(
types.ConfigDictionary(
'PipelineName.Solid.SolidName.NestedDict.ConfigDict',
{
'bar': types.Field(types.String),
},
),
),
},
),
)

_assert_facts(old_style_config_def.config_type)

single_nested_manual = build_config_dict_type(
['PipelineName', 'Solid', 'SolidName'],
{
'foo': types.Field(types.String),
'nested_dict': {
'bar': types.Field(types.String),
},
},
ScopedConfigInfo(
pipeline_def_name='pipeline_name',
solid_def_name='solid_name',
),
)

_assert_facts(single_nested_manual)

nested_from_config_def = ConfigDefinition.solid_config_dict(
'pipeline_name',
'solid_name',
{
'foo': types.Field(types.String),
'nested_dict': {
'bar': types.Field(types.String),
},
},
)

_assert_facts(nested_from_config_def.config_type)


def test_build_double_nested():
double_config_type = ConfigDefinition.context_config_dict(
'some_pipeline',
'some_context',
{
'level_one': {
'level_two': {
'field': types.Field(types.String)
}
}
},
).config_type

assert double_config_type.name == 'SomePipeline.Context.SomeContext.ConfigDict'

level_one_type = double_config_type.field_dict['level_one'].dagster_type

assert isinstance(level_one_type, types.ConfigDictionary)
assert level_one_type.name == 'SomePipeline.Context.SomeContext.LevelOne.ConfigDict'
assert level_one_type.field_name_set == set(['level_two'])

level_two_type = level_one_type.field_dict['level_two'].dagster_type

assert level_two_type.name == 'SomePipeline.Context.SomeContext.LevelOne.LevelTwo.ConfigDict'
assert level_two_type.field_name_set == set(['field'])


def test_build_optionality():
optional_test_type = ConfigDefinition.solid_config_dict(
'some_pipeline',
'some_solid',
{
'required': {
'value': types.Field(types.String),
},
'optional': {
'value': types.Field(types.String, is_optional=True),
}
},
).config_type

assert optional_test_type.field_dict['required'].is_optional is False
assert optional_test_type.field_dict['optional'].is_optional is True


def test_pipeline_name_mismatch_error():
with pytest.raises(DagsterInvalidDefinitionError, match='wrong pipeline name'):
PipelineDefinition(
name='pipeline_mismatch_test',
solids=[
SolidDefinition(
name='some_solid',
inputs=[],
outputs=[],
config_def=ConfigDefinition.solid_config_dict(
'wrong_pipeline',
'some_solid',
{},
),
transform_fn=lambda *_args: None,
),
],
)

with pytest.raises(DagsterInvalidDefinitionError, match='wrong pipeline name'):
PipelineDefinition(
name='pipeline_mismatch_test',
solids=[],
context_definitions={
'some_context':
PipelineContextDefinition(
context_fn=lambda *_args: None,
config_def=ConfigDefinition.context_config_dict(
'not_a_match',
'some_context',
{},
)
)
}
)


def test_solid_name_mismatch():
with pytest.raises(DagsterInvalidDefinitionError, match='wrong solid name'):
PipelineDefinition(
name='solid_name_mismatch',
solids=[
SolidDefinition(
name='dont_match_me',
inputs=[],
outputs=[],
config_def=ConfigDefinition.solid_config_dict(
'solid_name_mismatch',
'nope',
{},
),
transform_fn=lambda *_args: None,
),
],
)

with pytest.raises(DagsterInvalidDefinitionError, match='context_config_dict'):
PipelineDefinition(
name='solid_name_mismatch',
solids=[
SolidDefinition(
name='dont_match_me',
inputs=[],
outputs=[],
config_def=ConfigDefinition.context_config_dict(
'solid_name_mismatch',
'dont_match_me',
{},
),
transform_fn=lambda *_args: None,
),
],
)


def test_context_name_mismatch():
with pytest.raises(DagsterInvalidDefinitionError, match='wrong context name'):
PipelineDefinition(
name='context_name_mismatch',
solids=[],
context_definitions={
'test':
PipelineContextDefinition(
context_fn=lambda *_args: None,
config_def=ConfigDefinition.context_config_dict(
'context_name_mismatch',
'nope',
{},
)
)
}
)

with pytest.raises(DagsterInvalidDefinitionError, match='solid_config_dict'):
PipelineDefinition(
name='context_name_mismatch',
solids=[],
context_definitions={
'test':
PipelineContextDefinition(
context_fn=lambda *_args: None,
config_def=ConfigDefinition.solid_config_dict(
'context_name_mismatch',
'some_solid',
{},
)
)
}
)
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,9 @@
SolidDictionaryType,
SpecificContextConfig,
all_optional_user_config,
camelcase,
)


def test_camelcase():
assert camelcase('foo') == 'Foo'
assert camelcase('foo_bar') == 'FooBar'
assert camelcase('foo.bar') == 'FooBar'
assert camelcase('foo-bar') == 'FooBar'


def test_context_config_any():
context_defs = {
'test':
Expand Down Expand Up @@ -417,7 +409,7 @@ def test_whole_environment():
solids_type = environment_type.field_dict['solids'].dagster_type
assert solids_type.name == 'SomePipeline.SolidsConfigDictionary'
assert solids_type.field_dict['int_config_solid'
].dagster_type.name == 'SomePipeline.IntConfigSolid.SolidConfig'
].dagster_type.name == 'SomePipeline.SolidConfig.IntConfigSolid'
assert environment_type.field_dict['expectations'
].dagster_type.name == 'SomePipeline.ExpectationsConfig'

Expand Down
Loading

0 comments on commit 6b0263c

Please sign in to comment.