Skip to content

Commit

Permalink
runtime: switch to single coordinator having shard context lifetime
Browse files Browse the repository at this point in the history
Swith to a single OpsPublisher having the shard lifetime, that updates
with the labels of each task term. Use a Mutex to guard access.

Have a single shuffle.Coordinator that has the Shard's lifetime. This
fixes the current cancellation race between the Coordinator (term
context) and a replay read (shard context).

Tested on a local stack, where I published a materialization multiple
times and confirmed it processed as expected.

Fixes estuary#998
  • Loading branch information
jgraettinger committed Apr 11, 2023
1 parent 9e5368d commit c35b49a
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 49 deletions.
59 changes: 42 additions & 17 deletions go/runtime/ops_publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/estuary/flow/go/flow"
Expand All @@ -17,41 +18,59 @@ import (
)

type OpsPublisher struct {
labels ops.ShardLabeling
logsPublisher *message.Publisher
mapper flow.Mapper
opsLogsSpec *pf.CollectionSpec
opsStatsSpec *pf.CollectionSpec
shard *ops.ShardRef
mu sync.Mutex

// Fields that update with each task term:
labels ops.ShardLabeling
opsLogsSpec *pf.CollectionSpec
opsStatsSpec *pf.CollectionSpec
shard *ops.ShardRef
}

var _ ops.Publisher = &OpsPublisher{}

func NewOpsPublisher(
labels ops.ShardLabeling,
logsPublisher *message.Publisher,
mapper flow.Mapper,
) *OpsPublisher {
return &OpsPublisher{
logsPublisher: logsPublisher,
mapper: mapper,
mu: sync.Mutex{},
}
}

func (p *OpsPublisher) UpdateLabels(
labels ops.ShardLabeling,
opsLogsSpec *pf.CollectionSpec,
opsStatsSpec *pf.CollectionSpec,
) (*OpsPublisher, error) {
) error {
// Sanity-check the shape of logs and stats collections.
if err := ops.ValidateLogsCollection(opsLogsSpec); err != nil {
return nil, err
return err
} else if err := ops.ValidateStatsCollection(opsStatsSpec); err != nil {
return nil, err
return err
}

return &OpsPublisher{
labels: labels,
logsPublisher: logsPublisher,
mapper: mapper,
opsLogsSpec: opsLogsSpec,
opsStatsSpec: opsStatsSpec,
shard: ops.NewShardRef(labels),
}, nil
p.mu.Lock()
defer p.mu.Unlock()

p.labels = labels
p.opsLogsSpec = opsLogsSpec
p.opsStatsSpec = opsStatsSpec
p.shard = ops.NewShardRef(labels)

return nil
}

func (p *OpsPublisher) Labels() ops.ShardLabeling { return p.labels }
func (p *OpsPublisher) Labels() ops.ShardLabeling {
p.mu.Lock()
defer p.mu.Unlock()

return p.labels
}

func (p *OpsPublisher) PublishStats(
out ops.Stats,
Expand All @@ -66,6 +85,9 @@ func (p *OpsPublisher) PublishStats(
panic(fmt.Errorf("marshal of *ops.Stats should always succeed but: %w", err))
}

p.mu.Lock()
defer p.mu.Unlock()

var msg = flow.Mappable{
Spec: p.opsStatsSpec,
Doc: buf.Bytes(),
Expand All @@ -86,6 +108,9 @@ func (p *OpsPublisher) PublishLog(out ops.Log) {
panic(fmt.Errorf("marshal of *ops.Log should always succeed but: %w", err))
}

p.mu.Lock()
defer p.mu.Unlock()

var msg = flow.Mappable{
Spec: p.opsLogsSpec,
Doc: json.RawMessage(buf.Bytes()),
Expand Down
30 changes: 18 additions & 12 deletions go/runtime/task_term.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type taskTerm struct {
labels ops.ShardLabeling
// Resolved *Build of the task's build ID.
build *flow.Build
// ops.Publisher of ops.Logs and (in the future) ops.Stats.
// ops.Publisher of ops.Logs and ops.Stats.
opsPublisher *OpsPublisher
}

Expand Down Expand Up @@ -74,10 +74,15 @@ func (t *taskTerm) initTerm(shard consumer.Shard, host *FlowConsumer) error {
return err
}

if t.opsPublisher, err = NewOpsPublisher(
// OpsPublisher is instantiated once, but updates with labels of each term.
if t.opsPublisher == nil {
t.opsPublisher = NewOpsPublisher(
host.LogPublisher,
flow.NewMapper(shard.Context(), host.Service.Etcd, host.Journals, shard.FQN()),
)
}
if err = t.opsPublisher.UpdateLabels(
t.labels,
host.LogPublisher,
flow.NewMapper(shard.Context(), host.Service.Etcd, host.Journals, shard.FQN()),
logsCollectionSpec,
statsCollectionSpec,
); err != nil {
Expand Down Expand Up @@ -121,13 +126,14 @@ func (r *taskReader) initReader(
r.mu.Lock()
defer r.mu.Unlock()

// Coordinator will shed reads upon |term.ctx| cancellation.
r.coordinator = shuffle.NewCoordinator(
term.ctx,
host.Builds,
term.opsPublisher,
shard.JournalClient(),
)
// Coordinator is instantiated once, and has the lifetime of the shard.
if r.coordinator == nil {
r.coordinator = shuffle.NewCoordinator(
shard.Context(),
term.opsPublisher,
shard.JournalClient(),
)
}

// Use the taskTerm's Context.Done as the |drainCh| monitored
// by the ReadBuilder. When the term's context is cancelled,
Expand All @@ -136,7 +142,7 @@ func (r *taskReader) initReader(
var err error
r.readBuilder, err = shuffle.NewReadBuilder(
term.labels.Build,
term.ctx.Done(),
term.ctx.Done(), // Drain reads upon term cancellation.
host.Journals,
term.opsPublisher,
host.Service,
Expand Down
7 changes: 2 additions & 5 deletions go/shuffle/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ import (
)

func TestAPIIntegrationWithFixtures(t *testing.T) {
var dir = t.TempDir()
var args = bindings.BuildArgs{
Context: context.Background(),
FileRoot: "./testdata",
BuildAPI_Config: pf.BuildAPI_Config{
BuildId: "a-build-id",
BuildDb: path.Join(dir, "a-build-db"),
BuildDb: path.Join(t.TempDir(), "build.db"),
Source: "file:///ab.flow.yaml",
SourceType: pf.ContentType_CATALOG,
}}
Expand All @@ -53,8 +52,6 @@ func TestAPIIntegrationWithFixtures(t *testing.T) {
var etcd = etcdtest.TestClient()
defer etcdtest.Cleanup()

var builds, err = flow.NewBuildService("file://" + dir + "/")
require.NoError(t, err)
var bk = brokertest.NewBroker(t, etcd, "local", "broker")
var journalSpec = brokertest.Journal(pb.JournalSpec{
Name: "a/journal",
Expand Down Expand Up @@ -100,7 +97,7 @@ func TestAPIIntegrationWithFixtures(t *testing.T) {
// Use a resolve() fixture which returns a mocked store with our |coordinator|.
var srv = server.MustLoopback()
var apiCtx, cancelAPICtx = context.WithCancel(backgroundCtx)
var coordinator = NewCoordinator(apiCtx, builds, localPublisher, bk.Client())
var coordinator = NewCoordinator(apiCtx, localPublisher, bk.Client())

pf.RegisterShufflerServer(srv.GRPCServer, &API{
resolve: func(args consumer.ResolveArgs) (consumer.Resolution, error) {
Expand Down
9 changes: 2 additions & 7 deletions go/shuffle/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,12 @@ func TestStuffedMessageChannel(t *testing.T) {
}

func TestConsumerIntegration(t *testing.T) {
var dir = t.TempDir()
var args = bindings.BuildArgs{
Context: context.Background(),
FileRoot: "./testdata",
BuildAPI_Config: pf.BuildAPI_Config{
BuildId: "a-build-id",
BuildDb: path.Join(dir, "a-build-id"),
BuildDb: path.Join(t.TempDir(), "build.id"),
Source: "file:///ab.flow.yaml",
SourceType: pf.ContentType_CATALOG,
}}
Expand All @@ -88,8 +87,6 @@ func TestConsumerIntegration(t *testing.T) {
var etcd = etcdtest.TestClient()
defer etcdtest.Cleanup()

var builds, err = flow.NewBuildService("file://" + dir + "/")
require.NoError(t, err)
// Fixtures which parameterize the test:
var (
sourcePartitions = []pb.Journal{
Expand Down Expand Up @@ -169,7 +166,6 @@ func TestConsumerIntegration(t *testing.T) {
Journals: broker.Client(),
App: &testApp{
journals: journals,
builds: builds,
shuffles: derivation.TaskShuffles(),
buildID: "a-build-id",
},
Expand Down Expand Up @@ -282,7 +278,6 @@ func TestConsumerIntegration(t *testing.T) {
type testApp struct {
service *consumer.Service
journals flow.Journals
builds *flow.BuildService
shuffles []*pf.Shuffle
buildID string
}
Expand Down Expand Up @@ -315,7 +310,7 @@ func (a testApp) NewStore(shard consumer.Shard, recorder *recoverylog.Recorder)
return nil, err
}

var coordinator = NewCoordinator(shard.Context(), a.builds, localPublisher, shard.JournalClient())
var coordinator = NewCoordinator(shard.Context(), localPublisher, shard.JournalClient())

return &testStore{
JSONFileStore: store,
Expand Down
6 changes: 0 additions & 6 deletions go/shuffle/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"sync"

"github.com/estuary/flow/go/bindings"
"github.com/estuary/flow/go/flow"
pf "github.com/estuary/flow/go/protocols/flow"
"github.com/estuary/flow/go/protocols/ops"
"github.com/pkg/errors"
Expand All @@ -22,7 +21,6 @@ import (
// Coordinator collects a set of rings servicing ongoing shuffle reads,
// and matches new ShuffleConfigs to a new or existing ring.
type Coordinator struct {
builds *flow.BuildService
ctx context.Context
publisher ops.Publisher
mu sync.Mutex
Expand All @@ -33,12 +31,10 @@ type Coordinator struct {
// NewCoordinator returns a new *Coordinator using the given clients.
func NewCoordinator(
ctx context.Context,
builds *flow.BuildService,
publisher ops.Publisher,
rjc pb.RoutedJournalClient,
) *Coordinator {
return &Coordinator{
builds: builds,
ctx: ctx,
publisher: publisher,
rings: make(map[*ring]struct{}),
Expand Down Expand Up @@ -216,11 +212,9 @@ func (r *ring) serve() {
r.log(ops.Log_debug, "started shuffle ring")

var (
build = r.coordinator.builds.Open(r.shuffle.BuildId)
extractor *bindings.Extractor
initErr error
)
defer build.Close()
// TODO(johnny): defer |extractor| cleanup (not yet implemented).

if extractor, initErr = bindings.NewExtractor(r.coordinator.publisher); initErr != nil {
Expand Down
4 changes: 2 additions & 2 deletions go/shuffle/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestReadingDocuments(t *testing.T) {
Arena: make(pf.Arena, 1),
}

var coordinator = NewCoordinator(ctx, nil, localPublisher, bk.Client())
var coordinator = NewCoordinator(ctx, localPublisher, bk.Client())
var ring = newRing(coordinator, pf.JournalShuffle{
Journal: "a/journal",
BuildId: "a-build",
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestReadingDocuments(t *testing.T) {
}

func TestDocumentExtraction(t *testing.T) {
var coordinator = NewCoordinator(context.Background(), nil, localPublisher, nil)
var coordinator = NewCoordinator(context.Background(), localPublisher, nil)
var r = newRing(coordinator, pf.JournalShuffle{
Journal: "a/journal",
BuildId: "a-build",
Expand Down

0 comments on commit c35b49a

Please sign in to comment.