Skip to content

Commit

Permalink
chore: master pull
Browse files Browse the repository at this point in the history
  • Loading branch information
achettyiitr committed Aug 27, 2024
2 parents 19de343 + 5327d44 commit 40f1a69
Show file tree
Hide file tree
Showing 71 changed files with 2,979 additions and 6,292 deletions.
1 change: 1 addition & 0 deletions app/apphandlers/apphandlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func startJobsDBPostgresql(t *testing.T) {
require.NoError(t, err)
r, err := postgres.Setup(pool, t)
require.NoError(t, err)
config.Set("DB.host", r.Host)
config.Set("DB.port", r.Port)
config.Set("DB.user", r.User)
config.Set("DB.name", r.Database)
Expand Down
151 changes: 39 additions & 112 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,6 @@ package cluster_test

import (
"context"
"database/sql"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"testing"
"time"

Expand All @@ -23,6 +16,7 @@ import (
"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/postgres"

"github.com/rudderlabs/rudder-server/admin"
"github.com/rudderlabs/rudder-server/app/cluster"
Expand All @@ -49,118 +43,13 @@ import (
"github.com/rudderlabs/rudder-server/utils/types/servermode"
)

var (
hold bool
DB_DSN = "root@tcp(127.0.0.1:3306)/service"
db *sql.DB
)

func TestMain(m *testing.M) {
flag.BoolVar(&hold, "hold", false, "hold environment clean-up after test execution until Ctrl+C is provided")
flag.Parse()

// hack to make defer work, without being affected by the os.Exit in TestMain
os.Exit(run(m))
}

func run(m *testing.M) int {
// uses a sensible default on windows (tcp/http) and linux/osx (socket)
pool, err := dockertest.NewPool("")
if err != nil {
log.Printf("Could not connect to docker: %s", err)
return 1
}

database := "jobsdb"
// pulls an image, creates a container based on it and runs it
resourcePostgres, err := pool.Run("postgres", "15-alpine", []string{
"POSTGRES_PASSWORD=password",
"POSTGRES_DB=" + database,
"POSTGRES_USER=rudder",
})
if err != nil {
log.Printf("Could not start resource: %s", err)
return 1
}

DB_DSN = fmt.Sprintf("postgres://rudder:password@localhost:%s/%s?sslmode=disable", resourcePostgres.GetPort("5432/tcp"), database)
fmt.Println("DB_DSN:", DB_DSN)
os.Setenv("JOBS_DB_DB_NAME", database)
os.Setenv("JOBS_DB_HOST", "localhost")
os.Setenv("JOBS_DB_NAME", "jobsdb")
os.Setenv("JOBS_DB_USER", "rudder")
os.Setenv("JOBS_DB_PASSWORD", "password")
os.Setenv("JOBS_DB_PORT", resourcePostgres.GetPort("5432/tcp"))

// exponential backoff-retry, because the application in the container might not be ready to accept connections yet
if err := pool.Retry(func() error {
var err error
db, err = sql.Open("postgres", DB_DSN)
if err != nil {
return err
}
return db.Ping()
}); err != nil {
log.Printf("Could not connect to docker: %s", err)
return 1
}

defer func() {
if err := pool.Purge(resourcePostgres); err != nil {
log.Printf("Could not purge resource: %s \n", err)
}
}()

code := m.Run()
blockOnHold()

return code
}

func blockOnHold() {
if !hold {
return
}

fmt.Println("Test on hold, before cleanup")
fmt.Println("Press Ctrl+C to exit")

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

<-c
}

const (
WriteKeyEnabled = "enabled-write-key"
SourceIDEnabled = "enabled-source"
GADestinationID = "did1"
GADestinationDefinitionID = "gaid1"
)

var (
workspaceID = uuid.New().String()
gaDestinationDefinition = backendConfig.DestinationDefinitionT{
ID: GADestinationDefinitionID, Name: "GA",
DisplayName: "Google Analytics", Config: nil, ResponseRules: nil,
}
sampleBackendConfig = backendConfig.ConfigT{
WorkspaceID: workspaceID,
Sources: []backendConfig.SourceT{
{
WorkspaceID: workspaceID,
ID: SourceIDEnabled,
WriteKey: WriteKeyEnabled,
Enabled: true,
Destinations: []backendConfig.DestinationT{{
ID: GADestinationID, Name: "ga dest",
DestinationDefinition: gaDestinationDefinition, Enabled: true, IsProcessorEnabled: true,
}},
},
},
}
)

func initJobsDB() {
config.Reset()
logger.Reset()
Expand All @@ -169,8 +58,46 @@ func initJobsDB() {
}

func TestDynamicClusterManager(t *testing.T) {
// uses a sensible default on windows (tcp/http) and linux/osx (socket)
pool, err := dockertest.NewPool("")
require.NoError(t, err)

resourcePostgres, err := postgres.Setup(pool, t)
require.NoError(t, err)

t.Log("DB_DSN:", resourcePostgres.DBDsn)
t.Setenv("JOBS_DB_DB_NAME", resourcePostgres.Database)
t.Setenv("JOBS_DB_HOST", resourcePostgres.Host)
t.Setenv("JOBS_DB_NAME", resourcePostgres.Database)
t.Setenv("JOBS_DB_USER", resourcePostgres.User)
t.Setenv("JOBS_DB_PASSWORD", resourcePostgres.Password)
t.Setenv("JOBS_DB_PORT", resourcePostgres.Port)

initJobsDB()

var (
workspaceID = uuid.New().String()
gaDestinationDefinition = backendConfig.DestinationDefinitionT{
ID: GADestinationDefinitionID, Name: "GA",
DisplayName: "Google Analytics", Config: nil, ResponseRules: nil,
}
sampleBackendConfig = backendConfig.ConfigT{
WorkspaceID: workspaceID,
Sources: []backendConfig.SourceT{
{
WorkspaceID: workspaceID,
ID: SourceIDEnabled,
WriteKey: WriteKeyEnabled,
Enabled: true,
Destinations: []backendConfig.DestinationT{{
ID: GADestinationID, Name: "ga dest",
DestinationDefinition: gaDestinationDefinition, Enabled: true, IsProcessorEnabled: true,
}},
},
},
}
)

mockCtrl := gomock.NewController(t)
mockBackendConfig := mocksBackendConfig.NewMockBackendConfig(mockCtrl)
mockTransformer := mocksTransformer.NewMockTransformer(mockCtrl)
Expand Down
97 changes: 24 additions & 73 deletions app/cluster/state/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,103 +2,54 @@ package state_test

import (
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"testing"
"time"

"github.com/ory/dockertest/v3"
"github.com/stretchr/testify/require"
etcd "go.etcd.io/etcd/client/v3"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/logger"
thEtcd "github.com/rudderlabs/rudder-go-kit/testhelper/docker/resource/etcd"
"github.com/rudderlabs/rudder-server/app/cluster/state"
"github.com/rudderlabs/rudder-server/testhelper"
thEtcd "github.com/rudderlabs/rudder-server/testhelper/etcd"
"github.com/rudderlabs/rudder-server/utils/types/servermode"
)

var (
hold bool
etcdHosts = []string{"http://localhost:2379"}
etcdClient *etcd.Client
)

func TestMain(m *testing.M) {
flag.BoolVar(&hold, "hold", false, "hold environment clean-up after test execution until Ctrl+C is provided")
flag.Parse()

// hack to make defer work, without being affected by the os.Exit in TestMain
os.Exit(run(m))
}

func run(m *testing.M) int {
pool, err := dockertest.NewPool("")
if err != nil {
log.Printf("Could not connect to docker: %s \n", err)
return 1
}

cleaner := &testhelper.Cleanup{}
defer cleaner.Run()

var etcdRes *thEtcd.Resource
if etcdRes, err = thEtcd.Setup(pool, cleaner); err != nil {
log.Printf("Could not setup ETCD: %v", err)
return 1
}

etcdHosts = etcdRes.Hosts
etcdClient = etcdRes.Client

code := m.Run()
blockOnHold()

return code
}

func blockOnHold() {
if !hold {
return
}

fmt.Println("Test on hold, before cleanup")
fmt.Println("Press Ctrl+C to exit")

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)

<-c
}

func Init() {
config.Reset()
logger.Reset()
}

func Test_Ping(t *testing.T) {
em := state.ETCDManager{
Config: &state.ETCDConfig{
Endpoints: etcdHosts,
},
}

err := em.Ping()
require.NoError(t, err)
em.Close()
}

func Test_ServerMode(t *testing.T) {
Init()

pool, err := dockertest.NewPool("")
require.NoError(t, err)

etcdRes, err := thEtcd.Setup(pool, t)
require.NoError(t, err)

etcdClient := etcdRes.Client

t.Run("ping", func(t *testing.T) {
em := state.ETCDManager{
Config: &state.ETCDConfig{
Endpoints: etcdRes.Hosts,
},
}

err := em.Ping()
require.NoError(t, err)
em.Close()
})

provider := state.ETCDManager{
Config: &state.ETCDConfig{
Endpoints: etcdHosts,
Endpoints: etcdRes.Hosts,
ReleaseName: "test",
ServerIndex: "0",
},
Expand Down Expand Up @@ -174,7 +125,7 @@ func Test_ServerMode(t *testing.T) {

provider := state.ETCDManager{
Config: &state.ETCDConfig{
Endpoints: etcdHosts,
Endpoints: etcdRes.Hosts,
ReleaseName: "test_ack_timeout",
ServerIndex: "0",
ACKTimeout: time.Duration(1),
Expand All @@ -197,7 +148,7 @@ func Test_ServerMode(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

_, err := etcdClient.Put(ctx, modeRequestKey, `{"mode": "DEGRADED", "ack_key": "test-ack/1"}`)
_, err = etcdRes.Client.Put(ctx, modeRequestKey, `{"mode": "DEGRADED", "ack_key": "test-ack/1"}`)
require.NoError(t, err)

ch := provider.ServerMode(ctx)
Expand Down
1 change: 1 addition & 0 deletions archiver/archiver_isolation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func ArchivalScenario(
config.Set("HOSTED_SERVICE_SECRET", "brt_isolation_secret")
config.Set("recovery.storagePath", path.Join(t.TempDir(), "/recovery_data.json"))

config.Set("DB.host", postgresContainer.Host)
config.Set("DB.port", postgresContainer.Port)
config.Set("DB.user", postgresContainer.User)
config.Set("DB.name", postgresContainer.Database)
Expand Down
Loading

0 comments on commit 40f1a69

Please sign in to comment.