This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

[release-0.25] control maximum allowed vreplica count (#860)
* control maximum allowed vreplica count

* fix go lint

* fix lint error

Co-authored-by: Steven DOng <[email protected]>
knative-prow-robot and steven0711dong authored Sep 8, 2021
1 parent ba92355 commit 9c718d5
Showing 9 changed files with 55 additions and 22 deletions.
7 changes: 6 additions & 1 deletion config/source/multi/deployments/controller.yaml
@@ -51,8 +51,13 @@ spec:

 # The number of virtual replicas each adapter pod can handle.
 - name: POD_CAPACITY
-  value: '100'
+  value: '20'

+- name: MAX_MPS_PER_PARTITION
+  value: '15'
+
+- name: VREPLICA_LIMITS_MPS
+  value: '250'
 # The scheduling policy type for placing vreplicas on pods (see type SchedulerPolicyType for enum list)
 - name: SCHEDULER_POLICY_TYPE
   value: 'MAXFILLUP'
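Taken together, the new MAX_MPS_PER_PARTITION and VREPLICA_LIMITS_MPS settings let the multi-tenant reconciler cap how many vreplicas a single KafkaSource may claim (see pkg/source/reconciler/mtsource/kafkasource.go further down). A minimal sketch of how these defaults interact, assuming a hypothetical 50-partition topic:

```go
package main

import "fmt"

func main() {
	// Values from the controller.yaml defaults above; the partition count is hypothetical.
	var (
		maxMPSPerPartition int32 = 15  // MAX_MPS_PER_PARTITION: expected messages/sec per partition
		vreplicaLimitsMPS  int32 = 250 // VREPLICA_LIMITS_MPS: messages/sec one vreplica should absorb
		totalPartitions    int32 = 50  // discovered at offset-initialization time
	)

	// Same integer arithmetic as the reconciler below: estimated topic throughput
	// divided by per-vreplica throughput, plus one vreplica of headroom.
	maxVReplicas := totalPartitions*maxMPSPerPartition/vreplicaLimitsMPS + 1
	fmt.Println(maxVReplicas) // 50*15/250 + 1 = 4
}
```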
3 changes: 2 additions & 1 deletion pkg/apis/duck/v1alpha1/placement_types.go
@@ -27,7 +27,8 @@ import (
 // Placeable is a list of podName and virtual replicas pairs.
 // Each pair represents the assignment of virtual replicas to a pod
 type Placeable struct {
-	Placement []Placement `json:"placements,omitempty"`
+	MaxAllowedVReplicas *int32      `json:"maxAllowedVReplicas,omitempty"`
+	Placement           []Placement `json:"placements,omitempty"`
 }

 // PlaceableType is a skeleton type wrapping Placeable in the manner we expect
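MaxAllowedVReplicas is a pointer with omitempty, so the field only shows up in a serialized Placeable once a reconciler has actually computed a ceiling. A small illustrative sketch (a local copy of the duck-type shape, not the real package) of that behavior:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins for the duck types, for illustration only.
type Placement struct {
	PodName   string `json:"podName,omitempty"`
	VReplicas int32  `json:"vreplicas,omitempty"`
}

type Placeable struct {
	MaxAllowedVReplicas *int32      `json:"maxAllowedVReplicas,omitempty"`
	Placement           []Placement `json:"placements,omitempty"`
}

func main() {
	limit := int32(4)
	withLimit, _ := json.Marshal(Placeable{MaxAllowedVReplicas: &limit})
	noLimit, _ := json.Marshal(Placeable{})
	fmt.Println(string(withLimit)) // {"maxAllowedVReplicas":4}
	fmt.Println(string(noLimit))   // {}
}
```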
5 changes: 5 additions & 0 deletions pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions pkg/apis/sources/v1beta1/kafka_scheduling.go
@@ -32,6 +32,11 @@ func (k *KafkaSource) GetVReplicas() int32 {
 	if k.Spec.Consumers == nil {
 		return 1
 	}
+	if k.Status.MaxAllowedVReplicas != nil {
+		if *k.Spec.Consumers > *k.Status.MaxAllowedVReplicas {
+			return *k.Status.MaxAllowedVReplicas
+		}
+	}
 	return *k.Spec.Consumers
 }

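The effect of the new block is that GetVReplicas returns the requested spec.consumers unless the status carries a smaller ceiling. A behavioral sketch with simplified local types (the real fields live on the KafkaSource spec and status):

```go
package main

import "fmt"

// Simplified stand-in for the KafkaSource fields GetVReplicas reads.
type kafkaSource struct {
	consumers           *int32 // spec.consumers
	maxAllowedVReplicas *int32 // status.maxAllowedVReplicas
}

// Mirrors the capping logic added in the diff above.
func (k kafkaSource) getVReplicas() int32 {
	if k.consumers == nil {
		return 1
	}
	if k.maxAllowedVReplicas != nil && *k.consumers > *k.maxAllowedVReplicas {
		return *k.maxAllowedVReplicas
	}
	return *k.consumers
}

func main() {
	consumers, ceiling := int32(12), int32(4)
	fmt.Println(kafkaSource{consumers: &consumers, maxAllowedVReplicas: &ceiling}.getVReplicas()) // 4: capped
	fmt.Println(kafkaSource{consumers: &consumers}.getVReplicas())                                // 12: no ceiling set
	fmt.Println(kafkaSource{}.getVReplicas())                                                     // 1: default
}
```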
19 changes: 10 additions & 9 deletions pkg/source/client/offsets.go
@@ -32,25 +32,26 @@ import (
 // is closed before at least one message is consumed from ALL partitions.
 // Without InitOffsets, an event sent to a partition with an uninitialized offset
 // will not be forwarded when the session is closed (or a rebalancing is in progress).
-func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string, consumerGroup string) error {
+func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string, consumerGroup string) (int32, error) {
+	totalPartitions := 0
 	offsetManager, err := sarama.NewOffsetManagerFromClient(consumerGroup, kafkaClient)
 	if err != nil {
-		return err
+		return -1, err
 	}

 	kafkaAdminClient, err := sarama.NewClusterAdminFromClient(kafkaClient)
 	if err != nil {
-		return fmt.Errorf("failed to create a Kafka admin client: %w", err)
+		return -1, fmt.Errorf("failed to create a Kafka admin client: %w", err)
 	}
 	defer kafkaAdminClient.Close()

 	// Retrieve all partitions
 	topicPartitions := make(map[string][]int32)
 	for _, topic := range topics {
 		partitions, err := kafkaClient.Partitions(topic)
+
+		totalPartitions += len(partitions)
 		if err != nil {
-			return fmt.Errorf("failed to get partitions for topic %s: %w", topic, err)
+			return -1, fmt.Errorf("failed to get partitions for topic %s: %w", topic, err)
 		}

 		topicPartitions[topic] = partitions
@@ -59,13 +60,13 @@ func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string
 	// Fetch topic offsets
 	topicOffsets, err := knsarama.GetOffsets(kafkaClient, topicPartitions, sarama.OffsetNewest)
 	if err != nil {
-		return fmt.Errorf("failed to get the topic offsets: %w", err)
+		return -1, fmt.Errorf("failed to get the topic offsets: %w", err)
 	}

 	// Look for uninitialized offset (-1)
 	offsets, err := kafkaAdminClient.ListConsumerGroupOffsets(consumerGroup, topicPartitions)
 	if err != nil {
-		return err
+		return -1, err
 	}

 	dirty := false
@@ -88,7 +89,7 @@ func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string

 			pm, err := offsetManager.ManagePartition(topic, partitionID)
 			if err != nil {
-				return fmt.Errorf("failed to create the partition manager for topic %s and partition %d: %w", topic, partitionID, err)
+				return -1, fmt.Errorf("failed to create the partition manager for topic %s and partition %d: %w", topic, partitionID, err)
 			}

 			pm.MarkOffset(offset, "")
@@ -103,6 +104,6 @@ func InitOffsets(ctx context.Context, kafkaClient sarama.Client, topics []string
 	}

 	// At this stage the KafkaSource instance is considered Ready
-	return nil
+	return int32(totalPartitions), nil

 }
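InitOffsets now reports the total partition count across all requested topics, returning -1 alongside any error. A hedged caller sketch (the import path is assumed from the repository layout; Kafka client construction is omitted):

```go
package example

import (
	"context"
	"fmt"

	"github.com/Shopify/sarama"

	// Assumed import path for the pkg/source/client package shown in this diff.
	"knative.dev/eventing-kafka/pkg/source/client"
)

// initAndCount initializes consumer-group offsets and reports how many
// partitions were covered, which the reconciler later feeds into the
// vreplica ceiling.
func initAndCount(ctx context.Context, kc sarama.Client, topics []string, group string) (int32, error) {
	totalPartitions, err := client.InitOffsets(ctx, kc, topics, group)
	if err != nil {
		return -1, fmt.Errorf("initializing offsets for group %s: %w", group, err)
	}
	fmt.Printf("offsets initialized across %d partitions\n", totalPartitions)
	return totalPartitions, nil
}
```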
9 changes: 7 additions & 2 deletions pkg/source/client/offsets_test.go
@@ -20,6 +20,7 @@ import (
 	"testing"

 	"github.com/Shopify/sarama"
+	"github.com/stretchr/testify/assert"

 	logtesting "knative.dev/pkg/logging/testing"
 )
@@ -138,8 +139,12 @@ func TestInitOffsets(t *testing.T) {
 			}
 			defer sc.Close()
 			ctx := logtesting.TestContextWithLogger(t)
-			err = InitOffsets(ctx, sc, tc.topics, group)
-
+			partitionCt, err := InitOffsets(ctx, sc, tc.topics, group)
+			total := 0
+			for _, partitions := range tc.topicOffsets {
+				total += len(partitions)
+			}
+			assert.Equal(t, int(partitionCt), total)
 			if err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
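The new assertion recomputes the expected partition total from the test case's topicOffsets fixture. A hypothetical fixture of that shape (the concrete test tables live earlier in the file and are not part of this hunk), just to show why summing len() over the topics matches the value InitOffsets returns:

```go
package example

// Hypothetical fixture: topic -> partition ID -> newest offset.
var topicOffsets = map[string]map[int32]int64{
	"my-topic-a": {0: 5, 1: 7},
	"my-topic-b": {0: 0},
}

// expectedPartitions mirrors the loop added to the test above.
func expectedPartitions() int {
	total := 0
	for _, partitions := range topicOffsets {
		total += len(partitions)
	}
	return total // 3 for the fixture above
}
```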
18 changes: 11 additions & 7 deletions pkg/source/reconciler/mtsource/controller.go
@@ -47,9 +47,11 @@ import (
 )

 type envConfig struct {
-	SchedulerRefreshPeriod int64                            `envconfig:"AUTOSCALER_REFRESH_PERIOD" required:"true"`
-	PodCapacity            int32                            `envconfig:"POD_CAPACITY" required:"true"`
-	SchedulerPolicy        stsscheduler.SchedulerPolicyType `envconfig:"SCHEDULER_POLICY_TYPE" required:"true"`
+	SchedulerRefreshPeriod        int64                            `envconfig:"AUTOSCALER_REFRESH_PERIOD" required:"true"`
+	PodCapacity                   int32                            `envconfig:"POD_CAPACITY" required:"true"`
+	VReplicaMPS                   int32                            `envconfig:"VREPLICA_LIMITS_MPS" required:"false" default:"-1"`
+	MaxEventPerSecondPerPartition int32                            `envconfig:"MAX_MPS_PER_PARTITION" required:"false" default:"-1"`
+	SchedulerPolicy               stsscheduler.SchedulerPolicyType `envconfig:"SCHEDULER_POLICY_TYPE" required:"true"`
 }

 func NewController(
@@ -67,10 +69,12 @@ func NewController(
 	nodeInformer := nodeinformer.Get(ctx)

 	c := &Reconciler{
-		KubeClientSet:  kubeclient.Get(ctx),
-		kafkaClientSet: kafkaclient.Get(ctx),
-		kafkaLister:    kafkaInformer.Lister(),
-		configs:        source.WatchConfigurations(ctx, component, cmw),
+		KubeClientSet:                 kubeclient.Get(ctx),
+		kafkaClientSet:                kafkaclient.Get(ctx),
+		kafkaLister:                   kafkaInformer.Lister(),
+		configs:                       source.WatchConfigurations(ctx, component, cmw),
+		VReplicaMPS:                   env.VReplicaMPS,
+		MaxEventPerSecondPerPartition: env.MaxEventPerSecondPerPartition,
 	}

 	impl := kafkasource.NewImpl(ctx, c)
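Both new fields are optional and default to -1, which ReconcileKind later treats as "limit disabled". A sketch of how that resolution plays out, assuming the struct tags are processed with github.com/kelseyhightower/envconfig (the library this tag format is written for):

```go
package example

import (
	"log"

	"github.com/kelseyhightower/envconfig"
)

// Trimmed copy of envConfig, keeping only the new fields, for illustration.
type envConfig struct {
	VReplicaMPS                   int32 `envconfig:"VREPLICA_LIMITS_MPS" required:"false" default:"-1"`
	MaxEventPerSecondPerPartition int32 `envconfig:"MAX_MPS_PER_PARTITION" required:"false" default:"-1"`
}

func loadEnv() envConfig {
	var env envConfig
	// With neither variable exported, both fields come back as -1 and the
	// vreplica ceiling computed in ReconcileKind is skipped entirely.
	if err := envconfig.Process("", &env); err != nil {
		log.Fatalf("failed to process env config: %v", err)
	}
	return env
}
```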
9 changes: 8 additions & 1 deletion pkg/source/reconciler/mtsource/kafkasource.go
@@ -53,6 +53,9 @@ type Reconciler struct {
 	sinkResolver *resolver.URIResolver
 	configs      source.ConfigAccessor
 	scheduler    scheduler.Scheduler
+
+	VReplicaMPS                   int32
+	MaxEventPerSecondPerPartition int32
 }

 // Check that our Reconciler implements Interface
@@ -120,13 +123,17 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1beta1.KafkaSource
 	defer c.Close()
 	src.Status.MarkConnectionEstablished()

-	err = client.InitOffsets(ctx, c, src.Spec.Topics, src.Spec.ConsumerGroup)
+	totalPartitions, err := client.InitOffsets(ctx, c, src.Spec.Topics, src.Spec.ConsumerGroup)
 	if err != nil {
 		logging.FromContext(ctx).Errorw("unable to initialize consumergroup offsets", zap.Error(err))
 		src.Status.MarkInitialOffsetNotCommitted("OffsetsNotCommitted", "Unable to initialize consumergroup offsets: %v", err)
 		return err
 	}
 	src.Status.MarkInitialOffsetCommitted()
+	if r.MaxEventPerSecondPerPartition != -1 && r.VReplicaMPS != -1 {
+		maxVReplicas := totalPartitions*r.MaxEventPerSecondPerPartition/r.VReplicaMPS + 1
+		src.Status.MaxAllowedVReplicas = &maxVReplicas
+	}

 	// Finally, schedule the source
 	if err := r.reconcileMTReceiveAdapter(src); err != nil {
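The ceiling uses plain int32 arithmetic: estimated topic throughput (partitions times MAX_MPS_PER_PARTITION) divided by the throughput one vreplica is expected to handle (VREPLICA_LIMITS_MPS), plus one vreplica of headroom; integer division truncates before the +1. A small worked sketch using the defaults shipped in controller.yaml:

```go
package example

// maxAllowedVReplicas mirrors the expression in ReconcileKind above.
func maxAllowedVReplicas(totalPartitions, mpsPerPartition, vreplicaMPS int32) int32 {
	return totalPartitions*mpsPerPartition/vreplicaMPS + 1
}

// With the controller.yaml defaults (15 MPS per partition, 250 MPS per vreplica):
//   maxAllowedVReplicas(10, 15, 250)  == 1  (150/250 truncates to 0, plus 1)
//   maxAllowedVReplicas(100, 15, 250) == 7  (1500/250 = 6, plus 1)
```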
2 changes: 1 addition & 1 deletion pkg/source/reconciler/source/kafkasource.go
@@ -186,7 +186,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1beta1.KafkaSource
 	defer c.Close()
 	src.Status.MarkConnectionEstablished()

-	err = client.InitOffsets(ctx, c, src.Spec.Topics, src.Spec.ConsumerGroup)
+	_, err = client.InitOffsets(ctx, c, src.Spec.Topics, src.Spec.ConsumerGroup)
 	if err != nil {
 		logging.FromContext(ctx).Errorw("unable to initialize consumergroup offsets", zap.Error(err))
 		src.Status.MarkInitialOffsetNotCommitted("OffsetsNotCommitted", "Unable to initialize consumergroup offsets: %v", err)

