From 69ab1f50a5f2700a6d8ab60d56ea8ec54b57e495 Mon Sep 17 00:00:00 2001 From: JamesMurkin Date: Thu, 20 Jun 2024 22:28:27 +0100 Subject: [PATCH] Limit the number of events we put into each Pulsar Message (#3708) * Limit the number of events we put into each Pulsar Message Currently we can publish a very large number of events per message (100k+ events per message). This can make the time to process messages quite unpredictable, as they can contain anywhere between 1 event and 100000+ events. Now we restrict how many events we put into each message (via `maxAllowedEventsPerMessage`), which should make how many changes a given message may contain somewhat more predictable. Signed-off-by: JamesMurkin * Add unit test Signed-off-by: JamesMurkin * Fix unit test Signed-off-by: JamesMurkin * Fix unit test Signed-off-by: JamesMurkin * Fix passing field through ExecutorAPI Signed-off-by: JamesMurkin --------- Signed-off-by: JamesMurkin --- config/armada/config.yaml | 1 + config/scheduler/config.yaml | 1 + internal/armada/configuration/types.go | 2 + internal/armada/server.go | 2 +- internal/common/eventutil/eventutil.go | 24 ++++++++ internal/common/eventutil/eventutil_test.go | 62 ++++++++++++++++++++ internal/common/pulsarutils/eventsequence.go | 6 +- internal/common/pulsarutils/publisher.go | 5 ++ internal/scheduler/api.go | 6 +- internal/scheduler/api_test.go | 2 + internal/scheduler/publisher.go | 5 ++ internal/scheduler/publisher_test.go | 4 +- internal/scheduler/schedulerapp.go | 3 +- 13 files changed, 116 insertions(+), 7 deletions(-) diff --git a/config/armada/config.yaml b/config/armada/config.yaml index cb7e1b4df2a..950e7589308 100644 --- a/config/armada/config.yaml +++ b/config/armada/config.yaml @@ -75,6 +75,7 @@ pulsar: compressionLevel: faster eventsPrinter: false eventsPrinterSubscription: "EventsPrinter" + maxAllowedEventsPerMessage: 1000 maxAllowedMessageSize: 4194304 # 4MB receiverQueueSize: 100 postgres: diff --git a/config/scheduler/config.yaml 
b/config/scheduler/config.yaml index 07c432e1218..5b7400edbb9 100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -37,6 +37,7 @@ pulsar: maxConnectionsPerBroker: 1 compressionType: zlib compressionLevel: faster + maxAllowedEventsPerMessage: 1000 maxAllowedMessageSize: 4194304 #4Mi armadaApi: armadaUrl: "server:50051" diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go index 575ea21f77a..b1a3c5c9d5f 100644 --- a/internal/armada/configuration/types.go +++ b/internal/armada/configuration/types.go @@ -66,6 +66,8 @@ type PulsarConfig struct { CompressionLevel pulsar.CompressionLevel // Settings for deduplication, which relies on a postgres server. DedupTable string + // Maximum allowed Events per message + MaxAllowedEventsPerMessage int `validate:"gte=0"` // Maximum allowed message size in bytes MaxAllowedMessageSize uint // Timeout when polling pulsar for messages diff --git a/internal/armada/server.go b/internal/armada/server.go index 86fbabfae21..f24d1be5750 100644 --- a/internal/armada/server.go +++ b/internal/armada/server.go @@ -128,7 +128,7 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt CompressionLevel: config.Pulsar.CompressionLevel, BatchingMaxSize: config.Pulsar.MaxAllowedMessageSize, Topic: config.Pulsar.JobsetEventsTopic, - }, config.Pulsar.MaxAllowedMessageSize) + }, config.Pulsar.MaxAllowedEventsPerMessage, config.Pulsar.MaxAllowedMessageSize) if err != nil { return errors.Wrapf(err, "error creating pulsar producer") } diff --git a/internal/common/eventutil/eventutil.go b/internal/common/eventutil/eventutil.go index c83786f868f..9ca907556b3 100644 --- a/internal/common/eventutil/eventutil.go +++ b/internal/common/eventutil/eventutil.go @@ -14,6 +14,7 @@ import ( "github.com/armadaproject/armada/internal/common/armadacontext" "github.com/armadaproject/armada/internal/common/armadaerrors" + "github.com/armadaproject/armada/internal/common/slices" 
"github.com/armadaproject/armada/pkg/api" "github.com/armadaproject/armada/pkg/armadaevents" ) @@ -253,6 +254,29 @@ func groupsEqual(g1, g2 []string) bool { return true } +func LimitSequencesEventMessageCount(sequences []*armadaevents.EventSequence, maxEventsPerSequence int) []*armadaevents.EventSequence { + rv := make([]*armadaevents.EventSequence, 0, len(sequences)) + for _, sequence := range sequences { + if len(sequence.Events) > maxEventsPerSequence { + splitEventMessages := slices.PartitionToMaxLen(sequence.Events, maxEventsPerSequence) + + for _, eventMessages := range splitEventMessages { + rv = append(rv, &armadaevents.EventSequence{ + Queue: sequence.Queue, + JobSetName: sequence.JobSetName, + UserId: sequence.UserId, + Groups: sequence.Groups, + Events: eventMessages, + }) + } + + } else { + rv = append(rv, sequence) + } + } + return rv +} + // LimitSequencesByteSize calls LimitSequenceByteSize for each of the provided sequences // and returns all resulting sequences. func LimitSequencesByteSize(sequences []*armadaevents.EventSequence, sizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) { diff --git a/internal/common/eventutil/eventutil_test.go b/internal/common/eventutil/eventutil_test.go index e1015b00359..3d68c4c8f86 100644 --- a/internal/common/eventutil/eventutil_test.go +++ b/internal/common/eventutil/eventutil_test.go @@ -286,6 +286,68 @@ func TestSequenceEventListSizeBytes(t *testing.T) { assert.True(t, sequenceSizeBytes < sequenceEventListOverheadSizeBytes) } +func TestLimitSequencesEventMessageCount(t *testing.T) { + input := []*armadaevents.EventSequence{ + { + Queue: "queue1", + UserId: "userId1", + JobSetName: "jobSetName1", + Groups: []string{"group1", "group2"}, + Events: []*armadaevents.EventSequence_Event{ + {Event: &armadaevents.EventSequence_Event_SubmitJob{SubmitJob: &armadaevents.SubmitJob{JobIdStr: "a"}}}, + {Event: &armadaevents.EventSequence_Event_SubmitJob{SubmitJob: &armadaevents.SubmitJob{JobIdStr: "b"}}}, + 
{Event: &armadaevents.EventSequence_Event_SubmitJob{SubmitJob: &armadaevents.SubmitJob{JobIdStr: "c"}}}, + }, + }, + { + Queue: "queue2", + UserId: "userId1", + JobSetName: "jobSetName1", + Groups: []string{"group1", "group2"}, + Events: []*armadaevents.EventSequence_Event{ + {Event: &armadaevents.EventSequence_Event_SubmitJob{SubmitJob: &armadaevents.SubmitJob{JobIdStr: "d"}}}, + {Event: &armadaevents.EventSequence_Event_SubmitJob{SubmitJob: &armadaevents.SubmitJob{JobIdStr: "e"}}}, + }, + }, + } + + expected := []*armadaevents.EventSequence{ + { + Queue: "queue1", + UserId: "userId1", + JobSetName: "jobSetName1", + Groups: []string{"group1", "group2"}, + Events: []*armadaevents.EventSequence_Event{ + {Event: &armadaevents.EventSequence_Event_SubmitJob{SubmitJob: &armadaevents.SubmitJob{JobIdStr: "a"}}}, + {Event: &armadaevents.EventSequence_Event_SubmitJob{SubmitJob: &armadaevents.SubmitJob{JobIdStr: "b"}}}, + }, + }, + { + Queue: "queue1", + UserId: "userId1", + JobSetName: "jobSetName1", + Groups: []string{"group1", "group2"}, + Events: []*armadaevents.EventSequence_Event{ + {Event: &armadaevents.EventSequence_Event_SubmitJob{SubmitJob: &armadaevents.SubmitJob{JobIdStr: "c"}}}, + }, + }, + { + Queue: "queue2", + UserId: "userId1", + JobSetName: "jobSetName1", + Groups: []string{"group1", "group2"}, + Events: []*armadaevents.EventSequence_Event{ + {Event: &armadaevents.EventSequence_Event_SubmitJob{SubmitJob: &armadaevents.SubmitJob{JobIdStr: "d"}}}, + {Event: &armadaevents.EventSequence_Event_SubmitJob{SubmitJob: &armadaevents.SubmitJob{JobIdStr: "e"}}}, + }, + }, + } + + result := LimitSequencesEventMessageCount(input, 2) + assert.Len(t, result, 3) + assert.Equal(t, expected, result) +} + func TestLimitSequenceByteSize(t *testing.T) { sequence := &armadaevents.EventSequence{ Queue: "queue1", diff --git a/internal/common/pulsarutils/eventsequence.go b/internal/common/pulsarutils/eventsequence.go index 981aa824e09..270f6779837 100644 --- 
a/internal/common/pulsarutils/eventsequence.go +++ b/internal/common/pulsarutils/eventsequence.go @@ -16,10 +16,12 @@ import ( // CompactAndPublishSequences reduces the number of sequences to the smallest possible, // while respecting per-job set ordering and max Pulsar message size, and then publishes to Pulsar. -func CompactAndPublishSequences(ctx *armadacontext.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxMessageSizeInBytes uint) error { +func CompactAndPublishSequences(ctx *armadacontext.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxEventsPerMessage int, maxMessageSizeInBytes uint) error { // Reduce the number of sequences to send to the minimum possible, - // and then break up any sequences larger than maxMessageSizeInBytes. sequences = eventutil.CompactEventSequences(sequences) + // Limit each sequence to have no more than maxEventsPerMessage events + sequences = eventutil.LimitSequencesEventMessageCount(sequences, maxEventsPerMessage) + // Limit each sequence to be no larger than maxMessageSizeInBytes bytes sequences, err := eventutil.LimitSequencesByteSize(sequences, maxMessageSizeInBytes, true) if err != nil { return err diff --git a/internal/common/pulsarutils/publisher.go b/internal/common/pulsarutils/publisher.go index 19213cce335..f4d32e84005 100644 --- a/internal/common/pulsarutils/publisher.go +++ b/internal/common/pulsarutils/publisher.go @@ -18,6 +18,8 @@ type Publisher interface { type PulsarPublisher struct { // Used to send messages to pulsar producer pulsar.Producer + // Maximum number of Events in each EventSequence + maxEventsPerMessage int // Maximum size (in bytes) of produced pulsar messages. 
// This must be below 4MB which is the pulsar message size limit maxAllowedMessageSize uint @@ -26,6 +28,7 @@ type PulsarPublisher struct { func NewPulsarPublisher( pulsarClient pulsar.Client, producerOptions pulsar.ProducerOptions, + maxEventsPerMessage int, maxAllowedMessageSize uint, ) (*PulsarPublisher, error) { producer, err := pulsarClient.CreateProducer(producerOptions) @@ -34,6 +37,7 @@ func NewPulsarPublisher( } return &PulsarPublisher{ producer: producer, + maxEventsPerMessage: maxEventsPerMessage, maxAllowedMessageSize: maxAllowedMessageSize, }, nil } @@ -45,6 +49,7 @@ func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, es *armada ctx, []*armadaevents.EventSequence{es}, p.producer, + p.maxEventsPerMessage, p.maxAllowedMessageSize) } diff --git a/internal/scheduler/api.go b/internal/scheduler/api.go index 01259561597..f455150d274 100644 --- a/internal/scheduler/api.go +++ b/internal/scheduler/api.go @@ -40,6 +40,8 @@ type ExecutorApi struct { allowedPriorities []int32 // Known priority classes priorityClasses map[string]priorityTypes.PriorityClass + // Max number of events in published Pulsar messages + maxEventsPerPulsarMessage int // Max size of Pulsar messages produced. maxPulsarMessageSizeBytes uint // See scheduling schedulingConfig. 
@@ -56,6 +58,7 @@ func NewExecutorApi(producer pulsar.Producer, nodeIdLabel string, priorityClassNameOverride *string, priorityClasses map[string]priorityTypes.PriorityClass, + maxEventsPerPulsarMessage int, maxPulsarMessageSizeBytes uint, ) (*ExecutorApi, error) { if len(allowedPriorities) == 0 { @@ -66,6 +69,7 @@ func NewExecutorApi(producer pulsar.Producer, jobRepository: jobRepository, executorRepository: executorRepository, allowedPriorities: allowedPriorities, + maxEventsPerPulsarMessage: maxEventsPerPulsarMessage, maxPulsarMessageSizeBytes: maxPulsarMessageSizeBytes, nodeIdLabel: nodeIdLabel, priorityClassNameOverride: priorityClassNameOverride, @@ -310,7 +314,7 @@ func addAnnotations(job *armadaevents.SubmitJob, annotations map[string]string) // ReportEvents publishes all eventSequences to Pulsar. The eventSequences are compacted for more efficient publishing. func (srv *ExecutorApi) ReportEvents(grpcCtx context.Context, list *executorapi.EventList) (*types.Empty, error) { ctx := armadacontext.FromGrpcCtx(grpcCtx) - err := pulsarutils.CompactAndPublishSequences(ctx, list.Events, srv.producer, srv.maxPulsarMessageSizeBytes) + err := pulsarutils.CompactAndPublishSequences(ctx, list.Events, srv.producer, srv.maxEventsPerPulsarMessage, srv.maxPulsarMessageSizeBytes) return &types.Empty{}, err } diff --git a/internal/scheduler/api_test.go b/internal/scheduler/api_test.go index 343c1896591..022aae6a2c4 100644 --- a/internal/scheduler/api_test.go +++ b/internal/scheduler/api_test.go @@ -324,6 +324,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) { "kubernetes.io/hostname", nil, priorityClasses, + 1000, 4*1024*1024, ) require.NoError(t, err) @@ -450,6 +451,7 @@ func TestExecutorApi_Publish(t *testing.T) { "kubernetes.io/hostname", nil, priorityClasses, + 1000, 4*1024*1024, ) diff --git a/internal/scheduler/publisher.go b/internal/scheduler/publisher.go index caf6716b59b..104d96f495a 100644 --- a/internal/scheduler/publisher.go +++ 
b/internal/scheduler/publisher.go @@ -43,6 +43,8 @@ type PulsarPublisher struct { numPartitions int // Timeout after which async messages sends will be considered failed pulsarSendTimeout time.Duration + // Maximum number of Events in each EventSequence + maxEventsPerMessage int // Maximum size (in bytes) of produced pulsar messages. // This must be below 4MB which is the pulsar message size limit maxMessageBatchSize uint @@ -51,6 +53,7 @@ type PulsarPublisher struct { func NewPulsarPublisher( pulsarClient pulsar.Client, producerOptions pulsar.ProducerOptions, + maxEventsPerMessage int, pulsarSendTimeout time.Duration, ) (*PulsarPublisher, error) { partitions, err := pulsarClient.TopicPartitions(producerOptions.Topic) @@ -69,6 +72,7 @@ func NewPulsarPublisher( return &PulsarPublisher{ producer: producer, pulsarSendTimeout: pulsarSendTimeout, + maxEventsPerMessage: maxEventsPerMessage, maxMessageBatchSize: maxMessageBatchSize, numPartitions: len(partitions), }, nil @@ -78,6 +82,7 @@ func NewPulsarPublisher( // single event sequences up to maxMessageBatchSize. 
func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error { sequences := eventutil.CompactEventSequences(events) + sequences = eventutil.LimitSequencesEventMessageCount(sequences, p.maxEventsPerMessage) sequences, err := eventutil.LimitSequencesByteSize(sequences, p.maxMessageBatchSize, true) if err != nil { return err diff --git a/internal/scheduler/publisher_test.go b/internal/scheduler/publisher_test.go index 6e4e693dcf5..c3fa778565c 100644 --- a/internal/scheduler/publisher_test.go +++ b/internal/scheduler/publisher_test.go @@ -120,7 +120,7 @@ func TestPulsarPublisher_TestPublish(t *testing.T) { }).AnyTimes() options := pulsar.ProducerOptions{Topic: topic} - publisher, err := NewPulsarPublisher(mockPulsarClient, options, 5*time.Second) + publisher, err := NewPulsarPublisher(mockPulsarClient, options, 1000, 5*time.Second) require.NoError(t, err) err = publisher.PublishMessages(ctx, tc.eventSequences, func() bool { return tc.amLeader }) @@ -191,7 +191,7 @@ func TestPulsarPublisher_TestPublishMarkers(t *testing.T) { options := pulsar.ProducerOptions{Topic: topic} ctx := armadacontext.TODO() - publisher, err := NewPulsarPublisher(mockPulsarClient, options, 5*time.Second) + publisher, err := NewPulsarPublisher(mockPulsarClient, options, 1000, 5*time.Second) require.NoError(t, err) published, err := publisher.PublishMarkers(ctx, uuid.New()) diff --git a/internal/scheduler/schedulerapp.go b/internal/scheduler/schedulerapp.go index 801aa1c596c..130a36d9af4 100644 --- a/internal/scheduler/schedulerapp.go +++ b/internal/scheduler/schedulerapp.go @@ -135,7 +135,7 @@ func Run(config schedulerconfig.Configuration) error { CompressionLevel: config.Pulsar.CompressionLevel, BatchingMaxSize: config.Pulsar.MaxAllowedMessageSize, Topic: config.Pulsar.JobsetEventsTopic, - }, config.PulsarSendTimeout) + }, config.Pulsar.MaxAllowedEventsPerMessage, config.PulsarSendTimeout) if err != nil { return 
errors.WithMessage(err, "error creating pulsar publisher") } @@ -182,6 +182,7 @@ func Run(config schedulerconfig.Configuration) error { config.Scheduling.NodeIdLabel, config.Scheduling.PriorityClassNameOverride, config.Scheduling.PriorityClasses, + config.Pulsar.MaxAllowedEventsPerMessage, config.Pulsar.MaxAllowedMessageSize, ) if err != nil {