Skip to content

Commit

Permalink
fix: consumer for worker and some fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
logicalangel committed Apr 6, 2024
1 parent 2538aba commit 3611fa3
Show file tree
Hide file tree
Showing 16 changed files with 146 additions and 71 deletions.
21 changes: 16 additions & 5 deletions internal/cmd/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
evmlogService "github.com/KenshiTech/unchained/service/evmlog"
uniswapService "github.com/KenshiTech/unchained/service/uniswap"
"github.com/KenshiTech/unchained/transport/client"
"github.com/KenshiTech/unchained/transport/client/conn"
"github.com/KenshiTech/unchained/transport/client/handler"
"github.com/KenshiTech/unchained/transport/server"
"github.com/KenshiTech/unchained/transport/server/gql"
Expand All @@ -25,6 +26,7 @@ var consumerCmd = &cobra.Command{

PreRun: func(cmd *cobra.Command, args []string) {
config.App.Network.BrokerURI = cmd.Flags().Lookup("broker").Value.String()
config.App.Network.Bind = cmd.Flags().Lookup("graphql").Value.String()
},

Run: func(cmd *cobra.Command, args []string) {
Expand All @@ -45,16 +47,18 @@ var consumerCmd = &cobra.Command{
pos := pos.New(ethRPC)
db.Start()

server.New(
gql.WithGraphQL(),
)

correctnessService := correctnessService.New(ethRPC)
evmLogService := evmlogService.New(ethRPC, pos)
uniswapService := uniswapService.New(ethRPC, pos)

handler := handler.New(correctnessService, uniswapService, evmLogService)
conn.Start()

handler := handler.NewConsumerHandler(correctnessService, uniswapService, evmLogService)
client.Consume(handler)

server.New(
gql.WithGraphQL(),
)
},
}

Expand All @@ -67,4 +71,11 @@ func init() {
"wss://shinobi.brokers.kenshi.io",
"Unchained broker to connect to",
)

consumerCmd.Flags().StringP(
"graphql",
"g",
"127.0.0.1:8080",
"The graphql server path to bind",
)
}
11 changes: 7 additions & 4 deletions internal/cmd/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"github.com/KenshiTech/unchained/persistence"
"github.com/KenshiTech/unchained/pos"
"github.com/KenshiTech/unchained/scheduler"
correctnessService "github.com/KenshiTech/unchained/service/correctness"
evmlogService "github.com/KenshiTech/unchained/service/evmlog"
uniswapService "github.com/KenshiTech/unchained/service/uniswap"
"github.com/KenshiTech/unchained/transport/client"
"github.com/KenshiTech/unchained/transport/client/conn"
"github.com/KenshiTech/unchained/transport/client/handler"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -45,17 +45,20 @@ var workerCmd = &cobra.Command{
pos := pos.New(ethRPC)
badger := persistence.New(contextPath)

correctnessService := correctnessService.New(ethRPC)
evmLogService := evmlogService.New(ethRPC, pos)
uniswapService := uniswapService.New(ethRPC, pos)

scheduler.New(
scheduler := scheduler.New(
scheduler.WithEthLogs(evmLogService, ethRPC, badger),
scheduler.WithUniswapEvents(uniswapService, ethRPC),
)

handler := handler.New(correctnessService, uniswapService, evmLogService)
conn.Start()

handler := handler.NewWorkerHandler()
client.Consume(handler)

scheduler.Start()
},
}

Expand Down
2 changes: 0 additions & 2 deletions internal/scheduler/logs/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ func (e *EvmLog) Run() {
return
}

log.Logger.With("Chain", e.chain).Info("Run evm log task")

for _, conf := range config.App.Plugins.EthLog.Events {
if conf.Chain != e.chain {
continue
Expand Down
2 changes: 2 additions & 0 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,6 @@ func (s *Scheduler) AddTask(duration time.Duration, task Task) {

func (s *Scheduler) Start() {
s.scheduler.Start()

select {}
}
2 changes: 0 additions & 2 deletions internal/scheduler/uniswap/uniswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ func (u *Uniswap) Run() {
return
}

log.Logger.With("Chain", u.chain).Info("Run Uniswap task")

currBlockNumber, err := u.uniswapService.GetBlockNumber(u.chain)
if err != nil {
log.Logger.Error(
Expand Down
20 changes: 14 additions & 6 deletions internal/service/uniswap/uniswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/KenshiTech/unchained/utils"

"github.com/KenshiTech/unchained/config"

Expand Down Expand Up @@ -567,11 +570,20 @@ func New(ethRPC *ethereum.Repository, pos *pos.Repository) *Service {
ethRPC: ethRPC,
pos: pos,

crossPrices: map[string]big.Int{},
crossTokens: map[string]datasets.TokenKey{},
consensus: nil,
signatureCache: nil,
aggregateCache: nil,
signatureMutex: sync.Mutex{},
LastBlock: *xsync.NewMapOf[datasets.TokenKey, uint64](),
SupportedTokens: map[datasets.TokenKey]bool{},
PriceCache: map[string]*lru.Cache[uint64, big.Int]{},
crossPrices: map[string]big.Int{},
crossTokens: map[string]datasets.TokenKey{},
}
DebouncedSaveSignatures = utils.Debounce[datasets.AssetKey, SaveSignatureArgs](5*time.Second, u.saveSignatures)
u.twoOneNineTwo.Exp(big.NewInt(2), big.NewInt(192), nil)
u.tenEighteen.Exp(big.NewInt(10), big.NewInt(18), nil)
u.tenEighteenF.SetInt(&u.tenEighteen)

if config.App.Plugins.Uniswap != nil {
for _, t := range config.App.Plugins.Uniswap.Tokens {
Expand All @@ -582,10 +594,6 @@ func New(ethRPC *ethereum.Repository, pos *pos.Repository) *Service {
}
}

u.twoOneNineTwo.Exp(big.NewInt(2), big.NewInt(192), nil)
u.tenEighteen.Exp(big.NewInt(10), big.NewInt(18), nil)
u.tenEighteenF.SetInt(&u.tenEighteen)

var err error
u.signatureCache, err = lru.New[bls12381.G1Affine, []datasets.Signature](evmlog.LruSize)
if err != nil {
Expand Down
55 changes: 30 additions & 25 deletions internal/transport/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,37 +7,42 @@ import (
"github.com/KenshiTech/unchained/transport/client/handler"
)

func Consume(handler *handler.Handler) {
conn.Start()

func Consume(handler handler.Handler) {
incoming := conn.Read()

for payload := range incoming {
switch opcodes.OpCode(payload[0]) {
case opcodes.Feedback:
log.Logger.
With("Feedback", string(payload[1:])).
Info("Broker")
go func() {
log.Logger.Info("Starting consumer from broker")

case opcodes.KoskChallenge:
challenge := handler.Challenge(payload[1:])
conn.Send(opcodes.KoskResult, challenge.Sia().Content)
for payload := range incoming {
switch opcodes.OpCode(payload[0]) {
case opcodes.Error:
log.Logger.
With("Error", string(payload[1:])).
Error("Broker")

case opcodes.PriceReportBroadcast:
go handler.PriceReport(payload[1:])
case opcodes.Feedback:
log.Logger.
With("Feedback", string(payload[1:])).
Warn("Broker")

case opcodes.EventLogBroadcast:
go handler.EventLog(payload[1:])
case opcodes.KoskChallenge:
challenge := handler.Challenge(payload[1:])
conn.Send(opcodes.KoskResult, challenge.Sia().Content)

case opcodes.CorrectnessReportBroadcast:
go handler.CorrectnessReport(payload[1:])
case opcodes.PriceReportBroadcast:
go handler.PriceReport(payload[1:])

default:
log.Logger.
With("Code", payload[0]).
Info("Unknown call code")
}
}
case opcodes.EventLogBroadcast:
go handler.EventLog(payload[1:])

log.Logger.Error("Client loop breaks")
case opcodes.CorrectnessReportBroadcast:
go handler.CorrectnessReport(payload[1:])

default:
log.Logger.
With("Code", payload[0]).
Info("Unknown call code")
}
}
}()
}
3 changes: 2 additions & 1 deletion internal/transport/client/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ func Start() {
}

Send(opcodes.Hello, bls.ClientSigner.Sia().Content)
Send(opcodes.RegisterConsumer, nil)
}

func Reconnect(err error) {
Expand Down Expand Up @@ -119,6 +118,8 @@ func Read() <-chan []byte {

continue
}

out <- payload
}
}()

Expand Down
11 changes: 10 additions & 1 deletion internal/transport/client/handler/challenge.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,16 @@ import (
sia "github.com/pouya-eghbali/go-sia/v2/pkg"
)

func (h *Handler) Challenge(message []byte) *kosk.Challenge {
func (h *consumer) Challenge(message []byte) *kosk.Challenge {
challenge := new(kosk.Challenge).DeSia(&sia.Sia{Content: message})

signature, _ := bls.Sign(*bls.ClientSecretKey, challenge.Random[:])
challenge.Signature = signature.Bytes()

return challenge
}

func (w worker) Challenge(message []byte) *kosk.Challenge {
challenge := new(kosk.Challenge).DeSia(&sia.Sia{Content: message})

signature, _ := bls.Sign(*bls.ClientSecretKey, challenge.Random[:])
Expand Down
29 changes: 29 additions & 0 deletions internal/transport/client/handler/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package handler

import (
"github.com/KenshiTech/unchained/constants/opcodes"
"github.com/KenshiTech/unchained/service/correctness"
"github.com/KenshiTech/unchained/service/evmlog"
"github.com/KenshiTech/unchained/service/uniswap"
"github.com/KenshiTech/unchained/transport/client/conn"
)

type consumer struct {
correctness *correctness.Service
uniswap *uniswap.Service
evmlog *evmlog.Service
}

func NewConsumerHandler(
correctness *correctness.Service,
uniswap *uniswap.Service,
evmlog *evmlog.Service,
) Handler {
conn.Send(opcodes.RegisterConsumer, nil)

return &consumer{
correctness: correctness,
uniswap: uniswap,
evmlog: evmlog,
}
}
4 changes: 3 additions & 1 deletion internal/transport/client/handler/correctness.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
sia "github.com/pouya-eghbali/go-sia/v2/pkg"
)

func (h *Handler) CorrectnessReport(message []byte) {
func (h *consumer) CorrectnessReport(message []byte) {
packet := new(datasets.BroadcastCorrectnessPacket).DeSia(&sia.Sia{Content: message})
toHash := packet.Info.Sia().Content
hash, err := bls.Hash(toHash)
Expand Down Expand Up @@ -38,3 +38,5 @@ func (h *Handler) CorrectnessReport(message []byte) {
true,
)
}

func (w worker) CorrectnessReport(message []byte) {}
4 changes: 3 additions & 1 deletion internal/transport/client/handler/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
sia "github.com/pouya-eghbali/go-sia/v2/pkg"
)

func (h *Handler) EventLog(message []byte) {
func (h *consumer) EventLog(message []byte) {
packet := new(datasets.BroadcastEventPacket).DeSia(&sia.Sia{Content: message})
toHash := packet.Info.Sia().Content
hash, err := bls.Hash(toHash)
Expand Down Expand Up @@ -39,3 +39,5 @@ func (h *Handler) EventLog(message []byte) {
false,
)
}

func (w worker) EventLog(message []byte) {}
25 changes: 6 additions & 19 deletions internal/transport/client/handler/handler.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
package handler

import (
"github.com/KenshiTech/unchained/service/correctness"
"github.com/KenshiTech/unchained/service/evmlog"
"github.com/KenshiTech/unchained/service/uniswap"
"github.com/KenshiTech/unchained/crypto/kosk"
)

type Handler struct {
correctness *correctness.Service
uniswap *uniswap.Service
evmlog *evmlog.Service
}

func New(
correctness *correctness.Service,
uniswap *uniswap.Service,
evmlog *evmlog.Service,
) *Handler {
return &Handler{
correctness: correctness,
uniswap: uniswap,
evmlog: evmlog,
}
type Handler interface {
Challenge(message []byte) *kosk.Challenge
CorrectnessReport(message []byte)
EventLog(message []byte)
PriceReport(message []byte)
}
4 changes: 3 additions & 1 deletion internal/transport/client/handler/price.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
sia "github.com/pouya-eghbali/go-sia/v2/pkg"
)

func (h *Handler) PriceReport(message []byte) {
func (h *consumer) PriceReport(message []byte) {
packet := new(datasets.BroadcastPricePacket).DeSia(&sia.Sia{Content: message})
toHash := packet.Info.Sia().Content
hash, err := bls.Hash(toHash)
Expand Down Expand Up @@ -39,3 +39,5 @@ func (h *Handler) PriceReport(message []byte) {
false,
)
}

func (w worker) PriceReport(message []byte) {}
8 changes: 8 additions & 0 deletions internal/transport/client/handler/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package handler

type worker struct {
}

func NewWorkerHandler() Handler {
return &worker{}
}
Loading

0 comments on commit 3611fa3

Please sign in to comment.