Skip to content

Commit

Permalink
ring: add GetWithOptions method to adjust per call behavior
Browse files Browse the repository at this point in the history
This change adds a new method that accepts 0 or more `Option` instances
that modify the behavior of the call. These options can (currently) be
used to adjust the replication factor for a particular key or use buffers
to avoid excessive allocation.

The most notable changes are in the `Ring.findInstancesForKey` method
which is the core of the `Ring.Get` method. Instead of keeping track
of distinct zones and assuming that only a single instance per zone
would ever be returned, we keep a map of the number of instances
found in each zone.

Part of grafana/mimir#9944
  • Loading branch information
56quarters committed Jan 3, 2025
1 parent 9935aca commit 4fdf760
Show file tree
Hide file tree
Showing 5 changed files with 381 additions and 75 deletions.
19 changes: 18 additions & 1 deletion ring/replication_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,15 @@ import (
)

type ReplicationStrategy interface {
// Filter out unhealthy instances and checks if there're enough instances
// Filter out unhealthy instances and checks if there are enough instances
// for an operation to succeed. Returns an error if there are not enough
// instances.
Filter(instances []InstanceDesc, op Operation, replicationFactor int, heartbeatTimeout time.Duration, zoneAwarenessEnabled bool) (healthy []InstanceDesc, maxFailures int, err error)

// SupportsExpandedReplication returns true for replication strategies that
// support increasing the replication factor beyond a single instance per zone,
// false otherwise.
SupportsExpandedReplication() bool
}

type defaultReplicationStrategy struct{}
Expand Down Expand Up @@ -70,6 +75,14 @@ func (s *defaultReplicationStrategy) Filter(instances []InstanceDesc, op Operati
return instances, len(instances) - minSuccess, nil
}

func (s *defaultReplicationStrategy) SupportsExpandedReplication() bool {
// defaultReplicationStrategy assumes that a single instance per zone is returned and that
// it can treat replication factor as equivalent to the number of zones. This doesn't work
// when a per-call replication factor increases it beyond the configured replication factor
// and the number of zones.
return false
}

type ignoreUnhealthyInstancesReplicationStrategy struct{}

func NewIgnoreUnhealthyInstancesReplicationStrategy() ReplicationStrategy {
Expand Down Expand Up @@ -101,6 +114,10 @@ func (r *ignoreUnhealthyInstancesReplicationStrategy) Filter(instances []Instanc
return instances, len(instances) - 1, nil
}

func (r *ignoreUnhealthyInstancesReplicationStrategy) SupportsExpandedReplication() bool {
return true
}

func (r *Ring) IsHealthy(instance *InstanceDesc, op Operation, now time.Time) bool {
return instance.IsHealthy(op, r.cfg.HeartbeatTimeout, now)
}
Expand Down
147 changes: 101 additions & 46 deletions ring/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,41 @@ const (
GetBufferSize = 5
)

// Options are the result of Option instances that can be used to modify Ring.GetWithOptions behavior.
type Options struct {
ReplicationFactor int
BufDescs []InstanceDesc
BufHosts []string
BufZones []string
}

// Option can be used to modify Ring behavior when calling Ring.GetWithOptions
type Option func(opts *Options)

// WithBuffers creates an Option that will cause the given buffers to be used, avoiding allocations.
func WithBuffers(bufDescs []InstanceDesc, bufHosts, bufZones []string) Option {
return func(opts *Options) {
opts.BufDescs = bufDescs
opts.BufHosts = bufHosts
opts.BufZones = bufZones
}
}

// WithReplicationFactor creates an Option that overrides the default replication factor for a single call.
func WithReplicationFactor(replication int) Option {
return func(opts *Options) {
opts.ReplicationFactor = replication
}
}

func collectOptions(opts ...Option) Options {
final := Options{}
for _, opt := range opts {
opt(&final)
}
return final
}

// ReadRing represents the read interface to the ring.
// Support for read-only instances requires use of ShuffleShard or ShuffleShardWithLookback prior to getting a ReplicationSet.
type ReadRing interface {
Expand All @@ -42,13 +77,17 @@ type ReadRing interface {
// to avoid memory allocation; can be nil, or created with ring.MakeBuffersForGet().
Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error)

// GetWithOptions returns n (or more) instances which form the replicas for the given key
// with 0 or more Option instances to change the behavior of the method call.
GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error)

// GetAllHealthy returns all healthy instances in the ring, for the given operation.
// This function doesn't check if the quorum is honored, so doesn't fail if the number
// of unhealthy instances is greater than the tolerated max unavailable.
GetAllHealthy(op Operation) (ReplicationSet, error)

// GetReplicationSetForOperation returns all instances where the input operation should be executed.
// The resulting ReplicationSet doesn't necessarily contains all healthy instances
// The resulting ReplicationSet doesn't necessarily contain all healthy instances
// in the ring, but could contain the minimum set of instances required to execute
// the input operation.
GetReplicationSetForOperation(op Operation) (ReplicationSet, error)
Expand Down Expand Up @@ -422,18 +461,43 @@ func (r *Ring) setRingStateFromDesc(ringDesc *Desc, updateMetrics, updateRegiste

// Get returns n (or more) instances which form the replicas for the given key.
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
// Note that we purposefully aren't calling GetWithOptions here since the closures it
// uses result in heap allocations which we specifically avoid in this method since it's
// called in hot loops.
return r.getReplicationSetForKey(key, op, bufDescs, bufHosts, bufZones, r.cfg.ReplicationFactor)
}

// GetWithOptions returns n (or more) instances which form the replicas for the given key
// with 0 or more options to change the behavior of the method call.
func (r *Ring) GetWithOptions(key uint32, op Operation, opts ...Option) (ReplicationSet, error) {
options := collectOptions(opts...)
return r.getReplicationSetForKey(key, op, options.BufDescs, options.BufHosts, options.BufZones, options.ReplicationFactor)
}

func (r *Ring) getReplicationSetForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string, replicationFactor int) (ReplicationSet, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()
if r.ringDesc == nil || len(r.ringTokens) == 0 {
return ReplicationSet{}, ErrEmptyRing
}

instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil)
if replicationFactor <= 0 || replicationFactor < r.cfg.ReplicationFactor {
replicationFactor = r.cfg.ReplicationFactor
}

// Not all replication strategies support increasing the replication factor beyond
// the number of zones available. Return an error unless a ReplicationStrategy has
// explicitly opted into supporting this.
if replicationFactor > r.cfg.ReplicationFactor && !r.strategy.SupportsExpandedReplication() {
return ReplicationSet{}, fmt.Errorf("per-call replication factor %d cannot exceed the configured replication factor %d with this replication strategy", replicationFactor, r.cfg.ReplicationFactor)
}

instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, replicationFactor, nil)
if err != nil {
return ReplicationSet{}, err
}

healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, replicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled)
if err != nil {
return ReplicationSet{}, err
}
Expand All @@ -447,21 +511,31 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts,
// Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy.
// InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early.
// This function needs to be called with read lock on the ring.
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {
func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, replicationFactor int, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) {

Check failure on line 514 in ring/ring.go

View workflow job for this annotation

GitHub Actions / Check

unused-parameter: parameter 'bufZones' seems to be unused, consider removing or renaming it as _ (revive)
var (
n = r.cfg.ReplicationFactor
n = replicationFactor
instances = bufDescs[:0]
start = searchToken(r.ringTokens, key)
iterations = 0
maxZones = len(r.ringTokensByZone)
maxInstances = len(r.ringDesc.Ingesters)

// We use a slice instead of a map because it's faster to search within a
// slice than lookup a map for a very low number of items.
// slice than lookup a map for a very low number of items, we only expect
// to have low single-digit number of hosts.
distinctHosts = bufHosts[:0]
distinctZones = bufZones[:0]

// TODO: Do we need to pass this in to avoid allocations?
hostsPerZone = make(map[string]int)
targetHostsPerZone = max(1, replicationFactor/maxZones)
)
for i := start; len(distinctHosts) < min(maxInstances, n) && len(distinctZones) < maxZones && iterations < len(r.ringTokens); i++ {

for i := start; len(distinctHosts) < min(maxInstances, n) && iterations < len(r.ringTokens); i++ {
// If we have the target number of instances in all zones, stop looking.
if r.cfg.ZoneAwarenessEnabled && haveTargetHostsInAllZones(hostsPerZone, targetHostsPerZone, maxZones) {
break
}

iterations++
// Wrap i around in the ring.
i %= len(r.ringTokens)
Expand All @@ -478,9 +552,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
continue
}

// Ignore if the instances don't have a zone set.
// If we already have the required number of instances for this zone, skip.
if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
if slices.Contains(distinctZones, info.Zone) {
if hostsPerZone[info.Zone] >= targetHostsPerZone {
continue
}
}
Expand All @@ -493,9 +567,9 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
if op.ShouldExtendReplicaSetOnState(instance.State) {
n++
} else if r.cfg.ZoneAwarenessEnabled && info.Zone != "" {
// We should only add the zone if we are not going to extend,
// as we want to extend the instance in the same AZ.
distinctZones = append(distinctZones, info.Zone)
// We should only increment the count for this zone if we are not going to
// extend, as we want to extend the instance in the same AZ.
hostsPerZone[info.Zone] += 1

Check failure on line 572 in ring/ring.go

View workflow job for this annotation

GitHub Actions / Check

increment-decrement: should replace hostsPerZone[info.Zone] += 1 with hostsPerZone[info.Zone]++ (revive)
}

include, keepGoing := true, true
Expand All @@ -512,6 +586,20 @@ func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []Instance
return instances, nil
}

func haveTargetHostsInAllZones(hostsByZone map[string]int, targetHostsPerZone int, maxZones int) bool {
if len(hostsByZone) != maxZones {
return false
}

for _, count := range hostsByZone {
if count < targetHostsPerZone {
return false
}
}

return true
}

// GetAllHealthy implements ReadRing.
func (r *Ring) GetAllHealthy(op Operation) (ReplicationSet, error) {
r.mtx.RLock()
Expand Down Expand Up @@ -1332,36 +1420,3 @@ func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool {

// All states are healthy, no states extend replica set.
var allStatesRingOperation = Operation(0x0000ffff)

// numberOfKeysOwnedByInstance returns how many of the supplied keys are owned by given instance.
func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instanceID string, bufDescs []InstanceDesc, bufHosts []string, bufZones []string) (int, error) {
r.mtx.RLock()
defer r.mtx.RUnlock()

if r.ringDesc == nil || len(r.ringTokens) == 0 {
return 0, ErrEmptyRing
}

// Instance is not in this ring, it can't own any key.
if _, ok := r.ringDesc.Ingesters[instanceID]; !ok {
return 0, nil
}

owned := 0
for _, tok := range keys {
i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) {
if foundInstanceID == instanceID {
// If we've found our instance, we can stop.
return true, false
}
return false, true
})
if err != nil {
return 0, err
}
if len(i) > 0 {
owned++
}
}
return owned, nil
}
Loading

0 comments on commit 4fdf760

Please sign in to comment.