Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement consenus test go #69

Merged
merged 12 commits into from
Oct 15, 2024
3 changes: 1 addition & 2 deletions common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ func (e BlockNotFoundError) Error() string {
return fmt.Sprintf("block not available: %s", e.Block)
}


// need to confirm how such primitive types will be imported
type hash [32]byte;
type hash [32]byte

type SlotNotFoundError struct {
slot hash
Expand Down
41 changes: 21 additions & 20 deletions config/base_test.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package config

import (
"testing"
)
func TestCorrectDefaultBaseConfig(t *testing.T) {
baseConfig := BaseConfig{}

baseConfig = baseConfig.Default()

if baseConfig.RpcBindIp != "127.0.0.1" {
t.Errorf("Expected RpcBindIP to be %s, but got %s", "127.0.0.1", baseConfig.RpcBindIp)
}
if baseConfig.RpcPort != 0 {
t.Errorf("Expected RpcPort to be %v, but got %v", 0, baseConfig.RpcPort)
}
if baseConfig.ConsensusRpc != nil {
t.Errorf("Expected ConsensusRpc to be %v, but got %v", nil, baseConfig.ConsensusRpc)
}
}
package config

import (
"testing"
)

func TestCorrectDefaultBaseConfig(t *testing.T) {
baseConfig := BaseConfig{}

baseConfig = baseConfig.Default()

if baseConfig.RpcBindIp != "127.0.0.1" {
t.Errorf("Expected RpcBindIP to be %s, but got %s", "127.0.0.1", baseConfig.RpcBindIp)
}
if baseConfig.RpcPort != 0 {
t.Errorf("Expected RpcPort to be %v, but got %v", 0, baseConfig.RpcPort)
}
if baseConfig.ConsensusRpc != nil {
t.Errorf("Expected ConsensusRpc to be %v, but got %v", nil, baseConfig.ConsensusRpc)
}
}
165 changes: 82 additions & 83 deletions config/checkpoints/checkpoints.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package checkpoints

import (
"context"
"encoding/json"
"fmt"
"github.com/BlocSoc-iitr/selene/config"
"github.com/avast/retry-go"
"gopkg.in/yaml.v2"
"io"
"log"
"net/http"
"strconv"
"strings"
"sync"
"context"
"time"
"log"
"github.com/BlocSoc-iitr/selene/config"
"github.com/avast/retry-go"
"gopkg.in/yaml.v2"
)

// / The location where the list of checkpoint services are stored.
Expand Down Expand Up @@ -186,10 +186,10 @@ func (ch CheckpointFallback) Build() (CheckpointFallback, error) {
return ch, fmt.Errorf("expected a map for service in network %s", network)
}

endpoint, _ := serviceMap["endpoint"].(string) // Handle potential nil
name, _ := serviceMap["name"].(string) // Handle potential nil
state, _ := serviceMap["state"].(bool) // Handle potential nil
verification, _ := serviceMap["verification"].(bool) // Handle potential nil
endpoint, _ := serviceMap["endpoint"].(string) // Handle potential nil
name, _ := serviceMap["name"].(string) // Handle potential nil
state, _ := serviceMap["state"].(bool) // Handle potential nil
verification, _ := serviceMap["verification"].(bool) // Handle potential nil

// Check contacts and notes
var contacts *yaml.MapSlice
Expand All @@ -206,17 +206,17 @@ func (ch CheckpointFallback) Build() (CheckpointFallback, error) {
if !ok {
return ch, fmt.Errorf("expected a map for health in service %s", name)
}
healthResult, _ := healthRaw["result"].(bool) // Handle potential nil
healthDate, _ := healthRaw["date"].(string) // Handle potential nil
healthResult, _ := healthRaw["result"].(bool) // Handle potential nil
healthDate, _ := healthRaw["date"].(string) // Handle potential nil

ch.Services[network] = append(ch.Services[network], CheckpointFallbackService{
Endpoint: endpoint,
Name: name,
State: state,
Verification: verification,
Contacts: contacts,
Notes: notes,
Health_from_fallback: &Health{
Endpoint: endpoint,
Name: name,
State: state,
Verification: verification,
Contacts: contacts,
Notes: notes,
Health_from_fallback: &Health{
Result: healthResult,
Date: healthDate,
},
Expand All @@ -227,7 +227,6 @@ func (ch CheckpointFallback) Build() (CheckpointFallback, error) {
return ch, nil
}


// fetch the latest checkpoint from the given network
func (ch CheckpointFallback) FetchLatestCheckpoint(network config.Network) byte256 {
services := ch.GetHealthyFallbackServices(network)
Expand Down Expand Up @@ -258,73 +257,73 @@ func (ch CheckpointFallback) QueryService(endpoint string) (*RawSlotResponse, er

// fetch the latest checkpoint from the given services
func (ch CheckpointFallback) FetchLatestCheckpointFromServices(services []CheckpointFallbackService) (byte256, error) {
var (
slots []Slot
wg sync.WaitGroup
slotChan = make(chan Slot, len(services)) // Buffered channel
errorsChan = make(chan error, len(services))
)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

for _, service := range services {
wg.Add(1)
go func(service CheckpointFallbackService) {
defer wg.Done()
raw, err := ch.QueryService(service.Endpoint)
if err != nil {
errorsChan <- fmt.Errorf("failed to fetch checkpoint from service %s: %w", service.Endpoint, err)
return
}
if len(raw.Data.Slots) > 0 {
slotChan <- raw.Data.Slots[0] // Send the first valid slot
}
}(service)
}

go func() {
wg.Wait()
close(slotChan)
close(errorsChan)
}()

for {
select {
case slot, ok := <-slotChan:
if !ok {
// Channel closed, all slots processed
if len(slots) == 0 {
return byte256{}, fmt.Errorf("failed to find max epoch from checkpoint slots")
}
return processSlots(slots)
}
slots = append(slots, slot)
case err := <-errorsChan:
if err != nil {
log.Printf("Error fetching checkpoint: %v", err) // Log only if the error is not nil.
}
case <-ctx.Done():
if len(slots) == 0 {
return byte256{}, ctx.Err()
}
return processSlots(slots)
}
}
var (
slots []Slot
wg sync.WaitGroup
slotChan = make(chan Slot, len(services)) // Buffered channel
errorsChan = make(chan error, len(services))
)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

for _, service := range services {
wg.Add(1)
go func(service CheckpointFallbackService) {
defer wg.Done()
raw, err := ch.QueryService(service.Endpoint)
if err != nil {
errorsChan <- fmt.Errorf("failed to fetch checkpoint from service %s: %w", service.Endpoint, err)
return
}
if len(raw.Data.Slots) > 0 {
slotChan <- raw.Data.Slots[0] // Send the first valid slot
}
}(service)
}

go func() {
wg.Wait()
close(slotChan)
close(errorsChan)
}()

for {
select {
case slot, ok := <-slotChan:
if !ok {
// Channel closed, all slots processed
if len(slots) == 0 {
return byte256{}, fmt.Errorf("failed to find max epoch from checkpoint slots")
}
return processSlots(slots)
}
slots = append(slots, slot)
case err := <-errorsChan:
if err != nil {
log.Printf("Error fetching checkpoint: %v", err) // Log only if the error is not nil.
}
case <-ctx.Done():
if len(slots) == 0 {
return byte256{}, ctx.Err()
}
return processSlots(slots)
}
}
}

func processSlots(slots []Slot) (byte256, error) {
maxEpochSlot := slots[0]
for _, slot := range slots {
if slot.Epoch > maxEpochSlot.Epoch {
maxEpochSlot = slot
}
}

if maxEpochSlot.Block_root == nil {
return byte256{}, fmt.Errorf("no valid block root found")
}

return *maxEpochSlot.Block_root, nil
maxEpochSlot := slots[0]
for _, slot := range slots {
if slot.Epoch > maxEpochSlot.Epoch {
maxEpochSlot = slot
}
}

if maxEpochSlot.Block_root == nil {
return byte256{}, fmt.Errorf("no valid block root found")
}

return *maxEpochSlot.Block_root, nil
}

func (ch CheckpointFallback) FetchLatestCheckpointFromApi(url string) (byte256, error) {
Expand Down
6 changes: 2 additions & 4 deletions config/checkpoints/checkpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package checkpoints
import (
"bytes"
"encoding/json"
"github.com/BlocSoc-iitr/selene/config"
"io"
"net/http"
"net/http/httptest"
"testing"
"io"
"github.com/BlocSoc-iitr/selene/config"
)

type CustomTransport struct {
Expand Down Expand Up @@ -289,7 +289,6 @@ func TestGetHealthyFallbackServices(t *testing.T) {
}
}


func equalNetworks(a, b []config.Network) bool {
if len(a) != len(b) {
return false
Expand Down Expand Up @@ -317,4 +316,3 @@ func equalStringSlices(a, b []string) bool {
}
return true
}

84 changes: 43 additions & 41 deletions config/cli.go
Original file line number Diff line number Diff line change
@@ -1,50 +1,52 @@
package config

import (
"encoding/hex"
"encoding/hex"
)

// The format of configuration to be stored in the configuratin file is map[string]interface{}
type CliConfig struct {
ExecutionRpc *string `mapstructure:"execution_rpc"`
ConsensusRpc *string `mapstructure:"consensus_rpc"`
Checkpoint *[]byte `mapstructure:"checkpoint"`
RpcBindIp *string `mapstructure:"rpc_bind_ip"`
RpcPort *uint16 `mapstructure:"rpc_port"`
DataDir *string `mapstructure:"data_dir"`
Fallback *string `mapstructure:"fallback"`
LoadExternalFallback *bool `mapstructure:"load_external_fallback"`
StrictCheckpointAge *bool `mapstructure:"strict_checkpoint_age"`
ExecutionRpc *string `mapstructure:"execution_rpc"`
ConsensusRpc *string `mapstructure:"consensus_rpc"`
Checkpoint *[]byte `mapstructure:"checkpoint"`
RpcBindIp *string `mapstructure:"rpc_bind_ip"`
RpcPort *uint16 `mapstructure:"rpc_port"`
DataDir *string `mapstructure:"data_dir"`
Fallback *string `mapstructure:"fallback"`
LoadExternalFallback *bool `mapstructure:"load_external_fallback"`
StrictCheckpointAge *bool `mapstructure:"strict_checkpoint_age"`
}

func (cfg *CliConfig) as_provider() map[string]interface{} {
// Create a map to hold the configuration data
userDict := make(map[string]interface{})
// Populate the map with values from the CliConfig struct
if cfg.ExecutionRpc != nil {
userDict["execution_rpc"] = *cfg.ExecutionRpc
}
if cfg.ConsensusRpc != nil {
userDict["consensus_rpc"] = *cfg.ConsensusRpc
}
if cfg.Checkpoint != nil {
userDict["checkpoint"] = hex.EncodeToString(*cfg.Checkpoint)
}
if cfg.RpcBindIp != nil {
userDict["rpc_bind_ip"] = *cfg.RpcBindIp
}
if cfg.RpcPort != nil {
userDict["rpc_port"] = *cfg.RpcPort
}
if cfg.DataDir != nil {
userDict["data_dir"] = *cfg.DataDir
}
if cfg.Fallback != nil {
userDict["fallback"] = *cfg.Fallback
}
if cfg.LoadExternalFallback != nil {
userDict["load_external_fallback"] = *cfg.LoadExternalFallback
}
if cfg.StrictCheckpointAge != nil {
userDict["strict_checkpoint_age"] = *cfg.StrictCheckpointAge
}
return userDict
// Create a map to hold the configuration data
userDict := make(map[string]interface{})
// Populate the map with values from the CliConfig struct
if cfg.ExecutionRpc != nil {
userDict["execution_rpc"] = *cfg.ExecutionRpc
}
if cfg.ConsensusRpc != nil {
userDict["consensus_rpc"] = *cfg.ConsensusRpc
}
if cfg.Checkpoint != nil {
userDict["checkpoint"] = hex.EncodeToString(*cfg.Checkpoint)
}
if cfg.RpcBindIp != nil {
userDict["rpc_bind_ip"] = *cfg.RpcBindIp
}
if cfg.RpcPort != nil {
userDict["rpc_port"] = *cfg.RpcPort
}
if cfg.DataDir != nil {
userDict["data_dir"] = *cfg.DataDir
}
if cfg.Fallback != nil {
userDict["fallback"] = *cfg.Fallback
}
if cfg.LoadExternalFallback != nil {
userDict["load_external_fallback"] = *cfg.LoadExternalFallback
}
if cfg.StrictCheckpointAge != nil {
userDict["strict_checkpoint_age"] = *cfg.StrictCheckpointAge
}
return userDict
}
Loading
Loading