From 78d83fd4ebca1d0fd698467ab1058f82c86d2a52 Mon Sep 17 00:00:00 2001 From: Raj Nishtala Date: Thu, 21 Nov 2024 15:25:04 -0500 Subject: [PATCH] Adding support for the compression struct while preserving backwards compatibility (for string) --- config/configcompression/compressiontype.go | 71 ++++++----- .../configcompression/compressiontype_test.go | 37 +----- config/confighttp/compression_test.go | 112 +++++++++++++++--- exporter/otlphttpexporter/config_test.go | 2 +- 4 files changed, 129 insertions(+), 93 deletions(-) diff --git a/config/configcompression/compressiontype.go b/config/configcompression/compressiontype.go index b54cbc121eb..38e5d8072f5 100644 --- a/config/configcompression/compressiontype.go +++ b/config/configcompression/compressiontype.go @@ -5,8 +5,6 @@ package configcompression // import "go.opentelemetry.io/collector/config/config import ( "fmt" - "strconv" - "strings" "github.com/klauspost/compress/zlib" ) @@ -21,14 +19,15 @@ type TypeWithLevel struct { } const ( - TypeGzip Type = "gzip" - TypeZlib Type = "zlib" - TypeDeflate Type = "deflate" - TypeSnappy Type = "snappy" - TypeZstd Type = "zstd" - TypeLz4 Type = "lz4" - typeNone Type = "none" - typeEmpty Type = "" + TypeGzip Type = "gzip" + TypeZlib Type = "zlib" + TypeDeflate Type = "deflate" + TypeSnappy Type = "snappy" + TypeZstd Type = "zstd" + TypeLz4 Type = "lz4" + typeNone Type = "none" + typeEmpty Type = "" + LevelNone Level = 0 ) // IsCompressed returns false if CompressionType is nil, none, or empty. @@ -38,42 +37,40 @@ func (ct *Type) IsCompressed() bool { } func (ct *TypeWithLevel) UnmarshalText(in []byte) error { - var err error - parts := strings.Split(string(in), "/") - compressionTyp := Type(parts[0]) - level := zlib.DefaultCompression - if len(parts) == 2 { - level, err = strconv.Atoi(parts[1]) - if err != nil { - return fmt.Errorf("invalid compression level: %q", parts[1]) - } - if compressionTyp == TypeSnappy || - compressionTyp == TypeLz4 || - compressionTyp == typeNone || - compressionTyp == typeEmpty { - return fmt.Errorf("compression level is not supported for %q", compressionTyp) - } + typ := Type(in) + if typ == TypeGzip || + typ == TypeZlib || + typ == TypeDeflate || + typ == TypeSnappy || + typ == TypeZstd || + typ == TypeLz4 || + typ == typeNone || + typ == typeEmpty { + *&ct.Type = typ + return nil } - ct.Level = Level(level) - if (compressionTyp == TypeGzip && isValidLevel(level)) || - (compressionTyp == TypeZlib && isValidLevel(level)) || - (compressionTyp == TypeDeflate && isValidLevel(level)) || - compressionTyp == TypeSnappy || - compressionTyp == TypeLz4 || - compressionTyp == TypeZstd || - compressionTyp == typeNone || - compressionTyp == typeEmpty { - ct.Level = Level(level) - ct.Type = compressionTyp + return fmt.Errorf("unsupported compression type %q", typ) +} + +func (ct *TypeWithLevel) Validate() error { + if (ct.Type == TypeGzip && isValidLevel(int(ct.Level))) || + (ct.Type == TypeZlib && isValidLevel(int(ct.Level))) || + (ct.Type == TypeDeflate && isValidLevel(int(ct.Level))) || + ct.Type == TypeSnappy || + ct.Type == TypeLz4 || + ct.Type == TypeZstd || + ct.Type == typeNone || + ct.Type == typeEmpty { return nil } - return fmt.Errorf("unsupported compression type and level(default if not specified) %s - %d", compressionTyp, ct.Level) + return fmt.Errorf("unsupported compression type and level %s - %d", ct.Type, ct.Level) } // Checks the validity of zlib/gzip/flate compression levels func isValidLevel(level int) bool { return level == zlib.DefaultCompression || + level == int(LevelNone) || level == zlib.HuffmanOnly || level == zlib.NoCompression || level == zlib.BestSpeed || diff --git a/config/configcompression/compressiontype_test.go b/config/configcompression/compressiontype_test.go index 5f2f969f327..238fc68efd2 100644 --- a/config/configcompression/compressiontype_test.go +++ b/config/configcompression/compressiontype_test.go @@ -4,7 +4,6 @@ package configcompression import ( - "strings" "testing" "github.com/stretchr/testify/assert" @@ -48,12 +47,6 @@ func TestUnmarshalText(t *testing.T) { shouldError: false, isCompressed: true, }, - { - name: "ValidZstdLevel", - compressionName: []byte("zstd/11"), - shouldError: false, - isCompressed: true, - }, { name: "ValidEmpty", compressionName: []byte(""), @@ -75,34 +68,6 @@ func TestUnmarshalText(t *testing.T) { compressionName: []byte("ggip"), shouldError: true, }, - { - name: "InvalidSnappy", - compressionName: []byte("snappy/1"), - shouldError: true, - }, - { - name: "InvalidNone", - compressionName: []byte("none/1"), - shouldError: true, - }, - { - name: "InvalidGzip", - compressionName: []byte("gzip/10"), - shouldError: true, - isCompressed: true, - }, - { - name: "InvalidZlib", - compressionName: []byte("zlib/10"), - shouldError: true, - isCompressed: true, - }, - { - name: "InvalidZstdLevel", - compressionName: []byte("zstd/ten"), - shouldError: true, - isCompressed: true, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -113,7 +78,7 @@ func TestUnmarshalText(t *testing.T) { return } require.NoError(t, err) - ct := Type(strings.Split(string(tt.compressionName), "/")[0]) + ct := Type(tt.compressionName) assert.Equal(t, temp.Type, ct) assert.Equal(t, tt.isCompressed, ct.IsCompressed()) }) diff --git a/config/confighttp/compression_test.go b/config/confighttp/compression_test.go index 1a3df7ae2e0..12d028f4b38 100644 --- a/config/confighttp/compression_test.go +++ b/config/confighttp/compression_test.go @@ -26,68 +26,142 @@ import ( "go.opentelemetry.io/collector/config/configcompression" ) -func TestHTTPClientCompression(t *testing.T) { +func TestHTTPClientCompressionwithLevel(t *testing.T) { testBody := []byte("uncompressed_text") compressedGzipBody := compressGzip(t, testBody) compressedZlibBody := compressZlib(t, testBody) compressedDeflateBody := compressZlib(t, testBody) - compressedSnappyBody := compressSnappy(t, testBody) compressedZstdBody := compressZstd(t, testBody) - compressedLz4Body := compressLz4(t, testBody) const ( - gzipLevel configcompression.Type = "gzip/1" - zlibLevel configcompression.Type = "zlib/1" - deflateLevel configcompression.Type = "deflate/1" - zstdLevel configcompression.Type = "zstd/11" + gzipLevel configcompression.Level = 1 + zlibLevel configcompression.Level = 1 + deflateLevel configcompression.Level = 1 + zstdLevel configcompression.Level = 11 ) tests := []struct { name string - encoding configcompression.Type + enclevel configcompression.Level + enctype configcompression.Type reqBody []byte shouldError bool }{ { name: "ValidEmpty", - encoding: "", + enctype: "", reqBody: testBody, shouldError: false, }, { name: "ValidNone", - encoding: "none", + enctype: "none", reqBody: testBody, shouldError: false, }, { name: "ValidGzip", - encoding: gzipLevel, + enctype: "gzip", + enclevel: gzipLevel, reqBody: compressedGzipBody.Bytes(), shouldError: false, }, + { + name: "InvalidGzip", + enctype: "gzip", + enclevel: 20, + reqBody: compressedGzipBody.Bytes(), + shouldError: true, + }, { name: "ValidZlib", - encoding: zlibLevel, + enctype: "zlib", + enclevel: zlibLevel, reqBody: compressedZlibBody.Bytes(), shouldError: false, }, { name: "ValidDeflate", - encoding: deflateLevel, + enctype: "deflate", + enclevel: deflateLevel, reqBody: compressedDeflateBody.Bytes(), shouldError: false, }, { - name: "ValidSnappy", - encoding: configcompression.TypeSnappy, - reqBody: compressedSnappyBody.Bytes(), + name: "ValidZstd", + enctype: "zstd", + enclevel: zstdLevel, + reqBody: compressedZstdBody.Bytes(), + shouldError: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + assert.NoError(t, err, "failed to read request body: %v", err) + assert.EqualValues(t, tt.reqBody, body) + w.WriteHeader(http.StatusOK) + })) + t.Cleanup(srv.Close) + + reqBody := bytes.NewBuffer(testBody) + + req, err := http.NewRequest(http.MethodGet, srv.URL, reqBody) + require.NoError(t, err, "failed to create request to test handler") + compression := configcompression.TypeWithLevel{} + err = compression.UnmarshalText([]byte(tt.enctype)) + compression.Level = tt.enclevel + err = compression.Validate() + if tt.shouldError { + assert.Error(t, err) + return + } + require.NoError(t, err) + clientSettings := ClientConfig{ + Endpoint: srv.URL, + Compression: compression, + } + client, err := clientSettings.ToClient(context.Background(), componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings()) + require.NoError(t, err) + res, err := client.Do(req) + require.NoError(t, err) + + _, err = io.ReadAll(res.Body) + require.NoError(t, err) + require.NoError(t, res.Body.Close(), "failed to close request body: %v", err) + }) + } + +} + +func TestHTTPClientCompression(t *testing.T) { + testBody := []byte("uncompressed_text") + compressedSnappyBody := compressSnappy(t, testBody) + compressedLz4Body := compressLz4(t, testBody) + + tests := []struct { + name string + encoding configcompression.Type + reqBody []byte + shouldError bool + }{ + { + name: "ValidEmpty", + encoding: "", + reqBody: testBody, shouldError: false, }, { - name: "ValidZstd", - encoding: zstdLevel, - reqBody: compressedZstdBody.Bytes(), + name: "ValidNone", + encoding: "none", + reqBody: testBody, + shouldError: false, + }, + { + name: "ValidSnappy", + encoding: configcompression.TypeSnappy, + reqBody: compressedSnappyBody.Bytes(), shouldError: false, }, { diff --git a/exporter/otlphttpexporter/config_test.go b/exporter/otlphttpexporter/config_test.go index b7f7afed2e1..6abd63be9dc 100644 --- a/exporter/otlphttpexporter/config_test.go +++ b/exporter/otlphttpexporter/config_test.go @@ -77,7 +77,7 @@ func TestUnmarshalConfig(t *testing.T) { ReadBufferSize: 123, WriteBufferSize: 345, Timeout: time.Second * 10, - Compression: configcompression.TypeWithLevel{Type: "gzip", Level: -1}, + Compression: configcompression.TypeWithLevel{Type: "gzip", Level: 0}, MaxIdleConns: &defaultMaxIdleConns, MaxIdleConnsPerHost: &defaultMaxIdleConnsPerHost, MaxConnsPerHost: &defaultMaxConnsPerHost,