refactor: reworked aws_vpc_flow_log table as a custom table allowing for formats to be used #137
Conversation
@@ -32,26 +40,21 @@ func (c *VPCFlowLogExtractor) Extract(_ context.Context, a any) ([]any, error) {
		return nil, fmt.Errorf("expected byte[], got %T", a)
	}

-	mappedData, err := ConvertToMapSlice(string(jsonBytes))
+	mappedData, err := ConvertToMapSlice(c, string(jsonBytes))
	if err != nil {
		return nil, fmt.Errorf("error in mapping the results in the form of []map[string]string")
	}

	var res = make([]any, len(mappedData))
	for i, record := range mappedData {
Is this iteration required? Previously mappedData was iterated because a mapping was performed and errors may have been reported; I think we can just return mappedData here?
As we are not using any mapper here, the mapped data should be of type `*types.DynamicRow`. I have made the changes to return `[]*types.DynamicRow{}`. Otherwise we will get an error like `required *types.DynamicRow got map[string]string`.
I had removed the need for this mapper. I can see you've added it back purely for the S3 source; any particular reason we can't just use the default approach of the regex mapper?
Ah, my bad, I missed removing it in the initial commit. We no longer need this mapper since the `Extractor` now handles everything. I haven't tried using the Gonx or Regex mapper in this case. My main goal was to preserve the previous table functionality, so we used an `Extractor`, which parses the entire object content at once and returns the rows directly.
@ParthaI What's the effect on removing vs. keeping this mapper for current users of the table who pull flow logs from an S3 bucket today for default and custom logs?
@cbruno10, in general, the mapper is designed to work on a per-row basis, while the extractor is intended for handling bulk or batch row data. In our earlier implementation, we used an extractor to download the full object content and map the column values based on the header line (i.e., the first line of the log data). To keep the behavior consistent with that approach, I’ve continued using an extractor here.
Additionally, we don't require a custom mapper in this case because the default SDK-provided mapper for custom tables already handles the mapping logic effectively.
Here are a few considerations when comparing extractor vs. mapper for this use case:
- With a mapper, the user must explicitly provide a `format` layout. However, with an extractor, we have the flexibility to infer the header (i.e., the format layout) from the first line of the log file, whether or not the user specifies a format block.
- In some cases, such as when logs are exported from CloudWatch to an S3 bucket, the log files don't include a header line. The current extractor-based design handles this scenario well: users can still specify a `format` block to define the layout manually. The default mapper would work the same way in this case if it were used, but the extractor approach makes the solution more adaptive and user-friendly. A minimal sketch of this header-inference idea follows.
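For illustration only, here is a minimal Go sketch of the header-inference approach described above; the function name, token check, and default layout are assumptions, not the plugin's actual code:

```go
package main

import (
	"fmt"
	"strings"
)

// defaultLayout stands in for the plugin's default format preset (assumed).
const defaultLayout = "version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status"

// inferLayout returns the header line if one is present, otherwise the
// user-supplied layout (from a format block), otherwise the default.
func inferLayout(content, userLayout string) string {
	firstLine := strings.SplitN(content, "\n", 2)[0]
	// Crude header detection: look for well-known header tokens.
	if strings.Contains(firstLine, "account-id") || strings.Contains(firstLine, "srcaddr") {
		return firstLine
	}
	if userLayout != "" {
		return userLayout
	}
	return defaultLayout
}

func main() {
	withHeader := "version account-id srcaddr dstaddr\n2 123456789012 10.0.0.1 10.0.0.2"
	noHeader := "2 123456789012 10.0.0.1 10.0.0.2"
	fmt.Println(inferLayout(withHeader, "")) // inferred from the header line
	fmt.Println(inferLayout(noHeader, ""))   // falls back to the default layout
}
```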
const VpcFlowLogTableSkippedData = "SKIPDATA"
const VpcFlowLogTableNoData = "NODATA"
These are valid log entries; I don't think we should forcefully remove them in code, but rather suggest that users can filter them out using a `filter`?
Hmm, good point, @cbruno10 - could we consider keeping the `SKIPDATA`/`NODATA` values as-is instead of skipping them? That said, this behavior would then apply to all columns. We still need to retain the check for `tp_timestamp` and `tp_data`, since those are required columns in the Tailpipe DB.
Yes, I think I'd keep them, as users can filter out these values when collecting or in their queries, like @graza-io mentioned. It may be worth adding an example in the Example Configurations section showing how to avoid collecting these with `filter`.
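As a sketch of what such an example might look like (the `log_status` column name, the filter expression, and the bucket name are assumptions for illustration, not the final doc):

```hcl
partition "aws_vpc_flow_log" "exclude_skipped" {
  # Drop rows that carry no flow data
  filter = "log_status != 'NODATA' and log_status != 'SKIPDATA'"

  source "aws_s3_bucket" {
    connection = connection.aws.default
    bucket     = "aws-vpc-flow-logs-bucket"
  }
}
```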
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
row.OutputColumns[constants.TpTable] = VpcFlowLogTableIdentifier

if startTime, ok := row.GetSourceValue("start_time"); ok && (startTime != VpcFlowLogTableSkippedData || startTime != VpcFlowLogTableNoData || startTime != VpcFlowLogTableNilValue) {
The condition uses logical OR (||) when checking against invalid values, which will always evaluate to true. Replace the OR operators with AND (&&) to correctly validate that startTime is not equal to any of the invalid constants.
Suggested change:
- if startTime, ok := row.GetSourceValue("start_time"); ok && (startTime != VpcFlowLogTableSkippedData || startTime != VpcFlowLogTableNoData || startTime != VpcFlowLogTableNilValue) {
+ if startTime, ok := row.GetSourceValue("start_time"); ok && (startTime != VpcFlowLogTableSkippedData && startTime != VpcFlowLogTableNoData && startTime != VpcFlowLogTableNilValue) {
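A standalone snippet makes the logic error easy to see: any string is unequal to at least one of two distinct constants, so the OR'd form is always true.

```go
package main

import "fmt"

func main() {
	s := "SKIPDATA" // an invalid value we want to reject
	// Buggy: always true, because s can't equal both constants at once.
	fmt.Println(s != "SKIPDATA" || s != "NODATA") // true
	// Fixed: correctly rejects the invalid value.
	fmt.Println(s != "SKIPDATA" && s != "NODATA") // false
}
```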
-func (m *VPCFlowLogMapper) Identifier() string {
-	return "vpc_flow_log_mapper"
+func (c *VPCFlowLogMapper) Identifier() string {
+	return "aws_waf_traffic_log_mapper"
The mapper Identifier returns 'aws_waf_traffic_log_mapper', which appears inconsistent with the VPC Flow Log context. It should likely return an identifier that reflects VPC Flow Log mapping, for example 'vpc_flow_log_mapper'.
return "aws_waf_traffic_log_mapper" | |
return "vpc_flow_log_mapper" |
func (c *VpcFlowLogTable) GetTableDefinition() *schema.TableSchema {
	return &schema.TableSchema{
		Name: VpcFlowLogTableIdentifier,
		Columns: []*schema.ColumnSchema{
@ParthaI Are there any changes from the previous version of the table, or are they the same?
They are the same, @cbruno10 - we've just split the VpcFlowLog struct into a custom schema.
Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.
aws/plugin.go (Outdated)
// register formats
table.RegisterFormatPresets(vpc_flow_log.VPCFlowLogTableFormatPresets...)
table.RegisterFormat[*vpc_flow_log.VPCFlowLogTableFormat]()
row_source.RegisterRowSource[*cloudwatch_log_group.AwsCloudWatchLogGroupSource]()
The AwsCloudWatchLogGroupSource row source is registered twice, which may lead to duplicate registrations. Please remove one of the duplicate calls to avoid potential conflicts.
Suggested change:
- row_source.RegisterRowSource[*cloudwatch_log_group.AwsCloudWatchLogGroupSource]()
@ParthaI Please see comments, thanks!
@@ -7,7 +7,18 @@ description: "AWS VPC flow logs capture information about IP traffic going to an

The `aws_vpc_flow_log` table allows you to query data from [AWS VPC Flow Logs](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html). This table provides detailed insights into network traffic within your VPC, including source and destination IP addresses, ports, protocols, and more.

-**Note**: For timestamp information, the `start` field will be used first, with the `end` field as a fallback. If neither field is available, then that log line will not be collected and Tailpipe will return an error.
+**Note**:
+- Using `aws_s3_bucket` source:
I think most of these notes could be explained through examples in the Example Configurations section and aren't required here; it's hard to understand them without seeing the configs anyway.
Removed the notes that are specific to the example configurations.
  paths       = ["/Users/myuser/vpc_flow_logs"]
  file_layout = `%{DATA}.json.gz`

partition "aws_vpc_flow_log" "my_logs_instance" {
  filter = "action <> 'REJECT'"
Do we use `<>` or `!=` more commonly (I think the latter)?
Yes, `!=` is the more commonly used operator. I've updated the query to use `!=` instead.
}
```

### Collect logs from a CloudWatch log group
Please move this example up and review the order of the other examples; we should make examples flow simple -> complex/edge.
Reordered the example configurations from simple to complex/edge cases.
@cbruno10, would it make sense to reorder the example configurations from simple to complex/edge cases and group them by source?
}
```

### Collect VPC flow logs from a CloudWatch log group with custom log format
Suggested change:
- ### Collect VPC flow logs from a CloudWatch log group with custom log format
+ ### Collect logs from a CloudWatch log group with custom log format
I don't think we need to repeat `VPC flow logs` each time; we're already in the VPC flow log doc, so just `logs` is OK I think (I believe this is what we do in other docs too).
Done!
}
```

### Collect VPC flow logs from a CloudWatch log group for all ENIs with wildcard stream names
This title feels long; is there a shorter/more general title we can use to summarize what this example highlights?
Updated the title to make it shorter.
@@ -77,7 +77,7 @@ func (s *AwsCloudWatchLogGroupSource) Identifier() string {

func matchesAnyPattern(target string, patterns []string) bool {
	for _, pattern := range patterns {
-		match, err := filepath.Match(pattern, target)
+		match, err := path.Match(pattern, target)
Given that we decided to not escape chars, can you please add 1-2 examples in the CW log source table doc showing how escaping chars works? This can help us determine if we like that design as well.
Added example in CW source doc.
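For context, a small runnable Go sketch of how `path.Match` handles escaped metacharacters in stream-name patterns (the stream names below are made up, not taken from the plugin docs):

```go
package main

import (
	"fmt"
	"path"
)

func main() {
	// '*' matches any run of non-separator characters in a stream name.
	m, _ := path.Match("eni-*", "eni-0123456789abcdef0")
	fmt.Println(m) // true

	// An escaped `\*` matches only a literal '*' character.
	m, _ = path.Match(`eni-\*`, "eni-*")
	fmt.Println(m) // true

	m, _ = path.Match(`eni-\*`, "eni-0123456789abcdef0")
	fmt.Println(m) // false
}
```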
Force-pushed from 68da62e to cdcfed6
…for formats to be used
…he doc with example config to exclude the NODATA and SKIPDATA for a column
…licable for log-status field
…g the file source implementation to the table map
…s maintain order of precedence for tp_timestamp
Force-pushed from 0a74b02 to 004d782
Pull Request Overview
This PR refactors the aws_vpc_flow_log table to implement a custom table that supports multiple log formats. Key changes include embedding a custom table implementation in VpcFlowLogTable, refactoring the EnrichRow and source extraction logic to use dynamic rows, and updating format and registration logic accordingly.
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
Summary per file:

File | Description
---|---
tables/vpc_flow_log/vpc_flow_log_table.go | Refactored the table structure, source metadata, and row enrichment logic to use dynamic rows and a custom table implementation.
tables/vpc_flow_log/vpc_flow_log_extractor.go | Updated the extractor logic to work with dynamic rows and handle header validation.
tables/vpc_flow_log/vpc_flow_log_format*.go | Introduced and registered new format definitions and presets.
docs/tables/aws_vpc_flow_log/index.md | Updated documentation to reflect the new processing logic and configuration options.
aws/plugin.go | Adjusted plugin registration for custom table and format registration.
row.OutputColumns[constants.TpIndex] = schema.DefaultIndex
The previous logic for setting TpIndex based on 'interface_id', 'subnet_id', or 'vpc_id' has been replaced with a default value. Confirm that this deviation from the documented precedence is intentional.
Suggested change:
- row.OutputColumns[constants.TpIndex] = schema.DefaultIndex
+ if interfaceID, ok := row.GetSourceValue("interface_id"); ok && interfaceID != VpcFlowLogTableNilValue {
+ 	row.OutputColumns[constants.TpIndex] = interfaceID
+ } else if subnetID, ok := row.GetSourceValue("subnet_id"); ok && subnetID != VpcFlowLogTableNilValue {
+ 	row.OutputColumns[constants.TpIndex] = subnetID
+ } else if vpcID, ok := row.GetSourceValue("vpc_id"); ok && vpcID != VpcFlowLogTableNilValue {
+ 	row.OutputColumns[constants.TpIndex] = vpcID
+ } else {
+ 	row.OutputColumns[constants.TpIndex] = schema.DefaultIndex
+ }
-**Note**: For timestamp information, the `start` field will be used first, with the `end` field as a fallback. If neither field is available, then that log line will not be collected and Tailpipe will return an error.
+**Note**:
+- When determining each log's timestamp, the table uses the `start` field first, with the `end` field as a fallback. If neither field is available, then that log line will not be collected and Tailpipe will return an error.
+- When determining each log's index, the table uses the following order of precedence:
The documentation specifies that the TpIndex should be determined by 'interface_id', 'subnet_id', then 'vpc_id', with a default of 'default'. Update the documentation or adjust the code to ensure consistency between the intended behavior and its documentation.
Pull Request Overview
A refactor converts the `aws_vpc_flow_log` table into a customizable table with support for user-defined formats, dynamic row mapping, and CloudWatch integration.
- Switched to `CustomTableImpl` and `DynamicRow` for flexible schema and enrichment
- Enhanced extractor and mapper to use custom layouts and fallback detection
- Updated documentation with new examples for format presets and source configurations
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 5 comments.
Summary per file:

File | Description
---|---
tables/vpc_flow_log/vpc_flow_log_table.go | Converted to CustomTable, dynamic enrichment logic
tables/vpc_flow_log/vpc_flow_log_format.go | Added token-based formatting with regex mapper
tables/vpc_flow_log/vpc_flow_log_format_presets.go | Defined default format preset
tables/vpc_flow_log/vpc_flow_log_extractor.go | Updated extractor to use DynamicRow and layout fallback
tables/vpc_flow_log/vpc_flow_log_cloudwatch_mapper.go | Added CloudWatch event mapper wrapping regex mapper
aws/plugin.go | Registered custom table and formats
docs/tables/aws_vpc_flow_log/index.md | Expanded and reorganized usage examples
Comments suppressed due to low confidence (2)
tables/vpc_flow_log/vpc_flow_log_cloudwatch_mapper.go:43
- Consider adding unit tests to cover each input type branch and error case in this mapper to ensure robust handling of CloudWatch event formats.
switch v := a.(type) {
tables/vpc_flow_log/vpc_flow_log_extractor.go:71
- Add tests for the header-fallback logic (when no header is detected) to verify that the default layout is correctly prepended and logged.
func toMapSlice(c *VPCFlowLogExtractor, data string) ([]map[string]string, error) {
row.OutputColumns[constants.TpIndex] = schema.DefaultIndex
The original logic prioritized interface_id, subnet_id, or vpc_id before defaulting; this overwrite always uses the default index. Reintroduce the precedence checks to preserve correct TpIndex values.
Suggested change:
- row.OutputColumns[constants.TpIndex] = schema.DefaultIndex
+ if interfaceID, ok := row.GetSourceValue("interface_id"); ok && interfaceID != VpcFlowLogTableNilValue {
+ 	row.OutputColumns[constants.TpIndex] = interfaceID
+ } else if subnetID, ok := row.GetSourceValue("subnet_id"); ok && subnetID != VpcFlowLogTableNilValue {
+ 	row.OutputColumns[constants.TpIndex] = subnetID
+ } else if vpcID, ok := row.GetSourceValue("vpc_id"); ok && vpcID != VpcFlowLogTableNilValue {
+ 	row.OutputColumns[constants.TpIndex] = vpcID
+ } else {
+ 	row.OutputColumns[constants.TpIndex] = schema.DefaultIndex
+ }
t, err := helpers.ParseTime(endTime)
if err != nil {
	invalidFields = append(invalidFields, "end_time")
} else {
Using `ParseTime` on Unix-seconds strings may misinterpret the format. Consider converting the string to an integer and using `time.Unix` for accurate epoch parsing.
Suggested change:
- t, err := helpers.ParseTime(endTime)
- if err != nil {
- 	invalidFields = append(invalidFields, "end_time")
- } else {
+ // Convert end_time to an integer and parse it as a Unix timestamp
+ endTimeInt, err := strconv.ParseInt(endTime, 10, 64)
+ if err != nil {
+ 	invalidFields = append(invalidFields, "end_time")
+ } else {
+ 	t := time.Unix(endTimeInt, 0)
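A self-contained sketch of the epoch-parsing approach the suggestion describes (the sample value is arbitrary):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	endTime := "1718208000" // flow log start/end fields are epoch seconds
	sec, err := strconv.ParseInt(endTime, 10, 64)
	if err != nil {
		fmt.Println("invalid end_time:", err)
		return
	}
	// time.Unix interprets the value unambiguously as seconds since epoch.
	fmt.Println(time.Unix(sec, 0).UTC())
}
```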
source "aws_cloudwatch_log_group" {
  connection = connection.aws.default
  format     = format.aws_vpc_flow_log.log_layout
The example references `log_layout`, but the preset is named `log_format`. Update to `format.aws_vpc_flow_log.log_format` to match the defined format label.
@@ -142,27 +161,56 @@ partition "aws_vpc_flow_log" "my_logs_prefix" {

### Collect logs from local files

-You can also collect VPC Flow Logs from local files.
+You can also Collect logs from local files.
[nitpick] Capitalization: change `Collect` to lowercase (`collect`) to match sentence style.

Suggested change:
- You can also Collect logs from local files.
+ You can also collect logs from local files.
@@ -126,9 +128,26 @@ partition "aws_vpc_flow_log" "my_logs" {
}
```

### Collect logs from a CloudWatch log group
[nitpick] This heading appears multiple times with similar content. Consolidate or differentiate sections to avoid duplication and improve clarity.
Pull Request Overview
This PR refactors the AWS VPC flow log table implementation by converting it into a custom table that accepts various log formats. Key changes include:
- Replacing the original VpcFlowLog structure with a dynamic row model.
- Introducing custom format handling and a CloudWatch mapper.
- Updating extractor logic to support flexible header handling and custom error messages.
Reviewed Changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
Summary per file:

File | Description
---|---
tables/vpc_flow_log/vpc_flow_log_table.go | Refactors table implementation to use a custom table interface and dynamic rows.
tables/vpc_flow_log/vpc_flow_log_format_presets.go | Adds format presets for the table.
tables/vpc_flow_log/vpc_flow_log_format.go | Introduces a new format struct and regex mapping logic.
tables/vpc_flow_log/vpc_flow_log_extractor.go | Adjusts the extractor to support custom headers and dynamic rows.
tables/vpc_flow_log/vpc_flow_log_cloudwatch_mapper.go | Implements a CloudWatch-specific mapper using the new format.
tables/vpc_flow_log/vpc_flow_log.go | Removes the old VpcFlowLog struct to complete the refactor.
docs/tables/aws_vpc_flow_log/index.md | Updates documentation to reflect new format and filtering options.
docs/sources/aws_cloudwatch_log_group.md | Updates examples for CloudWatch log group handling.
aws/plugin.go | Adjusts plugin registration for the custom table and formats.
@@ -61,6 +78,21 @@ func ConvertToMapSlice(data string) ([]map[string]string, error) {
	// Extract header row as keys
	keys := strings.Fields(lines[0]) // Splitting header by spaces

	if !isValidHeader(keys) {
The header validation in isValidHeader strictly checks for known tokens, which may inadvertently flag valid custom headers that include additional fields. Consider allowing extra fields or documenting the required header format more explicitly.
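As one hypothetical way to allow extra fields, the validation could require only a minimum number of known tokens rather than rejecting any unknown one; the function name, token list, and threshold below are illustrative assumptions, not the plugin's code:

```go
package vpc_flow_log

// isLikelyHeader accepts a header line that contains at least two known
// VPC flow log tokens, tolerating additional custom fields.
func isLikelyHeader(fields []string) bool {
	known := map[string]bool{
		"version": true, "account-id": true, "interface-id": true,
		"srcaddr": true, "dstaddr": true, "srcport": true, "dstport": true,
		"protocol": true, "packets": true, "bytes": true,
		"start": true, "end": true, "action": true, "log-status": true,
	}
	hits := 0
	for _, field := range fields {
		if known[field] {
			hits++
		}
	}
	return hits >= 2
}
```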
} else {
	row.TpIndex = "default"

if len(invalidFields) > 0 {
	return nil, error_types.NewRowErrorWithFields([]string{}, invalidFields)
[nitpick] The error creation here only includes invalidFields without indicating which source fields were processed; including the relevant source field names could improve debugging.
No description provided.