diff --git a/go/runtime/ops_publisher.go b/go/runtime/ops_publisher.go index a5c78e21a4..c7ff1e2a87 100644 --- a/go/runtime/ops_publisher.go +++ b/go/runtime/ops_publisher.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "sync" "time" "github.com/estuary/flow/go/flow" @@ -17,41 +18,59 @@ import ( ) type OpsPublisher struct { - labels ops.ShardLabeling logsPublisher *message.Publisher mapper flow.Mapper - opsLogsSpec *pf.CollectionSpec - opsStatsSpec *pf.CollectionSpec - shard *ops.ShardRef + mu sync.Mutex + + // Fields that update with each task term: + labels ops.ShardLabeling + opsLogsSpec *pf.CollectionSpec + opsStatsSpec *pf.CollectionSpec + shard *ops.ShardRef } var _ ops.Publisher = &OpsPublisher{} func NewOpsPublisher( - labels ops.ShardLabeling, logsPublisher *message.Publisher, mapper flow.Mapper, +) *OpsPublisher { + return &OpsPublisher{ + logsPublisher: logsPublisher, + mapper: mapper, + mu: sync.Mutex{}, + } +} + +func (p *OpsPublisher) UpdateLabels( + labels ops.ShardLabeling, opsLogsSpec *pf.CollectionSpec, opsStatsSpec *pf.CollectionSpec, -) (*OpsPublisher, error) { +) error { // Sanity-check the shape of logs and stats collections. if err := ops.ValidateLogsCollection(opsLogsSpec); err != nil { - return nil, err + return err } else if err := ops.ValidateStatsCollection(opsStatsSpec); err != nil { - return nil, err + return err } - return &OpsPublisher{ - labels: labels, - logsPublisher: logsPublisher, - mapper: mapper, - opsLogsSpec: opsLogsSpec, - opsStatsSpec: opsStatsSpec, - shard: ops.NewShardRef(labels), - }, nil + p.mu.Lock() + defer p.mu.Unlock() + + p.labels = labels + p.opsLogsSpec = opsLogsSpec + p.opsStatsSpec = opsStatsSpec + p.shard = ops.NewShardRef(labels) + + return nil } -func (p *OpsPublisher) Labels() ops.ShardLabeling { return p.labels } +func (p *OpsPublisher) Labels() ops.ShardLabeling { + p.mu.Lock() + defer p.mu.Unlock() + + return p.labels +} func (p *OpsPublisher) PublishStats( out ops.Stats, @@ -66,6 +85,9 @@ func (p *OpsPublisher) PublishStats( panic(fmt.Errorf("marshal of *ops.Stats should always succeed but: %w", err)) } + p.mu.Lock() + defer p.mu.Unlock() + var msg = flow.Mappable{ Spec: p.opsStatsSpec, Doc: buf.Bytes(), @@ -86,6 +108,9 @@ func (p *OpsPublisher) PublishLog(out ops.Log) { panic(fmt.Errorf("marshal of *ops.Log should always succeed but: %w", err)) } + p.mu.Lock() + defer p.mu.Unlock() + var msg = flow.Mappable{ Spec: p.opsLogsSpec, Doc: json.RawMessage(buf.Bytes()), diff --git a/go/runtime/task_term.go b/go/runtime/task_term.go index 287dacd329..12c8c26d59 100644 --- a/go/runtime/task_term.go +++ b/go/runtime/task_term.go @@ -33,7 +33,7 @@ type taskTerm struct { labels ops.ShardLabeling // Resolved *Build of the task's build ID. build *flow.Build - // ops.Publisher of ops.Logs and (in the future) ops.Stats. + // ops.Publisher of ops.Logs and ops.Stats. opsPublisher *OpsPublisher } @@ -74,10 +74,15 @@ func (t *taskTerm) initTerm(shard consumer.Shard, host *FlowConsumer) error { return err } - if t.opsPublisher, err = NewOpsPublisher( + // OpsPublisher is instantiated once, but updates with labels of each term. + if t.opsPublisher == nil { + t.opsPublisher = NewOpsPublisher( + host.LogPublisher, + flow.NewMapper(shard.Context(), host.Service.Etcd, host.Journals, shard.FQN()), + ) + } + if err = t.opsPublisher.UpdateLabels( t.labels, - host.LogPublisher, - flow.NewMapper(shard.Context(), host.Service.Etcd, host.Journals, shard.FQN()), logsCollectionSpec, statsCollectionSpec, ); err != nil { @@ -121,13 +126,14 @@ func (r *taskReader) initReader( r.mu.Lock() defer r.mu.Unlock() - // Coordinator will shed reads upon |term.ctx| cancellation. - r.coordinator = shuffle.NewCoordinator( - term.ctx, - host.Builds, - term.opsPublisher, - shard.JournalClient(), - ) + // Coordinator is instantiated once, and has the lifetime of the shard. + if r.coordinator == nil { + r.coordinator = shuffle.NewCoordinator( + shard.Context(), + term.opsPublisher, + shard.JournalClient(), + ) + } // Use the taskTerm's Context.Done as the |drainCh| monitored // by the ReadBuilder. When the term's context is cancelled, @@ -136,7 +142,7 @@ func (r *taskReader) initReader( var err error r.readBuilder, err = shuffle.NewReadBuilder( term.labels.Build, - term.ctx.Done(), + term.ctx.Done(), // Drain reads upon term cancellation. host.Journals, term.opsPublisher, host.Service, diff --git a/go/shuffle/api_test.go b/go/shuffle/api_test.go index be496ba937..109bb13fc9 100644 --- a/go/shuffle/api_test.go +++ b/go/shuffle/api_test.go @@ -31,13 +31,12 @@ import ( ) func TestAPIIntegrationWithFixtures(t *testing.T) { - var dir = t.TempDir() var args = bindings.BuildArgs{ Context: context.Background(), FileRoot: "./testdata", BuildAPI_Config: pf.BuildAPI_Config{ BuildId: "a-build-id", - BuildDb: path.Join(dir, "a-build-db"), + BuildDb: path.Join(t.TempDir(), "build.db"), Source: "file:///ab.flow.yaml", SourceType: pf.ContentType_CATALOG, }} @@ -53,8 +52,6 @@ func TestAPIIntegrationWithFixtures(t *testing.T) { var etcd = etcdtest.TestClient() defer etcdtest.Cleanup() - var builds, err = flow.NewBuildService("file://" + dir + "/") - require.NoError(t, err) var bk = brokertest.NewBroker(t, etcd, "local", "broker") var journalSpec = brokertest.Journal(pb.JournalSpec{ Name: "a/journal", @@ -100,7 +97,7 @@ func TestAPIIntegrationWithFixtures(t *testing.T) { // Use a resolve() fixture which returns a mocked store with our |coordinator|. var srv = server.MustLoopback() var apiCtx, cancelAPICtx = context.WithCancel(backgroundCtx) - var coordinator = NewCoordinator(apiCtx, builds, localPublisher, bk.Client()) + var coordinator = NewCoordinator(apiCtx, localPublisher, bk.Client()) pf.RegisterShufflerServer(srv.GRPCServer, &API{ resolve: func(args consumer.ResolveArgs) (consumer.Resolution, error) { diff --git a/go/shuffle/reader_test.go b/go/shuffle/reader_test.go index fb9013a6db..bffff29ce0 100644 --- a/go/shuffle/reader_test.go +++ b/go/shuffle/reader_test.go @@ -64,13 +64,12 @@ func TestStuffedMessageChannel(t *testing.T) { } func TestConsumerIntegration(t *testing.T) { - var dir = t.TempDir() var args = bindings.BuildArgs{ Context: context.Background(), FileRoot: "./testdata", BuildAPI_Config: pf.BuildAPI_Config{ BuildId: "a-build-id", - BuildDb: path.Join(dir, "a-build-id"), + BuildDb: path.Join(t.TempDir(), "build.id"), Source: "file:///ab.flow.yaml", SourceType: pf.ContentType_CATALOG, }} @@ -88,8 +87,6 @@ func TestConsumerIntegration(t *testing.T) { var etcd = etcdtest.TestClient() defer etcdtest.Cleanup() - var builds, err = flow.NewBuildService("file://" + dir + "/") - require.NoError(t, err) // Fixtures which parameterize the test: var ( sourcePartitions = []pb.Journal{ @@ -169,7 +166,6 @@ func TestConsumerIntegration(t *testing.T) { Journals: broker.Client(), App: &testApp{ journals: journals, - builds: builds, shuffles: derivation.TaskShuffles(), buildID: "a-build-id", }, @@ -282,7 +278,6 @@ func TestConsumerIntegration(t *testing.T) { type testApp struct { service *consumer.Service journals flow.Journals - builds *flow.BuildService shuffles []*pf.Shuffle buildID string } @@ -315,7 +310,7 @@ func (a testApp) NewStore(shard consumer.Shard, recorder *recoverylog.Recorder) return nil, err } - var coordinator = NewCoordinator(shard.Context(), a.builds, localPublisher, shard.JournalClient()) + var coordinator = NewCoordinator(shard.Context(), localPublisher, shard.JournalClient()) return &testStore{ JSONFileStore: store, diff --git a/go/shuffle/ring.go b/go/shuffle/ring.go index 9105edf050..32922d6e94 100644 --- a/go/shuffle/ring.go +++ b/go/shuffle/ring.go @@ -10,7 +10,6 @@ import ( "sync" "github.com/estuary/flow/go/bindings" - "github.com/estuary/flow/go/flow" pf "github.com/estuary/flow/go/protocols/flow" "github.com/estuary/flow/go/protocols/ops" "github.com/pkg/errors" @@ -22,7 +21,6 @@ import ( // Coordinator collects a set of rings servicing ongoing shuffle reads, // and matches new ShuffleConfigs to a new or existing ring. type Coordinator struct { - builds *flow.BuildService ctx context.Context publisher ops.Publisher mu sync.Mutex @@ -33,12 +31,10 @@ type Coordinator struct { // NewCoordinator returns a new *Coordinator using the given clients. func NewCoordinator( ctx context.Context, - builds *flow.BuildService, publisher ops.Publisher, rjc pb.RoutedJournalClient, ) *Coordinator { return &Coordinator{ - builds: builds, ctx: ctx, publisher: publisher, rings: make(map[*ring]struct{}), @@ -216,11 +212,9 @@ func (r *ring) serve() { r.log(ops.Log_debug, "started shuffle ring") var ( - build = r.coordinator.builds.Open(r.shuffle.BuildId) extractor *bindings.Extractor initErr error ) - defer build.Close() // TODO(johnny): defer |extractor| cleanup (not yet implemented). if extractor, initErr = bindings.NewExtractor(r.coordinator.publisher); initErr != nil { diff --git a/go/shuffle/ring_test.go b/go/shuffle/ring_test.go index e57e7356f2..4ce2ab4b8a 100644 --- a/go/shuffle/ring_test.go +++ b/go/shuffle/ring_test.go @@ -49,7 +49,7 @@ func TestReadingDocuments(t *testing.T) { Arena: make(pf.Arena, 1), } - var coordinator = NewCoordinator(ctx, nil, localPublisher, bk.Client()) + var coordinator = NewCoordinator(ctx, localPublisher, bk.Client()) var ring = newRing(coordinator, pf.JournalShuffle{ Journal: "a/journal", BuildId: "a-build", @@ -133,7 +133,7 @@ func TestReadingDocuments(t *testing.T) { } func TestDocumentExtraction(t *testing.T) { - var coordinator = NewCoordinator(context.Background(), nil, localPublisher, nil) + var coordinator = NewCoordinator(context.Background(), localPublisher, nil) var r = newRing(coordinator, pf.JournalShuffle{ Journal: "a/journal", BuildId: "a-build",