Skip to content

Commit

Permalink
Mostly: Gateway fixes and timeouts (#9)
Browse files Browse the repository at this point in the history
* add delay on processing.

* add sampling delay flag.

* fix reading binary data from gateways.

* make fields private for listener and sampler

* make fields private for pinner structs

* cancel if other calls if there is a success
  • Loading branch information
tarassh authored Aug 24, 2024
1 parent 5dedc65 commit 202fb79
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 112 deletions.
18 changes: 10 additions & 8 deletions cmd/light-client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@ import (
)

var (
loglevel string
rpcURL string
contract string
ipfsAddr string
gcpTopicId string
gcpCredsFile string
clientId string
loglevel string
rpcURL string
contract string
ipfsAddr string
gcpTopicId string
gcpCredsFile string
clientId string
samplingDelay uint
)

var greeting = `
Expand Down Expand Up @@ -75,6 +76,7 @@ func init() {
rootCmd.PersistentFlags().StringVar(&gcpTopicId, "topic-id", "", "Topic name of Pub Sub")
rootCmd.PersistentFlags().StringVar(&gcpCredsFile, "gcp-creds-file", "", "Path of GCP credential json file")
rootCmd.PersistentFlags().StringVar(&clientId, "client-id", "", "arbitrary client ID, used to identify the client")
rootCmd.PersistentFlags().UintVar(&samplingDelay, "sampling-delay", 120, "Delay between sampling process and the receiving of the event")

rootCmd.MarkPersistentFlagRequired("rpc-url")
rootCmd.MarkPersistentFlagRequired("contract")
Expand All @@ -98,7 +100,7 @@ func startClient() {
log.Fatalf("Failed to create publisher: %v", err)
}

sampler, err := sampler.NewSampler(ipfsAddr, pub)
sampler, err := sampler.NewSampler(ipfsAddr, samplingDelay, pub)
if err != nil {
log.Fatalf("Failed to initialize IPFS sampler: %v", err)
}
Expand Down
38 changes: 19 additions & 19 deletions internal/light-client/event-listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ var log = logging.Logger("light-client")

// EventListener listens for events emitted by the contract
type EventListener struct {
Client *ethclient.Client
ContractAddress common.Address
ContractInstance *contract.Contract
Logs chan types.Log
Subscription ethereum.Subscription
Sampler *sampler.Sampler
client *ethclient.Client
contractAddress common.Address
contractInstance *contract.Contract
logs chan types.Log
subscription ethereum.Subscription
sampler *sampler.Sampler
}

// NewEventListener creates a new EventListener instance
Expand All @@ -34,11 +34,11 @@ func NewEventListener(clientURL, contractAddressHex string, sampler *sampler.Sam
contractInstance := loadContract(client, contractAddress)

return &EventListener{
Client: client,
ContractAddress: contractAddress,
ContractInstance: contractInstance,
Logs: make(chan types.Log),
Sampler: sampler,
client: client,
contractAddress: contractAddress,
contractInstance: contractInstance,
logs: make(chan types.Log),
sampler: sampler,
}
}

Expand All @@ -63,31 +63,31 @@ func loadContract(client *ethclient.Client, address common.Address) *contract.Co
// SubscribeToLogs subscribes to logs emitted by the contract
func (el *EventListener) SubscribeToLogs(ctx context.Context) {
query := ethereum.FilterQuery{
Addresses: []common.Address{el.ContractAddress},
Addresses: []common.Address{el.contractAddress},
}

sub, err := el.Client.SubscribeFilterLogs(ctx, query, el.Logs)
sub, err := el.client.SubscribeFilterLogs(ctx, query, el.logs)
if err != nil {
log.Fatalf("Failed to subscribe to logs: %v", err)
}

el.Subscription = sub
el.subscription = sub

go func() {
for err := range sub.Err() {
log.Fatalf("Subscription error: %v", err)
}
}()

log.Infof("Subscribed to logs for contract: %v", el.ContractAddress.Hex())
log.Infof("Subscribed to logs for contract: %v", el.contractAddress.Hex())
}

// ProcessLogs processes the logs emitted by the contract
func (el *EventListener) ProcessLogs() {
for vLog := range el.Logs {
log.Debugf("Log: %v", vLog)
for vLog := range el.logs {
log.Debugf("Log Event: %v", vLog.Topics)

event, err := el.ContractInstance.ParseBlockSpecimenProductionProofSubmitted(vLog)
event, err := el.contractInstance.ParseBlockSpecimenProductionProofSubmitted(vLog)
if err != nil {
if err.Error() == "event signature mismatch" {
log.Debug("Event signature mismatch")
Expand All @@ -107,6 +107,6 @@ func (el *EventListener) ProcessLogs() {
continue
}

el.Sampler.ProcessEvent(parsedURL.Host)
el.sampler.ProcessEvent(parsedURL.Host)
}
}
94 changes: 68 additions & 26 deletions internal/light-client/sampler/sampler.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sampler

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
Expand All @@ -15,9 +16,11 @@ import (

verifier "github.com/covalenthq/das-ipfs-pinner/internal/light-client/c-kzg-verifier"
publisher "github.com/covalenthq/das-ipfs-pinner/internal/light-client/publisher"
ipldencoder "github.com/covalenthq/das-ipfs-pinner/internal/pinner/ipld-encoder"
"github.com/ipfs/go-cid"
ipfs "github.com/ipfs/go-ipfs-api"
logging "github.com/ipfs/go-log/v2"
"github.com/ipld/go-ipld-prime/codec/dagjson"
)

var log = logging.Logger("light-client")
Expand All @@ -30,9 +33,10 @@ var DefaultGateways = []string{

// Sampler is a struct that samples data from IPFS and verifies it.
type Sampler struct {
IPFSShell *ipfs.Shell
Gateways []string
pub *publisher.Publisher
ipfsShell *ipfs.Shell
gateways []string
pub *publisher.Publisher
samplingDelay uint
}

// Link represents a link to another CID in IPFS.
Expand Down Expand Up @@ -65,6 +69,11 @@ type DataMap struct {
Proof InnerMap `json:"proof"`
}

type errorContext struct {
Err error
Context string
}

// UnmarshalJSON handles base64 decoding directly into the Bytes field.
func (n *NestedBytes) UnmarshalJSON(data []byte) error {
var aux struct {
Expand All @@ -87,29 +96,39 @@ func (n *NestedBytes) UnmarshalJSON(data []byte) error {
}

// NewSampler creates a new Sampler instance and checks the connection to the IPFS daemon.
func NewSampler(ipfsAddr string, pub *publisher.Publisher) (*Sampler, error) {
func NewSampler(ipfsAddr string, samplingDelay uint, pub *publisher.Publisher) (*Sampler, error) {
shell := ipfs.NewShell(ipfsAddr)

if _, _, err := shell.Version(); err != nil {
return nil, fmt.Errorf("failed to connect to IPFS daemon: %w", err)
}

return &Sampler{
IPFSShell: shell,
Gateways: DefaultGateways,
pub: pub,
ipfsShell: shell,
gateways: DefaultGateways,
pub: pub,
samplingDelay: samplingDelay,
}, nil
}

// ProcessEvent handles events asynchronously by processing the provided CID.
func (s *Sampler) ProcessEvent(cidStr string) {
go func(cidStr string) {
_, err := cid.Decode(cidStr)
rawCid, err := cid.Decode(cidStr)
if err != nil {
log.Errorf("Invalid CID: %v", err)
return
}

if rawCid.Prefix().Codec != cid.DagCBOR {
log.Debugf("Unsupported CID codec: %v. Skipping", rawCid.Prefix().Codec)
return
}

log.Debugf("Processing event for CID [%s] is defered for %d min", cidStr, s.samplingDelay/60)
time.Sleep(time.Duration(s.samplingDelay) * time.Second)
log.Debugf("Processing event for CID [%s] ...", cidStr)

var rootNode RootNode
if err := s.GetData(cidStr, &rootNode); err != nil {
log.Errorf("Failed to fetch root DAG data: %v", err)
Expand Down Expand Up @@ -155,16 +174,17 @@ func (s *Sampler) GetData(cidStr string, data interface{}) error {
}

resultChan := make(chan interface{})
errorChan := make(chan error)
errorChan := make(chan errorContext)

// Define a context with timeout to prevent hanging
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

// Start a goroutine to get data from the IPFS node
go func() {
if err := s.IPFSShell.DagGet(cid.String(), &data); err != nil {
errorChan <- err
log.Debugf("Getting data from IPFS node: %s", cid.String())
if err := s.ipfsShell.DagGet(cid.String(), &data); err != nil {
errorChan <- errorContext{Err: err, Context: "IPFS node"}
return
}
select {
Expand All @@ -174,42 +194,57 @@ func (s *Sampler) GetData(cidStr string, data interface{}) error {
}()

// Start goroutines to get data from each public gateway
for _, gateway := range s.Gateways {
for _, gateway := range s.gateways {
go func(gateway string) {
gatewayData, err := s.getDataFromGateway(gateway, cid.String())
gatewayData, err := s.getDataFromGateway(ctx, gateway, cid.String())
if err != nil {
errorChan <- errorContext{Err: err, Context: gateway}
return
}

// Decode the data into an IPLD node from CBOR
node, err := ipldencoder.DecodeNode(gatewayData)
if err != nil {
errorChan <- err
errorChan <- errorContext{Err: err, Context: gateway}
return
}

// Encode the IPLD node into JSON
var jsonData bytes.Buffer
if err := dagjson.Encode(node, &jsonData); err != nil {
errorChan <- errorContext{Err: err, Context: gateway}
return
}

// Unmarshal JSON data into the provided data interface
if err := json.Unmarshal(gatewayData, &data); err != nil {
errorChan <- err
if err := json.Unmarshal(jsonData.Bytes(), data); err != nil {
errorChan <- errorContext{Err: err, Context: gateway}
return
}

select {
case resultChan <- data:
cancel()
case <-ctx.Done():
}

// populate ipfs node with data
// TODO: deduce the correct format from the CID
storedCid, err := s.IPFSShell.DagPut(data, "dag-cbor", "dag-cbor")
storedCid, err := s.ipfsShell.DagPut(data, "dag-cbor", "dag-cbor")
if err != nil {
errorChan <- err
errorChan <- errorContext{Err: err, Context: "IPFS node"}
}

if storedCid != cid.String() {
errorChan <- fmt.Errorf("IPFS node returned different CID: %s", storedCid)
errorChan <- errorContext{Err: fmt.Errorf("IPFS node returned different CID: %s", storedCid), Context: "Result CID"}
}
}(gateway)
}

// Wait for the first successful response or all errors
var finalError error
successCount := 0
totalCount := len(s.Gateways) + 1 // +1 for the IPFS node
totalCount := len(s.gateways) + 1 // +1 for the IPFS node

for i := 0; i < totalCount; i++ {
select {
Expand All @@ -227,8 +262,9 @@ func (s *Sampler) GetData(cidStr string, data interface{}) error {
return nil
}

case err := <-errorChan:
finalError = err
case errCxt := <-errorChan:
log.Debugf("Error getting data from %s: %v", errCxt.Context, errCxt.Err)
finalError = errCxt.Err
}
}

Expand All @@ -240,11 +276,11 @@ func (s *Sampler) GetData(cidStr string, data interface{}) error {
return nil
}

func (s *Sampler) getDataFromGateway(gateway, cid string) ([]byte, error) {
func (s *Sampler) getDataFromGateway(ctx context.Context, gateway, cid string) ([]byte, error) {
// Parse the base gateway URL
baseURL, err := url.Parse(gateway)
if err != nil {
return nil, fmt.Errorf("invalid gateway URL: %v", err)
return nil, err
}

// Append the IPFS path and CID
Expand All @@ -255,8 +291,14 @@ func (s *Sampler) getDataFromGateway(gateway, cid string) ([]byte, error) {
query.Set("format", "raw")
baseURL.RawQuery = query.Encode()

req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL.String(), nil)
if err != nil {
return nil, err
}

// Perform the HTTP GET request
resp, err := http.Get(baseURL.String())
log.Debugf("Getting data from gateway: %s", baseURL.String())
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 202fb79

Please sign in to comment.