diff --git a/README.md b/README.md index 12a79727..c504ff3f 100644 --- a/README.md +++ b/README.md @@ -26,27 +26,27 @@ You can also use Docker to install b7s. See the [Docker documentation](/docker/R For a more detailed overview of the configuration options, see the [b7s-node Readme](/cmd/node/README.md#usage). -| Flag | Short Form | Default Value | Description | -| ------------------------- | ---------- | ----------------------- | --------------------------------------------------------------------------------------------- | -| config | N/A | N/A | Specifies the config file to load. -| log-level | -l | "info" | Specifies the level of logging to use. | -| peer-db | N/A | "peer-db" | Specifies the path to database used for persisting peer data. | -| function-db | N/A | "function-db" | Specifies the path to database used for persisting function data. | -| role | -r | "worker" | Specifies the role this node will have in the Blockless protocol (head or worker). | -| address | -a | "0.0.0.0" | Specifies the address that the libp2p host will use. | -| port | -p | 0 | Specifies the port that the libp2p host will use. | -| websocket-port | N/A | 0 | Specifies the port that the libp2p host will use for websocket connections. | -| private-key | N/A | N/A | Specifies the private key that the libp2p host will use. | -| concurrency | -c | node.DefaultConcurrency | Specifies the maximum number of requests the node will process in parallel. | -| rest-api | N/A | N/A | Specifies the address where the head node REST API will listen on. | -| boot-nodes | N/A | N/A | Specifies a list of addresses that this node will connect to on startup, in multiaddr format. | -| workspace | N/A | "./workspace" | Specifies the directory that the node can use for file storage. | -| runtime | N/A | N/A | Specifies the runtime address used by the worker node. | -| dialback-address | N/A | N/A | Specifies the advertised dialback address of the Node. | -| dialback-port | N/A | N/A | Specifies the advertised dialback port of the Node. | -| websocket-dialback-port | N/A | 0 | Specifies the advertised dialback port for Websocket connections. +| Flag | Short Form | Default Value | Description | +| ------------------------- | ---------- | ----------------------- | ----------------------------------------------------------------------------------------------------- | +| config | N/A | N/A | Specifies the config file to load. | +| log-level | -l | "info" | Specifies the level of logging to use. | +| db | N/A | "db" | Specifies the path to database used for persisting peer and function data. | +| role | -r | "worker" | Specifies the role this node will have in the Blockless protocol (head or worker). | +| address | -a | "0.0.0.0" | Specifies the address that the libp2p host will use. | +| port | -p | 0 | Specifies the port that the libp2p host will use. | +| websocket-port | N/A | 0 | Specifies the port that the libp2p host will use for websocket connections. | +| private-key | N/A | N/A | Specifies the private key that the libp2p host will use. | +| concurrency | -c | node.DefaultConcurrency | Specifies the maximum number of requests the node will process in parallel. | +| rest-api | N/A | N/A | Specifies the address where the head node REST API will listen on. | +| boot-nodes | N/A | N/A | Specifies a list of addresses that this node will connect to on startup, in multiaddr format. | +| workspace | N/A | "./workspace" | Specifies the directory that the node can use for file storage. | +| runtime | N/A | N/A | Specifies the runtime address used by the worker node. | +| dialback-address | N/A | N/A | Specifies the advertised dialback address of the Node. | +| dialback-port | N/A | N/A | Specifies the advertised dialback port of the Node. | +| websocket-dialback-port | N/A | 0 | Specifies the advertised dialback port for Websocket connections. | | cpu-percentage-limit | N/A | 1.0 | Specifies the amount of CPU time allowed for Blockless Functions in the 0-1 range, 1 being unlimited. | -| memory-limit | N/A | N/A | Specifies the memory limit for Blockless Functions, in kB. | +| memory-limit | N/A | N/A | Specifies the memory limit for Blockless Functions, in kB. | +| no-dialback-peers | N/A | false | Specifies if the node should avoid dialing back peers known from past runs | ## Dependencies diff --git a/cmd/node/README.md b/cmd/node/README.md index 63f5cbe1..bed2740c 100644 --- a/cmd/node/README.md +++ b/cmd/node/README.md @@ -26,29 +26,29 @@ List of supported CLI flags is listed below. ```console Usage of b7s-node: - --config string path to a config file -r, --role string role this node will have in the Blockless protocol (head or worker) (default "worker") -c, --concurrency uint maximum number of requests node will process in parallel (default 10) --boot-nodes strings list of addresses that this node will connect to on startup, in multiaddr format --workspace string directory that the node can use for file storage - --attributes node should try to load its attribute data from IPFS - --peer-db string path to the database used for persisting peer data (default "peer-db") - --function-db string path to the database used for persisting function data (default "function-db") + --load-attributes node should try to load its attribute data from IPFS --topics strings topics node should subscribe to + --db string path to the database used for persisting peer and function data -l, --log-level string log level to use (default "info") -a, --address string address that the b7s host will use (default "0.0.0.0") -p, --port uint port that the b7s host will use --private-key string private key that the b7s host will use + --dialback-address string external address that the b7s host will advertise + --dialback-port uint external port that the b7s host will advertise -w, --websocket should the node use websocket protocol for communication --websocket-port uint port to use for websocket connections - --dialback-address string external address that the b7s host will advertise (default "0.0.0.0") - --dialback-port uint external port that the b7s host will advertise --websocket-dialback-port uint external port that the b7s host will advertise for websocket connections + --no-dialback-peers start without dialing back peers from previous runs + --rest-api string address where the head node REST API will listen on --runtime-path string Blockless Runtime location (used by the worker node) - --runtime-cli string runtime CLI name (used by the worker node) (default "bls-runtime") - --cpu-percentage-limit float amount of CPU time allowed for Blockless Functions in the 0-1 range, 1 being unlimited (default 1) + --runtime-cli string runtime CLI name (used by the worker node) + --cpu-percentage-limit float amount of CPU time allowed for Blockless Functions in the 0-1 range, 1 being unlimited --memory-limit int memory limit (kB) for Blockless Functions - --rest-api string address where the head node REST API will listen on + --config string path to a config file ``` Alternatively to the CLI flags, you can create a YAML file and specify the parameters there. @@ -94,12 +94,11 @@ If a private key is not specified the node will start with a randomly generated ### Starting a Worker Node ```console -$ ./node --peer-db peer-database --log-level debug --port 9000 --role worker --runtime ~/.local/bin --workspace workspace --private-key ./keys/priv.bin +$ ./node --db /tmp/db --log-level debug --port 9000 --role worker --runtime ~/.local/bin --workspace workspace --private-key ./keys/priv.bin ``` The created `node` will listen on all addresses on TCP port 9000. -Database used to persist Node data between runs will be created in the `peer-database` subdirectory. -On the other hand, Node will persist function data in the default database, in the `function-db` subdirectory. +Database used to persist Node data between runs will be created in the `/tmp/db` subdirectory. Blockless Runtime path is given as `/home/user/.local/bin`. At startup, node will check if the Blockless Runtime is actually found there, namely the [bls-runtime](https://blockless.network/docs/protocol/runtime). @@ -111,12 +110,11 @@ Any transient files needed for node operation will be created in the `workspace` ### Starting a Head Node ```console -$ ./node --peer-db /var/tmp/b7s/peerdb --function-db /var/tmp/b7s/fdb --log-level debug --port 9002 -r head --workspace /var/tmp/b7s/workspace --private-key ~/keys/priv.bin --rest-api ':8080' +$ ./node --db /var/tmp/b7s/db --log-level debug --port 9002 -r head --workspace /var/tmp/b7s/workspace --private-key ~/keys/priv.bin --rest-api ':8080' ``` The created `node` will listen on all addresses on TCP port 9002. -Database used to persist Node peer data between runs will be created at `/var/tmp/b7s/peerdb`. -Database used to persist Node function data will be created at `/var/tmp/b7s/fdb`. +Database used to persist Node peer and function data between runs will be created at `/var/tmp/b7s/db`. Any transient files needed for node operation will be created in the `/var/tmp/b7s/workspace` directory. diff --git a/cmd/node/example.yaml b/cmd/node/example.yaml index 7311dd62..a14e1580 100644 --- a/cmd/node/example.yaml +++ b/cmd/node/example.yaml @@ -7,11 +7,8 @@ # directory where node will keep files needed for operation # workspace: workspace -# directory where node will maintain its peer database -# peer-db: pdb - -# directory where node will maintain its function database -# function-db: fdb +# directory where node will maintain its database +# db: db # multiaddresses of nodes this node will try to connect to on boot # boot-nodes: [] diff --git a/cmd/node/main.go b/cmd/node/main.go index 55e76b11..c1003e17 100644 --- a/cmd/node/main.go +++ b/cmd/node/main.go @@ -22,8 +22,8 @@ import ( "github.com/blocklessnetwork/b7s/host" "github.com/blocklessnetwork/b7s/models/blockless" "github.com/blocklessnetwork/b7s/node" - "github.com/blocklessnetwork/b7s/peerstore" "github.com/blocklessnetwork/b7s/store" + "github.com/blocklessnetwork/b7s/store/codec" ) const ( @@ -84,14 +84,13 @@ func run() int { } } - // Set relevant working paths for workspace, peerDB and functionDB. + // Set relevant working paths for workspace and DB. // If paths were set using the CLI flags, use those. Else, use generated path, e.g. .b7s_/. updateDirPaths(nodeDir, cfg) log.Info(). Str("workspace", cfg.Workspace). - Str("peer_db", cfg.PeerDB). - Str("function_db", cfg.FunctionDB). + Str("db", cfg.DB). Msg("filepaths used by the node") // Convert workspace path to an absolute one. @@ -103,23 +102,15 @@ func run() int { cfg.Workspace = workspace // Open the pebble peer database. - pdb, err := pebble.Open(cfg.PeerDB, &pebble.Options{Logger: &pebbleNoopLogger{}}) + db, err := pebble.Open(cfg.DB, &pebble.Options{Logger: &pebbleNoopLogger{}}) if err != nil { - log.Error().Err(err).Str("db", cfg.PeerDB).Msg("could not open pebble peer database") + log.Error().Err(err).Str("db", cfg.DB).Msg("could not open pebble database") return failure } - defer pdb.Close() + defer db.Close() // Create a new store. - pstore := store.New(pdb) - peerstore := peerstore.New(pstore) - - // Get the list of dial back peers. - peers, err := peerstore.Peers() - if err != nil { - log.Error().Err(err).Msg("could not get list of dial-back peers") - return failure - } + store := store.New(db, codec.NewJSONCodec()) // Get the list of boot nodes addresses. bootNodeAddrs, err := getBootNodeAddresses(cfg.BootNodes) @@ -128,17 +119,29 @@ func run() int { return failure } - // Create libp2p host. - host, err := host.New(log, cfg.Connectivity.Address, cfg.Connectivity.Port, + hostOpts := []func(*host.Config){ host.WithPrivateKey(cfg.Connectivity.PrivateKey), host.WithBootNodes(bootNodeAddrs), - host.WithDialBackPeers(peers), host.WithDialBackAddress(cfg.Connectivity.DialbackAddress), host.WithDialBackPort(cfg.Connectivity.DialbackPort), host.WithDialBackWebsocketPort(cfg.Connectivity.WebsocketDialbackPort), host.WithWebsocket(cfg.Connectivity.Websocket), host.WithWebsocketPort(cfg.Connectivity.WebsocketPort), - ) + } + + if !cfg.Connectivity.NoDialbackPeers { + // Get the list of dial back peers. + peers, err := store.RetrievePeers() + if err != nil { + log.Error().Err(err).Msg("could not get list of dial-back peers") + return failure + } + + hostOpts = append(hostOpts, host.WithDialBackPeers(peers)) + } + + // Create libp2p host. + host, err := host.New(log, cfg.Connectivity.Address, cfg.Connectivity.Port, hostOpts...) if err != nil { log.Error().Err(err).Str("key", cfg.Connectivity.PrivateKey).Msg("could not create host") return failure @@ -149,7 +152,6 @@ func run() int { Str("id", host.ID().String()). Strs("addresses", host.Addresses()). Int("boot_nodes", len(bootNodeAddrs)). - Int("dial_back_peers", len(peers)). Msg("created host") // Set node options. @@ -202,18 +204,8 @@ func run() int { opts = append(opts, node.WithWorkspace(cfg.Workspace)) } - // Open the pebble function database. - fdb, err := pebble.Open(cfg.FunctionDB, &pebble.Options{Logger: &pebbleNoopLogger{}}) - if err != nil { - log.Error().Err(err).Str("db", cfg.FunctionDB).Msg("could not open pebble function database") - return failure - } - defer fdb.Close() - - functionStore := store.New(fdb) - // Create function store. - fstore := fstore.New(log, functionStore, cfg.Workspace) + fstore := fstore.New(log, store, cfg.Workspace) // If we have topics specified, use those. if len(cfg.Topics) > 0 { @@ -221,7 +213,7 @@ func run() int { } // Instantiate node. - node, err := node.New(log, host, peerstore, fstore, opts...) + node, err := node.New(log, host, store, fstore, opts...) if err != nil { log.Error().Err(err).Msg("could not create node") return failure @@ -327,17 +319,11 @@ func updateDirPaths(root string, cfg *config.Config) { } cfg.Workspace = workspace - peerDB := cfg.PeerDB - if peerDB == "" { - peerDB = filepath.Join(root, config.DefaultPeerDBName) - } - cfg.PeerDB = peerDB - - functionDB := cfg.FunctionDB - if functionDB == "" { - functionDB = filepath.Join(root, config.DefaultFunctionDBName) + db := cfg.DB + if db == "" { + db = filepath.Join(root, config.DefaultDBName) } - cfg.FunctionDB = functionDB + cfg.DB = db } func generateNodeDirName(id string) string { diff --git a/config/config.go b/config/config.go index ce86cc15..afb9e13b 100644 --- a/config/config.go +++ b/config/config.go @@ -16,9 +16,8 @@ const ( // Default names for storage directories. const ( - DefaultPeerDBName = "peer-db" - DefaultFunctionDBName = "function-db" - DefaultWorkspaceName = "workspace" + DefaultDBName = "db" + DefaultWorkspaceName = "workspace" ) var DefaultConfig = Config{ @@ -45,8 +44,7 @@ type Config struct { LoadAttributes bool `koanf:"load-attributes" flag:"load-attributes"` // TODO: Head node probably doesn't need attributes..? Topics []string `koanf:"topics" flag:"topics"` - PeerDB string `koanf:"peer-db" flag:"peer-db"` - FunctionDB string `koanf:"function-db" flag:"function-db"` // TODO: Head node doesn't need a function database. + DB string `koanf:"db" flag:"db"` Log Log `koanf:"log"` Connectivity Connectivity `koanf:"connectivity"` @@ -69,6 +67,7 @@ type Connectivity struct { Websocket bool `koanf:"websocket" flag:"websocket,w"` WebsocketPort uint `koanf:"websocket-port" flag:"websocket-port"` WebsocketDialbackPort uint `koanf:"websocket-dialback-port" flag:"websocket-dialback-port"` + NoDialbackPeers bool `koanf:"no-dialback-peers" flag:"no-dialback-peers"` } type Head struct { @@ -108,10 +107,8 @@ func getFlagDescription(flag string) string { return "node should try to load its attribute data from IPFS" case "topics": return "topics node should subscribe to" - case "peer-db": - return "path to the database used for persisting peer data" - case "function-db": - return "path to the database used for persisting function data" + case "db": + return "path to the database used for persisting peer and function data" case "log-level": return "log level to use" case "address": @@ -140,6 +137,8 @@ func getFlagDescription(flag string) string { return "amount of CPU time allowed for Blockless Functions in the 0-1 range, 1 being unlimited" case "memory-limit": return "memory limit (kB) for Blockless Functions" + case "no-dialback-peers": + return "start without dialing back peers from previous runs" default: return "" } diff --git a/config/load_test.go b/config/load_test.go index 4b7edfd7..1f1ac082 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -222,9 +222,7 @@ func TestConfig_Environment(t *testing.T) { concurrency = uint(45) bootNodes = "a,b,c,d" topics = "topic1,topic2,topic3" - - peerDB = "/tmp/db/peer-db" - functionDB = "/tmp/db/function-db" + db = "/tmp/db" logLevel = "trace" @@ -244,8 +242,7 @@ func TestConfig_Environment(t *testing.T) { t.Setenv("B7S_Concurrency", fmt.Sprint(concurrency)) t.Setenv("B7S_BootNodes", bootNodes) t.Setenv("B7S_Topics", topics) - t.Setenv("B7S_PeerDB", peerDB) - t.Setenv("B7S_FunctionDB", functionDB) + t.Setenv("B7S_DB", db) t.Setenv("B7S_Log_Level", logLevel) t.Setenv("B7S_Connectivity_Address", address) t.Setenv("B7S_Connectivity_Port", fmt.Sprint(port)) @@ -269,8 +266,7 @@ func TestConfig_Environment(t *testing.T) { topicList := strings.Split(topics, ",") require.Equal(t, topicList, cfg.Topics) - require.Equal(t, peerDB, cfg.PeerDB) - require.Equal(t, functionDB, cfg.FunctionDB) + require.Equal(t, db, cfg.DB) require.Equal(t, logLevel, cfg.Log.Level) require.Equal(t, address, cfg.Connectivity.Address) require.Equal(t, port, cfg.Connectivity.Port) diff --git a/docker/README.md b/docker/README.md index bfd00664..bb4021dd 100644 --- a/docker/README.md +++ b/docker/README.md @@ -27,8 +27,7 @@ docker run -d --name b7s \ -e REST_API=8081 \ -e DIALBACK_PORT=32342 \ -e DIALBACK_ADDRESS=1.1.1.1 \ - -v /var/tmp/b7s/peerdb:/var/tmp/b7s/peerdb \ - -v /var/tmp/b7s/function-db:/var/tmp/b7s/function-db \ + -v /var/tmp/b7s/db:/var/tmp/b7s/db \ -p 9527:9527 \ ghcr.io/blocklessnetwork/b7s:v0.0.25 ``` @@ -42,8 +41,7 @@ docker run -d --name b7s \ -e KEY_PASSWORD= \ -e NODE_ROLE=worker \ -e P2P_PORT=9527 \ - -v /var/tmp/b7s/peerdb:/var/tmp/b7s/peerdb \ - -v /var/tmp/b7s/function-db:/var/tmp/b7s/function-db \ + -v /var/tmp/b7s/db:/var/tmp/b7s/db \ -p 9527:9527 \ ghcr.io/blocklessnetwork/b7s:v0.0.25 ``` diff --git a/docker/run.sh b/docker/run.sh index 7c9d01ae..2d88bee5 100644 --- a/docker/run.sh +++ b/docker/run.sh @@ -91,7 +91,7 @@ if [ -n "$BOOT_NODES" ]; then fi if [ "$NODE_ROLE" = "head" ]; then - ./b7s --peer-db /var/tmp/b7s/peerdb --function-db /var/tmp/b7s/function-db --log-level debug --port $P2P_PORT --role head --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH --rest-api :$REST_API $dialback_args $bootnode_args + ./b7s --db /var/tmp/b7s/db --log-level debug --port $P2P_PORT --role head --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH --rest-api :$REST_API $dialback_args $bootnode_args else - ./b7s --peer-db ./peer-database --function-db ./function-database--log-level debug --port $P2P_PORT --role worker --runtime /app/runtime --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH $dialback_args $bootnode_args + ./b7s --db ./db --log-level debug --port $P2P_PORT --role worker --runtime /app/runtime --workspace $WORKSPACE_ROOT --private-key $NODE_KEY_PATH $dialback_args $bootnode_args fi diff --git a/fstore/fstore.go b/fstore/fstore.go index ee570a44..4173d1f4 100644 --- a/fstore/fstore.go +++ b/fstore/fstore.go @@ -5,13 +5,15 @@ import ( "github.com/cavaliergopher/grab/v3" "github.com/rs/zerolog" + + "github.com/blocklessnetwork/b7s/models/blockless" ) // FStore - function store - deals with all of the function-related actions - saving/reading them from backing storage, // downloading them, unpacking them etc. type FStore struct { log zerolog.Logger - store Store + store blockless.FunctionStore http *http.Client downloader *grab.Client @@ -19,7 +21,7 @@ type FStore struct { } // New creates a new function store. -func New(log zerolog.Logger, store Store, workdir string) *FStore { +func New(log zerolog.Logger, store blockless.FunctionStore, workdir string) *FStore { // Create an HTTP client. cli := http.Client{ @@ -40,9 +42,3 @@ func New(log zerolog.Logger, store Store, workdir string) *FStore { return &h } - -// InstalledFunctions will return the CIDs of all functions found in local storage. -func (h *FStore) InstalledFunctions() ([]string, error) { - ids := h.store.Keys() - return ids, nil -} diff --git a/fstore/fstore_integration_test.go b/fstore/fstore_integration_test.go new file mode 100644 index 00000000..7d9b26b9 --- /dev/null +++ b/fstore/fstore_integration_test.go @@ -0,0 +1,142 @@ +//go:build integration +// +build integration + +package fstore_test + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "io" + "net/http" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/blocklessnetwork/b7s/fstore" + "github.com/blocklessnetwork/b7s/models/blockless" + "github.com/blocklessnetwork/b7s/store" + "github.com/blocklessnetwork/b7s/store/codec" + "github.com/blocklessnetwork/b7s/testing/helpers" + "github.com/blocklessnetwork/b7s/testing/mocks" +) + +const ( + cleanupDisableEnv = "B7S_INTEG_CLEANUP_DISABLE" +) + +func TestStore_InstallFunction(t *testing.T) { + + const ( + functionCID = "bafybeia24v4czavtpjv2co3j54o4a5ztduqcpyyinerjgncx7s2s22s7ea" + manifestURL = "https://bafybeia24v4czavtpjv2co3j54o4a5ztduqcpyyinerjgncx7s2s22s7ea.ipfs.w3s.link/manifest.json" + dirPattern = "b7s-fstore-integration-test-" + ) + + // 0. Setup. + + t.Log("starting test") + + dir, err := os.MkdirTemp("", dirPattern) + require.NoError(t, err) + + cleanupDisabled := cleanupDisabled() + if !cleanupDisabled { + defer os.RemoveAll(dir) + } + + t.Logf("test dir: %v", dir) + + db := helpers.InMemoryDB(t) + defer db.Close() + + fstore := fstore.New(mocks.NoopLogger, store.New(db, codec.NewJSONCodec()), dir) + + // 1. Function Install + err = fstore.Install(manifestURL, functionCID) + require.NoError(t, err) + + t.Log("function install successful") + + // 2. Verify function installation on filesystem - file structure, checksum etc. + + manifest := getManifest(t, manifestURL) + + archive := filepath.Join(dir, functionCID, manifest.Runtime.URL) + listedChecksum, err := hex.DecodeString(manifest.Runtime.Checksum) + require.Equal(t, listedChecksum, getChecksum(t, archive)) + + t.Logf("verified checksum: checksum: %x, archive: %v", listedChecksum, archive) + + file := filepath.Join(dir, functionCID, manifest.Entry) + info, err := os.Stat(file) + require.NoError(t, err) + require.NotZero(t, info.Size()) + + t.Logf("verified extracted file: path: %v", file) + + // 3. Verify function record is persisted + + function, err := fstore.Get(functionCID) + require.NoError(t, err) + + t.Log("retrieved function record") + + require.Equal(t, functionCID, function.CID) + require.Equal(t, manifestURL, function.URL) + + // We're not validating manifest because we store a tweaked version. + + // Record has the workdir prefix trimmed. + require.Contains(t, archive, function.Archive) + require.Contains(t, file, function.Files) + + t.Logf("verified persisted function record") + + // 4. Verify sync functionality by deleting files and running a sync. + + require.NoError(t, os.Remove(archive)) + require.NoError(t, os.Remove(file)) + + require.NoError(t, fstore.Sync(true)) + require.Equal(t, listedChecksum, getChecksum(t, archive)) + info, err = os.Stat(file) + require.NoError(t, err) + require.NotZero(t, info.Size()) + + t.Logf("verified files reappear after sync") +} + +func cleanupDisabled() bool { + return os.Getenv(cleanupDisableEnv) == "yes" +} + +func getManifest(t *testing.T, url string) blockless.FunctionManifest { + t.Helper() + + res, err := http.Get(url) + require.NoError(t, err) + defer res.Body.Close() + + var manifest blockless.FunctionManifest + err = json.NewDecoder(res.Body).Decode(&manifest) + require.NoError(t, err) + + return manifest +} + +func getChecksum(t *testing.T, path string) []byte { + t.Helper() + + f, err := os.Open(path) + require.NoError(t, err) + defer f.Close() + + h := sha256.New() + _, err = io.Copy(h, f) + require.NoError(t, err) + + return h.Sum(nil) +} diff --git a/fstore/get.go b/fstore/get.go deleted file mode 100644 index ecad0c7f..00000000 --- a/fstore/get.go +++ /dev/null @@ -1,18 +0,0 @@ -package fstore - -import ( - "fmt" - - "github.com/blocklessnetwork/b7s/models/blockless" -) - -// Get retrieves a function manifest for the given function from storage. -func (h *FStore) Get(cid string) (*blockless.FunctionManifest, error) { - - fn, err := h.getFunction(cid) - if err != nil { - return nil, fmt.Errorf("could not get function from store: %w", err) - } - - return &fn.Manifest, nil -} diff --git a/fstore/get_test.go b/fstore/get_test.go index b39eba8c..5a720cbd 100644 --- a/fstore/get_test.go +++ b/fstore/get_test.go @@ -7,10 +7,11 @@ import ( "github.com/stretchr/testify/require" "github.com/blocklessnetwork/b7s/fstore" + "github.com/blocklessnetwork/b7s/models/blockless" "github.com/blocklessnetwork/b7s/testing/mocks" ) -func TestFunction_GetHandlesErrors(t *testing.T) { +func TestFunction_RetrieveHandlesErrors(t *testing.T) { const ( testCID = "dummy-cid" @@ -22,8 +23,8 @@ func TestFunction_GetHandlesErrors(t *testing.T) { defer os.RemoveAll(workdir) store := mocks.BaselineStore(t) - store.GetRecordFunc = func(string, interface{}) error { - return mocks.GenericError + store.RetrieveFunctionFunc = func(string) (blockless.FunctionRecord, error) { + return blockless.FunctionRecord{}, mocks.GenericError } fh := fstore.New(mocks.NoopLogger, store, workdir) @@ -31,28 +32,3 @@ func TestFunction_GetHandlesErrors(t *testing.T) { _, err = fh.Get(testCID) require.Error(t, err) } - -func TestFunction_InstalledFunctions(t *testing.T) { - - installed := []string{ - "func1", - "func2", - "func3", - } - - workdir, err := os.MkdirTemp("", "b7s-function-get-") - require.NoError(t, err) - - defer os.RemoveAll(workdir) - - store := mocks.BaselineStore(t) - store.KeysFunc = func() []string { - return installed - } - - fh := fstore.New(mocks.NoopLogger, store, workdir) - - list, err := fh.InstalledFunctions() - require.NoError(t, err) - require.Equal(t, installed, list) -} diff --git a/fstore/gzip_internal_test.go b/fstore/gzip_internal_test.go index 10f0e386..680c3ebf 100644 --- a/fstore/gzip_internal_test.go +++ b/fstore/gzip_internal_test.go @@ -6,8 +6,6 @@ import ( "github.com/stretchr/testify/require" - "github.com/blocklessnetwork/b7s/store" - "github.com/blocklessnetwork/b7s/testing/helpers" "github.com/blocklessnetwork/b7s/testing/mocks" ) @@ -22,8 +20,7 @@ func TestFunction_UnpackArchive(t *testing.T) { defer os.RemoveAll(workdir) - store := store.New(helpers.InMemoryDB(t)) - fh := New(mocks.NoopLogger, store, workdir) + fh := New(mocks.NoopLogger, newInMemoryStore(t), workdir) err = fh.unpackArchive(filename, workdir) require.NoError(t, err) @@ -41,8 +38,7 @@ func TestFunction_UnpackArchiveHandlesErrors(t *testing.T) { defer os.RemoveAll(workdir) - store := store.New(helpers.InMemoryDB(t)) - fh := New(mocks.NoopLogger, store, workdir) + fh := New(mocks.NoopLogger, newInMemoryStore(t), workdir) err = fh.unpackArchive(filename, workdir) require.Error(t, err) diff --git a/fstore/http_internal_test.go b/fstore/http_internal_test.go index b59134dc..c97aa624 100644 --- a/fstore/http_internal_test.go +++ b/fstore/http_internal_test.go @@ -16,6 +16,7 @@ import ( "github.com/blocklessnetwork/b7s/models/blockless" "github.com/blocklessnetwork/b7s/store" + "github.com/blocklessnetwork/b7s/store/codec" "github.com/blocklessnetwork/b7s/testing/helpers" "github.com/blocklessnetwork/b7s/testing/mocks" ) @@ -35,7 +36,7 @@ func TestFunction_GetJSON(t *testing.T) { })) defer srv.Close() - store := store.New(helpers.InMemoryDB(t)) + store := store.New(helpers.InMemoryDB(t), codec.NewJSONCodec()) fh := New(mocks.NoopLogger, store, workdir) var downloaded blockless.FunctionManifest @@ -124,8 +125,7 @@ func TestFunction_GetJSONHandlesErrors(t *testing.T) { })) defer srv.Close() - store := store.New(helpers.InMemoryDB(t)) - fh := New(mocks.NoopLogger, store, workdir) + fh := New(mocks.NoopLogger, newInMemoryStore(t), workdir) var response blockless.FunctionManifest err := fh.getJSON(srv.URL, &response) @@ -153,8 +153,7 @@ func TestFunction_Download(t *testing.T) { defer os.RemoveAll(workdir) - store := store.New(helpers.InMemoryDB(t)) - fh := New(mocks.NoopLogger, store, workdir) + fh := New(mocks.NoopLogger, newInMemoryStore(t), workdir) address := fmt.Sprintf("%s/test-file", srv.URL) hash := sha256.Sum256(payload) @@ -201,8 +200,7 @@ func TestFunction_DownloadHandlesErrors(t *testing.T) { defer os.RemoveAll(workdir) - store := store.New(helpers.InMemoryDB(t)) - fh := New(mocks.NoopLogger, store, workdir) + fh := New(mocks.NoopLogger, newInMemoryStore(t), workdir) address := fmt.Sprintf("%s/test-file", srv.URL) hash := sha256.Sum256(payload) @@ -226,8 +224,7 @@ func TestFunction_DownloadHandlesErrors(t *testing.T) { defer os.RemoveAll(workdir) - store := store.New(helpers.InMemoryDB(t)) - fh := New(mocks.NoopLogger, store, workdir) + fh := New(mocks.NoopLogger, newInMemoryStore(t), workdir) address := fmt.Sprintf("%s/test-file", srv.URL) + "\n" hash := sha256.Sum256(payload) @@ -251,8 +248,7 @@ func TestFunction_DownloadHandlesErrors(t *testing.T) { defer os.RemoveAll(workdir) - store := store.New(helpers.InMemoryDB(t)) - fh := New(mocks.NoopLogger, store, workdir) + fh := New(mocks.NoopLogger, newInMemoryStore(t), workdir) address := fmt.Sprintf("%s/test-file", srv.URL) hash := sha256.Sum256(payload) @@ -280,3 +276,8 @@ func getRandomPayload(t *testing.T, len int) []byte { return buf } + +func newInMemoryStore(t *testing.T) *store.Store { + t.Helper() + return store.New(helpers.InMemoryDB(t), codec.NewJSONCodec()) +} diff --git a/fstore/install.go b/fstore/install.go index 3808ee18..1b0479f2 100644 --- a/fstore/install.go +++ b/fstore/install.go @@ -50,7 +50,7 @@ func (h *FStore) Install(address string, cid string) error { manifest.Deployment.File = functionPath // Store the function record. - fn := functionRecord{ + fn := blockless.FunctionRecord{ CID: cid, URL: address, Manifest: manifest, @@ -84,7 +84,7 @@ func (h *FStore) Installed(cid string) (bool, error) { return false, fmt.Errorf("could not get function from store: %w", err) } - haveArchive, haveFiles, err := h.checkFunctionFiles(*fn) + haveArchive, haveFiles, err := h.checkFunctionFiles(fn) if err != nil { return false, fmt.Errorf("could not verify function cache: %w", err) } diff --git a/fstore/install_test.go b/fstore/install_test.go index 3b7484bf..0ce2e021 100644 --- a/fstore/install_test.go +++ b/fstore/install_test.go @@ -8,6 +8,7 @@ import ( "net/http" "net/http/httptest" "os" + "path/filepath" "strings" "testing" @@ -16,6 +17,7 @@ import ( "github.com/blocklessnetwork/b7s/fstore" "github.com/blocklessnetwork/b7s/models/blockless" "github.com/blocklessnetwork/b7s/store" + "github.com/blocklessnetwork/b7s/store/codec" "github.com/blocklessnetwork/b7s/testing/helpers" "github.com/blocklessnetwork/b7s/testing/mocks" ) @@ -45,8 +47,7 @@ func TestFunction_Install(t *testing.T) { defer fsrv.Close() defer msrv.Close() - store := store.New(helpers.InMemoryDB(t)) - fh := fstore.New(mocks.NoopLogger, store, workdir) + fh := fstore.New(mocks.NoopLogger, newInMemoryStore(t), workdir) t.Run("function install works", func(t *testing.T) { @@ -57,11 +58,11 @@ func TestFunction_Install(t *testing.T) { err = fh.Install(address, testCID) require.NoError(t, err) - manifest, err := fh.Get(testCID) + function, err := fh.Get(testCID) require.NoError(t, err) // Verify downloaded file. - archive := manifest.Deployment.File + archive := filepath.Join(workdir, function.Manifest.Deployment.File) require.FileExists(t, archive) ok := verifyFileHash(t, archive, hash) @@ -111,7 +112,7 @@ func TestFunction_InstallHandlesErrors(t *testing.T) { defer os.RemoveAll(workdir) store := mocks.BaselineStore(t) - store.SetRecordFunc = func(string, interface{}) error { + store.SaveFunctionFunc = func(blockless.FunctionRecord) error { return mocks.GenericError } @@ -131,8 +132,7 @@ func TestFunction_InstallHandlesErrors(t *testing.T) { defer os.RemoveAll(workdir) - store := store.New(helpers.InMemoryDB(t)) - fh := fstore.New(mocks.NoopLogger, store, workdir) + fh := fstore.New(mocks.NoopLogger, newInMemoryStore(t), workdir) address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL) err = fh.Install(address, testCID) @@ -148,8 +148,7 @@ func TestFunction_InstallHandlesErrors(t *testing.T) { defer os.RemoveAll(workdir) - store := store.New(helpers.InMemoryDB(t)) - fh := fstore.New(mocks.NoopLogger, store, workdir) + fh := fstore.New(mocks.NoopLogger, newInMemoryStore(t), workdir) address := fmt.Sprintf("%s/%v", msrv.URL, manifestURL) err = fh.Install(address, testCID) @@ -172,8 +171,8 @@ func TestFunction_InstalledHandlesError(t *testing.T) { defer os.RemoveAll(workdir) store := mocks.BaselineStore(t) - store.GetRecordFunc = func(string, interface{}) error { - return mocks.GenericError + store.RetrieveFunctionFunc = func(string) (blockless.FunctionRecord, error) { + return blockless.FunctionRecord{}, mocks.GenericError } fh := fstore.New(mocks.NoopLogger, store, workdir) @@ -194,8 +193,8 @@ func TestFunction_InstalledHandlesError(t *testing.T) { defer os.RemoveAll(workdir) store := mocks.BaselineStore(t) - store.GetRecordFunc = func(string, interface{}) error { - return blockless.ErrNotFound + store.RetrieveFunctionFunc = func(string) (blockless.FunctionRecord, error) { + return blockless.FunctionRecord{}, blockless.ErrNotFound } fh := fstore.New(mocks.NoopLogger, store, workdir) @@ -260,3 +259,8 @@ func verifyFileHash(t *testing.T, filename string, checksum [32]byte) bool { return bytes.Equal(checksum[:], h[:]) } + +func newInMemoryStore(t *testing.T) *store.Store { + t.Helper() + return store.New(helpers.InMemoryDB(t), codec.NewJSONCodec()) +} diff --git a/fstore/record.go b/fstore/record.go index a14bdd3d..5acc9439 100644 --- a/fstore/record.go +++ b/fstore/record.go @@ -7,42 +7,41 @@ import ( "github.com/blocklessnetwork/b7s/models/blockless" ) -type functionRecord struct { - CID string `json:"cid"` - URL string `json:"url"` - Manifest blockless.FunctionManifest `json:"manifest"` - Archive string `json:"archive"` - Files string `json:"files"` - - UpdatedAt time.Time `json:"updated_at"` - LastRetrieved time.Time `json:"last_retrieved"` +// Get retrieves a function manifest for the given function from storage. +func (h *FStore) Get(cid string) (blockless.FunctionRecord, error) { + + fn, err := h.getFunction(cid) + if err != nil { + return blockless.FunctionRecord{}, fmt.Errorf("could not get function from store: %w", err) + } + + return fn, nil } -func (h *FStore) getFunction(cid string) (*functionRecord, error) { +func (h *FStore) getFunction(cid string) (blockless.FunctionRecord, error) { - // Retrieve function. - var fn functionRecord - err := h.store.GetRecord(cid, &fn) + function, err := h.store.RetrieveFunction(cid) if err != nil { - return nil, fmt.Errorf("could not retrieve function record: %w", err) + return blockless.FunctionRecord{}, fmt.Errorf("could not retrieve function record: %w", err) } // Update the "last retrieved" timestamp. - fn.LastRetrieved = time.Now().UTC() - err = h.store.SetRecord(cid, fn) + function.LastRetrieved = time.Now().UTC() + err = h.store.SaveFunction(function) if err != nil { h.log.Warn().Err(err).Str("cid", cid).Msg("could not update function record timestamp") } - return &fn, nil + return function, nil } -func (h *FStore) saveFunction(fn functionRecord) error { +func (h *FStore) saveFunction(fn blockless.FunctionRecord) error { // Clean paths - make them relative to the current working directory. fn.Archive = h.cleanPath(fn.Archive) fn.Files = h.cleanPath(fn.Files) + fn.Manifest.Deployment.File = h.cleanPath(fn.Manifest.Deployment.File) fn.UpdatedAt = time.Now().UTC() - return h.store.SetRecord(fn.CID, fn) + return h.store.SaveFunction(fn) } diff --git a/fstore/store.go b/fstore/store.go deleted file mode 100644 index 3407f62b..00000000 --- a/fstore/store.go +++ /dev/null @@ -1,7 +0,0 @@ -package fstore - -type Store interface { - GetRecord(string, interface{}) error - SetRecord(string, interface{}) error - Keys() []string -} diff --git a/fstore/sync.go b/fstore/sync.go index 6e28b8e0..eda713d6 100644 --- a/fstore/sync.go +++ b/fstore/sync.go @@ -4,24 +4,45 @@ import ( "fmt" "os" "path/filepath" + + "github.com/hashicorp/go-multierror" + + "github.com/blocklessnetwork/b7s/models/blockless" ) +func (h *FStore) Sync(haltOnError bool) error { + + functions, err := h.store.RetrieveFunctions() + if err != nil { + return fmt.Errorf("could not retrieve functions: %w", err) + } + + var multierr *multierror.Error + for _, function := range functions { + err := h.sync(function) + if err != nil { + // Add CID info to error to know what erred. + wrappedErr := fmt.Errorf("could not sync function (cid: %s): %w", function.CID, err) + if haltOnError { + return wrappedErr + } + + multierr = multierror.Append(multierr, wrappedErr) + } + } + + return multierr.ErrorOrNil() +} + // Sync will verify that the function identified by `cid` is still found on the local filesystem. // If the function archive of function files are missing, they will be recreated. -func (h *FStore) Sync(cid string) error { - - h.log.Debug().Str("cid", cid).Msg("checking function installation") +func (h *FStore) sync(fn blockless.FunctionRecord) error { // Read the function directly from storage - we don't want to update the timestamp // since this is a 'maintenance' access. - var fn functionRecord - err := h.store.GetRecord(cid, &fn) - if err != nil { - return fmt.Errorf("could not get function record: %w", err) - } h.log.Debug(). - Str("cid", cid). + Str("cid", fn.CID). Str("archive", fn.Archive). Str("files", fn.Files). Msg("checking function installation") @@ -33,21 +54,21 @@ func (h *FStore) Sync(cid string) error { // If both archive and files are there - we're done. if haveArchive && haveFiles { - h.log.Debug().Str("cid", cid).Msg("function files found, done") + h.log.Debug().Str("cid", fn.CID).Msg("function files found, done") return nil } h.log.Debug(). Bool("have_archive", haveArchive). Bool("have_files", haveFiles). - Str("cid", cid). + Str("cid", fn.CID). Msg("function installation missing files") // If we don't have the archive - redownload it. if !haveArchive { - path, err := h.download(cid, fn.Manifest) + path, err := h.download(fn.CID, fn.Manifest) if err != nil { - return fmt.Errorf("could not download the function archive (cid: %v): %w", cid, err) + return fmt.Errorf("could not download the function archive (cid: %v): %w", fn.CID, err) } // Update path in case it changed. @@ -67,7 +88,7 @@ func (h *FStore) Sync(cid string) error { err = h.unpackArchive(archivePath, files) if err != nil { - return fmt.Errorf("could not unpack gzip archive (cid: %v, file: %s): %w", cid, fn.Archive, err) + return fmt.Errorf("could not unpack gzip archive (cid: %v, file: %s): %w", fn.CID, fn.Archive, err) } fn.Files = files @@ -76,7 +97,7 @@ func (h *FStore) Sync(cid string) error { // Save the updated function record. err = h.saveFunction(fn) if err != nil { - return fmt.Errorf("could not save function (cid: %v): %w", cid, err) + return fmt.Errorf("could not save function (cid: %v): %w", fn.CID, err) } return nil @@ -84,7 +105,7 @@ func (h *FStore) Sync(cid string) error { // checkFunctionFiles checks if the files required by the function are found on local storage. // It returns two booleans indicating presence of the archive file, the unpacked files, and a potential error. -func (h *FStore) checkFunctionFiles(fn functionRecord) (bool, bool, error) { +func (h *FStore) checkFunctionFiles(fn blockless.FunctionRecord) (bool, bool, error) { // Check if the archive is found. archiveFound := true diff --git a/fstore/sync_internal_test.go b/fstore/sync_internal_test.go index 579ed8cb..ea792253 100644 --- a/fstore/sync_internal_test.go +++ b/fstore/sync_internal_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/blocklessnetwork/b7s/models/blockless" "github.com/blocklessnetwork/b7s/testing/mocks" ) @@ -17,15 +18,14 @@ func TestFstore_CheckFunctionFiles(t *testing.T) { defer os.RemoveAll(workdir) - store := mocks.BaselineStore(t) - fh := New(mocks.NoopLogger, store, workdir) + fh := New(mocks.NoopLogger, newInMemoryStore(t), workdir) var ( archiveName = "archive.tar.gz" functionFileName = "function-file" ) - rec := functionRecord{ + rec := blockless.FunctionRecord{ Archive: archiveName, Files: functionFileName, } diff --git a/go.mod b/go.mod index aa44d5b6..875e7389 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/fatih/structs v1.1.0 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect - github.com/getsentry/sentry-go v0.26.0 // indirect + github.com/getsentry/sentry-go v0.27.0 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.20.2 // indirect @@ -86,7 +86,7 @@ require ( go.uber.org/dig v1.17.1 // indirect go.uber.org/fx v1.21.0 // indirect go.uber.org/mock v0.4.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/text v0.15.0 // indirect golang.org/x/time v0.5.0 // indirect gonum.org/v1/gonum v0.14.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect @@ -155,7 +155,7 @@ require ( github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect github.com/multiformats/go-multibase v0.2.0 // indirect github.com/multiformats/go-multicodec v0.9.0 // indirect - github.com/multiformats/go-multihash v0.2.3 // indirect + github.com/multiformats/go-multihash v0.2.3 github.com/multiformats/go-multistream v0.5.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/opencontainers/runtime-spec v1.2.0 @@ -165,8 +165,8 @@ require ( github.com/polydawn/refmt v0.89.0 // indirect github.com/prometheus/client_golang v1.19.0 // indirect github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.52.2 // indirect - github.com/prometheus/procfs v0.13.0 // indirect + github.com/prometheus/common v0.53.0 // indirect + github.com/prometheus/procfs v0.14.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/rogpeppe/go-internal v1.12.0 // indirect github.com/smartystreets/assertions v1.13.0 // indirect @@ -176,14 +176,14 @@ require ( go.opencensus.io v0.24.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect - golang.org/x/crypto v0.22.0 // indirect - golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 // indirect + golang.org/x/crypto v0.23.0 // indirect + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.17.0 // indirect - golang.org/x/net v0.24.0 // indirect + golang.org/x/net v0.25.0 // indirect golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.19.0 - golang.org/x/tools v0.20.0 // indirect - google.golang.org/protobuf v1.33.0 // indirect + golang.org/x/sys v0.20.0 + golang.org/x/tools v0.21.0 // indirect + google.golang.org/protobuf v1.34.1 // indirect lukechampine.com/blake3 v1.2.2 // indirect ) diff --git a/go.sum b/go.sum index d001ad5c..1f138625 100644 --- a/go.sum +++ b/go.sum @@ -116,8 +116,8 @@ github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nos github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= github.com/getkin/kin-openapi v0.124.0 h1:VSFNMB9C9rTKBnQ/fpyDU8ytMTr4dWI9QovSKj9kz/M= github.com/getkin/kin-openapi v0.124.0/go.mod h1:wb1aSZA/iWmorQP9KTAS/phLj/t17B5jT7+fS8ed9NM= -github.com/getsentry/sentry-go v0.26.0 h1:IX3++sF6/4B5JcevhdZfdKIHfyvMmAq/UnqcyT2H6mA= -github.com/getsentry/sentry-go v0.26.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= +github.com/getsentry/sentry-go v0.27.0 h1:Pv98CIbtB3LkMWmXi4Joa5OOcwbmnX88sF5qbK3r3Ps= +github.com/getsentry/sentry-go v0.27.0/go.mod h1:lc76E2QywIyW8WuBnwl8Lc4bkmQH4+w1gwTf25trprY= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= @@ -464,15 +464,15 @@ github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7q github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= -github.com/prometheus/common v0.52.2 h1:LW8Vk7BccEdONfrJBDffQGRtpSzi5CQaRZGtboOO2ck= -github.com/prometheus/common v0.52.2/go.mod h1:lrWtQx+iDfn2mbH5GUzlH9TSHyfZpHkSiG1W7y3sF2Q= +github.com/prometheus/common v0.53.0 h1:U2pL9w9nmJwJDa4qqLQ3ZaePJ6ZTwt7cMD3AG3+aLCE= +github.com/prometheus/common v0.53.0/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= -github.com/prometheus/procfs v0.13.0 h1:GqzLlQyfsPbaEHaQkO7tbDlriv/4o5Hudv6OXHGKX7o= -github.com/prometheus/procfs v0.13.0/go.mod h1:cd4PFCR54QLnGKPaKGA6l+cfuNXtht43ZKY6tow0Y1g= +github.com/prometheus/procfs v0.14.0 h1:Lw4VdGGoKEZilJsayHf0B+9YgLGREba2C6xr+Fdfq6s= +github.com/prometheus/procfs v0.14.0/go.mod h1:XL+Iwz8k8ZabyZfMFHPiilCniixqQarAy5Mu67pHlNQ= github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A= github.com/quic-go/quic-go v0.42.0 h1:uSfdap0eveIl8KXnipv9K7nlwZ5IqLlYOpJ58u5utpM= @@ -615,11 +615,11 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200602180216-279210d13fed/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= -golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= -golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= +golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= +golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= -golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8 h1:ESSUROHIBHg7USnszlcdmjBEwdMj9VUvU+OPk4yl2mc= -golang.org/x/exp v0.0.0-20240409090435-93d18d7e34b8/go.mod h1:/lliqkxwWAhPjf5oSOIJup2XcqJaw8RGS6k3TGEc7GI= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= +golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= @@ -654,8 +654,8 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= -golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= +golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= +golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -702,15 +702,15 @@ golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= -golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= +golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= +golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= @@ -731,8 +731,8 @@ golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.20.0 h1:hz/CVckiOxybQvFw6h7b/q80NTr9IUQb4s1IIzW7KNY= -golang.org/x/tools v0.20.0/go.mod h1:WvitBU7JJf6A4jOdg4S1tviW9bhUxkgeCui/0JHctQg= +golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw= +golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -772,8 +772,8 @@ google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +google.golang.org/protobuf v1.34.1 h1:9ddQBjfCyZPOHPUiPxpYESBLc+T8P3E+Vo4IbKZgFWg= +google.golang.org/protobuf v1.34.1/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/models/blockless/function_record.go b/models/blockless/function_record.go new file mode 100644 index 00000000..07876c01 --- /dev/null +++ b/models/blockless/function_record.go @@ -0,0 +1,16 @@ +package blockless + +import ( + "time" +) + +type FunctionRecord struct { + CID string `json:"cid"` + URL string `json:"url"` + Manifest FunctionManifest `json:"manifest"` + Archive string `json:"archive"` + Files string `json:"files"` + + UpdatedAt time.Time `json:"updated_at"` + LastRetrieved time.Time `json:"last_retrieved"` +} diff --git a/models/blockless/store.go b/models/blockless/store.go new file mode 100644 index 00000000..c012120f --- /dev/null +++ b/models/blockless/store.go @@ -0,0 +1,24 @@ +package blockless + +import ( + "github.com/libp2p/go-libp2p/core/peer" +) + +type Store interface { + PeerStore + FunctionStore +} + +type PeerStore interface { + SavePeer(peer Peer) error + RetrievePeer(id peer.ID) (Peer, error) + RetrievePeers() ([]Peer, error) + RemovePeer(id peer.ID) error +} + +type FunctionStore interface { + SaveFunction(function FunctionRecord) error + RetrieveFunction(cid string) (FunctionRecord, error) + RetrieveFunctions() ([]FunctionRecord, error) + RemoveFunction(id string) error +} diff --git a/node/function.go b/node/fstore.go similarity index 55% rename from node/function.go rename to node/fstore.go index aab7c9cc..dbe9b5e0 100644 --- a/node/function.go +++ b/node/fstore.go @@ -8,9 +8,6 @@ type FStore interface { // Installed returns info if the function is installed or not. Installed(cid string) (bool, error) - // InstalledFunction returns the list of CIDs of installed functions. - InstalledFunctions() ([]string, error) - - // Sync will recheck if function installation is found in local storage, and redownload it if it isn't. - Sync(cid string) error + // Sync will ensure function installations are correct, redownloading functions if needed. + Sync(haltOnError bool) error } diff --git a/node/health_internal_test.go b/node/health_internal_test.go index 6a37023d..f181e2a0 100644 --- a/node/health_internal_test.go +++ b/node/health_internal_test.go @@ -27,7 +27,7 @@ func TestNode_Health(t *testing.T) { var ( logger = mocks.NoopLogger - peerstore = mocks.BaselinePeerStore(t) + store = mocks.BaselineStore(t) functionHandler = mocks.BaselineFStore(t) ) @@ -37,7 +37,7 @@ func TestNode_Health(t *testing.T) { nhost, err := host.New(logger, loopback, 0) require.NoError(t, err) - node, err := New(logger, nhost, peerstore, functionHandler, WithRole(blockless.HeadNode), WithHealthInterval(healthInterval), WithTopics([]string{topic})) + node, err := New(logger, nhost, store, functionHandler, WithRole(blockless.HeadNode), WithHealthInterval(healthInterval), WithTopics([]string{topic})) require.NoError(t, err) // Create a host that will listen on the the topic to verify health pings diff --git a/node/node.go b/node/node.go index 60172312..502bb736 100644 --- a/node/node.go +++ b/node/node.go @@ -47,7 +47,7 @@ type Node struct { } // New creates a new Node. -func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore, options ...Option) (*Node, error) { +func New(log zerolog.Logger, host *host.Host, store blockless.PeerStore, fstore FStore, options ...Option) (*Node, error) { // Initialize config. cfg := DefaultConfig @@ -99,8 +99,8 @@ func New(log zerolog.Logger, host *host.Host, peerStore PeerStore, fstore FStore return nil, fmt.Errorf("node configuration is not valid: %w", err) } - // Create a notifiee with a backing peerstore. - cn := newConnectionNotifee(log, peerStore) + // Create a notifiee with a backing store. + cn := newConnectionNotifee(log, store) host.Network().Notify(cn) return n, nil diff --git a/node/node_integration_test.go b/node/node_integration_test.go index 953f38ba..f5845fdd 100644 --- a/node/node_integration_test.go +++ b/node/node_integration_test.go @@ -30,8 +30,8 @@ import ( "github.com/blocklessnetwork/b7s/models/execute" "github.com/blocklessnetwork/b7s/models/request" "github.com/blocklessnetwork/b7s/node" - "github.com/blocklessnetwork/b7s/peerstore" "github.com/blocklessnetwork/b7s/store" + "github.com/blocklessnetwork/b7s/store/codec" "github.com/blocklessnetwork/b7s/testing/helpers" "github.com/blocklessnetwork/b7s/testing/mocks" ) @@ -97,9 +97,8 @@ func createNode(t *testing.T, dir string, logger zerolog.Logger, host *host.Host require.NoError(t, err) var ( - store = store.New(db) - peerstore = peerstore.New(store) - fstore = fstore.New(logger, store, workdir) + store = store.New(db, codec.NewJSONCodec()) + fstore = fstore.New(logger, store, workdir) ) opts := []node.Option{ @@ -120,7 +119,7 @@ func createNode(t *testing.T, dir string, logger zerolog.Logger, host *host.Host opts = append(opts, node.WithExecutor(executor)) } - node, err := node.New(logger, host, peerstore, fstore, opts...) + node, err := node.New(logger, host, store, fstore, opts...) require.NoError(t, err) return db, node diff --git a/node/node_internal_test.go b/node/node_internal_test.go index 25d9451b..b6d95721 100644 --- a/node/node_internal_test.go +++ b/node/node_internal_test.go @@ -37,7 +37,7 @@ func TestNode_New(t *testing.T) { var ( logger = mocks.NoopLogger - peerstore = mocks.BaselinePeerStore(t) + store = mocks.BaselineStore(t) functionHandler = mocks.BaselineFStore(t) executor = mocks.BaselineExecutor(t) ) @@ -48,23 +48,23 @@ func TestNode_New(t *testing.T) { t.Run("create a head node", func(t *testing.T) { t.Parallel() - node, err := New(logger, host, peerstore, functionHandler, WithRole(blockless.HeadNode)) + node, err := New(logger, host, store, functionHandler, WithRole(blockless.HeadNode)) require.NoError(t, err) require.NotNil(t, node) // Creating a head node with executor fails. - _, err = New(logger, host, peerstore, functionHandler, WithRole(blockless.HeadNode), WithExecutor(executor)) + _, err = New(logger, host, store, functionHandler, WithRole(blockless.HeadNode), WithExecutor(executor)) require.Error(t, err) }) t.Run("create a worker node", func(t *testing.T) { t.Parallel() - node, err := New(logger, host, peerstore, functionHandler, WithRole(blockless.WorkerNode), WithExecutor(executor), WithWorkspace(t.TempDir())) + node, err := New(logger, host, store, functionHandler, WithRole(blockless.WorkerNode), WithExecutor(executor), WithWorkspace(t.TempDir())) require.NoError(t, err) require.NotNil(t, node) // Creating a worker node without executor fails. - _, err = New(logger, host, peerstore, functionHandler, WithRole(blockless.WorkerNode)) + _, err = New(logger, host, store, functionHandler, WithRole(blockless.WorkerNode)) require.Error(t, err) }) } @@ -74,7 +74,7 @@ func createNode(t *testing.T, role blockless.NodeRole) *Node { var ( logger = mocks.NoopLogger - peerstore = mocks.BaselinePeerStore(t) + store = mocks.BaselineStore(t) functionHandler = mocks.BaselineFStore(t) ) @@ -91,7 +91,7 @@ func createNode(t *testing.T, role blockless.NodeRole) *Node { opts = append(opts, WithWorkspace(t.TempDir())) } - node, err := New(logger, host, peerstore, functionHandler, opts...) + node, err := New(logger, host, store, functionHandler, opts...) require.NoError(t, err) return node diff --git a/node/notifiee.go b/node/notifiee.go index 99daecb5..1fcc43a8 100644 --- a/node/notifiee.go +++ b/node/notifiee.go @@ -4,18 +4,20 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/multiformats/go-multiaddr" "github.com/rs/zerolog" + + "github.com/blocklessnetwork/b7s/models/blockless" ) type connectionNotifiee struct { log zerolog.Logger - peers PeerStore + store blockless.PeerStore } -func newConnectionNotifee(log zerolog.Logger, peerStore PeerStore) *connectionNotifiee { +func newConnectionNotifee(log zerolog.Logger, store blockless.PeerStore) *connectionNotifiee { cn := connectionNotifiee{ log: log.With().Str("component", "notifiee").Logger(), - peers: peerStore, + store: store, } return &cn @@ -36,8 +38,14 @@ func (n *connectionNotifiee) Connected(network network.Network, conn network.Con Interface("addr_info", addrInfo). Msg("peer connected") + peer := blockless.Peer{ + ID: peerID, + MultiAddr: maddr.String(), + AddrInfo: addrInfo, + } + // Store the peer info. - err := n.peers.Store(peerID, maddr, addrInfo) + err := n.store.SavePeer(peer) if err != nil { n.log.Warn().Err(err).Str("id", peerID.String()).Msg("could not add peer to peerstore") } diff --git a/node/notifiee_internal_test.go b/node/notifiee_internal_test.go index fa299abb..9f4f4227 100644 --- a/node/notifiee_internal_test.go +++ b/node/notifiee_internal_test.go @@ -4,8 +4,6 @@ import ( "context" "testing" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/require" "github.com/blocklessnetwork/b7s/host" @@ -27,14 +25,14 @@ func TestNode_Notifiee(t *testing.T) { storedPeer bool ) - peerstore := mocks.BaselinePeerStore(t) + store := mocks.BaselineStore(t) // Override the peerstore methods so we know if the node correctly handled incoming connection. - peerstore.StoreFunc = func(peer.ID, multiaddr.Multiaddr, peer.AddrInfo) error { + store.SavePeerFunc = func(blockless.Peer) error { storedPeer = true return nil } - node, err := New(logger, server, peerstore, functionHandler, WithRole(blockless.HeadNode)) + node, err := New(logger, server, store, functionHandler, WithRole(blockless.HeadNode)) require.NoError(t, err) client, err := host.New(mocks.NoopLogger, loopback, 0) diff --git a/node/peerstore.go b/node/peerstore.go deleted file mode 100644 index 533d078a..00000000 --- a/node/peerstore.go +++ /dev/null @@ -1,10 +0,0 @@ -package node - -import ( - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" -) - -type PeerStore interface { - Store(peer.ID, multiaddr.Multiaddr, peer.AddrInfo) error -} diff --git a/node/run.go b/node/run.go index 71e09eed..ce6b6f0f 100644 --- a/node/run.go +++ b/node/run.go @@ -23,7 +23,10 @@ func (n *Node) Run(ctx context.Context) error { } // Sync functions now in case they were removed from the storage. - n.syncFunctions() + err = n.fstore.Sync(false) + if err != nil { + return fmt.Errorf("could not sync functions: %w", err) + } // Set the handler for direct messages. n.listenDirectMessages(ctx) diff --git a/node/store.go b/node/store.go deleted file mode 100644 index b70b7ae6..00000000 --- a/node/store.go +++ /dev/null @@ -1,6 +0,0 @@ -package node - -type Store interface { - GetRecord(string, interface{}) error - SetRecord(string, interface{}) error -} diff --git a/node/sync.go b/node/sync.go index 34c2e4b4..1f37b88c 100644 --- a/node/sync.go +++ b/node/sync.go @@ -5,28 +5,6 @@ import ( "time" ) -// syncFunctions will try to redownload any functions that were removed from local disk -// but were previously installed. We do NOT abort on failure. -func (n *Node) syncFunctions() { - - cids, err := n.fstore.InstalledFunctions() - if err != nil { - n.log.Error().Err(err).Msg("could not get list of installed functions") - return - } - - for _, cid := range cids { - - err := n.fstore.Sync(cid) - if err != nil { - n.log.Error().Err(err).Str("cid", cid).Msg("function sync error") - continue - } - - n.log.Debug().Str("function", cid).Msg("function sync ok") - } -} - func (n *Node) runSyncLoop(ctx context.Context) { ticker := time.NewTicker(syncInterval) @@ -34,7 +12,12 @@ func (n *Node) runSyncLoop(ctx context.Context) { for { select { case <-ticker.C: - n.syncFunctions() + err := n.fstore.Sync(false) + if err != nil { + n.log.Error().Err(err).Msg("function sync unsuccessful") + } else { + n.log.Debug().Msg("function sync ok") + } case <-ctx.Done(): ticker.Stop() diff --git a/node/sync_internal_test.go b/node/sync_internal_test.go deleted file mode 100644 index ed03600a..00000000 --- a/node/sync_internal_test.go +++ /dev/null @@ -1,40 +0,0 @@ -package node - -import ( - "testing" - - "github.com/stretchr/testify/require" - - "github.com/blocklessnetwork/b7s/models/blockless" - "github.com/blocklessnetwork/b7s/testing/mocks" -) - -func TestNode_Sync(t *testing.T) { - - var ( - installed = []string{ - "func1", - "func2", - "func3", - } - - synced []string - ) - - fstore := mocks.BaselineFStore(t) - fstore.InstalledFunctionsFunc = func() ([]string, error) { - return installed, nil - } - fstore.SyncFunc = func(cid string) error { - synced = append(synced, cid) - return nil - } - - node := createNode(t, blockless.WorkerNode) - node.fstore = fstore - - node.syncFunctions() - - // Verify all functions were synced. - require.Equal(t, installed, synced) -} diff --git a/peerstore/peerstore.go b/peerstore/peerstore.go deleted file mode 100644 index a6a9bc69..00000000 --- a/peerstore/peerstore.go +++ /dev/null @@ -1,85 +0,0 @@ -package peerstore - -import ( - "fmt" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" - - "github.com/blocklessnetwork/b7s/models/blockless" -) - -// PeerStore takes care of storing and reading peer information to and from persistent storage. -type PeerStore struct { - store Store -} - -// New creates a new PeerStore handler. -func New(store Store) *PeerStore { - - ps := PeerStore{ - store: store, - } - - return &ps -} - -// Get wil retrieve peer with the given ID. -func (p *PeerStore) Get(id peer.ID) (blockless.Peer, error) { - - var peer blockless.Peer - err := p.store.GetRecord(id.String(), &peer) - if err != nil { - return blockless.Peer{}, fmt.Errorf("could not retrieve peer: %w", err) - } - - return peer, nil -} - -// Store will persist the peer information. -func (p *PeerStore) Store(id peer.ID, addr multiaddr.Multiaddr, info peer.AddrInfo) error { - - peerInfo := blockless.Peer{ - ID: id, - MultiAddr: addr.String(), - AddrInfo: info, - } - - err := p.store.SetRecord(id.String(), peerInfo) - if err != nil { - return fmt.Errorf("could not store peer: %w", err) - } - - return nil -} - -// Remove removes the peer from the peerstore. -func (p *PeerStore) Remove(id peer.ID) error { - - err := p.store.Delete(id.String()) - if err != nil { - return fmt.Errorf("could not remove peer: %w", err) - } - - return nil -} - -// Peers returns the list of peers from the peer store. -func (p *PeerStore) Peers() ([]blockless.Peer, error) { - - ids := p.store.Keys() - - var peers []blockless.Peer - for _, id := range ids { - - var peer blockless.Peer - err := p.store.GetRecord(id, &peer) - if err != nil { - return nil, fmt.Errorf("could not retrieve peer (id: %v): %w", id, err) - } - - peers = append(peers, peer) - } - - return peers, nil -} diff --git a/peerstore/peerstore_test.go b/peerstore/peerstore_test.go deleted file mode 100644 index c003189c..00000000 --- a/peerstore/peerstore_test.go +++ /dev/null @@ -1,190 +0,0 @@ -package peerstore_test - -import ( - "testing" - - "github.com/cockroachdb/pebble" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" - "github.com/stretchr/testify/require" - - "github.com/blocklessnetwork/b7s/peerstore" - "github.com/blocklessnetwork/b7s/store" - "github.com/blocklessnetwork/b7s/testing/helpers" - "github.com/blocklessnetwork/b7s/testing/mocks" -) - -func Test_PeerStore(t *testing.T) { - t.Run("empty peer store", func(t *testing.T) { - t.Parallel() - - peerstore, db := setupPeerStore(t) - defer db.Close() - - peers, err := peerstore.Peers() - require.NoError(t, err) - require.Empty(t, peers) - }) - t.Run("store/get/delete peer", func(t *testing.T) { - t.Parallel() - - peerstore, db := setupPeerStore(t) - defer db.Close() - - var ( - peerID = mocks.GenericPeerID - addr = genericMultiAddr(t) - info = peer.AddrInfo{ - ID: peerID, - Addrs: []multiaddr.Multiaddr{addr}, - } - ) - - // Verify peerstore is empty. - peers, err := peerstore.Peers() - require.NoError(t, err) - require.Len(t, peers, 0) - - err = peerstore.Store(mocks.GenericPeerID, addr, info) - require.NoError(t, err) - - // Verify peer is written to the peerstore. - read, err := peerstore.Get(peerID) - require.NoError(t, err) - - require.Equal(t, addr.String(), read.MultiAddr) - require.Equal(t, info, read.AddrInfo) - - // Verify peer list has one peer. - peers, err = peerstore.Peers() - require.NoError(t, err) - require.Len(t, peers, 1) - - err = peerstore.Remove(peerID) - require.NoError(t, err) - - // Verify peer cannot be retrieved anymore. - _, err = peerstore.Get(peerID) - require.Error(t, err) - - // Verify peer list is empty now. - peers, err = peerstore.Peers() - require.NoError(t, err) - require.Len(t, peers, 0) - }) - t.Run("adding known peer", func(t *testing.T) { - t.Parallel() - - peerstore, db := setupPeerStore(t) - defer db.Close() - - var ( - peerID = mocks.GenericPeerID - addr = genericMultiAddr(t) - info = peer.AddrInfo{ - ID: peerID, - Addrs: []multiaddr.Multiaddr{addr}, - } - ) - - err := peerstore.Store(mocks.GenericPeerID, addr, info) - require.NoError(t, err) - - // Add the same peer again - we should still only have one peer in the list. - err = peerstore.Store(mocks.GenericPeerID, addr, info) - require.NoError(t, err) - - peers, err := peerstore.Peers() - require.NoError(t, err) - require.Len(t, peers, 1) - }) -} - -func Test_PeerStore_Store(t *testing.T) { - - t.Run("handles failure to store peer", func(t *testing.T) { - - store := mocks.BaselineStore(t) - store.SetRecordFunc = func(string, interface{}) error { - return mocks.GenericError - } - - peerstore := peerstore.New(store) - - var ( - peerID = mocks.GenericPeerID - addr = genericMultiAddr(t) - info = peer.AddrInfo{ - ID: peerID, - Addrs: []multiaddr.Multiaddr{addr}, - } - ) - - err := peerstore.Store(peerID, addr, info) - require.ErrorIs(t, err, mocks.GenericError) - }) - t.Run("handles failure to get peer", func(t *testing.T) { - - store := mocks.BaselineStore(t) - store.GetRecordFunc = func(string, interface{}) error { - return mocks.GenericError - } - peerstore := peerstore.New(store) - - var ( - peerID = mocks.GenericPeerID - ) - - _, err := peerstore.Get(peerID) - require.ErrorIs(t, err, mocks.GenericError) - }) - t.Run("handles peer list retrieval error", func(t *testing.T) { - store := mocks.BaselineStore(t) - store.KeysFunc = func() []string { - return []string{"dummy-key"} - } - store.GetRecordFunc = func(string, interface{}) error { - return mocks.GenericError - } - - peerstore := peerstore.New(store) - - _, err := peerstore.Peers() - require.ErrorIs(t, err, mocks.GenericError) - }) - t.Run("handles peer removal error", func(t *testing.T) { - t.Parallel() - - store := mocks.BaselineStore(t) - store.DeleteFunc = func(string) error { - return mocks.GenericError - } - - var ( - peerID = mocks.GenericPeerID - ) - - peerstore := peerstore.New(store) - - err := peerstore.Remove(peerID) - require.ErrorIs(t, err, mocks.GenericError) - }) -} -func setupPeerStore(t *testing.T) (*peerstore.PeerStore, *pebble.DB) { - t.Helper() - - db := helpers.InMemoryDB(t) - store := store.New(db) - ps := peerstore.New(store) - - return ps, db -} - -func genericMultiAddr(t *testing.T) multiaddr.Multiaddr { - t.Helper() - - addr, err := multiaddr.NewMultiaddr(mocks.GenericAddress) - require.NoError(t, err) - - return addr -} diff --git a/peerstore/store.go b/peerstore/store.go deleted file mode 100644 index 5d3ffd38..00000000 --- a/peerstore/store.go +++ /dev/null @@ -1,8 +0,0 @@ -package peerstore - -type Store interface { - SetRecord(key string, value interface{}) error - GetRecord(key string, out interface{}) error - Keys() []string - Delete(key string) error -} diff --git a/store/codec.go b/store/codec.go new file mode 100644 index 00000000..d55ce249 --- /dev/null +++ b/store/codec.go @@ -0,0 +1,6 @@ +package store + +type Codec interface { + Marshal(any) ([]byte, error) + Unmarshal([]byte, any) error +} diff --git a/store/codec/json.go b/store/codec/json.go new file mode 100644 index 00000000..ebcd5d2e --- /dev/null +++ b/store/codec/json.go @@ -0,0 +1,19 @@ +package codec + +import ( + "encoding/json" +) + +type JSON struct{} + +func NewJSONCodec() JSON { + return JSON{} +} + +func (c JSON) Marshal(obj any) ([]byte, error) { + return json.Marshal(obj) +} + +func (c JSON) Unmarshal(data []byte, obj any) error { + return json.Unmarshal(data, obj) +} diff --git a/store/delete.go b/store/delete.go deleted file mode 100644 index 6b4d8c9d..00000000 --- a/store/delete.go +++ /dev/null @@ -1,18 +0,0 @@ -package store - -import ( - "fmt" - - "github.com/cockroachdb/pebble" -) - -// Delete removes the key from the database. -func (s *Store) Delete(key string) error { - - err := s.db.Delete([]byte(key), pebble.Sync) - if err != nil { - return fmt.Errorf("could not delete value: %w", err) - } - - return nil -} diff --git a/store/get.go b/store/get.go deleted file mode 100644 index 9fb5f3a1..00000000 --- a/store/get.go +++ /dev/null @@ -1,53 +0,0 @@ -package store - -import ( - "encoding/json" - "errors" - "fmt" - - "github.com/cockroachdb/pebble" - - "github.com/blocklessnetwork/b7s/models/blockless" -) - -// Get retrieves the value for a key. -func (s *Store) Get(key string) (string, error) { - - value, closer, err := s.db.Get([]byte(key)) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return "", blockless.ErrNotFound - } - - return "", fmt.Errorf("could not retrieve value: %w", err) - } - // Closer must be called else a memory leak occurs. - defer closer.Close() - - // After closer is done, the slice is no longer valid, so we need to copy it. - dup := make([]byte, len(value)) - copy(dup, value) - - return string(dup), nil -} - -// GetRecord will read and JSON-decode the record associated with the provided key. -func (s *Store) GetRecord(key string, out interface{}) error { - - value, closer, err := s.db.Get([]byte(key)) - if err != nil { - if errors.Is(err, pebble.ErrNotFound) { - return blockless.ErrNotFound - } - return fmt.Errorf("could not retrieve value: %w", err) - } - // Closer must be called else a memory leak occurs. - defer closer.Close() - - err = json.Unmarshal(value, out) - if err != nil { - return fmt.Errorf("could not decode record: %w", err) - } - - return nil -} diff --git a/store/key.go b/store/key.go new file mode 100644 index 00000000..d56fc831 --- /dev/null +++ b/store/key.go @@ -0,0 +1,33 @@ +package store + +import ( + "fmt" +) + +func encodeKey(prefix uint8, segments ...any) []byte { + + key := []byte{prefix} + + for _, segment := range segments { + switch s := segment.(type) { + + // Technically it would be nicer to have this an actual peer.ID. + // However, peerID MarshalBinary() method returns an error, meaning we would need to + // check it here or rely on the fact that it never errs (which it doesn't - in the current implementation). + // Having the `encodeKey` function return an error here leads to a lot of unnecessary throughout the package. + case []byte: + + key = append(key, Separator) + key = append(key, s...) + + case string: + key = append(key, Separator) + key = append(key, []byte(s)...) + + default: + panic(fmt.Sprintf("unsupported type (%T)", segment)) + } + } + + return key +} diff --git a/store/key_test.go b/store/key_test.go new file mode 100644 index 00000000..075a1e2e --- /dev/null +++ b/store/key_test.go @@ -0,0 +1,47 @@ +package store + +import ( + "bytes" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/blocklessnetwork/b7s/testing/mocks" +) + +func TestStore_KeyEncoding(t *testing.T) { + + t.Run("encoding peer ID works", func(t *testing.T) { + + idBytes, err := mocks.GenericPeerID.MarshalBinary() + require.NoError(t, err) + + encodedKey := bytes.Join([][]byte{ + {0x1}, // peer prefix + idBytes}, // peer ID + []byte{Separator}) // join segments by a separator + + key := encodeKey(PrefixPeer, idBytes) + require.Equal(t, encodedKey, key) + }) + t.Run("encoding function ID works", func(t *testing.T) { + + id := mocks.GenericString + + encodedKey := bytes.Join([][]byte{ + {0x2}, // function prefix + []byte(id)}, // function ID + []byte{Separator}) // join segments by a separator + + key := encodeKey(PrefixFunction, id) + require.Equal(t, encodedKey, key) + }) + t.Run("unsupported key type fails", func(t *testing.T) { + + require.Panics(t, func() { + var empty struct{} + _ = encodeKey(PrefixPeer, empty) + }) + }) + +} diff --git a/store/params.go b/store/params.go new file mode 100644 index 00000000..19755d46 --- /dev/null +++ b/store/params.go @@ -0,0 +1,10 @@ +package store + +const ( + PrefixPeer = 1 + PrefixFunction = 2 +) + +const ( + Separator = ':' +) diff --git a/store/remove.go b/store/remove.go new file mode 100644 index 00000000..669472d1 --- /dev/null +++ b/store/remove.go @@ -0,0 +1,39 @@ +package store + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + "github.com/libp2p/go-libp2p/core/peer" +) + +func (s *Store) RemovePeer(id peer.ID) error { + + idBytes, err := id.MarshalBinary() + if err != nil { + return fmt.Errorf("could not encode peer ID: %w", err) + } + + key := encodeKey(PrefixPeer, idBytes) + err = s.remove(key) + if err != nil { + return fmt.Errorf("could not remove peer: %w", err) + } + + return nil +} + +func (s *Store) RemoveFunction(cid string) error { + + key := encodeKey(PrefixFunction, cid) + err := s.remove(key) + if err != nil { + return fmt.Errorf("could not remove function: %w", err) + } + + return nil +} + +func (s *Store) remove(key []byte) error { + return s.db.Delete(key, pebble.Sync) +} diff --git a/store/retrieve.go b/store/retrieve.go new file mode 100644 index 00000000..161fdf24 --- /dev/null +++ b/store/retrieve.go @@ -0,0 +1,127 @@ +package store + +import ( + "errors" + "fmt" + + "github.com/cockroachdb/pebble" + "github.com/libp2p/go-libp2p/core/peer" + + "github.com/blocklessnetwork/b7s/models/blockless" +) + +func (s *Store) RetrievePeer(id peer.ID) (blockless.Peer, error) { + + idBytes, err := id.MarshalBinary() + if err != nil { + return blockless.Peer{}, fmt.Errorf("could not serialize peer ID: %w", err) + } + + key := encodeKey(PrefixPeer, idBytes) + var peer blockless.Peer + err = s.retrieve(key, &peer) + if err != nil { + return blockless.Peer{}, fmt.Errorf("could not retrieve value: %w", err) + } + + return peer, nil +} + +func (s *Store) RetrievePeers() ([]blockless.Peer, error) { + + peers := make([]blockless.Peer, 0) + + opts := prefixIterOptions([]byte{PrefixPeer}) + it, err := s.db.NewIter(opts) + if err != nil { + return nil, fmt.Errorf("could not create iterator: %w", err) + } + for it.First(); it.Valid(); it.Next() { + + var peer blockless.Peer + err := s.retrieve(it.Key(), &peer) + if err != nil { + return nil, fmt.Errorf("could not retrieve peer (key: %x): %w", it.Key(), err) + } + + peers = append(peers, peer) + } + + return peers, nil +} + +func (s *Store) RetrieveFunction(cid string) (blockless.FunctionRecord, error) { + + key := encodeKey(PrefixFunction, cid) + var function blockless.FunctionRecord + err := s.retrieve(key, &function) + if err != nil { + return blockless.FunctionRecord{}, fmt.Errorf("could not retrieve function record: %w", err) + } + + return function, nil +} + +func (s *Store) RetrieveFunctions() ([]blockless.FunctionRecord, error) { + + functions := make([]blockless.FunctionRecord, 0) + + opts := prefixIterOptions([]byte{PrefixFunction}) + it, err := s.db.NewIter(opts) + if err != nil { + return nil, fmt.Errorf("could not create iterator: %w", err) + } + for it.First(); it.Valid(); it.Next() { + + var function blockless.FunctionRecord + err := s.retrieve(it.Key(), &function) + if err != nil { + return nil, fmt.Errorf("could not retrieve functioN (key: %x): %w", it.Key(), err) + } + + functions = append(functions, function) + } + + return functions, nil +} + +func (s *Store) retrieve(key []byte, out any) error { + + value, closer, err := s.db.Get(key) + if err != nil { + if errors.Is(err, pebble.ErrNotFound) { + return blockless.ErrNotFound + } + return fmt.Errorf("could not retrieve value: %w", err) + } + // Closer must be called else a memory leak occurs. + defer closer.Close() + + err = s.codec.Unmarshal(value, out) + if err != nil { + return fmt.Errorf("cold not decode record: %w", err) + } + + return nil +} + +func prefixIterOptions(prefix []byte) *pebble.IterOptions { + return &pebble.IterOptions{ + LowerBound: prefix, + UpperBound: iteratorPrefixUpperBound(prefix), + } +} + +func iteratorPrefixUpperBound(prefix []byte) []byte { + + end := make([]byte, len(prefix)) + copy(end, prefix) + for i := len(end) - 1; i >= 0; i-- { + end[i] = end[i] + 1 + if end[i] != 0 { + return end[:i+1] + } + } + + return nil +} diff --git a/store/save.go b/store/save.go new file mode 100644 index 00000000..b1dcf8e2 --- /dev/null +++ b/store/save.go @@ -0,0 +1,51 @@ +package store + +import ( + "fmt" + + "github.com/cockroachdb/pebble" + + "github.com/blocklessnetwork/b7s/models/blockless" +) + +func (s *Store) SavePeer(peer blockless.Peer) error { + + id, err := peer.ID.MarshalBinary() + if err != nil { + return fmt.Errorf("could not serialize peer ID: %w", err) + } + + key := encodeKey(PrefixPeer, id) + err = s.save(key, peer) + if err != nil { + return fmt.Errorf("could not save peer: %w", err) + } + + return nil +} + +func (s *Store) SaveFunction(function blockless.FunctionRecord) error { + + key := encodeKey(PrefixFunction, function.CID) + err := s.save(key, function) + if err != nil { + return fmt.Errorf("could not save function: %w", err) + } + + return nil +} + +func (s *Store) save(key []byte, value any) error { + + encoded, err := s.codec.Marshal(value) + if err != nil { + return fmt.Errorf("could not encode value: %w", err) + } + + err = s.db.Set(key, encoded, pebble.Sync) + if err != nil { + return fmt.Errorf("could not store value: %w", err) + } + + return nil +} diff --git a/store/set.go b/store/set.go deleted file mode 100644 index f0469d42..00000000 --- a/store/set.go +++ /dev/null @@ -1,35 +0,0 @@ -package store - -import ( - "encoding/json" - "fmt" - - "github.com/cockroachdb/pebble" -) - -// Set sets the value for a key. -func (s *Store) Set(key string, value string) error { - - err := s.db.Set([]byte(key), []byte(value), pebble.Sync) - if err != nil { - return fmt.Errorf("could not store value: %w", err) - } - - return nil -} - -// SetRecord will JSON-encode the provided record and store it in the DB. -func (s *Store) SetRecord(key string, value interface{}) error { - - encoded, err := json.Marshal(value) - if err != nil { - return fmt.Errorf("could not serialize the record: %w", err) - } - - err = s.db.Set([]byte(key), encoded, pebble.Sync) - if err != nil { - return fmt.Errorf("could not store value: %w", err) - } - - return nil -} diff --git a/store/store.go b/store/store.go index 6bca45e8..255dee06 100644 --- a/store/store.go +++ b/store/store.go @@ -6,14 +6,16 @@ import ( // Store enables interaction with a database. type Store struct { - db *pebble.DB + db *pebble.DB + codec Codec } // New creates a new Store backed by the database at the given path. -func New(db *pebble.DB) *Store { +func New(db *pebble.DB, codec Codec) *Store { store := Store{ - db: db, + db: db, + codec: codec, } return &store diff --git a/store/store_test.go b/store/store_test.go index 136b8acd..8198127a 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -1,251 +1,218 @@ package store_test import ( + "encoding/json" + "errors" + "fmt" "testing" + "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" "github.com/blocklessnetwork/b7s/models/blockless" "github.com/blocklessnetwork/b7s/store" + "github.com/blocklessnetwork/b7s/store/codec" "github.com/blocklessnetwork/b7s/testing/helpers" + "github.com/blocklessnetwork/b7s/testing/mocks" ) -func Test_Store(t *testing.T) { - t.Run("setting value", func(t *testing.T) { - t.Parallel() +func TestStore_PeerOperations(t *testing.T) { + db := helpers.InMemoryDB(t) + defer db.Close() - db := helpers.InMemoryDB(t) - defer db.Close() - store := store.New(db) + peer := helpers.CreateRandomPeers(t, 1)[0] + store := store.New(db, codec.NewJSONCodec()) - const ( - key = "some-key" - value = "some-value" - ) - - err := store.Set(key, value) + t.Run("save peer", func(t *testing.T) { + err := store.SavePeer(peer) require.NoError(t, err) - - read, err := store.Get(key) + }) + t.Run("retrieve peer", func(t *testing.T) { + retrieved, err := store.RetrievePeer(peer.ID) require.NoError(t, err) - require.Equal(t, value, read) + require.Equal(t, peer, retrieved) }) - t.Run("missing value correctly reported", func(t *testing.T) { - t.Parallel() - - db := helpers.InMemoryDB(t) - defer db.Close() - store := store.New(db) + t.Run("remove peer", func(t *testing.T) { + err := store.RemovePeer(peer.ID) + require.NoError(t, err) - read, err := store.Get("missing-key") - require.Equal(t, "", read) + // Verify peer is gone. + _, err = store.RetrievePeer(peer.ID) require.ErrorIs(t, err, blockless.ErrNotFound) }) - t.Run("overwriting value", func(t *testing.T) { - t.Parallel() +} - db := helpers.InMemoryDB(t) - defer db.Close() - store := store.New(db) +func TestStore_RetrievePeers(t *testing.T) { + db := helpers.InMemoryDB(t) + defer db.Close() + store := store.New(db, codec.NewJSONCodec()) + + count := 10 + peers := make(map[peer.ID]blockless.Peer) + generated := helpers.CreateRandomPeers(t, count) + for _, peer := range generated { + peers[peer.ID] = peer + } + + // Save peers. + for _, peer := range peers { + err := store.SavePeer(peer) + require.NoError(t, err) + } - const ( - key = "some-key" - valueV1 = "some-value-v1" - valueV2 = "some-value-v2" - ) + retrieved, err := store.RetrievePeers() + require.NoError(t, err) + require.Len(t, retrieved, count) - // Set value V1. - err := store.Set(key, valueV1) - require.NoError(t, err) + // Verify peers. + for _, peer := range retrieved { + require.Equal(t, peers[peer.ID], peer) + } +} - read, err := store.Get(key) - require.NoError(t, err) +func TestStore_FunctionOperations(t *testing.T) { + db := helpers.InMemoryDB(t) + defer db.Close() - require.Equal(t, valueV1, read) + function := mocks.GenericFunctionRecord + store := store.New(db, codec.NewJSONCodec()) - // Set value V2. - err = store.Set(key, valueV2) + t.Run("save function", func(t *testing.T) { + err := store.SaveFunction(function) require.NoError(t, err) - - read, err = store.Get(key) + }) + t.Run("retrieve function", func(t *testing.T) { + retrieved, err := store.RetrieveFunction(function.CID) require.NoError(t, err) - require.Equal(t, read, valueV2) + require.Equal(t, function, retrieved) }) - t.Run("setting record", func(t *testing.T) { - t.Parallel() - db := helpers.InMemoryDB(t) - defer db.Close() - store := store.New(db) + t.Run("remove function", func(t *testing.T) { + err := store.RemoveFunction(function.CID) + require.NoError(t, err) - const ( - key = "some-key" - ) + // Verify function is gone. + _, err = store.RetrieveFunction(function.CID) + require.ErrorIs(t, err, blockless.ErrNotFound) + }) +} - type person struct { - Name string - Age uint +func TestStore_RetrieveFunctions(t *testing.T) { + db := helpers.InMemoryDB(t) + defer db.Close() + store := store.New(db, codec.NewJSONCodec()) + + count := 10 + functions := make(map[string]blockless.FunctionRecord) + for i := 0; i < count; i++ { + + fn := blockless.FunctionRecord{ + CID: fmt.Sprintf("dummy-cid-%v", i), + URL: fmt.Sprintf("https://example.com/dummy-url-%v", i), + Manifest: mocks.GenericManifest, + Archive: fmt.Sprintf("/var/tmp/archive-%v.tar.gz", i), + Files: fmt.Sprintf("/var/tmp/files/%v", i), } - var value = person{ - Name: "John", - Age: 30, - } + functions[fn.CID] = fn + } - err := store.SetRecord(key, value) + // Save functions. + for _, fn := range functions { + err := store.SaveFunction(fn) require.NoError(t, err) + } - var read person - err = store.GetRecord(key, &read) - require.NoError(t, err) + retrieved, err := store.RetrieveFunctions() + require.NoError(t, err) + require.Len(t, retrieved, count) - require.Equal(t, value, read) - }) - t.Run("handling missing record", func(t *testing.T) { - t.Parallel() + // Verify functions. + for _, fn := range retrieved { + require.Equal(t, functions[fn.CID], fn) + } +} - db := helpers.InMemoryDB(t) - defer db.Close() - store := store.New(db) +func TestStore_HandlesFailures(t *testing.T) { - const ( - key = "some-key" - ) + db := helpers.InMemoryDB(t) + defer db.Close() - type person struct { - Name string - Age uint - } + t.Run("retrieving missing peer fails", func(t *testing.T) { + store := store.New(db, codec.NewJSONCodec()) - var read person - err := store.GetRecord(key, &read) - require.Equal(t, person{}, read) - require.ErrorIs(t, err, blockless.ErrNotFound) + _, err := store.RetrievePeer(mocks.GenericPeerID) + require.Error(t, err) }) - t.Run("overwriting record", func(t *testing.T) { - t.Parallel() - - db := helpers.InMemoryDB(t) - defer db.Close() - store := store.New(db) + t.Run("retrieving missing function fails", func(t *testing.T) { + store := store.New(db, codec.NewJSONCodec()) - const ( - key = "some-key" - ) - - type person struct { - Name string - Age uint - } + _, err := store.RetrieveFunction(mocks.GenericString) + require.Error(t, err) + }) + t.Run("save peer handles marshalling failures", func(t *testing.T) { - var value = person{ - Name: "John", - Age: 30, + codec := mocks.BaselineCodec(t) + codec.MarshalFunc = func(any) ([]byte, error) { + return nil, mocks.GenericError } + store := store.New(db, codec) - err := store.SetRecord(key, value) - require.NoError(t, err) - - var read person - err = store.GetRecord(key, &read) - require.NoError(t, err) - - require.Equal(t, value, read) + err := store.SavePeer(mocks.GenericPeer) + require.Error(t, err) + }) + t.Run("save function handles marshalling failures", func(t *testing.T) { - // Change record values. - valueV2 := person{ - Name: "Paul", - Age: 20, + codec := mocks.BaselineCodec(t) + codec.MarshalFunc = func(any) ([]byte, error) { + return nil, mocks.GenericError } + store := store.New(db, codec) - err = store.SetRecord(key, valueV2) - require.NoError(t, err) - - err = store.GetRecord(key, &read) - require.NoError(t, err) - require.Equal(t, valueV2, read) + err := store.SaveFunction(mocks.GenericFunctionRecord) + require.Error(t, err) }) - t.Run("handle invalid output type", func(t *testing.T) { - t.Parallel() + t.Run("retrieve peer handles unmarshalling failures", func(t *testing.T) { - db := helpers.InMemoryDB(t) - defer db.Close() - store := store.New(db) - - const ( - key = "some-key" - ) - - type person struct { - Name string - Age uint + unmarshalErr := errors.New("unmarshalling error") + codec := mocks.BaselineCodec(t) + codec.MarshalFunc = func(obj any) ([]byte, error) { + return json.Marshal(obj) } - - var value = person{ - Name: "John", - Age: 30, + codec.UnmarshalFunc = func([]byte, any) error { + return unmarshalErr } + store := store.New(db, codec) - err := store.SetRecord(key, value) + // First, save the peer so we don't end up with a "not found" error. + peer := helpers.CreateRandomPeers(t, 1)[0] + err := store.SavePeer(peer) require.NoError(t, err) - type invalidModel struct { - Name float64 - Age string - } - - var rec invalidModel - err = store.GetRecord(key, &rec) + _, err = store.RetrievePeer(peer.ID) require.Error(t, err) + require.ErrorIs(t, err, unmarshalErr) }) - t.Run("listing keys", func(t *testing.T) { - t.Parallel() - - db := helpers.InMemoryDB(t) - defer db.Close() - store := store.New(db) - - readKeys := store.Keys() - require.Empty(t, readKeys) + t.Run("retrieve function handles unmarshalling failures", func(t *testing.T) { - keys := []string{ - "key1", - "key2", - "key3", - "key4", + unmarshalErr := errors.New("unmarshalling error") + codec := mocks.BaselineCodec(t) + codec.MarshalFunc = func(obj any) ([]byte, error) { + return json.Marshal(obj) } - - for _, key := range keys { - err := store.SetRecord(key, struct{}{}) - require.NoError(t, err) + codec.UnmarshalFunc = func([]byte, any) error { + return unmarshalErr } + store := store.New(db, codec) - readKeys = store.Keys() - require.Equal(t, keys, readKeys) - }) - t.Run("deleting key", func(t *testing.T) { - t.Parallel() - - db := helpers.InMemoryDB(t) - defer db.Close() - store := store.New(db) - - const ( - key = "some-key" - value = "some-value" - ) - - err := store.Set(key, value) - require.NoError(t, err) - - // Deleting valid key works. - err = store.Delete(key) + // First, save the peer so we don't end up with a "not found" error. + err := store.SaveFunction(mocks.GenericFunctionRecord) require.NoError(t, err) - // Value is no longer found. - _, err = store.Get(key) - require.ErrorIs(t, err, blockless.ErrNotFound) + _, err = store.RetrieveFunction(mocks.GenericFunctionRecord.CID) + require.Error(t, err) + require.ErrorIs(t, err, unmarshalErr) }) } diff --git a/testing/helpers/peer.go b/testing/helpers/peer.go new file mode 100644 index 00000000..8ff65171 --- /dev/null +++ b/testing/helpers/peer.go @@ -0,0 +1,63 @@ +package helpers + +import ( + "crypto/rand" + "fmt" + "testing" + + "github.com/blocklessnetwork/b7s/models/blockless" + "github.com/libp2p/go-libp2p/core/peer" + ma "github.com/multiformats/go-multiaddr" + mh "github.com/multiformats/go-multihash" + + "github.com/stretchr/testify/require" +) + +// NOTE: Inspiration by go-libp2p/core/test + +func RandPeerID(t *testing.T) peer.ID { + t.Helper() + + buf := make([]byte, 16) + rand.Read(buf) + h, err := mh.Sum(buf, mh.SHA2_256, -1) + require.NoError(t, err) + + return peer.ID(h) +} + +func GenerateTestAddrs(t *testing.T, n int) []ma.Multiaddr { + t.Helper() + + out := make([]ma.Multiaddr, n) + for i := 0; i < n; i++ { + a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/1.2.3.4/tcp/%d", i)) + require.NoError(t, err) + + out[i] = a + } + return out +} + +func CreateRandomPeers(t *testing.T, count int) []blockless.Peer { + + peers := make([]blockless.Peer, count) + for i := 0; i < count; i++ { + + id := RandPeerID(t) + addrs := GenerateTestAddrs(t, 1) + + p := blockless.Peer{ + ID: id, + MultiAddr: addrs[0].String(), + AddrInfo: peer.AddrInfo{ + ID: id, + Addrs: addrs, + }, + } + + peers[i] = p + } + + return peers +} diff --git a/testing/mocks/codec.go b/testing/mocks/codec.go new file mode 100644 index 00000000..f7589ec3 --- /dev/null +++ b/testing/mocks/codec.go @@ -0,0 +1,33 @@ +package mocks + +import ( + "testing" +) + +type Codec struct { + MarshalFunc func(any) ([]byte, error) + UnmarshalFunc func([]byte, any) error +} + +func BaselineCodec(t *testing.T) *Codec { + t.Helper() + + codec := Codec{ + MarshalFunc: func(any) ([]byte, error) { + return []byte{}, nil + }, + UnmarshalFunc: func([]byte, any) error { + return nil + }, + } + + return &codec +} + +func (c *Codec) Marshal(obj any) ([]byte, error) { + return c.MarshalFunc(obj) +} + +func (c *Codec) Unmarshal(data []byte, obj any) error { + return c.UnmarshalFunc(data, obj) +} diff --git a/testing/mocks/fstore.go b/testing/mocks/fstore.go index 7b8b499a..db47f738 100644 --- a/testing/mocks/fstore.go +++ b/testing/mocks/fstore.go @@ -11,7 +11,7 @@ type FStore struct { GetFunc func(string) (*blockless.FunctionManifest, error) InstalledFunc func(string) (bool, error) InstalledFunctionsFunc func() ([]string, error) - SyncFunc func(string) error + SyncFunc func(bool) error } func BaselineFStore(t *testing.T) *FStore { @@ -30,7 +30,7 @@ func BaselineFStore(t *testing.T) *FStore { InstalledFunctionsFunc: func() ([]string, error) { return nil, nil }, - SyncFunc: func(string) error { + SyncFunc: func(bool) error { return nil }, } @@ -54,6 +54,6 @@ func (f *FStore) InstalledFunctions() ([]string, error) { return f.InstalledFunctionsFunc() } -func (f *FStore) Sync(cid string) error { - return f.SyncFunc(cid) +func (f *FStore) Sync(haltOnError bool) error { + return f.SyncFunc(haltOnError) } diff --git a/testing/mocks/generic.go b/testing/mocks/generic.go index 9cf551de..07f8d9af 100644 --- a/testing/mocks/generic.go +++ b/testing/mocks/generic.go @@ -2,10 +2,12 @@ package mocks import ( "errors" + "fmt" "io" "github.com/google/uuid" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" "github.com/rs/zerolog" "github.com/blocklessnetwork/b7s/models/blockless" @@ -23,6 +25,8 @@ var ( GenericAddress = "/ip4/127.0.0.1/tcp/9000/p2p/12D3KooWRp3AVk7qtc2Av6xiqgAza1ZouksQaYcS2cvN94kHSCoa" + GenericMultiaddress = multiaddr.StringCast(GenericAddress) + GenericString = "test" GenericUUID = uuid.UUID{0xd1, 0xc2, 0x44, 0xaf, 0xa3, 0x1d, 0x48, 0x87, 0x93, 0x9d, 0xd6, 0xc7, 0xf, 0xe, 0x4f, 0xd0} @@ -84,4 +88,21 @@ var ( peer.ID([]byte{0x0, 0x24, 0x8, 0x1, 0x12, 0x20, 0xaa, 0xed, 0x63, 0x5d, 0xa1, 0xdf, 0x41, 0x2b, 0xe8, 0x9c, 0x49, 0xed, 0xe8, 0x0, 0x5c, 0xa8, 0x64, 0x58, 0x1d, 0x3, 0xf3, 0x59, 0x41, 0x74, 0xff, 0x2b, 0xcd, 0xde, 0x37, 0xfe, 0x15, 0xc6}), peer.ID([]byte{0x0, 0x24, 0x8, 0x1, 0x12, 0x20, 0xc6, 0x8f, 0x95, 0xd3, 0x98, 0x66, 0x40, 0x6b, 0xc4, 0x6c, 0x19, 0x5e, 0x80, 0xe0, 0x8c, 0x9b, 0x15, 0x4f, 0x8c, 0x6b, 0xd0, 0x1d, 0x5b, 0x83, 0x23, 0x7b, 0x9a, 0x97, 0xc0, 0x9b, 0x9d, 0x9b}), } + + GenericPeer = blockless.Peer{ + ID: GenericPeerID, + MultiAddr: GenericMultiaddress.String(), + AddrInfo: peer.AddrInfo{ + ID: GenericPeerID, + Addrs: []multiaddr.Multiaddr{GenericMultiaddress}, + }, + } + + GenericFunctionRecord = blockless.FunctionRecord{ + CID: "dummy-cid", + URL: fmt.Sprintf("https://example.com/%v", GenericString), + Manifest: GenericManifest, + Archive: "/var/tmp/archive.tar.gz", + Files: "/var/tmp/files", + } ) diff --git a/testing/mocks/peerstore.go b/testing/mocks/peerstore.go deleted file mode 100644 index 41b9c052..00000000 --- a/testing/mocks/peerstore.go +++ /dev/null @@ -1,54 +0,0 @@ -package mocks - -import ( - "testing" - - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" - - "github.com/blocklessnetwork/b7s/models/blockless" -) - -type PeerStore struct { - GetFunc func(peer.ID) (blockless.Peer, error) - StoreFunc func(peer.ID, multiaddr.Multiaddr, peer.AddrInfo) error - PeersFunc func() ([]blockless.Peer, error) - RemoveFunc func(peer.ID) error -} - -func BaselinePeerStore(t *testing.T) *PeerStore { - t.Helper() - - peerstore := PeerStore{ - GetFunc: func(peer.ID) (blockless.Peer, error) { - return blockless.Peer{}, nil - }, - StoreFunc: func(peer.ID, multiaddr.Multiaddr, peer.AddrInfo) error { - return nil - }, - PeersFunc: func() ([]blockless.Peer, error) { - return []blockless.Peer{}, nil - }, - RemoveFunc: func(peer.ID) error { - return GenericError - }, - } - - return &peerstore -} - -func (p *PeerStore) Get(id peer.ID) (blockless.Peer, error) { - return p.GetFunc(id) -} - -func (p *PeerStore) Store(id peer.ID, addr multiaddr.Multiaddr, info peer.AddrInfo) error { - return p.StoreFunc(id, addr, info) -} - -func (p *PeerStore) Peers() ([]blockless.Peer, error) { - return p.PeersFunc() -} - -func (p *PeerStore) Remove(id peer.ID) error { - return p.RemoveFunc(id) -} diff --git a/testing/mocks/store.go b/testing/mocks/store.go index 3d8ce7cd..5b8f7be0 100644 --- a/testing/mocks/store.go +++ b/testing/mocks/store.go @@ -2,64 +2,77 @@ package mocks import ( "testing" + + "github.com/libp2p/go-libp2p/core/peer" + + "github.com/blocklessnetwork/b7s/models/blockless" ) type Store struct { - GetFunc func(key string) (string, error) - SetFunc func(key string, value string) error - GetRecordFunc func(key string, value interface{}) error - SetRecordFunc func(key string, value interface{}) error - DeleteFunc func(key string) error - KeysFunc func() []string + SavePeerFunc func(blockless.Peer) error + SaveFunctionFunc func(blockless.FunctionRecord) error + RetrievePeerFunc func(peer.ID) (blockless.Peer, error) + RetrievePeersFunc func() ([]blockless.Peer, error) + RetrieveFunctionFunc func(string) (blockless.FunctionRecord, error) + RetrieveFunctionsFunc func() ([]blockless.FunctionRecord, error) + RemovePeerFunc func(peer.ID) error + RemoveFunctionFunc func(string) error } func BaselineStore(t *testing.T) *Store { t.Helper() store := Store{ - GetFunc: func(string) (string, error) { - return GenericString, nil - }, - SetFunc: func(string, string) error { + SavePeerFunc: func(blockless.Peer) error { return nil }, - GetRecordFunc: func(string, interface{}) error { + SaveFunctionFunc: func(blockless.FunctionRecord) error { return nil }, - SetRecordFunc: func(string, interface{}) error { - return nil + RetrievePeerFunc: func(peer.ID) (blockless.Peer, error) { + return GenericPeer, nil }, - DeleteFunc: func(string) error { + RetrievePeersFunc: func() ([]blockless.Peer, error) { + return []blockless.Peer{GenericPeer}, nil + }, + RetrieveFunctionFunc: func(string) (blockless.FunctionRecord, error) { + return GenericFunctionRecord, nil + }, + RetrieveFunctionsFunc: func() ([]blockless.FunctionRecord, error) { + return []blockless.FunctionRecord{GenericFunctionRecord}, nil + }, + RemovePeerFunc: func(peer.ID) error { return nil }, - KeysFunc: func() []string { - return []string{} + RemoveFunctionFunc: func(string) error { + return nil }, } return &store } -func (s *Store) Get(key string) (string, error) { - return s.GetFunc(key) +func (s *Store) SavePeer(peer blockless.Peer) error { + return s.SavePeerFunc(peer) } - -func (s *Store) Set(key string, value string) error { - return s.SetFunc(key, value) +func (s *Store) SaveFunction(function blockless.FunctionRecord) error { + return s.SaveFunctionFunc(function) } - -func (s *Store) GetRecord(key string, value interface{}) error { - return s.GetRecordFunc(key, value) +func (s *Store) RetrievePeer(id peer.ID) (blockless.Peer, error) { + return s.RetrievePeerFunc(id) } - -func (s *Store) SetRecord(key string, value interface{}) error { - return s.SetRecordFunc(key, value) +func (s *Store) RetrievePeers() ([]blockless.Peer, error) { + return s.RetrievePeersFunc() } - -func (s *Store) Delete(key string) error { - return s.DeleteFunc(key) +func (s *Store) RetrieveFunction(cid string) (blockless.FunctionRecord, error) { + return s.RetrieveFunctionFunc(cid) } - -func (s *Store) Keys() []string { - return s.KeysFunc() +func (s *Store) RetrieveFunctions() ([]blockless.FunctionRecord, error) { + return s.RetrieveFunctionsFunc() +} +func (s *Store) RemovePeer(id peer.ID) error { + return s.RemovePeerFunc(id) +} +func (s *Store) RemoveFunction(id string) error { + return s.RemoveFunctionFunc(id) }