
feat: In each record to filter and transform, publish a local service field holding the original object the record is extracted from #214

Open
wants to merge 1 commit into base: main

Conversation

@rpopov commented Jan 13, 2025

Implement airbytehq/airbyte#50395 discussed in airbytehq/airbyte#49971

At the record extraction step, add to each record a service field $root holding a reference to the object the record was extracted from:

  • the root response object, when parsing the JSON format
  • the original record, when parsing the JSONL format

More service fields could be added in the future.
The service fields are available in the record's filter and transform steps.

Avoid:

  • reusing the produced maps/dictionaries, thus avoiding building cyclic structures
  • transforming the service fields in the Flatten transformation.

Explicitly remove the service field(s) after the transform step, thus making them:

  • local to the filter and transform steps
  • not visible to the subsequent mapping and store steps (as they should be)
  • not visible in tests beyond test_record_selector (as they should be)

This allows the record transformation logic to define its own "local variables" to reuse
interim calculations; a sketch of the intended lifecycle follows.
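
A minimal sketch of that lifecycle in plain Python (the helper names follow the convention described here, but the "items" path, the "keep"/"include_all" fields, and the filter condition are made up for illustration; the real extractor and selector are the CDK's DpathExtractor and RecordSelector):

from typing import Any, Dict, Iterable, List

SERVICE_KEY_PREFIX = "$"
RECORD_ROOT_KEY = SERVICE_KEY_PREFIX + "root"

def extract(response_body: Dict[str, Any], field_path: str) -> Iterable[Dict[str, Any]]:
    # Extraction step: copy each record (copying avoids building cyclic
    # structures) and attach the root response object under $root.
    for record in response_body.get(field_path, []):
        enriched = dict(record)
        enriched[RECORD_ROOT_KEY] = response_body
        yield enriched

def exclude_service_keys(record: Dict[str, Any]) -> Dict[str, Any]:
    # Drop every key that uses the service-key prefix.
    return {k: v for k, v in record.items() if not k.startswith(SERVICE_KEY_PREFIX)}

def select(response_body: Dict[str, Any]) -> List[Dict[str, Any]]:
    selected = []
    for record in extract(response_body, "items"):
        # The filter (and any transformation) may read the service field...
        if record[RECORD_ROOT_KEY].get("include_all") or record.get("keep"):
            # ...but it is removed before the record leaves the selector.
            selected.append(exclude_service_keys(record))
    return selected

print(select({"include_all": True, "items": [{"id": 1}, {"id": 2}]}))
# [{'id': 1}, {'id': 2}]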

The contract of body parsing seems irregular in how it represents bad JSON, missing JSON, and empty JSON.
It cannot be unified, because that irregularity is already relied upon (see the illustration below).
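
Purely to illustrate the kind of irregularity meant here (this sketch is hypothetical and is not the CDK's actual decoding contract), the three "no data" cases can surface differently from a decoder:

import json
from typing import Any, Iterable

def decode(body: str) -> Iterable[Any]:
    # Hypothetical decoder: empty body, bad JSON, and an empty JSON array
    # all mean "no data", yet they do not produce the same records.
    if not body:
        yield {}              # no JSON at all -> one empty record
        return
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        yield {}              # bad JSON -> also one empty record
        return
    if isinstance(parsed, list):
        yield from parsed     # empty JSON array -> no records at all
    else:
        yield parsed

print(list(decode("")), list(decode("not json")), list(decode("[]")))
# [{}] [{}] []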

Update the development environment setup documentation

  • to organize and present the setup steps explicitly
  • to avoid misunderstandings and wasted effort.

Update CONTRIBUTING.md to

  • collect and organize the knowledge on running the tests locally;
  • state the actual testing steps;
  • clarify and make explicit the procedures and steps.

The unit, integration, and acceptance tests at this exact version succeed under Fedora 41, while
one of them fails under Oracle Linux 8.7; that failure is not related to the contents of this PR.
The integration tests of the CDK fail due to missing secrets/config.json file for the Shopify source.
See #197

Summary by CodeRabbit

Release Notes

  • New Features

    • Enhanced JSON decoding with improved error handling and logging
    • Added service key management for record extraction and transformation
    • Improved record flattening and extraction capabilities
  • Bug Fixes

    • Updated pagination strategy to handle empty decoder responses
    • Improved record selector to handle nested and service-specific keys
  • Documentation

    • Updated contributing guidelines with more detailed setup and testing instructions
    • Minor formatting improvements in release documentation
  • Chores

    • Updated Prettier dependency to version 3.4.2
    • Added comprehensive unit tests for new functionality

coderabbitai bot commented Jan 13, 2025

📝 Walkthrough

This pull request introduces several enhancements to the Airbyte CDK's declarative source components, focusing on improving error handling, record extraction, and service key management. The changes span multiple files in the sources and extractors modules, with modifications to JSON decoding, record processing, and transformation logic. The updates aim to provide more robust and flexible data handling capabilities for declarative sources.

Changes

  • airbyte_cdk/sources/declarative/decoders/json_decoder.py: enhanced error handling and simplified JSON parsing with improved logging and default value handling
  • airbyte_cdk/sources/declarative/extractors/dpath_extractor.py: added RECORD_ROOT_KEY and an update_record function to improve record extraction
  • airbyte_cdk/sources/declarative/extractors/record_extractor.py: introduced service key management functions and constants
  • airbyte_cdk/sources/declarative/extractors/record_selector.py: added a method to remove service keys from records
  • docs/CONTRIBUTING.md: updated setup and testing instructions with improved clarity
  • package.json: upgraded the Prettier dependency version
  • multiple test files: added comprehensive test cases for the new functionality

Sequence Diagram

sequenceDiagram
    participant Decoder as JSON Decoder
    participant Extractor as DPath Extractor
    participant Selector as Record Selector
    participant Transformer as Flatten Fields

    Decoder->>Extractor: Decode Response
    Extractor->>Extractor: Update Record with Root Key
    Extractor->>Selector: Extract Records
    Selector->>Selector: Remove Service Keys
    Selector->>Transformer: Transform Records
    Transformer-->>Selector: Flattened Records

Suggested Labels

enhancement, docs

Suggested Reviewers

  • aaronsteers
  • maxi297

Hey there! 👋 I noticed some interesting improvements in the declarative source components. Would you like me to elaborate on any specific changes? The modifications seem to enhance error handling and record processing quite nicely. Wdyt? 🤔


coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (17)
airbyte_cdk/sources/declarative/extractors/record_extractor.py (3)

10-14: Small typo in the documentation

There's a typo in "transormation" -> "transformation". Otherwise, the documentation clearly explains the convention and lifecycle of service fields.

- The service fields are kept only during the record's filtering and transormation.
+ The service fields are kept only during the record's filtering and transformation.

21-27: Consider removing type ignore and adding return type annotation

The # type: ignore[no-untyped-def] seems unnecessary here. What do you think about adding a return type annotation of None since the function modifies the dict in place? wdyt?

-def remove_service_keys(mapping: dict[str, Any]):  # type: ignore[no-untyped-def]
+def remove_service_keys(mapping: dict[str, Any]) -> None:

34-35: Consider removing type ignore and improving error message

Similar to above, the type ignore seems unnecessary. Also, would it be helpful to include the actual mapping in the assertion message to aid debugging? wdyt?

-def assert_service_keys_exist(mapping: Mapping[str, Any]):  # type: ignore[no-untyped-def]
-    assert mapping != exclude_service_keys(mapping), "The mapping should contain service keys"
+def assert_service_keys_exist(mapping: Mapping[str, Any]) -> None:
+    assert mapping != exclude_service_keys(mapping), f"The mapping should contain service keys, got: {mapping}"
airbyte_cdk/sources/declarative/transformations/flatten_fields.py (1)

36-44: Consider adjusting indentation for better readability

The logic looks good! The service keys are correctly preserved. Would you consider adjusting the indentation of the else clause to match the if condition for better readability? wdyt?

                    if not is_service_key(current_key):
                        new_key = (
                            f"{parent_key}.{current_key}"
                            if (current_key in transformed_record or force_with_parent_name)
                            else current_key
                        )
                        stack.append((value, new_key))
-                    else:  # transfer the service fields without change
-                        transformed_record[current_key] = value
+                    else:
+                        # transfer the service fields without change
+                        transformed_record[current_key] = value
unit_tests/sources/declarative/transformations/test_flatten_fields.py (1)

50-67: Consider adding test cases for nested service keys

The test coverage for service keys looks good! Would you consider adding a test case for nested objects containing service keys to ensure they're handled correctly at any depth? For example:

(
    {
        SERVICE_KEY_PREFIX + "root": {
            "nested": {SERVICE_KEY_PREFIX + "child": "value"}
        },
        "regular": "field"
    },
    {
        SERVICE_KEY_PREFIX + "root": {
            "nested": {SERVICE_KEY_PREFIX + "child": "value"}
        },
        "regular": "field"
    }
)
unit_tests/sources/declarative/extractors/test_record_extractor.py (2)

62-67: Consider catching specific assertion error

The test looks good, but would it be safer to catch the specific AssertionError instead of using a bare except? This would prevent masking other unexpected exceptions. wdyt?

    try:
        assert_service_keys_exist(original)
        success = False
-    except:  # OK, expected
+    except AssertionError:  # OK, expected
        success = True

71-76: Consider adding edge cases for service key detection

The basic test cases look good! Would you consider adding some edge cases to ensure robust handling of special characters and edge cases? For example:

def test_service_key_edge_cases():
    assert is_service_key(SERVICE_KEY_PREFIX)  # just the prefix
    assert is_service_key(SERVICE_KEY_PREFIX + "$nested")  # double prefix
    assert not is_service_key("prefix" + SERVICE_KEY_PREFIX)  # prefix in middle
airbyte_cdk/sources/declarative/decoders/json_decoder.py (1)

38-41: Consider making stack trace logging configurable?

The debug logging with stack_info=True provides great debugging context but might be verbose in production. What do you think about making it configurable through a parameter, wdyt?

-            logger.debug("Response to parse: %s", response.text, exc_info=True, stack_info=True)
+            logger.debug(
+                "Response to parse: %s",
+                response.text,
+                exc_info=True,
+                stack_info=self.parameters.get("debug_stack_trace", False)
+            )
airbyte_cdk/sources/declarative/extractors/dpath_extractor.py (2)

23-29: Consider using dict constructor for a cleaner copy?

The implementation safely handles both dict and non-dict cases. For the dict copy, what do you think about using the dict constructor for a slightly cleaner approach, wdyt?

-        copy = {k: v for k, v in record.items()}
+        copy = dict(record)

92-109: Consider consolidating the yield statements?

The implementation handles all cases well. What do you think about consolidating the yield statements for better maintainability, wdyt?

-                if isinstance(extracted, list):
-                    for record in extracted:
-                        yield update_record(record, root_response)
-                elif isinstance(extracted, dict):
-                    yield update_record(extracted, root_response)
-                elif extracted:
-                    yield extracted
-                else:
-                    yield from []
+                if not extracted:
+                    return
+                records = [extracted] if isinstance(extracted, dict) else extracted if isinstance(extracted, list) else [extracted]
+                yield from (update_record(record, root_response) for record in records)
airbyte_cdk/sources/declarative/extractors/record_selector.py (1)

112-113: Consider adding a docstring to explain the service fields removal step?

The addition of service fields removal before normalization is a significant change in the data processing pipeline. Would you consider adding a brief docstring to explain why this step is necessary here? wdyt?
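
For instance, a docstring along these lines (hypothetical wording; the standalone signature and import path are assumptions based on the change summary above) would make the intent explicit:

from typing import Any, Iterable, Mapping

from airbyte_cdk.sources.declarative.extractors.record_extractor import exclude_service_keys

def _remove_service_keys(records: Iterable[Mapping[str, Any]]) -> Iterable[Mapping[str, Any]]:
    """Strip service fields (e.g. $root) that were attached during extraction.

    Service fields are meant to be visible only to the record filter and the
    transformations; removing them here keeps them out of the normalization,
    mapping, and store steps that follow.
    """
    for record in records:
        yield exclude_service_keys(record)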

unit_tests/sources/declarative/extractors/test_dpath_extractor.py (1)

196-201: Consider extracting the service key validation into a helper function?

The service key validation logic is repeated across test cases. Would it make sense to extract this into a helper function to improve maintainability? wdyt?

+def validate_service_keys(records):
+    for record in records:
+        if record != {}:
+            assert_service_keys_exist(record)
+    return [exclude_service_keys(record) for record in records]
+
 def test_dpath_extractor(field_path: List, decoder: Decoder, body, expected_records: List):
     # ...
-    for record in actual_records:
-        if record != {}:
-            assert_service_keys_exist(record)
-
-    actual_records = [exclude_service_keys(record) for record in actual_records]
+    actual_records = validate_service_keys(actual_records)
unit_tests/sources/declarative/extractors/test_record_selector.py (1)

169-171: Consider consistent whitespace around service key operations?

The whitespace around service key operations differs between test methods. Would you like to make it consistent? For example, always having one blank line before and after? wdyt?

Also applies to: 244-245

unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py (1)

325-325: Nice addition of deduplication logic! 👍

The deduplication using dictionary comprehension with orjson.dumps as keys is elegant. Would you consider extracting this into a helper function since it's used twice in the test? Something like _dedupe_records(records), wdyt?

+def _dedupe_records(records):
+    return list({orjson.dumps(record): record for record in records}.values())

 # Usage:
-cumulative_records_state_deduped = list(
-    {orjson.dumps(record): record for record in cumulative_records_state}.values()
-)
+cumulative_records_state_deduped = _dedupe_records(cumulative_records_state)
docs/CONTRIBUTING.md (2)

97-115: Comprehensive test execution instructions! 🎯

The detailed breakdown of test commands with explanations is very helpful. Consider adding a note about expected test execution times for the different commands to help developers plan their workflow, wdyt?

🧰 Tools
🪛 LanguageTool

[style] ~101-~101: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ... unit tests. - poetry run pytest-fast to run the subset of PyTest tests, which a...

(ENGLISH_WORD_REPEAT_BEGINNING_RULE)


[style] ~108-~108: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...ting issues. - poetry run ruff format to format your Python code. ### Run Code ...

(ENGLISH_WORD_REPEAT_BEGINNING_RULE)


167-167: Minor: Missing comma in the sentence

-On the **Settings / Domains** page find the subdomain
+On the **Settings / Domains** page, find the subdomain
🧰 Tools
🪛 LanguageTool

[uncategorized] ~167-~167: Possible missing comma found.
Context: ...file. - On the Settings / Domains page find the subdomain of myshopify.com a...

(AI_HYDRA_LEO_MISSING_COMMA)

docs/RELEASES.md (1)

60-61: Clear instructions for testing pre-release versions

The instructions for testing pre-release versions are well-structured. Consider adding a note about the expected time for the publishing pipeline to complete, wdyt?

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0e7802a and 4cdc87c.

⛔ Files ignored due to path filters (2)
  • package-lock.json is excluded by !**/package-lock.json
  • poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (17)
  • airbyte_cdk/sources/declarative/decoders/json_decoder.py (1 hunks)
  • airbyte_cdk/sources/declarative/extractors/dpath_extractor.py (2 hunks)
  • airbyte_cdk/sources/declarative/extractors/record_extractor.py (1 hunks)
  • airbyte_cdk/sources/declarative/extractors/record_selector.py (3 hunks)
  • airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py (1 hunks)
  • airbyte_cdk/sources/declarative/transformations/flatten_fields.py (2 hunks)
  • docs/CONTRIBUTING.md (5 hunks)
  • docs/RELEASES.md (2 hunks)
  • package.json (1 hunks)
  • unit_tests/sources/declarative/decoders/test_json_decoder.py (1 hunks)
  • unit_tests/sources/declarative/extractors/test_dpath_extractor.py (5 hunks)
  • unit_tests/sources/declarative/extractors/test_record_extractor.py (1 hunks)
  • unit_tests/sources/declarative/extractors/test_record_selector.py (5 hunks)
  • unit_tests/sources/declarative/partition_routers/test_parent_state_stream.py (1 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (2 hunks)
  • unit_tests/sources/declarative/test_manifest_declarative_source.py (2 hunks)
  • unit_tests/sources/declarative/transformations/test_flatten_fields.py (2 hunks)
✅ Files skipped from review due to trivial changes (2)
  • package.json
  • unit_tests/sources/declarative/test_manifest_declarative_source.py
🧰 Additional context used
🪛 LanguageTool
docs/CONTRIBUTING.md

[style] ~101-~101: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ... unit tests. - poetry run pytest-fast to run the subset of PyTest tests, which a...

(ENGLISH_WORD_REPEAT_BEGINNING_RULE)


[style] ~108-~108: Three successive sentences begin with the same word. Consider rewording the sentence or use a thesaurus to find a synonym.
Context: ...ting issues. - poetry run ruff format to format your Python code. ### Run Code ...

(ENGLISH_WORD_REPEAT_BEGINNING_RULE)


[uncategorized] ~167-~167: Possible missing comma found.
Context: ...file. - On the Settings / Domains page find the subdomain of myshopify.com a...

(AI_HYDRA_LEO_MISSING_COMMA)


[style] ~269-~269: Consider a more expressive alternative.
Context: ...or, you may need to regenerate them. To do that, you can run: ```bash poetry run ...

(DO_ACHIEVE)


[locale-violation] ~287-~287: In American English, “take a look” is more commonly used.
Context: ...Actions. ## Release Management Please have a look at the [Release Management](./RELEASES....

(HAVE_A_LOOK)

docs/RELEASES.md

[grammar] ~68-~68: Did you mean the noun “publishing”?
Context: ...ng Low-Code Python connectors Once the publish pipeline has completed, set the version...

(PREPOSITION_VERB)

🪛 Markdownlint (0.37.0)
docs/CONTRIBUTING.md

140-140: null
Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


239-239: null
Emphasis used instead of a heading

(MD036, no-emphasis-as-heading)


86-86: null
Fenced code blocks should have a language specified

(MD040, fenced-code-language)

⏰ Context from checks skipped due to timeout of 90000ms (7)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-the-guardian-api' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (14)
airbyte_cdk/sources/declarative/decoders/json_decoder.py (1)

47-50: Nice simplification! 👍

The use of yield from makes the code more elegant and Pythonic. Great improvement!

airbyte_cdk/sources/declarative/requesters/paginators/strategies/cursor_pagination_strategy.py (1)

73-73: Good defensive programming! 👍

Adding a default empty dict to next() prevents StopIteration exceptions when the decoder yields no items. This aligns well with the JsonDecoder changes.
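
A self-contained sketch of the pattern described (the real call site in cursor_pagination_strategy.py may look different):

from typing import Any, Iterable

def decode_nothing(_response: Any) -> Iterable[dict]:
    # Stand-in for a decoder that yields no items (e.g. an empty body).
    return iter([])

response = object()
# Without the {} default, next() would raise StopIteration here; with it,
# the pagination strategy can fall back to "no next page token".
last_page_body = next(decode_nothing(response), {})
print(last_page_body)  # {}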

airbyte_cdk/sources/declarative/extractors/dpath_extractor.py (2)

12-20: Clean implementation of service field! 👍

Good use of the service key prefix convention for the root field. The implementation aligns well with the PR objectives.


88-90: Clear empty case handling! 👍

The explicit empty dict check and the explanatory comment make the contract compliance clear.

unit_tests/sources/declarative/decoders/test_json_decoder.py (2)

16-27: Test cases align well with the new behavior! 👍

The parameter rename to expected_json and updated test cases clearly reflect the decoder's behavior.


34-48: Great test coverage! 👍

The additional test cases and descriptive IDs improve the test suite's comprehensiveness and readability.

airbyte_cdk/sources/declarative/extractors/record_selector.py (2)

11-14: LGTM! Clean import organization.

The imports are well-organized and the new exclude_service_keys import aligns with the PR's objective of managing service fields.


162-167: LGTM! Clean implementation of service keys removal.

The _remove_service_keys method is simple, focused, and effectively uses the imported exclude_service_keys function. The implementation aligns well with the PR's objective of making service fields local to filtering and transformation.

unit_tests/sources/declarative/extractors/test_dpath_extractor.py (2)

18-21: LGTM! Well-organized imports.

The new imports for service key management functions are properly organized.


40-94: Great test coverage for edge cases!

The test cases thoroughly cover various scenarios including empty strings, objects, arrays, and nested structures. The parameterization is well-organized and the test data is clear.

unit_tests/sources/declarative/extractors/test_record_selector.py (2)

12-19: LGTM! Clean import organization.

The imports are well-organized and properly grouped.


50-79: Great test coverage for nested records!

The test cases thoroughly cover scenarios with nested records and properly validate the behavior with and without filters.

unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)

247-247: LGTM! Clear state management assertions.

The assertions properly verify that the state is updated to indicate sync completion after reading records.

Also applies to: 362-362

docs/CONTRIBUTING.md (1)

17-23: Great addition of Python version requirements!

The explicit Python version requirements and Fedora-specific installation instructions make it easier for contributors to get started.

@rpopov rpopov changed the title In each record to filter and transform, publish a local service field holding the original object the record is extracted from feat: In each record to filter and transform, publish a local service field holding the original object the record is extracted from Jan 13, 2025