Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker executor response signature support #154

Closed
wants to merge 8 commits into from
2 changes: 2 additions & 0 deletions consensus/pbft/execute.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pbft

import (
"encoding/hex"
"fmt"
"time"

Expand Down Expand Up @@ -98,6 +99,7 @@ func (r *Replica) execute(view uint, sequence uint, digest string) error {
RequestTimestamp: request.Timestamp,
Replica: r.id,
},
Signature: hex.EncodeToString(res.Signature),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PBFT already has and mandates signatures - note the msg.Sign() invocation below that will overwrite this value.

}

// Save this executions in case it's requested again.
Expand Down
47 changes: 36 additions & 11 deletions executor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,37 @@ import (
"github.com/spf13/afero"

"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/models/execute"
)

type ExecutionSigner interface {
Sign(execute.Request, execute.RuntimeOutput) ([]byte, error)
}

type MetaProvider interface {
WithMetadata(execute.Request, execute.RuntimeOutput) (interface{}, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WithMetadata - This function name is IMO more common for specifying functional options - e.g. server.New(server.WithPort(8888)). I think the name of a function is a bit misleading and is in fact more CreateMetadata or GetMetadata (this one less so because "get" implies extracting something from the input params).

I think just Metadata(req, runtimeOutput) (any, error) is just fine.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick - I prefer using any instead of interface{}. They mean the same, but I think any reads a little nicer.

}

// defaultConfig used to create Executor.
var defaultConfig = Config{
WorkDir: "workspace",
RuntimeDir: "",
ExecutableName: blockless.RuntimeCLI(),
FS: afero.NewOsFs(),
Limiter: &noopLimiter{},
WorkDir: "workspace",
RuntimeDir: "",
ExecutableName: blockless.RuntimeCLI(),
FS: afero.NewOsFs(),
Limiter: &noopLimiter{},
DriversRootPath: "",
}

// Config represents the Executor configuration.
type Config struct {
WorkDir string // directory where files needed for the execution are stored
RuntimeDir string // directory where the executable can be found
ExecutableName string // name for the executable
DriversRootPath string // where are cgi drivers stored
FS afero.Fs // FS accessor
Limiter Limiter // Resource limiter for executed processes
WorkDir string // directory where files needed for the execution are stored
RuntimeDir string // directory where the executable can be found
ExecutableName string // name for the executable
DriversRootPath string // where are cgi drivers stored
FS afero.Fs // FS accessor
Limiter Limiter // Resource limiter for executed processes
Signer ExecutionSigner // Signer for the executor
MetaProvider MetaProvider // Metadata provider for the executor
}

type Option func(*Config)
Expand Down Expand Up @@ -62,3 +73,17 @@ func WithLimiter(limiter Limiter) Option {
cfg.Limiter = limiter
}
}

// WithSigner sets the signer for the executor.
func WithSigner(signer ExecutionSigner) Option {
return func(cfg *Config) {
cfg.Signer = signer
}
}

// WithMetaProvider sets the metadata provider for the executor.
func WithMetaProvider(meta MetaProvider) Option {
return func(cfg *Config) {
cfg.MetaProvider = meta
}
}
33 changes: 26 additions & 7 deletions executor/execute_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ import (
func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execute.Result, error) {

// Execute the function.
out, usage, err := e.executeFunction(requestID, req)
out, usage, signature, meta, err := e.executeFunction(requestID, req)
if err != nil {

res := execute.Result{
Code: codes.Error,
RequestID: requestID,
Result: out,
Usage: usage,
Signature: signature,
}

return res, fmt.Errorf("function execution failed: %w", err)
}

Expand All @@ -29,14 +28,16 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu
RequestID: requestID,
Result: out,
Usage: usage,
Signature: signature,
Metadata: meta,
}

return res, nil
}

// executeFunction handles the actual execution of the Blockless function. It returns the
// execution information like standard output, standard error, exit code and resource usage.
func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, error) {
func (e *Executor) executeFunction(requestID string, req execute.Request) (execute.RuntimeOutput, execute.Usage, []byte, interface{}, error) {

log := e.log.With().Str("request", requestID).Str("function", req.FunctionID).Logger()

Expand All @@ -47,7 +48,7 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu

err := e.cfg.FS.MkdirAll(paths.workdir, defaultPermissions)
if err != nil {
return execute.RuntimeOutput{}, execute.Usage{}, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err)
return execute.RuntimeOutput{}, execute.Usage{}, []byte{}, nil, fmt.Errorf("could not setup working directory for execution (dir: %s): %w", paths.workdir, err)
}
// Remove all temporary files after we're done.
defer func() {
Expand All @@ -66,10 +67,28 @@ func (e *Executor) executeFunction(requestID string, req execute.Request) (execu

out, usage, err := e.executeCommand(cmd)
if err != nil {
return out, execute.Usage{}, fmt.Errorf("command execution failed: %w", err)
return out, execute.Usage{}, []byte{}, nil, fmt.Errorf("command execution failed: %w", err)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And other places too - why not return nil instead of []byte{}?

}

log.Info().Msg("command executed successfully")

return out, usage, nil
var signature []byte
if e.cfg.Signer != nil {
signature, err = e.cfg.Signer.Sign(req, out)
if err != nil {
return out, usage, []byte{}, nil, fmt.Errorf("failed to sign output: %w", err)
}
log.Debug().Msg("output signed")
}

var metadata interface{}
if e.cfg.MetaProvider != nil {
metadata, err = e.cfg.MetaProvider.WithMetadata(req, out)
if err != nil {
return out, usage, []byte{}, nil, fmt.Errorf("failed to inject metadata: %w", err)
}
log.Debug().Msg("metadata injected")
}

return out, usage, signature, metadata, nil
}
2 changes: 2 additions & 0 deletions models/execute/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ type Result struct {
Result RuntimeOutput `json:"result"`
RequestID string `json:"request_id"`
Usage Usage `json:"usage,omitempty"`
Signature []byte `json:"signature,omitempty"`
Metadata interface{} `json:"metadata,omitempty"`
}

// Cluster represents the set of peers that executed the request.
Expand Down
14 changes: 7 additions & 7 deletions models/execute/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ const (

// RuntimeConfig represents the CLI flags supported by the runtime
type BLSRuntimeConfig struct {
Entry string `json:"entry,omitempty"`
ExecutionTime uint64 `json:"run_time,omitempty"`
DebugInfo bool `json:"debug_info,omitempty"`
Fuel uint64 `json:"limited_fuel,omitempty"`
Memory uint64 `json:"limited_memory,omitempty"`
Logger string `json:"runtime_logger,omitempty"`
Entry string `json:"entry,omitempty"`
ExecutionTime uint64 `json:"run_time,omitempty"`
DebugInfo bool `json:"debug_info,omitempty"`
Fuel uint64 `json:"limited_fuel,omitempty"`
Memory uint64 `json:"limited_memory,omitempty"`
Logger string `json:"runtime_logger,omitempty"`
DriversRootPath string `json:"drivers_root_path,omitempty"`
// Fields not allowed to be set in the request.
Input string `json:"-"`
Expand All @@ -29,5 +29,5 @@ const (
BLSRuntimeFlagLogger = "runtime-logger"
BLSRuntimeFlagPermission = "permission"
BLSRuntimeFlagEnv = "env"
BLSRuntimeFlagDrivers = "drivers-root-path"
BLSRuntimeFlagDrivers = "drivers-root-path"
)
20 changes: 16 additions & 4 deletions node/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ type Result struct {
Peers []peer.ID `json:"peers,omitempty"`
// How frequent was this result, in percentages.
Frequency float64 `json:"frequency,omitempty"`
// Signature of this result
Signature []byte `json:"signature,omitempty"`
// Metadata is used to store additional information about the result.
Metadata interface{} `json:"metadata,omitempty"`
}

type resultStats struct {
seen uint
peers []peer.ID
seen uint
peers []peer.ID
signature []byte
metadata interface{}
}

func Aggregate(results execute.ResultMap) Results {
Expand All @@ -40,13 +46,17 @@ func Aggregate(results execute.ResultMap) Results {
stat, ok := stats[output]
if !ok {
stats[output] = resultStats{
seen: 0,
peers: make([]peer.ID, 0),
seen: 0,
peers: make([]peer.ID, 0),
signature: res.Signature,
metadata: res.Metadata,
}
}

stat.seen++
stat.peers = append(stat.peers, executingPeer)
stat.signature = res.Signature
stat.metadata = res.Metadata
Comment on lines 48 to +59
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this will work as intended..

Is this signature like a hash that can be recalculated by anyone or is it a signature that can be tied to the signer (can be verified by a public key)? I assume it's the latter.

Let's say we have 5 execution responses from 5 worker nodes. We will have e.g. the same execution result (stdout of the blockless runtime) but 5 different signatures, and I believe 5 different metadata objects. (I'm not sure if metadata is unique or not, but since it's configurable, it can be anything.)

When aggregating, we iterate through the results, and for the seen results, we increment the number of times it was seen, add the worker ID to the list of peers that have this exact result, and set the signature and metadata to that of the result we just processed. Meaning if we iterate through execution results of worker1 through worker5, we will save the signature of worker5 (if we process that one last, which also isn't guaranteed in a map). In each iteration we override the previous signature/metadata.

I think we will need to have a map that will map worker IDs to signatures and metadata.

For example something like:

{
    "result": "whatever",
    "peers": [
        "worker1",
        "worker2",
        "worker3",
        "worker4",
        "worker5"
    ],
    "frequency": 100,
    "signatures": {
        "worker1": {
            "scheme": "abc", // without specifying the scheme we don't know how to verify the signature, no?
            "sig": "0x1234567890abcdef1234567890abcdef"
        },
        // other workers
    },
    "metadata": {
        "worker1": {
            // metadata object
        },
        // other workers
    }
}


stats[output] = stat
}
Expand All @@ -59,6 +69,8 @@ func Aggregate(results execute.ResultMap) Results {
Result: res,
Peers: stat.peers,
Frequency: 100 * float64(stat.seen) / float64(total),
Signature: stat.signature,
Metadata: stat.metadata,
}

aggregated = append(aggregated, aggr)
Expand Down
2 changes: 2 additions & 0 deletions node/worker_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package node

import (
"context"
"encoding/hex"
"fmt"
"time"

Expand Down Expand Up @@ -47,6 +48,7 @@ func (n *Node) workerProcessExecute(ctx context.Context, from peer.ID, req reque
Results: execute.ResultMap{
n.host.ID(): result,
},
Signature: hex.EncodeToString(result.Signature),
}

// Send the response, whatever it may be (success or failure).
Expand Down
Loading