Skip to content

Commit

Permalink
Add support for metadata in execution results (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
Maelkum authored Jul 4, 2024
1 parent b4efdbb commit 75fcc19
Show file tree
Hide file tree
Showing 21 changed files with 190 additions and 85 deletions.
4 changes: 2 additions & 2 deletions api/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestAPI_Execute(t *testing.T) {
node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {

res := execute.ResultMap{
mocks.GenericPeerID: executionResult,
mocks.GenericPeerID: execute.NodeResult{Result: executionResult},
}

cluster := execute.Cluster{
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestAPI_Execute_HandlesErrors(t *testing.T) {
node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {

res := execute.ResultMap{
mocks.GenericPeerID: executionResult,
mocks.GenericPeerID: execute.NodeResult{Result: executionResult},
}

return expectedCode, "", res, execute.Cluster{}, mocks.GenericError
Expand Down
20 changes: 15 additions & 5 deletions consensus/pbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/metadata"
"github.com/blocklessnetwork/b7s/models/execute"
)

Expand All @@ -15,14 +16,16 @@ type Option func(*Config)
type PostProcessFunc func(requestID string, origin peer.ID, request execute.Request, result execute.Result)

var DefaultConfig = Config{
NetworkTimeout: NetworkTimeout,
RequestTimeout: RequestTimeout,
NetworkTimeout: NetworkTimeout,
RequestTimeout: RequestTimeout,
MetadataProvider: metadata.NewNoopProvider(),
}

type Config struct {
PostProcessors []PostProcessFunc // Callback functions to be invoked after execution is done.
NetworkTimeout time.Duration
RequestTimeout time.Duration
PostProcessors []PostProcessFunc // Callback functions to be invoked after execution is done.
NetworkTimeout time.Duration
RequestTimeout time.Duration
MetadataProvider metadata.Provider
}

// WithNetworkTimeout sets how much time we allow for message sending.
Expand All @@ -47,3 +50,10 @@ func WithPostProcessors(callbacks ...PostProcessFunc) Option {
cfg.PostProcessors = fns
}
}

// WithMetadataProvider sets the metadata provider for the node.
func WithMetadataProvider(p metadata.Provider) Option {
return func(cfg *Config) {
cfg.MetadataProvider = p
}
}
10 changes: 9 additions & 1 deletion consensus/pbft/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,19 @@ func (r *Replica) execute(view uint, sequence uint, digest string) error {

r.lastExecuted = sequence

metadata, err := r.cfg.MetadataProvider.Metadata(request.Execute, res.Result)
if err != nil {
log.Warn().Err(err).Msg("could not get metadata")
}

msg := response.Execute{
Code: res.Code,
RequestID: request.ID,
Results: execute.ResultMap{
r.id: res,
r.id: execute.NodeResult{
Result: res,
Metadata: metadata,
},
},
PBFT: response.PBFTResultInfo{
View: r.view,
Expand Down
22 changes: 11 additions & 11 deletions executor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ import (

// defaultConfig used to create Executor.
var defaultConfig = Config{
WorkDir: "workspace",
RuntimeDir: "",
ExecutableName: blockless.RuntimeCLI(),
FS: afero.NewOsFs(),
Limiter: &noopLimiter{},
WorkDir: "workspace",
RuntimeDir: "",
ExecutableName: blockless.RuntimeCLI(),
FS: afero.NewOsFs(),
Limiter: &noopLimiter{},
DriversRootPath: "",
}

// Config represents the Executor configuration.
type Config struct {
WorkDir string // directory where files needed for the execution are stored
RuntimeDir string // directory where the executable can be found
ExecutableName string // name for the executable
DriversRootPath string // where are cgi drivers stored
FS afero.Fs // FS accessor
Limiter Limiter // Resource limiter for executed processes
WorkDir string // directory where files needed for the execution are stored
RuntimeDir string // directory where the executable can be found
ExecutableName string // name for the executable
DriversRootPath string // where are cgi drivers stored
FS afero.Fs // FS accessor
Limiter Limiter // Resource limiter for executed processes
}

type Option func(*Config)
Expand Down
14 changes: 6 additions & 8 deletions executor/execute_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,18 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu
if err != nil {

res := execute.Result{
Code: codes.Error,
RequestID: requestID,
Result: out,
Usage: usage,
Code: codes.Error,
Result: out,
Usage: usage,
}

return res, fmt.Errorf("function execution failed: %w", err)
}

res := execute.Result{
Code: codes.OK,
RequestID: requestID,
Result: out,
Usage: usage,
Code: codes.OK,
Result: out,
Usage: usage,
}

return res, nil
Expand Down
1 change: 0 additions & 1 deletion executor/executor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestExecutor_Execute(t *testing.T) {

// Verify the execution result.
require.Equal(t, codes.OK, res.Code)
require.Equal(t, requestID, res.RequestID)
require.Equal(t, hash, res.Result.Stdout)

// Verify usage info - for now, only that they are non-zero.
Expand Down
19 changes: 19 additions & 0 deletions metadata/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package metadata

import (
"github.com/blocklessnetwork/b7s/models/execute"
)

type Provider interface {
Metadata(execute.Request, execute.RuntimeOutput) (any, error)
}

type noopProvider struct{}

func (p noopProvider) Metadata(execute.Request, execute.RuntimeOutput) (any, error) {
return nil, nil
}

func NewNoopProvider() Provider {
return noopProvider{}
}
17 changes: 11 additions & 6 deletions models/execute/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ import (
"github.com/blocklessnetwork/b7s/models/codes"
)

// NodeResult is an annotated execution result.
type NodeResult struct {
Result
Metadata any `json:"metadata,omitempty"`
}

// Result describes an execution result.
type Result struct {
Code codes.Code `json:"code"`
Result RuntimeOutput `json:"result"`
RequestID string `json:"request_id"`
Usage Usage `json:"usage,omitempty"`
Code codes.Code `json:"code"`
Result RuntimeOutput `json:"result"`
Usage Usage `json:"usage,omitempty"`
}

// Cluster represents the set of peers that executed the request.
Expand All @@ -40,7 +45,7 @@ type Usage struct {
}

// ResultMap contains execution results from multiple peers.
type ResultMap map[peer.ID]Result
type ResultMap map[peer.ID]NodeResult

// MarshalJSON provides means to correctly handle JSON serialization/deserialization.
// See:
Expand All @@ -49,7 +54,7 @@ type ResultMap map[peer.ID]Result
// https://github.com/libp2p/go-libp2p-resource-manager/pull/67#issuecomment-1176820561
func (m ResultMap) MarshalJSON() ([]byte, error) {

em := make(map[string]Result, len(m))
em := make(map[string]NodeResult, len(m))
for p, v := range m {
em[p.String()] = v
}
Expand Down
14 changes: 7 additions & 7 deletions models/execute/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ const (

// RuntimeConfig represents the CLI flags supported by the runtime
type BLSRuntimeConfig struct {
Entry string `json:"entry,omitempty"`
ExecutionTime uint64 `json:"run_time,omitempty"`
DebugInfo bool `json:"debug_info,omitempty"`
Fuel uint64 `json:"limited_fuel,omitempty"`
Memory uint64 `json:"limited_memory,omitempty"`
Logger string `json:"runtime_logger,omitempty"`
Entry string `json:"entry,omitempty"`
ExecutionTime uint64 `json:"run_time,omitempty"`
DebugInfo bool `json:"debug_info,omitempty"`
Fuel uint64 `json:"limited_fuel,omitempty"`
Memory uint64 `json:"limited_memory,omitempty"`
Logger string `json:"runtime_logger,omitempty"`
DriversRootPath string `json:"drivers_root_path,omitempty"`
// Fields not allowed to be set in the request.
Input string `json:"-"`
Expand All @@ -29,5 +29,5 @@ const (
BLSRuntimeFlagLogger = "runtime-logger"
BLSRuntimeFlagPermission = "permission"
BLSRuntimeFlagEnv = "env"
BLSRuntimeFlagDrivers = "drivers-root-path"
BLSRuntimeFlagDrivers = "drivers-root-path"
)
2 changes: 1 addition & 1 deletion models/response/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestExecute_Signing(t *testing.T) {
RequestID: mocks.GenericUUID.String(),
Code: codes.OK,
Results: execute.ResultMap{
mocks.GenericPeerID: mocks.GenericExecutionResult,
mocks.GenericPeerID: execute.NodeResult{Result: mocks.GenericExecutionResult},
},
Cluster: execute.Cluster{
Peers: mocks.GenericPeerIDs[:4],
Expand Down
35 changes: 15 additions & 20 deletions node/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,39 @@ import (
"github.com/blocklessnetwork/b7s/models/execute"
)

type Results []Result

// Result represents the execution result along with its aggregation stats.
type Result struct {
Result execute.RuntimeOutput `json:"result,omitempty"`
// Peers that got this result.
Peers []peer.ID `json:"peers,omitempty"`
// How frequent was this result, in percentages.
Frequency float64 `json:"frequency,omitempty"`
}

type resultStats struct {
seen uint
peers []peer.ID
}

func Aggregate(results execute.ResultMap) Results {

total := len(results)
if total == 0 {
return nil
}

type resultStats struct {
seen uint
peers []peer.ID
metadata map[peer.ID]any
}

stats := make(map[execute.RuntimeOutput]resultStats)
for executingPeer, res := range results {

// NOTE: It might make sense to ignore stderr in comparison.
output := res.Result
output := res.Result.Result

stat, ok := stats[output]
if !ok {
stats[output] = resultStats{
seen: 0,
peers: make([]peer.ID, 0),
stat = resultStats{
seen: 0,
peers: make([]peer.ID, 0),
metadata: make(map[peer.ID]any),
}
}

stat.seen++
stat.peers = append(stat.peers, executingPeer)
if res.Metadata != nil {
stat.metadata[executingPeer] = res.Metadata
}

stats[output] = stat
}
Expand All @@ -59,6 +53,7 @@ func Aggregate(results execute.ResultMap) Results {
Result: res,
Peers: stat.peers,
Frequency: 100 * float64(stat.seen) / float64(total),
Metadata: stat.metadata,
}

aggregated = append(aggregated, aggr)
Expand Down
34 changes: 34 additions & 0 deletions node/aggregate/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package aggregate

import (
"encoding/json"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/models/execute"
)

type Results []Result

// Result represents the execution result along with its aggregation stats.
type Result struct {
Result execute.RuntimeOutput `json:"result,omitempty"`
// Peers that got this result.
Peers []peer.ID `json:"peers,omitempty"`
// Peers metadata
Metadata NodeMetadata `json:"metadata,omitempty"`
// How frequent was this result, in percentages.
Frequency float64 `json:"frequency,omitempty"`
}

type NodeMetadata map[peer.ID]any

func (m NodeMetadata) MarshalJSON() ([]byte, error) {

em := make(map[string]any, len(m))
for p, v := range m {
em[p.String()] = v
}

return json.Marshal(em)
}
2 changes: 1 addition & 1 deletion node/cluster_pbft_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ This is the end of my program

for peer, exres := range res.Results {
require.Contains(t, workerIDs, peer)
require.Equal(t, expectedExecutionResult, exres.Result.Stdout)
require.Equal(t, expectedExecutionResult, exres.Result.Result.Stdout)
}

t.Log("client verified execution response")
Expand Down
10 changes: 10 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/blocklessnetwork/b7s/consensus"
"github.com/blocklessnetwork/b7s/metadata"
"github.com/blocklessnetwork/b7s/models/blockless"
)

Expand All @@ -23,6 +24,7 @@ var DefaultConfig = Config{
ClusterFormationTimeout: DefaultClusterFormationTimeout,
DefaultConsensus: DefaultConsensusAlgorithm,
LoadAttributes: DefaultAttributeLoadingSetting,
MetadataProvider: metadata.NewNoopProvider(),
}

// Config represents the Node configuration.
Expand All @@ -38,6 +40,7 @@ type Config struct {
Workspace string // Directory where we can store files needed for execution.
DefaultConsensus consensus.Type // Default consensus algorithm to use.
LoadAttributes bool // Node should try to load its attributes from IPFS.
MetadataProvider metadata.Provider // Metadata provider for the node
}

// Validate checks if the given configuration is correct.
Expand Down Expand Up @@ -153,6 +156,13 @@ func WithAttributeLoading(b bool) Option {
}
}

// WithMetadataProvider sets the metadata provider for the node.
func WithMetadataProvider(p metadata.Provider) Option {
return func(cfg *Config) {
cfg.MetadataProvider = p
}
}

func (n *Node) isWorker() bool {
return n.cfg.Role == blockless.WorkerNode
}
Expand Down
Loading

0 comments on commit 75fcc19

Please sign in to comment.