From ab3e627b1da134f33c06dd09ee4402a2eaad68f1 Mon Sep 17 00:00:00 2001 From: gotjosh Date: Tue, 8 Oct 2024 16:55:07 +0100 Subject: [PATCH] Revert #9511 (#9566) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts #9511 as it panics ingesters. The details are: ``` │ panic: runtime error: invalid memory address or nil pointer dereference │ │ panic: runtime error: invalid memory address or nil pointer dereference │ │ [signal SIGSEGV: segmentation violation code=0x1 addr=0x30 pc=0x13b2350] │ $GOROOT/src/runtime/panic.go:785 +0x124 │ │ github.com/twmb/franz-go/pkg/kgo/internal/pool.(*BucketedPool[...]).Get(0x0?, 0x0?) ``` Signed-off-by: gotjosh --- go.mod | 7 +- go.sum | 4 +- .../twmb/franz-go/pkg/kgo/compression.go | 57 +------ .../twmb/franz-go/pkg/kgo/config.go | 22 --- .../pkg/kgo/internal/pool/bucketed_pool.go | 94 ------------ .../twmb/franz-go/pkg/kgo/record_and_fetch.go | 34 ----- .../twmb/franz-go/pkg/kgo/source.go | 140 ++++-------------- vendor/modules.txt | 5 +- 8 files changed, 41 insertions(+), 322 deletions(-) delete mode 100644 vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go diff --git a/go.mod b/go.mod index bc5121b68f4..f00f650ee6e 100644 --- a/go.mod +++ b/go.mod @@ -311,11 +311,8 @@ replace github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc // Replacing prometheus/alertmanager with our fork. replace github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6 -// Replacing with a fork commit based on v1.17.1 having cherry-picked the following PRs: -// - https://github.com/grafana/franz-go/pull/1 -// - https://github.com/grafana/franz-go/pull/3 -// - https://github.com/grafana/franz-go/pull/4 -replace github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241003081803-835b5cb1ddcf +// Replacing with a fork commit based on v1.17.1 with https://github.com/twmb/franz-go/pull/803 cherry-picked. +replace github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 // Pin Google GRPC to v1.65.0 as v1.66.0 has API changes and also potentially performance regressions. // Following https://github.com/grafana/dskit/pull/581 diff --git a/go.sum b/go.sum index fe6129789ca..af0390bdb20 100644 --- a/go.sum +++ b/go.sum @@ -951,6 +951,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/digitalocean/godo v1.121.0 h1:ilXiHuEnhbJs2fmFEPX0r/QQ6KfiOIMAhJN3f8NiCfI= github.com/digitalocean/godo v1.121.0/go.mod h1:WQVH83OHUy6gC4gXpEVQKtxTd4L5oCp+5OialidkPLY= +github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 h1:jszPVGeTr25QTJ/jWiT7eXnabc4R4itChxUVFSCLjRQ= +github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/distribution/reference v0.5.0 h1:/FUIFXtfc/x2gpa5/VGfiGLuOIdYa1t65IKK2OFGvA0= github.com/distribution/reference v0.5.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= github.com/dlclark/regexp2 v1.11.0 h1:G/nrcoOa7ZXlpoa/91N3X7mM3r8eIlMBBJZvsz/mxKI= @@ -1260,8 +1262,6 @@ github.com/grafana/dskit v0.0.0-20241007163720-de20fd2fe818 h1:VPMurXtl+ONN13d9g github.com/grafana/dskit v0.0.0-20241007163720-de20fd2fe818/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= -github.com/grafana/franz-go v0.0.0-20241003081803-835b5cb1ddcf h1:g6lIXOJlt9LMbx9fUJ+2RoO155txAtpPmM/8mdFq/B0= -github.com/grafana/franz-go v0.0.0-20241003081803-835b5cb1ddcf/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce h1:WI1olbgS+sEl77qxEYbmt9TgRUz7iLqmjh8lYPpGlKQ= github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce/go.mod h1:GFAN9Jn9t1cX7sNfc6ZoFyc4f7i8jtm3SajrWdZM2EE= github.com/grafana/gomemcache v0.0.0-20240229205252-cd6a66d6fb56 h1:X8IKQ0wu40wpvYcKfBcc5T4QnhdQjUhtUtB/1CY89lE= diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go index 1adbe69a074..81d9d8a7e3b 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/compression.go @@ -12,8 +12,6 @@ import ( "github.com/klauspost/compress/s2" "github.com/klauspost/compress/zstd" "github.com/pierrec/lz4/v4" - - "github.com/twmb/franz-go/pkg/kgo/internal/pool" ) var byteBuffers = sync.Pool{New: func() any { return bytes.NewBuffer(make([]byte, 8<<10)) }} @@ -268,23 +266,15 @@ type zstdDecoder struct { inner *zstd.Decoder } -func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPool[byte]) ([]byte, error) { +func (d *decompressor) decompress(src []byte, codec byte) ([]byte, error) { // Early return in case there is no compression compCodec := codecType(codec) if compCodec == codecNone { return src, nil } - - out, buf, err := d.getDecodedBuffer(src, compCodec, pool) - if err != nil { - return nil, err - } - defer func() { - if compCodec == codecSnappy { - return - } - pool.Put(buf) - }() + out := byteBuffers.Get().(*bytes.Buffer) + out.Reset() + defer byteBuffers.Put(out) switch compCodec { case codecGzip: @@ -296,7 +286,7 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo if _, err := io.Copy(out, ungz); err != nil { return nil, err } - return d.copyDecodedBuffer(out.Bytes(), compCodec, pool), nil + return append([]byte(nil), out.Bytes()...), nil case codecSnappy: if len(src) > 16 && bytes.HasPrefix(src, xerialPfx) { return xerialDecode(src) @@ -305,7 +295,7 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo if err != nil { return nil, err } - return d.copyDecodedBuffer(decoded, compCodec, pool), nil + return append([]byte(nil), decoded...), nil case codecLZ4: unlz4 := d.unlz4Pool.Get().(*lz4.Reader) defer d.unlz4Pool.Put(unlz4) @@ -313,7 +303,7 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo if _, err := io.Copy(out, unlz4); err != nil { return nil, err } - return d.copyDecodedBuffer(out.Bytes(), compCodec, pool), nil + return append([]byte(nil), out.Bytes()...), nil case codecZstd: unzstd := d.unzstdPool.Get().(*zstdDecoder) defer d.unzstdPool.Put(unzstd) @@ -321,43 +311,12 @@ func (d *decompressor) decompress(src []byte, codec byte, pool *pool.BucketedPoo if err != nil { return nil, err } - return d.copyDecodedBuffer(decoded, compCodec, pool), nil + return append([]byte(nil), decoded...), nil default: return nil, errors.New("unknown compression codec") } } -func (d *decompressor) getDecodedBuffer(src []byte, compCodec codecType, pool *pool.BucketedPool[byte]) (*bytes.Buffer, []byte, error) { - var ( - decodedBufSize int - err error - ) - switch compCodec { - case codecSnappy: - decodedBufSize, err = s2.DecodedLen(src) - if err != nil { - return nil, nil, err - } - - default: - // Make a guess at the output size. - decodedBufSize = len(src) * 2 - } - buf := pool.Get(decodedBufSize)[:0] - - return bytes.NewBuffer(buf), buf, nil -} - -func (d *decompressor) copyDecodedBuffer(decoded []byte, compCodec codecType, pool *pool.BucketedPool[byte]) []byte { - if compCodec == codecSnappy { - // We already know the actual size of the decoded buffer before decompression, - // so there's no need to copy the buffer. - return decoded - } - out := pool.Get(len(decoded)) - return append(out[:0], decoded...) -} - var xerialPfx = []byte{130, 83, 78, 65, 80, 80, 89, 0} var errMalformedXerial = errors.New("malformed xerial framing") diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/config.go b/vendor/github.com/twmb/franz-go/pkg/kgo/config.go index dabaf03f618..92ebeaa3905 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/config.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/config.go @@ -16,8 +16,6 @@ import ( "github.com/twmb/franz-go/pkg/kmsg" "github.com/twmb/franz-go/pkg/kversion" "github.com/twmb/franz-go/pkg/sasl" - - "github.com/twmb/franz-go/pkg/kgo/internal/pool" ) // Opt is an option to configure a client. @@ -153,9 +151,6 @@ type cfg struct { partitions map[string]map[int32]Offset // partitions to directly consume from regex bool - recordsPool *recordsPool - decompressBufferPool *pool.BucketedPool[byte] - //////////////////////////// // CONSUMER GROUP SECTION // //////////////////////////// @@ -394,11 +389,6 @@ func (cfg *cfg) validate() error { } cfg.hooks = processedHooks - // Assume a 2x compression ratio. - maxDecompressedBatchSize := int(cfg.maxBytes.load()) * 2 - cfg.decompressBufferPool = pool.NewBucketedPool[byte](4096, maxDecompressedBatchSize, 2, func(sz int) []byte { - return make([]byte, sz) - }) return nil } @@ -1357,18 +1347,6 @@ func ConsumeRegex() ConsumerOpt { return consumerOpt{func(cfg *cfg) { cfg.regex = true }} } -// EnableRecordsPool sets the client to obtain the *kgo.Record objects from a pool, -// in order to minimize the number of allocations. -// -// By enabling this option, the records returned by PollFetches/PollRecords -// can be sent back to the pool via ReuseRecords method in order to be recycled. -// -// This option is particularly useful for use cases where the volume of generated records is very high, -// as it can negatively impact performance due to the extra GC overhead. -func EnableRecordsPool() ConsumerOpt { - return consumerOpt{func(cfg *cfg) { cfg.recordsPool = newRecordsPool() }} -} - // DisableFetchSessions sets the client to not use fetch sessions (Kafka 1.0+). // // A "fetch session" is is a way to reduce bandwidth for fetch requests & diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go b/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go deleted file mode 100644 index b5c435a55e2..00000000000 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/pool/bucketed_pool.go +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2017 The Prometheus Authors -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package pool - -import ( - "sync" -) - -// BucketedPool is a bucketed pool for variably sized slices. -type BucketedPool[T any] struct { - buckets []sync.Pool - sizes []int - // make is the function used to create an empty slice when none exist yet. - make func(int) []T -} - -// NewBucketedPool returns a new BucketedPool with size buckets for minSize to maxSize -// increasing by the given factor. -func NewBucketedPool[T any](minSize, maxSize int, factor float64, makeFunc func(int) []T) *BucketedPool[T] { - if minSize < 1 { - panic("invalid minimum pool size") - } - if maxSize < 1 { - panic("invalid maximum pool size") - } - if factor < 1 { - panic("invalid factor") - } - - var sizes []int - - for s := minSize; s <= maxSize; s = int(float64(s) * factor) { - sizes = append(sizes, s) - } - - p := &BucketedPool[T]{ - buckets: make([]sync.Pool, len(sizes)), - sizes: sizes, - make: makeFunc, - } - return p -} - -// Get returns a new slice with capacity greater than or equal to size. -func (p *BucketedPool[T]) Get(size int) []T { - for i, bktSize := range p.sizes { - if size > bktSize { - continue - } - buff := p.buckets[i].Get() - if buff == nil { - buff = p.make(bktSize) - } - return buff.([]T) - } - return p.make(size) -} - -// Put adds a slice to the right bucket in the pool. -// If the slice does not belong to any bucket in the pool, it is ignored. -func (p *BucketedPool[T]) Put(s []T) { - sCap := cap(s) - if sCap < p.sizes[0] { - return - } - - for i, size := range p.sizes { - if sCap > size { - continue - } - - if sCap == size { - // Buffer is exactly the minimum size for this bucket. Add it to this bucket. - p.buckets[i].Put(s) - } else { - // Buffer belongs in previous bucket. - p.buckets[i-1].Put(s) - } - return - } -} - - diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go b/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go index dbfb94510a6..4f1ebe6f524 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/record_and_fetch.go @@ -6,8 +6,6 @@ import ( "reflect" "time" "unsafe" - - "github.com/twmb/franz-go/pkg/kmsg" ) // RecordHeader contains extra information that can be sent with Records. @@ -151,38 +149,6 @@ type Record struct { // producer hooks. It can also be set in a consumer hook to propagate // enrichment to consumer clients. Context context.Context - - // recordsPool is the pool that this record was fetched from, if any. - // - // When reused, record is returned to this pool. - recordsPool *recordsPool - - // rcBatchBuffer is used to keep track of the raw buffer that this record was - // derived from when consuming, after decompression. - // - // This is used to allow reusing these buffers when record pooling has been enabled - // via EnableRecordsPool option. - rcBatchBuffer *rcBuffer[byte] - - // rcRawRecordsBuffer is used to keep track of the raw record buffer that this record was - // derived from when consuming. - // - // This is used to allow reusing these buffers when record pooling has been enabled - // via EnableRecordsPool option. - rcRawRecordsBuffer *rcBuffer[kmsg.Record] -} - -// Reuse releases the record back to the pool. -// -// -// Once this method has been called, any reference to the passed record should be considered invalid by the caller, -// as it may be reused as a result of future calls to the PollFetches/PollRecords method. -func (r *Record) Reuse() { - if r.recordsPool != nil { - r.rcRawRecordsBuffer.release() - r.rcBatchBuffer.release() - r.recordsPool.put(r) - } } func (r *Record) userSize() int64 { diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go index 69fafba7feb..85586a0a412 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go @@ -9,61 +9,13 @@ import ( "sort" "strings" "sync" - "sync/atomic" "time" "github.com/twmb/franz-go/pkg/kbin" "github.com/twmb/franz-go/pkg/kerr" - "github.com/twmb/franz-go/pkg/kgo/internal/pool" "github.com/twmb/franz-go/pkg/kmsg" ) -type recordsPool struct{ p *sync.Pool } - -func newRecordsPool() *recordsPool { - return &recordsPool{ - p: &sync.Pool{New: func() any { return &Record{} }}, - } -} - -func (p *recordsPool) get() *Record { - return p.p.Get().(*Record) -} - -func (p *recordsPool) put(r *Record) { - *r = Record{} // zero out the record - p.p.Put(r) -} - -// rcBuffer is a reference counted buffer. -// -// The internal buffer will be sent back to the pool after calling release -// when the ref count reaches 0. -type rcBuffer[T any] struct { - refCount atomic.Int32 - buffer []T - pool *pool.BucketedPool[T] -} - -func newRCBuffer[T any](buffer []T, pool *pool.BucketedPool[T]) *rcBuffer[T] { - return &rcBuffer[T]{buffer: buffer, pool: pool} -} - -func (b *rcBuffer[T]) acquire() { - b.refCount.Add(1) -} - -func (b *rcBuffer[T]) release() { - if b.refCount.Add(-1) == 0 { - b.pool.Put(b.buffer) - b.buffer = nil - return - } - if b.refCount.Load() < 0 { - panic("rcBuffer released too many times") - } -} - type readerFrom interface { ReadFrom([]byte) error } @@ -158,12 +110,6 @@ type ProcessFetchPartitionOptions struct { // Topic is used to populate the Partition field of each Record. Partition int32 - - // DecompressBufferPool is a pool of buffers to use for decompressing batches. - DecompressBufferPool *pool.BucketedPool[byte] - - // recordsPool is for internal use only. - recordPool *recordsPool } // cursor is where we are consuming from for an individual partition. @@ -1142,7 +1088,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe continue } - fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks, s.cl.cfg.recordsPool, s.cl.cfg.decompressBufferPool) + fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks) if fp.Err != nil { if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving { strip(topic, partition, fp.Err) @@ -1319,18 +1265,16 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe // processRespPartition processes all records in all potentially compressed // batches (or message sets). -func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks, recordsPool *recordsPool, decompressBufferPool *pool.BucketedPool[byte]) (fp FetchPartition) { +func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks) (fp FetchPartition) { if rp.ErrorCode == 0 { o.hwm = rp.HighWatermark } opts := ProcessFetchPartitionOptions{ - KeepControlRecords: br.cl.cfg.keepControl, - Offset: o.offset, - IsolationLevel: IsolationLevel{br.cl.cfg.isolationLevel}, - Topic: o.from.topic, - Partition: o.from.partition, - DecompressBufferPool: decompressBufferPool, - recordPool: recordsPool, + KeepControlRecords: br.cl.cfg.keepControl, + Offset: o.offset, + IsolationLevel: IsolationLevel{br.cl.cfg.isolationLevel}, + Topic: o.from.topic, + Partition: o.from.partition, } observeMetrics := func(m FetchBatchMetrics) { hooks.each(func(h Hook) { @@ -1539,17 +1483,11 @@ func (a aborter) trackAbortedPID(producerID int64) { // processing records to fetch part // ////////////////////////////////////// -var rawRecordsPool = pool.NewBucketedPool[kmsg.Record](32, 16*1024, 2, func(len int) []kmsg.Record { - return make([]kmsg.Record, len) -}) - // readRawRecords reads n records from in and returns them, returning early if // there were partial records. func readRawRecords(n int, in []byte) []kmsg.Record { - rs := rawRecordsPool.Get(n) - rs = rs[:n] + rs := make([]kmsg.Record, n) for i := 0; i < n; i++ { - rs[i] = kmsg.Record{} length, used := kbin.Varint(in) total := used + int(length) if used == 0 || length < 0 || len(in) < total { @@ -1585,7 +1523,7 @@ func processRecordBatch( rawRecords := batch.Records if compression := byte(batch.Attributes & 0x0007); compression != 0 { var err error - if rawRecords, err = decompressor.decompress(rawRecords, compression, o.DecompressBufferPool); err != nil { + if rawRecords, err = decompressor.decompress(rawRecords, compression); err != nil { return 0, 0 // truncated batch } } @@ -1612,15 +1550,6 @@ func processRecordBatch( } }() - var ( - rcBatchBuff *rcBuffer[byte] - rcRawRecordsBuff *rcBuffer[kmsg.Record] - ) - if o.recordPool != nil { - rcBatchBuff = newRCBuffer(rawRecords, o.DecompressBufferPool) - rcRawRecordsBuff = newRCBuffer(krecords, rawRecordsPool) - } - abortBatch := aborter.shouldAbortBatch(batch) for i := range krecords { record := recordToRecord( @@ -1628,10 +1557,9 @@ func processRecordBatch( fp.Partition, batch, &krecords[i], - o.recordPool, ) + o.maybeKeepRecord(fp, record, abortBatch) - o.maybeKeepRecord(fp, record, rcBatchBuff, rcRawRecordsBuff, abortBatch) if abortBatch && record.Attrs.IsControl() { // A control record has a key and a value where the key // is int16 version and int16 type. Aborted records @@ -1656,7 +1584,7 @@ func processV1OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition, return 1, 0 } - rawInner, err := decompressor.decompress(message.Value, compression, o.DecompressBufferPool) + rawInner, err := decompressor.decompress(message.Value, compression) if err != nil { return 0, 0 // truncated batch } @@ -1751,7 +1679,7 @@ func processV1Message( return false } record := v1MessageToRecord(o.Topic, fp.Partition, message) - o.maybeKeepRecord(fp, record, nil, nil, false) + o.maybeKeepRecord(fp, record, false) return true } @@ -1769,7 +1697,7 @@ func processV0OuterMessage( return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return } - rawInner, err := decompressor.decompress(message.Value, compression, o.DecompressBufferPool) + rawInner, err := decompressor.decompress(message.Value, compression) if err != nil { return 0, 0 // truncated batch } @@ -1829,7 +1757,7 @@ func processV0Message( return false } record := v0MessageToRecord(o.Topic, fp.Partition, message) - o.maybeKeepRecord(fp, record, nil, nil, false) + o.maybeKeepRecord(fp, record, false) return true } @@ -1837,7 +1765,7 @@ func processV0Message( // // If the record is being aborted or the record is a control record and the // client does not want to keep control records, this does not keep the record. -func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, rcBatchBuff *rcBuffer[byte], rcRawRecordsBuff *rcBuffer[kmsg.Record], abort bool) { +func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) { if record.Offset < o.Offset { // We asked for offset 5, but that was in the middle of a // batch; we got offsets 0 thru 4 that we need to skip. @@ -1849,13 +1777,6 @@ func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, recor abort = !o.KeepControlRecords } if !abort { - if rcBatchBuff != nil && rcRawRecordsBuff != nil { - rcBatchBuff.acquire() - record.rcBatchBuffer = rcBatchBuff - - rcRawRecordsBuff.acquire() - record.rcRawRecordsBuffer = rcRawRecordsBuff - } fp.Records = append(fp.Records, record) } @@ -1878,7 +1799,6 @@ func recordToRecord( partition int32, batch *kmsg.RecordBatch, record *kmsg.Record, - recordsPool *recordsPool, ) *Record { h := make([]RecordHeader, 0, len(record.Headers)) for _, kv := range record.Headers { @@ -1887,25 +1807,19 @@ func recordToRecord( Value: kv.Value, }) } - var r *Record - if recordsPool != nil { - r = recordsPool.get() - } else { - r = new(Record) - } - - r.Key = record.Key - r.Value = record.Value - r.Headers = h - r.Topic = topic - r.Partition = partition - r.Attrs = RecordAttrs{uint8(batch.Attributes)} - r.ProducerID = batch.ProducerID - r.ProducerEpoch = batch.ProducerEpoch - r.LeaderEpoch = batch.PartitionLeaderEpoch - r.Offset = batch.FirstOffset + int64(record.OffsetDelta) - r.recordsPool = recordsPool + r := &Record{ + Key: record.Key, + Value: record.Value, + Headers: h, + Topic: topic, + Partition: partition, + Attrs: RecordAttrs{uint8(batch.Attributes)}, + ProducerID: batch.ProducerID, + ProducerEpoch: batch.ProducerEpoch, + LeaderEpoch: batch.PartitionLeaderEpoch, + Offset: batch.FirstOffset + int64(record.OffsetDelta), + } if r.Attrs.TimestampType() == 0 { r.Timestamp = timeFromMillis(batch.FirstTimestamp + record.TimestampDelta64) } else { diff --git a/vendor/modules.txt b/vendor/modules.txt index 2d8c2153e54..149d86e798c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1145,12 +1145,11 @@ github.com/tklauser/go-sysconf # github.com/tklauser/numcpus v0.6.1 ## explicit; go 1.13 github.com/tklauser/numcpus -# github.com/twmb/franz-go v1.17.1 => github.com/grafana/franz-go v0.0.0-20241003081803-835b5cb1ddcf +# github.com/twmb/franz-go v1.17.1 => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 ## explicit; go 1.21 github.com/twmb/franz-go/pkg/kbin github.com/twmb/franz-go/pkg/kerr github.com/twmb/franz-go/pkg/kgo -github.com/twmb/franz-go/pkg/kgo/internal/pool github.com/twmb/franz-go/pkg/kgo/internal/sticky github.com/twmb/franz-go/pkg/kversion github.com/twmb/franz-go/pkg/sasl @@ -1675,5 +1674,5 @@ sigs.k8s.io/yaml/goyaml.v3 # github.com/opentracing-contrib/go-stdlib => github.com/grafana/opentracing-contrib-go-stdlib v0.0.0-20230509071955-f410e79da956 # github.com/opentracing-contrib/go-grpc => github.com/charleskorn/go-grpc v0.0.0-20231024023642-e9298576254f # github.com/prometheus/alertmanager => github.com/grafana/prometheus-alertmanager v0.25.1-0.20240924175849-b8b7c2c74eb6 -# github.com/twmb/franz-go => github.com/grafana/franz-go v0.0.0-20241003081803-835b5cb1ddcf +# github.com/twmb/franz-go => github.com/dimitarvdimitrov/franz-go v0.0.0-20240904145554-ceadc28d3bd9 # google.golang.org/grpc => google.golang.org/grpc v1.65.0