zstd compression wastes too much time on memory allocation due to lack of destination buffer #2964

Open
HenryCaiHaiying opened this issue Aug 16, 2024 · 1 comment

Comments

@HenryCaiHaiying

HenryCaiHaiying commented Aug 16, 2024

Description

zstd compression wastes too much time on malloc due to lack of destination buffer

Versions

Sarama v1.41.0

Sarama   Kafka   Go
1.41.0   3.8.0   1.21.4
Configuration
Producer.Compression = CompressionZSTD

Logs
logs: malloc

goroutine 12165 [GC assist wait]:
runtime.gopark(0xc04dfab240?, 0x12d9c8b?, 0x0?, 0x0?, 0x74b8a57f60a8?)
/usr/local/go/src/runtime/proc.go:398 +0xce fp=0xc04dfab1c8 sp=0xc04dfab1a8 pc=0x13028ae
runtime.goparkunlock(...)
/usr/local/go/src/runtime/proc.go:404
runtime.gcParkAssist()
/usr/local/go/src/runtime/mgcmark.go:652 +0xe7 fp=0xc04dfab200 sp=0xc04dfab1c8 pc=0x12e47e7
runtime.gcAssistAlloc(0xc00f48a820)
/usr/local/go/src/runtime/mgcmark.go:509 +0x22a fp=0xc04dfab260 sp=0xc04dfab200 pc=0x12e424a
runtime.deductAssistCredit(0xc057881ea0?)
/usr/local/go/src/runtime/malloc.go:1291 +0x54 fp=0xc04dfab288 sp=0xc04dfab260 pc=0x12d27f4
runtime.mallocgc(0x37e, 0x3544380, 0x1)
/usr/local/go/src/runtime/malloc.go:1006 +0xc9 fp=0xc04dfab2f0 sp=0xc04dfab288 pc=0x12d2029
runtime.makeslice(0x389d880?, 0xc014798b60?, 0x74b80c61ae10?)
/usr/local/go/src/runtime/slice.go:103 +0x49 fp=0xc04dfab318 sp=0xc04dfab2f0 pc=0x13180e9
github.com/klauspost/compress/zstd.(*Encoder).EncodeAll(0xc0594a28c0, {0xc02221e380, 0x37e, 0x37e}, {0x0, 0x0, 0x0})
/go/pkg/mod/github.com/klauspost/[email protected]/zstd/encoder.go:516 +0x313 fp=0xc04dfab498 sp=0xc04dfab318 pc=0x1fd3b93
github.com/IBM/sarama.zstdCompress({0x0?}, {0x0, 0x0, 0x0}, {0xc02221e380, 0x37e, 0x37e})
/go/pkg/mod/github.com/!i!b!m/[email protected]/zstd.go:71 +0x86 fp=0xc04dfab510 sp=0xc04dfab498 pc=0x20495c6
github.com/IBM/sarama.compress(0x20?, 0xc047086b88?, {0xc02221e380, 0x37e, 0x37e})
/go/pkg/mod/github.com/!i!b!m/[email protected]/compress.go:190 +0xdae fp=0xc04dfaba38 sp=0xc04dfab510 pc=0x200bb4e
github.com/IBM/sarama.(*RecordBatch).encodeRecords(0xc009c36140, {0x3ea55e8?, 0xc008c79480?})
/go/pkg/mod/github.com/!i!b!m/[email protected]/record_batch.go:206 +0x9a fp=0xc04dfaba80 sp=0xc04dfaba38 pc=0x203d7da
github.com/IBM/sarama.(*RecordBatch).encode(0xc009c36140, {0x3ea55e8, 0xc008c79480})
/go/pkg/mod/github.com/!i!b!m/[email protected]/record_batch.go:88 +0x231 fp=0xc04dfabae0 sp=0xc04dfaba80 pc=0x203cd31
github.com/IBM/sarama.(*Records).encode(0xc008c79480?, {0x3ea55e8?, 0xc008c79480?})
/go/pkg/mod/github.com/!i!b!m/[email protected]/records.go:62 +0x98 fp=0xc04dfabb28 sp=0xc04dfabae0 pc=0x203dab8
github.com/IBM/sarama.(*ProduceRequest).encode(0xc008c79440, {0x3ea55e8, 0xc008c79480})
/go/pkg/mod/github.com/!i!b!m/[email protected]/produce_request.go:108 +0x5e7 fp=0xc04dfabcd8 sp=0xc04dfabb28 pc=0x2035367
github.com/IBM/sarama.(*request).encode(0xc0161f6450, {0x3ea55e8, 0xc008c79480})
/go/pkg/mod/github.com/!i!b!m/[email protected]/request.go:43 +0x13c fp=0xc04dfabd00 sp=0xc04dfabcd8 pc=0x203e4dc
github.com/IBM/sarama.encode({0x3e67160, 0xc0161f6450}, {0x3e91630?, 0xc007a2f3b0})
/go/pkg/mod/github.com/!i!b!m/[email protected]/encoder_decoder.go:29 +0x91 fp=0xc04dfabd70 sp=0xc04dfabd00 pc=0x201af31
github.com/IBM/sarama.(*Broker).sendInternal(0xc008100800, {0x3e8dc30, 0xc008c79440}, 0xc02e47a880)
/go/pkg/mod/github.com/!i!b!m/[email protected]/broker.go:1001 +0x16b fp=0xc04dfabe50 sp=0xc04dfabd70 pc=0x1ffc94b
github.com/IBM/sarama.(*Broker).sendWithPromise(0xc008100800, {0x3e8dc30, 0xc008c79440}, 0x0?)
/go/pkg/mod/github.com/!i!b!m/[email protected]/broker.go:991 +0x9e fp=0xc04dfabe88 sp=0xc04dfabe50 pc=0x1ffc75e
github.com/IBM/sarama.(*Broker).AsyncProduce(0xc008100800, 0xc008c79440, 0xc008c79460?)
/go/pkg/mod/github.com/!i!b!m/[email protected]/broker.go:470 +0x1cc fp=0xc04dfabf10 sp=0xc04dfabe88 pc=0x1ffa88c
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func1()
/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:828 +0x145 fp=0xc04dfabfa8 sp=0xc04dfabf10 pc=0x1ff2cc5
github.com/IBM/sarama.withRecover(0xc0105c9770?)
/go/pkg/mod/github.com/!i!b!m/[email protected]/utils.go:43 +0x33 fp=0xc04dfabfc8 sp=0xc04dfabfa8 pc=0x2048a13
github.com/IBM/sarama.(*asyncProducer).newBrokerProducer.func4()
/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:793 +0x25 fp=0xc04dfabfe0 sp=0xc04dfabfc8 pc=0x1ff2b45
runtime.goexit()
/usr/local/go/src/runtime/asm_amd64.s:1650 +0x1 fp=0xc04dfabfe8 sp=0xc04dfabfe0 pc=0x1336601
created by github.com/IBM/sarama.(*asyncProducer).newBrokerProducer in goroutine 16568
/go/pkg/mod/github.com/!i!b!m/[email protected]/async_producer.go:793 +0x2e5

Additional Context

We are trying to switch our compression algorithm from snappy to zstd. We are seeing about 25% more compression, which is good, but we are also seeing higher CPU usage, which is bad: we ended up adding 60% more pods to handle the same traffic load.

We are using Sarama v1.41.0 and klauspost/compress v1.17.4.

We generated a few stack traces and found about 75 goroutines stuck waiting on malloc. I haven't found a single zstd goroutine doing any real work; they are all waiting on malloc.
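For anyone who wants to repeat this count on their own dump, here is a minimal sketch that tallies goroutines by wait state. It assumes the standard goroutine header format shown in the trace above (`goroutine N [state]:`). The helper name `countStates` is made up for illustration and is not part of Sarama or the Go runtime.

```go
package main

import (
	"bufio"
	"fmt"
	"regexp"
	"strings"
)

// goroutineHeader matches lines such as "goroutine 12165 [GC assist wait]:"
// and captures the state, stopping before an optional ", 3 minutes" suffix.
var goroutineHeader = regexp.MustCompile(`^goroutine \d+ \[([^\],]+)`)

// countStates (hypothetical helper) returns how many goroutines in the dump
// are in each state.
func countStates(dump string) map[string]int {
	counts := make(map[string]int)
	sc := bufio.NewScanner(strings.NewReader(dump))
	for sc.Scan() {
		if m := goroutineHeader.FindStringSubmatch(sc.Text()); m != nil {
			counts[m[1]]++
		}
	}
	return counts
}

func main() {
	dump := "goroutine 12165 [GC assist wait]:\nruntime.gopark(...)\n\n" +
		"goroutine 7 [select, 3 minutes]:\nmain.loop()\n"
	for state, n := range countStates(dump) {
		fmt.Printf("%-16s %d\n", state, n)
	}
}
```

A large share of goroutines in `GC assist wait` suggests that allocating goroutines are being made to help the garbage collector, which is consistent with the allocation pressure described here.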

I think the malloc is from this line:

https://github.com/klauspost/compress/blob/v1.17.4/zstd/encoder.go#L516
// If less than 1MB, allocate a buffer up front.
if len(dst) == 0 && cap(dst) == 0 && len(src) < 1<<20 && !e.o.lowMem {
	dst = make([]byte, 0, len(src))
}
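To make the condition concrete, the sketch below copies it into a standalone predicate and evaluates it for a nil dst and for a dst that already has capacity. The function name `needsUpfrontAlloc` and the `lowMem` parameter are illustrative stand-ins, not identifiers from klauspost/compress.

```go
package main

import "fmt"

// needsUpfrontAlloc mirrors the condition quoted above: a fresh buffer is
// allocated only when dst has neither length nor capacity, src is under
// 1 MiB, and the low-memory option is off. Name and signature are
// illustrative, not part of the library.
func needsUpfrontAlloc(dst, src []byte, lowMem bool) bool {
	return len(dst) == 0 && cap(dst) == 0 && len(src) < 1<<20 && !lowMem
}

func main() {
	src := make([]byte, 0x37e) // same input size as in the stack trace

	// nil dst, as Sarama passes today: the encoder allocates.
	fmt.Println(needsUpfrontAlloc(nil, src, false))

	// dst with spare capacity: the up-front allocation is skipped.
	fmt.Println(needsUpfrontAlloc(make([]byte, 0, 4096), src, false))
}
```

Any dst with non-zero capacity skips this branch, so a reusable buffer should avoid the per-call `make` as long as the compressed output fits in it.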

A related issue has also been filed against the zstd compression library:
klauspost/compress#987

After discussion with the klauspost/compress community, the recommendation is to pass a pre-allocated dst buffer instead of nil (the 2nd argument in the code segment below):

https://github.com/IBM/sarama/blob/main/compress.go#L190
case CompressionZSTD:
	return zstdCompress(ZstdEncoderParams{level}, nil, data)

@HenryCaiHaiying
Author

If the Sarama community agrees that this is the right way to make zstd compression performant, I can submit a PR that adds the following config option to the zstd compression path:

Producer.Compression.ZStandard.DestinationBufferSize = 4096

In zstd.go, each ZstdEncoder can hold a pre-allocated slice, dstBuffer, of DestinationBufferSize bytes if this parameter is set. When zstdCompress() is invoked with a nil dst argument and len(src) is less than DestinationBufferSize, it will pass this pre-allocated dstBuffer slice to the underlying zstd library.
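A rough sketch of that idea, kept independent of Sarama's actual types so it runs on its own, could look like this. The names `bufferedEncoder`, `newBufferedEncoder`, and `fakeEncoder` are hypothetical. The `encodeAller` interface has the same method signature as `(*zstd.Encoder).EncodeAll(src, dst []byte) []byte` in klauspost/compress, so a real encoder could be plugged in where the stand-in is used.

```go
package main

import "fmt"

// encodeAller is the one method of *zstd.Encoder that this sketch needs.
type encodeAller interface {
	EncodeAll(src, dst []byte) []byte
}

// bufferedEncoder (hypothetical) pairs an encoder with a reusable
// destination buffer, as described in the proposal above.
type bufferedEncoder struct {
	enc       encodeAller
	dstBuffer []byte // nil when DestinationBufferSize is not set
}

func newBufferedEncoder(enc encodeAller, destinationBufferSize int) *bufferedEncoder {
	b := &bufferedEncoder{enc: enc}
	if destinationBufferSize > 0 {
		b.dstBuffer = make([]byte, 0, destinationBufferSize)
	}
	return b
}

// compress substitutes the pre-allocated buffer when the caller passes a
// nil dst and src is smaller than the buffer's capacity.
func (b *bufferedEncoder) compress(dst, src []byte) []byte {
	if dst == nil && len(src) < cap(b.dstBuffer) {
		dst = b.dstBuffer[:0]
	}
	return b.enc.EncodeAll(src, dst)
}

// fakeEncoder stands in for a real zstd encoder: it appends src unchanged.
type fakeEncoder struct{}

func (fakeEncoder) EncodeAll(src, dst []byte) []byte { return append(dst, src...) }

func main() {
	e := newBufferedEncoder(fakeEncoder{}, 4096)
	out := e.compress(nil, []byte("hello"))
	fmt.Println(string(out), cap(out)) // → hello 4096
}
```

One caveat with this design: the returned slice aliases dstBuffer, so the caller has to consume or copy it before the same encoder compresses again. The encoder must also not be shared between goroutines while a result is still in use.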
