Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for metadata in execution results #157

Merged
merged 5 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestAPI_Execute(t *testing.T) {
node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {

res := execute.ResultMap{
mocks.GenericPeerID: executionResult,
mocks.GenericPeerID: execute.NodeResult{Result: executionResult},
}

cluster := execute.Cluster{
Expand Down Expand Up @@ -88,7 +88,7 @@ func TestAPI_Execute_HandlesErrors(t *testing.T) {
node.ExecuteFunctionFunc = func(context.Context, execute.Request, string) (codes.Code, string, execute.ResultMap, execute.Cluster, error) {

res := execute.ResultMap{
mocks.GenericPeerID: executionResult,
mocks.GenericPeerID: execute.NodeResult{Result: executionResult},
}

return expectedCode, "", res, execute.Cluster{}, mocks.GenericError
Expand Down
20 changes: 15 additions & 5 deletions consensus/pbft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/metadata"
"github.com/blocklessnetwork/b7s/models/execute"
)

Expand All @@ -15,14 +16,16 @@ type Option func(*Config)
type PostProcessFunc func(requestID string, origin peer.ID, request execute.Request, result execute.Result)

var DefaultConfig = Config{
NetworkTimeout: NetworkTimeout,
RequestTimeout: RequestTimeout,
NetworkTimeout: NetworkTimeout,
RequestTimeout: RequestTimeout,
MetadataProvider: metadata.NewNoopProvider(),
}

type Config struct {
PostProcessors []PostProcessFunc // Callback functions to be invoked after execution is done.
NetworkTimeout time.Duration
RequestTimeout time.Duration
PostProcessors []PostProcessFunc // Callback functions to be invoked after execution is done.
NetworkTimeout time.Duration
RequestTimeout time.Duration
MetadataProvider metadata.Provider
}

// WithNetworkTimeout sets how much time we allow for message sending.
Expand All @@ -47,3 +50,10 @@ func WithPostProcessors(callbacks ...PostProcessFunc) Option {
cfg.PostProcessors = fns
}
}

// WithMetadataProvider sets the metadata provider for the node.
func WithMetadataProvider(p metadata.Provider) Option {
return func(cfg *Config) {
cfg.MetadataProvider = p
}
}
10 changes: 9 additions & 1 deletion consensus/pbft/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,19 @@ func (r *Replica) execute(view uint, sequence uint, digest string) error {

r.lastExecuted = sequence

metadata, err := r.cfg.MetadataProvider.Metadata(request.Execute, res.Result)
if err != nil {
log.Warn().Err(err).Msg("could not get metadata")
}

msg := response.Execute{
Code: res.Code,
RequestID: request.ID,
Results: execute.ResultMap{
r.id: res,
r.id: execute.NodeResult{
Result: res,
Metadata: metadata,
},
},
PBFT: response.PBFTResultInfo{
View: r.view,
Expand Down
22 changes: 11 additions & 11 deletions executor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,22 @@ import (

// defaultConfig used to create Executor.
var defaultConfig = Config{
WorkDir: "workspace",
RuntimeDir: "",
ExecutableName: blockless.RuntimeCLI(),
FS: afero.NewOsFs(),
Limiter: &noopLimiter{},
WorkDir: "workspace",
RuntimeDir: "",
ExecutableName: blockless.RuntimeCLI(),
FS: afero.NewOsFs(),
Limiter: &noopLimiter{},
DriversRootPath: "",
}

// Config represents the Executor configuration.
type Config struct {
WorkDir string // directory where files needed for the execution are stored
RuntimeDir string // directory where the executable can be found
ExecutableName string // name for the executable
DriversRootPath string // where are cgi drivers stored
FS afero.Fs // FS accessor
Limiter Limiter // Resource limiter for executed processes
WorkDir string // directory where files needed for the execution are stored
RuntimeDir string // directory where the executable can be found
ExecutableName string // name for the executable
DriversRootPath string // where are cgi drivers stored
FS afero.Fs // FS accessor
Limiter Limiter // Resource limiter for executed processes
}

type Option func(*Config)
Expand Down
14 changes: 6 additions & 8 deletions executor/execute_function.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,18 @@ func (e *Executor) ExecuteFunction(requestID string, req execute.Request) (execu
if err != nil {

res := execute.Result{
Code: codes.Error,
RequestID: requestID,
dmikey marked this conversation as resolved.
Show resolved Hide resolved
Result: out,
Usage: usage,
Code: codes.Error,
Result: out,
Usage: usage,
}

return res, fmt.Errorf("function execution failed: %w", err)
}

res := execute.Result{
Code: codes.OK,
RequestID: requestID,
Result: out,
Usage: usage,
Code: codes.OK,
Result: out,
Usage: usage,
}

return res, nil
Expand Down
1 change: 0 additions & 1 deletion executor/executor_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func TestExecutor_Execute(t *testing.T) {

// Verify the execution result.
require.Equal(t, codes.OK, res.Code)
require.Equal(t, requestID, res.RequestID)
require.Equal(t, hash, res.Result.Stdout)

// Verify usage info - for now, only that they are non-zero.
Expand Down
19 changes: 19 additions & 0 deletions metadata/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package metadata

import (
"github.com/blocklessnetwork/b7s/models/execute"
)

type Provider interface {
Metadata(execute.Request, execute.RuntimeOutput) (any, error)
}

type noopProvider struct{}

func (p noopProvider) Metadata(execute.Request, execute.RuntimeOutput) (any, error) {
return nil, nil
}

func NewNoopProvider() Provider {
return noopProvider{}
}
17 changes: 11 additions & 6 deletions models/execute/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ import (
"github.com/blocklessnetwork/b7s/models/codes"
)

// NodeResult is an annotated execution result.
type NodeResult struct {
Result
Metadata any `json:"metadata,omitempty"`
}

// Result describes an execution result.
type Result struct {
Code codes.Code `json:"code"`
Result RuntimeOutput `json:"result"`
RequestID string `json:"request_id"`
Usage Usage `json:"usage,omitempty"`
Code codes.Code `json:"code"`
Result RuntimeOutput `json:"result"`
Usage Usage `json:"usage,omitempty"`
}

// Cluster represents the set of peers that executed the request.
Expand All @@ -40,7 +45,7 @@ type Usage struct {
}

// ResultMap contains execution results from multiple peers.
type ResultMap map[peer.ID]Result
type ResultMap map[peer.ID]NodeResult

// MarshalJSON provides means to correctly handle JSON serialization/deserialization.
// See:
Expand All @@ -49,7 +54,7 @@ type ResultMap map[peer.ID]Result
// https://github.com/libp2p/go-libp2p-resource-manager/pull/67#issuecomment-1176820561
func (m ResultMap) MarshalJSON() ([]byte, error) {

em := make(map[string]Result, len(m))
em := make(map[string]NodeResult, len(m))
for p, v := range m {
em[p.String()] = v
}
Expand Down
14 changes: 7 additions & 7 deletions models/execute/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ const (

// RuntimeConfig represents the CLI flags supported by the runtime
type BLSRuntimeConfig struct {
Entry string `json:"entry,omitempty"`
ExecutionTime uint64 `json:"run_time,omitempty"`
DebugInfo bool `json:"debug_info,omitempty"`
Fuel uint64 `json:"limited_fuel,omitempty"`
Memory uint64 `json:"limited_memory,omitempty"`
Logger string `json:"runtime_logger,omitempty"`
Entry string `json:"entry,omitempty"`
ExecutionTime uint64 `json:"run_time,omitempty"`
DebugInfo bool `json:"debug_info,omitempty"`
Fuel uint64 `json:"limited_fuel,omitempty"`
Memory uint64 `json:"limited_memory,omitempty"`
Logger string `json:"runtime_logger,omitempty"`
DriversRootPath string `json:"drivers_root_path,omitempty"`
// Fields not allowed to be set in the request.
Input string `json:"-"`
Expand All @@ -29,5 +29,5 @@ const (
BLSRuntimeFlagLogger = "runtime-logger"
BLSRuntimeFlagPermission = "permission"
BLSRuntimeFlagEnv = "env"
BLSRuntimeFlagDrivers = "drivers-root-path"
BLSRuntimeFlagDrivers = "drivers-root-path"
)
2 changes: 1 addition & 1 deletion models/response/execute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestExecute_Signing(t *testing.T) {
RequestID: mocks.GenericUUID.String(),
Code: codes.OK,
Results: execute.ResultMap{
mocks.GenericPeerID: mocks.GenericExecutionResult,
mocks.GenericPeerID: execute.NodeResult{Result: mocks.GenericExecutionResult},
},
Cluster: execute.Cluster{
Peers: mocks.GenericPeerIDs[:4],
Expand Down
35 changes: 15 additions & 20 deletions node/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,45 +8,39 @@ import (
"github.com/blocklessnetwork/b7s/models/execute"
)

type Results []Result

// Result represents the execution result along with its aggregation stats.
type Result struct {
Result execute.RuntimeOutput `json:"result,omitempty"`
// Peers that got this result.
Peers []peer.ID `json:"peers,omitempty"`
// How frequent was this result, in percentages.
Frequency float64 `json:"frequency,omitempty"`
}

type resultStats struct {
seen uint
peers []peer.ID
}

func Aggregate(results execute.ResultMap) Results {

total := len(results)
if total == 0 {
return nil
}

type resultStats struct {
seen uint
peers []peer.ID
metadata map[peer.ID]any
zees-dev marked this conversation as resolved.
Show resolved Hide resolved
}

stats := make(map[execute.RuntimeOutput]resultStats)
for executingPeer, res := range results {

// NOTE: It might make sense to ignore stderr in comparison.
output := res.Result
output := res.Result.Result

stat, ok := stats[output]
if !ok {
stats[output] = resultStats{
seen: 0,
peers: make([]peer.ID, 0),
stat = resultStats{
seen: 0,
peers: make([]peer.ID, 0),
metadata: make(map[peer.ID]any),
}
}

stat.seen++
stat.peers = append(stat.peers, executingPeer)
if res.Metadata != nil {
stat.metadata[executingPeer] = res.Metadata
}

stats[output] = stat
}
Expand All @@ -59,6 +53,7 @@ func Aggregate(results execute.ResultMap) Results {
Result: res,
Peers: stat.peers,
Frequency: 100 * float64(stat.seen) / float64(total),
Metadata: stat.metadata,
}

aggregated = append(aggregated, aggr)
Expand Down
34 changes: 34 additions & 0 deletions node/aggregate/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package aggregate

import (
"encoding/json"

"github.com/libp2p/go-libp2p/core/peer"

"github.com/blocklessnetwork/b7s/models/execute"
)

type Results []Result

// Result represents the execution result along with its aggregation stats.
type Result struct {
Result execute.RuntimeOutput `json:"result,omitempty"`
// Peers that got this result.
Peers []peer.ID `json:"peers,omitempty"`
// Peers metadata
Metadata NodeMetadata `json:"metadata,omitempty"`
zees-dev marked this conversation as resolved.
Show resolved Hide resolved
// How frequent was this result, in percentages.
Frequency float64 `json:"frequency,omitempty"`
}

type NodeMetadata map[peer.ID]any

func (m NodeMetadata) MarshalJSON() ([]byte, error) {

em := make(map[string]any, len(m))
for p, v := range m {
em[p.String()] = v
}

return json.Marshal(em)
}
2 changes: 1 addition & 1 deletion node/cluster_pbft_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ This is the end of my program

for peer, exres := range res.Results {
require.Contains(t, workerIDs, peer)
require.Equal(t, expectedExecutionResult, exres.Result.Stdout)
require.Equal(t, expectedExecutionResult, exres.Result.Result.Stdout)
}

t.Log("client verified execution response")
Expand Down
10 changes: 10 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/blocklessnetwork/b7s/consensus"
"github.com/blocklessnetwork/b7s/metadata"
"github.com/blocklessnetwork/b7s/models/blockless"
)

Expand All @@ -23,6 +24,7 @@ var DefaultConfig = Config{
ClusterFormationTimeout: DefaultClusterFormationTimeout,
DefaultConsensus: DefaultConsensusAlgorithm,
LoadAttributes: DefaultAttributeLoadingSetting,
MetadataProvider: metadata.NewNoopProvider(),
}

// Config represents the Node configuration.
Expand All @@ -38,6 +40,7 @@ type Config struct {
Workspace string // Directory where we can store files needed for execution.
DefaultConsensus consensus.Type // Default consensus algorithm to use.
LoadAttributes bool // Node should try to load its attributes from IPFS.
MetadataProvider metadata.Provider // Metadata provider for the node
}

// Validate checks if the given configuration is correct.
Expand Down Expand Up @@ -153,6 +156,13 @@ func WithAttributeLoading(b bool) Option {
}
}

// WithMetadataProvider sets the metadata provider for the node.
func WithMetadataProvider(p metadata.Provider) Option {
return func(cfg *Config) {
cfg.MetadataProvider = p
}
}

func (n *Node) isWorker() bool {
return n.cfg.Role == blockless.WorkerNode
}
Expand Down
Loading
Loading