@@ -15,6 +15,7 @@ import (
15
15
"github.com/twmb/franz-go/pkg/kadm"
16
16
"github.com/twmb/franz-go/pkg/kerr"
17
17
"github.com/twmb/franz-go/pkg/kgo"
18
+ "golang.org/x/sync/singleflight"
18
19
19
20
"github.com/grafana/loki/v3/pkg/kafka"
20
21
"github.com/grafana/loki/v3/pkg/kafka/client"
@@ -156,6 +157,7 @@ type ShardedPartitionResolver struct {
156
157
admin * kadm.Client
157
158
topicPrefix string
158
159
160
+ sflight singleflight.Group // for topic creation
159
161
// tenantShards maps tenant IDs to their active shards
160
162
// map[tenant]map[shard]struct{}
161
163
tenantShards map [string ]map [int32 ]struct {}
@@ -204,15 +206,15 @@ func (r *ShardedPartitionResolver) createShard(ctx context.Context, tenant strin
204
206
topic := r .topicName (tenant , shard )
205
207
replicationFactor := 2 // TODO: expose RF
206
208
207
- // TODO(owen-d): ensure this only runs once
208
- // at a time.
209
- _ , err := r . admin . CreateTopic (
210
- ctx ,
211
- 1 ,
212
- int16 ( replicationFactor ) ,
213
- nil ,
214
- topic ,
215
- )
209
+ _ , err , _ := r . sflight . Do ( topic , func () ( interface {}, error ) {
210
+ return r . admin . CreateTopic (
211
+ ctx ,
212
+ 1 ,
213
+ int16 ( replicationFactor ) ,
214
+ nil ,
215
+ topic ,
216
+ )
217
+ } )
216
218
217
219
// Topic creation errors are returned in the response
218
220
if err != nil && ! errors .Is (err , kerr .TopicAlreadyExists ) {
@@ -320,6 +322,10 @@ func (t *TenantTopicWriter) partitionsForRateLimit(bytesRateLimit float64) uint3
320
322
321
323
// Duplicate implements the Tee interface
322
324
func (t * TenantTopicWriter ) Duplicate (tenant string , streams []KeyedStream ) {
325
+ go t .write (tenant , streams )
326
+ }
327
+
328
+ func (t * TenantTopicWriter ) write (tenant string , streams []KeyedStream ) {
323
329
if len (streams ) == 0 {
324
330
return
325
331
}
0 commit comments