Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
samsondav committed Nov 14, 2023
1 parent 47185db commit f109e6b
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 129 deletions.
118 changes: 0 additions & 118 deletions ]

This file was deleted.

10 changes: 5 additions & 5 deletions core/services/job/spawner.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,20 @@ type (
Delegate interface {
JobType() Type
// BeforeJobCreated is only called once on first time job create.
BeforeJobCreated(spec Job)
BeforeJobCreated(Job)
// ServicesForSpec returns services to be started and stopped for this
// job. In case a given job type relies upon well-defined startup/shutdown
// ordering for services, they are started in the order they are given
// and stopped in reverse order.
ServicesForSpec(spec Job) ([]ServiceCtx, error)
AfterJobCreated(spec Job)
BeforeJobDeleted(spec Job)
ServicesForSpec(Job) ([]ServiceCtx, error)
AfterJobCreated(Job)
BeforeJobDeleted(Job)
// OnDeleteJob will be called from within DELETE db transaction. Any db
// commands issued within OnDeleteJob() should be performed first, before any
// non-db side effects. This is required in order to guarantee mutual atomicity between
// all tasks intended to happen during job deletion. For the same reason, the job will
// not show up in the db within OnDeleteJob(), even though it is still actively running.
OnDeleteJob(spec Job, q pg.Queryer) error
OnDeleteJob(jb Job, q pg.Queryer) error
}

activeJob struct {
Expand Down
2 changes: 2 additions & 0 deletions core/services/llo/channel_definition_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func NewChannelDefinitionCache() ChannelDefinitionCache {
return &channelDefinitionCache{}
}

// TODO: Needs a way to subscribe/unsubscribe to contracts

func (c *channelDefinitionCache) Start(ctx context.Context) error {
// TODO: Initial load, then poll
// TODO: needs to be populated asynchronously from onchain ConfigurationStore
Expand Down
95 changes: 95 additions & 0 deletions core/services/llo/delegate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package llo

import (
"context"
"fmt"
"log"

relayllo "github.com/smartcontractkit/chainlink-relay/pkg/reportingplugins/llo"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/libocr/offchainreporting2/types"
ocr2plus "github.com/smartcontractkit/libocr/offchainreporting2plus"
"github.com/smartcontractkit/libocr/offchainreporting2plus/ocr3types"

"github.com/smartcontractkit/libocr/commontypes"
)

var _ job.Delegate = &Delegate{}

// Delegate is a container struct for an Oracle plugin. This struct provides
// the ability to start and stop underlying services associated with the
// plugin instance.
type Delegate struct {
llo ocr2plus.Oracle
logger *log.Logger
}

type DelegateConfig struct {
BinaryNetworkEndpointFactory types.BinaryNetworkEndpointFactory
V2Bootstrappers []commontypes.BootstrapperLocator
ContractConfigTracker types.ContractConfigTracker
ContractTransmitter ocr3types.ContractTransmitter[relayllo.ReportInfo]
KeepersDatabase ocr3types.Database
Logger commontypes.Logger
MonitoringEndpoint commontypes.MonitoringEndpoint
OffchainConfigDigester types.OffchainConfigDigester
OffchainKeyring types.OffchainKeyring
OnchainKeyring ocr3types.OnchainKeyring[relayllo.ReportInfo]
LocalConfig types.LocalConfig
}

func NewDelegate(c DelegateConfig) (*Delegate, error) {
return &Delegate{}
}

func (d *Delegate) Start(_ context.Context) error {
return d.llo.Start()
}

func (d *Delegate) JobType() job.Type {
// FIXME: Is this correct?
return job.OffchainReporting2
}

// BeforeJobCreated is only called once on first time job create.
func (d *Delegate) BeforeJobCreated(jb job.Job) {}

// ServicesForSpec returns services to be started and stopped for this
// job. In case a given job type relies upon well-defined startup/shutdown
// ordering for services, they are started in the order they are given
// and stopped in reverse order.
func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
// create the oracle from config values
llo, err := ocr2plus.NewOracle(ocr2plus.OCR3OracleArgs[relayllo.ReportInfo]{
BinaryNetworkEndpointFactory: c.BinaryNetworkEndpointFactory,
V2Bootstrappers: c.V2Bootstrappers,
ContractConfigTracker: c.ContractConfigTracker,
ContractTransmitter: c.ContractTransmitter,
Database: c.KeepersDatabase,
LocalConfig: c.LocalConfig,
Logger: c.Logger,
MonitoringEndpoint: c.MonitoringEndpoint,
OffchainConfigDigester: c.OffchainConfigDigester,
OffchainKeyring: c.OffchainKeyring,
OnchainKeyring: c.OnchainKeyring,
ReportingPluginFactory: relayllo.NewLLOPluginFactory(),
})

if err != nil {
return nil, fmt.Errorf("%w: failed to create new OCR oracle", err)
}

return []job.ServiceCtx{llo}
}
func (d *Delegate) AfterJobCreated(jb job.Job) {}
func (d *Delegate) BeforeJobDeleted(jb job.Job) {}

// OnDeleteJob will be called from within DELETE db transaction. Any db
// commands issued within OnDeleteJob() should be performed first, before any
// non-db side effects. This is required in order to guarantee mutual atomicity between
// all tasks intended to happen during job deletion. For the same reason, the job will
// not show up in the db within OnDeleteJob(), even though it is still actively running.
func (d *Delegate) OnDeleteJob(jb job.Job, q pg.Queryer) error {
return nil
}
4 changes: 2 additions & 2 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,13 +796,13 @@ func (d *Delegate) newServicesLLO(

chEnhancedTelem := make(chan ocrcommon.EnhancedTelemetryMercuryData, 100)

mercuryServices, err2 := mercury.NewServices(jb, mercuryProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))
lloServices, err2 := llo.NewServices(jb, lloProvider, d.pipelineRunner, runResults, lggr, oracleArgsNoPlugin, d.cfg.JobPipeline(), chEnhancedTelem, chain, d.mercuryORM, (mercuryutils.FeedID)(*spec.FeedID))

if ocrcommon.ShouldCollectEnhancedTelemetryMercury(jb) {
enhancedTelemService := ocrcommon.NewEnhancedTelemetryService(&jb, chEnhancedTelem, make(chan struct{}), d.monitoringEndpointGen.GenMonitoringEndpoint(rid.Network, rid.ChainID, spec.FeedID.String(), synchronization.EnhancedEAMercury), lggr.Named("EnhancedTelemetryMercury"))
mercuryServices = append(mercuryServices, enhancedTelemService)
} else {
lggr.Infow("Enhanced telemetry is disabled for mercury job", "job", jb.Name)
lggr.Infow("Enhanced telemetry is disabled for llo job", "job", jb.Name)
}

return mercuryServices, err2
Expand Down
3 changes: 2 additions & 1 deletion core/services/relay/evm/evm.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,9 @@ func (r *Relayer) NewLLOProvider(rargs relaytypes.RelayArgs, pargs relaytypes.Pl
// FIXME
// transmitter := llo.NewTransmitter(r.lggr, configWatcher.ContractConfigTracker(), client, privKey.PublicKey, rargs.JobID, r.db, r.pgCfg)
transmitter := llo.NewTransmitter(r.lggr, client, privKey.PublicKey)
channelDefinitionCache := llo.NewChannelDefinitionCache()

return NewLLOProvider(configWatcher, transmitter, r.lggr), nil
return NewLLOProvider(configWatcher, transmitter, r.lggr, channelDefinitionCache), nil
}

func (r *Relayer) NewFunctionsProvider(rargs relaytypes.RelayArgs, pargs relaytypes.PluginArgs) (relaytypes.FunctionsProvider, error) {
Expand Down
8 changes: 5 additions & 3 deletions core/services/relay/evm/llo_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
var _ relaytypes.LLOProvider = (*lloProvider)(nil)

type lloProvider struct {
configWatcher *configWatcher
transmitter llo.Transmitter
logger logger.Logger
configWatcher *configWatcher
transmitter llo.Transmitter
logger logger.Logger
channelDefinitionCache llo.ChannelDefinitionCache

ms services.MultiStart
}
Expand All @@ -33,6 +34,7 @@ func NewLLOProvider(
configWatcher,
transmitter,
lggr,
channelDefinitionCache,
services.MultiStart{},
}
}
Expand Down

0 comments on commit f109e6b

Please sign in to comment.