feat: add Composite Raw Decoder #179
Conversation
Signed-off-by: Artem Inzhyyants <[email protected]>
I added a couple of questions to improve my understanding
Signed-off-by: Artem Inzhyyants <[email protected]>
That's cool! I think this is a good improvement
Signed-off-by: Artem Inzhyyants <[email protected]>
📝 Walkthrough
The pull request introduces a new CompositeRawDecoder with composable parsers (GzipParser, JsonLineParser, CsvParser) for the declarative CDK, wires them into the component schema and factory, and adds tests.
Sequence Diagram

sequenceDiagram
    participant Retriever
    participant CompositeRawDecoder
    participant Parser
    participant Data
    Retriever->>CompositeRawDecoder: Decode response
    CompositeRawDecoder->>Parser: Parse data
    alt Gzip Compression
        Parser->>Parser: Decompress
    end
    Parser->>Data: Extract records
    Data-->>Retriever: Return parsed records
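To make the flow above concrete, here is a minimal Python sketch of the composition, based on the example in the decoder's docstring; the import paths and URL are illustrative assumptions, not part of this PR:

# A minimal sketch (assumed import paths) of decoding a streamed HTTP
# response whose body is gzipped JSON lines.
import requests

from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
    GzipParser,
    JsonLineParser,
)

decoder = CompositeRawDecoder(
    parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1"))
)
response = requests.get("https://example.com/export.jsonl.gz", stream=True)
for record in decoder.decode(response):  # yields one dict per JSON line
    print(record)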
Possibly related PRs
Suggested reviewers
Hey there! 👋 I noticed you've added some really cool new parsing capabilities. Quick question: Have you considered how these new parsers might interact with existing retriever configurations? Wdyt about adding some documentation or examples to help users understand the new parsers?
Actionable comments posted: 2
🧹 Nitpick comments (20)
airbyte_cdk/sources/declarative/decoders/__init__.py (1)
10-10
: Nice import of CompositeRawDecoder.
This addition expands the module's offerings. Perhaps we might include a docstring in the composite_decoder.py file to explain its use cases for future readers, wdyt?
airbyte_cdk/sources/declarative/decoders/composite_decoder.py (4)
18-19
: Interface design looks clear.
Introducing Parser as an ABC is a neat approach to unify custom parsers. Perhaps we could add a short docstring on the intended usage or common parameters, so new contributors can adopt the pattern seamlessly, wdyt?
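For instance, a short docstring on the ABC could look like this (a sketch; the signature mirrors the Parser interface introduced in this PR):

from abc import ABC, abstractmethod
from io import BufferedIOBase
from typing import Any, Generator, MutableMapping

class Parser(ABC):
    @abstractmethod
    def parse(
        self,
        data: BufferedIOBase,
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """Parse a buffered byte stream and yield one dictionary per record.

        Implementations may be wrapped by other parsers (e.g. GzipParser),
        so they should read from `data` incrementally instead of loading
        the whole stream into memory.
        """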
55-58
: Potential optimization for error handling.
When JSON decoding fails, we log a warning and skip. This is good for resilience, but might it be helpful to have a configurable option to raise an exception if unexpected lines are encountered, wdyt?
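One possible shape for that option is sketched below; the strict flag is hypothetical and not a field in this PR:

import json
import logging
from dataclasses import dataclass
from io import BufferedIOBase
from typing import Any, Generator, MutableMapping, Optional

logger = logging.getLogger("airbyte")

@dataclass
class JsonLineParser:
    encoding: Optional[str] = "utf-8"
    strict: bool = False  # hypothetical: re-raise instead of skipping bad lines

    def parse(
        self,
        data: BufferedIOBase,
    ) -> Generator[MutableMapping[str, Any], None, None]:
        for line in data:
            try:
                yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
            except json.JSONDecodeError:
                if self.strict:
                    raise
                logger.warning(f"Cannot decode/parse line {line!r} as JSON")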
76-83
: Potential multi-chunk reading concern.
Reading large CSVs in chunks is effective. Could we consider memory usage limits or advanced chunk sizing if extremely large files are processed, wdyt?
95-104
: CompositeRawDecoder usage clarity.
Implementation looks straightforward, but could we include an inline doc or example clarifying what happens if the underlying parser fails on a line (e.g., how are partial results handled), wdyt?
unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (2)
25-31
: Pragmatic approach for test data generation.
Generating 2 million lines is a hefty load. Should we consider adding an environment-based toggle or a smaller default for local testing, wdyt?
35-36
: Tip on skipping slow tests.
We decorate this with @pytest.mark.slow. Maybe we can also provide a quick way to skip slow tests locally unless explicitly enabled, wdyt?
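A minimal conftest.py sketch of one way to do that (the RUN_SLOW environment variable name is a hypothetical choice):

# conftest.py — skip tests marked "slow" unless RUN_SLOW=1 is set
import os

import pytest

def pytest_collection_modifyitems(config, items):
    if os.environ.get("RUN_SLOW") == "1":
        return  # run everything, including slow tests
    skip_slow = pytest.mark.skip(reason="slow test; set RUN_SLOW=1 to run")
    for item in items:
        if "slow" in item.keywords:
            item.add_marker(skip_slow)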
unit_tests/sources/declarative/decoders/test_composite_decoder.py (2)
55-67
: Gzip CSV decoding test coverage is good!
Testing with multiple encodings ensures robust coverage. Could we also add a test scenario for missing headers or partial CSV lines, wdyt?
93-106
: Gzip JSON lines test scenario is comprehensive.
We check repeated iteration logic and assert record counts. Might it be useful to test a scenario containing at least one invalid line within a gzipped bundle, wdyt?
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (3)
1112-1114
: Consider clarifying the class docstring.
Would you consider adding a short docstring to explain the JsonLineParser's usage? This can help future maintainers easily identify its purpose. wdyt?
1116-1121
: Check consistency with other parsers.
You might consider adding docstrings or class-level comments like with JsonLineParser for uniformity. wdyt?
1536-1540
: Allowing flexible parser composition.
Might it be helpful to add a small example or usage note on how the CompositeRawDecoder is configured, especially with nested formats? This could reduce confusion for new developers. wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)
146-148
: CsvParserModel import.
Nice addition. Could we also provide a quick usage example in the docstring for create_csv_parser to guide developers? wdyt?
242-244
: JsonLineParserModel import.
Everything looks aligned. Possibly add a note or reference in docstrings to ensure clarity for new maintainers. wdyt?
461-461
: Adding CompositeRawDecoder constructor mapping.
This is a straightforward approach. Perhaps confirm we log or handle the scenario if a user tries to combine multiple decoders? wdyt?
1690-1695
: Clean approach for JsonLineParser creation.
Consider adding input validation for cases where encoding is None to avoid runtime errors if a user sets it unexpectedly. wdyt?
1720-1725
: CompositeRawDecoder creation method.
Perhaps log the selected parser type at runtime to aid debugging. wdyt?
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)
2726-2728
: Experimental annotation in CompositeRawDecoder.
Would you consider a short note specifying potential changes to the interface, to caution users about relying on it immediately? wdyt?
2756-2761
: JsonLineParser YAML entry.
Everything here is straightforward. Maybe add a quick usage snippet explaining how the encoding field is applied. wdyt?
2762-2774
: CsvParser defaults.
Would you consider adding an example for unusual delimiters like "\t" or ";" to the documentation to make it more explicit? wdyt?
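A couple of such examples in Python form (a sketch; the import path is an assumption):

from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import CsvParser

tab_parser = CsvParser(encoding="utf-8", delimiter="\t")  # tab-separated values
semicolon_parser = CsvParser(delimiter=";")  # common in European CSV exports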
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (8)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
- airbyte_cdk/sources/declarative/decoders/__init__.py (1 hunks)
- airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
- unit_tests/sources/declarative/decoders/test_composite_decoder.py (1 hunks)
- unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py (1 hunks)
- unit_tests/sources/declarative/decoders/test_json_decoder.py (0 hunks)
💤 Files with no reviewable changes (1)
- unit_tests/sources/declarative/decoders/test_json_decoder.py
🔇 Additional comments (9)
airbyte_cdk/sources/declarative/decoders/__init__.py (1)
12-12
: Exporting CompositeRawDecoder via all is a good step.
This ensures that the decoder is publicly accessible when importing from the package. No issues found here!
airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1)
33-33
: Question about layering multiple GzipParsers.
Even though GzipParser can wrap another parser, might we want to add a quick guard that ensures we’re not repeatedly decompressing if the file is already decompressed, wdyt?
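One way such a guard could look (a sketch, not part of the PR): check for the gzip magic bytes 0x1f 0x8b before decompressing, assuming the stream is a peekable BufferedReader:

import gzip
from io import BufferedReader

GZIP_MAGIC = b"\x1f\x8b"

def parse_maybe_gzipped(data: BufferedReader, inner_parser):
    # peek() does not consume bytes, so the inner parser still sees the full stream
    if data.peek(2)[:2] == GZIP_MAGIC:
        with gzip.GzipFile(fileobj=data, mode="rb") as gz:
            yield from inner_parser.parse(gz)
    else:
        yield from inner_parser.parse(data)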
unit_tests/sources/declarative/decoders/test_composite_decoder.py (1)
108-119
: Nice variety of encodings for JSON lines.
Everything looks smooth here. No further suggestions.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
1501-1505
: Fallback strategy for inner_parser.
Could it be valuable to define a fallback parser if none is specified? This might guard against unexpected null references if inner_parser is optional in future. wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
70-75
: Imports appear correct.
Everything looks consistent with the newly introduced classes from composite_decoder. Great job!
1716-1719
: CSV parser constructor.
This is good. Could setting a default delimiter to "," be validated against potential multi-character delimiters or non-standard uses? wdyt?
134-136
: Ensure alignment with newly added model.
Would you consider verifying that CompositeRawDecoderModel is properly referenced in all relevant factory methods? wdyt?
✅ Verification successful
Let me gather information about the usage of CompositeRawDecoderModel in the codebase.
This will help us understand:
- Where CompositeRawDecoderModel is used in factory methods
- All references to CompositeRawDecoderModel in the codebase
- All factory-related files that might need to reference this model
Let me check the test file to ensure proper test coverage for the CompositeRawDecoderModel.
Let me try one more search to look for any composite decoder related tests with a broader pattern.
CompositeRawDecoderModel is properly integrated in the factory
Based on the codebase analysis, the CompositeRawDecoderModel is correctly referenced in all necessary places within the model_to_component_factory.py:
- It's properly imported from the schema
- Registered in the factory mapping to its creation method
- Has a dedicated factory method
create_composite_raw_decoder
with correct type hints
However, I notice there might be missing test coverage. Would you consider adding tests for the composite raw decoder factory method to ensure its reliability? wdyt?
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Search for CompositeRawDecoderModel usage in factory methods
ast-grep --pattern 'def $_(self, $_, CompositeRawDecoderModel, $_) { $$$ }'
# Search for any references to CompositeRawDecoderModel
rg "CompositeRawDecoderModel" -A 2
# Look for factory-related files that might need to reference this model
fd "factory" --type f
Length of output: 1464
Script:
#!/bin/bash
# Check test coverage for CompositeRawDecoderModel
rg "CompositeRawDecoderModel" unit_tests/sources/declarative/parsers/test_model_to_component_factory.py -A 3
# Look for test cases related to composite raw decoder
ast-grep --pattern 'def test_$_composite_raw_decoder$_($$$) { $$$ }' unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
Length of output: 253
Script:
#!/bin/bash
# Search for any composite decoder related tests with a broader pattern
rg "composite.*decoder" -i unit_tests/sources/declarative/parsers/test_model_to_component_factory.py
# Also check the actual implementation details of the factory method
rg "create_composite_raw_decoder" -A 5 airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Length of output: 920
215-217
: Confirm references to GzipParserModel.
Just to be sure: do we need specialized error handling or fallback for mismatched references to GzipParserModel? wdyt?
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)
2722-2722
: Reference to CompositeRawDecoder.
Looks good for including CompositeRawDecoder in the SimpleRetriever’s decoder list. Have you verified any potential conflicts with existing decoders? wdyt?
✅ Verification successful
Let me verify the potential conflicts with existing decoders by searching through the codebase.
Let me search for more specific information about the CompositeRawDecoder and its parsers.
CompositeRawDecoder looks compatible with existing decoders
Based on the schema definitions, the CompositeRawDecoder is designed to work with a specific set of parsers (GzipParser, JsonLineParser, CsvParser) in a composable way. There are no conflicts visible as:
- Each parser has well-defined, non-overlapping responsibilities
- GzipParser is explicitly designed to work with inner parsers (JsonLineParser or CsvParser)
- The schema enforces these relationships through proper type definitions
🏁 Scripts executed
The following scripts were executed for the analysis:
Error: No shell scripts found
Script:
#!/bin/bash
# Search for decoder definitions in the schema file
rg -A 5 "CompositeRawDecoder|RawDecoder" airbyte_cdk/sources/declarative/declarative_component_schema.yaml
# Search for any decoder implementations or references
ast-grep --pattern 'class $_ implements RawDecoder'
ast-grep --pattern 'class $_ extends RawDecoder'
ast-grep --pattern 'class CompositeRawDecoder'
# Look for any usage patterns of decoders
rg -A 3 "decoder.*:" airbyte_cdk/sources/declarative/
Length of output: 17586
Script:
#!/bin/bash
# Search for parser definitions that might be used with CompositeRawDecoder
rg -A 10 "GzipParser|JsonLineParser|CsvParser" airbyte_cdk/sources/declarative/declarative_component_schema.yaml
# Look for any implementations or usages of these parsers
ast-grep --pattern 'class $_(GzipParser|JsonLineParser|CsvParser)'
rg "type.*: .*Parser" airbyte_cdk/sources/declarative/
Length of output: 1372
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
1690-1695
: Consider handling None encoding?
Would you like to ensure there's a safe default or check when model.encoding is None, so we avoid unexpected decoding errors? wdyt?
1718-1721
: Handle defaults for CSV parser options?
Would you like to set a fallback if model.delimiter or model.encoding is None, or raise a clearer error? wdyt?
1722-1727
: Validate parser configuration?
Would you be open to adding a quick check if model.parser is None or invalid? This may prevent unexpected runtime errors. wdyt?
airbyte_cdk/sources/declarative/decoders/composite_decoder.py (3)
18-29
: Naming consideration for Parser?
Would you consider something more descriptive like "DataParser"? Or are you comfortable with the generic name? wdyt?
47-61
: Graceful handling of malformed JSON lines.
You currently skip lines that aren’t valid JSON. If you anticipate a high failure rate, would it be helpful to optionally stop or store such lines? wdyt?
63-83
: About the TODO note.
There’s a TODO: "migrate implementation to re-use file-base classes." Would you like help creating a GitHub issue, or to add the changes in this PR? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
🔇 Additional comments (12)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9)
70-75
: Looks good!
The imports from "composite_decoder" appear consistent with the rest of the codebase, nicely grouped, and concise.
134-136
: No issues spotted.
Adding "CompositeRawDecoderModel" to the imports for usage in the mappings is straightforward.
146-148
: All set here.
The "CsvParserModel" import nicely aligns with the rest of the decoders.
215-217
: Imports look appropriate.
"GzipParserModel" is properly added for new parser creation.
242-244
: No concerns here.
"JsonLineParserModel" import is consistent with the design.
461-461
: Clean addition.
The "CompositeRawDecoderModel" mapped to "self.create_composite_raw_decoder" in "PYDANTIC_MODEL_TO_CONSTRUCTOR" ensures correct wiring.
492-492
: Well-integrated.
"JsonLineParserModel" is mapped via the new create method.
494-494
: All good.
"GzipParserModel" is properly linked to the factory method.
1712-1717
: Raise an error for missing inner_parser?
Would it help to log or raise an error if the inner_parser is missing or invalid, so the user knows immediately? wdyt?
airbyte_cdk/sources/declarative/decoders/composite_decoder.py (3)
1-12
: Imports appear concise and purposeful.
The dependencies (gzip, json, logging, pandas, requests, etc.) match the intended functionality here.
31-44
: Potential memory usage check for GzipParser?
Would you like to confirm that the GzipFile approach handles large files gracefully, or possibly add chunk-based reading if needed? wdyt?
85-104
: CompositeRawDecoder implementation appears solid.
The streaming approach to parse data from response.raw looks good. No major issues spotted.
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/decoders/composite_decoder.py (5)
18-29
: How about enhancing the docstring for better clarity? wdyt?
The docstring could provide more details about:
- Expected format/structure of the yielded dictionaries
- Any specific error handling expectations
- Whether the parser should close the input stream
Example:
 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
     """
-    Parse data and yield dictionaries.
+    Parse data from a buffered stream and yield dictionaries.
+
+    Args:
+        data: A buffered stream containing the data to parse
+
+    Returns:
+        A generator yielding parsed records as dictionaries
+
+    Raises:
+        ValueError: If the data cannot be parsed according to the expected format
     """
     pass
35-43
: Should we add error handling for corrupted gzip data? wdyt?
The current implementation might raise uncaught exceptions for corrupted gzip data. Consider wrapping the gzip operations in a try-except block:
 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
-    gzipobj = gzip.GzipFile(fileobj=data, mode="rb")
-    yield from self.inner_parser.parse(gzipobj)
+    try:
+        with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
+            yield from self.inner_parser.parse(gzipobj)
+    except gzip.BadGzipFile as e:
+        raise ValueError(f"Failed to decompress gzip data: {e}")
50-59
: How about enhancing the error logging with more context? wdyt?
The current warning message could be more helpful by including line numbers and truncated content:
 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
-    for line in data:
+    for line_num, line in enumerate(data, 1):
         try:
             yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
         except json.JSONDecodeError:
-            logger.warning(f"Cannot decode/parse line {line!r} as JSON")
+            decoded_line = line.decode(encoding=self.encoding or "utf-8")
+            preview = decoded_line[:100] + "..." if len(decoded_line) > 100 else decoded_line
+            logger.warning(
+                f"Cannot parse line {line_num} as JSON. Preview: {preview!r}"
+            )
67-80
: Consider using csv.DictReader instead of pandas for simpler CSV parsing? wdyt?
Using pandas for basic CSV parsing might be overkill, especially since we're only using basic features. The standard library's csv.DictReader could be more lightweight:

 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
-    reader = pd.read_csv(  # type: ignore
-        data, sep=self.delimiter, iterator=True, dtype=object, encoding=self.encoding
-    )
-    for chunk in reader:
-        chunk = chunk.replace({nan: None}).to_dict(orient="records")
-        for row in chunk:
-            yield row
+    text_data = data.read().decode(encoding=self.encoding or "utf-8")
+    csv_reader = csv.DictReader(
+        text_data.splitlines(),
+        delimiter=self.delimiter
+    )
+    yield from csv_reader

This would:
- Remove the pandas dependency
- Simplify the code
- Handle the None/NaN conversion automatically
83-102
: Should we improve type safety and documentation for response.raw handling? wdyt?
The # type: ignore[arg-type] suggests a type mismatch between response.raw and BufferedIOBase. Consider:
- Adding type hints to clarify the expected type:

 def decode(
     self, response: requests.Response
 ) -> Generator[MutableMapping[str, Any], None, None]:
+    # response.raw is a urllib3.response.HTTPResponse which is compatible with BufferedIOBase
     yield from self.parser.parse(data=response.raw)  # type: ignore[arg-type]

- Enhancing the class docstring with details about response.raw handling:

 """
 Decoder strategy to transform a requests.Response into a
 Generator[MutableMapping[str, Any], None, None]
 passed response.raw to parser(s).
 Note: response.raw is not decoded/decompressed by default.
 parsers should be instantiated recursively.
+
+Important:
+    - The response.raw attribute is a raw HTTP response from urllib3
+    - It must be read in binary mode to properly handle compression
+    - The stream is automatically closed when fully consumed
+
 Example:
     composite_decoder = CompositeDecoder(parser=GzipParser(inner_parser=JsonLineParser(encoding="iso-8859-1")))
 """
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/decoders/composite_decoder.py (1)
1-16
: LGTM! Clean and well-organized imports.
The imports are logically grouped and all seem necessary for the implementation.
…te-decoder-with-parsers # Conflicts: # airbyte_cdk/sources/declarative/decoders/__init__.py
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (10)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2)
2773-2788
: Would you consider enhancing the documentation for CompositeRawDecoder?
The experimental component could benefit from:
- A more detailed description explaining its purpose and use cases
- Example configurations showing how to chain parsers
- Clear warnings about potential breaking changes since it's experimental
wdyt?
2809-2821
: Would you consider adding more CSV configuration options?
The CsvParser could benefit from additional configuration options commonly needed when parsing CSV files:
- quote_char: Character used for quoting fields
- escape_char: Character used for escaping special characters
- has_header: Boolean indicating if the first row is a header
- skip_rows: Number of rows to skip from the beginning

These options would make the parser more flexible for various CSV formats. wdyt?
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4)
1128-1130
: Consider adding encoding validation.
Would it help to validate the encoding parameter against Python's standard encodings to prevent runtime errors? For example, using codecs.lookup() to verify the encoding exists. wdyt?

from codecs import lookup

class JsonLineParser(BaseModel):
    encoding: Optional[str] = "utf-8"

    @validator("encoding")
    def validate_encoding(cls, v):
        try:
            if v is not None:
                lookup(v)
            return v
        except LookupError:
            raise ValueError(f"Unknown encoding: {v}")
1132-1135
: Consider adding parameter validation.
Would it be helpful to add validation for both parameters? For example:
- Validate encoding like in JsonLineParser
- Ensure delimiter is exactly one character long

class CsvParser(BaseModel):
    type: Literal["CsvParser"]
    encoding: Optional[str] = "utf-8"
    delimiter: Optional[str] = ","

    @validator("encoding")
    def validate_encoding(cls, v):
        try:
            if v is not None:
                lookup(v)
            return v
        except LookupError:
            raise ValueError(f"Unknown encoding: {v}")

    @validator("delimiter")
    def validate_delimiter(cls, v):
        if v is not None and len(v) != 1:
            raise ValueError("Delimiter must be exactly one character")
        return v
1517-1520
: Consider validating inner_parser.
Would it be helpful to ensure inner_parser is always provided since it's required for the composite pattern to work? We could add a validator to raise a descriptive error if it's missing. wdyt?

class GzipParser(BaseModel):
    type: Literal["GzipParser"]
    inner_parser: Union[JsonLineParser, CsvParser]

    @validator("inner_parser")
    def validate_inner_parser(cls, v):
        if v is None:
            raise ValueError("inner_parser is required for GzipParser")
        return v
1552-1555
: Consider validating parser.
Similar to GzipParser, would it be helpful to validate that parser is always provided? We could add a validator to raise a descriptive error if it's missing. wdyt?

class CompositeRawDecoder(BaseModel):
    type: Literal["CompositeRawDecoder"]
    parser: Union[GzipParser, JsonLineParser, CsvParser]

    @validator("parser")
    def validate_parser(cls, v):
        if v is None:
            raise ValueError("parser is required for CompositeRawDecoder")
        return v

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
1725-1729
: Consider adding error handling.
Would it be helpful to add a try-except block to handle potential encoding errors early? This could provide better error messages during parser creation rather than at runtime. wdyt?

@staticmethod
def create_jsonline_parser(
    model: JsonLineParserModel, config: Config, **kwargs: Any
) -> JsonLineParser:
    try:
        if model.encoding:
            codecs.lookup(model.encoding)
        return JsonLineParser(encoding=model.encoding)
    except LookupError:
        raise ValueError(f"Invalid encoding '{model.encoding}' specified for JsonLineParser")
1747-1751
: Consider adding error handling for inner parser creation.
Would it be helpful to add a try-except block to handle potential errors during inner parser creation? This could provide better error messages about which part of the composite failed. wdyt?

def create_gzip_parser(
    self, model: GzipParserModel, config: Config, **kwargs: Any
) -> GzipParser:
    try:
        inner_parser = self._create_component_from_model(model=model.inner_parser, config=config)
        return GzipParser(inner_parser=inner_parser)
    except Exception as e:
        raise ValueError(f"Failed to create inner parser for GzipParser: {str(e)}")
1754-1755
: Consider adding error handling.
Would it be helpful to add validation for both encoding and delimiter during parser creation? This could catch configuration issues early. wdyt?

@staticmethod
def create_csv_parser(model: CsvParserModel, config: Config, **kwargs: Any) -> CsvParser:
    try:
        if model.encoding:
            codecs.lookup(model.encoding)
        if model.delimiter and len(model.delimiter) != 1:
            raise ValueError("Delimiter must be exactly one character")
        return CsvParser(encoding=model.encoding, delimiter=model.delimiter)
    except LookupError:
        raise ValueError(f"Invalid encoding '{model.encoding}' specified for CsvParser")
1757-1761
: Consider adding error handling for parser creation.
Would it be helpful to add a try-except block to handle potential errors during parser creation? This could provide better error messages about which parser failed to be created. wdyt?

def create_composite_raw_decoder(
    self, model: CompositeRawDecoderModel, config: Config, **kwargs: Any
) -> CompositeRawDecoder:
    try:
        parser = self._create_component_from_model(model=model.parser, config=config)
        return CompositeRawDecoder(parser=parser)
    except Exception as e:
        raise ValueError(f"Failed to create parser for CompositeRawDecoder: {str(e)}")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
- airbyte_cdk/sources/declarative/decoders/__init__.py (2 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (4 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/decoders/__init__.py
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)
2789-2802
: The GzipParser implementation looks good, but could benefit from explicit error handling.
Looking at the implementation and tests, I notice that while it handles normal cases well (including different encodings), it might be worth adding explicit error handling for:
- Empty or zero-sized inputs
- Corrupted gzip data
- Memory constraints with large files
Would you consider adding these error cases to the schema? wdyt?
2803-2808
: Should we add the required type field to JsonLineParser?
I notice that JsonLineParser is missing the required type field in its schema, unlike the other parsers. This could lead to inconsistency in validation. Would you consider adding it? wdyt?
 JsonLineParser:
   type: object
+  required:
+    - type
   properties:
+    type:
+      type: string
+      enum: [ JsonLineParser ]
     encoding:
       type: string
       default: utf-8
Likely invalid or redundant comment.
2769-2772
: Verify the impact on existing decoder implementations.
The addition of CompositeRawDecoder to SimpleRetriever's decoder options looks good. However, let's verify that this doesn't affect existing decoder implementations. I'll help check for any potential conflicts:
✅ Verification successful
The addition of CompositeRawDecoder looks safe and well-integrated
Based on the codebase analysis, the CompositeRawDecoder is properly integrated into the existing decoder hierarchy and doesn't conflict with other decoders. Here's what I found:
- It's properly defined in the schema alongside other decoders like JsonDecoder, XmlDecoder, etc.
- The implementation follows the same pattern as other decoders
- The factory code (ModelToComponentFactory) has proper handling for CompositeRawDecoder
- No existing decoders are modified or impacted by this addition
The decoder is properly isolated and follows the existing patterns, making it a safe addition to the SimpleRetriever's decoder options.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for existing decoder implementations that might be affected
echo "Searching for decoder implementations..."
rg -l "class.*Decoder.*\(.*\):" --type python
# Search for decoder usage in existing manifests
echo "Searching for decoder usage in manifests..."
rg -l "decoder:" --type yaml
Length of output: 361
Script:
#!/bin/bash
# Let's try searching without file type restrictions since python extension wasn't recognized
echo "Searching for decoder implementations..."
rg -l "class.*Decoder.*\(.*\):"
# Let's also look for specific decoder usage patterns
echo "Searching for decoder references in code..."
rg "Decoder" -A 3
# Check for any existing composite decoder patterns
echo "Searching for composite decoder patterns..."
rg "CompositeRawDecoder|composite.*decoder" -i
Length of output: 77271
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
476-476
: LGTM!
The mapping of CompositeRawDecoderModel to its factory method is correct.
a few comments and small change requests. overall good though
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (12)
airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (3)
42-43
: Consider adding a try-except block for corrupted gzip data.
Would you consider wrapping the GzipFile reading in a try-except to gracefully handle corrupted or empty gzip data? It might help avoid unexpected crashes, wdyt?
56-58
: Clarify handling of unparseable JSON lines.
Currently, the code only logs a warning and ignores unparseable lines. If data integrity is critical, would you find it useful to stash these lines or raise an exception for further inspection, wdyt?
74-81
: Validate CSV chunk size for large data sets.
You’re already using iterator=True, which is good for memory. Would you consider explicitly setting a chunk size to manage memory usage during extreme data loads, wdyt?
unit_tests/sources/declarative/decoders/test_composite_decoder.py (2)
55-55
: Expand tests for corner cases.
You're testing multiple encodings, which is great! Would it be valuable to add tests for malformed CSV or invalid JSON lines to verify graceful error handling, wdyt?
70-74
: Add negative validation test case.
This function yields typical JSON lines. To strengthen confidence, do you want to add a test with a broken JSON line to ensure the parser’s warning or skip logic is tested, wdyt?
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (2)
1128-1133
: Possible note on usage context for JsonLineParser.
Would you consider documenting possible edge cases (e.g., blank lines) in the schema doc to help users handle them more explicitly, wdyt?
1555-1558
: Clarify user awareness that CompositeRawDecoder is experimental.
It's labeled experimental, which is helpful. Do you want to highlight potential future interface changes (e.g., disclaimers) to reduce confusion for end-users, wdyt?
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
1725-1730
: Check if skipping malformed lines is desired.
In create_json_line_parser, lines are merely logged and skipped if malformed. Would you consider exposing a configuration flag to raise or skip these lines to give the user more control, wdyt?
1757-1762
: Prompt logging for CompositeRawDecoder instantiation.
When the CompositeRawDecoder is initialized, might it help to log or debug the parser type being used, to aid troubleshooting, wdyt?
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (3)
2769-2775
: Add usage example for CompositeRawDecoder.
Extending the schema doc with a small example snippet might help integrators see how to configure CompositeRawDecoder in conjunction with GzipParser or CsvParser, wdyt?
2810-2813
: Reiterate optional handling for invalid encodings in JsonLineParser.
Defaulting to utf-8 is sensible. Would you consider documenting or enumerating fallback strategies if invalid encoding data is encountered, wdyt?
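One fallback worth documenting (a sketch of standard Python behavior, not current PR behavior): decoding with errors="replace" substitutes undecodable bytes instead of raising:

# Bytes that are invalid UTF-8 become U+FFFD instead of raising UnicodeDecodeError
raw_line = b"\xff{\"id\": 1}\n"
decoded = raw_line.decode("utf-8", errors="replace")
print(decoded)  # '\ufffd{"id": 1}\n'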
2814-2826
: Potential custom quotechar setting for CsvParser.
Currently, a delimiter can be specified. Might it be helpful to allow a quotechar param as well for advanced CSVs, wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (7)
- .github/workflows/pypi_publish.yml (1 hunks)
- airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1 hunks)
- airbyte_cdk/sources/declarative/decoders/__init__.py (2 hunks)
- airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (1 hunks)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5 hunks)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (9 hunks)
- unit_tests/sources/declarative/decoders/test_composite_decoder.py (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- .github/workflows/pypi_publish.yml
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte_cdk/sources/declarative/decoders/__init__.py
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
1747-1752
: Improve error handling for GzipParser creation.
Would it be worthwhile to validate inner_parser before usage, perhaps by checking if it’s not None or is a valid parser type, wdyt?
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (6)
airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (6)
17-28
: How about enhancing the docstring for better clarity? wdyt?
The abstract base class looks good, but we could make the contract more explicit in the docstring. Maybe something like:
 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
     """
-    Parse data and yield dictionaries.
+    Parse data from a buffered stream and yield dictionaries.
+
+    Args:
+        data: A buffered stream containing the data to parse
+
+    Returns:
+        A generator yielding dictionaries representing parsed records
+
+    Raises:
+        Should document expected exceptions that implementations might raise
     """
     pass
30-43
: Should we add some error handling and resource management here? wdyt?
The implementation looks good, but we might want to:
- Handle potential gzip-related errors (corrupted data, incomplete stream, etc.)
- Use a context manager to ensure proper resource cleanup
Here's a suggestion:
 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
-    gzipobj = gzip.GzipFile(fileobj=data, mode="rb")
-    yield from self.inner_parser.parse(gzipobj)
+    try:
+        with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
+            yield from self.inner_parser.parse(gzipobj)
+    except (OSError, EOFError) as e:
+        logger.error(f"Failed to decompress gzip data: {e}")
+        raise
45-58
: How about adding error tracking and memory optimization? wdyt?
The implementation looks good, but we could:
- Track the number of failed lines for monitoring/alerting
- Optimize memory usage for large lines
Here's a suggestion:
 @dataclass
 class JsonLineParser(Parser):
     encoding: Optional[str] = "utf-8"
+    _failed_lines: int = 0

     def parse(
         self,
         data: BufferedIOBase,
     ) -> Generator[MutableMapping[str, Any], None, None]:
         for line in data:
             try:
-                yield json.loads(line.decode(encoding=self.encoding or "utf-8"))
+                # Process line in chunks to avoid loading entire line into memory
+                decoded_line = line.decode(encoding=self.encoding or "utf-8")
+                yield json.loads(decoded_line)
             except json.JSONDecodeError:
+                self._failed_lines += 1
                 logger.warning(f"Cannot decode/parse line {line!r} as JSON")
+                if self._failed_lines > 100:  # arbitrary threshold
+                    logger.error(f"High number of JSON decode failures: {self._failed_lines}")
62-62
: Should we track this TODO in an issue? 🤔
The TODO comment about migrating implementation to re-use file-base classes should be tracked.
Would you like me to create a GitHub issue to track this TODO?
60-76
: How about adding more CSV configuration options? wdyt?
The implementation looks good, but we could make it more flexible by:
- Adding support for more csv.DictReader options (quoting, escapechar, etc.)
- Adding validation for the delimiter
Here's a suggestion:
 @dataclass
 class CsvParser(Parser):
     encoding: Optional[str] = "utf-8"
     delimiter: Optional[str] = ","
+    quoting: int = csv.QUOTE_MINIMAL
+    escapechar: Optional[str] = None
+    quotechar: str = '"'

     def parse(
         self,
         data: BufferedIOBase,
     ) -> Generator[MutableMapping[str, Any], None, None]:
+        if not self.delimiter:
+            raise ValueError("Delimiter cannot be empty")
         text_data = TextIOWrapper(data, encoding=self.encoding)
-        reader = csv.DictReader(text_data, delimiter=self.delimiter)
+        reader = csv.DictReader(
+            text_data,
+            delimiter=self.delimiter,
+            quoting=self.quoting,
+            escapechar=self.escapechar,
+            quotechar=self.quotechar,
+        )
         yield from reader
78-97
: Should we add some validation and fix the type issue? wdyt?
The implementation looks good, but we could:
- Validate that the response.raw is not None
- Fix the type issue instead of using type: ignore
Here's a suggestion:
 def decode(
     self, response: requests.Response
 ) -> Generator[MutableMapping[str, Any], None, None]:
-    yield from self.parser.parse(data=response.raw)  # type: ignore[arg-type]
+    if not response.raw:
+        raise ValueError("Response raw stream is not available")
+    # Convert to BufferedIOBase to satisfy type checker
+    raw_stream = response.raw if isinstance(response.raw, BufferedIOBase) else BufferedIOBase(response.raw)
+    yield from self.parser.parse(data=raw_stream)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (1 hunks)
🔇 Additional comments (1)
airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (1)
1-14
: LGTM! Clean import organization.
The imports are well-structured following the common pattern (stdlib → third-party → local), and the logger is properly configured.
Signed-off-by: Artem Inzhyyants <[email protected]>
Actionable comments posted: 0
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (3)
30-43
: Consider using a context manager when handling GzipFile.
Currently, gzip.GzipFile is instantiated outside a with block, and the file remains open until the generator is exhausted. Would you like to use a context manager so that the file automatically closes on completion? wdyt?

 def parse(
     self,
     data: BufferedIOBase,
 ) -> Generator[MutableMapping[str, Any], None, None]:
-    gzipobj = gzip.GzipFile(fileobj=data, mode="rb")
-    yield from self.inner_parser.parse(gzipobj)
+    with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj:
+        yield from self.inner_parser.parse(gzipobj)
45-58
: Optional: Log error details in JSON decoding failures.
When a line fails to decode, you log only a warning message without the JSON exception details. Would adding the exception reason help with debugging? wdyt?

 except json.JSONDecodeError as e:
-    logger.warning(f"Cannot decode/parse line {line!r} as JSON")
+    logger.warning(f"Cannot decode/parse line {line!r} as JSON, error: {e}")
60-76
: CSV parser looks correct; consider addressing the TODO.
The CSV parsing implementation is straightforward, using csv.DictReader within a TextIOWrapper for consistent decoding. The TODO note suggests migrating to re-use file-based classes. Would you like assistance with that or to open a GitHub issue? wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
- airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (1 hunks)
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py (4)
1-13
: Imports look organized and relevant.
Everything looks good in terms of readability and completeness. Import statements cover standard libraries (csv
, gzip
, etc.) as well as external dependencies like requests
and references to the airbyte_cdk
. The usage of built-in modules is appropriate. No issues spotted here.
14-16
: Logger configuration is straightforward.
Using logging.getLogger("airbyte") helps maintain consistency with other Airbyte code. No changes needed.
17-28
: Good use of an abstract base class for parsing.
Defining a common interface with Parser(ABC) cleanly enforces a uniform parse method signature across various parsers. This design creates a flexible foundation for new formats. Looks great to me.
78-97
: All core decoder methods look coherent.
Returning True from is_stream_response matches the streaming approach. The decode method simply hands off to the parser, which keeps the design modular. This looks well-structured and gives flexibility for any future parser changes.
Signed-off-by: Artem Inzhyyants <[email protected]>
Looks good.
thanks for making the changes! 🚢
What
Resolving Source Amazon Seller Partner CDK migration: add a composite decoder to apply decompressors | decoders consequently.

Note
Memory usage tests moved to unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py. JsonlDecoder (as well as other streamable decoders) should be deleted/replaced with CompositeRawDecoder + parser.
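For illustration, a minimal sketch of that replacement (class names follow this PR; the exact import paths are assumptions):

# Before: a format-specific streamable decoder
# decoder = JsonlDecoder(parameters={})

# After: one generic decoder composed with a parser
from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder
from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import JsonLineParser

decoder = CompositeRawDecoder(parser=JsonLineParser(encoding="utf-8"))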
Summary by CodeRabbit
- Introduced CompositeRawDecoder, GzipParser, JsonLineParser, and CsvParser for enhanced data decoding capabilities.
- Updated SimpleRetriever to support the new decoding options.
- Added tests covering CompositeRawDecoder and its associated parsers, validating decoding functionalities.