diff --git a/consensus/pbft/execute.go b/consensus/pbft/execute.go index 11977099..b0a76665 100644 --- a/consensus/pbft/execute.go +++ b/consensus/pbft/execute.go @@ -1,6 +1,7 @@ package pbft import ( + "encoding/hex" "fmt" "time" @@ -98,6 +99,7 @@ func (r *Replica) execute(view uint, sequence uint, digest string) error { RequestTimestamp: request.Timestamp, Replica: r.id, }, + Signature: hex.EncodeToString(res.Signature), } // Save this executions in case it's requested again. diff --git a/executor/config.go b/executor/config.go index 16e723f2..81f41b79 100644 --- a/executor/config.go +++ b/executor/config.go @@ -4,26 +4,37 @@ import ( "github.com/spf13/afero" "github.com/blocklessnetwork/b7s/models/blockless" + "github.com/blocklessnetwork/b7s/models/execute" ) +type ExecutionSigner interface { + Sign(execute.Request, execute.RuntimeOutput) ([]byte, error) +} + +type MetaProvider interface { + WithMetadata(execute.Request, execute.RuntimeOutput) (interface{}, error) +} + // defaultConfig used to create Executor. var defaultConfig = Config{ - WorkDir: "workspace", - RuntimeDir: "", - ExecutableName: blockless.RuntimeCLI(), - FS: afero.NewOsFs(), - Limiter: &noopLimiter{}, + WorkDir: "workspace", + RuntimeDir: "", + ExecutableName: blockless.RuntimeCLI(), + FS: afero.NewOsFs(), + Limiter: &noopLimiter{}, DriversRootPath: "", } // Config represents the Executor configuration. type Config struct { - WorkDir string // directory where files needed for the execution are stored - RuntimeDir string // directory where the executable can be found - ExecutableName string // name for the executable - DriversRootPath string // where are cgi drivers stored - FS afero.Fs // FS accessor - Limiter Limiter // Resource limiter for executed processes + WorkDir string // directory where files needed for the execution are stored + RuntimeDir string // directory where the executable can be found + ExecutableName string // name for the executable + DriversRootPath string // where are cgi drivers stored + FS afero.Fs // FS accessor + Limiter Limiter // Resource limiter for executed processes + Signer ExecutionSigner // Signer for the executor + MetaProvider MetaProvider // Metadata provider for the executor } type Option func(*Config) @@ -62,3 +73,17 @@ func WithLimiter(limiter Limiter) Option { cfg.Limiter = limiter } } + +// WithSigner sets the signer for the executor. +func WithSigner(signer ExecutionSigner) Option { + return func(cfg *Config) { + cfg.Signer = signer + } +} + +// WithMetaProvider sets the metadata provider for the executor. +func WithMetaProvider(meta MetaProvider) Option { + return func(cfg *Config) { + cfg.MetaProvider = meta + } +} diff --git a/executor/execute_function.go b/executor/execute_function.go index ae47e72e..c9cb4494 100644 --- a/executor/execute_function.go +++ b/executor/execute_function.go @@ -11,16 +11,15 @@ import ( func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execute.Result, error) { // Execute the function. - out, usage, err := e.executeFunction(requestID, req) + out, usage, signature, meta, err := e.executeFunction(requestID, req) if err != nil { - res := execute.Result{ Code: codes.Error, RequestID: requestID, Result: out, Usage: usage, + Signature: signature, } - return res, fmt.Errorf("function execution failed: %w", err) } @@ -29,6 +28,8 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu RequestID: requestID, Result: out, Usage: usage, + Signature: signature, + Metadata: meta, } return res, nil @@ -36,7 +37,7 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu // executeFunction handles the actual execution of the Blockless function. It returns the // execution information like standard output, standard error, exit code and resource usage. -func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, error) { +func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, []byte, interface{}, error) { log := e.log.With().Str("request", requestID).Str("function", req.FunctionID).Logger() @@ -47,7 +48,7 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu err := e.cfg.FS.MkdirAll(paths.workdir, defaultPermissions) if err != nil { - return execute.RuntimeOutput{}, execute.Usage{}, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err) + return execute.RuntimeOutput{}, execute.Usage{}, []byte{}, nil, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err) } // Remove all temporary files after we're done. defer func() { @@ -66,10 +67,28 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu out, usage, err := e.executeCommand(cmd) if err != nil { - return out, execute.Usage{}, fmt.Errorf("command execution failed: %w", err) + return out, execute.Usage{}, []byte{}, nil, fmt.Errorf("command execution failed: %w", err) } log.Info().Msg("command executed successfully") - return out, usage, nil + var signature []byte + if e.cfg.Signer != nil { + signature, err = e.cfg.Signer.Sign(req, out) + if err != nil { + return out, usage, []byte{}, nil, fmt.Errorf("failed to sign output: %w", err) + } + log.Debug().Msg("output signed") + } + + var metadata interface{} + if e.cfg.MetaProvider != nil { + metadata, err = e.cfg.MetaProvider.WithMetadata(req, out) + if err != nil { + return out, usage, []byte{}, nil, fmt.Errorf("failed to inject metadata: %w", err) + } + log.Debug().Msg("metadata injected") + } + + return out, usage, signature, metadata, nil } diff --git a/models/execute/response.go b/models/execute/response.go index 6feddf38..d87c960b 100644 --- a/models/execute/response.go +++ b/models/execute/response.go @@ -15,6 +15,8 @@ type Result struct { Result RuntimeOutput `json:"result"` RequestID string `json:"request_id"` Usage Usage `json:"usage,omitempty"` + Signature []byte `json:"signature,omitempty"` + Metadata interface{} `json:"metadata,omitempty"` } // Cluster represents the set of peers that executed the request. diff --git a/models/execute/runtime.go b/models/execute/runtime.go index 1b817e98..bc16e7b6 100644 --- a/models/execute/runtime.go +++ b/models/execute/runtime.go @@ -6,12 +6,12 @@ const ( // RuntimeConfig represents the CLI flags supported by the runtime type BLSRuntimeConfig struct { - Entry string `json:"entry,omitempty"` - ExecutionTime uint64 `json:"run_time,omitempty"` - DebugInfo bool `json:"debug_info,omitempty"` - Fuel uint64 `json:"limited_fuel,omitempty"` - Memory uint64 `json:"limited_memory,omitempty"` - Logger string `json:"runtime_logger,omitempty"` + Entry string `json:"entry,omitempty"` + ExecutionTime uint64 `json:"run_time,omitempty"` + DebugInfo bool `json:"debug_info,omitempty"` + Fuel uint64 `json:"limited_fuel,omitempty"` + Memory uint64 `json:"limited_memory,omitempty"` + Logger string `json:"runtime_logger,omitempty"` DriversRootPath string `json:"drivers_root_path,omitempty"` // Fields not allowed to be set in the request. Input string `json:"-"` @@ -29,5 +29,5 @@ const ( BLSRuntimeFlagLogger = "runtime-logger" BLSRuntimeFlagPermission = "permission" BLSRuntimeFlagEnv = "env" - BLSRuntimeFlagDrivers = "drivers-root-path" + BLSRuntimeFlagDrivers = "drivers-root-path" ) diff --git a/node/aggregate/aggregate.go b/node/aggregate/aggregate.go index 20cce244..e1bdc592 100644 --- a/node/aggregate/aggregate.go +++ b/node/aggregate/aggregate.go @@ -17,11 +17,17 @@ type Result struct { Peers []peer.ID `json:"peers,omitempty"` // How frequent was this result, in percentages. Frequency float64 `json:"frequency,omitempty"` + // Signature of this result + Signature []byte `json:"signature,omitempty"` + // Metadata is used to store additional information about the result. + Metadata interface{} `json:"metadata,omitempty"` } type resultStats struct { - seen uint - peers []peer.ID + seen uint + peers []peer.ID + signature []byte + metadata interface{} } func Aggregate(results execute.ResultMap) Results { @@ -40,13 +46,17 @@ func Aggregate(results execute.ResultMap) Results { stat, ok := stats[output] if !ok { stats[output] = resultStats{ - seen: 0, - peers: make([]peer.ID, 0), + seen: 0, + peers: make([]peer.ID, 0), + signature: res.Signature, + metadata: res.Metadata, } } stat.seen++ stat.peers = append(stat.peers, executingPeer) + stat.signature = res.Signature + stat.metadata = res.Metadata stats[output] = stat } @@ -59,6 +69,8 @@ func Aggregate(results execute.ResultMap) Results { Result: res, Peers: stat.peers, Frequency: 100 * float64(stat.seen) / float64(total), + Signature: stat.signature, + Metadata: stat.metadata, } aggregated = append(aggregated, aggr) diff --git a/node/worker_execute.go b/node/worker_execute.go index 343b4d90..773ab403 100644 --- a/node/worker_execute.go +++ b/node/worker_execute.go @@ -2,6 +2,7 @@ package node import ( "context" + "encoding/hex" "fmt" "time" @@ -47,6 +48,7 @@ func (n *Node) workerProcessExecute(ctx context.Context, from peer.ID, req reque Results: execute.ResultMap{ n.host.ID(): result, }, + Signature: hex.EncodeToString(result.Signature), } // Send the response, whatever it may be (success or failure).