Skip to content

Commit

Permalink
Implement listen stream in wasm
Browse files Browse the repository at this point in the history
Co-authored-by: Pietralberto Mazza <[email protected]>
  • Loading branch information
bordalix and altafan committed Oct 29, 2024
1 parent 6e0ac4a commit c404b7d
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 44 deletions.
123 changes: 93 additions & 30 deletions pkg/client-sdk/client/rest/client.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package restclient

import (
"bufio"
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
Expand Down Expand Up @@ -620,48 +624,107 @@ func (t treeFromProto) parse() tree.CongestionTree {
}

func (c *restClient) GetTransactionsStream(ctx context.Context) (<-chan client.TransactionEvent, func(), error) {
ctx, cancel := context.WithTimeout(ctx, c.requestTimeout)
eventsCh := make(chan client.TransactionEvent)

cl := &http.Client{
Timeout: time.Second * 0, // No timeout for streaming requests
}

resp, err := cl.Get("http://localhost:7070/v1/transactions")
if err != nil {
return nil, nil, err
}

// Check if the request was successful
if resp.StatusCode != http.StatusOK {
return nil, nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
}

go func() {
ticker := time.NewTicker(1 * time.Second)
defer close(eventsCh)
defer ticker.Stop()
defer resp.Body.Close()
reader := bufio.NewReader(resp.Body)

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
resp, err := c.svc.ArkServiceGetTransactionsStream(ark_service.NewArkServiceGetTransactionsStreamParams())
if err != nil {
eventsCh <- client.TransactionEvent{Err: err}
return
chunk, err := reader.ReadBytes('\n')
if err != nil {
if err == io.EOF {
fmt.Println("Stream ended")
break
}
fmt.Println("Error reading stream:", err)
break
}

if resp.Payload.Result.Round != nil {
eventsCh <- client.TransactionEvent{
Round: &client.RoundTransaction{
Txid: resp.Payload.Result.Round.Txid,
SpentVtxos: outpointsFromRest(resp.Payload.Result.Round.SpentVtxos),
SpendableVtxos: vtxosFromRest(resp.Payload.Result.Round.SpendableVtxos),
ClaimedBoardingUtxos: outpointsFromRest(resp.Payload.Result.Round.ClaimedBoardingUtxos),
},
}
} else if resp.Payload.Result.Redeem != nil {
eventsCh <- client.TransactionEvent{
Redeem: &client.RedeemTransaction{
Txid: resp.Payload.Result.Redeem.Txid,
SpentVtxos: outpointsFromRest(resp.Payload.Result.Redeem.SpentVtxos),
SpendableVtxos: vtxosFromRest(resp.Payload.Result.Redeem.SpendableVtxos),
},
}
resp := ark_service.ArkServiceGetTransactionsStreamOK{}
if err := json.Unmarshal(chunk, &resp); err != nil {
break
}
fmt.Printf("AAAAAAA %s\nBBBBB %+v\n", string(chunk), resp)

if resp.Payload == nil {
fmt.Println("SKIP!!!")
continue
}
if resp.Payload.Result.Round != nil {
eventsCh <- client.TransactionEvent{
Round: &client.RoundTransaction{
Txid: resp.Payload.Result.Round.Txid,
SpentVtxos: outpointsFromRest(resp.Payload.Result.Round.SpentVtxos),
SpendableVtxos: vtxosFromRest(resp.Payload.Result.Round.SpendableVtxos),
ClaimedBoardingUtxos: outpointsFromRest(resp.Payload.Result.Round.ClaimedBoardingUtxos),
},
}
} else if resp.Payload.Result.Redeem != nil {
eventsCh <- client.TransactionEvent{
Redeem: &client.RedeemTransaction{
Txid: resp.Payload.Result.Redeem.Txid,
SpentVtxos: outpointsFromRest(resp.Payload.Result.Redeem.SpentVtxos),
SpendableVtxos: vtxosFromRest(resp.Payload.Result.Redeem.SpendableVtxos),
},
}
}
}
}()

return eventsCh, cancel, nil
// go func() {
// ticker := time.NewTicker(1 * time.Second)
// defer close(eventsCh)
// defer ticker.Stop()

// for {
// select {
// case <-ctx.Done():
// return
// case <-ticker.C:
// resp, err := c.svc.ArkServiceGetTransactionsStream(ark_service.NewArkServiceGetTransactionsStreamParams())
// if err != nil {
// eventsCh <- client.TransactionEvent{Err: err}
// return
// }

// if resp.Payload.Result.Round != nil {
// eventsCh <- client.TransactionEvent{
// Round: &client.RoundTransaction{
// Txid: resp.Payload.Result.Round.Txid,
// SpentVtxos: outpointsFromRest(resp.Payload.Result.Round.SpentVtxos),
// SpendableVtxos: vtxosFromRest(resp.Payload.Result.Round.SpendableVtxos),
// ClaimedBoardingUtxos: outpointsFromRest(resp.Payload.Result.Round.ClaimedBoardingUtxos),
// },
// }
// } else if resp.Payload.Result.Redeem != nil {
// eventsCh <- client.TransactionEvent{
// Redeem: &client.RedeemTransaction{
// Txid: resp.Payload.Result.Redeem.Txid,
// SpentVtxos: outpointsFromRest(resp.Payload.Result.Redeem.SpentVtxos),
// SpendableVtxos: vtxosFromRest(resp.Payload.Result.Redeem.SpendableVtxos),
// },
// }
// }
// }
// }
// }()

return eventsCh, nil, nil
}

func outpointsFromRest(restOutpoints []*models.V1Outpoint) []client.Outpoint {
Expand Down
23 changes: 18 additions & 5 deletions pkg/client-sdk/covenantless_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (a *covenantlessArkClient) listenForTransactions(ctx context.Context) {
if !ok {
continue
}
fmt.Printf("RECEIVED EVENT %+v\n", event)

if event.Err != nil {
log.WithError(event.Err).Error("Error in transaction stream")
Expand All @@ -248,6 +249,7 @@ func (a *covenantlessArkClient) listenForTransactions(ctx context.Context) {
continue
}

fmt.Println("listenForTransactions")
if err := a.store.TransactionStore().
AddTransactions(ctx, newPendingBoardingTxs); err != nil {
log.WithError(err).Error("Failed to insert new boarding transactions")
Expand All @@ -262,7 +264,7 @@ func (a *covenantlessArkClient) listenForTransactions(ctx context.Context) {
}

func (a *covenantlessArkClient) listenForBoardingUtxos(ctx context.Context) {
ticker := time.NewTicker(2 * time.Second)
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()

for {
Expand All @@ -274,10 +276,12 @@ func (a *covenantlessArkClient) listenForBoardingUtxos(ctx context.Context) {
continue
}

if err := a.store.TransactionStore().
AddTransactions(ctx, newPendingBoardingTxs); err != nil {
log.WithError(err).Error("Failed to insert new boarding transactions")
continue
if len(newPendingBoardingTxs) > 0 {
if err := a.store.TransactionStore().
AddTransactions(ctx, newPendingBoardingTxs); err != nil {
log.WithError(err).Error("Failed to insert new boarding transactions")
continue
}
}
case <-ctx.Done():
return
Expand All @@ -292,6 +296,15 @@ func (a *covenantlessArkClient) getBoardingPendingTransactions(
if err != nil {
return nil, err
}
fmt.Println("OLD TXS", len(oldTxs))
for _, tx := range oldTxs {
fmt.Println("boarding txid", tx.BoardingTxid)
fmt.Println("amount", tx.Amount)
fmt.Println("type", tx.Type)
fmt.Println("settled", tx.Settled)
fmt.Println("createdAt", tx.CreatedAt)
fmt.Println("----------------")
}

boardingUtxos, err := a.getClaimableBoardingUtxos(ctx, nil)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/client-sdk/wasm/browser/config_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (s *configStore) AddData(ctx context.Context, data types.Config) error {
ExplorerURL: data.ExplorerURL,
ForfeitAddress: data.ForfeitAddress,
BoardingDescriptorTemplate: data.BoardingDescriptorTemplate,
WithTransactionFeed: strconv.FormatBool(data.WithTransactionFeed),
}
return s.writeData(sd)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/client-sdk/wasm/browser/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func init() {
js.Global().Set("getRoundLifetime", GetRoundLifetimeWrapper())
js.Global().Set("getUnilateralExitDelay", GetUnilateralExitDelayWrapper())
js.Global().Set("getDust", GetDustWrapper())
js.Global().Set("getTransactionStream", GetTransactionEventChannelWrapper())
}

func NewCovenantClient(
Expand Down
9 changes: 6 additions & 3 deletions pkg/client-sdk/wasm/browser/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@ import (
// TODO: support vtxo and transaction stores localstorage impls.
type localStorageStore struct {
configStore types.ConfigStore
txStore types.TransactionStore
}

func NewLocalStorageStore() types.Store {
configStore := NewConfigStore(js.Global().Get("localStorage"))
return &localStorageStore{configStore}
store := js.Global().Get("localStorage")
configStore := NewConfigStore(store)
txStore := NewTxStore(store)
return &localStorageStore{configStore, txStore}
}

func (s *localStorageStore) ConfigStore() types.ConfigStore {
Expand All @@ -28,7 +31,7 @@ func (s *localStorageStore) VtxoStore() types.VtxoStore {
}

func (s *localStorageStore) TransactionStore() types.TransactionStore {
return nil
return s.txStore
}

func (s *localStorageStore) Close() {}
Loading

0 comments on commit c404b7d

Please sign in to comment.