Skip to content

Commit

Permalink
[Server] Round repo light APIs (#399)
Browse files Browse the repository at this point in the history
  • Loading branch information
altafan authored Dec 10, 2024
1 parent f1272b5 commit b1906e1
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 307 deletions.
9 changes: 7 additions & 2 deletions server/internal/core/application/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,19 @@ func (a *adminService) GetRounds(ctx context.Context, after int64, before int64)
}

func (a *adminService) GetScheduledSweeps(ctx context.Context) ([]ScheduledSweep, error) {
sweepableRounds, err := a.repoManager.Rounds().GetSweepableRounds(ctx)
sweepableRounds, err := a.repoManager.Rounds().GetExpiredRoundsTxid(ctx)
if err != nil {
return nil, err
}

scheduledSweeps := make([]ScheduledSweep, 0, len(sweepableRounds))

for _, round := range sweepableRounds {
for _, txid := range sweepableRounds {
round, err := a.repoManager.Rounds().GetRoundWithTxid(ctx, txid)
if err != nil {
return nil, err
}

sweepable, err := findSweepableOutputs(
ctx, a.walletSvc, a.txBuilder, a.sweeperTimeUnit, round.VtxoTree,
)
Expand Down
15 changes: 7 additions & 8 deletions server/internal/core/application/covenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (s *covenantService) startFinalization() {
return
}

sweptRounds, err := s.repoManager.Rounds().GetSweptRounds(ctx)
sweptRounds, err := s.repoManager.Rounds().GetSweptRoundsConnectorAddress(ctx)
if err != nil {
round.Fail(fmt.Errorf("failed to retrieve swept rounds: %s", err))
log.WithError(err).Warn("failed to retrieve swept rounds")
Expand Down Expand Up @@ -1077,19 +1077,18 @@ func (s *covenantService) stopWatchingVtxos(vtxos []domain.Vtxo) {
}

func (s *covenantService) restoreWatchingVtxos() error {
sweepableRounds, err := s.repoManager.Rounds().GetSweepableRounds(context.Background())
ctx := context.Background()

expiredRounds, err := s.repoManager.Rounds().GetExpiredRoundsTxid(ctx)
if err != nil {
return err
}

vtxos := make([]domain.Vtxo, 0)

for _, round := range sweepableRounds {
fromRound, err := s.repoManager.Vtxos().GetVtxosForRound(
context.Background(), round.Txid,
)
for _, txid := range expiredRounds {
fromRound, err := s.repoManager.Vtxos().GetVtxosForRound(ctx, txid)
if err != nil {
log.WithError(err).Warnf("failed to retrieve vtxos for round %s", round.Txid)
log.WithError(err).Warnf("failed to retrieve vtxos for round %s", txid)
continue
}
for _, v := range fromRound {
Expand Down
21 changes: 8 additions & 13 deletions server/internal/core/application/covenantless.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,7 +1052,7 @@ func (s *covenantlessService) startFinalization() {
return
}

sweptRounds, err := s.repoManager.Rounds().GetSweptRounds(ctx)
connectorAddresses, err := s.repoManager.Rounds().GetSweptRoundsConnectorAddress(ctx)
if err != nil {
round.Fail(fmt.Errorf("failed to retrieve swept rounds: %s", err))
log.WithError(err).Warn("failed to retrieve swept rounds")
Expand All @@ -1069,11 +1069,7 @@ func (s *covenantlessService) startFinalization() {
cosigners = append(cosigners, ephemeralKey.PubKey())

unsignedRoundTx, vtxoTree, connectorAddress, connectors, err := s.builder.BuildRoundTx(
s.pubkey,
requests,
boardingInputs,
sweptRounds,
cosigners...,
s.pubkey, requests, boardingInputs, connectorAddresses, cosigners...,
)
if err != nil {
round.Fail(fmt.Errorf("failed to create round tx: %s", err))
Expand Down Expand Up @@ -1665,19 +1661,18 @@ func (s *covenantlessService) stopWatchingVtxos(vtxos []domain.Vtxo) {
}

func (s *covenantlessService) restoreWatchingVtxos() error {
sweepableRounds, err := s.repoManager.Rounds().GetSweepableRounds(context.Background())
ctx := context.Background()

expiredRounds, err := s.repoManager.Rounds().GetExpiredRoundsTxid(ctx)
if err != nil {
return err
}

vtxos := make([]domain.Vtxo, 0)

for _, round := range sweepableRounds {
fromRound, err := s.repoManager.Vtxos().GetVtxosForRound(
context.Background(), round.Txid,
)
for _, txid := range expiredRounds {
fromRound, err := s.repoManager.Vtxos().GetVtxosForRound(ctx, txid)
if err != nil {
log.WithError(err).Warnf("failed to retrieve vtxos for round %s", round.Txid)
log.WithError(err).Warnf("failed to retrieve vtxos for round %s", txid)
continue
}
for _, v := range fromRound {
Expand Down
11 changes: 9 additions & 2 deletions server/internal/core/application/sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,19 @@ func newSweeper(
func (s *sweeper) start() error {
s.scheduler.Start()

allRounds, err := s.repoManager.Rounds().GetSweepableRounds(context.Background())
ctx := context.Background()

expiredRounds, err := s.repoManager.Rounds().GetExpiredRoundsTxid(ctx)
if err != nil {
return err
}

for _, round := range allRounds {
for _, txid := range expiredRounds {
round, err := s.repoManager.Rounds().GetRoundWithTxid(ctx, txid)
if err != nil {
return err
}

task := s.createTask(round.Txid, round.VtxoTree)
task()
}
Expand Down
4 changes: 2 additions & 2 deletions server/internal/core/domain/round_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ type RoundRepository interface {
AddOrUpdateRound(ctx context.Context, round Round) error
GetRoundWithId(ctx context.Context, id string) (*Round, error)
GetRoundWithTxid(ctx context.Context, txid string) (*Round, error)
GetSweepableRounds(ctx context.Context) ([]Round, error)
GetExpiredRoundsTxid(ctx context.Context) ([]string, error)
GetRoundsIds(ctx context.Context, startedAfter int64, startedBefore int64) ([]string, error)
GetSweptRounds(ctx context.Context) ([]Round, error)
GetSweptRoundsConnectorAddress(ctx context.Context) ([]string, error)
Close()
}

Expand Down
18 changes: 10 additions & 8 deletions server/internal/core/ports/tx_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ type BoardingInput struct {
}

type TxBuilder interface {
// BuildRoundTx builds a round tx for the given tx requests, boarding inputs
// it selects coin from swept rounds and server wallet
// returns the round partial tx, the vtxo tree and the set of connectors
// BuildRoundTx builds a round tx for the given offchain and boarding tx
// requests. It expects an optional list of connector addresses of expired
// rounds from which selecting UTXOs as inputs of the transaction.
// Returns the round tx, the VTXO tree, the connector chain and its root
// address.
BuildRoundTx(
serverPubkey *secp256k1.PublicKey, txRequests []domain.TxRequest, boardingInputs []BoardingInput, sweptRounds []domain.Round,
serverPubkey *secp256k1.PublicKey, txRequests []domain.TxRequest,
boardingInputs []BoardingInput, connectorAddresses []string,
cosigners ...*secp256k1.PublicKey,
) (
roundTx string,
Expand All @@ -41,11 +44,10 @@ type TxBuilder interface {
connectors []string,
err error,
)
// VerifyForfeitTxs verifies the given forfeit txs for the given vtxos and connectors
// VerifyForfeitTxs verifies a list of forfeit txs against a set of VTXOs and
// connectors.
VerifyForfeitTxs(
vtxos []domain.Vtxo,
connectors []string,
txs []string,
vtxos []domain.Vtxo, connectors []string, txs []string,
) (valid map[domain.VtxoKey][]string, err error)
BuildSweepTx(inputs []SweepInput) (signedSweepTx string, err error)
GetSweepInput(node tree.Node) (lifetime *common.Locktime, sweepInput SweepInput, err error)
Expand Down
30 changes: 25 additions & 5 deletions server/internal/infrastructure/db/badger/round_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,38 @@ func (r *roundRepository) GetRoundWithTxid(
return round, nil
}

func (r *roundRepository) GetSweepableRounds(
func (r *roundRepository) GetExpiredRoundsTxid(
ctx context.Context,
) ([]domain.Round, error) {
) ([]string, error) {
query := badgerhold.Where("Stage.Code").Eq(domain.FinalizationStage).
And("Stage.Ended").Eq(true).And("Swept").Eq(false)
return r.findRound(ctx, query)
rounds, err := r.findRound(ctx, query)
if err != nil {
return nil, err
}

txids := make([]string, 0, len(rounds))
for _, r := range rounds {
txids = append(txids, r.Txid)
}
return txids, nil
}

func (r *roundRepository) GetSweptRounds(ctx context.Context) ([]domain.Round, error) {
func (r *roundRepository) GetSweptRoundsConnectorAddress(
ctx context.Context,
) ([]string, error) {
query := badgerhold.Where("Stage.Code").Eq(domain.FinalizationStage).
And("Stage.Ended").Eq(true).And("Swept").Eq(true).And("ConnectorAddress").Ne("")
return r.findRound(ctx, query)
rounds, err := r.findRound(ctx, query)
if err != nil {
return nil, err
}

txids := make([]string, 0, len(rounds))
for _, r := range rounds {
txids = append(txids, r.Txid)
}
return txids, nil
}

func (r *roundRepository) GetRoundsIds(ctx context.Context, startedAfter int64, startedBefore int64) ([]string, error) {
Expand Down
62 changes: 4 additions & 58 deletions server/internal/infrastructure/db/sqlite/round_repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,66 +262,12 @@ func (r *roundRepository) GetRoundWithTxid(ctx context.Context, txid string) (*d
return nil, errors.New("round not found")
}

func (r *roundRepository) GetSweepableRounds(ctx context.Context) ([]domain.Round, error) {
rows, err := r.querier.SelectSweepableRounds(ctx)
if err != nil {
return nil, err
}

rvs := make([]combinedRow, 0, len(rows))
for _, row := range rows {
rvs = append(rvs, combinedRow{
round: row.Round,
request: row.RoundRequestVw,
tx: row.RoundTxVw,
receiver: row.RequestReceiverVw,
vtxo: row.RequestVtxoVw,
})
}

rounds, err := rowsToRounds(rvs)
if err != nil {
return nil, err
}

res := make([]domain.Round, 0)

for _, round := range rounds {
res = append(res, *round)
}

return res, nil
func (r *roundRepository) GetExpiredRoundsTxid(ctx context.Context) ([]string, error) {
return r.querier.SelectExpiredRoundsTxid(ctx)
}

func (r *roundRepository) GetSweptRounds(ctx context.Context) ([]domain.Round, error) {
rows, err := r.querier.SelectSweptRounds(ctx)
if err != nil {
return nil, err
}

rvs := make([]combinedRow, 0, len(rows))
for _, row := range rows {
rvs = append(rvs, combinedRow{
round: row.Round,
request: row.RoundRequestVw,
tx: row.RoundTxVw,
receiver: row.RequestReceiverVw,
vtxo: row.RequestVtxoVw,
})
}

rounds, err := rowsToRounds(rvs)
if err != nil {
return nil, err
}

res := make([]domain.Round, 0)

for _, round := range rounds {
res = append(res, *round)
}

return res, nil
func (r *roundRepository) GetSweptRoundsConnectorAddress(ctx context.Context) ([]string, error) {
return r.querier.SelectSweptRoundsConnectorAddress(ctx)
}

func rowToReceiver(row queries.RequestReceiverVw) domain.Receiver {
Expand Down
Loading

0 comments on commit b1906e1

Please sign in to comment.