Skip to content

Commit

Permalink
Limit the number of events we put into each Pulsar Message (#3708)
Browse files Browse the repository at this point in the history
* Limit the number of events we put into each Pulsar Message

Currently we can publish a very large number of events in a single message (100k+ events per message)

This can make the time to process messages quite unpredictable, as they can be anywhere between 1 event and 100000+ events

Now we restrict how many events we put into each message (via `maxAllowedEventsPerMessage`), which should make the number of changes a given message may contain somewhat more predictable

Signed-off-by: JamesMurkin <[email protected]>

* Add unit test

Signed-off-by: JamesMurkin <[email protected]>

* Fix unit test

Signed-off-by: JamesMurkin <[email protected]>

* Fix unit test

Signed-off-by: JamesMurkin <[email protected]>

* Fix passing field through ExecutorAPI

Signed-off-by: JamesMurkin <[email protected]>

---------

Signed-off-by: JamesMurkin <[email protected]>
  • Loading branch information
JamesMurkin authored Jun 20, 2024
1 parent 477a57c commit 69ab1f5
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 7 deletions.
1 change: 1 addition & 0 deletions config/armada/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pulsar:
compressionLevel: faster
eventsPrinter: false
eventsPrinterSubscription: "EventsPrinter"
maxAllowedEventsPerMessage: 1000
maxAllowedMessageSize: 4194304 # 4MB
receiverQueueSize: 100
postgres:
Expand Down
1 change: 1 addition & 0 deletions config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ pulsar:
maxConnectionsPerBroker: 1
compressionType: zlib
compressionLevel: faster
maxAllowedEventsPerMessage: 1000
maxAllowedMessageSize: 4194304 #4Mi
armadaApi:
armadaUrl: "server:50051"
Expand Down
2 changes: 2 additions & 0 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ type PulsarConfig struct {
CompressionLevel pulsar.CompressionLevel
// Settings for deduplication, which relies on a postgres server.
DedupTable string
// Maximum allowed Events per message
MaxAllowedEventsPerMessage int `validate:"gte=0"`
// Maximum allowed message size in bytes
MaxAllowedMessageSize uint
// Timeout when polling pulsar for messages
Expand Down
2 changes: 1 addition & 1 deletion internal/armada/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func Serve(ctx *armadacontext.Context, config *configuration.ArmadaConfig, healt
CompressionLevel: config.Pulsar.CompressionLevel,
BatchingMaxSize: config.Pulsar.MaxAllowedMessageSize,
Topic: config.Pulsar.JobsetEventsTopic,
}, config.Pulsar.MaxAllowedMessageSize)
}, config.Pulsar.MaxAllowedEventsPerMessage, config.Pulsar.MaxAllowedMessageSize)
if err != nil {
return errors.Wrapf(err, "error creating pulsar producer")
}
Expand Down
24 changes: 24 additions & 0 deletions internal/common/eventutil/eventutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/armadaproject/armada/internal/common/armadacontext"
"github.com/armadaproject/armada/internal/common/armadaerrors"
"github.com/armadaproject/armada/internal/common/slices"
"github.com/armadaproject/armada/pkg/api"
"github.com/armadaproject/armada/pkg/armadaevents"
)
Expand Down Expand Up @@ -253,6 +254,29 @@ func groupsEqual(g1, g2 []string) bool {
return true
}

// LimitSequencesEventMessageCount returns a new slice of sequences in which no
// EventSequence carries more than maxEventsPerSequence events.
//
// Sequences that are already within the limit are appended to the result as-is
// (the same pointer, not a copy). A sequence over the limit is replaced, at the
// same position in the ordering, by several new sequences that each copy its
// Queue, JobSetName, UserId and Groups and carry one of the chunks returned by
// slices.PartitionToMaxLen. The Groups slice is shared between the resulting
// sequences rather than cloned.
//
// A non-positive maxEventsPerSequence is treated as "no limit": every sequence
// is passed through unchanged instead of being partitioned with a chunk size of
// zero or less.
func LimitSequencesEventMessageCount(sequences []*armadaevents.EventSequence, maxEventsPerSequence int) []*armadaevents.EventSequence {
	rv := make([]*armadaevents.EventSequence, 0, len(sequences))
	for _, sequence := range sequences {
		// Nothing to split when limiting is disabled or the sequence already fits.
		if maxEventsPerSequence <= 0 || len(sequence.Events) <= maxEventsPerSequence {
			rv = append(rv, sequence)
			continue
		}

		// Emit one sequence per chunk, preserving the original sequence's metadata.
		for _, eventMessages := range slices.PartitionToMaxLen(sequence.Events, maxEventsPerSequence) {
			rv = append(rv, &armadaevents.EventSequence{
				Queue:      sequence.Queue,
				JobSetName: sequence.JobSetName,
				UserId:     sequence.UserId,
				Groups:     sequence.Groups,
				Events:     eventMessages,
			})
		}
	}
	return rv
}

// LimitSequencesByteSize calls LimitSequenceByteSize for each of the provided sequences
// and returns all resulting sequences.
func LimitSequencesByteSize(sequences []*armadaevents.EventSequence, sizeInBytes uint, strict bool) ([]*armadaevents.EventSequence, error) {
Expand Down
62 changes: 62 additions & 0 deletions internal/common/eventutil/eventutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,68 @@ func TestSequenceEventListSizeBytes(t *testing.T) {
assert.True(t, sequenceSizeBytes < sequenceEventListOverheadSizeBytes)
}

// TestLimitSequencesEventMessageCount verifies that, with a limit of two events
// per sequence, a three-event sequence is split into a two-event and a one-event
// sequence carrying the same metadata, while a two-event sequence is left intact.
func TestLimitSequencesEventMessageCount(t *testing.T) {
	// submitJob wraps a job id in a SubmitJob event.
	submitJob := func(jobId string) *armadaevents.EventSequence_Event {
		return &armadaevents.EventSequence_Event{
			Event: &armadaevents.EventSequence_Event_SubmitJob{
				SubmitJob: &armadaevents.SubmitJob{JobIdStr: jobId},
			},
		}
	}

	// newSequence builds a sequence for the given queue with one SubmitJob event
	// per job id; all other metadata is fixed across the fixtures.
	newSequence := func(queue string, jobIds ...string) *armadaevents.EventSequence {
		events := make([]*armadaevents.EventSequence_Event, 0, len(jobIds))
		for _, jobId := range jobIds {
			events = append(events, submitJob(jobId))
		}
		return &armadaevents.EventSequence{
			Queue:      queue,
			UserId:     "userId1",
			JobSetName: "jobSetName1",
			Groups:     []string{"group1", "group2"},
			Events:     events,
		}
	}

	input := []*armadaevents.EventSequence{
		newSequence("queue1", "a", "b", "c"),
		newSequence("queue2", "d", "e"),
	}

	expected := []*armadaevents.EventSequence{
		newSequence("queue1", "a", "b"),
		newSequence("queue1", "c"),
		newSequence("queue2", "d", "e"),
	}

	result := LimitSequencesEventMessageCount(input, 2)
	assert.Len(t, result, 3)
	assert.Equal(t, expected, result)
}

func TestLimitSequenceByteSize(t *testing.T) {
sequence := &armadaevents.EventSequence{
Queue: "queue1",
Expand Down
6 changes: 4 additions & 2 deletions internal/common/pulsarutils/eventsequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ import (

// CompactAndPublishSequences reduces the number of sequences to the smallest possible,
// while respecting per-job set ordering and max Pulsar message size, and then publishes to Pulsar.
func CompactAndPublishSequences(ctx *armadacontext.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxMessageSizeInBytes uint) error {
func CompactAndPublishSequences(ctx *armadacontext.Context, sequences []*armadaevents.EventSequence, producer pulsar.Producer, maxEventsPerMessage int, maxMessageSizeInBytes uint) error {
// Reduce the number of sequences to send to the minimum possible,
// and then break up any sequences larger than maxMessageSizeInBytes.
sequences = eventutil.CompactEventSequences(sequences)
// Limit each sequence to contain no more than maxEventsPerMessage events
sequences = eventutil.LimitSequencesEventMessageCount(sequences, maxEventsPerMessage)
// Limit each sequence to be no larger than maxMessageSizeInBytes bytes
sequences, err := eventutil.LimitSequencesByteSize(sequences, maxMessageSizeInBytes, true)
if err != nil {
return err
Expand Down
5 changes: 5 additions & 0 deletions internal/common/pulsarutils/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type Publisher interface {
type PulsarPublisher struct {
// Used to send messages to pulsar
producer pulsar.Producer
// Maximum number of Events in each EventSequence
maxEventsPerMessage int
// Maximum size (in bytes) of produced pulsar messages.
// This must be below 4MB which is the pulsar message size limit
maxAllowedMessageSize uint
Expand All @@ -26,6 +28,7 @@ type PulsarPublisher struct {
func NewPulsarPublisher(
pulsarClient pulsar.Client,
producerOptions pulsar.ProducerOptions,
maxEventsPerMessage int,
maxAllowedMessageSize uint,
) (*PulsarPublisher, error) {
producer, err := pulsarClient.CreateProducer(producerOptions)
Expand All @@ -34,6 +37,7 @@ func NewPulsarPublisher(
}
return &PulsarPublisher{
producer: producer,
maxEventsPerMessage: maxEventsPerMessage,
maxAllowedMessageSize: maxAllowedMessageSize,
}, nil
}
Expand All @@ -45,6 +49,7 @@ func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, es *armada
ctx,
[]*armadaevents.EventSequence{es},
p.producer,
p.maxEventsPerMessage,
p.maxAllowedMessageSize)
}

Expand Down
6 changes: 5 additions & 1 deletion internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type ExecutorApi struct {
allowedPriorities []int32
// Known priority classes
priorityClasses map[string]priorityTypes.PriorityClass
// Max number of events in published Pulsar messages
maxEventsPerPulsarMessage int
// Max size of Pulsar messages produced.
maxPulsarMessageSizeBytes uint
// See scheduling schedulingConfig.
Expand All @@ -56,6 +58,7 @@ func NewExecutorApi(producer pulsar.Producer,
nodeIdLabel string,
priorityClassNameOverride *string,
priorityClasses map[string]priorityTypes.PriorityClass,
maxEventsPerPulsarMessage int,
maxPulsarMessageSizeBytes uint,
) (*ExecutorApi, error) {
if len(allowedPriorities) == 0 {
Expand All @@ -66,6 +69,7 @@ func NewExecutorApi(producer pulsar.Producer,
jobRepository: jobRepository,
executorRepository: executorRepository,
allowedPriorities: allowedPriorities,
maxEventsPerPulsarMessage: maxEventsPerPulsarMessage,
maxPulsarMessageSizeBytes: maxPulsarMessageSizeBytes,
nodeIdLabel: nodeIdLabel,
priorityClassNameOverride: priorityClassNameOverride,
Expand Down Expand Up @@ -310,7 +314,7 @@ func addAnnotations(job *armadaevents.SubmitJob, annotations map[string]string)
// ReportEvents publishes all eventSequences to Pulsar. The eventSequences are compacted for more efficient publishing.
func (srv *ExecutorApi) ReportEvents(grpcCtx context.Context, list *executorapi.EventList) (*types.Empty, error) {
ctx := armadacontext.FromGrpcCtx(grpcCtx)
err := pulsarutils.CompactAndPublishSequences(ctx, list.Events, srv.producer, srv.maxPulsarMessageSizeBytes)
err := pulsarutils.CompactAndPublishSequences(ctx, list.Events, srv.producer, srv.maxEventsPerPulsarMessage, srv.maxPulsarMessageSizeBytes)
return &types.Empty{}, err
}

Expand Down
2 changes: 2 additions & 0 deletions internal/scheduler/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ func TestExecutorApi_LeaseJobRuns(t *testing.T) {
"kubernetes.io/hostname",
nil,
priorityClasses,
1000,
4*1024*1024,
)
require.NoError(t, err)
Expand Down Expand Up @@ -450,6 +451,7 @@ func TestExecutorApi_Publish(t *testing.T) {
"kubernetes.io/hostname",
nil,
priorityClasses,
1000,
4*1024*1024,
)

Expand Down
5 changes: 5 additions & 0 deletions internal/scheduler/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ type PulsarPublisher struct {
numPartitions int
// Timeout after which async messages sends will be considered failed
pulsarSendTimeout time.Duration
// Maximum number of Events in each EventSequence
maxEventsPerMessage int
// Maximum size (in bytes) of produced pulsar messages.
// This must be below 4MB which is the pulsar message size limit
maxMessageBatchSize uint
Expand All @@ -51,6 +53,7 @@ type PulsarPublisher struct {
func NewPulsarPublisher(
pulsarClient pulsar.Client,
producerOptions pulsar.ProducerOptions,
maxEventsPerMessage int,
pulsarSendTimeout time.Duration,
) (*PulsarPublisher, error) {
partitions, err := pulsarClient.TopicPartitions(producerOptions.Topic)
Expand All @@ -69,6 +72,7 @@ func NewPulsarPublisher(
return &PulsarPublisher{
producer: producer,
pulsarSendTimeout: pulsarSendTimeout,
maxEventsPerMessage: maxEventsPerMessage,
maxMessageBatchSize: maxMessageBatchSize,
numPartitions: len(partitions),
}, nil
Expand All @@ -78,6 +82,7 @@ func NewPulsarPublisher(
// single event sequences up to maxMessageBatchSize.
func (p *PulsarPublisher) PublishMessages(ctx *armadacontext.Context, events []*armadaevents.EventSequence, shouldPublish func() bool) error {
sequences := eventutil.CompactEventSequences(events)
sequences = eventutil.LimitSequencesEventMessageCount(sequences, p.maxEventsPerMessage)
sequences, err := eventutil.LimitSequencesByteSize(sequences, p.maxMessageBatchSize, true)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions internal/scheduler/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestPulsarPublisher_TestPublish(t *testing.T) {
}).AnyTimes()

options := pulsar.ProducerOptions{Topic: topic}
publisher, err := NewPulsarPublisher(mockPulsarClient, options, 5*time.Second)
publisher, err := NewPulsarPublisher(mockPulsarClient, options, 1000, 5*time.Second)
require.NoError(t, err)
err = publisher.PublishMessages(ctx, tc.eventSequences, func() bool { return tc.amLeader })

Expand Down Expand Up @@ -191,7 +191,7 @@ func TestPulsarPublisher_TestPublishMarkers(t *testing.T) {

options := pulsar.ProducerOptions{Topic: topic}
ctx := armadacontext.TODO()
publisher, err := NewPulsarPublisher(mockPulsarClient, options, 5*time.Second)
publisher, err := NewPulsarPublisher(mockPulsarClient, options, 1000, 5*time.Second)
require.NoError(t, err)

published, err := publisher.PublishMarkers(ctx, uuid.New())
Expand Down
3 changes: 2 additions & 1 deletion internal/scheduler/schedulerapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func Run(config schedulerconfig.Configuration) error {
CompressionLevel: config.Pulsar.CompressionLevel,
BatchingMaxSize: config.Pulsar.MaxAllowedMessageSize,
Topic: config.Pulsar.JobsetEventsTopic,
}, config.PulsarSendTimeout)
}, config.Pulsar.MaxAllowedEventsPerMessage, config.PulsarSendTimeout)
if err != nil {
return errors.WithMessage(err, "error creating pulsar publisher")
}
Expand Down Expand Up @@ -182,6 +182,7 @@ func Run(config schedulerconfig.Configuration) error {
config.Scheduling.NodeIdLabel,
config.Scheduling.PriorityClassNameOverride,
config.Scheduling.PriorityClasses,
config.Pulsar.MaxAllowedEventsPerMessage,
config.Pulsar.MaxAllowedMessageSize,
)
if err != nil {
Expand Down

0 comments on commit 69ab1f5

Please sign in to comment.