From 9c718d590dc5fe419c83d70244d8a78ecc60e364 Mon Sep 17 00:00:00 2001
From: Knative Prow Robot
Date: Wed, 8 Sep 2021 11:30:45 -0700
Subject: [PATCH] [release-0.25] control maximum allowed vreplica count (#860)

* control maximum allowed vreplica count

* fix go lint

* fix lint error

Co-authored-by: Steven DOng
---
 .../source/multi/deployments/controller.yaml  |  7 ++++++-
 pkg/apis/duck/v1alpha1/placement_types.go     |  3 ++-
 .../duck/v1alpha1/zz_generated.deepcopy.go    |  5 +++++
 pkg/apis/sources/v1beta1/kafka_scheduling.go  |  5 +++++
 pkg/source/client/offsets.go                  | 19 ++++++++++---------
 pkg/source/client/offsets_test.go             |  9 +++++++--
 pkg/source/reconciler/mtsource/controller.go  | 18 +++++++++++-------
 pkg/source/reconciler/mtsource/kafkasource.go |  9 ++++++++-
 pkg/source/reconciler/source/kafkasource.go   |  2 +-
 9 files changed, 55 insertions(+), 22 deletions(-)

diff --git a/config/source/multi/deployments/controller.yaml b/config/source/multi/deployments/controller.yaml
index e4bf9a5db6..5540616fb8 100644
--- a/config/source/multi/deployments/controller.yaml
+++ b/config/source/multi/deployments/controller.yaml
@@ -51,8 +51,13 @@ spec:
 
           # The number of virtual replicas each adapter pod can handle.
           - name: POD_CAPACITY
-            value: '100'
+            value: '20'
+
+          - name: MAX_MPS_PER_PARTITION
+            value: '15'
 
+          - name: VREPLICA_LIMITS_MPS
+            value: '250'
           # The scheduling policy type for placing vreplicas on pods (see type SchedulerPolicyType for enum list)
           - name: SCHEDULER_POLICY_TYPE
             value: 'MAXFILLUP'
diff --git a/pkg/apis/duck/v1alpha1/placement_types.go b/pkg/apis/duck/v1alpha1/placement_types.go
index b108211e2c..8f8fe8fd3c 100644
--- a/pkg/apis/duck/v1alpha1/placement_types.go
+++ b/pkg/apis/duck/v1alpha1/placement_types.go
@@ -27,7 +27,8 @@ import (
 // Placeable is a list of podName and virtual replicas pairs.
 // Each pair represents the assignment of virtual replicas to a pod
 type Placeable struct {
-	Placement []Placement `json:"placements,omitempty"`
+	MaxAllowedVReplicas *int32      `json:"maxAllowedVReplicas,omitempty"`
+	Placement           []Placement `json:"placements,omitempty"`
 }
 
 // PlaceableType is a skeleton type wrapping Placeable in the manner we expect
diff --git a/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go
index a5b89327d4..4a972bfc05 100644
--- a/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go
@@ -27,6 +27,11 @@ import (
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *Placeable) DeepCopyInto(out *Placeable) {
 	*out = *in
+	if in.MaxAllowedVReplicas != nil {
+		in, out := &in.MaxAllowedVReplicas, &out.MaxAllowedVReplicas
+		*out = new(int32)
+		**out = **in
+	}
 	if in.Placement != nil {
 		in, out := &in.Placement, &out.Placement
 		*out = make([]Placement, len(*in))
diff --git a/pkg/apis/sources/v1beta1/kafka_scheduling.go b/pkg/apis/sources/v1beta1/kafka_scheduling.go
index 293ecda6fd..34696ff231 100644
--- a/pkg/apis/sources/v1beta1/kafka_scheduling.go
+++ b/pkg/apis/sources/v1beta1/kafka_scheduling.go
@@ -32,6 +32,11 @@ func (k *KafkaSource) GetVReplicas() int32 {
 	if k.Spec.Consumers == nil {
 		return 1
 	}
+	if k.Status.MaxAllowedVReplicas != nil {
+		if *k.Spec.Consumers > *k.Status.MaxAllowedVReplicas {
+			return *k.Status.MaxAllowedVReplicas
+		}
+	}
 	return *k.Spec.Consumers
 }
 
diff --git a/pkg/source/client/offsets.go b/pkg/source/client/offsets.go
index dac1574a07..93fc0972e7 100644
--- a/pkg/source/client/offsets.go
+++ b/pkg/source/client/offsets.go
@@ -32,15 +32,16 @@ import (
 // is closed before at least one message is consumed from ALL partitions.
 // Without InitOffsets, an event sent to a partition with an uninitialized offset
 // will not be forwarded when the session is closed (or a rebalancing is in progress).
-func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string, consumerGroup string) error {
+func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string, consumerGroup string) (int32, error) {
+	totalPartitions := 0
 	offsetManager, err := sarama.NewOffsetManagerFromClient(consumerGroup, kafkaClient)
 	if err != nil {
-		return err
+		return -1, err
 	}
 
 	kafkaAdminClient, err := sarama.NewClusterAdminFromClient(kafkaClient)
 	if err != nil {
-		return fmt.Errorf("failed to create a Kafka admin client: %w", err)
+		return -1, fmt.Errorf("failed to create a Kafka admin client: %w", err)
 	}
 	defer kafkaAdminClient.Close()
 
@@ -48,9 +49,9 @@ func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string
 	topicPartitions := make(map[string][]int32)
 	for _, topic := range topics {
 		partitions, err := kafkaClient.Partitions(topic)
-
+		totalPartitions += len(partitions)
 		if err != nil {
-			return fmt.Errorf("failed to get partitions for topic %s: %w", topic, err)
+			return -1, fmt.Errorf("failed to get partitions for topic %s: %w", topic, err)
 		}
 
 		topicPartitions[topic] = partitions
@@ -59,13 +60,13 @@ func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string
 	// Fetch topic offsets
 	topicOffsets, err := knsarama.GetOffsets(kafkaClient, topicPartitions, sarama.OffsetNewest)
 	if err != nil {
-		return fmt.Errorf("failed to get the topic offsets: %w", err)
+		return -1, fmt.Errorf("failed to get the topic offsets: %w", err)
 	}
 
 	// Look for uninitialized offset (-1)
 	offsets, err := kafkaAdminClient.ListConsumerGroupOffsets(consumerGroup, topicPartitions)
 	if err != nil {
-		return err
+		return -1, err
 	}
 
 	dirty := false
@@ -88,7 +89,7 @@ func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string
 
 			pm, err := offsetManager.ManagePartition(topic, partitionID)
 			if err != nil {
-				return fmt.Errorf("failed to create the partition manager for topic %s and partition %d: %w", topic, partitionID, err)
+				return -1, fmt.Errorf("failed to create the partition manager for topic %s and partition %d: %w", topic, partitionID, err)
 			}
 			pm.MarkOffset(offset, "")
 
@@ -103,6 +104,6 @@ func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string
 	}
 
 	// At this stage the KafkaSource instance is considered Ready
-	return nil
+	return int32(totalPartitions), nil
 }
 
diff --git a/pkg/source/client/offsets_test.go b/pkg/source/client/offsets_test.go
index 269d8f9dcc..77d5f4ec06 100644
--- a/pkg/source/client/offsets_test.go
+++ b/pkg/source/client/offsets_test.go
@@ -20,6 +20,7 @@ import (
 	"testing"
 
 	"github.com/Shopify/sarama"
+	"github.com/stretchr/testify/assert"
 	logtesting "knative.dev/pkg/logging/testing"
 )
 
@@ -138,8 +139,12 @@ func TestInitOffsets(t *testing.T) {
 			}
 			defer sc.Close()
 			ctx := logtesting.TestContextWithLogger(t)
-			err = InitOffsets(ctx, sc, tc.topics, group)
-
+			partitionCt, err := InitOffsets(ctx, sc, tc.topics, group)
+			total := 0
+			for _, partitions := range tc.topicOffsets {
+				total += len(partitions)
+			}
+			assert.Equal(t, int(partitionCt), total)
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
diff --git a/pkg/source/reconciler/mtsource/controller.go b/pkg/source/reconciler/mtsource/controller.go
index 68f1705654..aebb817db5 100644
--- a/pkg/source/reconciler/mtsource/controller.go
+++ b/pkg/source/reconciler/mtsource/controller.go
@@ -47,9 +47,11 @@ import (
 )
 
 type envConfig struct {
-	SchedulerRefreshPeriod int64                            `envconfig:"AUTOSCALER_REFRESH_PERIOD" required:"true"`
-	PodCapacity            int32                            `envconfig:"POD_CAPACITY" required:"true"`
-	SchedulerPolicy        stsscheduler.SchedulerPolicyType `envconfig:"SCHEDULER_POLICY_TYPE" required:"true"`
+	SchedulerRefreshPeriod        int64                            `envconfig:"AUTOSCALER_REFRESH_PERIOD" required:"true"`
+	PodCapacity                   int32                            `envconfig:"POD_CAPACITY" required:"true"`
+	VReplicaMPS                   int32                            `envconfig:"VREPLICA_LIMITS_MPS" required:"false" default:"-1"`
+	MaxEventPerSecondPerPartition int32                            `envconfig:"MAX_MPS_PER_PARTITION" required:"false" default:"-1"`
+	SchedulerPolicy               stsscheduler.SchedulerPolicyType `envconfig:"SCHEDULER_POLICY_TYPE" required:"true"`
 }
 
 func NewController(
@@ -67,10 +69,12 @@ func NewController(
 	nodeInformer := nodeinformer.Get(ctx)
 
 	c := &Reconciler{
-		KubeClientSet:  kubeclient.Get(ctx),
-		kafkaClientSet: kafkaclient.Get(ctx),
-		kafkaLister:    kafkaInformer.Lister(),
-		configs:        source.WatchConfigurations(ctx, component, cmw),
+		KubeClientSet:                 kubeclient.Get(ctx),
+		kafkaClientSet:                kafkaclient.Get(ctx),
+		kafkaLister:                   kafkaInformer.Lister(),
+		configs:                       source.WatchConfigurations(ctx, component, cmw),
+		VReplicaMPS:                   env.VReplicaMPS,
+		MaxEventPerSecondPerPartition: env.MaxEventPerSecondPerPartition,
 	}
 
 	impl := kafkasource.NewImpl(ctx, c)
diff --git a/pkg/source/reconciler/mtsource/kafkasource.go b/pkg/source/reconciler/mtsource/kafkasource.go
index f80ff386c7..6b8b8072e7 100644
--- a/pkg/source/reconciler/mtsource/kafkasource.go
+++ b/pkg/source/reconciler/mtsource/kafkasource.go
@@ -53,6 +53,9 @@ type Reconciler struct {
 	sinkResolver *resolver.URIResolver
 	configs      source.ConfigAccessor
 	scheduler    scheduler.Scheduler
+
+	VReplicaMPS                   int32
+	MaxEventPerSecondPerPartition int32
 }
 
 // Check that our Reconciler implements Interface
@@ -120,13 +123,17 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1beta1.KafkaSource
 	defer c.Close()
 
 	src.Status.MarkConnectionEstablished()
-	err = client.InitOffsets(ctx, c, src.Spec.Topics, src.Spec.ConsumerGroup)
+	totalPartitions, err := client.InitOffsets(ctx, c, src.Spec.Topics, src.Spec.ConsumerGroup)
 	if err != nil {
 		logging.FromContext(ctx).Errorw("unable to initialize consumergroup offsets", zap.Error(err))
 		src.Status.MarkInitialOffsetNotCommitted("OffsetsNotCommitted", "Unable to initialize consumergroup offsets: %v", err)
 		return err
 	}
 	src.Status.MarkInitialOffsetCommitted()
+	if r.MaxEventPerSecondPerPartition != -1 && r.VReplicaMPS != -1 {
+		maxVReplicas := totalPartitions*r.MaxEventPerSecondPerPartition/r.VReplicaMPS + 1
+		src.Status.MaxAllowedVReplicas = &maxVReplicas
+	}
 
 	// Finally, schedule the source
 	if err := r.reconcileMTReceiveAdapter(src); err != nil {
diff --git a/pkg/source/reconciler/source/kafkasource.go b/pkg/source/reconciler/source/kafkasource.go
index b31f2eb67f..b7f0b25dcd 100644
--- a/pkg/source/reconciler/source/kafkasource.go
+++ b/pkg/source/reconciler/source/kafkasource.go
@@ -186,7 +186,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1beta1.KafkaSource
 	defer c.Close()
 
 	src.Status.MarkConnectionEstablished()
-	err = client.InitOffsets(ctx, c, src.Spec.Topics, src.Spec.ConsumerGroup)
+	_, err = client.InitOffsets(ctx, c, src.Spec.Topics, src.Spec.ConsumerGroup)
 	if err != nil {
 		logging.FromContext(ctx).Errorw("unable to initialize consumergroup offsets", zap.Error(err))
 		src.Status.MarkInitialOffsetNotCommitted("OffsetsNotCommitted", "Unable to initialize consumergroup offsets: %v", err)
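
Note for reviewers: below is a minimal, standalone sketch of how the new knobs combine, assuming the defaults shipped in controller.yaml above (MAX_MPS_PER_PARTITION=15, VREPLICA_LIMITS_MPS=250). The helper functions are illustrative only and do not exist in the patch; the cap is computed inline in ReconcileKind, applied by KafkaSource.GetVReplicas, and skipped entirely when either environment variable is left at its -1 default.

package main

import "fmt"

// maxAllowedVReplicas mirrors the cap computed in the multi-tenant reconciler:
// totalPartitions * MAX_MPS_PER_PARTITION / VREPLICA_LIMITS_MPS + 1
// (integer division), so the result is always at least 1 for non-negative inputs.
// The helper name is illustrative; the patch computes this inline in ReconcileKind.
func maxAllowedVReplicas(totalPartitions, maxMPSPerPartition, vreplicaMPS int32) int32 {
	return totalPartitions*maxMPSPerPartition/vreplicaMPS + 1
}

// effectiveVReplicas mirrors the clamp in KafkaSource.GetVReplicas
// (ignoring the nil-consumers default of 1): spec.consumers is clamped
// to status.maxAllowedVReplicas whenever the cap is set.
func effectiveVReplicas(consumers int32, maxAllowed *int32) int32 {
	if maxAllowed != nil && consumers > *maxAllowed {
		return *maxAllowed
	}
	return consumers
}

func main() {
	// With the defaults above, a source whose topics total 100 partitions
	// is capped at 100*15/250 + 1 = 7 vreplicas.
	maxVR := maxAllowedVReplicas(100, 15, 250)
	fmt.Println(maxVR)                          // 7
	fmt.Println(effectiveVReplicas(20, &maxVR)) // 7: spec.consumers=20 is clamped
	fmt.Println(effectiveVReplicas(3, &maxVR))  // 3: below the cap, unchanged
}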