diff --git a/axiom/datasets.go b/axiom/datasets.go
index 46cbb6ac..33dd3de4 100644
--- a/axiom/datasets.go
+++ b/axiom/datasets.go
@@ -10,6 +10,7 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 	"unicode"
 
@@ -342,7 +343,7 @@ func (s *DatasetsService) Ingest(ctx context.Context, id string, r io.Reader, ty
 		return nil, spanError(span, err)
 	}
 
-	if err = setEventLabels(req, opts.EventLabels); err != nil {
+	if err = setOptionHeaders(req, opts, typ); err != nil {
 		return nil, spanError(span, err)
 	}
 
@@ -399,6 +400,10 @@ func (s *DatasetsService) IngestEvents(ctx context.Context, id string, events []
 	))
 	defer span.End()
 
+	if len(events) == 0 {
+		return &ingest.Status{}, nil
+	}
+
 	// Apply supplied options.
 	var opts ingest.Options
 	for _, option := range options {
@@ -407,10 +412,6 @@ func (s *DatasetsService) IngestEvents(ctx context.Context, id string, events []
 		}
 	}
 
-	if len(events) == 0 {
-		return &ingest.Status{}, nil
-	}
-
 	path, err := url.JoinPath("/v1/datasets", id, "ingest")
 	if err != nil {
 		return nil, spanError(span, err)
 	}
@@ -461,7 +462,7 @@ func (s *DatasetsService) IngestEvents(ctx context.Context, id string, events []
 	}
 	req.GetBody = getBody
 
-	if err = setEventLabels(req, opts.EventLabels); err != nil {
+	if err = setOptionHeaders(req, opts, NDJSON); err != nil {
 		return nil, spanError(span, err)
 	}
 
@@ -796,6 +797,21 @@ func setLegacyQueryResultOnSpan(span trace.Span, res querylegacy.Result) {
 	)
 }
 
+func setOptionHeaders(req *http.Request, opts ingest.Options, typ ContentType) error {
+	// Set event labels.
+	if err := setEventLabels(req, opts.EventLabels); err != nil {
+		return err
+	}
+
+	// Set CSV fields. They are only valid for CSV content and are entirely
+	// optional: the field names are sent as one comma-separated header value.
+	if typ == CSV {
+		req.Header.Set("X-Axiom-CSV-Fields", strings.Join(opts.CSVFields, ","))
+	}
+
+	return nil
+}
+
 func setEventLabels(req *http.Request, labels map[string]any) error {
 	if len(labels) == 0 {
 		return nil
diff --git a/axiom/datasets_integration_test.go b/axiom/datasets_integration_test.go
index 4add2df4..683203b9 100644
--- a/axiom/datasets_integration_test.go
+++ b/axiom/datasets_integration_test.go
@@ -17,28 +17,37 @@ import (
 	"github.com/axiomhq/axiom-go/axiom/querylegacy"
 )
 
-const ingestData = `[
-	{
-		"time": "17/May/2015:08:05:30 +0000",
-		"remote_ip": "93.180.71.1",
-		"remote_user": "-",
-		"request": "GET /downloads/product_1 HTTP/1.1",
-		"response": 304,
-		"bytes": 0,
-		"referrer": "-",
-		"agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
-	},
-	{
-		"time": "17/May/2015:08:05:31 +0000",
-		"remote_ip": "93.180.71.2",
-		"remote_user": "-",
-		"request": "GET /downloads/product_1 HTTP/1.1",
-		"response": 304,
-		"bytes": 0,
-		"referrer": "-",
-		"agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
-	}
-]`
+const (
+	ingestData = `[
+		{
+			"time": "17/May/2015:08:05:30 +0000",
+			"remote_ip": "93.180.71.1",
+			"remote_user": "-",
+			"request": "GET /downloads/product_1 HTTP/1.1",
+			"response": 304,
+			"bytes": 0,
+			"referrer": "-",
+			"agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
+		},
+		{
+			"time": "17/May/2015:08:05:31 +0000",
+			"remote_ip": "93.180.71.2",
+			"remote_user": "-",
+			"request": "GET /downloads/product_1 HTTP/1.1",
+			"response": 304,
+			"bytes": 0,
+			"referrer": "-",
+			"agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
+		}
+	]`
+
+	csvIngestData = `17/May/2015:08:05:30 +0000,93.180.71.1,-,GET /downloads/product_1 HTTP/1.1,304,0,-,Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)
+17/May/2015:08:05:31 +0000,93.180.71.2,-,GET /downloads/product_1 HTTP/1.1,304,0,-,Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)`
+
+	csvIngestDataHeader = `time,remote_ip,remote_user,request,response,bytes,referrer,agent
+17/May/2015:08:05:30 +0000,93.180.71.1,-,GET /downloads/product_1 HTTP/1.1,304,0,-,Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)
+17/May/2015:08:05:31 +0000,93.180.71.2,-,GET /downloads/product_1 HTTP/1.1,304,0,-,Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)`
+)
 
 var ingestEvents = []axiom.Event{
 	{
@@ -137,9 +146,9 @@ func (s *DatasetsTestSuite) Test() {
 		ingested bytes.Buffer
 		r        io.Reader
 
-		resetBuffer = func(contentEncoders ...axiom.ContentEncoder) {
+		resetBuffer = func(data string, contentEncoders ...axiom.ContentEncoder) {
 			ingested.Reset()
-			r = io.TeeReader(strings.NewReader(ingestData), &ingested)
+			r = io.TeeReader(strings.NewReader(data), &ingested)
 
 			for _, contentEncoder := range contentEncoders {
 				var ceErr error
@@ -149,7 +158,7 @@ func (s *DatasetsTestSuite) Test() {
 		}
 	)
 
-	resetBuffer()
+	resetBuffer(ingestData)
 	ingestStatus, err := s.client.Datasets.Ingest(s.ctx, s.dataset.ID, r, axiom.JSON, axiom.Identity, ingest.SetEventLabel("region", "eu-west-1"))
 	s.Require().NoError(err)
 	s.Require().NotNil(ingestStatus)
@@ -160,7 +169,7 @@ func (s *DatasetsTestSuite) Test() {
 	s.EqualValues(ingested.Len()+22, ingestStatus.ProcessedBytes) // 22 bytes extra for the event label
 
 	// ... but gzip encoded...
-	resetBuffer(axiom.GzipEncoder())
+	resetBuffer(ingestData, axiom.GzipEncoder())
 	ingestStatus, err = s.client.Datasets.Ingest(s.ctx, s.dataset.ID, r, axiom.JSON, axiom.Gzip)
 	s.Require().NoError(err)
 	s.Require().NotNil(ingestStatus)
@@ -171,7 +180,7 @@ func (s *DatasetsTestSuite) Test() {
 	s.EqualValues(ingested.Len(), ingestStatus.ProcessedBytes)
 
 	// ... but zstd encoded...
-	resetBuffer(axiom.ZstdEncoder())
+	resetBuffer(ingestData, axiom.ZstdEncoder())
 	ingestStatus, err = s.client.Datasets.Ingest(s.ctx, s.dataset.ID, r, axiom.JSON, axiom.Zstd)
 	s.Require().NoError(err)
 	s.Require().NotNil(ingestStatus)
@@ -182,7 +191,6 @@ func (s *DatasetsTestSuite) Test() {
 	s.EqualValues(ingested.Len(), ingestStatus.ProcessedBytes)
 
 	// ... and from a map source...
-	resetBuffer()
 	ingestStatus, err = s.client.Datasets.IngestEvents(s.ctx, s.dataset.ID, ingestEvents)
 	s.Require().NoError(err)
 	s.Require().NotNil(ingestStatus)
@@ -192,8 +200,7 @@ func (s *DatasetsTestSuite) Test() {
 	s.Empty(ingestStatus.Failures)
 	s.EqualValues(448, int(ingestStatus.ProcessedBytes))
 
-	// ... and from a channel source.
-	resetBuffer()
+	// ... and from a channel source ...
 	ingestStatus, err = s.client.Datasets.IngestChannel(s.ctx, s.dataset.ID, getEventChan())
 	s.Require().NoError(err)
 	s.Require().NotNil(ingestStatus)
@@ -203,6 +210,29 @@ func (s *DatasetsTestSuite) Test() {
 	s.Empty(ingestStatus.Failures)
 	s.EqualValues(448, int(ingestStatus.ProcessedBytes))
 
+	// ... and from a CSV reader source with header...
+	resetBuffer(csvIngestDataHeader)
+	ingestStatus, err = s.client.Datasets.Ingest(s.ctx, s.dataset.ID, r, axiom.CSV, axiom.Identity)
+	s.Require().NoError(err)
+	s.Require().NotNil(ingestStatus)
+
+	s.EqualValues(2, ingestStatus.Ingested)
+	s.Zero(ingestStatus.Failed)
+	s.Empty(ingestStatus.Failures)
+	s.EqualValues(325, int(ingestStatus.ProcessedBytes))
+
+	// ... and from a CSV reader source without header.
+	resetBuffer(csvIngestData)
+	ingestStatus, err = s.client.Datasets.Ingest(s.ctx, s.dataset.ID, r, axiom.CSV, axiom.Identity,
+		ingest.SetCSVFields("time", "remote_ip", "remote_user", "request", "response", "bytes", "referrer", "agent"))
+	s.Require().NoError(err)
+	s.Require().NotNil(ingestStatus)
+
+	s.EqualValues(2, ingestStatus.Ingested)
+	s.Zero(ingestStatus.Failed)
+	s.Empty(ingestStatus.Failures)
+	s.EqualValues(258, int(ingestStatus.ProcessedBytes))
+
 	now := time.Now().Truncate(time.Second)
 	startTime := now.Add(-time.Minute)
 	endTime := now.Add(time.Minute)
@@ -216,9 +246,9 @@ func (s *DatasetsTestSuite) Test() {
 	s.Require().NoError(err)
 	s.Require().NotNil(queryResult)
 
-	s.EqualValues(10, queryResult.Status.RowsExamined)
-	s.EqualValues(10, queryResult.Status.RowsMatched)
-	s.Len(queryResult.Matches, 10)
+	s.EqualValues(14, queryResult.Status.RowsExamined)
+	s.EqualValues(14, queryResult.Status.RowsMatched)
+	s.Len(queryResult.Matches, 14)
 
 	// Also run a legacy query and make sure we see some results.
 	legacyQueryResult, err := s.client.Datasets.QueryLegacy(s.ctx, s.dataset.ID, querylegacy.Query{
@@ -228,9 +258,9 @@ func (s *DatasetsTestSuite) Test() {
 	s.Require().NoError(err)
 	s.Require().NotNil(legacyQueryResult)
 
-	s.EqualValues(10, legacyQueryResult.Status.RowsExamined)
-	s.EqualValues(10, legacyQueryResult.Status.RowsMatched)
-	s.Len(legacyQueryResult.Matches, 10)
+	s.EqualValues(14, legacyQueryResult.Status.RowsExamined)
+	s.EqualValues(14, legacyQueryResult.Status.RowsMatched)
+	s.Len(legacyQueryResult.Matches, 14)
 
 	// Run a more complex legacy query.
 	complexLegacyQuery := querylegacy.Query{
@@ -245,9 +275,15 @@ func (s *DatasetsTestSuite) Test() {
 		},
 		GroupBy: []string{"success", "remote_ip"},
 		Filter: querylegacy.Filter{
-			Op:    querylegacy.OpEqual,
+			Op:    querylegacy.OpExists,
 			Field: "response",
-			Value: 304,
+			Children: []querylegacy.Filter{
+				{
+					Op:    querylegacy.OpContains,
+					Field: "request",
+					Value: "GET",
+				},
+			},
 		},
 		Order: []querylegacy.Order{
 			{
@@ -262,7 +298,7 @@ func (s *DatasetsTestSuite) Test() {
 		VirtualFields: []querylegacy.VirtualField{
 			{
 				Alias:      "success",
-				Expression: "response < 400",
+				Expression: "toint(response) < 400",
 			},
 		},
 		Projections: []querylegacy.Projection{
@@ -277,12 +313,12 @@ func (s *DatasetsTestSuite) Test() {
 	s.Require().NoError(err)
 	s.Require().NotNil(complexLegacyQueryResult)
 
-	s.EqualValues(10, complexLegacyQueryResult.Status.RowsExamined)
-	s.EqualValues(10, complexLegacyQueryResult.Status.RowsMatched)
+	s.EqualValues(14, complexLegacyQueryResult.Status.RowsExamined)
+	s.EqualValues(14, complexLegacyQueryResult.Status.RowsMatched)
 	if s.Len(complexLegacyQueryResult.Buckets.Totals, 2) {
 		agg := complexLegacyQueryResult.Buckets.Totals[0].Aggregations[0]
 		s.EqualValues("event_count", agg.Alias)
-		s.EqualValues(5, agg.Value)
+		s.EqualValues(7, agg.Value)
 	}
 
 	// Trim the dataset down to a minimum.
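Aside: a minimal, self-contained sketch (not part of the diff) of what the new setOptionHeaders plumbing puts on the wire for CSV content. The request URL and field names here are illustrative only; the X-Axiom-CSV-Fields header name and the comma join are taken from the datasets.go hunk above.

package main

import (
	"fmt"
	"net/http"
	"strings"
)

func main() {
	// Field names as a caller would hand them to ingest.SetCSVFields.
	fields := []string{"time", "remote_ip", "request"}

	// Illustrative request; the real client constructs this internally.
	req, err := http.NewRequest(http.MethodPost, "https://example.com/v1/datasets/test/ingest", nil)
	if err != nil {
		panic(err)
	}

	// The same join setOptionHeaders performs when the content type is CSV.
	req.Header.Set("X-Axiom-CSV-Fields", strings.Join(fields, ","))

	fmt.Println(req.Header.Get("X-Axiom-CSV-Fields")) // time,remote_ip,request
}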
diff --git a/axiom/datasets_test.go b/axiom/datasets_test.go
index 65d112e1..9aa273ab 100644
--- a/axiom/datasets_test.go
+++ b/axiom/datasets_test.go
@@ -426,13 +426,13 @@ func TestDatasetsService_Ingest(t *testing.T) {
 	hf := func(w http.ResponseWriter, r *http.Request) {
 		assert.Equal(t, http.MethodPost, r.Method)
 		assert.Equal(t, mediaTypeJSON, r.Header.Get("Content-Type"))
-		eventLabels := assertValidJSON(t, strings.NewReader(r.Header.Get("X-Axiom-Event-Labels")))
-		assert.Equal(t, "eu-west-1", eventLabels[0].(map[string]any)["region"])
-		assert.EqualValues(t, 1, eventLabels[0].(map[string]any)["instance"])
 		assert.Equal(t, "time", r.URL.Query().Get("timestamp-field"))
 		assert.Equal(t, "2/Jan/2006:15:04:05 +0000", r.URL.Query().Get("timestamp-format"))
 		assert.Equal(t, ";", r.URL.Query().Get("csv-delimiter"))
 
+		eventLabels := assertValidJSON(t, strings.NewReader(r.Header.Get("X-Axiom-Event-Labels")))
+		assert.Equal(t, "eu-west-1", eventLabels[0].(map[string]any)["region"])
+		assert.EqualValues(t, 1, eventLabels[0].(map[string]any)["instance"])
 
 		_ = assertValidJSON(t, r.Body)
 
diff --git a/axiom/ingest/options.go b/axiom/ingest/options.go
index 70263fe9..8d553a4a 100644
--- a/axiom/ingest/options.go
+++ b/axiom/ingest/options.go
@@ -22,6 +22,10 @@ type Options struct {
 	// event data. This is especially useful when ingesting events from a
 	// third-party source that you do not have control over.
 	EventLabels map[string]any `url:"-"`
+	// CSVFields is a list of field names to use as the header fields of the
+	// ingested CSV content. This is only valid for CSV content and completely
+	// optional. It comes in handy when the CSV content has no header row.
+	CSVFields []string `url:"-"`
 }
 
 // An Option applies optional parameters to an ingest operation.
@@ -52,7 +56,7 @@ func SetCSVDelimiter(delim string) Option {
 func SetEventLabel(key string, value any) Option {
 	return func(o *Options) {
 		if o.EventLabels == nil {
-			o.EventLabels = make(map[string]any)
+			o.EventLabels = make(map[string]any, 1)
 		}
 		o.EventLabels[key] = value
 	}
@@ -63,3 +67,18 @@ func SetEventLabel(key string, value any) Option {
 func SetEventLabels(labels map[string]any) Option {
 	return func(o *Options) { o.EventLabels = labels }
 }
+
+// AddCSVField adds one or more field names to use as CSV header fields.
+func AddCSVField(field ...string) Option {
+	return func(o *Options) {
+		if o.CSVFields == nil {
+			o.CSVFields = make([]string, 0, len(field))
+		}
+		o.CSVFields = append(o.CSVFields, field...)
+	}
+}
+
+// SetCSVFields sets the field names to use as CSV header fields.
+func SetCSVFields(fields ...string) Option { + return func(o *Options) { o.CSVFields = fields } +} diff --git a/axiom/ingest/options_test.go b/axiom/ingest/options_test.go index 05f4f879..8ac0e5a0 100644 --- a/axiom/ingest/options_test.go +++ b/axiom/ingest/options_test.go @@ -96,6 +96,44 @@ func TestOptions(t *testing.T) { }, }, }, + { + name: "add csv field", + options: []ingest.Option{ + ingest.AddCSVField("foo"), + }, + want: ingest.Options{ + CSVFields: []string{"foo"}, + }, + }, + { + name: "add multiple csv fields", + options: []ingest.Option{ + ingest.AddCSVField("foo"), + ingest.AddCSVField("bar", "baz"), + }, + want: ingest.Options{ + CSVFields: []string{"foo", "bar", "baz"}, + }, + }, + { + name: "set csv fields", + options: []ingest.Option{ + ingest.SetCSVFields("foo", "bar"), + }, + want: ingest.Options{ + CSVFields: []string{"foo", "bar"}, + }, + }, + { + name: "set csv fields on existing csv fields", + options: []ingest.Option{ + ingest.SetCSVFields("foo", "bar"), + ingest.SetCSVFields("bar", "foo"), + }, + want: ingest.Options{ + CSVFields: []string{"bar", "foo"}, + }, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) {
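Aside: to see the new options end to end, a hedged usage sketch mirroring the integration test above. The client setup (which reads the usual environment-based configuration), the dataset name, and the single CSV row are illustrative and not part of the diff.

package main

import (
	"context"
	"log"
	"strings"

	"github.com/axiomhq/axiom-go/axiom"
	"github.com/axiomhq/axiom-go/axiom/ingest"
)

func main() {
	// Assumes credentials are supplied via the environment.
	client, err := axiom.NewClient()
	if err != nil {
		log.Fatal(err)
	}

	// Headerless CSV, so SetCSVFields supplies the column names, which the
	// client sends in the X-Axiom-CSV-Fields header.
	r := strings.NewReader("17/May/2015:08:05:30 +0000,93.180.71.1,-,GET /downloads/product_1 HTTP/1.1,304,0,-,curl/7.88.1")

	status, err := client.Datasets.Ingest(context.Background(), "my-dataset", r, axiom.CSV, axiom.Identity,
		ingest.SetCSVFields("time", "remote_ip", "remote_user", "request", "response", "bytes", "referrer", "agent"))
	if err != nil {
		log.Fatal(err)
	}

	log.Printf("ingested %d event(s)", status.Ingested)
}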