Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add span uri to node event #577

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions events/admin_eventsink_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
//go:build integration
// +build integration

// Add this tag to your project settings if you want to pick it up.

package events
Expand Down Expand Up @@ -30,11 +32,17 @@ var (
)

// To run this test, and see if the deadline working, pick an existing successful execution from your admin database
// select * from executions;
//
// select * from executions;
//
// Then delete all the events from it.
// delete from execution_events where execution_name = 'ikuy55mn0y';
//
// delete from execution_events where execution_name = 'ikuy55mn0y';
//
// Then run this
// begin work; lock table executions in ACCESS EXCLUSIVE mode; SELECT pg_sleep(20); commit work;
//
// begin work; lock table executions in ACCESS EXCLUSIVE mode; SELECT pg_sleep(20); commit work;
//
// This will lock your table so that admin can't read it, causing the grpc call to timeout.
// On timeout, you should get a deadline exceeded error. Otherwise, you should get an error to the effect of
// "Invalid phase change from SUCCEEDED to RUNNING" or something like that.
Expand Down
1 change: 1 addition & 0 deletions events/admin_eventsink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
InputUri: "input-uri",
},
DeckUri: deckURI,
SpanUri: spanURI,
OutputResult: &event.NodeExecutionEvent_OutputUri{OutputUri: ""},
}

Expand Down
3 changes: 3 additions & 0 deletions events/node_event_recorder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ func getReferenceNodeEv() *event.NodeExecutionEvent {
OutputUri: referenceURI,
},
DeckUri: deckURI,
SpanUri: spanURI,
}
}

Expand All @@ -34,6 +35,7 @@ func getRawOutputNodeEv() *event.NodeExecutionEvent {
OutputData: outputData,
},
DeckUri: deckURI,
SpanUri: spanURI,
}
}

Expand Down Expand Up @@ -82,6 +84,7 @@ func TestRecordNodeEvent_Success_InlineOutputs(t *testing.T) {
}
err := recorder.RecordNodeEvent(ctx, getReferenceNodeEv(), inlineEventConfig)
assert.Equal(t, deckURI, nodeEvent.DeckUri)
assert.Equal(t, spanURI, nodeEvent.SpanUri)
assert.NoError(t, err)
}

Expand Down
1 change: 1 addition & 0 deletions events/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var referenceEventConfig = &config.EventConfig{

var referenceURI = "s3://foo/bar/outputs.pb"
var deckURI = "s3://foo/bar/deck.html"
var spanURI = "s3://foo/bar/span.pb"

var outputData = &core.LiteralMap{
Literals: map[string]*core.Literal{
Expand Down
13 changes: 13 additions & 0 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,19 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe
}
}

spanFile := storage.DataReference(recovered.Closure.GetSpanUri())
if len(spanFile) > 0 {
metadata, err := nCtx.DataStore().Head(ctx, spanFile)
if err != nil {
logger.Errorf(ctx, "Failed to check the existence of span file. Error: %v", err)
return handler.PhaseInfoUndefined, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "Failed to check the existence of span file.")
}

if metadata.Exists() {
oi.SpanURI = &spanFile
}
}

if err := c.store.WriteProtobuf(ctx, outputFile, so, outputs); err != nil {
logger.Errorf(ctx, "Failed to write protobuf (metadata). Error [%v]", err)
return handler.PhaseInfoUndefined, errors.Wrapf(errors.CausedByError, nCtx.NodeID(), err, "Failed to store recovered node execution outputs")
Expand Down
17 changes: 16 additions & 1 deletion pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var signalClient = &gatemocks.SignalServiceClient{}
const taskID = "tID"
const inputsPath = "inputs.pb"
const deckPath = "out/deck.html"
const spanPath = "out/span.pb"
const outputsPath = "out/outputs.pb"
const testClusterID = "C1"

Expand Down Expand Up @@ -2115,6 +2116,7 @@ func TestRecover(t *testing.T) {
OutputUri: "outputuri.pb",
},
DeckUri: deckPath,
SpanUri: spanPath,
},
}, nil)

Expand All @@ -2128,6 +2130,8 @@ func TestRecover(t *testing.T) {
metadata := existsMetadata{}
mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckPath)).
Return(&metadata, nil)
mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(spanPath)).
Return(&metadata, nil)
mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool {
return reference.String() == inputsPath || reference.String() == outputsPath
}), mock.Anything,
Expand Down Expand Up @@ -2262,6 +2266,7 @@ func TestRecover(t *testing.T) {
},
},
DeckUri: deckPath,
SpanUri: spanPath,
},
}, nil)

Expand Down Expand Up @@ -2294,6 +2299,8 @@ func TestRecover(t *testing.T) {
metadata := existsMetadata{}
mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckPath)).
Return(&metadata, nil)
mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(spanPath)).
Return(&metadata, nil)
mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool {
return reference.String() == inputsPath || reference.String() == outputsPath
}), mock.Anything,
Expand Down Expand Up @@ -2346,6 +2353,7 @@ func TestRecover(t *testing.T) {
},
},
DeckUri: deckPath,
SpanUri: spanPath,
},
}, nil)

Expand All @@ -2359,6 +2367,8 @@ func TestRecover(t *testing.T) {
metadata := existsMetadata{}
mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckPath)).
Return(&metadata, nil)
mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(spanPath)).
Return(&metadata, nil)
mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool {
return reference.String() == inputsPath || reference.String() == outputsPath
}), mock.Anything,
Expand Down Expand Up @@ -2429,6 +2439,7 @@ func TestRecover(t *testing.T) {
OutputUri: "outputuri.pb",
},
DeckUri: deckPath,
SpanUri: spanPath,
},
}, nil)

Expand All @@ -2441,7 +2452,8 @@ func TestRecover(t *testing.T) {
metadata := existsMetadata{}
mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckPath)).
Return(&metadata, nil)

mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(spanPath)).
Return(&metadata, nil)
mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool {
return reference.String() == inputsPath || reference.String() == outputsPath
}), mock.Anything,
Expand Down Expand Up @@ -2475,6 +2487,7 @@ func TestRecover(t *testing.T) {
OutputUri: "outputuri.pb",
},
DeckUri: deckPath,
SpanUri: spanPath,
},
}, nil)

Expand All @@ -2487,6 +2500,8 @@ func TestRecover(t *testing.T) {
metadata := existsMetadata{}
mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(deckPath)).
Return(&metadata, nil)
mockPBStore.OnHeadMatch(mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference(spanPath)).
Return(&metadata, nil)
mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool {
return reference.String() == inputsPath || reference.String() == outputsPath
}), mock.Anything,
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/nodes/handler/transition_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type GateNodeInfo struct {
type OutputInfo struct {
OutputURI storage.DataReference
DeckURI *storage.DataReference
SpanURI *storage.DataReference
}

type ExecutionInfo struct {
Expand Down
3 changes: 2 additions & 1 deletion pkg/controller/nodes/handler/transition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ func TestDoTransition(t *testing.T) {

t.Run("barrier", func(t *testing.T) {
tr := DoTransition(TransitionTypeBarrier, PhaseInfoSuccess(&ExecutionInfo{
OutputInfo: &OutputInfo{OutputURI: "uri", DeckURI: AsPointer(storage.DataReference("deck"))},
OutputInfo: &OutputInfo{OutputURI: "uri", DeckURI: AsPointer(storage.DataReference("deck")), SpanURI: AsPointer(storage.DataReference("span"))},
}))
assert.Equal(t, TransitionTypeBarrier, tr.Type())
assert.Equal(t, EPhaseSuccess, tr.Info().p)
assert.Equal(t, storage.DataReference("uri"), tr.Info().GetInfo().OutputInfo.OutputURI)
assert.Equal(t, AsPointer(storage.DataReference("deck")), tr.Info().GetInfo().OutputInfo.DeckURI)
assert.Equal(t, AsPointer(storage.DataReference("span")), tr.Info().GetInfo().OutputInfo.SpanURI)
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,11 @@ func (m *CatalogClient) Get(ctx context.Context, key catalog.Key) (catalog.Entry
outputs, err := GenerateTaskOutputsFromArtifact(key.Identifier, key.TypedInterface, artifact)
if err != nil {
logger.Errorf(ctx, "DataCatalog failed to get outputs from artifact %+v, err: %+v", artifact.Id, err)
return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err
return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReaderWithSpan(outputs, nil, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_MISS, md)), err
}

logger.Infof(ctx, "Retrieved %v outputs from artifact %v, tag: %v", len(outputs.Literals), artifact.Id, tag)
return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReader(outputs, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil
return catalog.NewCatalogEntry(ioutils.NewInMemoryOutputReaderWithSpan(outputs, nil, nil, nil), catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, md)), nil
}

// CreateDataset creates a Dataset in datacatalog including the associated metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func TestCatalog_Put(t *testing.T) {
).Return(&datacatalog.AddTagResponse{}, nil)
newKey := sampleKey
newKey.InputReader = ir
or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil)
or := ioutils.NewInMemoryOutputReaderWithSpan(sampleParameters, nil, nil, nil)
s, err := discovery.Put(ctx, newKey, or, catalog.Metadata{
WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
Name: "test",
Expand Down Expand Up @@ -523,7 +523,7 @@ func TestCatalog_Put(t *testing.T) {
).Return(&datacatalog.AddTagResponse{}, nil)
newKey := sampleKey
newKey.InputReader = ir
or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil)
or := ioutils.NewInMemoryOutputReaderWithSpan(sampleParameters, nil, nil, nil)
s, err := discovery.Put(ctx, newKey, or, catalog.Metadata{
WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
Name: "test",
Expand Down Expand Up @@ -590,7 +590,7 @@ func TestCatalog_Update(t *testing.T) {

newKey := sampleKey
newKey.InputReader = ir
or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil)
or := ioutils.NewInMemoryOutputReaderWithSpan(sampleParameters, nil, nil, nil)
s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{
WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
Name: taskID.NodeExecutionId.ExecutionId.Name,
Expand Down Expand Up @@ -680,7 +680,7 @@ func TestCatalog_Update(t *testing.T) {

newKey := sampleKey
newKey.InputReader = ir
or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil)
or := ioutils.NewInMemoryOutputReaderWithSpan(sampleParameters, nil, nil, nil)
s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{
WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
Name: taskID.NodeExecutionId.ExecutionId.Name,
Expand Down Expand Up @@ -726,7 +726,7 @@ func TestCatalog_Update(t *testing.T) {

newKey := sampleKey
newKey.InputReader = ir
or := ioutils.NewInMemoryOutputReader(sampleParameters, nil, nil)
or := ioutils.NewInMemoryOutputReaderWithSpan(sampleParameters, nil, nil, nil)
s, err := discovery.Update(ctx, newKey, or, catalog.Metadata{
WorkflowExecutionIdentifier: &core.WorkflowExecutionIdentifier{
Name: "test",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type NextPhaseState struct {
TaskInfo *pluginCore.TaskInfo
TaskErr *io.ExecutionError
DeckExists bool
SpanExists bool
OutputExists bool
OrError bool
}
Expand Down Expand Up @@ -52,6 +53,7 @@ func (n NextPhaseStatePlugin) Handle(ctx context.Context, tCtx pluginCore.TaskEx
}

r.OnDeckExistsMatch(mock.Anything).Return(s.DeckExists, nil)
r.OnSpanExistsMatch(mock.Anything).Return(s.SpanExists, nil)
r.On("IsError", mock.Anything).Return(isErr, nil)
r.On("IsFile", mock.Anything).Return(true)
r.On("Exists", mock.Anything).Return(s.OutputExists, nil)
Expand Down
20 changes: 15 additions & 5 deletions pkg/controller/nodes/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ func getPluginMetricKey(pluginID, taskType string) string {
return taskType + "_" + pluginID
}

func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, entry catalog.Entry) {
func (p *pluginRequestedTransition) CacheHit(outputPath storage.DataReference, deckPath *storage.DataReference, spanPath *storage.DataReference, entry catalog.Entry) {
p.ttype = handler.TransitionTypeEphemeral
p.pInfo = pluginCore.PhaseInfoSuccess(nil)
p.ObserveSuccess(outputPath, deckPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
p.ObserveSuccess(outputPath, deckPath, spanPath, &event.TaskNodeMetadata{CacheStatus: entry.GetStatus().GetCacheStatus(), CatalogKey: entry.GetStatus().GetMetadata()})
}

func (p *pluginRequestedTransition) PopulateCacheInfo(entry catalog.Entry) {
Expand Down Expand Up @@ -156,10 +156,11 @@ func (p *pluginRequestedTransition) FinalTaskEvent(input ToTaskExecutionEventInp
return ToTaskExecutionEvent(input)
}

func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
func (p *pluginRequestedTransition) ObserveSuccess(outputPath storage.DataReference, deckPath *storage.DataReference, spanPath *storage.DataReference, taskMetadata *event.TaskNodeMetadata) {
p.execInfo.OutputInfo = &handler.OutputInfo{
OutputURI: outputPath,
DeckURI: deckPath,
SpanURI: spanPath,
}

p.execInfo.TaskNodeInfo = &handler.TaskNodeInfo{
Expand Down Expand Up @@ -503,6 +504,7 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
})
} else {
var deckURI *storage.DataReference
var spanURI *storage.DataReference
if tCtx.ow.GetReader() != nil {
exists, err := tCtx.ow.GetReader().DeckExists(ctx)
if err != nil {
Expand All @@ -512,8 +514,16 @@ func (t Handler) invokePlugin(ctx context.Context, p pluginCore.Plugin, tCtx *ta
deckURIValue := tCtx.ow.GetDeckPath()
deckURI = &deckURIValue
}
exists, err = tCtx.ow.GetReader().SpanExists(ctx)
if err != nil {
logger.Errorf(ctx, "Failed to check span file existence. Error: %v", err)
return pluginTrns, regErrors.Wrapf(err, "failed to check existence of span file")
} else if exists {
spanURIValue := tCtx.ow.GetSpanPath()
spanURI = &spanURIValue
}
}
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI,
pluginTrns.ObserveSuccess(tCtx.ow.GetOutputPath(), deckURI, spanURI,
&event.TaskNodeMetadata{
CacheStatus: cacheStatus.GetCacheStatus(),
CatalogKey: cacheStatus.GetMetadata(),
Expand Down Expand Up @@ -617,7 +627,7 @@ func (t Handler) Handle(ctx context.Context, nCtx handler.NodeExecutionContext)
return handler.UnknownTransition, err
}

pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), nil, entry)
pluginTrns.CacheHit(tCtx.ow.GetOutputPath(), nil, nil, entry)
} else {
logger.Infof(ctx, "No CacheHIT. Status [%s]", entry.GetStatus().GetCacheStatus().String())
pluginTrns.PopulateCacheInfo(entry)
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/nodes/task/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ func Test_task_Handle_NoCatalog(t *testing.T) {
})
nCtx.OnNodeStateReader().Return(nr)
nCtx.OnNodeStateWriter().Return(s)

return nCtx
}

Expand Down Expand Up @@ -956,6 +957,7 @@ func Test_task_Handle_Catalog(t *testing.T) {
if tt.args.catalogFetch {
or := &ioMocks.OutputReader{}
or.OnDeckExistsMatch(mock.Anything).Return(true, nil)
or.OnSpanExistsMatch(mock.Anything).Return(true, nil)
or.OnReadMatch(mock.Anything).Return(&core.LiteralMap{}, nil, nil)
c.OnGetMatch(mock.Anything, mock.Anything).Return(catalog.NewCatalogEntry(or, catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, nil)), nil)
} else {
Expand Down Expand Up @@ -1224,6 +1226,7 @@ func Test_task_Handle_Reservation(t *testing.T) {
if tt.args.catalogFetch {
or := &ioMocks.OutputReader{}
or.OnDeckExistsMatch(mock.Anything).Return(true, nil)
or.OnSpanExistsMatch(mock.Anything).Return(true, nil)
or.OnReadMatch(mock.Anything).Return(&core.LiteralMap{}, nil, nil)
c.OnGetMatch(mock.Anything, mock.Anything).Return(catalog.NewCatalogEntry(or, catalog.NewStatus(core.CatalogCacheStatus_CACHE_HIT, nil)), nil)
} else {
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/nodes/transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ func ToNodeExecutionEvent(nodeExecID *core.NodeExecutionIdentifier,
nev.DeckUri = eInfo.OutputInfo.DeckURI.String()
}

if eInfo.OutputInfo.SpanURI != nil {
nev.SpanUri = eInfo.OutputInfo.SpanURI.String()
}

nev.OutputResult = ToNodeExecOutput(eInfo.OutputInfo)
} else if info.GetErr() != nil {
nev.OutputResult = &event.NodeExecutionEvent_Error{
Expand Down