Skip to content

Commit

Permalink
chore: use rudder-go-kit
Browse files Browse the repository at this point in the history
  • Loading branch information
lvrach committed Aug 23, 2024
1 parent c475544 commit 6e69435
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 338 deletions.
151 changes: 39 additions & 112 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@ package cluster_test

import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"testing"
"time"

Expand All @@ -23,6 +16,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"

"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/app/cluster"
Expand All @@ -49,118 +43,13 @@ import (
"github.com/rudderlabs/rudder-server/utils/types/servermode"
)

var (
hold bool
DB_DSN = "root@tcp(127.0.0.1:3306)/service"
db *sql.DB
)

func TestMain(m *testing.M) {
flag.BoolVar(&hold, "hold", false, "hold environment clean-up after test execution until Ctrl+C is provided")
flag.Parse()

// hack to make defer work, without being affected by the os.Exit in TestMain
os.Exit(run(m))
}

func run(m *testing.M) int {
// uses a sensible default on windows (tcp/http) and linux/osx (socket)
pool, err := dockertest.NewPool("")
if err != nil {
log.Printf("Could not connect to docker: %s", err)
return 1
}

database := "jobsdb"
// pulls an image, creates a container based on it and runs it
resourcePostgres, err := pool.Run("postgres", "15-alpine", []string{
"POSTGRES_PASSWORD=password",
"POSTGRES_DB=" + database,
"POSTGRES_USER=rudder",
})
if err != nil {
log.Printf("Could not start resource: %s", err)
return 1
}

DB_DSN = fmt.Sprintf("postgres://rudder:password@localhost:%s/%s?sslmode=disable", resourcePostgres.GetPort("5432/tcp"), database)
fmt.Println("DB_DSN:", DB_DSN)
os.Setenv("JOBS_DB_DB_NAME", database)
os.Setenv("JOBS_DB_HOST", "localhost")
os.Setenv("JOBS_DB_NAME", "jobsdb")
os.Setenv("JOBS_DB_USER", "rudder")
os.Setenv("JOBS_DB_PASSWORD", "password")
os.Setenv("JOBS_DB_PORT", resourcePostgres.GetPort("5432/tcp"))

// exponential backoff-retry, because the application in the container might not be ready to accept connections yet
if err := pool.Retry(func() error {
var err error
db, err = sql.Open("postgres", DB_DSN)
if err != nil {
return err
}
return db.Ping()
}); err != nil {
log.Printf("Could not connect to docker: %s", err)
return 1
}

defer func() {
if err := pool.Purge(resourcePostgres); err != nil {
log.Printf("Could not purge resource: %s \n", err)
}
}()

code := m.Run()
blockOnHold()

return code
}

func blockOnHold() {
if !hold {
return
}

fmt.Println("Test on hold, before cleanup")
fmt.Println("Press Ctrl+C to exit")

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

<-c
}

const (
WriteKeyEnabled = "enabled-write-key"
SourceIDEnabled = "enabled-source"
GADestinationID = "did1"
GADestinationDefinitionID = "gaid1"
)

var (
workspaceID = uuid.New().String()
gaDestinationDefinition = backendConfig.DestinationDefinitionT{
ID: GADestinationDefinitionID, Name: "GA",
DisplayName: "Google Analytics", Config: nil, ResponseRules: nil,
}
sampleBackendConfig = backendConfig.ConfigT{
WorkspaceID: workspaceID,
Sources: []backendConfig.SourceT{
{
WorkspaceID: workspaceID,
ID: SourceIDEnabled,
WriteKey: WriteKeyEnabled,
Enabled: true,
Destinations: []backendConfig.DestinationT{{
ID: GADestinationID, Name: "ga dest",
DestinationDefinition: gaDestinationDefinition, Enabled: true, IsProcessorEnabled: true,
}},
},
},
}
)

func initJobsDB() {
config.Reset()
logger.Reset()
Expand All @@ -169,8 +58,46 @@ func initJobsDB() {
}

func TestDynamicClusterManager(t *testing.T) {
// uses a sensible default on windows (tcp/http) and linux/osx (socket)
pool, err := dockertest.NewPool("")
require.NoError(t, err)

resourcePostgres, err := postgres.Setup(pool, t)
require.NoError(t, err)

t.Log("DB_DSN:", resourcePostgres.DBDsn)
t.Setenv("JOBS_DB_DB_NAME", resourcePostgres.Database)
t.Setenv("JOBS_DB_HOST", resourcePostgres.Host)
t.Setenv("JOBS_DB_NAME", resourcePostgres.Database)
t.Setenv("JOBS_DB_USER", resourcePostgres.User)
t.Setenv("JOBS_DB_PASSWORD", resourcePostgres.Password)
t.Setenv("JOBS_DB_PORT", resourcePostgres.Port)

initJobsDB()

var (
workspaceID = uuid.New().String()
gaDestinationDefinition = backendConfig.DestinationDefinitionT{
ID: GADestinationDefinitionID, Name: "GA",
DisplayName: "Google Analytics", Config: nil, ResponseRules: nil,
}
sampleBackendConfig = backendConfig.ConfigT{
WorkspaceID: workspaceID,
Sources: []backendConfig.SourceT{
{
WorkspaceID: workspaceID,
ID: SourceIDEnabled,
WriteKey: WriteKeyEnabled,
Enabled: true,
Destinations: []backendConfig.DestinationT{{
ID: GADestinationID, Name: "ga dest",
DestinationDefinition: gaDestinationDefinition, Enabled: true, IsProcessorEnabled: true,
}},
},
},
}
)

mockCtrl := gomock.NewController(t)
mockBackendConfig := mocksBackendConfig.NewMockBackendConfig(mockCtrl)
mockTransformer := mocksTransformer.NewMockTransformer(mockCtrl)
Expand Down
98 changes: 27 additions & 71 deletions app/cluster/state/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@ package state_test

import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"testing"
"time"

Expand All @@ -17,88 +12,49 @@ import (

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
thEtcd "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/etcd"
"github.com/rudderlabs/rudder-server/app/cluster/state"
"github.com/rudderlabs/rudder-server/testhelper"
thEtcd "github.com/rudderlabs/rudder-server/testhelper/etcd"
"github.com/rudderlabs/rudder-server/utils/types/servermode"
)

var (
hold bool
etcdHosts = []string{"http://localhost:2379"}
etcdClient *etcd.Client
)

func TestMain(m *testing.M) {
flag.BoolVar(&hold, "hold", false, "hold environment clean-up after test execution until Ctrl+C is provided")
flag.Parse()

// hack to make defer work, without being affected by the os.Exit in TestMain
os.Exit(run(m))
}

func run(m *testing.M) int {
pool, err := dockertest.NewPool("")
if err != nil {
log.Printf("Could not connect to docker: %s \n", err)
return 1
}

cleaner := &testhelper.Cleanup{}
defer cleaner.Run()

var etcdRes *thEtcd.Resource
if etcdRes, err = thEtcd.Setup(pool, cleaner); err != nil {
log.Printf("Could not setup ETCD: %v", err)
return 1
}

etcdHosts = etcdRes.Hosts
etcdClient = etcdRes.Client

code := m.Run()
blockOnHold()

return code
}

func blockOnHold() {
if !hold {
return
}

fmt.Println("Test on hold, before cleanup")
fmt.Println("Press Ctrl+C to exit")

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

<-c
}

func Init() {
config.Reset()
logger.Reset()
}

func Test_Ping(t *testing.T) {
em := state.ETCDManager{
Config: &state.ETCDConfig{
Endpoints: etcdHosts,
},
}

err := em.Ping()
require.NoError(t, err)
em.Close()
}

func Test_ServerMode(t *testing.T) {
Init()

var etcdClient *etcd.Client

pool, err := dockertest.NewPool("")
require.NoError(t, err)

cleaner := &testhelper.Cleanup{}
defer cleaner.Run()

etcdRes, err := thEtcd.Setup(pool, t)
require.NoError(t, err)

t.Run("ping", func(t *testing.T) {
em := state.ETCDManager{
Config: &state.ETCDConfig{
Endpoints: etcdRes.Hosts,
},
}

err := em.Ping()
require.NoError(t, err)
em.Close()
})

provider := state.ETCDManager{
Config: &state.ETCDConfig{
Endpoints: etcdHosts,
Endpoints: etcdRes.Hosts,
ReleaseName: "test",
ServerIndex: "0",
},
Expand Down Expand Up @@ -174,7 +130,7 @@ func Test_ServerMode(t *testing.T) {

provider := state.ETCDManager{
Config: &state.ETCDConfig{
Endpoints: etcdHosts,
Endpoints: etcdRes.Hosts,
ReleaseName: "test_ack_timeout",
ServerIndex: "0",
ACKTimeout: time.Duration(1),
Expand All @@ -197,7 +153,7 @@ func Test_ServerMode(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

_, err := etcdClient.Put(ctx, modeRequestKey, `{"mode": "DEGRADED", "ack_key": "test-ack/1"}`)
_, err = etcdRes.Client.Put(ctx, modeRequestKey, `{"mode": "DEGRADED", "ack_key": "test-ack/1"}`)
require.NoError(t, err)

ch := provider.ServerMode(ctx)
Expand Down
43 changes: 10 additions & 33 deletions backend-config/backend_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"

adminpkg "github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/backend-config/internal/cache"
Expand Down Expand Up @@ -400,42 +401,18 @@ func TestCache(t *testing.T) {
if err != nil {
t.Fatalf("Could not connect to docker: %s\n", err)
}
database := "jobsdb"
resourcePostgres, err := pool.Run("postgres", "15-alpine", []string{
"POSTGRES_PASSWORD=password",
"POSTGRES_DB=" + database,
"POSTGRES_USER=rudder",
})
resourcePostgres, err := postgres.Setup(pool, t)
if err != nil {
t.Fatalf("Could not start resource: %s\n", err)
}
defer func() {
if err := pool.Purge(resourcePostgres); err != nil {
t.Fatalf("Could not purge resource: %s \n", err)
}
}()
port := resourcePostgres.GetPort("5432/tcp")
DB_DSN := fmt.Sprintf("postgres://rudder:password@localhost:%s/%s?sslmode=disable", port, database)
fmt.Println("DB_DSN:", DB_DSN)
t.Setenv("JOBS_DB_DB_NAME", database)
t.Setenv("JOBS_DB_HOST", "localhost")
t.Setenv("JOBS_DB_NAME", database)
t.Setenv("JOBS_DB_USER", "rudder")
t.Setenv("JOBS_DB_PASSWORD", "password")
t.Setenv("JOBS_DB_PORT", port)
// exponential backoff-retry, because the application in the container might not be ready to accept connections yet
var db *sql.DB
if err := pool.Retry(func() error {
var err error
db, err = sql.Open("postgres", DB_DSN)
if err != nil {
return err
}
return db.Ping()
}); err != nil {
t.Fatalf("Could not connect to docker: %s\n", err)
}

t.Logf("DB_DSN: %s", resourcePostgres.DBDsn)
t.Setenv("JOBS_DB_DB_NAME", resourcePostgres.Database)
t.Setenv("JOBS_DB_HOST", resourcePostgres.Host)
t.Setenv("JOBS_DB_NAME", resourcePostgres.Database)
t.Setenv("JOBS_DB_USER", resourcePostgres.User)
t.Setenv("JOBS_DB_PASSWORD", resourcePostgres.Password)
t.Setenv("JOBS_DB_PORT", resourcePostgres.Port)
db := resourcePostgres.DB
t.Run("initialize from cache when a call to control plane fails", func(t *testing.T) {
var (
ctrl = gomock.NewController(t)
Expand Down
Loading

0 comments on commit 6e69435

Please sign in to comment.