Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add remaining services to cartesi-rollups-node #153

Merged
merged 7 commits into from
Nov 28, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions build/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -2,7 +2,8 @@ version: "3.9"

name: rollups-node
services:
rollups-node:
node:
image: "cartesi/rollups-node:devel"
build:
context: ..
dockerfile: build/Dockerfile
@@ -11,6 +12,7 @@ services:
entrypoint: ["cartesi-rollups-node", "validator"]
ports:
- "4000:4000"
- "5005:5005"
depends_on:
hardhat:
condition: service_healthy
@@ -28,6 +30,31 @@ services:
RUST_LOG: info
CARTESI_LOG_LEVEL: info
POSTGRES_ENDPOINT: postgres://postgres:password@database:5432/postgres
CHAIN_ID: 31337 # dispacher + indexer

#Advance Runner
ADVANCE_RUNNER_HEALTHCHECK_PORT: 8080
SERVER_MANAGER_ENDPOINT: http://server_manager:5001
PROVIDER_HTTP_ENDPOINT: http://hardhat:8545
SESSION_ID: default_rollups_id
SNAPSHOT_DIR: /var/opt/cartesi/machine-snapshots
SNAPSHOT_LATEST: /var/opt/cartesi/machine-snapshots/latest

#Authority Claimer
AUTHORITY_CLAIMER_HTTP_SERVER_PORT: 8085
TX_PROVIDER_HTTP_ENDPOINT: http://hardhat:8545
TX_CHAIN_ID: 31337
TX_CHAIN_IS_LEGACY: ${TX_LEGACY:-false}
TX_DEFAULT_CONFIRMATIONS: 2
TX_SIGNING_MNEMONIC: "test test test test test test test test test test test junk"

#Dispatcher
DISPATCHER_HTTP_SERVER_PORT: 8081
DAPP_DEPLOYMENT_FILE: /deployments/localhost/dapp.json
ROLLUPS_DEPLOYMENT_FILE: /opt/cartesi/share/deployments/localhost.json
RD_EPOCH_DURATION: 86400
SC_GRPC_ENDPOINT: http://0.0.0.0:50051
SC_DEFAULT_CONFIRMATIONS: 1

#GraphQL Server
GRAPHQL_HEALTHCHECK_PORT: 8082
@@ -38,7 +65,19 @@ services:
INDEXER_HEALTHCHECK_PORT: 8083
DAPP_CONTRACT_ADDRESS_FILE: /deployments/localhost/dapp.json
REDIS_ENDPOINT: redis://redis:6379
CHAIN_ID: 31337

#Inspect Server
INSPECT_SERVER_HEALTHCHECK_PORT: 8084
INSPECT_SERVER_ADDRESS: 0.0.0.0:5005
SERVER_MANAGER_ADDRESS: server_manager:5001

#State Server
SS_SERVER_ADDRESS: 0.0.0.0:50051
SF_GENESIS_BLOCK: 0x1
SF_SAFETY_MARGIN: 1
BH_HTTP_ENDPOINT: http://hardhat:8545
BH_WS_ENDPOINT: ws://hardhat:8545
BH_BLOCK_TIMEOUT: 8

volumes:
- machine:/var/opt/cartesi/machine-snapshots
4 changes: 3 additions & 1 deletion cmd/cartesi-rollups-node/main.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
package main

import (
"context"
"os"

"github.com/cartesi/rollups-node/internal/logger"
@@ -14,7 +15,8 @@ func main() {
_, enableTimestamp := os.LookupEnv("CARTESI_LOG_ENABLE_TIMESTAMP")
logger.Init(logLevel, enableTimestamp)

if err := rootCmd.Execute(); err != nil {
ctx := context.Background()
if err := rootCmd.ExecuteContext(ctx); err != nil {
logger.Error.Panic(err)
}
}
8 changes: 2 additions & 6 deletions cmd/cartesi-rollups-node/validator.go
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
package main

import (
"github.com/cartesi/rollups-node/internal/node"
"github.com/cartesi/rollups-node/internal/services"
"github.com/spf13/cobra"
)
@@ -16,10 +17,5 @@ var validator = &cobra.Command{
}

func runValidatorNode(cmd *cobra.Command, args []string) {
validatorServices := []services.Service{
services.GraphQLServer,
services.Indexer,
}

services.Run(validatorServices)
services.Run(cmd.Context(), node.ValidatorServices)
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -66,6 +66,7 @@ require (
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/supranational/blst v0.3.11 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -238,6 +238,7 @@ github.com/status-im/keycard-go v0.2.0 h1:QDLFswOQu1r5jsycloeQh3bVU8n/NatHHaZobt
github.com/status-im/keycard-go v0.2.0/go.mod h1:wlp8ZLbsmrF6g6WjugPAx+IzoLrkdf9+mHxBEeo3Hbg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.1.5-0.20170601210322-f6abca593680/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
90 changes: 90 additions & 0 deletions internal/node/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

// Package node defines the individual services internally used to implement
// Rollups Node's features
package node

import (
"fmt"
"os"
"strings"

"github.com/cartesi/rollups-node/internal/services"
)

var ValidatorServices = []services.Service{
StateServer,
AdvanceRunner,
AuthorityClaimer,
Dispatcher,
GraphQLServer,
Indexer,
InspectServer,
}

var (
AdvanceRunner = services.NewService(
"advance-runner",
"cartesi-rollups-advance-runner",
healthcheckPort("advance-runner"),
)
AuthorityClaimer = services.NewService(
"authority-claimer",
"cartesi-rollups-authority-claimer",
healthcheckPort("authority-claimer"),
)
Dispatcher = services.NewService(
"dispatcher",
"cartesi-rollups-dispatcher",
healthcheckPort("dispatcher"),
)
GraphQLServer = services.NewService(
"graphql-server",
"cartesi-rollups-graphql-server",
healthcheckPort("graphql"),
)
Indexer = services.NewService(
"indexer",
"cartesi-rollups-indexer",
healthcheckPort("indexer"),
)
InspectServer = services.NewService(
"inspect-server",
"cartesi-rollups-inspect-server",
healthcheckPort("inspect-server"),
)
StateServer = services.NewService(
"state-server",
"cartesi-rollups-state-server",
stateServerHealthcheckPort(),
)
)

func healthcheckPort(serviceName string) string {
env := healthcheckEnv(serviceName)
if port, ok := os.LookupEnv(env); ok {
return port
}
panic(fmt.Sprintf("environment variable %s is empty", env))
}

func healthcheckEnv(serviceName string) string {
suffix := "_HEALTHCHECK_PORT"
if serviceName == "dispatcher" || serviceName == "authority-claimer" {
suffix = "_HTTP_SERVER_PORT"
}
normalizedName := strings.Replace(serviceName, "-", "_", -1)
return fmt.Sprintf("%s%s", strings.ToUpper(normalizedName), suffix)
}

func stateServerHealthcheckPort() string {
env := "SS_SERVER_ADDRESS"
if address, ok := os.LookupEnv(env); ok {
split := strings.Split(address, ":")
if len(split) > 1 {
return split[1]
}
}
panic(fmt.Sprintf("environment variable %s is empty", env))
}
17 changes: 17 additions & 0 deletions internal/services/fakeservice/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

// This file creates a dummy webserver with the sole pupose of being used
// as a binary to test the services.Service struct
package main

import (
"net/http"
"os"
)

func main() {
addr := os.Getenv("SERVICE_ADDRESS")
err := http.ListenAndServe(addr, nil)
panic(err)
}
132 changes: 76 additions & 56 deletions internal/services/service.go
Original file line number Diff line number Diff line change
@@ -1,120 +1,140 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

// Package services provides mechanisms to start multiple services in the background
// Package services provides mechanisms to start multiple services in the
// background
package services

import (
"context"
"fmt"
"net"
"os"
"os/exec"
"sync"
"syscall"
"time"

"github.com/cartesi/rollups-node/internal/logger"
)

// A service that runs in the background endlessly until the context is canceled
type Service interface {
fmt.Stringer
const (
DefaultServiceTimeout = 15 * time.Second
DefaultDialInterval = 100 * time.Millisecond
)

// Start a service that will run until completion or until the context is
// canceled
Start(ctx context.Context) error
type Service struct {
name string
binaryName string
healthcheckPort string
}

const DefaultServiceTimeout = 15 * time.Second

// simpleService implements the context cancelation logic of the Service interface
type simpleService struct {
serviceName string
binaryName string
func NewService(name, binaryName, healthcheckPort string) Service {
return Service{name, binaryName, healthcheckPort}
}

func (s simpleService) Start(ctx context.Context) error {
cmd := exec.Command(s.binaryName)
// Start will execute a binary and wait for its completion or until the context
// is canceled
func (s Service) Start(ctx context.Context) error {
cmd := exec.CommandContext(ctx, s.binaryName)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout

if err := cmd.Start(); err != nil {
cmd.Cancel = func() error {
err := cmd.Process.Signal(syscall.SIGTERM)
if err != nil {
msg := "failed to send SIGTERM to %v: %v\n"
logger.Warning.Printf(msg, s.name, err)
}
return err
}

go func() {
<-ctx.Done()
logger.Debug.Printf("%v: %v\n", s.String(), ctx.Err())
if err := cmd.Process.Signal(syscall.SIGTERM); err != nil {
msg := "%v: failed to send SIGTERM to %v\n"
logger.Error.Printf(msg, s.String(), s.binaryName)
err := cmd.Run()
if err != nil {
exitCode := cmd.ProcessState.ExitCode()
signal := cmd.ProcessState.Sys().(syscall.WaitStatus).Signal()
if exitCode != 0 && signal != syscall.SIGTERM {
// only return error if the service exits for reason other than shutdown
return err
}
}()

err := cmd.Wait()
if err != nil && cmd.ProcessState.ExitCode() != int(syscall.SIGTERM) {
return err
}
return nil
}

func (s simpleService) String() string {
return s.serviceName
// Ready blocks until the service is ready or the context is canceled.
//
// A service is considered ready when it is possible to establish a connection
// to its healthcheck endpoint.
func (s Service) Ready(ctx context.Context, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
for {
conn, err := net.Dial("tcp", fmt.Sprintf("0.0.0.0:%s", s.healthcheckPort))
if err == nil {
logger.Debug.Printf("%s is ready\n", s.name)
conn.Close()
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(DefaultDialInterval):
}
}
}

func (s Service) String() string {
return s.name
}

// The Run function serves as a very simple supervisor: it will start all the
// services provided to it and will run until the first of them finishes. Next
// it will try to stop the remaining services or timeout if they take too long
func Run(services []Service) {
func Run(ctx context.Context, services []Service) {
if len(services) == 0 {
logger.Error.Panic("there are no services to run")
}

// start services
ctx, cancel := context.WithCancel(context.Background())
exit := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
for _, service := range services {
service := service
wg.Add(1)
go func() {
// cancel the context when one of the services finish
defer cancel()
defer wg.Done()
if err := service.Start(ctx); err != nil {
msg := "main: service '%v' exited with error: %v\n"
logger.Error.Printf(msg, service.String(), err)
} else {
msg := "main: service '%v' exited successfully\n"
logger.Info.Printf(msg, service.String())
}
exit <- struct{}{}
}()

// wait for service to be ready or stop all services if it times out
if err := service.Ready(ctx, DefaultServiceTimeout); err != nil {
cancel()
msg := "main: service '%v' failed to be ready with error: %v. Exiting\n"
logger.Error.Printf(msg, service.name, err)
break
}
}

// wait for first service to exit
<-exit
// wait until the context is canceled
<-ctx.Done()

// send stop message to all other services and wait for them to finish
// or timeout
// wait for the services to finish or timeout
wait := make(chan struct{})
go func() {
cancel()
for i := 0; i < len(services)-1; i++ {
<-exit
}
wg.Wait()
wait <- struct{}{}
}()

select {
case <-wait:
logger.Info.Println("main: all services were shutdown")
case <-time.After(DefaultServiceTimeout):
logger.Warning.Println("main: exited after timeout")
logger.Warning.Println("main: exited after a timeout")
}
}

var (
GraphQLServer Service = simpleService{
serviceName: "graphql-server",
binaryName: "cartesi-rollups-graphql-server",
}
Indexer Service = simpleService{
serviceName: "indexer",
binaryName: "cartesi-rollups-indexer",
}
)
142 changes: 142 additions & 0 deletions internal/services/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package services

import (
"context"
"fmt"
"os"
"os/exec"
"path/filepath"
"testing"
"time"

"github.com/cartesi/rollups-node/internal/logger"
"github.com/stretchr/testify/suite"
)

type ServiceTestSuite struct {
suite.Suite
tmpDir string
servicePort int
}

func (s *ServiceTestSuite) SetupSuite() {
logger.Init("warning", false)
s.buildFakeService()
s.servicePort = 55555
}

func (s *ServiceTestSuite) TearDownSuite() {
err := os.RemoveAll(s.tmpDir)
if err != nil {
panic(err)
}
}

func (s *ServiceTestSuite) SetupTest() {
s.servicePort++
serviceAdress := "0.0.0.0:" + fmt.Sprint(s.servicePort)
os.Setenv("SERVICE_ADDRESS", serviceAdress)
}

// Service should stop when context is cancelled
func (s *ServiceTestSuite) TestServiceStops() {
service := Service{
name: "fake-service",
binaryName: "fake-service",
healthcheckPort: fmt.Sprint(s.servicePort),
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start service in goroutine
result := make(chan error)
go func() {
result <- service.Start(ctx)
}()

time.Sleep(100 * time.Millisecond)

// shutdown
cancel()
err := <-result
s.Nil(err, "service exited for the wrong reason: %v", err)
}

// Service should stop if timeout is reached and it isn't ready yet
func (s *ServiceTestSuite) TestServiceTimeout() {
service := Service{
name: "fake-service",
binaryName: "fake-service",
healthcheckPort: "0000", // wrong port
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start service in goroutine
result := make(chan error, 1)
go func() {
result <- service.Start(ctx)
}()

// expect timeout because of wrong port
err := service.Ready(ctx, 500*time.Millisecond)
s.NotNil(err, "expected service to timeout")

// shutdown
cancel()
s.Nil(<-result, "service exited for the wrong reason: %v", err)
}

// Service should be ready soon after starting
func (s *ServiceTestSuite) TestServiceReady() {
service := Service{
name: "fake-service",
binaryName: "fake-service",
healthcheckPort: fmt.Sprint(s.servicePort),
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// start service in goroutine
result := make(chan error)
go func() {
result <- service.Start(ctx)
}()

// wait for service to be ready
err := service.Ready(ctx, 1*time.Second)
s.Nil(err, "service timed out")

// shutdown
cancel()
s.Nil(<-result, "service exited for the wrong reason: %v", err)
}

// Builds the fake-service binary and adds it to PATH
func (s *ServiceTestSuite) buildFakeService() {
tempDir, err := os.MkdirTemp("", "")
if err != nil {
panic(err)
}
s.tmpDir = tempDir

cmd := exec.Command(
"go",
"build",
"-o",
filepath.Join(s.tmpDir, "fake-service"),
"fakeservice/main.go",
)
if err := cmd.Run(); err != nil {
panic(err)
}

os.Setenv("PATH", os.Getenv("PATH")+":"+s.tmpDir)
}

func TestServiceSuite(t *testing.T) {
suite.Run(t, new(ServiceTestSuite))
}
60 changes: 0 additions & 60 deletions internal/services/simple_service_test.go

This file was deleted.