refactor: reworked aws_vpc_flow_log table as a custom table allowing for formats to be used #137
Conversation
…for formats to be used
@@ -32,26 +40,21 @@ func (c *VPCFlowLogExtractor) Extract(_ context.Context, a any) ([]any, error) {
		return nil, fmt.Errorf("expected byte[], got %T", a)
	}

-	mappedData, err := ConvertToMapSlice(string(jsonBytes))
+	mappedData, err := ConvertToMapSlice(c, string(jsonBytes))
	if err != nil {
		return nil, fmt.Errorf("error in mapping the results in the form of []map[string]string")
	}

	var res = make([]any, len(mappedData))
	for i, record := range mappedData {
Is this iteration required? Previously, mappedData was iterated because a mapping was done and errors might have been reported; I think we can just return mappedData here.
As we are not using any mapper here, the mapped data should be of type `*types.DynamicRow`. I have made the changes to return `[]*types.DynamicRow{}`. Otherwise we will get an error like `required *types.DynamicRow got map[string]string`.
I had removed the need for this mapper; I can see you've added it back purely for the S3 source. Any particular reason we can't just use the default approach of the regex mapper?
Ah, my bad, I missed removing it in the initial commit. We no longer need this mapper since the `Extractor` now handles everything. I haven't tried using the Gonx or Regex mapper in this case. My main goal was to preserve the previous table functionality, so we used an `Extractor`, which parses the entire object content at once and returns the rows directly.
@ParthaI What's the effect of removing vs. keeping this mapper for current users of the table who pull flow logs from an S3 bucket today for default and custom logs?
@cbruno10, in general, the mapper is designed to work on a per-row basis, while the extractor is intended for handling bulk or batch row data. In our earlier implementation, we used an extractor to download the full object content and map the column values based on the header line (i.e., the first line of the log data). To keep the behavior consistent with that approach, I’ve continued using an extractor here.
Additionally, we don't require a custom mapper in this case because the default SDK-provided mapper for custom tables already handles the mapping logic effectively.
Here are a few considerations when comparing extractor vs. mapper for this use case (see the sketch after this list):
- With a mapper, the user must explicitly provide a `format` layout. However, with an extractor, we have the flexibility to infer the header (i.e., the format layout) from the first line of the log file, whether or not the user specifies a format block.
- In some cases, such as when logs are exported from CloudWatch to an S3 bucket, the log files don't include a header line. The current extractor-based design handles this scenario well: users can still specify a `format` block to define the layout manually. The default mapper would work the same way in this case if it were used, but the extractor approach makes the solution more adaptive and user-friendly.
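To make the header-inference behavior concrete, here is a minimal standalone sketch, not the plugin's actual extractor: the helper name `extractRows` and the sample data are hypothetical, and the real implementation also handles gzip decompression and SDK row types.

```go
package main

import (
	"fmt"
	"strings"
)

// extractRows maps each space-delimited flow log line to column values.
// If format is empty, the first line is treated as the header (the usual
// S3 object layout); if a format is given, every line is data (e.g.
// CloudWatch exports, which carry no header line).
func extractRows(content, format string) ([]map[string]string, error) {
	lines := strings.Split(strings.TrimSpace(content), "\n")
	if len(lines) == 0 || lines[0] == "" {
		return nil, fmt.Errorf("empty object content")
	}
	header := strings.Fields(lines[0])
	body := lines[1:]
	if format != "" {
		header = strings.Fields(format)
		body = lines // no header line present, use the supplied layout
	}
	rows := make([]map[string]string, 0, len(body))
	for _, line := range body {
		fields := strings.Fields(line)
		if len(fields) != len(header) {
			return nil, fmt.Errorf("expected %d fields, got %d", len(header), len(fields))
		}
		row := make(map[string]string, len(header))
		for i, name := range header {
			row[name] = fields[i]
		}
		rows = append(rows, row)
	}
	return rows, nil
}

func main() {
	content := "version account-id action\n2 123456789012 ACCEPT"
	rows, _ := extractRows(content, "")
	fmt.Println(rows[0]["action"]) // ACCEPT
}
```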
const VpcFlowLogTableSkippedData = "SKIPDATA"
const VpcFlowLogTableNoData = "NODATA"
These are valid log entries; I don't think we should forcefully remove them in code, but rather make a suggestion that users can filter them out using a `filter`?
Hmm, good point, @cbruno10. Could we consider keeping the `SKIPDATA`/`NODATA` values as-is instead of skipping them? That said, this behavior would then apply to all columns. We still need to retain the check for `tp_timestamp` and `tp_data`, since those are required columns in the Tailpipe DB.
Yes, I think I'd keep them, as users can filter out these values when collecting or in their queries, like @graza-io mentioned. It may be worth adding an example in the Example Configurations section showing how to avoid collecting these with `filter`.
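Such an example might look like the following sketch; the `log_status` column name follows the table's log-status field, while the partition label, connection, and bucket name are made-up placeholders.

```hcl
partition "aws_vpc_flow_log" "without_skipped_entries" {
  # SKIPDATA and NODATA are valid flow log entries, so they are collected by
  # default; this filter excludes them at collection time.
  filter = "log_status != 'SKIPDATA' and log_status != 'NODATA'"

  source "aws_s3_bucket" {
    connection = connection.aws.default
    bucket     = "my-flow-log-bucket"
  }
}
```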
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
row.OutputColumns[constants.TpTable] = VpcFlowLogTableIdentifier

if startTime, ok := row.GetSourceValue("start_time"); ok && (startTime != VpcFlowLogTableSkippedData || startTime != VpcFlowLogTableNoData || startTime != VpcFlowLogTableNilValue) {
The condition uses logical OR (||) when checking against invalid values, which will always evaluate to true. Replace the OR operators with AND (&&) to correctly validate that startTime is not equal to any of the invalid constants.
- if startTime, ok := row.GetSourceValue("start_time"); ok && (startTime != VpcFlowLogTableSkippedData || startTime != VpcFlowLogTableNoData || startTime != VpcFlowLogTableNilValue) {
+ if startTime, ok := row.GetSourceValue("start_time"); ok && (startTime != VpcFlowLogTableSkippedData && startTime != VpcFlowLogTableNoData && startTime != VpcFlowLogTableNilValue) {
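A quick standalone illustration of the point (a hypothetical snippet, not code from this PR): a string can never equal two different constants at once, so with `||` at least one inequality is always true and the guard never rejects anything.

```go
package main

import "fmt"

func main() {
	v := "SKIPDATA"
	// With ||, the check passes even for invalid values.
	fmt.Println(v != "SKIPDATA" || v != "NODATA") // true: nothing is filtered
	// With &&, invalid values are correctly rejected.
	fmt.Println(v != "SKIPDATA" && v != "NODATA") // false: SKIPDATA rejected
}
```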
-func (m *VPCFlowLogMapper) Identifier() string {
-	return "vpc_flow_log_mapper"
+func (c *VPCFlowLogMapper) Identifier() string {
+	return "aws_waf_traffic_log_mapper"
The mapper Identifier returns 'aws_waf_traffic_log_mapper', which appears inconsistent with the VPC Flow Log context. It should likely return an identifier that reflects VPC Flow Log mapping, for example 'vpc_flow_log_mapper'.
return "aws_waf_traffic_log_mapper" | |
return "vpc_flow_log_mapper" |
func (c *VpcFlowLogTable) GetTableDefinition() *schema.TableSchema {
	return &schema.TableSchema{
		Name:    VpcFlowLogTableIdentifier,
		Columns: []*schema.ColumnSchema{
@ParthaI Are there any changes from the previous version of the table, or are they the same?
They are the same, @cbruno10; we've just split the VpcFlowLog struct into a custom schema.
… dynamic table accept only string in its mapper
…he doc with example config to exclude the NODATA and SKIPDATA for a column
…licable for log-status field
…g the file source implementation to the table map
…ilepath package instead of regex match
… stream name match
Copilot reviewed 10 out of 10 changed files in this pull request and generated 1 comment.
aws/plugin.go (outdated)
// register formats
table.RegisterFormatPresets(vpc_flow_log.VPCFlowLogTableFormatPresets...)
table.RegisterFormat[*vpc_flow_log.VPCFlowLogTableFormat]()
row_source.RegisterRowSource[*cloudwatch_log_group.AwsCloudWatchLogGroupSource]()
The AwsCloudWatchLogGroupSource row source is registered twice, which may lead to duplicate registrations. Please remove one of the duplicate calls to avoid potential conflicts.
- row_source.RegisterRowSource[*cloudwatch_log_group.AwsCloudWatchLogGroupSource]()
@ParthaI Please see comments, thanks!
@@ -7,7 +7,18 @@ description: "AWS VPC flow logs capture information about IP traffic going to an

The `aws_vpc_flow_log` table allows you to query data from [AWS VPC Flow Logs](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html). This table provides detailed insights into network traffic within your VPC, including source and destination IP addresses, ports, protocols, and more.

-**Note**: For timestamp information, the `start` field will be used first, with the `end` field as a fallback. If neither field is available, then that log line will not be collected and Tailpipe will return an error.
+**Note**:
+- Using `aws_s3_bucket` source:
I think most of these notes could be explained through examples in the Example Configurations section and aren't required here; it's hard to understand them without seeing the configs anyway.
Removed the notes that are specific to the example configurations.
paths       = ["/Users/myuser/vpc_flow_logs"]
file_layout = `%{DATA}.json.gz`
partition "aws_vpc_flow_log" "my_logs_instance" {
  filter = "action <> 'REJECT'"
Do we use `<>` or `!=` more commonly? (I think the latter.)
Yes, `!=` is the more commonly used operator. I've updated the query to use `!=` instead.
}
```
### Collect logs from a CloudWatch log group |
Please move this example up and review the order of the other examples; we should make the examples flow simple -> complex/edge.
Reordered the example configurations from simple to complex/edge cases.
@cbruno10, would it make sense to reorder the example configurations from simple to complex/edge cases and group them by source?
}
```
### Collect VPC flow logs from a CloudWatch log group with custom log format |
- ### Collect VPC flow logs from a CloudWatch log group with custom log format
+ ### Collect logs from a CloudWatch log group with custom log format

I don't think we need to repeat "VPC flow logs" each time; we're already in the VPC flow log doc, so just "logs" is OK I think (I believe this is what we do in other docs too).
Done!
}
```
### Collect VPC flow logs from a CloudWatch log group for all ENIs with wildcard stream names |
This title feels long; is there a shorter/more general title we can use to summarize what this example highlights?
Updated the title to make it shorter.
@@ -77,7 +77,7 @@ func (s *AwsCloudWatchLogGroupSource) Identifier() string {

func matchesAnyPattern(target string, patterns []string) bool {
	for _, pattern := range patterns {
-		match, err := filepath.Match(pattern, target)
+		match, err := path.Match(pattern, target)
Given that we decided to not escape chars, can you please add 1-2 examples in the CW log source table doc showing how escaping chars works? This can help us determine if we like that design as well.
Added example in CW source doc.
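For anyone curious how the switch to `path.Match` behaves with wildcard stream names, here is a small standalone illustration (the stream names are made up). Unlike `filepath.Match`, `path.Match` keeps backslash escaping on every OS, since its separator is always `/`.

```go
package main

import (
	"fmt"
	"path"
)

func main() {
	// '*' matches any run of characters within a log stream name.
	m, _ := path.Match("eni-*", "eni-0123456789abcdef0-all")
	fmt.Println(m) // true

	// A backslash escapes the next character, so a literal '*' can be matched.
	m, _ = path.Match(`eni-\*`, "eni-*")
	fmt.Println(m) // true

	m, _ = path.Match(`eni-\*`, "eni-x")
	fmt.Println(m) // false
}
```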