From 7af07bd08f4eed66fbc9eee47762fd2c3365c89f Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Wed, 11 Dec 2024 11:28:04 +0100 Subject: [PATCH 01/11] fix(api): add time.Duration and rsmt2d.Axis types (#3994) --- api/docgen/examples.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/api/docgen/examples.go b/api/docgen/examples.go index 8d56506d1b..640cfe8ab3 100644 --- a/api/docgen/examples.go +++ b/api/docgen/examples.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "reflect" + "time" "cosmossdk.io/math" sdk "github.com/cosmos/cosmos-sdk/types" @@ -63,6 +64,7 @@ var ExampleValues = map[reflect.Type]interface{}{ reflect.TypeOf(float64(42)): float64(42), reflect.TypeOf(true): true, reflect.TypeOf([]byte{}): []byte("byte array"), + reflect.TypeOf(time.Duration(0)): time.Second, reflect.TypeOf(node.Full): node.Full, reflect.TypeOf(auth.Permission("admin")): auth.Permission("admin"), reflect.TypeOf(byzantine.BadEncoding): byzantine.BadEncoding, @@ -188,6 +190,8 @@ func init() { state.WithFeeGranterAddress("celestia1hakc56ax66ypjcmwj8w6hyr2c4g8cfs3wesguc"), ) addToExampleValues(txConfig) + + addToExampleValues(rsmt2d.Row) } func addToExampleValues(v interface{}) { From 01bb3e2db655fca370f530d2fb7177a39dd8acba Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Thu, 12 Dec 2024 06:53:29 +0200 Subject: [PATCH 02/11] fix(cmd/blob): fix namespace parsing in cli (#4003) --- nodebuilder/blob/cmd/blob.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/nodebuilder/blob/cmd/blob.go b/nodebuilder/blob/cmd/blob.go index 504533d2fc..17eec5bbf1 100644 --- a/nodebuilder/blob/cmd/blob.go +++ b/nodebuilder/blob/cmd/blob.go @@ -43,11 +43,11 @@ var getCmd = &cobra.Command{ Short: "Returns the blob for the given namespace by commitment at a particular height.\n" + "Note:\n* Both namespace and commitment input parameters are expected to be in their hex representation.", PreRunE: func(_ *cobra.Command, args []string) error { - if !strings.HasPrefix(args[1], "0x") { - return fmt.Errorf("only hex namespace is supported") + if !strings.HasPrefix(args[0], "0x") { + args[0] = "0x" + args[0] } - if !strings.HasPrefix(args[2], "0x") { - return fmt.Errorf("only hex commitment is supported") + if !strings.HasPrefix(args[1], "0x") { + args[1] = "0x" + args[1] } return nil }, @@ -84,8 +84,11 @@ var getAllCmd = &cobra.Command{ Short: "Returns all blobs for the given namespace at a particular height.\n" + "Note:\n* Namespace input parameter is expected to be in its hex representation.", PreRunE: func(_ *cobra.Command, args []string) error { + if !strings.HasPrefix(args[0], "0x") { + args[0] = "0x" + args[0] + } if !strings.HasPrefix(args[1], "0x") { - return fmt.Errorf("only hex namespace is supported") + args[1] = "0x" + args[1] } return nil }, @@ -136,7 +139,10 @@ var submitCmd = &cobra.Command{ }, PreRunE: func(_ *cobra.Command, args []string) error { if !strings.HasPrefix(args[0], "0x") { - return fmt.Errorf("only hex namespace is supported") + args[0] = "0x" + args[0] + } + if !strings.HasPrefix(args[1], "0x") { + args[1] = "0x" + args[1] } return nil }, @@ -235,11 +241,11 @@ var getProofCmd = &cobra.Command{ Short: "Retrieves the blob in the given namespaces at the given height by commitment and returns its Proof.\n" + "Note:\n* Both namespace and commitment input parameters are expected to be in their hex representation.", PreRunE: func(_ *cobra.Command, args []string) error { - if !strings.HasPrefix(args[1], "0x") { - return fmt.Errorf("only hex namespace is supported") + if !strings.HasPrefix(args[0], "0x") { + args[0] = "0x" + args[0] } - if !strings.HasPrefix(args[2], "0x") { - return fmt.Errorf("only hex commitment is supported") + if !strings.HasPrefix(args[1], "0x") { + args[1] = "0x" + args[1] } return nil }, From 7c586c7d41c87b23725812236072e8241c45305e Mon Sep 17 00:00:00 2001 From: Nguyen Nhu Viet Date: Fri, 13 Dec 2024 12:31:50 +0100 Subject: [PATCH 03/11] chore/make: add celestia-node make file for arabica testnet (#3992) Co-authored-by: Viacheslav --- Makefile | 3 ++ README.md | 37 ++++++++++++++++- celestia-node.mk | 101 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 140 insertions(+), 1 deletion(-) create mode 100644 celestia-node.mk diff --git a/Makefile b/Makefile index def65cc137..8c1d06f0a8 100644 --- a/Makefile +++ b/Makefile @@ -21,6 +21,9 @@ ifeq ($(SHORT),true) else INTEGRATION_RUN_LENGTH = endif + +include celestia-node.mk + ## help: Get more info on make commands. help: Makefile @echo " Choose a command run in "$(PROJECTNAME)":" diff --git a/README.md b/README.md index d4667e6fb0..6db1959599 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ Continue reading [here](https://blog.celestia.org/celestia-mvp-release-data-avai - [API docs](#api-docs) - [Node types](#node-types) - [Run a node](#run-a-node) + - [Quick Start with Light Node on arabica](#quick-start-with-light-node-on-arabica) - [Environment variables](#environment-variables) - [Package-specific documentation](#package-specific-documentation) - [Code of Conduct](#code-of-conduct) @@ -31,7 +32,7 @@ Continue reading [here](https://blog.celestia.org/celestia-mvp-release-data-avai ## Minimum requirements | Requirement | Notes | -| ----------- |----------------| +| ----------- | -------------- | | Go version | 1.23 or higher | ## System Requirements @@ -79,6 +80,40 @@ celestia start Please refer to [this guide](https://docs.celestia.org/nodes/celestia-node/) for more information on running a node. +### Quick Start with Light Node on arabica + +View available commands and their usage: + +```sh +make node-help +``` + +Install celestia node and cel-key binaries: + +```sh +make node-install +``` + +Start a light node with automated setup: + +```sh +make light-arabica-up +``` + +This command: + +- Automatically checks wallet balance +- Requests funds from faucet if needed +- Sets node height to latest-1 for quick startup +- Initializes the node if running for the first time + +Options: + +```sh +make light-arabica-up COMMAND=again # Reset node state to latest height +make light-arabica-up CORE_IP= # Use custom core IP +``` + ## Environment variables | Variable | Explanation | Default value | Required | diff --git a/celestia-node.mk b/celestia-node.mk new file mode 100644 index 0000000000..2729acc13b --- /dev/null +++ b/celestia-node.mk @@ -0,0 +1,101 @@ +# Celestia Node Management Rules +# These rules can be included in the main Makefile + +.PHONY: install get-address check-and-fund reset-node light-up node-help + +node-help: + @echo "Celestia Light Node Management Commands:" + @echo "" + @echo "Available targets:" + @echo " node-install - Install celestia node and cel-key binaries" + @echo " get-address - Display the wallet address from cel-key" + @echo " check-and-fund - Check wallet balance and request funds if needed" + @echo " reset-node - Reset node state and update config with latest block height" + @echo " light-arabica-up - Start the Celestia light node" + @echo "" + @echo "Special usage:" + @echo " light-arabica-up options:" + @echo " COMMAND=again - Reset the node before starting" + @echo " CORE_IP= - Use custom IP instead of default validator" + @echo "" + @echo "Examples:" + @echo " make light-arabica-up" + @echo " make light-arabica-up COMMAND=again" + @echo " make light-arabica-up CORE_IP=custom.ip.address" + @echo " make light-arabica-up COMMAND=again CORE_IP=custom.ip.address" + +# Install celestia node and cel-key binaries +node-install: + make install + make cel-key + +# Get wallet address from cel-key +get-address: + @address=$$(cel-key list --node.type light --p2p.network arabica | grep "address: " | cut -d' ' -f3); \ + echo $$address + +# Check balance and fund if needed +check-and-fund: + @address=$$(cel-key list --node.type light --p2p.network arabica | grep "address: " | cut -d' ' -f3); \ + echo "Checking balance for address: $$address"; \ + balance=$$(curl -s "https://api.celestia-arabica-11.com/cosmos/bank/v1beta1/balances/$$address" | jq -r '.balances[] | select(.denom == "utia") | .amount // "0"'); \ + if [[ $$balance =~ ^[0-9]+$$ ]]; then \ + balance_tia=$$(echo "scale=6; $$balance/1000000" | bc); \ + echo "Current balance: $$balance_tia TIA"; \ + else \ + balance_tia=0; \ + fi; \ + if (( $$(echo "$$balance_tia < 1" | bc -l) )); then \ + echo "Balance too low. Requesting funds from faucet..."; \ + curl -X POST 'https://faucet.celestia-arabica-11.com/api/v1/faucet/give_me' \ + -H 'Content-Type: application/json' \ + -d '{"address": "'$$address'", "chainId": "arabica-11" }'; \ + echo "Waiting 10 seconds for transaction to process..."; \ + sleep 10; \ + fi + +# Reset node state and update config +reset-node: + @echo "Resetting node state..." + @celestia light unsafe-reset-store --p2p.network arabica + @echo "Getting latest block height and hash..." + @block_response=$$(curl -s https://rpc.celestia-arabica-11.com/block); \ + latest_block=$$(echo $$block_response | jq -r '.result.block.header.height'); \ + latest_hash=$$(echo $$block_response | jq -r '.result.block_id.hash'); \ + echo "Latest block height: $$latest_block"; \ + echo "Latest block hash: $$latest_hash"; \ + config_file="$$HOME/.celestia-light-arabica-11/config.toml"; \ + echo "Updating config.toml..."; \ + sed -i.bak -e "s/\(TrustedHash[[:space:]]*=[[:space:]]*\).*/\1\"$$latest_hash\"/" \ + -e "s/\(SampleFrom[[:space:]]*=[[:space:]]*\).*/\1$$latest_block/" \ + "$$config_file"; \ + echo "Configuration updated successfully" + +# Start the Celestia light node +# Usage: make light-arabica-up [COMMAND=again] [CORE_IP=custom_ip] +light-arabica-up: + @config_file="$$HOME/.celestia-light-arabica-11/config.toml"; \ + if [ "$(COMMAND)" = "again" ]; then \ + $(MAKE) reset-node; \ + fi; \ + if [ -e "$$config_file" ]; then \ + echo "Using config file: $$config_file"; \ + else \ + celestia light init --p2p.network arabica; \ + $(MAKE) reset-node; \ + $(MAKE) check-and-fund; \ + fi; \ + $(MAKE) check-and-fund; \ + if [ -n "$(CORE_IP)" ]; then \ + celestia light start \ + --core.ip $(CORE_IP) \ + --rpc.skip-auth \ + --rpc.addr 0.0.0.0 \ + --p2p.network arabica; \ + else \ + celestia light start \ + --core.ip validator-1.celestia-arabica-11.com \ + --rpc.skip-auth \ + --rpc.addr 0.0.0.0 \ + --p2p.network arabica; \ + fi From 1d555a3234e622d225260fc5a5bbad02979c0f40 Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Fri, 13 Dec 2024 13:55:02 +0200 Subject: [PATCH 04/11] fix(sync/test): fix TestSyncStartStopLightWithBridge (#3990) --- nodebuilder/tests/sync_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/nodebuilder/tests/sync_test.go b/nodebuilder/tests/sync_test.go index 1a63975574..48de6376a3 100644 --- a/nodebuilder/tests/sync_test.go +++ b/nodebuilder/tests/sync_test.go @@ -250,6 +250,7 @@ func TestSyncStartStopLightWithBridge(t *testing.T) { light = sw.NewLightNode() require.NoError(t, light.Start(ctx)) + lightClient = getAdminClient(ctx, light, t) // ensure when light node comes back up, it can sync the remainder of the chain it // missed while sleeping From af7a0b70c7961537f7da770a33074b3ff85215cc Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Fri, 13 Dec 2024 16:22:09 +0100 Subject: [PATCH 05/11] perf(shwap): cache Both Row sides (#4005) --- share/eds/rsmt2d.go | 7 +-- share/shwap/row.go | 97 +++++++++++++++++++++++++++-------------- share/shwap/row_test.go | 52 +++++++++------------- 3 files changed, 86 insertions(+), 70 deletions(-) diff --git a/share/eds/rsmt2d.go b/share/eds/rsmt2d.go index 6c244de700..1d806324b5 100644 --- a/share/eds/rsmt2d.go +++ b/share/eds/rsmt2d.go @@ -99,12 +99,7 @@ func (eds *Rsmt2D) AxisHalf(_ context.Context, axisType rsmt2d.Axis, axisIdx int // HalfRow constructs a new shwap.Row from an Extended Data Square based on the specified index and // side. func (eds *Rsmt2D) HalfRow(idx int, side shwap.RowSide) (shwap.Row, error) { - shares := eds.ExtendedDataSquare.Row(uint(idx)) - sh, err := libshare.FromBytes(shares) - if err != nil { - return shwap.Row{}, fmt.Errorf("while converting shares from bytes: %w", err) - } - return shwap.RowFromShares(sh, side), nil + return shwap.RowFromEDS(eds.ExtendedDataSquare, idx, side) } // RowNamespaceData returns data for the given namespace and row index. diff --git a/share/shwap/row.go b/share/shwap/row.go index ba2c0d0b82..7057680573 100644 --- a/share/shwap/row.go +++ b/share/shwap/row.go @@ -6,6 +6,7 @@ import ( "github.com/celestiaorg/celestia-app/v3/pkg/wrapper" libshare "github.com/celestiaorg/go-square/v2/share" + "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/shwap/pb" @@ -20,33 +21,42 @@ type RowSide int const ( Left RowSide = iota // Left side of the row. Right // Right side of the row. + Both // Both sides of the row. ) // Row represents a portion of a row in an EDS, either left or right half. type Row struct { - halfShares []libshare.Share // halfShares holds the shares of either the left or right half of a row. - side RowSide // side indicates whether the row half is left or right. + shares []libshare.Share // holds the shares of Left or Right or Both sides of the row. + side RowSide // side indicates which side the shares belong to. } // NewRow creates a new Row with the specified shares and side. -func NewRow(halfShares []libshare.Share, side RowSide) Row { +func NewRow(shares []libshare.Share, side RowSide) Row { return Row{ - halfShares: halfShares, - side: side, + shares: shares, + side: side, } } -// RowFromShares constructs a new Row from an Extended Data Square based on the specified index and -// side. -func RowFromShares(shares []libshare.Share, side RowSide) Row { - var halfShares []libshare.Share - if side == Right { - halfShares = shares[len(shares)/2:] // Take the right half of the shares. - } else { - halfShares = shares[:len(shares)/2] // Take the left half of the shares. +// RowFromEDS constructs a new Row from an EDS based on the specified row index and side. +func RowFromEDS(eds *rsmt2d.ExtendedDataSquare, rowIdx int, side RowSide) (Row, error) { + rowBytes := eds.Row(uint(rowIdx)) + shares, err := libshare.FromBytes(rowBytes) + if err != nil { + return Row{}, fmt.Errorf("while converting shares from bytes: %w", err) + } + + switch side { + case Both: + case Left: + shares = shares[:len(shares)/2] + case Right: + shares = shares[len(shares)/2:] + default: + return Row{}, fmt.Errorf("invalid RowSide: %d", side) } - return NewRow(halfShares, side) + return NewRow(shares, side), nil } // RowFromProto converts a protobuf Row to a Row structure. @@ -56,20 +66,22 @@ func RowFromProto(r *pb.Row) (Row, error) { return Row{}, err } return Row{ - halfShares: shrs, - side: sideFromProto(r.GetHalfSide()), + shares: shrs, + side: sideFromProto(r.GetHalfSide()), }, nil } // Shares reconstructs the complete row shares from the half provided, using RSMT2D for data // recovery if needed. -func (r Row) Shares() ([]libshare.Share, error) { - shares := make([]libshare.Share, len(r.halfShares)*2) - offset := 0 - if r.side == Right { - offset = len(r.halfShares) // Position the halfShares in the second half if it's the right side. +// It caches the reconstructed shares for future use and converts Row to Both side. +func (r *Row) Shares() ([]libshare.Share, error) { + if r.side == Both { + return r.shares, nil } - for i, share := range r.halfShares { + + shares := make([]libshare.Share, len(r.shares)*2) + offset := len(r.shares) * int(r.side) + for i, share := range r.shares { shares[i+offset] = share } @@ -77,33 +89,52 @@ func (r Row) Shares() ([]libshare.Share, error) { if err != nil { return nil, err } - return libshare.FromBytes(rowShares) + + r.shares, err = libshare.FromBytes(rowShares) + if err != nil { + return nil, err + } + + r.side = Both + return r.shares, nil } // ToProto converts the Row to its protobuf representation. func (r Row) ToProto() *pb.Row { + if r.side == Both { + // we don't need to send the whole row over the wire + // so if we have both sides, we can save bandwidth and send the left half only + return &pb.Row{ + SharesHalf: SharesToProto(r.shares[:len(r.shares)/2]), + HalfSide: pb.Row_LEFT, + } + } + return &pb.Row{ - SharesHalf: SharesToProto(r.halfShares), + SharesHalf: SharesToProto(r.shares), HalfSide: r.side.ToProto(), } } // IsEmpty reports whether the Row is empty, i.e. doesn't contain any shares. func (r Row) IsEmpty() bool { - return r.halfShares == nil + return len(r.shares) == 0 } // Verify checks if the row's shares match the expected number from the root data and validates // the side of the row. -func (r Row) Verify(roots *share.AxisRoots, idx int) error { - if len(r.halfShares) == 0 { - return fmt.Errorf("empty half row") +func (r *Row) Verify(roots *share.AxisRoots, idx int) error { + if len(r.shares) == 0 { + return fmt.Errorf("empt row") + } + expectedShares := len(roots.RowRoots) + if r.side != Both { + expectedShares /= 2 } - expectedShares := len(roots.RowRoots) / 2 - if len(r.halfShares) != expectedShares { - return fmt.Errorf("shares size doesn't match root size: %d != %d", len(r.halfShares), expectedShares) + if len(r.shares) != expectedShares { + return fmt.Errorf("shares size doesn't match root size: %d != %d", len(r.shares), expectedShares) } - if r.side != Left && r.side != Right { + if r.side != Left && r.side != Right && r.side != Both { return fmt.Errorf("invalid RowSide: %d", r.side) } @@ -115,7 +146,7 @@ func (r Row) Verify(roots *share.AxisRoots, idx int) error { // verifyInclusion verifies the integrity of the row's shares against the provided root hash for the // given row index. -func (r Row) verifyInclusion(roots *share.AxisRoots, idx int) error { +func (r *Row) verifyInclusion(roots *share.AxisRoots, idx int) error { shrs, err := r.Shares() if err != nil { return fmt.Errorf("while extending shares: %w", err) diff --git a/share/shwap/row_test.go b/share/shwap/row_test.go index 9249ea87f4..16bce3893b 100644 --- a/share/shwap/row_test.go +++ b/share/shwap/row_test.go @@ -11,28 +11,20 @@ import ( "github.com/celestiaorg/celestia-node/share/eds/edstest" ) -func TestRowFromShares(t *testing.T) { +func TestRowShares(t *testing.T) { const odsSize = 8 eds := edstest.RandEDS(t, odsSize) for rowIdx := 0; rowIdx < odsSize*2; rowIdx++ { - for _, side := range []RowSide{Left, Right} { - shrs := eds.Row(uint(rowIdx)) - shares, err := libshare.FromBytes(shrs) + for _, side := range []RowSide{Left, Right, Both} { + row, err := RowFromEDS(eds, rowIdx, side) require.NoError(t, err) - row := RowFromShares(shares, side) + require.Equal(t, side, row.side) + extended, err := row.Shares() require.NoError(t, err) - require.Equal(t, shares, extended) - - var half []libshare.Share - if side == Right { - half = shares[odsSize:] - } else { - half = shares[:odsSize] - } - require.Equal(t, half, row.halfShares) - require.Equal(t, side, row.side) + require.Len(t, extended, odsSize*2) + require.Equal(t, Both, row.side) } } } @@ -44,11 +36,9 @@ func TestRowValidate(t *testing.T) { require.NoError(t, err) for rowIdx := 0; rowIdx < odsSize*2; rowIdx++ { - for _, side := range []RowSide{Left, Right} { - shrs := eds.Row(uint(rowIdx)) - shares, err := libshare.FromBytes(shrs) + for _, side := range []RowSide{Left, Right, Both} { + row, err := RowFromEDS(eds, rowIdx, side) require.NoError(t, err) - row := RowFromShares(shares, side) err = row.Verify(root, rowIdx) require.NoError(t, err) @@ -65,10 +55,10 @@ func TestRowValidateNegativeCases(t *testing.T) { shrs := eds.Row(0) shares, err := libshare.FromBytes(shrs) require.NoError(t, err) - row := RowFromShares(shares, Left) + row := NewRow(shares, Left) // Test with incorrect side specification - invalidSideRow := Row{halfShares: row.halfShares, side: RowSide(999)} + invalidSideRow := Row{shares: row.shares, side: RowSide(999)} err = invalidSideRow.Verify(root, 0) require.Error(t, err, "should error on invalid row side") @@ -79,12 +69,12 @@ func TestRowValidateNegativeCases(t *testing.T) { require.NoError(t, err) incorrectShares[i] = *shr } - invalidRow := Row{halfShares: incorrectShares, side: Left} + invalidRow := Row{shares: incorrectShares, side: Left} err = invalidRow.Verify(root, 0) require.Error(t, err, "should error on incorrect number of shares") // Test with empty shares - emptyRow := Row{halfShares: []libshare.Share{}, side: Left} + emptyRow := Row{shares: []libshare.Share{}, side: Left} err = emptyRow.Verify(root, 0) require.Error(t, err, "should error on empty halfShares") @@ -99,16 +89,18 @@ func TestRowProtoEncoding(t *testing.T) { eds := edstest.RandEDS(t, odsSize) for rowIdx := 0; rowIdx < odsSize*2; rowIdx++ { - for _, side := range []RowSide{Left, Right} { - shrs := eds.Row(uint(rowIdx)) - shares, err := libshare.FromBytes(shrs) + for _, side := range []RowSide{Left, Right, Both} { + row, err := RowFromEDS(eds, rowIdx, side) require.NoError(t, err) - row := RowFromShares(shares, side) pb := row.ToProto() rowOut, err := RowFromProto(pb) require.NoError(t, err) - require.Equal(t, row, rowOut) + if side == Both { + require.NotEqual(t, row, rowOut) + } else { + require.Equal(t, row, rowOut) + } } } } @@ -120,10 +112,8 @@ func BenchmarkRowValidate(b *testing.B) { eds := edstest.RandEDS(b, odsSize) root, err := share.NewAxisRoots(eds) require.NoError(b, err) - shrs := eds.Row(0) - shares, err := libshare.FromBytes(shrs) + row, err := RowFromEDS(eds, 0, Left) require.NoError(b, err) - row := RowFromShares(shares, Left) b.ResetTimer() for i := 0; i < b.N; i++ { From 4d3d3c20330347a4fafc04ad538b4ae0832ed50b Mon Sep 17 00:00:00 2001 From: Oleg Kovalov Date: Mon, 23 Dec 2024 10:55:33 +0100 Subject: [PATCH 06/11] refactor(docgen): simplify init (#3997) --- api/docgen/examples.go | 196 ++++++++++++++++++++--------------------- api/docgen/openrpc.go | 2 +- 2 files changed, 96 insertions(+), 102 deletions(-) diff --git a/api/docgen/examples.go b/api/docgen/examples.go index 640cfe8ab3..572d5c3f04 100644 --- a/api/docgen/examples.go +++ b/api/docgen/examples.go @@ -35,171 +35,158 @@ import ( "github.com/celestiaorg/celestia-node/state" ) -//go:embed "exampledata/extendedHeader.json" -var exampleExtendedHeader string - -//go:embed "exampledata/samplingStats.json" -var exampleSamplingStats string - -//go:embed "exampledata/txResponse.json" -var exampleTxResponse string - -//go:embed "exampledata/resourceManagerStats.json" -var exampleResourceMngrStats string - -//go:embed "exampledata/blob.json" -var exampleBlob string - -//go:embed "exampledata/blobProof.json" -var exampleBlobProof string - -var ExampleValues = map[reflect.Type]interface{}{ - reflect.TypeOf(""): "string value", - reflect.TypeOf(uint64(42)): uint64(42), - reflect.TypeOf(uint32(42)): uint32(42), - reflect.TypeOf(int32(42)): int32(42), - reflect.TypeOf(int64(42)): int64(42), - reflect.TypeOf(42): 42, - reflect.TypeOf(byte(7)): byte(7), - reflect.TypeOf(float64(42)): float64(42), - reflect.TypeOf(true): true, - reflect.TypeOf([]byte{}): []byte("byte array"), - reflect.TypeOf(time.Duration(0)): time.Second, - reflect.TypeOf(node.Full): node.Full, - reflect.TypeOf(auth.Permission("admin")): auth.Permission("admin"), - reflect.TypeOf(byzantine.BadEncoding): byzantine.BadEncoding, - reflect.TypeOf((*fraud.Proof[*header.ExtendedHeader])(nil)).Elem(): byzantine.CreateBadEncodingProof( +var ( + //go:embed "exampledata/extendedHeader.json" + exampleExtendedHeader string + + //go:embed "exampledata/samplingStats.json" + exampleSamplingStats string + + //go:embed "exampledata/txResponse.json" + exampleTxResponse string + + //go:embed "exampledata/resourceManagerStats.json" + exampleResourceMngrStats string + + //go:embed "exampledata/blob.json" + exampleBlob string + + //go:embed "exampledata/blobProof.json" + exampleBlobProof string +) + +var exampleValues = map[reflect.Type]any{} + +func add(v any) { + typ := reflect.TypeOf(v) + exampleValues[typ] = v +} + +func init() { + add("string value") + add(uint64(42)) + add(uint32(42)) + add(int32(42)) + add(int64(42)) + add(42) + add(byte(7)) + add(float64(42)) + add(float64(42)) + add(true) + add([]byte("byte array")) + add(time.Second) + add(node.Full) + add(auth.Permission("admin")) + add(byzantine.BadEncoding) + + // TODO: this case requires more debugging, simple to leave it as it was. + exampleValues[reflect.TypeOf((*fraud.Proof[*header.ExtendedHeader])(nil)).Elem()] = byzantine.CreateBadEncodingProof( []byte("bad encoding proof"), 42, &byzantine.ErrByzantine{ Index: 0, - Axis: rsmt2d.Axis(0), Shares: []*byzantine.ShareWithProof{}, + Axis: rsmt2d.Axis(0), }, - ), - reflect.TypeOf((*error)(nil)).Elem(): errors.New("error"), - reflect.TypeOf(state.Balance{}): state.Balance{Amount: sdk.NewInt(42), Denom: "utia"}, -} + ) -func init() { - addToExampleValues(share.EmptyEDS()) - addr, err := sdk.AccAddressFromBech32("celestia1377k5an3f94v6wyaceu0cf4nq6gk2jtpc46g7h") - if err != nil { - panic(err) - } - addToExampleValues(addr) - ExampleValues[reflect.TypeOf((*sdk.Address)(nil)).Elem()] = addr + add(errors.New("error")) + add(state.Balance{Amount: sdk.NewInt(42), Denom: "utia"}) + add(share.EmptyEDS()) + add(rsmt2d.Row) + add(network.Connected) + add(network.ReachabilityPrivate) - valAddr, err := sdk.ValAddressFromBech32("celestiavaloper1q3v5cugc8cdpud87u4zwy0a74uxkk6u4q4gx4p") - if err != nil { - panic(err) - } - addToExampleValues(valAddr) + addr := must(sdk.AccAddressFromBech32("celestia1377k5an3f94v6wyaceu0cf4nq6gk2jtpc46g7h")) + add(addr) + add(state.Address{Address: addr}) + exampleValues[reflect.TypeOf((*sdk.Address)(nil)).Elem()] = addr - addToExampleValues(state.Address{Address: addr}) + valAddr := must(sdk.ValAddressFromBech32("celestiavaloper1q3v5cugc8cdpud87u4zwy0a74uxkk6u4q4gx4p")) + add(valAddr) var txResponse *state.TxResponse - err = json.Unmarshal([]byte(exampleTxResponse), &txResponse) + err := json.Unmarshal([]byte(exampleTxResponse), &txResponse) if err != nil { panic(err) } + add(txResponse) var samplingStats das.SamplingStats err = json.Unmarshal([]byte(exampleSamplingStats), &samplingStats) if err != nil { panic(err) } + add(samplingStats) var extendedHeader *header.ExtendedHeader err = json.Unmarshal([]byte(exampleExtendedHeader), &extendedHeader) if err != nil { panic(err) } + add(extendedHeader) var resourceMngrStats rcmgr.ResourceManagerStat err = json.Unmarshal([]byte(exampleResourceMngrStats), &resourceMngrStats) if err != nil { panic(err) } + add(resourceMngrStats) var exBlob *blob.Blob err = json.Unmarshal([]byte(exampleBlob), &exBlob) if err != nil { panic(err) } + add(exBlob) + add(exBlob.Blob) var blobProof *blob.Proof err = json.Unmarshal([]byte(exampleBlobProof), &blobProof) if err != nil { panic(err) } - - addToExampleValues(exBlob) - addToExampleValues(exBlob.Blob) - addToExampleValues(blobProof) - addToExampleValues(txResponse) - addToExampleValues(samplingStats) - addToExampleValues(extendedHeader) - addToExampleValues(resourceMngrStats) + add(blobProof) mathInt, _ := math.NewIntFromString("42") - addToExampleValues(mathInt) - - addToExampleValues(network.Connected) - addToExampleValues(network.ReachabilityPrivate) + add(mathInt) pID := protocol.ID("/celestia/mocha/ipfs/bitswap") - addToExampleValues(pID) + add(pID) peerID := peer.ID("12D3KooWPRb5h3g9MH7sx9qfbSQZG5cXv1a2Qs3o4aW5YmmzPq82") - addToExampleValues(peerID) + add(peerID) ma, _ := multiaddr.NewMultiaddr("/ip6/::1/udp/2121/quic-v1") addrInfo := peer.AddrInfo{ ID: peerID, Addrs: []multiaddr.Multiaddr{ma}, } - addToExampleValues(addrInfo) + add(addrInfo) - commitment, err := base64.StdEncoding.DecodeString("aHlbp+J9yub6hw/uhK6dP8hBLR2mFy78XNRRdLf2794=") - if err != nil { - panic(err) - } - addToExampleValues(blob.Commitment(commitment)) + commitment := must(base64.StdEncoding.DecodeString("aHlbp+J9yub6hw/uhK6dP8hBLR2mFy78XNRRdLf2794=")) + add(blob.Commitment(commitment)) // randomly generated namespace that's used in the blob example above // (AAAAAAAAAAAAAAAAAAAAAAAAAAAAAMJ/xGlNMdE=) - namespace, err := libshare.NewV0Namespace([]byte{0xc2, 0x7f, 0xc4, 0x69, 0x4d, 0x31, 0xd1}) - if err != nil { - panic(err) - } - addToExampleValues(namespace) + namespace := must(libshare.NewV0Namespace([]byte{0xc2, 0x7f, 0xc4, 0x69, 0x4d, 0x31, 0xd1})) + add(namespace) hashStr := "453D0BC3CB88A2ED6F2E06021383B22C72D25D7741AE51B4CAE1AD34D72A3F07" - hash, err := hex.DecodeString(hashStr) - if err != nil { - panic(err) - } - addToExampleValues(libhead.Hash(hash)) + hash := must(hex.DecodeString(hashStr)) + add(libhead.Hash(hash)) - txConfig := state.NewTxConfig( + add(state.NewTxConfig( state.WithGasPrice(0.002), state.WithGas(142225), state.WithKeyName("my_celes_key"), state.WithSignerAddress("celestia1pjcmwj8w6hyr2c4wehakc5g8cfs36aysgucx66"), state.WithFeeGranterAddress("celestia1hakc56ax66ypjcmwj8w6hyr2c4g8cfs3wesguc"), - ) - addToExampleValues(txConfig) - - addToExampleValues(rsmt2d.Row) -} - -func addToExampleValues(v interface{}) { - ExampleValues[reflect.TypeOf(v)] = v + )) } -func ExampleValue(t, parent reflect.Type) (interface{}, error) { - v, ok := ExampleValues[t] +func exampleValue(t, parent reflect.Type) (any, error) { + v, ok := exampleValues[t] if ok { return v, nil } @@ -207,26 +194,26 @@ func ExampleValue(t, parent reflect.Type) (interface{}, error) { switch t.Kind() { case reflect.Slice: out := reflect.New(t).Elem() - val, err := ExampleValue(t.Elem(), t) + val, err := exampleValue(t.Elem(), t) if err != nil { return nil, err } out = reflect.Append(out, reflect.ValueOf(val)) return out.Interface(), nil case reflect.Chan: - return ExampleValue(t.Elem(), nil) + return exampleValue(t.Elem(), nil) case reflect.Struct: es, err := exampleStruct(t, parent) if err != nil { return nil, err } v := reflect.ValueOf(es).Elem().Interface() - ExampleValues[t] = v + exampleValues[t] = v return v, nil case reflect.Array: out := reflect.New(t).Elem() for i := 0; i < t.Len(); i++ { - val, err := ExampleValue(t.Elem(), t) + val, err := exampleValue(t.Elem(), t) if err != nil { return nil, err } @@ -248,7 +235,7 @@ func ExampleValue(t, parent reflect.Type) (interface{}, error) { return nil, fmt.Errorf("failed to retrieve example value for type: %s on parent '%s')", t, parent) } -func exampleStruct(t, parent reflect.Type) (interface{}, error) { +func exampleStruct(t, parent reflect.Type) (any, error) { ns := reflect.New(t) for i := 0; i < t.NumField(); i++ { f := t.Field(i) @@ -256,7 +243,7 @@ func exampleStruct(t, parent reflect.Type) (interface{}, error) { continue } if cases.Title(language.Und, cases.NoLower).String(f.Name) == f.Name { - val, err := ExampleValue(f.Type, t) + val, err := exampleValue(f.Type, t) if err != nil { return nil, err } @@ -266,3 +253,10 @@ func exampleStruct(t, parent reflect.Type) (interface{}, error) { return ns.Interface(), nil } + +func must[T any](v T, err error) T { + if err != nil { + panic(err) + } + return v +} diff --git a/api/docgen/openrpc.go b/api/docgen/openrpc.go index a5e52e7ee1..86d901372f 100644 --- a/api/docgen/openrpc.go +++ b/api/docgen/openrpc.go @@ -185,7 +185,7 @@ func NewOpenRPCDocument(comments, permissions Comments) *go_openrpc_reflect.Docu } appReflector.FnSchemaExamples = func(ty reflect.Type) (examples *meta_schema.Examples, err error) { - v, err := ExampleValue(ty, ty) // This isn't ideal, but seems to work well enough. + v, err := exampleValue(ty, ty) // This isn't ideal, but seems to work well enough. if err != nil { fmt.Println(err) } From 87ece3a84e2ceb301edff6c4d0c8647c006ee28c Mon Sep 17 00:00:00 2001 From: rene <41963722+renaynay@users.noreply.github.com> Date: Tue, 7 Jan 2025 14:07:28 +0100 Subject: [PATCH 07/11] fix(nodebuilder/da): remove random print (#4029) --- nodebuilder/da/service.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/nodebuilder/da/service.go b/nodebuilder/da/service.go index aded1f0ea9..c7e403fbff 100644 --- a/nodebuilder/da/service.go +++ b/nodebuilder/da/service.go @@ -4,7 +4,6 @@ import ( "context" "encoding/binary" "encoding/json" - "fmt" "strings" logging "github.com/ipfs/go-log/v2" @@ -256,7 +255,6 @@ func (s *Service) Validate( // invalid proof") but analysis of the code in celestia-node implies this should never happen - // maybe it's caused by openrpc? there is no way of gently handling errors here, but returned // value is fine for us - fmt.Println("proof", proofs[i] == nil, "commitment", commitment == nil) isIncluded, _ := s.blobServ.Included(ctx, height, ns, proofs[i], commitment) included[i] = isIncluded } From c0055c51dad43a80927c0f107753b3c0ed753238 Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Thu, 9 Jan 2025 17:14:33 +0200 Subject: [PATCH 08/11] feat(modshare): implement GetRow (#4002) Co-authored-by: Hlib Kanunnikov --- nodebuilder/share/mocks/api.go | 15 ++++++ nodebuilder/share/share.go | 19 +++++++ share/availability/light/availability_test.go | 6 ++- share/shwap/getter.go | 2 + share/shwap/getters/cascade.go | 12 +++++ share/shwap/getters/mock/getter.go | 15 ++++++ share/shwap/getters/testing.go | 17 ++++++ share/shwap/p2p/bitswap/getter.go | 26 +++++++++ share/shwap/p2p/shrex/shrex_getter/shrex.go | 4 ++ share/shwap/row.go | 54 +++++++++++++++++++ share/shwap/row_test.go | 28 ++++++++++ store/getter.go | 15 ++++++ store/getter_test.go | 20 +++++++ 13 files changed, 232 insertions(+), 1 deletion(-) diff --git a/nodebuilder/share/mocks/api.go b/nodebuilder/share/mocks/api.go index 7fde2338cc..d392f39f5b 100644 --- a/nodebuilder/share/mocks/api.go +++ b/nodebuilder/share/mocks/api.go @@ -84,6 +84,21 @@ func (mr *MockModuleMockRecorder) GetRange(arg0, arg1, arg2, arg3 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRange", reflect.TypeOf((*MockModule)(nil).GetRange), arg0, arg1, arg2, arg3) } +// GetRow mocks base method. +func (m *MockModule) GetRow(arg0 context.Context, arg1 uint64, arg2 int) (shwap.Row, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRow", arg0, arg1, arg2) + ret0, _ := ret[0].(shwap.Row) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetRow indicates an expected call of GetRow. +func (mr *MockModuleMockRecorder) GetRow(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRow", reflect.TypeOf((*MockModule)(nil).GetRow), arg0, arg1, arg2) +} + // GetSamples mocks base method. func (m *MockModule) GetSamples(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 []shwap.SampleCoords) ([]shwap.Sample, error) { m.ctrl.T.Helper() diff --git a/nodebuilder/share/share.go b/nodebuilder/share/share.go index 783018fe47..46df1e2379 100644 --- a/nodebuilder/share/share.go +++ b/nodebuilder/share/share.go @@ -50,6 +50,8 @@ type Module interface { GetSamples(ctx context.Context, header *header.ExtendedHeader, indices []shwap.SampleCoords) ([]shwap.Sample, error) // GetEDS gets the full EDS identified by the given extended header. GetEDS(ctx context.Context, height uint64) (*rsmt2d.ExtendedDataSquare, error) + // GetRow gets all shares from specified row. + GetRow(context.Context, uint64, int) (shwap.Row, error) // GetNamespaceData gets all shares from an EDS within the given namespace. // Shares are returned in a row-by-row order if the namespace spans multiple rows. GetNamespaceData( @@ -77,6 +79,11 @@ type API struct { ctx context.Context, height uint64, ) (*rsmt2d.ExtendedDataSquare, error) `perm:"read"` + GetRow func( + context.Context, + uint64, + int, + ) (shwap.Row, error) `perm:"read"` GetNamespaceData func( ctx context.Context, height uint64, @@ -108,6 +115,10 @@ func (api *API) GetEDS(ctx context.Context, height uint64) (*rsmt2d.ExtendedData return api.Internal.GetEDS(ctx, height) } +func (api *API) GetRow(ctx context.Context, height uint64, rowIdx int) (shwap.Row, error) { + return api.Internal.GetRow(ctx, height, rowIdx) +} + func (api *API) GetRange(ctx context.Context, height uint64, start, end int) (*GetRangeResult, error) { return api.Internal.GetRange(ctx, height, start, end) } @@ -196,3 +207,11 @@ func (m module) GetNamespaceData( } return m.getter.GetNamespaceData(ctx, header, namespace) } + +func (m module) GetRow(ctx context.Context, height uint64, rowIdx int) (shwap.Row, error) { + header, err := m.hs.GetByHeight(ctx, height) + if err != nil { + return shwap.Row{}, err + } + return m.getter.GetRow(ctx, header, rowIdx) +} diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index 191c286938..df441c46a1 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -290,7 +290,7 @@ func (g successGetter) checkOnce(t *testing.T) { } } -func (g successGetter) GetSamples(_ context.Context, hdr *header.ExtendedHeader, +func (g successGetter) GetSamples(_ context.Context, _ *header.ExtendedHeader, indices []shwap.SampleCoords, ) ([]shwap.Sample, error) { g.Lock() @@ -305,6 +305,10 @@ func (g successGetter) GetSamples(_ context.Context, hdr *header.ExtendedHeader, return smpls, nil } +func (g successGetter) GetRow(_ context.Context, _ *header.ExtendedHeader, _ int) (shwap.Row, error) { + panic("not implemented") +} + func (g successGetter) GetEDS(_ context.Context, _ *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { panic("not implemented") } diff --git a/share/shwap/getter.go b/share/shwap/getter.go index 9e0a5d3131..f56c8edc63 100644 --- a/share/shwap/getter.go +++ b/share/shwap/getter.go @@ -39,6 +39,8 @@ type Getter interface { // GetEDS gets the full EDS identified by the given extended header. GetEDS(context.Context, *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) + // GetRow gets Row by its index committed to the given extended header. + GetRow(ctx context.Context, header *header.ExtendedHeader, rowIdx int) (Row, error) // GetNamespaceData gets all shares from an EDS within the given namespace. // Shares are returned in a row-by-row order if the namespace spans multiple rows. // Inclusion of returned data could be verified using Verify method on NamespacedShares. diff --git a/share/shwap/getters/cascade.go b/share/shwap/getters/cascade.go index 39ceb2fdb1..c9ac091ddd 100644 --- a/share/shwap/getters/cascade.go +++ b/share/shwap/getters/cascade.go @@ -71,6 +71,18 @@ func (cg *CascadeGetter) GetEDS( return cascadeGetters(ctx, cg.getters, get) } +// GetRow gets row shares from any of registered shwap.Getters in cascading +// order. +func (cg *CascadeGetter) GetRow(ctx context.Context, header *header.ExtendedHeader, rowIdx int) (shwap.Row, error) { + ctx, span := tracer.Start(ctx, "cascade/get-row") + defer span.End() + + get := func(ctx context.Context, get shwap.Getter) (shwap.Row, error) { + return get.GetRow(ctx, header, rowIdx) + } + return cascadeGetters(ctx, cg.getters, get) +} + // GetNamespaceData gets NamespacedShares from any of registered shwap.Getters in cascading // order. func (cg *CascadeGetter) GetNamespaceData( diff --git a/share/shwap/getters/mock/getter.go b/share/shwap/getters/mock/getter.go index 7e4dacb24a..ade35b39f8 100644 --- a/share/shwap/getters/mock/getter.go +++ b/share/shwap/getters/mock/getter.go @@ -68,6 +68,21 @@ func (mr *MockGetterMockRecorder) GetNamespaceData(arg0, arg1, arg2 interface{}) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetNamespaceData", reflect.TypeOf((*MockGetter)(nil).GetNamespaceData), arg0, arg1, arg2) } +// GetRow mocks base method. +func (m *MockGetter) GetRow(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 int) (shwap.Row, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetRow", arg0, arg1, arg2) + ret0, _ := ret[0].(shwap.Row) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetRow indicates an expected call of GetRow. +func (mr *MockGetterMockRecorder) GetRow(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRow", reflect.TypeOf((*MockGetter)(nil).GetRow), arg0, arg1, arg2) +} + // GetSamples mocks base method. func (m *MockGetter) GetSamples(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 []shwap.SampleCoords) ([]shwap.Sample, error) { m.ctrl.T.Helper() diff --git a/share/shwap/getters/testing.go b/share/shwap/getters/testing.go index a3ee53753d..c244204aba 100644 --- a/share/shwap/getters/testing.go +++ b/share/shwap/getters/testing.go @@ -58,6 +58,23 @@ func (seg *SingleEDSGetter) GetSamples(ctx context.Context, hdr *header.Extended return smpls, nil } +func (seg *SingleEDSGetter) GetRow( + ctx context.Context, + header *header.ExtendedHeader, + rowIdx int, +) (shwap.Row, error) { + err := seg.checkRoots(header.DAH) + if err != nil { + return shwap.Row{}, err + } + + axisHalf, err := seg.EDS.AxisHalf(ctx, rsmt2d.Row, rowIdx) + if err != nil { + return shwap.Row{}, err + } + return axisHalf.ToRow(), nil +} + // GetEDS returns a kept EDS if the correct root is given. func (seg *SingleEDSGetter) GetEDS( _ context.Context, diff --git a/share/shwap/p2p/bitswap/getter.go b/share/shwap/p2p/bitswap/getter.go index db2938663c..f008809833 100644 --- a/share/shwap/p2p/bitswap/getter.go +++ b/share/shwap/p2p/bitswap/getter.go @@ -134,6 +134,32 @@ func (g *Getter) GetSamples( return smpls, nil } +func (g *Getter) GetRow(ctx context.Context, hdr *header.ExtendedHeader, rowIdx int) (shwap.Row, error) { + ctx, span := tracer.Start(ctx, "get-eds") + defer span.End() + + blk, err := NewEmptyRowBlock(hdr.Height(), rowIdx, len(hdr.DAH.RowRoots)) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "NewEmptyRowBlock") + return shwap.Row{}, err + } + + isArchival := g.isArchival(hdr) + span.SetAttributes(attribute.Bool("is_archival", isArchival)) + + ses, release := g.getSession(isArchival) + defer release() + + err = Fetch(ctx, g.exchange, hdr.DAH, []Block{blk}, WithFetcher(ses)) + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "Fetch") + return shwap.Row{}, err + } + return blk.Container, nil +} + // GetEDS uses [RowBlock] and [Fetch] to get half of the first EDS quadrant(ODS) and // recomputes the whole EDS from it. // We fetch the ODS or Q1 to ensure better compatibility with archival nodes that only diff --git a/share/shwap/p2p/shrex/shrex_getter/shrex.go b/share/shwap/p2p/shrex/shrex_getter/shrex.go index 6c91a44736..142b3f3ab2 100644 --- a/share/shwap/p2p/shrex/shrex_getter/shrex.go +++ b/share/shwap/p2p/shrex/shrex_getter/shrex.go @@ -150,6 +150,10 @@ func (sg *Getter) GetSamples(context.Context, *header.ExtendedHeader, []shwap.Sa return nil, fmt.Errorf("getter/shrex: GetShare %w", shwap.ErrOperationNotSupported) } +func (sg *Getter) GetRow(_ context.Context, _ *header.ExtendedHeader, _ int) (shwap.Row, error) { + return shwap.Row{}, fmt.Errorf("getter/shrex: GetRow %w", shwap.ErrOperationNotSupported) +} + func (sg *Getter) GetEDS(ctx context.Context, header *header.ExtendedHeader) (*rsmt2d.ExtendedDataSquare, error) { var err error ctx, span := tracer.Start(ctx, "shrex/get-eds") diff --git a/share/shwap/row.go b/share/shwap/row.go index 7057680573..87faff4875 100644 --- a/share/shwap/row.go +++ b/share/shwap/row.go @@ -2,6 +2,7 @@ package shwap import ( "bytes" + "encoding/json" "fmt" "github.com/celestiaorg/celestia-app/v3/pkg/wrapper" @@ -171,6 +172,33 @@ func (r *Row) verifyInclusion(roots *share.AxisRoots, idx int) error { return nil } +// MarshalJSON encodes row to the json encoded bytes. +func (r Row) MarshalJSON() ([]byte, error) { + jsonRow := struct { + Shares []libshare.Share `json:"shares"` + Side string `json:"side"` + }{ + Shares: r.shares, + Side: r.side.String(), + } + return json.Marshal(&jsonRow) +} + +// UnmarshalJSON decodes json bytes to the row. +func (r *Row) UnmarshalJSON(data []byte) error { + jsonRow := struct { + Shares []libshare.Share `json:"shares"` + Side string `json:"side"` + }{} + err := json.Unmarshal(data, &jsonRow) + if err != nil { + return err + } + r.shares = jsonRow.Shares + r.side = toRowSide(jsonRow.Side) + return nil +} + // ToProto converts a RowSide to its protobuf representation. func (s RowSide) ToProto() pb.Row_HalfSide { if s == Left { @@ -186,3 +214,29 @@ func sideFromProto(side pb.Row_HalfSide) RowSide { } return Right } + +func (s RowSide) String() string { + switch s { + case Left: + return "LEFT" + case Right: + return "RIGHT" + case Both: + return "BOTH" + default: + panic("invalid row side") + } +} + +func toRowSide(s string) RowSide { + switch s { + case "LEFT": + return Left + case "RIGHT": + return Right + case "BOTH": + return Both + default: + panic("invalid row side") + } +} diff --git a/share/shwap/row_test.go b/share/shwap/row_test.go index 16bce3893b..3cf80befca 100644 --- a/share/shwap/row_test.go +++ b/share/shwap/row_test.go @@ -1,6 +1,7 @@ package shwap import ( + "encoding/json" "testing" "github.com/stretchr/testify/require" @@ -29,6 +30,33 @@ func TestRowShares(t *testing.T) { } } +func TestRowMarshal(t *testing.T) { + const odsSize = 8 + eds := edstest.RandEDS(t, odsSize) + for rowIdx := 0; rowIdx < odsSize*2; rowIdx++ { + for _, side := range []RowSide{Left, Right, Both} { + row, err := RowFromEDS(eds, rowIdx, side) + require.NoError(t, err) + rowData, err := json.Marshal(row) + require.NoError(t, err) + + decodedRow := &Row{} + err = json.Unmarshal(rowData, decodedRow) + require.NoError(t, err) + + require.Equal(t, side, decodedRow.side) + extended, err := decodedRow.Shares() + require.NoError(t, err) + + shares, err := row.Shares() + require.NoError(t, err) + + require.Equal(t, shares, extended) + require.Equal(t, row.side, decodedRow.side) + } + } +} + func TestRowValidate(t *testing.T) { const odsSize = 8 eds := edstest.RandEDS(t, odsSize) diff --git a/store/getter.go b/store/getter.go index 1315561730..7a94c408ac 100644 --- a/store/getter.go +++ b/store/getter.go @@ -71,6 +71,21 @@ func (g *Getter) GetEDS(ctx context.Context, h *header.ExtendedHeader) (*rsmt2d. return rsmt2d.ExtendedDataSquare, nil } +func (g *Getter) GetRow(ctx context.Context, h *header.ExtendedHeader, rowIdx int) (shwap.Row, error) { + acc, err := g.store.GetByHeight(ctx, h.Height()) + if err != nil { + if errors.Is(err, ErrNotFound) { + return shwap.Row{}, shwap.ErrNotFound + } + return shwap.Row{}, fmt.Errorf("getting accessor from store: %w", err) + } + axisHalf, err := acc.AxisHalf(ctx, rsmt2d.Row, rowIdx) + if err != nil { + return shwap.Row{}, fmt.Errorf("getting axis half from accessor: %w", err) + } + return axisHalf.ToRow(), nil +} + func (g *Getter) GetNamespaceData( ctx context.Context, h *header.ExtendedHeader, diff --git a/store/getter_test.go b/store/getter_test.go index b0b027fc19..0042f43ecf 100644 --- a/store/getter_test.go +++ b/store/getter_test.go @@ -69,6 +69,26 @@ func TestStoreGetter(t *testing.T) { require.ErrorIs(t, err, shwap.ErrNotFound) }) + t.Run("GetRow", func(t *testing.T) { + eds, roots := randomEDS(t) + eh := headertest.RandExtendedHeaderWithRoot(t, roots) + height := height.Add(1) + eh.RawHeader.Height = int64(height) + + err := edsStore.PutODSQ4(ctx, eh.DAH, height, eds) + require.NoError(t, err) + + for i := 0; i < len(eh.DAH.RowRoots); i++ { + row, err := sg.GetRow(ctx, eh, i) + require.NoError(t, err) + retreivedShrs, err := row.Shares() + require.NoError(t, err) + edsShrs, err := libshare.FromBytes(eds.Row(uint(i))) + require.NoError(t, err) + require.Equal(t, edsShrs, retreivedShrs) + } + }) + t.Run("GetNamespaceData", func(t *testing.T) { ns := libshare.RandomNamespace() eds, roots := edstest.RandEDSWithNamespace(t, ns, 8, 16) From 66b94a84fa22dc681dcbae082e70a87d8b989127 Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Mon, 13 Jan 2025 11:04:02 +0100 Subject: [PATCH 09/11] Revert "fix(p2p): disable quic (#3937)" (#4039) This reverts commit e6dbb54dd28462c2820ae455295acc856af92b64 as https://github.com/quic-go/quic-go/issues/4712 is solved and fixed. It's not yet released, but the bug is not critical to block on it. --- nodebuilder/p2p/addrs.go | 17 +++-------------- nodebuilder/p2p/host.go | 24 +++++++----------------- 2 files changed, 10 insertions(+), 31 deletions(-) diff --git a/nodebuilder/p2p/addrs.go b/nodebuilder/p2p/addrs.go index 607d63c2bd..1a928e3be2 100644 --- a/nodebuilder/p2p/addrs.go +++ b/nodebuilder/p2p/addrs.go @@ -2,7 +2,6 @@ package p2p import ( "fmt" - "slices" p2pconfig "github.com/libp2p/go-libp2p/config" hst "github.com/libp2p/go-libp2p/core/host" @@ -12,22 +11,12 @@ import ( // Listen returns invoke function that starts listening for inbound connections with libp2p.Host. func Listen(cfg *Config) func(h hst.Host) (err error) { return func(h hst.Host) (err error) { - maListen := make([]ma.Multiaddr, 0, len(cfg.ListenAddresses)) - for _, addr := range cfg.ListenAddresses { - maddr, err := ma.NewMultiaddr(addr) + maListen := make([]ma.Multiaddr, len(cfg.ListenAddresses)) + for i, addr := range cfg.ListenAddresses { + maListen[i], err = ma.NewMultiaddr(addr) if err != nil { return fmt.Errorf("failure to parse config.P2P.ListenAddresses: %w", err) } - if !enableQUIC { - // TODO(@walldiss): Remove this check when QUIC is stable - if slices.ContainsFunc(maddr.Protocols(), func(p ma.Protocol) bool { - return p.Code == ma.P_QUIC_V1 || p.Code == ma.P_WEBTRANSPORT - }) { - continue - } - } - - maListen = append(maListen, maddr) } return h.Network().Listen(maListen...) } diff --git a/nodebuilder/p2p/host.go b/nodebuilder/p2p/host.go index c38713f6a3..1003970199 100644 --- a/nodebuilder/p2p/host.go +++ b/nodebuilder/p2p/host.go @@ -3,7 +3,6 @@ package p2p import ( "context" "fmt" - "os" "strings" "github.com/libp2p/go-libp2p" @@ -28,8 +27,6 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/node" ) -var enableQUIC = os.Getenv("CELESTIA_ENABLE_QUIC") == "1" - // routedHost constructs a wrapped Host that may fallback to address discovery, // if any top-level operation on the Host is provided with PeerID(Hash(PbK)) only. func routedHost(base HostBase, r routing.PeerRouting) hst.Host { @@ -83,19 +80,6 @@ func host(params hostParams) (HostBase, error) { params.Cfg.Upgrade() } - transports := []libp2p.Option{ - libp2p.Transport(tcp.NewTCPTransport), - libp2p.Transport(libp2pwebrtc.New), - wsTransport(tlsCfg), - } - - // disable quic and webtransport client support until it is stable - if enableQUIC { - transports = append(transports, - libp2p.Transport(quic.NewTransport), - libp2p.Transport(webtransport.New)) - } - opts := []libp2p.Option{ libp2p.NoListenAddrs, // do not listen automatically libp2p.AddrsFactory(params.AddrF), @@ -108,7 +92,13 @@ func host(params hostParams) (HostBase, error) { libp2p.DisableRelay(), libp2p.BandwidthReporter(params.Bandwidth), libp2p.ResourceManager(params.ResourceManager), - libp2p.ChainOptions(transports...), + libp2p.ChainOptions( + libp2p.Transport(tcp.NewTCPTransport), + libp2p.Transport(quic.NewTransport), + libp2p.Transport(webtransport.New), + libp2p.Transport(libp2pwebrtc.New), + wsTransport(tlsCfg), + ), // to clearly define what defaults we rely upon libp2p.DefaultSecurity, libp2p.DefaultMuxers, From c599285bba4d46cc945d094bcf1a22644bf34bae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E3=81=8B=E3=81=92?= <47621124+ronething-bot@users.noreply.github.com> Date: Wed, 15 Jan 2025 16:42:21 +0800 Subject: [PATCH 10/11] chore(pruner): remove SpacedHeaderGenerator (#4032) --- header/headertest/testing.go | 19 ++++++++++++++++--- pruner/service_test.go | 33 ++------------------------------- 2 files changed, 18 insertions(+), 34 deletions(-) diff --git a/header/headertest/testing.go b/header/headertest/testing.go index 251592a9a5..2e25592f27 100644 --- a/header/headertest/testing.go +++ b/header/headertest/testing.go @@ -15,7 +15,6 @@ import ( tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/tendermint/tendermint/proto/tendermint/version" "github.com/tendermint/tendermint/types" - tmtime "github.com/tendermint/tendermint/types/time" "github.com/celestiaorg/celestia-app/v3/pkg/da" libhead "github.com/celestiaorg/go-header" @@ -40,6 +39,7 @@ type TestSuite struct { // blockTime is optional - if set, the test suite will generate // blocks timestamped at the specified interval blockTime time.Duration + startTime time.Time } func NewStore(t *testing.T) libhead.Store[*header.ExtendedHeader] { @@ -62,6 +62,18 @@ func NewTestSuite(t *testing.T, numValidators int, blockTime time.Duration) *Tes vals: vals, valSet: valSet, blockTime: blockTime, + startTime: time.Now(), + } +} + +func NewTestSuiteWithGenesisTime(t *testing.T, startTime time.Time, blockTime time.Duration) *TestSuite { + valSet, vals := RandValidatorSet(3, 1) + return &TestSuite{ + t: t, + vals: vals, + valSet: valSet, + blockTime: blockTime, + startTime: startTime, } } @@ -74,10 +86,11 @@ func (s *TestSuite) genesis() *header.ExtendedHeader { gen.ValidatorsHash = s.valSet.Hash() gen.NextValidatorsHash = s.valSet.Hash() gen.Height = 1 + gen.Time = s.startTime voteSet := types.NewVoteSet(gen.ChainID, gen.Height, 0, tmproto.PrecommitType, s.valSet) blockID := RandBlockID(s.t) blockID.Hash = gen.Hash() - commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, s.vals, time.Now()) + commit, err := MakeCommit(blockID, gen.Height, 0, voteSet, s.vals, s.startTime) require.NoError(s.t, err) eh := &header.ExtendedHeader{ @@ -199,7 +212,7 @@ func (s *TestSuite) Commit(h *header.RawHeader) *types.Commit { ValidatorIndex: int32(i), Height: h.Height, Round: round, - Timestamp: tmtime.Now().UTC(), + Timestamp: h.Time, Type: tmproto.PrecommitType, BlockID: bid, } diff --git a/pruner/service_test.go b/pruner/service_test.go index c7ebeac03a..75e0e0198e 100644 --- a/pruner/service_test.go +++ b/pruner/service_test.go @@ -248,8 +248,8 @@ func TestFindPruneableHeaders(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - headerGenerator := NewSpacedHeaderGenerator(t, tc.startTime, tc.blockTime) - store := headertest.NewCustomStore(t, headerGenerator, tc.headerAmount) + suite := headertest.NewTestSuiteWithGenesisTime(t, tc.startTime, tc.blockTime) + store := headertest.NewCustomStore(t, suite, tc.headerAmount) mp := &mockPruner{} @@ -317,32 +317,3 @@ func (mp *mockPruner) Prune(_ context.Context, h *header.ExtendedHeader) error { mp.deletedHeaderHashes = append(mp.deletedHeaderHashes, pruned{hash: h.Hash().String(), height: h.Height()}) return nil } - -// TODO @renaynay @distractedm1nd: Deduplicate via headertest utility. -// https://github.com/celestiaorg/celestia-node/issues/3278. -type SpacedHeaderGenerator struct { - t *testing.T - TimeBetweenHeaders time.Duration - currentTime time.Time - currentHeight int64 -} - -func NewSpacedHeaderGenerator( - t *testing.T, startTime time.Time, timeBetweenHeaders time.Duration, -) *SpacedHeaderGenerator { - return &SpacedHeaderGenerator{ - t: t, - TimeBetweenHeaders: timeBetweenHeaders, - currentTime: startTime, - currentHeight: 1, - } -} - -func (shg *SpacedHeaderGenerator) NextHeader() *header.ExtendedHeader { - h := headertest.RandExtendedHeaderAtTimestamp(shg.t, shg.currentTime) - h.RawHeader.Height = shg.currentHeight - h.RawHeader.Time = shg.currentTime - shg.currentHeight++ - shg.currentTime = shg.currentTime.Add(shg.TimeBetweenHeaders) - return h -} From e11cd7130ea145d3855b13607fa835c8cc0b136c Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Wed, 15 Jan 2025 09:53:49 +0100 Subject: [PATCH 11/11] feat(modp2p): Ping and ConnectionState APIs (#3989) Yet another helpful p2p debugging endpoints with respective commands: * Ping tries to reach a peer by its peer id * ConnectionState provides information about active connection for a peer by its ID --- nodebuilder/p2p/cmd/p2p.go | 74 ++++++++++++++++++++++++++++++++++++ nodebuilder/p2p/mocks/api.go | 32 ++++++++++++++++ nodebuilder/p2p/p2p.go | 62 ++++++++++++++++++++++++++++-- nodebuilder/p2p/p2p_test.go | 5 +++ 4 files changed, 170 insertions(+), 3 deletions(-) diff --git a/nodebuilder/p2p/cmd/p2p.go b/nodebuilder/p2p/cmd/p2p.go index 391944a85f..9cbf8357af 100644 --- a/nodebuilder/p2p/cmd/p2p.go +++ b/nodebuilder/p2p/cmd/p2p.go @@ -1,6 +1,8 @@ package cmd import ( + "time" + "github.com/libp2p/go-libp2p/core/metrics" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -9,6 +11,7 @@ import ( "github.com/spf13/cobra" cmdnode "github.com/celestiaorg/celestia-node/cmd" + "github.com/celestiaorg/celestia-node/nodebuilder/p2p" ) type peerInfo struct { @@ -35,6 +38,8 @@ func init() { bandwidthForProtocolCmd, pubsubPeersCmd, pubsubTopicsCmd, + connectionInfoCmd, + pingCmd, ) } @@ -599,3 +604,72 @@ var pubsubTopicsCmd = &cobra.Command{ return cmdnode.PrintOutput(topics, err, formatter) }, } + +var connectionInfoCmd = &cobra.Command{ + Use: "connection-state [peerID]", + Short: "Gets connection info for a given peer ID", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + client, err := cmdnode.ParseClientFromCtx(cmd.Context()) + if err != nil { + return err + } + defer client.Close() + + pid, err := peer.Decode(args[0]) + if err != nil { + return err + } + + infos, err := client.P2P.ConnectionState(cmd.Context(), pid) + return cmdnode.PrintOutput(infos, err, func(i interface{}) interface{} { + type state struct { + Info network.ConnectionState + NumStreams int + Direction string + Opened string + Limited bool + } + + states := i.([]p2p.ConnectionState) + infos := make([]state, len(states)) + for i, s := range states { + infos[i] = state{ + Info: s.Info, + NumStreams: s.NumStreams, + Direction: s.Direction.String(), + Opened: s.Opened.Format("2006-01-02 15:04:05"), + Limited: s.Limited, + } + } + + if len(infos) == 1 { + return infos[0] + } + return infos + }) + }, +} + +var pingCmd = &cobra.Command{ + Use: "ping [peerID]", + Short: "Pings given peer and tell how much time that took or errors", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + client, err := cmdnode.ParseClientFromCtx(cmd.Context()) + if err != nil { + return err + } + defer client.Close() + + pid, err := peer.Decode(args[0]) + if err != nil { + return err + } + + pingDuration, err := client.P2P.Ping(cmd.Context(), pid) + return cmdnode.PrintOutput(pingDuration, err, func(i interface{}) interface{} { + return i.(time.Duration).String() + }) + }, +} diff --git a/nodebuilder/p2p/mocks/api.go b/nodebuilder/p2p/mocks/api.go index 42e4f0a892..8ef5638d52 100644 --- a/nodebuilder/p2p/mocks/api.go +++ b/nodebuilder/p2p/mocks/api.go @@ -7,7 +7,9 @@ package mocks import ( context "context" reflect "reflect" + time "time" + p2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p" gomock "github.com/golang/mock/gomock" metrics "github.com/libp2p/go-libp2p/core/metrics" network "github.com/libp2p/go-libp2p/core/network" @@ -141,6 +143,21 @@ func (mr *MockModuleMockRecorder) Connectedness(arg0, arg1 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connectedness", reflect.TypeOf((*MockModule)(nil).Connectedness), arg0, arg1) } +// ConnectionState mocks base method. +func (m *MockModule) ConnectionState(arg0 context.Context, arg1 peer.ID) ([]p2p.ConnectionState, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ConnectionState", arg0, arg1) + ret0, _ := ret[0].([]p2p.ConnectionState) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ConnectionState indicates an expected call of ConnectionState. +func (mr *MockModuleMockRecorder) ConnectionState(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConnectionState", reflect.TypeOf((*MockModule)(nil).ConnectionState), arg0, arg1) +} + // Info mocks base method. func (m *MockModule) Info(arg0 context.Context) (peer.AddrInfo, error) { m.ctrl.T.Helper() @@ -231,6 +248,21 @@ func (mr *MockModuleMockRecorder) Peers(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Peers", reflect.TypeOf((*MockModule)(nil).Peers), arg0) } +// Ping mocks base method. +func (m *MockModule) Ping(arg0 context.Context, arg1 peer.ID) (time.Duration, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Ping", arg0, arg1) + ret0, _ := ret[0].(time.Duration) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Ping indicates an expected call of Ping. +func (mr *MockModuleMockRecorder) Ping(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockModule)(nil).Ping), arg0, arg1) +} + // Protect mocks base method. func (m *MockModule) Protect(arg0 context.Context, arg1 peer.ID, arg2 string) error { m.ctrl.T.Helper() diff --git a/nodebuilder/p2p/p2p.go b/nodebuilder/p2p/p2p.go index 2f673f5482..e98ece9ba8 100644 --- a/nodebuilder/p2p/p2p.go +++ b/nodebuilder/p2p/p2p.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "time" pubsub "github.com/libp2p/go-libp2p-pubsub" libhost "github.com/libp2p/go-libp2p/core/host" @@ -14,14 +15,29 @@ import ( "github.com/libp2p/go-libp2p/p2p/host/autonat" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/libp2p/go-libp2p/p2p/net/conngater" + "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) var _ Module = (*API)(nil) +// ConnectionState holds information about a connection. +type ConnectionState struct { + Info network.ConnectionState + // NumStreams is the number of streams on the connection. + NumStreams int + // Direction specifies whether this is an inbound or an outbound connection. + Direction network.Direction + // Opened is the timestamp when this connection was opened. + Opened time.Time + // Limited indicates that this connection is Limited. It maybe limited by + // bytes or time. In practice, this is a connection formed over a circuit v2 + // relay. + Limited bool +} + // Module represents all accessible methods related to the node's p2p // host / operations. // -//nolint:dupl //go:generate mockgen -destination=mocks/api.go -package=mocks . Module type Module interface { // Info returns address information about the host. @@ -39,6 +55,9 @@ type Module interface { ClosePeer(ctx context.Context, id peer.ID) error // Connectedness returns a state signaling connection capabilities. Connectedness(ctx context.Context, id peer.ID) (network.Connectedness, error) + // ConnectionState returns information about each *active* connection to the peer. + // NOTE: At most cases there should be only a single connection. + ConnectionState(ctx context.Context, id peer.ID) ([]ConnectionState, error) // NATStatus returns the current NAT status. NATStatus(context.Context) (network.Reachability, error) @@ -80,6 +99,9 @@ type Module interface { PubSubPeers(ctx context.Context, topic string) ([]peer.ID, error) // PubSubTopics reports current PubSubTopics the node participates in. PubSubTopics(ctx context.Context) ([]string, error) + + // Ping pings the selected peer and returns time it took or error. + Ping(ctx context.Context, peer peer.ID) (time.Duration, error) } // module contains all components necessary to access information and @@ -205,9 +227,33 @@ func (m *module) PubSubTopics(_ context.Context) ([]string, error) { return m.ps.GetTopics(), nil } +func (m *module) Ping(ctx context.Context, peer peer.ID) (time.Duration, error) { + res := <-ping.Ping(ctx, m.host, peer) // context is handled for us + return res.RTT, res.Error +} + +func (m *module) ConnectionState(_ context.Context, peer peer.ID) ([]ConnectionState, error) { + cons := m.host.Network().ConnsToPeer(peer) + if len(cons) == 0 { + return nil, fmt.Errorf("no connections to peer %s", peer) + } + + conInfos := make([]ConnectionState, len(cons)) + for i, con := range cons { + stat := con.Stat() + conInfos[i] = ConnectionState{ + Info: con.ConnState(), + NumStreams: stat.NumStreams, + Direction: stat.Direction, + Opened: stat.Opened, + Limited: stat.Limited, + } + } + + return conInfos, nil +} + // API is a wrapper around Module for the RPC. -// -//nolint:dupl type API struct { Internal struct { Info func(context.Context) (peer.AddrInfo, error) `perm:"admin"` @@ -229,6 +275,8 @@ type API struct { ResourceState func(context.Context) (rcmgr.ResourceManagerStat, error) `perm:"admin"` PubSubPeers func(ctx context.Context, topic string) ([]peer.ID, error) `perm:"admin"` PubSubTopics func(ctx context.Context) ([]string, error) `perm:"admin"` + Ping func(ctx context.Context, peer peer.ID) (time.Duration, error) `perm:"admin"` + ConnectionState func(context.Context, peer.ID) ([]ConnectionState, error) `perm:"admin"` } } @@ -307,3 +355,11 @@ func (api *API) PubSubPeers(ctx context.Context, topic string) ([]peer.ID, error func (api *API) PubSubTopics(ctx context.Context) ([]string, error) { return api.Internal.PubSubTopics(ctx) } + +func (api *API) Ping(ctx context.Context, peer peer.ID) (time.Duration, error) { + return api.Internal.Ping(ctx, peer) +} + +func (api *API) ConnectionState(ctx context.Context, peer peer.ID) ([]ConnectionState, error) { + return api.Internal.ConnectionState(ctx, peer) +} diff --git a/nodebuilder/p2p/p2p_test.go b/nodebuilder/p2p/p2p_test.go index 01e53fa553..b889663c20 100644 --- a/nodebuilder/p2p/p2p_test.go +++ b/nodebuilder/p2p/p2p_test.go @@ -41,6 +41,11 @@ func TestP2PModule_Host(t *testing.T) { connectedness, err := mgr.Connectedness(ctx, peer.ID()) require.NoError(t, err) + + infos, err := mgr.ConnectionState(ctx, peer.ID()) + require.NoError(t, err) + require.GreaterOrEqual(t, len(infos), 1) + assert.Equal(t, host.Network().Connectedness(peer.ID()), connectedness) // now disconnect using manager and check for connectedness match again assert.NoError(t, mgr.ClosePeer(ctx, peer.ID()))