From ac71fa71028a61293a078b1099ddb820df9b820d Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Wed, 14 Jun 2023 01:17:17 +0000 Subject: [PATCH 1/4] add span uri to node event Signed-off-by: Yicheng-Lu-llll --- pkg/controller/nodes/executor.go | 13 ++++++++++++ .../nodes/handler/transition_info.go | 1 + .../task/catalog/datacatalog/datacatalog.go | 4 ++-- pkg/controller/nodes/task/handler.go | 20 ++++++++++++++----- pkg/controller/nodes/transformers.go | 5 +++++ 5 files changed, 36 insertions(+), 7 deletions(-) diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index c447b779c..aa819eeab 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -328,6 +328,19 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe } } + spanFile := storage.DataReference(recovered.Closure.GetSpanUri()) + if len(spanFile) > 0 { + metadata, err := nCtx.DataStore().Head(ctx, spanFile) + if err != nil { + logger.Errorf(ctx, "Failed to check the existence of span file. Error: %v", err) + return handler.PhaseInfoUndefined, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "Failed to check the existence of span file.") + } + + if metadata.Exists() { + oi.SpanURI = &spanFile + } + } + if err := c.store.WriteProtobuf(ctx, outputFile, so, outputs); err != nil { logger.Errorf(ctx, "Failed to write protobuf (metadata). Error [%v]", err) return handler.PhaseInfoUndefined, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "Failed to store recovered node execution outputs") diff --git a/pkg/controller/nodes/handler/transition_info.go b/pkg/controller/nodes/handler/transition_info.go index 5d302f4fa..477db358b 100644 --- a/pkg/controller/nodes/handler/transition_info.go +++ b/pkg/controller/nodes/handler/transition_info.go @@ -55,6 +55,7 @@ type GateNodeInfo struct { type OutputInfo struct { OutputURI storage.DataReference DeckURI *storage.DataReference + SpanURI *storage.DataReference } type ExecutionInfo struct { diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go index 3009af038..b0b57a963 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go @@ -137,11 +137,11 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry outputs, err := GenerateTaskOutputsFromArtifact(key.Identifier, key.TypedInterface, artifact) if err != nil { logger.Errorf(ctx, "DataCatalog failed to get outputs from artifact %+v, err: %+v", artifact.Id, err) - return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err + return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err } logger.Infof(ctx, "Retrieved %v outputs from artifact %v, tag: %v", len(outputs.Literals), artifact.Id, tag) - return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil + return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil } // CreateDataset creates a Dataset in datacatalog including the associated metadata. diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index d7300818b..ec1f24d8e 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -83,10 +83,10 @@ func getPluginMetricKey(pluginID, taskType string) string { return taskType + "_" + pluginID } -func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) { +func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, spanPath *storage.DataReference, entry catalog.Entry) { p.ttype = handler.TransitionTypeEphemeral p.pInfo = pluginCore.PhaseInfoSuccess(nil) - p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) + p.ObserveSuccess(outputPath, deckPath, spanPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()}) } func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) { @@ -156,10 +156,11 @@ func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp return ToTaskExecutionEvent(input) } -func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) { +func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, spanPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) { p.execInfo.OutputInfo = &handler.OutputInfo{ OutputURI: outputPath, DeckURI: deckPath, + SpanURI: spanPath, } p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{ @@ -503,6 +504,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta }) } else { var deckURI *storage.DataReference + var spanURI *storage.DataReference if tCtx.ow.GetReader() != nil { exists, err := tCtx.ow.GetReader().DeckExists(ctx) if err != nil { @@ -512,8 +514,16 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta deckURIValue := tCtx.ow.GetDeckPath() deckURI = &deckURIValue } + exists, err = tCtx.ow.GetReader().SpanExists(ctx) + if err != nil { + logger.Errorf(ctx, "Failed to check span file existence. Error: %v", err) + return pluginTrns, regErrors.Wrapf(err, "failed to check existence of span file") + } else if exists { + spanURIValue := tCtx.ow.GetSpanPath() + spanURI = &spanURIValue + } } - pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI, + pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI, spanURI, &event.TaskNodeMetadata{ CacheStatus: cacheStatus.GetCacheStatus(), CatalogKey: cacheStatus.GetMetadata(), @@ -617,7 +627,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext) return handler.UnknownTransition, err } - pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), nil, entry) + pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), nil, nil, entry) } else { logger.Infof(ctx, "No CacheHIT. Status [%s]", entry.GetStatus().GetCacheStatus().String()) pluginTrns.PopulateCacheInfo(entry) diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go index ae615a9f3..358a092c5 100644 --- a/pkg/controller/nodes/transformers.go +++ b/pkg/controller/nodes/transformers.go @@ -168,6 +168,11 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, nev.DeckUri = eInfo.OutputInfo.DeckURI.String() } + if eInfo.OutputInfo.SpanURI != nil { + defer logger.Debugf(context.TODO(), "For test only, Span URI:",eInfo.OutputInfo.SpanURI.String()) + nev.SpanUri = eInfo.OutputInfo.SpanURI.String() + } + nev.OutputResult = ToNodeExecOutput(eInfo.OutputInfo) } else if info.GetErr() != nil { nev.OutputResult = &event.NodeExecutionEvent_Error{ From 88be7023b8ee7c2c17b6d127c6f822c0286146c7 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Wed, 14 Jun 2023 16:02:05 +0000 Subject: [PATCH 2/4] add ci test Signed-off-by: Yicheng-Lu-llll --- events/admin_eventsink_integration_test.go | 14 +++++++++++--- events/admin_eventsink_test.go | 1 + events/node_event_recorder_test.go | 3 +++ events/test_utils.go | 1 + pkg/controller/nodes/executor.go | 2 +- pkg/controller/nodes/executor_test.go | 17 ++++++++++++++++- pkg/controller/nodes/handler/transition_test.go | 3 ++- .../catalog/datacatalog/datacatalog_test.go | 10 +++++----- .../task/fakeplugins/next_phase_state_plugin.go | 2 ++ pkg/controller/nodes/task/handler.go | 2 +- pkg/controller/nodes/task/handler_test.go | 3 +++ pkg/controller/nodes/transformers.go | 2 +- 12 files changed, 47 insertions(+), 13 deletions(-) diff --git a/events/admin_eventsink_integration_test.go b/events/admin_eventsink_integration_test.go index 44c23a344..4875e14f1 100644 --- a/events/admin_eventsink_integration_test.go +++ b/events/admin_eventsink_integration_test.go @@ -1,4 +1,6 @@ +//go:build integration // +build integration + // Add this tag to your project settings if you want to pick it up. package events @@ -30,11 +32,17 @@ var ( ) // To run this test, and see if the deadline working, pick an existing successful execution from your admin database -// select * from executions; +// +// select * from executions; +// // Then delete all the events from it. -// delete from execution_events where execution_name = 'ikuy55mn0y'; +// +// delete from execution_events where execution_name = 'ikuy55mn0y'; +// // Then run this -// begin work; lock table executions in ACCESS EXCLUSIVE mode; SELECT pg_sleep(20); commit work; +// +// begin work; lock table executions in ACCESS EXCLUSIVE mode; SELECT pg_sleep(20); commit work; +// // This will lock your table so that admin can't read it, causing the grpc call to timeout. // On timeout, you should get a deadline exceeded error. Otherwise, you should get an error to the effect of // "Invalid phase change from SUCCEEDED to RUNNING" or something like that. diff --git a/events/admin_eventsink_test.go b/events/admin_eventsink_test.go index 734946559..7653f3250 100644 --- a/events/admin_eventsink_test.go +++ b/events/admin_eventsink_test.go @@ -48,6 +48,7 @@ var ( InputUri: "input-uri", }, DeckUri: deckURI, + SpanUri: spanURI, OutputResult: &event.NodeExecutionEvent_OutputUri{OutputUri: ""}, } diff --git a/events/node_event_recorder_test.go b/events/node_event_recorder_test.go index 54129c7ef..b387ebef3 100644 --- a/events/node_event_recorder_test.go +++ b/events/node_event_recorder_test.go @@ -24,6 +24,7 @@ func getReferenceNodeEv() *event.NodeExecutionEvent { OutputUri: referenceURI, }, DeckUri: deckURI, + SpanUri: spanURI, } } @@ -34,6 +35,7 @@ func getRawOutputNodeEv() *event.NodeExecutionEvent { OutputData: outputData, }, DeckUri: deckURI, + SpanUri: spanURI, } } @@ -82,6 +84,7 @@ func TestRecordNodeEvent_Success_InlineOutputs(t *testing.T) { } err := recorder.RecordNodeEvent(ctx, getReferenceNodeEv(), inlineEventConfig) assert.Equal(t, deckURI, nodeEvent.DeckUri) + assert.Equal(t, spanURI, nodeEvent.SpanUri) assert.NoError(t, err) } diff --git a/events/test_utils.go b/events/test_utils.go index e0aa82788..d953b6225 100644 --- a/events/test_utils.go +++ b/events/test_utils.go @@ -20,6 +20,7 @@ var referenceEventConfig = &config.EventConfig{ var referenceURI = "s3://foo/bar/outputs.pb" var deckURI = "s3://foo/bar/deck.html" +var spanURI = "s3://foo/bar/span.pb" var outputData = &core.LiteralMap{ Literals: map[string]*core.Literal{ diff --git a/pkg/controller/nodes/executor.go b/pkg/controller/nodes/executor.go index aa819eeab..5d90f0b67 100644 --- a/pkg/controller/nodes/executor.go +++ b/pkg/controller/nodes/executor.go @@ -335,7 +335,7 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe logger.Errorf(ctx, "Failed to check the existence of span file. Error: %v", err) return handler.PhaseInfoUndefined, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "Failed to check the existence of span file.") } - + if metadata.Exists() { oi.SpanURI = &spanFile } diff --git a/pkg/controller/nodes/executor_test.go b/pkg/controller/nodes/executor_test.go index a707b4dfc..785942807 100644 --- a/pkg/controller/nodes/executor_test.go +++ b/pkg/controller/nodes/executor_test.go @@ -57,6 +57,7 @@ var signalClient = &gatemocks.SignalServiceClient{} const taskID = "tID" const inputsPath = "inputs.pb" const deckPath = "out/deck.html" +const spanPath = "out/span.pb" const outputsPath = "out/outputs.pb" const testClusterID = "C1" @@ -2115,6 +2116,7 @@ func TestRecover(t *testing.T) { OutputUri: "outputuri.pb", }, DeckUri: deckPath, + SpanUri: spanPath, }, }, nil) @@ -2128,6 +2130,8 @@ func TestRecover(t *testing.T) { metadata := existsMetadata{} mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckPath)). Return(&metadata, nil) + mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(spanPath)). + Return(&metadata, nil) mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool { return reference.String() == inputsPath || reference.String() == outputsPath }), mock.Anything, @@ -2262,6 +2266,7 @@ func TestRecover(t *testing.T) { }, }, DeckUri: deckPath, + SpanUri: spanPath, }, }, nil) @@ -2294,6 +2299,8 @@ func TestRecover(t *testing.T) { metadata := existsMetadata{} mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckPath)). Return(&metadata, nil) + mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(spanPath)). + Return(&metadata, nil) mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool { return reference.String() == inputsPath || reference.String() == outputsPath }), mock.Anything, @@ -2346,6 +2353,7 @@ func TestRecover(t *testing.T) { }, }, DeckUri: deckPath, + SpanUri: spanPath, }, }, nil) @@ -2359,6 +2367,8 @@ func TestRecover(t *testing.T) { metadata := existsMetadata{} mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckPath)). Return(&metadata, nil) + mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(spanPath)). + Return(&metadata, nil) mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool { return reference.String() == inputsPath || reference.String() == outputsPath }), mock.Anything, @@ -2429,6 +2439,7 @@ func TestRecover(t *testing.T) { OutputUri: "outputuri.pb", }, DeckUri: deckPath, + SpanUri: spanPath, }, }, nil) @@ -2441,7 +2452,8 @@ func TestRecover(t *testing.T) { metadata := existsMetadata{} mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckPath)). Return(&metadata, nil) - + mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(spanPath)). + Return(&metadata, nil) mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool { return reference.String() == inputsPath || reference.String() == outputsPath }), mock.Anything, @@ -2475,6 +2487,7 @@ func TestRecover(t *testing.T) { OutputUri: "outputuri.pb", }, DeckUri: deckPath, + SpanUri: spanPath, }, }, nil) @@ -2487,6 +2500,8 @@ func TestRecover(t *testing.T) { metadata := existsMetadata{} mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckPath)). Return(&metadata, nil) + mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(spanPath)). + Return(&metadata, nil) mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool { return reference.String() == inputsPath || reference.String() == outputsPath }), mock.Anything, diff --git a/pkg/controller/nodes/handler/transition_test.go b/pkg/controller/nodes/handler/transition_test.go index 32f79d2dc..572a1c061 100644 --- a/pkg/controller/nodes/handler/transition_test.go +++ b/pkg/controller/nodes/handler/transition_test.go @@ -22,12 +22,13 @@ func TestDoTransition(t *testing.T) { t.Run("barrier", func(t *testing.T) { tr := DoTransition(TransitionTypeBarrier, PhaseInfoSuccess(&ExecutionInfo{ - OutputInfo: &OutputInfo{OutputURI: "uri", DeckURI: AsPointer(storage.DataReference("deck"))}, + OutputInfo: &OutputInfo{OutputURI: "uri", DeckURI: AsPointer(storage.DataReference("deck")), SpanURI: AsPointer(storage.DataReference("span"))}, })) assert.Equal(t, TransitionTypeBarrier, tr.Type()) assert.Equal(t, EPhaseSuccess, tr.Info().p) assert.Equal(t, storage.DataReference("uri"), tr.Info().GetInfo().OutputInfo.OutputURI) assert.Equal(t, AsPointer(storage.DataReference("deck")), tr.Info().GetInfo().OutputInfo.DeckURI) + assert.Equal(t, AsPointer(storage.DataReference("span")), tr.Info().GetInfo().OutputInfo.SpanURI) }) } diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go index 7243c45de..44125e4f7 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go @@ -432,7 +432,7 @@ func TestCatalog_Put(t *testing.T) { ).Return(&datacatalog.AddTagResponse{}, nil) newKey := sampleKey newKey.InputReader = ir - or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil) + or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil, nil) s, err := discovery.Put(ctx, newKey, or, catalog.Metadata{ WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ Name: "test", @@ -523,7 +523,7 @@ func TestCatalog_Put(t *testing.T) { ).Return(&datacatalog.AddTagResponse{}, nil) newKey := sampleKey newKey.InputReader = ir - or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil) + or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil, nil) s, err := discovery.Put(ctx, newKey, or, catalog.Metadata{ WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ Name: "test", @@ -590,7 +590,7 @@ func TestCatalog_Update(t *testing.T) { newKey := sampleKey newKey.InputReader = ir - or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil) + or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil, nil) s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{ WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ Name: taskID.NodeExecutionId.ExecutionId.Name, @@ -680,7 +680,7 @@ func TestCatalog_Update(t *testing.T) { newKey := sampleKey newKey.InputReader = ir - or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil) + or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil, nil) s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{ WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ Name: taskID.NodeExecutionId.ExecutionId.Name, @@ -726,7 +726,7 @@ func TestCatalog_Update(t *testing.T) { newKey := sampleKey newKey.InputReader = ir - or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil) + or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil, nil) s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{ WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ Name: "test", diff --git a/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go b/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go index 5e68c9729..1e1146a8f 100644 --- a/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go +++ b/pkg/controller/nodes/task/fakeplugins/next_phase_state_plugin.go @@ -17,6 +17,7 @@ type NextPhaseState struct { TaskInfo *pluginCore.TaskInfo TaskErr *io.ExecutionError DeckExists bool + SpanExists bool OutputExists bool OrError bool } @@ -52,6 +53,7 @@ func (n NextPhaseStatePlugin) Handle(ctx context.Context, tCtx pluginCore.TaskEx } r.OnDeckExistsMatch(mock.Anything).Return(s.DeckExists, nil) + r.OnSpanExistsMatch(mock.Anything).Return(s.SpanExists, nil) r.On("IsError", mock.Anything).Return(isErr, nil) r.On("IsFile", mock.Anything).Return(true) r.On("Exists", mock.Anything).Return(s.OutputExists, nil) diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index ec1f24d8e..a805290e9 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -520,7 +520,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta return pluginTrns, regErrors.Wrapf(err, "failed to check existence of span file") } else if exists { spanURIValue := tCtx.ow.GetSpanPath() - spanURI = &spanURIValue + spanURI = &spanURIValue } } pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI, spanURI, diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index 6da4762cb..06c552a8b 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -517,6 +517,7 @@ func Test_task_Handle_NoCatalog(t *testing.T) { }) nCtx.OnNodeStateReader().Return(nr) nCtx.OnNodeStateWriter().Return(s) + return nCtx } @@ -956,6 +957,7 @@ func Test_task_Handle_Catalog(t *testing.T) { if tt.args.catalogFetch { or := &ioMocks.OutputReader{} or.OnDeckExistsMatch(mock.Anything).Return(true, nil) + or.OnSpanExistsMatch(mock.Anything).Return(true, nil) or.OnReadMatch(mock.Anything).Return(&core.LiteralMap{}, nil, nil) c.OnGetMatch(mock.Anything, mock.Anything).Return(catalog.NewCatalogEntry(or, catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, nil)), nil) } else { @@ -1224,6 +1226,7 @@ func Test_task_Handle_Reservation(t *testing.T) { if tt.args.catalogFetch { or := &ioMocks.OutputReader{} or.OnDeckExistsMatch(mock.Anything).Return(true, nil) + or.OnSpanExistsMatch(mock.Anything).Return(true, nil) or.OnReadMatch(mock.Anything).Return(&core.LiteralMap{}, nil, nil) c.OnGetMatch(mock.Anything, mock.Anything).Return(catalog.NewCatalogEntry(or, catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, nil)), nil) } else { diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go index 358a092c5..f557afa38 100644 --- a/pkg/controller/nodes/transformers.go +++ b/pkg/controller/nodes/transformers.go @@ -169,7 +169,7 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, } if eInfo.OutputInfo.SpanURI != nil { - defer logger.Debugf(context.TODO(), "For test only, Span URI:",eInfo.OutputInfo.SpanURI.String()) + defer logger.Debugf(context.TODO(), "For test only, Span URI:", eInfo.OutputInfo.SpanURI.String()) nev.SpanUri = eInfo.OutputInfo.SpanURI.String() } From 1b9066da4d9e09210fea3d3f97af84e8a1071b74 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Wed, 14 Jun 2023 19:24:14 +0000 Subject: [PATCH 3/4] nit Signed-off-by: Yicheng-Lu-llll --- pkg/controller/nodes/transformers.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/controller/nodes/transformers.go b/pkg/controller/nodes/transformers.go index f557afa38..140f63547 100644 --- a/pkg/controller/nodes/transformers.go +++ b/pkg/controller/nodes/transformers.go @@ -169,7 +169,6 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier, } if eInfo.OutputInfo.SpanURI != nil { - defer logger.Debugf(context.TODO(), "For test only, Span URI:", eInfo.OutputInfo.SpanURI.String()) nev.SpanUri = eInfo.OutputInfo.SpanURI.String() } From 040f44ee6710f8a8f10fd6ffc9d5ab4a9cd96c31 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Sun, 18 Jun 2023 02:08:23 +0000 Subject: [PATCH 4/4] use NewInMemoryOutputReaderWithSpan Signed-off-by: Yicheng-Lu-llll --- .../nodes/task/catalog/datacatalog/datacatalog.go | 4 ++-- .../nodes/task/catalog/datacatalog/datacatalog_test.go | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go index b0b57a963..9e63f06ee 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog.go @@ -137,11 +137,11 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry outputs, err := GenerateTaskOutputsFromArtifact(key.Identifier, key.TypedInterface, artifact) if err != nil { logger.Errorf(ctx, "DataCatalog failed to get outputs from artifact %+v, err: %+v", artifact.Id, err) - return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err + return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReaderWithSpan(outputs, nil, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err } logger.Infof(ctx, "Retrieved %v outputs from artifact %v, tag: %v", len(outputs.Literals), artifact.Id, tag) - return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil + return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReaderWithSpan(outputs, nil, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil } // CreateDataset creates a Dataset in datacatalog including the associated metadata. diff --git a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go index 44125e4f7..6c501fd23 100644 --- a/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go +++ b/pkg/controller/nodes/task/catalog/datacatalog/datacatalog_test.go @@ -432,7 +432,7 @@ func TestCatalog_Put(t *testing.T) { ).Return(&datacatalog.AddTagResponse{}, nil) newKey := sampleKey newKey.InputReader = ir - or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil, nil) + or := ioutils.NewInMemoryOutputReaderWithSpan(sampleParameters, nil, nil, nil) s, err := discovery.Put(ctx, newKey, or, catalog.Metadata{ WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ Name: "test", @@ -523,7 +523,7 @@ func TestCatalog_Put(t *testing.T) { ).Return(&datacatalog.AddTagResponse{}, nil) newKey := sampleKey newKey.InputReader = ir - or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil, nil) + or := ioutils.NewInMemoryOutputReaderWithSpan(sampleParameters, nil, nil, nil) s, err := discovery.Put(ctx, newKey, or, catalog.Metadata{ WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ Name: "test", @@ -590,7 +590,7 @@ func TestCatalog_Update(t *testing.T) { newKey := sampleKey newKey.InputReader = ir - or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil, nil) + or := ioutils.NewInMemoryOutputReaderWithSpan(sampleParameters, nil, nil, nil) s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{ WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ Name: taskID.NodeExecutionId.ExecutionId.Name, @@ -680,7 +680,7 @@ func TestCatalog_Update(t *testing.T) { newKey := sampleKey newKey.InputReader = ir - or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil, nil) + or := ioutils.NewInMemoryOutputReaderWithSpan(sampleParameters, nil, nil, nil) s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{ WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ Name: taskID.NodeExecutionId.ExecutionId.Name, @@ -726,7 +726,7 @@ func TestCatalog_Update(t *testing.T) { newKey := sampleKey newKey.InputReader = ir - or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil, nil) + or := ioutils.NewInMemoryOutputReaderWithSpan(sampleParameters, nil, nil, nil) s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{ WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{ Name: "test",