Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Nov 13, 2024
1 parent a121c28 commit 61f603b
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 25 deletions.
31 changes: 21 additions & 10 deletions internal/configtypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,8 +620,9 @@ type KafkaConsumerConfig struct {
// Set to -1 to use non-buffered channel.
PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size" envconfig:"partition_buffer_size" default:"16" yaml:"partition_buffer_size" toml:"partition_buffer_size"`

// PublicationDataMode is a configuration for the mode where message payload already contains data ready to publish into channels, instead of API command.
PublicationDataMode KafkaPublicationModeConfig `mapstructure:"publication_data_mode" json:"publication_data_mode" envconfig:"publication_data_mode" yaml:"publication_data_mode" toml:"publication_data_mode"`
// PublicationDataMode is a configuration for the mode where message payload already
// contains data ready to publish into channels, instead of API command.
PublicationDataMode KafkaPublicationDataModeConfig `mapstructure:"publication_data_mode" json:"publication_data_mode" envconfig:"publication_data_mode" yaml:"publication_data_mode" toml:"publication_data_mode"`
}

func (c KafkaConsumerConfig) Validate() error {
Expand All @@ -634,19 +635,29 @@ func (c KafkaConsumerConfig) Validate() error {
if c.ConsumerGroup == "" {
return errors.New("no Kafka consumer group provided")
}
if c.PublicationDataMode.Enabled && c.PublicationDataMode.ChannelsHeaderName == "" {
if c.PublicationDataMode.Enabled && c.PublicationDataMode.ChannelsHeader == "" {
return errors.New("no Kafka channels_header_name provided for publication data mode")
}
return nil
}

type KafkaPublicationModeConfig struct {
// KafkaPublicationDataModeConfig is a configuration for Kafka publication data mode.
// In this mode we expect Kafka message payload to contain data ready to publish into
// channels, instead of API command. All other fields used to build channel Publication
// can be passed in Kafka message headers – thus it's possible to integrate existing
// topics with Centrifugo.
type KafkaPublicationDataModeConfig struct {
// Enabled enables Kafka publication data mode for the Kafka consumer.
Enabled bool `mapstructure:"enabled" json:"enabled" envconfig:"enabled" yaml:"enabled" toml:"enabled"`
// ChannelsHeaderName is a header name to extract publication channels (channels must be comma-separated).
ChannelsHeaderName string `mapstructure:"channels_header_name" json:"channels_header_name" envconfig:"channels_header_name" yaml:"channels_header_name" toml:"channels_header_name"`
// IdempotencyKeyHeaderName is a header name to extract idempotency key from Kafka message.
IdempotencyKeyHeaderName string `mapstructure:"idempotency_key_header_name" json:"idempotency_key_header_name" envconfig:"idempotency_key_header_name" yaml:"idempotency_key_header_name" toml:"idempotency_key_header_name"`
// DeltaHeaderName is a header name to extract delta flag from Kafka message.
DeltaHeaderName string `mapstructure:"delta_header_name" json:"delta_header_name" envconfig:"delta_header_name" yaml:"delta_header_name" toml:"delta_header_name"`
// ChannelsHeader is a header name to extract channels to publish data into
// (channels must be comma-separated). Ex. of value: "channel1,channel2".
ChannelsHeader string `mapstructure:"channels_header" json:"channels_header" envconfig:"channels_header" yaml:"channels_header" toml:"channels_header"`
// IdempotencyKeyHeader is a header name to extract Publication idempotency key from
// Kafka message. See https://centrifugal.dev/docs/server/server_api#publishrequest.
IdempotencyKeyHeader string `mapstructure:"idempotency_key_header" json:"idempotency_key_header" envconfig:"idempotency_key_header" yaml:"idempotency_key_header" toml:"idempotency_key_header"`
// DeltaHeader is a header name to extract Publication delta flag from Kafka message
// which tells Centrifugo whether to use delta compression for message or not.
// See https://centrifugal.dev/docs/server/delta_compression and
// https://centrifugal.dev/docs/server/server_api#publishrequest.
DeltaHeader string `mapstructure:"delta_header" json:"delta_header" envconfig:"delta_header" yaml:"delta_header" toml:"delta_header"`
}
8 changes: 4 additions & 4 deletions internal/consuming/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,18 +412,18 @@ func getHeaderValue(record *kgo.Record, headerKey string) string {

func (pc *partitionConsumer) processPublicationDataRecord(ctx context.Context, record *kgo.Record) error {
var delta bool
if pc.config.PublicationDataMode.DeltaHeaderName != "" {
if pc.config.PublicationDataMode.DeltaHeader != "" {
var err error
delta, err = strconv.ParseBool(getHeaderValue(record, pc.config.PublicationDataMode.DeltaHeaderName))
delta, err = strconv.ParseBool(getHeaderValue(record, pc.config.PublicationDataMode.DeltaHeader))
if err != nil {
pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error parsing delta header value, skip message", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition}))
return nil
}
}
req := &apiproto.BroadcastRequest{
Data: record.Value,
Channels: strings.Split(getHeaderValue(record, pc.config.PublicationDataMode.ChannelsHeaderName), ","),
IdempotencyKey: getHeaderValue(record, pc.config.PublicationDataMode.IdempotencyKeyHeaderName),
Channels: strings.Split(getHeaderValue(record, pc.config.PublicationDataMode.ChannelsHeader), ","),
IdempotencyKey: getHeaderValue(record, pc.config.PublicationDataMode.IdempotencyKeyHeader),
Delta: delta,
}
return pc.dispatcher.Broadcast(ctx, req)
Expand Down
21 changes: 10 additions & 11 deletions internal/consuming/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ import (
"testing"
"time"

"github.com/centrifugal/centrifugo/v5/internal/configtypes"

"github.com/centrifugal/centrifugo/v5/internal/apiproto"
"github.com/centrifugal/centrifugo/v5/internal/configtypes"

"github.com/centrifugal/centrifuge"
"github.com/google/uuid"
Expand Down Expand Up @@ -460,11 +459,11 @@ func TestKafkaConsumer_GreenScenario_PublicationDataMode(t *testing.T) {
Brokers: []string{testKafkaBrokerURL}, // Adjust as needed
Topics: []string{testKafkaTopic},
ConsumerGroup: uuid.New().String(),
PublicationDataMode: configtypes.KafkaPublicationModeConfig{
Enabled: true,
ChannelsHeaderName: "centrifugo-channels",
IdempotencyKeyHeaderName: "centrifugo-idempotency-key",
DeltaHeaderName: "centrifugo-delta",
PublicationDataMode: configtypes.KafkaPublicationDataModeConfig{
Enabled: true,
ChannelsHeader: "centrifugo-channels",
IdempotencyKeyHeader: "centrifugo-idempotency-key",
DeltaHeader: "centrifugo-delta",
},
}

Expand All @@ -474,7 +473,7 @@ func TestKafkaConsumer_GreenScenario_PublicationDataMode(t *testing.T) {
consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, &MockDispatcher{
onBroadcast: func(ctx context.Context, req *apiproto.BroadcastRequest) error {
require.Equal(t, testChannels, req.Channels)
require.Equal(t, testPayload, req.Data)
require.Equal(t, apiproto.Raw(testPayload), req.Data)
require.Equal(t, testIdempotencyKey, req.IdempotencyKey)
require.Equal(t, testDelta, req.Delta)
close(eventReceived)
Expand All @@ -491,15 +490,15 @@ func TestKafkaConsumer_GreenScenario_PublicationDataMode(t *testing.T) {

err = produceTestMessage(testKafkaTopic, testPayload, []kgo.RecordHeader{
{
Key: config.PublicationDataMode.ChannelsHeaderName,
Key: config.PublicationDataMode.ChannelsHeader,
Value: []byte(strings.Join(testChannels, ",")),
},
{
Key: config.PublicationDataMode.IdempotencyKeyHeaderName,
Key: config.PublicationDataMode.IdempotencyKeyHeader,
Value: []byte(testIdempotencyKey),
},
{
Key: config.PublicationDataMode.DeltaHeaderName,
Key: config.PublicationDataMode.DeltaHeader,
Value: []byte(fmt.Sprintf("%v", testDelta)),
},
})
Expand Down

0 comments on commit 61f603b

Please sign in to comment.