
[WIP] Lineage for data and code #12

Open · wants to merge 20 commits into main
Conversation

YuanmingLeee (Contributor) commented:

Closes #11

What is done in this PR

By using the OpenLineage Airflow provider, DataCI can now track lineage among different versions of data and code (stages and pipelines).

How to reproduce?

  1. Start the servers:

```bash
# Start the Airflow backend.
# Remember to activate the dataci venv and export PYTHONPATH at the dataci project root.
airflow standalone

# Start the DataCI metadata server.
cd metadata
python server.py
```
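Optionally, you can smoke-test the metadata server before publishing anything. The sketch below is not part of this MR and makes several assumptions: that the DataCI metadata server exposes the standard OpenLineage HTTP endpoint (`api/v1/lineage`) the Airflow provider posts to, and that the server address, namespace, and job name shown here are placeholders to adjust for your setup.

```python
# Hedged smoke test: emit a hand-built OpenLineage event and see whether the
# metadata server accepts it. Requires the openlineage-python client.
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import Job, Run, RunEvent, RunState
from openlineage.client.transport.http import HttpConfig, HttpTransport

# Assumed address of the DataCI metadata server started by `python server.py`;
# change it to wherever the server actually listens.
client = OpenLineageClient(transport=HttpTransport(HttpConfig(url="http://localhost:8000")))

client.emit(
    RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.now(timezone.utc).isoformat(),
        run=Run(runId=str(uuid4())),
        job=Job(namespace="default", name="lineage_smoke_test"),  # hypothetical job name
        producer="manual-smoke-test",
        inputs=[],
        outputs=[],
    )
)
print("OpenLineage event accepted")
```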
  2. Publish a test pipeline. The following code is available at tests/lineage/python_ops_pipeline.py:

```python
from datetime import datetime
from dataci.plugins.decorators import dag, Dataset, stage


@stage
def task1(df):
    return df


@stage
def task2_0(df):
    return df


@stage
def task2_1(df):
    return df


@stage
def task3(df1, df2):
    import pandas as pd

    return pd.concat([df1, df2])


@dag(
    start_date=datetime(2020, 7, 30), schedule=None,
)
def python_ops_pipeline():
    raw_dataset_train = Dataset.get('test.yelp_review_test@latest')
    dataset1 = Dataset(name='test.task1_out', dataset_files=task1(raw_dataset_train))
    dataset2_0 = Dataset(name='test.task2_0_out', dataset_files=task2_0(dataset1))
    dataset2_1 = Dataset(name='test.task2_1_out', dataset_files=task2_1(dataset1))
    dataset3 = Dataset(name='test.task3_out', dataset_files=task3(dataset2_0, dataset2_1))


# Build the pipeline
python_ops_dag = python_ops_pipeline()

if __name__ == '__main__':
    test_dataset = Dataset('test.yelp_review_test', dataset_files=[
            {'date': '2020-10-05 00:44:08', 'review_id': 'HWRpzNHPqjA4pxN5863QUA', 'stars': 5.0,
             'text': "I called Anytime on Friday afternoon about the number pad lock on my front door. After several questions, the gentleman asked me if I had changed the battery.", },
            {'date': '2020-10-15 04:34:49', 'review_id': '01plHaNGM92IT0LLcHjovQ', 'stars': 5.0,
             'text': "Friend took me for lunch.  Ordered the Chicken Pecan Tart although it was like a piece quiche, was absolutely delicious!", },
            {'date': '2020-10-17 06:58:09', 'review_id': '7CDDSuzoxTr4H5N4lOi9zw', 'stars': 4.0,
             'text': "I love coming here for my fruit and vegetables. It is always fresh and a great variety. The bags of already diced veggies are a huge time saver.", },
        ])
    test_dataset.publish(version_tag='2020-10')
    python_ops_dag.publish()
```

  3. Run the pipeline from the Airflow web UI (or trigger it through Airflow's REST API; see the sketch below).

     (screenshot: triggering the pipeline run in the Airflow web UI)
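If you prefer not to click through the web UI, a hedged alternative is to trigger the run through Airflow's stable REST API. This sketch assumes the webserver runs on the `airflow standalone` default of localhost:8080, that the basic-auth API backend is enabled, and that the dag_id matches the pipeline function name (confirm the exact id in the Airflow UI):

```python
# Hedged alternative to the web UI: unpause the DAG and queue a run via the
# Airflow REST API, authenticating with the admin password that
# `airflow standalone` prints on startup.
import requests

AIRFLOW_API = "http://localhost:8080/api/v1"           # default `airflow standalone` address
AUTH = ("admin", "<password printed by airflow standalone>")
DAG_ID = "python_ops_pipeline"                          # assumed dag_id; verify in the UI

requests.patch(f"{AIRFLOW_API}/dags/{DAG_ID}", json={"is_paused": False}, auth=AUTH).raise_for_status()
resp = requests.post(f"{AIRFLOW_API}/dags/{DAG_ID}/dagRuns", json={"conf": {}}, auth=AUTH)
resp.raise_for_status()
print("queued run:", resp.json()["dag_run_id"])
```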

  4. The lineage is now tracked; query it with a short snippet.
     Here we walk the dataset downstream lineage 3 levels deep, starting from the test dataset we just published manually.

```python
from dataci.models import Dataset
import networkx as nx

dataset = Dataset.get('test.yelp_review_test@2020-10')
graph = dataset.downstream(n=3, type='dataset')
print(nx.nx_agraph.to_agraph(graph))
```

Expected Result

Sorry for the bad visualization 😿

```
strict digraph "" {
        "JobView(type='dataset', workspace='test', name='yelp_review_test', version='117bce7ded8bc6d0815813954549f026')" -> "JobView(type='run', workspace='default', name='6cd6e83a-ac2b-35a2-a1ab-856deb17a0fe', version='1')";
        "JobView(type='dataset', workspace='test', name='yelp_review_test', version='117bce7ded8bc6d0815813954549f026')" -> "JobView(type='run', workspace='default', name='934b65df-2704-3674-b303-0d184e1be86a', version='1')";
        "JobView(type='dataset', workspace='test', name='yelp_review_test', version='117bce7ded8bc6d0815813954549f026')" -> "JobView(type='run', workspace='default', name='9fef35c6-2989-3954-9f71-d5f9b4c75896', version='1')";
        "JobView(type='run', workspace='default', name='6cd6e83a-ac2b-35a2-a1ab-856deb17a0fe', version='1')" -> "JobView(type='dataset', workspace='test', name='task1_out', version='c2f2e0e7ebb76f8e09a005d8e214b817')";
        "JobView(type='run', workspace='default', name='934b65df-2704-3674-b303-0d184e1be86a', version='1')" -> "JobView(type='dataset', workspace='test', name='task1_out', version='c2f2e0e7ebb76f8e09a005d8e214b817')";
        "JobView(type='run', workspace='default', name='9fef35c6-2989-3954-9f71-d5f9b4c75896', version='1')" -> "JobView(type='dataset', workspace='test', name='task1_out', version='c2f2e0e7ebb76f8e09a005d8e214b817')";
        "JobView(type='dataset', workspace='test', name='task1_out', version='c2f2e0e7ebb76f8e09a005d8e214b817')" -> "JobView(type='run', workspace='default', name='32221cc3-b9b1-3bea-97f5-89a262f4447e', version='1')";
        "JobView(type='dataset', workspace='test', name='task1_out', version='c2f2e0e7ebb76f8e09a005d8e214b817')" -> "JobView(type='run', workspace='default', name='3e9e6486-5d43-3f77-8045-901ee91d7abe', version='1')";
        "JobView(type='dataset', workspace='test', name='task1_out', version='c2f2e0e7ebb76f8e09a005d8e214b817')" -> "JobView(type='run', workspace='default', name='6110e949-f1bd-3d3a-aa36-8add66867fd5', version='1')";
        "JobView(type='dataset', workspace='test', name='task1_out', version='c2f2e0e7ebb76f8e09a005d8e214b817')" -> "JobView(type='run', workspace='default', name='6a39199c-6f28-3fd4-8fe0-cf1d9826068a', version='1')";
        "JobView(type='dataset', workspace='test', name='task1_out', version='c2f2e0e7ebb76f8e09a005d8e214b817')" -> "JobView(type='run', workspace='default', name='77a762c1-d5ac-32c0-bf53-ae1d6a9f3b69', version='1')";
        "JobView(type='dataset', workspace='test', name='task1_out', version='c2f2e0e7ebb76f8e09a005d8e214b817')" -> "JobView(type='run', workspace='default', name='7d552a38-f8e8-350e-8cb3-e53ffc079d15', version='1')";
        "JobView(type='run', workspace='default', name='32221cc3-b9b1-3bea-97f5-89a262f4447e', version='1')" -> "JobView(type='dataset', workspace='test', name='task2_0_out', version='99df53c11c3e2ea961cfaf26e00d62a4')";
        "JobView(type='run', workspace='default', name='3e9e6486-5d43-3f77-8045-901ee91d7abe', version='1')" -> "JobView(type='dataset', workspace='test', name='task2_1_out', version='0a9739009dc34d00f9aadaae6652955b')";
        "JobView(type='run', workspace='default', name='6110e949-f1bd-3d3a-aa36-8add66867fd5', version='1')" -> "JobView(type='dataset', workspace='test', name='task2_0_out', version='99df53c11c3e2ea961cfaf26e00d62a4')";
        "JobView(type='run', workspace='default', name='6a39199c-6f28-3fd4-8fe0-cf1d9826068a', version='1')" -> "JobView(type='dataset', workspace='test', name='task2_0_out', version='99df53c11c3e2ea961cfaf26e00d62a4')";
        "JobView(type='run', workspace='default', name='77a762c1-d5ac-32c0-bf53-ae1d6a9f3b69', version='1')" -> "JobView(type='dataset', workspace='test', name='task2_1_out', version='0a9739009dc34d00f9aadaae6652955b')";
        "JobView(type='run', workspace='default', name='7d552a38-f8e8-350e-8cb3-e53ffc079d15', version='1')" -> "JobView(type='dataset', workspace='test', name='task2_1_out', version='0a9739009dc34d00f9aadaae6652955b')";
        "JobView(type='dataset', workspace='test', name='task2_1_out', version='0a9739009dc34d00f9aadaae6652955b')" -> "JobView(type='run', workspace='default', name='8edcaa14-d2ca-3848-8059-afacf9c05821', version='1')";
        "JobView(type='dataset', workspace='test', name='task2_1_out', version='0a9739009dc34d00f9aadaae6652955b')" -> "JobView(type='run', workspace='default', name='b97b6cd3-4cec-3a7b-8819-293a706e51f4', version='1')";
        "JobView(type='dataset', workspace='test', name='task2_1_out', version='0a9739009dc34d00f9aadaae6652955b')" -> "JobView(type='run', workspace='default', name='c3008415-656e-3147-9509-5a75114bded6', version='1')";
        "JobView(type='dataset', workspace='test', name='task2_0_out', version='99df53c11c3e2ea961cfaf26e00d62a4')" -> "JobView(type='run', workspace='default', name='8edcaa14-d2ca-3848-8059-afacf9c05821', version='1')";
        "JobView(type='dataset', workspace='test', name='task2_0_out', version='99df53c11c3e2ea961cfaf26e00d62a4')" -> "JobView(type='run', workspace='default', name='b97b6cd3-4cec-3a7b-8819-293a706e51f4', version='1')";
        "JobView(type='dataset', workspace='test', name='task2_0_out', version='99df53c11c3e2ea961cfaf26e00d62a4')" -> "JobView(type='run', workspace='default', name='c3008415-656e-3147-9509-5a75114bded6', version='1')";
        "JobView(type='run', workspace='default', name='8edcaa14-d2ca-3848-8059-afacf9c05821', version='1')" -> "JobView(type='dataset', workspace='test', name='task3_out', version='25262a5b4190434479a644f2d87f31ca')";
        "JobView(type='run', workspace='default', name='b97b6cd3-4cec-3a7b-8819-293a706e51f4', version='1')" -> "JobView(type='dataset', workspace='test', name='task3_out', version='25262a5b4190434479a644f2d87f31ca')";
        "JobView(type='run', workspace='default', name='c3008415-656e-3147-9509-5a75114bded6', version='1')" -> "JobView(type='dataset', workspace='test', name='task3_out', version='25262a5b4190434479a644f2d87f31ca')";
}
```

Paste this into any Graphviz visualizer to see the 3 levels of datasets (and their runs).

(screenshot: the rendered lineage graph)
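Alternatively, if pygraphviz and Graphviz are installed, the same graph can be rendered to an image locally (a small sketch, not part of this MR):

```python
# Lay out the lineage graph with Graphviz `dot` and write it to a PNG,
# reusing `graph` from the query snippet above.
import networkx as nx

agraph = nx.nx_agraph.to_agraph(graph)
agraph.graph_attr.update(rankdir="LR")   # left-to-right tends to read better for lineage
agraph.layout(prog="dot")
agraph.draw("lineage.png")
```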

Roadmap

  • Tracking for each stage run
  • Tracking the relationship (lineage) between each dataset and run
  • Statistics on the downstream / upstream (a rough sketch follows this list), given a:
    • run
    • stage
    • dataset
  • Visualization
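For the statistics item, here is a rough sketch of what downstream counts could look like, built only on the networkx graph already returned by `Dataset.downstream()` above. Node classification relies on the JobView repr shown in the Expected Result, and the helper name is illustrative, not an existing API:

```python
# Illustrative only: count downstream dataset and run nodes reachable from a root node.
import networkx as nx

def downstream_stats(graph, root):
    reachable = nx.descendants(graph, root)
    return {
        "datasets": sum("type='dataset'" in repr(n) for n in reachable),
        "runs": sum("type='run'" in repr(n) for n in reachable),
    }

# Example: take the single source node (the dataset we published) and report its reach.
root = next(n for n in graph if graph.in_degree(n) == 0)
print(downstream_stats(graph, root))
```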
