From 54f7e50ce1aa400e608f34464f92913b7ee8d593 Mon Sep 17 00:00:00 2001 From: ChrisAmora <27789416+ChrisAmora@users.noreply.github.com> Date: Tue, 18 Feb 2025 15:35:47 -0300 Subject: [PATCH 1/6] feat: add llo stream to data streams deployment module --- deployment/data-streams/jobs/stream.go | 92 +++++++++++++++ deployment/data-streams/jobs/stream_test.go | 120 ++++++++++++++++++++ 2 files changed, 212 insertions(+) create mode 100644 deployment/data-streams/jobs/stream_test.go diff --git a/deployment/data-streams/jobs/stream.go b/deployment/data-streams/jobs/stream.go index 415ac457275..98bb8b0ea6c 100644 --- a/deployment/data-streams/jobs/stream.go +++ b/deployment/data-streams/jobs/stream.go @@ -1 +1,93 @@ package jobs + +import ( + "bytes" + "text/template" + + "github.com/pelletier/go-toml/v2" +) + +type Datasource struct { + BridgeName string + ReqData string +} + +type ReportField struct { + ResultPath string +} + +type ObservationSource struct { + Datasources []Datasource + AllowedFaults int + Benchmark ReportField + Bid ReportField + Ask ReportField +} + +type LLOSpec struct { + Base + + StreamID string `toml:"streamID"` + ObservationSource string `toml:"observationSource,multiline,omitempty"` +} + +func (s *LLOSpec) SetObservationSource(obs ObservationSource) error { + rendered, err := s.buildObservationSource(obs) + if err != nil { + return err + } + s.ObservationSource = rendered + return nil +} + +func (s *LLOSpec) buildObservationSource(obs ObservationSource) (string, error) { + var buf bytes.Buffer + if err := observationTmpl.Execute(&buf, obs); err != nil { + return "", err + } + return buf.String(), nil +} + +func (s *LLOSpec) MarshalTOML() ([]byte, error) { + return toml.Marshal(s) +} + +var funcMap = template.FuncMap{ + "inc": func(i int) int { + return i + 1 + }, +} + +var pipelineTemplate = `{{range $i, $a := .Datasources}} +{{- $srcNum := inc $i -}} +// data source {{$srcNum}} +ds{{$srcNum}}_payload [type=bridge name="bridge-{{$a.BridgeName}}" timeout="50s" requestData={{$a.ReqData}}]; + +ds{{$srcNum}}_benchmark [type=jsonparse path="{{$.Benchmark.ResultPath}}"]; +ds{{$srcNum}}_bid [type=jsonparse path="{{$.Bid.ResultPath}}"]; +ds{{$srcNum}}_ask [type=jsonparse path="{{$.Ask.ResultPath}}"]; +{{end -}} + +{{range $i, $a := .Datasources}} +{{- $srcNum := inc $i -}} +ds{{$srcNum}}_payload -> ds{{$srcNum}}_benchmark -> benchmark_price; +{{end -}} +benchmark_price [type=median allowedFaults={{.AllowedFaults}} index=0]; + +{{range $i, $a := .Datasources}} +{{- $srcNum := inc $i -}} +ds{{$srcNum}}_payload -> ds{{$srcNum}}_bid -> bid_price; +{{end -}} +bid_price [type=median allowedFaults={{.AllowedFaults}} index=1]; + +{{range $i, $a := .Datasources}} +{{- $srcNum := inc $i -}} +ds{{$srcNum}}_payload -> ds{{$srcNum}}_ask -> ask_price; +{{end -}} +ask_price [type=median allowedFaults={{.AllowedFaults}} index=2]; +` + +var observationTmpl = template.Must(template.New("observationSource"). + Funcs(funcMap). + Parse(pipelineTemplate), +) diff --git a/deployment/data-streams/jobs/stream_test.go b/deployment/data-streams/jobs/stream_test.go new file mode 100644 index 00000000000..b570bdf5efd --- /dev/null +++ b/deployment/data-streams/jobs/stream_test.go @@ -0,0 +1,120 @@ +package jobs + +import ( + "fmt" + "strings" + "testing" + "text/template" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +func TestLLOSpec_MarshalTOML(t *testing.T) { + testCases := []struct { + name string + spec LLOSpec + obs ObservationSource + wantSubstr []string + }{ + { + name: "multiple datasources with valid paths", + spec: LLOSpec{ + Base: Base{ + Name: "ETH/USD-Test", + Type: "stream", + SchemaVersion: 1, + ExternalJobID: uuid.New(), + }, + StreamID: "1000000001", + }, + obs: ObservationSource{ + Datasources: []Datasource{ + { + BridgeName: "coinmetrics", + ReqData: `{"data":{"endpoint":"cryptolwba","from":"ETH","to":"USD"}}`, + }, + { + BridgeName: "ncfx", + ReqData: `{"data":{"endpoint":"cryptolwba","from":"ETH","to":"USD"}}`, + }, + }, + AllowedFaults: 2, + Benchmark: ReportField{ + ResultPath: "data,mid", + }, + Bid: ReportField{ + ResultPath: "data,bid", + }, + Ask: ReportField{ + ResultPath: "data,ask", + }, + }, + wantSubstr: []string{ + `bridge-coinmetrics`, + `bridge-ncfx`, + `allowedFaults=2`, + `path=\"data,mid\"`, + `path=\"data,bid\"`, + `path=\"data,ask\"`, + }, + }, + { + name: "empty datasource list", + spec: LLOSpec{ + Base: Base{ + Name: "Empty-Test", + Type: "stream", + SchemaVersion: 1, + ExternalJobID: uuid.New(), + }, + StreamID: "2000000002", + }, + obs: ObservationSource{ + Datasources: []Datasource{}, + AllowedFaults: 1, + Benchmark: ReportField{ResultPath: "data,benchmark"}, + Bid: ReportField{ResultPath: "data,bid"}, + Ask: ReportField{ResultPath: "data,ask"}, + }, + wantSubstr: []string{ + `allowedFaults=1`, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.spec.SetObservationSource(tc.obs) + require.NoError(t, err) + tomlBytes, err := tc.spec.MarshalTOML() + require.NoError(t, err) + result := string(tomlBytes) + for _, substr := range tc.wantSubstr { + require.True(t, strings.Contains(result, substr), + "result %q does not contain expected substring %q", result, substr) + } + }) + } +} + +func TestLLOSpec_Error(t *testing.T) { + originalTmpl := observationTmpl + defer func() { + observationTmpl = originalTmpl + }() + + faultyTmpl := template.Must(template.New("faulty"). + Funcs(template.FuncMap{ + "error": func(msg string) (string, error) { + return "", fmt.Errorf("%s", msg) + }, + }). + Parse(`{{ error "forced error" }}`)) + observationTmpl = faultyTmpl + + spec := LLOSpec{} + obs := ObservationSource{} + _, err := spec.buildObservationSource(obs) + require.Error(t, err, "expected error from faulty template execution") +} From f69fed7c4ef458ab71ea3ebf08b307e94fd93a28 Mon Sep 17 00:00:00 2001 From: ChrisAmora <27789416+ChrisAmora@users.noreply.github.com> Date: Wed, 19 Feb 2025 18:03:22 -0300 Subject: [PATCH 2/6] feat: add don --- deployment/data-streams/jobs/don.go | 42 ++++ deployment/data-streams/jobs/don_test.go | 118 ++++++++++ deployment/data-streams/jobs/llo.go | 1 - deployment/data-streams/jobs/stream.go | 93 +++----- deployment/data-streams/jobs/stream_test.go | 209 +++++++++++++----- deployment/data-streams/jobs/template.go | 38 ++++ .../templates/osrc_mercury_v1_median.go.tmpl | 13 ++ .../templates/osrc_mercury_v1_quote.go.tmpl | 27 +++ 8 files changed, 424 insertions(+), 117 deletions(-) create mode 100644 deployment/data-streams/jobs/don.go create mode 100644 deployment/data-streams/jobs/don_test.go delete mode 100644 deployment/data-streams/jobs/llo.go create mode 100644 deployment/data-streams/jobs/template.go create mode 100644 deployment/data-streams/jobs/templates/osrc_mercury_v1_median.go.tmpl create mode 100644 deployment/data-streams/jobs/templates/osrc_mercury_v1_quote.go.tmpl diff --git a/deployment/data-streams/jobs/don.go b/deployment/data-streams/jobs/don.go new file mode 100644 index 00000000000..579accb1789 --- /dev/null +++ b/deployment/data-streams/jobs/don.go @@ -0,0 +1,42 @@ +package jobs + +import ( + "time" + + "github.com/pelletier/go-toml/v2" +) + +type DonJobSpec struct { + Base + + ContractID string `toml:"contractID"` + TransmitterID string `toml:"transmitterID,omitempty"` + ForwardingAllowed *bool `toml:"forwardingAllowed,omitempty"` + P2PV2Bootstrappers []string `toml:"p2pv2Bootstrappers,omitempty"` + OCRKeyBundleID *string `toml:"ocrKeyBundleID,omitempty"` + MaxTaskDuration time.Duration `toml:"maxTaskDuration,omitempty"` + ContractConfigTrackerPollInterval time.Duration `toml:"contractConfigTrackerPollInterval,omitempty"` + Relay RelayType `toml:"relay,omitempty"` + PluginType string `toml:"pluginType,omitempty"` + RelayConfig RelayConfigDon `toml:"relayConfig"` + PluginConfig PluginConfigDon `toml:"pluginConfig"` +} + +// RelayConfig is the configuration for the relay. This could change depending on the relay type. +type RelayConfigDon struct { + ChainID string `toml:"chainID"` + FromBlock uint64 `toml:"fromBlock,omitempty"` + LLOConfigMode string `toml:"lloConfigMode,omitempty"` + LLODonID int64 `toml:"lloDonID,omitempty"` +} + +type PluginConfigDon struct { + ChannelDefinitionsContractAddress string `toml:"channelDefinitionsContractAddress"` + ChannelDefinitionsContractFromBlock uint64 `toml:"channelDefinitionsContractFromBlock"` + DonID int64 `toml:"donID"` + Servers map[string]string `toml:"servers,inline"` +} + +func (s *DonJobSpec) MarshalTOML() ([]byte, error) { + return toml.Marshal(s) +} diff --git a/deployment/data-streams/jobs/don_test.go b/deployment/data-streams/jobs/don_test.go new file mode 100644 index 00000000000..69b43ae4136 --- /dev/null +++ b/deployment/data-streams/jobs/don_test.go @@ -0,0 +1,118 @@ +package jobs + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" +) + +func TestDonJobSpec_MarshalTOML(t *testing.T) { + trueVal := true + ocrKeyBundle := "ocr-bundle-123" + + testCases := []struct { + name string + spec DonJobSpec + wantSubstr []string + }{ + { + name: "with fields populated", + spec: DonJobSpec{ + Base: Base{ + Name: "Test-DON", + Type: "don", + SchemaVersion: 1, + ExternalJobID: uuid.New(), + }, + ContractID: "contract-123", + TransmitterID: "tx-123", + ForwardingAllowed: &trueVal, + P2PV2Bootstrappers: []string{"bootstrap1", "bootstrap2"}, + OCRKeyBundleID: &ocrKeyBundle, + MaxTaskDuration: 10 * time.Second, + ContractConfigTrackerPollInterval: 1 * time.Minute, + Relay: "testrelay", + PluginType: "testplugin", + RelayConfig: RelayConfigDon{ + ChainID: "chain", + FromBlock: 100, + LLOConfigMode: "mode", + LLODonID: 200, + }, + PluginConfig: PluginConfigDon{ + ChannelDefinitionsContractAddress: "0xabc", + ChannelDefinitionsContractFromBlock: 50, + DonID: 300, + Servers: map[string]string{"server1": "http://localhost"}, + }, + }, + wantSubstr: []string{ + "contractID", + "contract-123", + "transmitterID", + "tx-123", + "p2pv2Bootstrappers", + "bootstrap1", + "ocrKeyBundleID", + "ocr-bundle-123", + "maxTaskDuration", + "contractConfigTrackerPollInterval", + "relay", + "testrelay", + "pluginType", + "testplugin", + "chainID", + "chain", + "fromBlock", + "100", + "lloConfigMode", + "mode", + "lloDonID", + "200", + "channelDefinitionsContractAddress", + "0xabc", + "channelDefinitionsContractFromBlock", + "50", + "donID", + "300", + "servers", + "server1", + "http://localhost", + }, + }, + { + name: "empty minimal fields", + spec: DonJobSpec{ + Base: Base{ + Name: "Empty-DON-Test", + Type: "don", + SchemaVersion: 1, + ExternalJobID: uuid.New(), + }, + ContractID: "contract-empty", + RelayConfig: RelayConfigDon{}, + PluginConfig: PluginConfigDon{ + Servers: map[string]string{}, + }, + }, + wantSubstr: []string{ + "contractID", + "contract-empty", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tomlBytes, err := tc.spec.MarshalTOML() + require.NoError(t, err) + + result := string(tomlBytes) + for _, substr := range tc.wantSubstr { + require.Contains(t, result, substr, "result %q does not contain expected substring %q", result, substr) + } + }) + } +} diff --git a/deployment/data-streams/jobs/llo.go b/deployment/data-streams/jobs/llo.go deleted file mode 100644 index 415ac457275..00000000000 --- a/deployment/data-streams/jobs/llo.go +++ /dev/null @@ -1 +0,0 @@ -package jobs diff --git a/deployment/data-streams/jobs/stream.go b/deployment/data-streams/jobs/stream.go index 98bb8b0ea6c..dc04fabacb6 100644 --- a/deployment/data-streams/jobs/stream.go +++ b/deployment/data-streams/jobs/stream.go @@ -1,9 +1,6 @@ package jobs import ( - "bytes" - "text/template" - "github.com/pelletier/go-toml/v2" ) @@ -12,27 +9,51 @@ type Datasource struct { ReqData string } -type ReportField struct { +type ReportFieldLLO struct { ResultPath string } -type ObservationSource struct { +type Pipeline interface { + Render() (string, error) +} + +type BaseObservationSource struct { Datasources []Datasource AllowedFaults int - Benchmark ReportField - Bid ReportField - Ask ReportField + Benchmark ReportFieldLLO +} + +type QuoteObservationSource struct { + BaseObservationSource + Bid ReportFieldLLO + Ask ReportFieldLLO +} + +type MedianObservationSource struct { + BaseObservationSource +} + +func renderObservationTemplate(fname string, obs any) (string, error) { + return renderTemplate(fname, obs) +} + +func (src QuoteObservationSource) Render() (string, error) { + return renderObservationTemplate("osrc_mercury_v1_quote.go.tmpl", src) +} + +func (src MedianObservationSource) Render() (string, error) { + return renderObservationTemplate("osrc_mercury_v1_median.go.tmpl", src) } -type LLOSpec struct { +type StreamJobSpec struct { Base StreamID string `toml:"streamID"` ObservationSource string `toml:"observationSource,multiline,omitempty"` } -func (s *LLOSpec) SetObservationSource(obs ObservationSource) error { - rendered, err := s.buildObservationSource(obs) +func (s *StreamJobSpec) SetObservationSource(obs Pipeline) error { + rendered, err := obs.Render() if err != nil { return err } @@ -40,54 +61,6 @@ func (s *LLOSpec) SetObservationSource(obs ObservationSource) error { return nil } -func (s *LLOSpec) buildObservationSource(obs ObservationSource) (string, error) { - var buf bytes.Buffer - if err := observationTmpl.Execute(&buf, obs); err != nil { - return "", err - } - return buf.String(), nil -} - -func (s *LLOSpec) MarshalTOML() ([]byte, error) { +func (s *StreamJobSpec) MarshalTOML() ([]byte, error) { return toml.Marshal(s) } - -var funcMap = template.FuncMap{ - "inc": func(i int) int { - return i + 1 - }, -} - -var pipelineTemplate = `{{range $i, $a := .Datasources}} -{{- $srcNum := inc $i -}} -// data source {{$srcNum}} -ds{{$srcNum}}_payload [type=bridge name="bridge-{{$a.BridgeName}}" timeout="50s" requestData={{$a.ReqData}}]; - -ds{{$srcNum}}_benchmark [type=jsonparse path="{{$.Benchmark.ResultPath}}"]; -ds{{$srcNum}}_bid [type=jsonparse path="{{$.Bid.ResultPath}}"]; -ds{{$srcNum}}_ask [type=jsonparse path="{{$.Ask.ResultPath}}"]; -{{end -}} - -{{range $i, $a := .Datasources}} -{{- $srcNum := inc $i -}} -ds{{$srcNum}}_payload -> ds{{$srcNum}}_benchmark -> benchmark_price; -{{end -}} -benchmark_price [type=median allowedFaults={{.AllowedFaults}} index=0]; - -{{range $i, $a := .Datasources}} -{{- $srcNum := inc $i -}} -ds{{$srcNum}}_payload -> ds{{$srcNum}}_bid -> bid_price; -{{end -}} -bid_price [type=median allowedFaults={{.AllowedFaults}} index=1]; - -{{range $i, $a := .Datasources}} -{{- $srcNum := inc $i -}} -ds{{$srcNum}}_payload -> ds{{$srcNum}}_ask -> ask_price; -{{end -}} -ask_price [type=median allowedFaults={{.AllowedFaults}} index=2]; -` - -var observationTmpl = template.Must(template.New("observationSource"). - Funcs(funcMap). - Parse(pipelineTemplate), -) diff --git a/deployment/data-streams/jobs/stream_test.go b/deployment/data-streams/jobs/stream_test.go index b570bdf5efd..8104648f83d 100644 --- a/deployment/data-streams/jobs/stream_test.go +++ b/deployment/data-streams/jobs/stream_test.go @@ -1,84 +1,178 @@ package jobs import ( - "fmt" - "strings" + "errors" "testing" - "text/template" "github.com/google/uuid" "github.com/stretchr/testify/require" ) -func TestLLOSpec_MarshalTOML(t *testing.T) { +func TestStreamJobSpec_Median_MarshalTOML(t *testing.T) { testCases := []struct { name string - spec LLOSpec - obs ObservationSource + spec StreamJobSpec + obs MedianObservationSource wantSubstr []string }{ { name: "multiple datasources with valid paths", - spec: LLOSpec{ + spec: StreamJobSpec{ Base: Base{ - Name: "ETH/USD-Test", + Name: "BTC/USD-Test", Type: "stream", SchemaVersion: 1, ExternalJobID: uuid.New(), }, - StreamID: "1000000001", + StreamID: "1000", }, - obs: ObservationSource{ - Datasources: []Datasource{ - { - BridgeName: "coinmetrics", - ReqData: `{"data":{"endpoint":"cryptolwba","from":"ETH","to":"USD"}}`, + obs: MedianObservationSource{ + BaseObservationSource: BaseObservationSource{ + Datasources: []Datasource{ + { + BridgeName: "bridge1", + ReqData: `{"data":{"endpoint":"test1"}}`, + }, + { + BridgeName: "bridge2", + ReqData: `{"data":{"endpoint":"test2"}}`, + }, }, - { - BridgeName: "ncfx", - ReqData: `{"data":{"endpoint":"cryptolwba","from":"ETH","to":"USD"}}`, + AllowedFaults: 2, + Benchmark: ReportFieldLLO{ + ResultPath: "data,median", }, }, - AllowedFaults: 2, - Benchmark: ReportField{ - ResultPath: "data,mid", + }, + wantSubstr: []string{ + "bridge-bridge1", + "bridge-bridge2", + "allowedFaults=2", + `data,median`, + }, + }, + { + name: "empty datasource list", + spec: StreamJobSpec{ + Base: Base{ + Name: "Empty-Median-Test", + Type: "stream", + SchemaVersion: 1, + ExternalJobID: uuid.New(), + }, + StreamID: "2000", + }, + obs: MedianObservationSource{ + BaseObservationSource: BaseObservationSource{ + Datasources: []Datasource{}, + AllowedFaults: 1, + Benchmark: ReportFieldLLO{ + ResultPath: "data,empty", + }, }, - Bid: ReportField{ + }, + wantSubstr: []string{ + "allowedFaults=1", + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.spec.SetObservationSource(tc.obs) + require.NoError(t, err) + + tomlBytes, err := tc.spec.MarshalTOML() + require.NoError(t, err) + + result := string(tomlBytes) + for _, substr := range tc.wantSubstr { + require.Contains(t, result, substr, + "result %q does not contain expected substring %q", result, substr) + } + }) + } +} + +func TestStreamJobSpec_Quote_MarshalTOML(t *testing.T) { + testCases := []struct { + name string + spec StreamJobSpec + obs QuoteObservationSource + wantSubstr []string + }{ + { + name: "multiple datasources with valid paths", + spec: StreamJobSpec{ + Base: Base{ + Name: "BTC/USD-Quote", + Type: "stream", + SchemaVersion: 1, + ExternalJobID: uuid.New(), + }, + StreamID: "3000", + }, + obs: QuoteObservationSource{ + BaseObservationSource: BaseObservationSource{ + Datasources: []Datasource{ + { + BridgeName: "bridge1", + ReqData: `{"data":{"endpoint":"quote1"}}`, + }, + { + BridgeName: "bridge2", + ReqData: `{"data":{"endpoint":"quote2"}}`, + }, + }, + AllowedFaults: 3, + Benchmark: ReportFieldLLO{ + ResultPath: "data,benchmark", + }, + }, + Bid: ReportFieldLLO{ ResultPath: "data,bid", }, - Ask: ReportField{ + Ask: ReportFieldLLO{ ResultPath: "data,ask", }, }, wantSubstr: []string{ - `bridge-coinmetrics`, - `bridge-ncfx`, - `allowedFaults=2`, - `path=\"data,mid\"`, - `path=\"data,bid\"`, - `path=\"data,ask\"`, + "bridge-bridge1", + "bridge-bridge2", + "allowedFaults=3", + `data,benchmark`, + `data,bid`, + `data,ask`, }, }, { name: "empty datasource list", - spec: LLOSpec{ + spec: StreamJobSpec{ Base: Base{ - Name: "Empty-Test", + Name: "Empty-Quote-Test", Type: "stream", SchemaVersion: 1, ExternalJobID: uuid.New(), }, - StreamID: "2000000002", + StreamID: "4000", }, - obs: ObservationSource{ - Datasources: []Datasource{}, - AllowedFaults: 1, - Benchmark: ReportField{ResultPath: "data,benchmark"}, - Bid: ReportField{ResultPath: "data,bid"}, - Ask: ReportField{ResultPath: "data,ask"}, + obs: QuoteObservationSource{ + BaseObservationSource: BaseObservationSource{ + Datasources: []Datasource{}, + AllowedFaults: 1, + Benchmark: ReportFieldLLO{ + ResultPath: "data,empty", + }, + }, + Bid: ReportFieldLLO{ + ResultPath: "data,emptyBid", + }, + Ask: ReportFieldLLO{ + ResultPath: "data,emptyAsk", + }, }, wantSubstr: []string{ - `allowedFaults=1`, + "allowedFaults=1", }, }, } @@ -87,34 +181,37 @@ func TestLLOSpec_MarshalTOML(t *testing.T) { t.Run(tc.name, func(t *testing.T) { err := tc.spec.SetObservationSource(tc.obs) require.NoError(t, err) + tomlBytes, err := tc.spec.MarshalTOML() require.NoError(t, err) + result := string(tomlBytes) for _, substr := range tc.wantSubstr { - require.True(t, strings.Contains(result, substr), + require.Contains(t, result, substr, "result %q does not contain expected substring %q", result, substr) } }) } } -func TestLLOSpec_Error(t *testing.T) { - originalTmpl := observationTmpl - defer func() { - observationTmpl = originalTmpl - }() +type errorPipeline struct{} - faultyTmpl := template.Must(template.New("faulty"). - Funcs(template.FuncMap{ - "error": func(msg string) (string, error) { - return "", fmt.Errorf("%s", msg) - }, - }). - Parse(`{{ error "forced error" }}`)) - observationTmpl = faultyTmpl +func (e errorPipeline) Render() (string, error) { + return "", errors.New("forced error") +} + +func TestStreamJobSpec_SetObservationSource_Error(t *testing.T) { + spec := StreamJobSpec{ + Base: Base{ + Name: "Error-Test", + Type: "stream", + SchemaVersion: 1, + ExternalJobID: uuid.New(), + }, + StreamID: "5000", + } - spec := LLOSpec{} - obs := ObservationSource{} - _, err := spec.buildObservationSource(obs) - require.Error(t, err, "expected error from faulty template execution") + err := spec.SetObservationSource(errorPipeline{}) + require.Error(t, err) + require.Contains(t, err.Error(), "forced error") } diff --git a/deployment/data-streams/jobs/template.go b/deployment/data-streams/jobs/template.go new file mode 100644 index 00000000000..2c81cf487c6 --- /dev/null +++ b/deployment/data-streams/jobs/template.go @@ -0,0 +1,38 @@ +package jobs + +import ( + "embed" + "math/big" + "strings" + "text/template" +) + +//go:embed templates/*.tmpl +var templatesFS embed.FS + +func newTemplate() (*template.Template, error) { + funcMap := template.FuncMap{ + "inc": func(i int) int { + return i + 1 + }, + "dec": func(i int) int { + return i - 1 + }, + "times": func(i int64) string { + t := big.NewInt(10) + t.Exp(t, big.NewInt(i), nil) + return t.String() + }, + } + return template.New("ds").Funcs(funcMap).ParseFS(templatesFS, "templates/*.tmpl") +} + +func renderTemplate(fname string, data any) (string, error) { + tmpl, err := newTemplate() + if err != nil { + return "", err + } + b := new(strings.Builder) + err = tmpl.ExecuteTemplate(b, fname, data) + return b.String(), err +} diff --git a/deployment/data-streams/jobs/templates/osrc_mercury_v1_median.go.tmpl b/deployment/data-streams/jobs/templates/osrc_mercury_v1_median.go.tmpl new file mode 100644 index 00000000000..86dac4aafe5 --- /dev/null +++ b/deployment/data-streams/jobs/templates/osrc_mercury_v1_median.go.tmpl @@ -0,0 +1,13 @@ +{{range $i, $a := .Datasources}} +{{- $srcNum:=inc $i -}} +// data source {{$srcNum}} +ds{{$srcNum}}_payload [type=bridge name="bridge-{{$a.BridgeName}}" timeout="50s" requestData={{$a.ReqData}}]; + +ds{{$srcNum}}_benchmark [type=jsonparse path="{{$.Benchmark.ResultPath}}"]; +{{end -}} + +{{range $i, $a := .Datasources}} +{{- $srcNum:=inc $i -}} +ds{{$srcNum}}_payload -> ds{{$srcNum}}_benchmark -> benchmark_price; +{{end -}} +benchmark_price [type=median allowedFaults={{.AllowedFaults}} index=0]; diff --git a/deployment/data-streams/jobs/templates/osrc_mercury_v1_quote.go.tmpl b/deployment/data-streams/jobs/templates/osrc_mercury_v1_quote.go.tmpl new file mode 100644 index 00000000000..41fedff54fc --- /dev/null +++ b/deployment/data-streams/jobs/templates/osrc_mercury_v1_quote.go.tmpl @@ -0,0 +1,27 @@ +{{range $i, $a := .Datasources}} +{{- $srcNum:=inc $i -}} +// data source {{$srcNum}} +ds{{$srcNum}}_payload [type=bridge name="bridge-{{$a.BridgeName}}" timeout="50s" requestData={{$a.ReqData}}]; + +ds{{$srcNum}}_benchmark [type=jsonparse path="{{$.Benchmark.ResultPath}}"]; +ds{{$srcNum}}_bid [type=jsonparse path="{{$.Bid.ResultPath}}"]; +ds{{$srcNum}}_ask [type=jsonparse path="{{$.Ask.ResultPath}}"]; +{{end -}} + +{{range $i, $a := .Datasources}} +{{- $srcNum:=inc $i -}} +ds{{$srcNum}}_payload -> ds{{$srcNum}}_benchmark -> benchmark_price; +{{end -}} +benchmark_price [type=median allowedFaults={{.AllowedFaults}} index=0]; + +{{range $i, $a := .Datasources}} +{{- $srcNum:=inc $i -}} +ds{{$srcNum}}_payload -> ds{{$srcNum}}_bid -> bid_price; +{{end -}} +bid_price [type=median allowedFaults={{.AllowedFaults}} index=1]; + +{{range $i, $a := .Datasources}} +{{- $srcNum:=inc $i -}} +ds{{$srcNum}}_payload -> ds{{$srcNum}}_ask -> ask_price; +{{end -}} +ask_price [type=median allowedFaults={{.AllowedFaults}} index=2]; From ef0b06bf11ac3a999d8d958a476382e764954784 Mon Sep 17 00:00:00 2001 From: ChrisAmora <27789416+ChrisAmora@users.noreply.github.com> Date: Fri, 21 Feb 2025 09:48:42 -0300 Subject: [PATCH 3/6] feat: update tests --- deployment/data-streams/jobs/don_test.go | 102 +++++++------- deployment/data-streams/jobs/stream_test.go | 144 +++++++++++++------- 2 files changed, 150 insertions(+), 96 deletions(-) diff --git a/deployment/data-streams/jobs/don_test.go b/deployment/data-streams/jobs/don_test.go index 69b43ae4136..9e7ae63f033 100644 --- a/deployment/data-streams/jobs/don_test.go +++ b/deployment/data-streams/jobs/don_test.go @@ -8,14 +8,56 @@ import ( "github.com/stretchr/testify/require" ) +const donSpecTOML1 = `name = 'Test-DON' +type = 'don' +schemaVersion = 1 +externalJobID = '00000000-0000-0000-0000-000000000000' +contractID = 'contract-123' +transmitterID = 'tx-123' +forwardingAllowed = true +p2pv2Bootstrappers = ['bootstrap1', 'bootstrap2'] +ocrKeyBundleID = 'ocr-bundle-123' +maxTaskDuration = 10000000000 +contractConfigTrackerPollInterval = 60000000000 +relay = 'testrelay' +pluginType = 'testplugin' + +[relayConfig] +chainID = 'chain' +fromBlock = 100 +lloConfigMode = 'mode' +lloDonID = 200 + +[pluginConfig] +channelDefinitionsContractAddress = '0xabc' +channelDefinitionsContractFromBlock = 50 +donID = 300 +servers = {server1 = 'http://localhost'} +` + +const donSpecTOML2 = `name = 'Empty-DON-Test' +type = 'don' +schemaVersion = 1 +externalJobID = '00000000-0000-0000-0000-000000000000' +contractID = 'contract-empty' + +[relayConfig] +chainID = '' + +[pluginConfig] +channelDefinitionsContractAddress = '' +channelDefinitionsContractFromBlock = 0 +donID = 0 +servers = {} +` + func TestDonJobSpec_MarshalTOML(t *testing.T) { - trueVal := true ocrKeyBundle := "ocr-bundle-123" - + trueVal := true testCases := []struct { - name string - spec DonJobSpec - wantSubstr []string + name string + spec DonJobSpec + want string }{ { name: "with fields populated", @@ -24,7 +66,7 @@ func TestDonJobSpec_MarshalTOML(t *testing.T) { Name: "Test-DON", Type: "don", SchemaVersion: 1, - ExternalJobID: uuid.New(), + ExternalJobID: uuid.MustParse("00000000-0000-0000-0000-000000000000"), }, ContractID: "contract-123", TransmitterID: "tx-123", @@ -48,39 +90,7 @@ func TestDonJobSpec_MarshalTOML(t *testing.T) { Servers: map[string]string{"server1": "http://localhost"}, }, }, - wantSubstr: []string{ - "contractID", - "contract-123", - "transmitterID", - "tx-123", - "p2pv2Bootstrappers", - "bootstrap1", - "ocrKeyBundleID", - "ocr-bundle-123", - "maxTaskDuration", - "contractConfigTrackerPollInterval", - "relay", - "testrelay", - "pluginType", - "testplugin", - "chainID", - "chain", - "fromBlock", - "100", - "lloConfigMode", - "mode", - "lloDonID", - "200", - "channelDefinitionsContractAddress", - "0xabc", - "channelDefinitionsContractFromBlock", - "50", - "donID", - "300", - "servers", - "server1", - "http://localhost", - }, + want: donSpecTOML1, }, { name: "empty minimal fields", @@ -89,7 +99,7 @@ func TestDonJobSpec_MarshalTOML(t *testing.T) { Name: "Empty-DON-Test", Type: "don", SchemaVersion: 1, - ExternalJobID: uuid.New(), + ExternalJobID: uuid.MustParse("00000000-0000-0000-0000-000000000000"), }, ContractID: "contract-empty", RelayConfig: RelayConfigDon{}, @@ -97,10 +107,7 @@ func TestDonJobSpec_MarshalTOML(t *testing.T) { Servers: map[string]string{}, }, }, - wantSubstr: []string{ - "contractID", - "contract-empty", - }, + want: donSpecTOML2, }, } @@ -108,11 +115,8 @@ func TestDonJobSpec_MarshalTOML(t *testing.T) { t.Run(tc.name, func(t *testing.T) { tomlBytes, err := tc.spec.MarshalTOML() require.NoError(t, err) - - result := string(tomlBytes) - for _, substr := range tc.wantSubstr { - require.Contains(t, result, substr, "result %q does not contain expected substring %q", result, substr) - } + got := string(tomlBytes) + require.Equal(t, tc.want, got) }) } } diff --git a/deployment/data-streams/jobs/stream_test.go b/deployment/data-streams/jobs/stream_test.go index 8104648f83d..c5622bc40ff 100644 --- a/deployment/data-streams/jobs/stream_test.go +++ b/deployment/data-streams/jobs/stream_test.go @@ -8,12 +8,88 @@ import ( "github.com/stretchr/testify/require" ) +const medianSpecTOMLMultiple = `name = 'BTC/USD-Test' +type = 'stream' +schemaVersion = 1 +externalJobID = '00000000-0000-0000-0000-000000000000' +streamID = '1000' +observationSource = """ +// data source 1 +ds1_payload [type=bridge name=\"bridge-bridge1\" timeout=\"50s\" requestData={\"data\":{\"endpoint\":\"test1\"}}]; + +ds1_benchmark [type=jsonparse path=\"data,median\"]; +// data source 2 +ds2_payload [type=bridge name=\"bridge-bridge2\" timeout=\"50s\" requestData={\"data\":{\"endpoint\":\"test2\"}}]; + +ds2_benchmark [type=jsonparse path=\"data,median\"]; +ds1_payload -> ds1_benchmark -> benchmark_price; +ds2_payload -> ds2_benchmark -> benchmark_price; +benchmark_price [type=median allowedFaults=2 index=0]; +""" +` + +const medianSpecTOMLEmpty = `name = 'Empty-Median-Test' +type = 'stream' +schemaVersion = 1 +externalJobID = '00000000-0000-0000-0000-000000000000' +streamID = '2000' +observationSource = """ +benchmark_price [type=median allowedFaults=1 index=0]; +""" +` + +const quoteSpecTOMLMultiple = `name = 'BTC/USD-Quote' +type = 'stream' +schemaVersion = 1 +externalJobID = '00000000-0000-0000-0000-000000000000' +streamID = '3000' +observationSource = """ +// data source 1 +ds1_payload [type=bridge name=\"bridge-bridge1\" timeout=\"50s\" requestData={\"data\":{\"endpoint\":\"quote1\"}}]; + +ds1_benchmark [type=jsonparse path=\"data,benchmark\"]; +ds1_bid [type=jsonparse path=\"data,bid\"]; +ds1_ask [type=jsonparse path=\"data,ask\"]; +// data source 2 +ds2_payload [type=bridge name=\"bridge-bridge2\" timeout=\"50s\" requestData={\"data\":{\"endpoint\":\"quote2\"}}]; + +ds2_benchmark [type=jsonparse path=\"data,benchmark\"]; +ds2_bid [type=jsonparse path=\"data,bid\"]; +ds2_ask [type=jsonparse path=\"data,ask\"]; +ds1_payload -> ds1_benchmark -> benchmark_price; +ds2_payload -> ds2_benchmark -> benchmark_price; +benchmark_price [type=median allowedFaults=3 index=0]; + +ds1_payload -> ds1_bid -> bid_price; +ds2_payload -> ds2_bid -> bid_price; +bid_price [type=median allowedFaults=3 index=1]; + +ds1_payload -> ds1_ask -> ask_price; +ds2_payload -> ds2_ask -> ask_price; +ask_price [type=median allowedFaults=3 index=2]; +""" +` + +const quoteSpecTOMLEmpty = `name = 'Empty-Quote-Test' +type = 'stream' +schemaVersion = 1 +externalJobID = '00000000-0000-0000-0000-000000000000' +streamID = '4000' +observationSource = """ +benchmark_price [type=median allowedFaults=1 index=0]; + +bid_price [type=median allowedFaults=1 index=1]; + +ask_price [type=median allowedFaults=1 index=2]; +""" +` + func TestStreamJobSpec_Median_MarshalTOML(t *testing.T) { testCases := []struct { - name string - spec StreamJobSpec - obs MedianObservationSource - wantSubstr []string + name string + spec StreamJobSpec + obs MedianObservationSource + want string }{ { name: "multiple datasources with valid paths", @@ -22,7 +98,7 @@ func TestStreamJobSpec_Median_MarshalTOML(t *testing.T) { Name: "BTC/USD-Test", Type: "stream", SchemaVersion: 1, - ExternalJobID: uuid.New(), + ExternalJobID: uuid.MustParse("00000000-0000-0000-0000-000000000000"), }, StreamID: "1000", }, @@ -44,12 +120,7 @@ func TestStreamJobSpec_Median_MarshalTOML(t *testing.T) { }, }, }, - wantSubstr: []string{ - "bridge-bridge1", - "bridge-bridge2", - "allowedFaults=2", - `data,median`, - }, + want: medianSpecTOMLMultiple, }, { name: "empty datasource list", @@ -58,7 +129,7 @@ func TestStreamJobSpec_Median_MarshalTOML(t *testing.T) { Name: "Empty-Median-Test", Type: "stream", SchemaVersion: 1, - ExternalJobID: uuid.New(), + ExternalJobID: uuid.MustParse("00000000-0000-0000-0000-000000000000"), }, StreamID: "2000", }, @@ -71,9 +142,7 @@ func TestStreamJobSpec_Median_MarshalTOML(t *testing.T) { }, }, }, - wantSubstr: []string{ - "allowedFaults=1", - }, + want: medianSpecTOMLEmpty, }, } @@ -81,25 +150,20 @@ func TestStreamJobSpec_Median_MarshalTOML(t *testing.T) { t.Run(tc.name, func(t *testing.T) { err := tc.spec.SetObservationSource(tc.obs) require.NoError(t, err) - tomlBytes, err := tc.spec.MarshalTOML() require.NoError(t, err) - - result := string(tomlBytes) - for _, substr := range tc.wantSubstr { - require.Contains(t, result, substr, - "result %q does not contain expected substring %q", result, substr) - } + got := string(tomlBytes) + require.Equal(t, tc.want, got) }) } } func TestStreamJobSpec_Quote_MarshalTOML(t *testing.T) { testCases := []struct { - name string - spec StreamJobSpec - obs QuoteObservationSource - wantSubstr []string + name string + spec StreamJobSpec + obs QuoteObservationSource + want string }{ { name: "multiple datasources with valid paths", @@ -108,7 +172,7 @@ func TestStreamJobSpec_Quote_MarshalTOML(t *testing.T) { Name: "BTC/USD-Quote", Type: "stream", SchemaVersion: 1, - ExternalJobID: uuid.New(), + ExternalJobID: uuid.MustParse("00000000-0000-0000-0000-000000000000"), }, StreamID: "3000", }, @@ -136,14 +200,7 @@ func TestStreamJobSpec_Quote_MarshalTOML(t *testing.T) { ResultPath: "data,ask", }, }, - wantSubstr: []string{ - "bridge-bridge1", - "bridge-bridge2", - "allowedFaults=3", - `data,benchmark`, - `data,bid`, - `data,ask`, - }, + want: quoteSpecTOMLMultiple, }, { name: "empty datasource list", @@ -152,7 +209,7 @@ func TestStreamJobSpec_Quote_MarshalTOML(t *testing.T) { Name: "Empty-Quote-Test", Type: "stream", SchemaVersion: 1, - ExternalJobID: uuid.New(), + ExternalJobID: uuid.MustParse("00000000-0000-0000-0000-000000000000"), }, StreamID: "4000", }, @@ -171,9 +228,7 @@ func TestStreamJobSpec_Quote_MarshalTOML(t *testing.T) { ResultPath: "data,emptyAsk", }, }, - wantSubstr: []string{ - "allowedFaults=1", - }, + want: quoteSpecTOMLEmpty, }, } @@ -184,12 +239,8 @@ func TestStreamJobSpec_Quote_MarshalTOML(t *testing.T) { tomlBytes, err := tc.spec.MarshalTOML() require.NoError(t, err) - - result := string(tomlBytes) - for _, substr := range tc.wantSubstr { - require.Contains(t, result, substr, - "result %q does not contain expected substring %q", result, substr) - } + got := string(tomlBytes) + require.Equal(t, tc.want, got) }) } } @@ -206,11 +257,10 @@ func TestStreamJobSpec_SetObservationSource_Error(t *testing.T) { Name: "Error-Test", Type: "stream", SchemaVersion: 1, - ExternalJobID: uuid.New(), + ExternalJobID: uuid.MustParse("00000000-0000-0000-0000-000000000000"), }, StreamID: "5000", } - err := spec.SetObservationSource(errorPipeline{}) require.Error(t, err) require.Contains(t, err.Error(), "forced error") From b33d188bdc68f229401ae9c1a0a1df89e6f53137 Mon Sep 17 00:00:00 2001 From: ChrisAmora <27789416+ChrisAmora@users.noreply.github.com> Date: Fri, 21 Feb 2025 11:29:55 -0300 Subject: [PATCH 4/6] feat: add pointer lib to deployment module --- deployment/data-streams/jobs/don_test.go | 7 +++---- deployment/go.mod | 1 + 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/deployment/data-streams/jobs/don_test.go b/deployment/data-streams/jobs/don_test.go index 9e7ae63f033..aa7f6110313 100644 --- a/deployment/data-streams/jobs/don_test.go +++ b/deployment/data-streams/jobs/don_test.go @@ -4,6 +4,7 @@ import ( "testing" "time" + "github.com/AlekSi/pointer" "github.com/google/uuid" "github.com/stretchr/testify/require" ) @@ -52,8 +53,6 @@ servers = {} ` func TestDonJobSpec_MarshalTOML(t *testing.T) { - ocrKeyBundle := "ocr-bundle-123" - trueVal := true testCases := []struct { name string spec DonJobSpec @@ -70,9 +69,9 @@ func TestDonJobSpec_MarshalTOML(t *testing.T) { }, ContractID: "contract-123", TransmitterID: "tx-123", - ForwardingAllowed: &trueVal, + ForwardingAllowed: pointer.ToBoolOrNil(true), P2PV2Bootstrappers: []string{"bootstrap1", "bootstrap2"}, - OCRKeyBundleID: &ocrKeyBundle, + OCRKeyBundleID: pointer.ToStringOrNil("ocr-bundle-123"), MaxTaskDuration: 10 * time.Second, ContractConfigTrackerPollInterval: 1 * time.Minute, Relay: "testrelay", diff --git a/deployment/go.mod b/deployment/go.mod index c0586ad3722..4158ad3d854 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -10,6 +10,7 @@ replace github.com/smartcontractkit/chainlink/v2 => ../ require github.com/smartcontractkit/chainlink/v2 v2.0.0-20250128231431-9279badae2f0 require ( + github.com/AlekSi/pointer v1.1.0 github.com/Khan/genqlient v0.7.0 github.com/Masterminds/semver/v3 v3.3.0 github.com/aptos-labs/aptos-go-sdk v1.5.0 From fae41c5b5489b84acc0bb2110867edfd4944215f Mon Sep 17 00:00:00 2001 From: ChrisAmora <27789416+ChrisAmora@users.noreply.github.com> Date: Fri, 21 Feb 2025 14:06:45 -0300 Subject: [PATCH 5/6] feat: change pointer util --- deployment/data-streams/jobs/don_test.go | 6 +++--- deployment/data-streams/utils/pointer/pointer.go | 5 +++++ deployment/go.mod | 1 - 3 files changed, 8 insertions(+), 4 deletions(-) create mode 100644 deployment/data-streams/utils/pointer/pointer.go diff --git a/deployment/data-streams/jobs/don_test.go b/deployment/data-streams/jobs/don_test.go index aa7f6110313..eafa4628ae0 100644 --- a/deployment/data-streams/jobs/don_test.go +++ b/deployment/data-streams/jobs/don_test.go @@ -4,8 +4,8 @@ import ( "testing" "time" - "github.com/AlekSi/pointer" "github.com/google/uuid" + "github.com/smartcontractkit/chainlink/deployment/data-streams/utils/pointer" "github.com/stretchr/testify/require" ) @@ -69,9 +69,9 @@ func TestDonJobSpec_MarshalTOML(t *testing.T) { }, ContractID: "contract-123", TransmitterID: "tx-123", - ForwardingAllowed: pointer.ToBoolOrNil(true), + ForwardingAllowed: pointer.To(true), P2PV2Bootstrappers: []string{"bootstrap1", "bootstrap2"}, - OCRKeyBundleID: pointer.ToStringOrNil("ocr-bundle-123"), + OCRKeyBundleID: pointer.To("ocr-bundle-123"), MaxTaskDuration: 10 * time.Second, ContractConfigTrackerPollInterval: 1 * time.Minute, Relay: "testrelay", diff --git a/deployment/data-streams/utils/pointer/pointer.go b/deployment/data-streams/utils/pointer/pointer.go new file mode 100644 index 00000000000..48772a45fce --- /dev/null +++ b/deployment/data-streams/utils/pointer/pointer.go @@ -0,0 +1,5 @@ +package pointer + +func To[T any](v T) *T { + return &v +} diff --git a/deployment/go.mod b/deployment/go.mod index 4158ad3d854..c0586ad3722 100644 --- a/deployment/go.mod +++ b/deployment/go.mod @@ -10,7 +10,6 @@ replace github.com/smartcontractkit/chainlink/v2 => ../ require github.com/smartcontractkit/chainlink/v2 v2.0.0-20250128231431-9279badae2f0 require ( - github.com/AlekSi/pointer v1.1.0 github.com/Khan/genqlient v0.7.0 github.com/Masterminds/semver/v3 v3.3.0 github.com/aptos-labs/aptos-go-sdk v1.5.0 From 96dcfaf2d9139bda552a1d49c4aacc01c471d193 Mon Sep 17 00:00:00 2001 From: ChrisAmora <27789416+ChrisAmora@users.noreply.github.com> Date: Fri, 21 Feb 2025 14:35:14 -0300 Subject: [PATCH 6/6] fix: goimport --- deployment/data-streams/jobs/don_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deployment/data-streams/jobs/don_test.go b/deployment/data-streams/jobs/don_test.go index eafa4628ae0..90407cb48cb 100644 --- a/deployment/data-streams/jobs/don_test.go +++ b/deployment/data-streams/jobs/don_test.go @@ -5,8 +5,9 @@ import ( "time" "github.com/google/uuid" - "github.com/smartcontractkit/chainlink/deployment/data-streams/utils/pointer" "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/deployment/data-streams/utils/pointer" ) const donSpecTOML1 = `name = 'Test-DON'