Skip to content

Commit

Permalink
Merge branch 'main' into shwap-bitswap-get-size
Browse files Browse the repository at this point in the history
  • Loading branch information
renaynay authored Nov 12, 2024
2 parents 4f93b11 + 353141f commit f5c4f4c
Show file tree
Hide file tree
Showing 46 changed files with 781 additions and 678 deletions.
34 changes: 1 addition & 33 deletions .github/workflows/ci_release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,6 @@ on:
pull_request:
workflow_dispatch:
inputs:
version-bump:
# Friendly description to be shown in the UI instead of 'name'
description: "Semver type of new version (major / minor / patch)"
# Input has to be provided for the workflow to run
required: true
type: choice
options:
- patch
- minor
- major
tag-as:
description: 'Tag for snapshot release (optional)'
required: false
Expand Down Expand Up @@ -88,31 +78,9 @@ jobs:
with:
go-version: ${{ needs.setup.outputs.go-version }}

# If this was a workflow dispatch event, we need to generate and push a tag
# for goreleaser to grab
version_bump:
needs: [hadolint, yamllint, markdown-lint, go-ci, setup]
runs-on: ubuntu-latest
permissions: "write-all"
steps:
- uses: actions/checkout@v4

- name: Bump version and push tag
# Placing the if condition here is a workaround for needing to block
# on this step during workflow dispatch events but the step not
# needing to run on tags. If we had the if condition on the full
# version_bump section, it would skip and not run, which would result
# in goreleaser not running either.
if: ${{ github.event_name == 'workflow_dispatch' }}
uses: mathieudutour/[email protected]
with:
github_token: ${{ secrets.GITHUB_TOKEN }}
default_bump: ${{ inputs.version-bump }}
release_branches: ${{ needs.setup.outputs.branch }}

# Generate the release with goreleaser to include pre-built binaries
goreleaser:
needs: [version_bump, setup]
needs: [setup]
runs-on: ubuntu-latest
if: |
github.event_name == 'workflow_dispatch' ||
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/docker-build-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ jobs:
permissions:
contents: write
packages: write
uses: celestiaorg/.github/.github/workflows/reusable_dockerfile_pipeline.yml@v0.4.5 # yamllint disable-line rule:line-length
uses: celestiaorg/.github/.github/workflows/reusable_dockerfile_pipeline.yml@v0.5.0 # yamllint disable-line rule:line-length
with:
dockerfile: Dockerfile
checkout_ref: ${{ github.event.inputs.ref }}
Expand Down
2 changes: 1 addition & 1 deletion api/gateway/share.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (h *Handler) getShares(
height uint64,
namespace libshare.Namespace,
) ([]libshare.Share, error) {
shares, err := h.share.GetSharesByNamespace(ctx, height, namespace)
shares, err := h.share.GetNamespaceData(ctx, height, namespace)
if err != nil {
return nil, err
}
Expand Down
5 changes: 1 addition & 4 deletions api/gateway/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@ const (

const addrKey = "address"

var (
ErrInvalidAddressFormat = errors.New("address must be a valid account or validator address")
ErrMissingAddress = errors.New("address not specified")
)
var ErrInvalidAddressFormat = errors.New("address must be a valid account or validator address")

func (h *Handler) handleBalanceRequest(w http.ResponseWriter, r *http.Request) {
var (
Expand Down
2 changes: 1 addition & 1 deletion blob/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (s *Service) retrieve(
getCtx, getSharesSpan := tracer.Start(ctx, "get-shares-by-namespace")

// collect shares for the requested namespace
namespacedShares, err := s.shareGetter.GetSharesByNamespace(getCtx, header, namespace)
namespacedShares, err := s.shareGetter.GetNamespaceData(getCtx, header, namespace)
if err != nil {
if errors.Is(err, shwap.ErrNotFound) {
err = ErrBlobNotFound
Expand Down
8 changes: 4 additions & 4 deletions blob/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,15 +412,15 @@ func TestBlobService_Get(t *testing.T) {
innerGetter := service.shareGetter
getterWrapper := mock.NewMockGetter(ctrl)
getterWrapper.EXPECT().
GetSharesByNamespace(gomock.Any(), gomock.Any(), gomock.Any()).
GetNamespaceData(gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(
func(
ctx context.Context, h *header.ExtendedHeader, ns libshare.Namespace,
) (shwap.NamespaceData, error) {
if ns.Equals(blobsWithDiffNamespaces[0].Namespace()) {
return nil, errors.New("internal error")
}
return innerGetter.GetSharesByNamespace(ctx, h, ns)
return innerGetter.GetNamespaceData(ctx, h, ns)
}).AnyTimes()

service.shareGetter = getterWrapper
Expand Down Expand Up @@ -876,7 +876,7 @@ func createServiceWithSub(ctx context.Context, t testing.TB, blobs []*Blob) *Ser
ctrl := gomock.NewController(t)
shareGetter := mock.NewMockGetter(ctrl)

shareGetter.EXPECT().GetSharesByNamespace(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
shareGetter.EXPECT().GetNamespaceData(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(func(ctx context.Context, h *header.ExtendedHeader, ns libshare.Namespace) (shwap.NamespaceData, error) {
idx := int(h.Height()) - 1
accessor := &eds.Rsmt2D{ExtendedDataSquare: edsses[idx]}
Expand All @@ -897,7 +897,7 @@ func createService(ctx context.Context, t testing.TB, shares []libshare.Share) *
accessor := &eds.Rsmt2D{ExtendedDataSquare: square}
ctrl := gomock.NewController(t)
shareGetter := mock.NewMockGetter(ctrl)
shareGetter.EXPECT().GetSharesByNamespace(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
shareGetter.EXPECT().GetNamespaceData(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().
DoAndReturn(func(ctx context.Context, h *header.ExtendedHeader, ns libshare.Namespace) (shwap.NamespaceData, error) {
nd, err := eds.NamespaceData(ctx, accessor, ns)
return nd, err
Expand Down
24 changes: 13 additions & 11 deletions das/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,18 +127,20 @@ func (w *worker) sample(ctx context.Context, timeout time.Duration, height uint6

w.metrics.observeSample(ctx, h, time.Since(start), w.state.jobType, err)
if err != nil {
if !errors.Is(err, context.Canceled) {
log.Debugw(
"failed to sample header",
"type", w.state.jobType,
"height", h.Height(),
"hash", h.Hash(),
"square width", len(h.DAH.RowRoots),
"data root", h.DAH.String(),
"err", err,
"finished (s)", time.Since(start),
)
if errors.Is(err, context.Canceled) {
return err
}

log.Errorw(
"failed to sample header",
"type", w.state.jobType,
"height", h.Height(),
"hash", h.Hash(),
"square width", len(h.DAH.RowRoots),
"data root", h.DAH.String(),
"err", err,
"finished (s)", time.Since(start),
)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/BurntSushi/toml v1.4.1-0.20240526193622-a339e1f7089c
github.com/alecthomas/jsonschema v0.0.0-20220216202328-9eeeec9d044b
github.com/benbjohnson/clock v1.3.5
github.com/celestiaorg/celestia-app/v3 v3.0.0-rc0
github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha
github.com/celestiaorg/go-fraud v0.2.1
github.com/celestiaorg/go-header v0.6.3
github.com/celestiaorg/go-libp2p-messenger v0.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -345,8 +345,8 @@ github.com/c-bata/go-prompt v0.2.2/go.mod h1:VzqtzE2ksDBcdln8G7mk2RX9QyGjH+OVqOC
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZIuyASInj1a9ExI8xOsTOw=
github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ=
github.com/celestiaorg/celestia-app/v3 v3.0.0-rc0 h1:cfZVxldi5u/vGZPdFvW95quUmcg307v44PndjYwEOR4=
github.com/celestiaorg/celestia-app/v3 v3.0.0-rc0/go.mod h1:K8U6TRHgofz0y5UcvlOL+CuNLbx4jeZrZF7HZdf+Rgs=
github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha h1:9tdQDaNgOfU56BueKq8i0Qte4FRmJJzG7woPTm6HHhk=
github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha/go.mod h1:K8U6TRHgofz0y5UcvlOL+CuNLbx4jeZrZF7HZdf+Rgs=
github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 h1:L4GTm+JUXhB0a/nGPMq6jEqqe6THuYSQ8m2kUCtZYqw=
github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw=
github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16 h1:f+fTe7GGk0/qgdzyqB8kk8EcDf9d6MC22khBTQiDXsU=
Expand Down
31 changes: 0 additions & 31 deletions header/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,37 +64,6 @@ func UnmarshalExtendedHeader(data []byte) (*ExtendedHeader, error) {
return out, nil
}

func ExtendedHeaderToProto(eh *ExtendedHeader) (*header_pb.ExtendedHeader, error) {
pb := &header_pb.ExtendedHeader{
Header: eh.RawHeader.ToProto(),
Commit: eh.Commit.ToProto(),
}
valSet, err := eh.ValidatorSet.ToProto()
if err != nil {
return nil, err
}
pb.ValidatorSet = valSet
dah, err := eh.DAH.ToProto()
if err != nil {
return nil, err
}
pb.Dah = dah
return pb, nil
}

func ProtoToExtendedHeader(pb *header_pb.ExtendedHeader) (*ExtendedHeader, error) {
bin, err := pb.Marshal()
if err != nil {
return nil, err
}
header := new(ExtendedHeader)
err = header.UnmarshalBinary(bin)
if err != nil {
return nil, err
}
return header, nil
}

// msgID computes an id for a pubsub message
// TODO(@Wondertan): This cause additional allocations per each recvd message in the topic
//
Expand Down
45 changes: 45 additions & 0 deletions libs/utils/sessions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package utils

import (
"context"
"sync"
)

// Sessions manages concurrent sessions for the specified key.
// It ensures only one session can proceed for each key, avoiding duplicate efforts.
// If a session is already active for the given key, it waits until the session completes or
// context error occurs.
type Sessions struct {
active sync.Map
}

func NewSessions() *Sessions {
return &Sessions{}
}

// StartSession attempts to start a new session for the given key. It provides a release function
// to clean up the session lock for this key, once the session is complete.
func (s *Sessions) StartSession(
ctx context.Context,
key any,
) (endSession func(), err error) {
// Attempt to load or initialize a channel to track the sampling session for this height
lockChan, alreadyActive := s.active.LoadOrStore(key, make(chan struct{}))
if alreadyActive {
// If a session is already active, wait for it to complete
select {
case <-lockChan.(chan struct{}):
case <-ctx.Done():
return func() {}, ctx.Err()
}
// previous session has completed, try to obtain the lock for this session
return s.StartSession(ctx, key)
}

// Provide a function to release the lock once session is complete
releaseLock := func() {
close(lockChan.(chan struct{}))
s.active.Delete(key)
}
return releaseLock, nil
}
127 changes: 127 additions & 0 deletions libs/utils/sessions_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package utils

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/require"
)

// TestSessionsSerialExecution verifies that multiple sessions for the same key are executed
// sequentially.
func TestSessionsSerialExecution(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
t.Cleanup(cancel)

sessions := NewSessions()
key := "testKey"
activeCount := atomic.Int32{}
var wg sync.WaitGroup

numSessions := 20

for i := 0; i < numSessions; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()
endSession, err := sessions.StartSession(ctx, key)
require.NoError(t, err)
old := activeCount.Add(1)
require.Equal(t, int32(1), old)
// Simulate some work
time.Sleep(50 * time.Millisecond)
old = activeCount.Add(-1)
require.Equal(t, int32(0), old)
// Release the session
endSession()
}(i)
}

wg.Wait()
}

func TestSessionsContextCancellation(t *testing.T) {
sessions := NewSessions()
key := "testCancelKey"

// Start the first session which will hold the lock for a while
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

release, err := sessions.StartSession(ctx, key)
if err != nil {
t.Errorf("First session: failed to start: %v", err)
return
}

// Hold the session for 1 second
time.Sleep(1 * time.Second)
release()
}()

// Give the first goroutine a moment to acquire the session
time.Sleep(100 * time.Millisecond)

// Attempt to start a second session with a context that times out before the first session releases
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
t.Cleanup(cancel)

_, err := sessions.StartSession(ctx, key)
require.ErrorIs(t, err, context.DeadlineExceeded)

// Attempt to start a second session with a context that is canceled before the first session
// releases
ctx, cancel = context.WithCancel(context.Background())
cancel()

_, err = sessions.StartSession(ctx, key)
require.ErrorIs(t, err, context.Canceled)

wg.Wait()
}

// TestSessions_ConcurrentDifferentKeys ensures that sessions with different keys run concurrently.
func TestSessions_ConcurrentDifferentKeys(t *testing.T) {
sessions := NewSessions()
numKeys := 20
var wg sync.WaitGroup
startCh := make(chan struct{})
activeSessions := atomic.Int32{}
maxActive := int32(0)

for i := 0; i < numKeys; i++ {
wg.Add(1)
go func(key int) {
defer wg.Done()
ctx := context.Background()
endSession, err := sessions.StartSession(ctx, key)
require.NoError(t, err)

active := activeSessions.Add(1)
if active > maxActive {
maxActive = active
}

// Wait to simulate work
time.Sleep(100 * time.Millisecond)

activeSessions.Add(-1)
endSession()
}(i)
}

// Start all goroutines
close(startCh)
wg.Wait()

if maxActive > int32(numKeys) {
t.Errorf("Expected %d concurrent active sessions, but got %d", numKeys, maxActive)
}
}
Loading

0 comments on commit f5c4f4c

Please sign in to comment.