Skip to content

Commit

Permalink
feat: create receiver command (#604)
Browse files Browse the repository at this point in the history
* build of indexer

* create receiver

* indexer

* fix build

* mody tidy
  • Loading branch information
robdefeo authored Jul 11, 2020
1 parent f430c52 commit e3ff1f7
Show file tree
Hide file tree
Showing 36 changed files with 1,002 additions and 36 deletions.
29 changes: 29 additions & 0 deletions .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,16 @@ builds:
- linux
goarch:
- amd64
- id: receiver-linux-amd64
ldflags: -s -w -X github.com/mailchain/mailchain.Version={{.Version}} -X github.com/mailchain/mailchain.BuildDate={{.Date}} -X github.com/mailchain/mailchain.CommitHash={{ .ShortCommit }}
binary: receiver
env:
- CGO_ENABLED=0
main: ./cmd/receiver/main.go
goos:
- linux
goarch:
- amd64
- id: indexer-linux-amd64
ldflags: -s -w -X github.com/mailchain/mailchain.Version={{.Version}} -X github.com/mailchain/mailchain.BuildDate={{.Date}} -X github.com/mailchain/mailchain.CommitHash={{ .ShortCommit }}
binary: indexer
Expand Down Expand Up @@ -120,3 +130,22 @@ dockers:
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=repository=https://github.com/mailchain/mailchain"
- "--label=homepage=https://mailchain.xyz"
-
binaries: # Name templates of the built binaries that should be used.
- receiver
goos: linux # GOOS of the built binary that should be used.
goarch: amd64 # GOARCH of the built binary that should be used.
dockerfile: ./cmd/receiver/Dockerfile

image_templates:
- "mailchain/receiver:latest"
- "mailchain/receiver:{{.Version}}"

build_flag_templates:
- "--pull"
- "--label=org.opencontainers.image.created={{.Date}}"
- "--label=org.opencontainers.image.name={{.ProjectName}}/receiver"
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=repository=https://github.com/mailchain/mailchain"
- "--label=homepage=https://mailchain.xyz"
9 changes: 2 additions & 7 deletions cmd/indexer/commands/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package commands

import (
"context"
"fmt"

"github.com/ethereum/go-ethereum/params"
"github.com/jmoiron/sqlx"
Expand Down Expand Up @@ -57,13 +56,9 @@ func ethereumCmd() *cobra.Command {
return err
}

for {
err := seqProcessor.NextBlock(context.Background())
doSequential(cmd, seqProcessor)

if err != nil {
fmt.Fprintf(cmd.ErrOrStderr(), "%+v", err)
}
}
return nil
},
}

Expand Down
1 change: 1 addition & 0 deletions cmd/indexer/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ func rootCmd() *cobra.Command {
}

cmd.AddCommand(ethereumCmd())
cmd.AddCommand(substrateCmd())
cmd.AddCommand(databaseCmd())

cmd.PersistentFlags().String("postgres-host", "localhost", "Postgres server host")
Expand Down
19 changes: 19 additions & 0 deletions cmd/indexer/commands/sequenial.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package commands

import (
"context"
"fmt"

"github.com/mailchain/mailchain/cmd/indexer/internal/processor"
"github.com/spf13/cobra"
)

func doSequential(cmd *cobra.Command, p *processor.Sequential) {
for {
err := p.NextBlock(context.Background())

if err != nil {
fmt.Fprintf(cmd.ErrOrStderr(), "%+v", err)
}
}
}
117 changes: 117 additions & 0 deletions cmd/indexer/commands/substrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package commands

import (
"context"

"github.com/jmoiron/sqlx"
"github.com/mailchain/mailchain/cmd/indexer/internal/processor"
sub "github.com/mailchain/mailchain/cmd/indexer/internal/substrate"
"github.com/mailchain/mailchain/cmd/internal/datastore/os"
"github.com/mailchain/mailchain/cmd/internal/datastore/pq"
"github.com/mailchain/mailchain/internal/protocols"
"github.com/mailchain/mailchain/internal/protocols/substrate"
"github.com/pkg/errors"
"github.com/spf13/cobra"
)

func substrateCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "substrate",
Short: "run substrate sequential processor",
TraverseChildren: true,
RunE: func(cmd *cobra.Command, args []string) error {
network, _ := cmd.Flags().GetString("network")
protocol, _ := cmd.Flags().GetString("protocol")
blockNumber, _ := cmd.Flags().GetUint64("start-block")

addressRPC, _ := cmd.Flags().GetString("rpc-address")
if addressRPC == "" {
return errors.Errorf("rpc-address must not be empty")
}

rawStorePath, _ := cmd.Flags().GetString("raw-store-path")

connIndexer, err := newPostgresConnection(cmd, "indexer")
if err != nil {
return err
}

connPublicKey, err := newPostgresConnection(cmd, "pubkey")
if err != nil {
return err
}

connEnvelope, err := newPostgresConnection(cmd, "envelope")
if err != nil {
return err
}

defer connIndexer.Close()
defer connPublicKey.Close()
defer connEnvelope.Close()

seqProcessor, err := createSubstrateProcessor(connIndexer, connPublicKey, connEnvelope, blockNumber, protocol, network, rawStorePath, addressRPC)
if err != nil {
return err
}

doSequential(cmd, seqProcessor)

return nil
},
}

cmd.Flags().Uint64("start-block", 0, "Block number from which the indexer will start")
cmd.Flags().String("protocol", protocols.Substrate, "Protocol to run against")
cmd.Flags().String("network", substrate.EdgewareMainnet, "Network to run against")
cmd.Flags().String("rpc-address", "", "Substrate RPC-JSON address")

return cmd
}

func createSubstrateProcessor(connIndexer, connPublicKey, connEnvelope *sqlx.DB, blockNumber uint64, protocol, network, rawStorePath, addressRPC string) (*processor.Sequential, error) {
ctx := context.Background()

subClient, err := sub.NewRPC(addressRPC)
if err != nil {
return nil, err
}

syncStore, err := pq.NewSyncStore(connIndexer)
if err != nil {
return nil, err
}

pubKeyStore, err := pq.NewPublicKeyStore(connPublicKey)
if err != nil {
return nil, err
}

transactionStore, err := pq.NewTransactionStore(connEnvelope)
if err != nil {
return nil, err
}

rawStore, err := os.NewRawTransactionStore(rawStorePath)
if err != nil {
return nil, err
}

processorTransaction := sub.NewExtrinsicProcessor(
transactionStore,
rawStore,
pubKeyStore,
)

if err := syncStore.PutBlockNumber(ctx, protocol, network, blockNumber); err != nil {
return nil, err
}

return processor.NewSequential(
protocols.Substrate,
network,
syncStore,
sub.NewBlockProcessor(processorTransaction),
subClient,
), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,28 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
)

func NewRPC(address string) (*Client, error) {
func NewRPC(address string) (*BlockClient, error) {
client, err := ethclient.Dial(address)
if err != nil {
return nil, err
}

return &Client{client: client}, nil
return &BlockClient{client: client}, nil
}

type Client struct {
type BlockClient struct {
client *ethclient.Client
}

func (c *Client) BlockByNumber(ctx context.Context, blockNo uint64) (blk interface{}, err error) {
func (c *BlockClient) BlockByNumber(ctx context.Context, blockNo uint64) (blk interface{}, err error) {
return c.client.BlockByNumber(ctx, big.NewInt(int64(blockNo)))
}

func (c *Client) NetworkID(ctx context.Context) (*big.Int, error) {
func (c *BlockClient) NetworkID(ctx context.Context) (*big.Int, error) {
return c.client.NetworkID(ctx)
}

func (c *Client) LatestBlockNumber(ctx context.Context) (blockNo uint64, err error) {
func (c *BlockClient) LatestBlockNumber(ctx context.Context) (blockNo uint64, err error) {
hdr, err := c.client.HeaderByNumber(ctx, nil)
if err != nil {
return 0, err
Expand All @@ -37,6 +37,6 @@ func (c *Client) LatestBlockNumber(ctx context.Context) (blockNo uint64, err err
return hdr.Number.Uint64(), nil
}

func (c *Client) GetLatest(ctx context.Context) (blk interface{}, err error) {
func (c *BlockClient) GetLatest(ctx context.Context) (blk interface{}, err error) {
return c.client.BlockByNumber(ctx, nil)
}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func TestClient_BlockByNumber(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
c := &BlockClient{
client: tt.fields.client,
}
gotBlk, err := c.BlockByNumber(tt.args.ctx, tt.args.blockNo)
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestClient_NetworkID(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
c := &BlockClient{
client: tt.fields.client,
}
got, err := c.NetworkID(tt.args.ctx)
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestClient_LatestBlockNumber(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
c := &BlockClient{
client: tt.fields.client,
}
gotNo, err := c.LatestBlockNumber(tt.args.ctx)
Expand Down Expand Up @@ -232,7 +232,7 @@ func TestClient_GetLatest(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
c := &BlockClient{
client: tt.fields.client,
}
gotBlk, err := c.GetLatest(tt.args.ctx)
Expand Down
10 changes: 7 additions & 3 deletions cmd/indexer/internal/substrate/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@ func NewBlockProcessor(tx actions.Transaction) *Block {
func (b *Block) Run(ctx context.Context, protocol, network string, blk interface{}) error {
sbrtBlk, ok := blk.(*types.Block)
if !ok {
return errors.New("tx must be go-substrate-rpc-client/types.Block")
return errors.Errorf("tx %T must be go-substrate-rpc-client/types.Block", blk)
}

fmt.Println("block hash: ", sbrtBlk.Header.Digest)
fmt.Println("block hash: ", sbrtBlk.Header.ParentHash.Hex())

txs := sbrtBlk.Extrinsics
for i := range txs {
if err := b.txProcessor.Run(ctx, protocol, network, txs[i], &TxOptions{Block: sbrtBlk}); err != nil {
if !txs[i].Signature.Signer.IsAccountID {
continue
}

if err := b.txProcessor.Run(ctx, protocol, network, &txs[i], &TxOptions{Block: sbrtBlk}); err != nil {
return errors.Wrapf(err, "fails to process transaction: block=%s, transaction-index=%d", sbrtBlk.Header.ParentHash.Hex(), i)
}
}
Expand Down
43 changes: 43 additions & 0 deletions cmd/indexer/internal/substrate/block_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package substrate

import (
"context"

gsrpc "github.com/centrifuge/go-substrate-rpc-client"
)

func NewRPC(address string) (*BlockClient, error) {
api, err := gsrpc.NewSubstrateAPI(address)
if err != nil {
return nil, err
}

return &BlockClient{api: api}, nil
}

type BlockClient struct {
api *gsrpc.SubstrateAPI
}

func (c *BlockClient) BlockByNumber(ctx context.Context, blockNo uint64) (blk interface{}, err error) {
blkHash, err := c.api.RPC.Chain.GetBlockHash(blockNo)
if err != nil {
return nil, err
}

sb, err := c.api.RPC.Chain.GetBlock(blkHash)
if err != nil {
return nil, err
}

return &sb.Block, nil
}

func (c *BlockClient) LatestBlockNumber(ctx context.Context) (blockNo uint64, err error) {
signedBlock, err := c.api.RPC.Chain.GetBlockLatest()
if err != nil {
return 0, err
}

return uint64(signedBlock.Block.Header.Number), nil
}
Loading

0 comments on commit e3ff1f7

Please sign in to comment.