Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change node handling of locations for persisting data #131

Merged
merged 2 commits into from
Jan 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
cmd/node/node
cmd/node/.b7s_*

cmd/keygen/keygen
cmd/keyforge/keyforge
cmd/bootstrap-limiter/bootstrap-limiter
Expand Down
10 changes: 6 additions & 4 deletions cmd/node/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ import (
"github.com/spf13/pflag"

"github.com/blocklessnetwork/b7s/config"
"github.com/blocklessnetwork/b7s/models/blockless"
"github.com/blocklessnetwork/b7s/node"
)

// Default values.
const (
defaultPort = 0
defaultAddress = "0.0.0.0"
defaultWorkspaceDir = "workspace"
defaultPeerDB = "peer-db"
defaultFunctionDB = "function-db"
defaultConcurrency = uint(node.DefaultConcurrency)
Expand All @@ -27,13 +29,13 @@ func parseFlags() *config.Config {

// Node configuration.
pflag.StringVarP(&cfg.Role, "role", "r", defaultRole, "role this note will have in the Blockless protocol (head or worker)")
pflag.StringVar(&cfg.PeerDatabasePath, "peer-db", defaultPeerDB, "path to the database used for persisting peer data")
pflag.StringVar(&cfg.FunctionDatabasePath, "function-db", defaultFunctionDB, "path to the database used for persisting function data")
pflag.StringVar(&cfg.PeerDatabasePath, "peer-db", "", "path to the database used for persisting peer data")
pflag.StringVar(&cfg.FunctionDatabasePath, "function-db", "", "path to the database used for persisting function data")
pflag.UintVarP(&cfg.Concurrency, "concurrency", "c", defaultConcurrency, "maximum number of requests node will process in parallel")
pflag.StringVar(&cfg.API, "rest-api", "", "address where the head node REST API will listen on")
pflag.StringVar(&cfg.Workspace, "workspace", "./workspace", "directory that the node can use for file storage")
pflag.StringVar(&cfg.Workspace, "workspace", "", "directory that the node can use for file storage")
pflag.StringVar(&cfg.RuntimePath, "runtime-path", "", "runtime path (used by the worker node)")
pflag.StringVar(&cfg.RuntimeCLI, "runtime-cli", "", "runtime CLI name (used by the worker node)")
pflag.StringVar(&cfg.RuntimeCLI, "runtime-cli", blockless.RuntimeCLI(), "runtime CLI name (used by the worker node)")
pflag.BoolVar(&cfg.LoadAttributes, "attributes", false, "node should try to load its attribute data from IPFS")
pflag.StringSliceVar(&cfg.Topics, "topic", nil, "topics node should subscribe to")

Expand Down
40 changes: 40 additions & 0 deletions cmd/node/id.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package main

import (
"fmt"
"log"
"os"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
)

func peerIDFromKey(keyPath string) (string, error) {

key, err := readPrivateKey(keyPath)
if err != nil {
log.Fatalf("could not read key file: %s", err)
}

id, err := peer.IDFromPrivateKey(key)
if err != nil {
log.Fatalf("could not determine identity: %s", err)
}

return id.String(), nil
}

func readPrivateKey(filepath string) (crypto.PrivKey, error) {

payload, err := os.ReadFile(filepath)
if err != nil {
return nil, fmt.Errorf("could not read file: %w", err)
}

key, err := crypto.UnmarshalPrivateKey(payload)
if err != nil {
return nil, fmt.Errorf("could not unmarshal private key: %w", err)
}

return key, nil
}
55 changes: 55 additions & 0 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"errors"
"fmt"
"net/http"
"os"
"os/signal"
Expand Down Expand Up @@ -61,6 +62,34 @@ func run() int {
return failure
}

// If we have a key, use path that corresponds to that key e.g. `.b7s_<peer-id>`.
nodeDir := ""
if cfg.Host.PrivateKey != "" {
id, err := peerIDFromKey(cfg.Host.PrivateKey)
if err != nil {
log.Error().Err(err).Str("key", cfg.Host.PrivateKey).Msg("could not read private key")
return failure
}

nodeDir = generateNodeDirName(id)
} else {
nodeDir, err = os.MkdirTemp("", ".b7s_*")
if err != nil {
log.Error().Err(err).Msg("could not create node directory")
return failure
}
}

// Set relevant working paths for workspace, peerDB and functionDB.
// If paths were set using the CLI flags, use those. Else, use generated path, e.g. .b7s_<peer-id>/<default-option-for-directory>.
updateDirPaths(nodeDir, cfg)

log.Info().
Str("workspace", cfg.Workspace).
Str("peer_db", cfg.PeerDatabasePath).
Str("function_db", cfg.FunctionDatabasePath).
Msg("filepaths used by the node")

// Convert workspace path to an absolute one.
workspace, err := filepath.Abs(cfg.Workspace)
if err != nil {
Expand Down Expand Up @@ -160,6 +189,7 @@ func run() int {
Err(err).
Str("workspace", cfg.Workspace).
Str("runtime_path", cfg.RuntimePath).
Str("runtime_cli", cfg.RuntimeCLI).
Msg("could not create an executor")
return failure
}
Expand Down Expand Up @@ -283,3 +313,28 @@ func run() int {
func needLimiter(cfg *config.Config) bool {
return cfg.CPUPercentage != 1.0 || cfg.MemoryMaxKB > 0
}

func updateDirPaths(root string, cfg *config.Config) {

workspace := cfg.Workspace
if workspace == "" {
workspace = filepath.Join(root, defaultWorkspaceDir)
}
cfg.Workspace = workspace

peerDB := cfg.PeerDatabasePath
if peerDB == "" {
peerDB = filepath.Join(root, defaultPeerDB)
}
cfg.PeerDatabasePath = peerDB

functionDB := cfg.FunctionDatabasePath
if functionDB == "" {
functionDB = filepath.Join(root, defaultFunctionDB)
}
cfg.FunctionDatabasePath = functionDB
}

func generateNodeDirName(id string) string {
return fmt.Sprintf(".b7s_%s", id)
}
5 changes: 5 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package executor

import (
"errors"
"fmt"
"path/filepath"

Expand All @@ -24,6 +25,10 @@ func New(log zerolog.Logger, options ...Option) (*Executor, error) {
option(&cfg)
}

if cfg.RuntimeDir == "" || cfg.ExecutableName == "" {
return nil, errors.New("runtime path and executable name are required")
}

// Convert the working directory to an absolute path too.
workdir, err := filepath.Abs(cfg.WorkDir)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion node/execution_results.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (n *Node) gatherExecutionResults(ctx context.Context, requestID string, pee
return
}

log.Info().Str("peer", rp.String()).Msg("accounted execution response from peer")
n.log.Info().Str("peer", rp.String()).Msg("accounted execution response from peer")

er := res.(response.Execute)

Expand Down
Loading