Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use cpluginv2 and conduit commons #130

Merged
merged 5 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 51 additions & 49 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"testing"
"time"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/jpillora/backoff"
"github.com/matryer/is"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -88,7 +90,7 @@ type AcceptanceTestDriver interface {
// The generated record will contain mixed data types in the field Key and
// Payload (i.e. RawData and StructuredData), unless configured otherwise
// (see ConfigurableAcceptanceTestDriverConfig.GenerateDataType).
GenerateRecord(*testing.T, Operation) Record
GenerateRecord(*testing.T, opencdc.Operation) opencdc.Record

// WriteToSource receives a slice of records that should be prepared in the
// 3rd party system so that the source will read them. The returned slice
Expand All @@ -97,15 +99,15 @@ type AcceptanceTestDriver interface {
// It is encouraged for the driver to return the same slice, unless there is
// no way to write the records to the 3rd party system, then the returning
// slice should contain the expected records a source should read.
WriteToSource(*testing.T, []Record) []Record
WriteToSource(*testing.T, []opencdc.Record) []opencdc.Record
// ReadFromDestination should return a slice with the records that were
// written to the destination. The slice will be used to verify the
// destination has successfully executed writes.
// The parameter contains records that were actually written to the
// destination. These will be compared to the returned slice of records. It
// is encouraged for the driver to only touch the input records to change
// the order of records and to not change the records themselves.
ReadFromDestination(*testing.T, []Record) []Record
ReadFromDestination(*testing.T, []opencdc.Record) []opencdc.Record

// ReadTimeout controls the time the test should wait for a read operation
// to return before it considers the operation as failed.
Expand Down Expand Up @@ -230,20 +232,20 @@ func (d ConfigurableAcceptanceTestDriver) GoleakOptions(_ *testing.T) []goleak.O
return d.Config.GoleakOptions
}

func (d ConfigurableAcceptanceTestDriver) GenerateRecord(t *testing.T, op Operation) Record {
func (d ConfigurableAcceptanceTestDriver) GenerateRecord(t *testing.T, op opencdc.Operation) opencdc.Record {
// TODO we currently only generate records with operation "create" and
// "snapshot", because this is the only operation we know all connectors
// should be able to handle. We should make acceptance tests more
// sophisticated and also check how the connector handles other operations,
// specifically "update" and "delete".
// Once we generate different operations we need to adjust how we compare
// records!
return Record{
Position: Position(d.randString(32)), // position doesn't matter, as long as it's unique
return opencdc.Record{
Position: opencdc.Position(d.randString(32)), // position doesn't matter, as long as it's unique
Operation: op,
Metadata: map[string]string{d.randString(32): d.randString(32)},
Key: d.GenerateData(t),
Payload: Change{
Payload: opencdc.Change{
Before: nil,
After: d.GenerateData(t),
},
Expand All @@ -253,7 +255,7 @@ func (d ConfigurableAcceptanceTestDriver) GenerateRecord(t *testing.T, op Operat
// GenerateData generates either RawData or StructuredData depending on the
// configured data type (see
// ConfigurableAcceptanceTestDriverConfig.GenerateDataType).
func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) Data {
func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) opencdc.Data {
rand := d.getRand()

gen := d.Config.GenerateDataType
Expand All @@ -263,9 +265,9 @@ func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) Data {

switch gen {
case GenerateRawData:
return RawData(d.randString(rand.Intn(1024) + 32))
return opencdc.RawData(d.randString(rand.Intn(1024) + 32))
case GenerateStructuredData:
data := StructuredData{}
data := opencdc.StructuredData{}
for {
data[d.randString(rand.Intn(1024)+32)] = d.GenerateValue(t)
if rand.Int63()%2 == 0 {
Expand Down Expand Up @@ -355,7 +357,7 @@ func (d ConfigurableAcceptanceTestDriver) randString(n int) string {
// destination. It is expected that the destination is writing to the same
// location the source is reading from. If the connector does not implement a
// destination the function will fail the test.
func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []Record) []Record {
func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []opencdc.Record) []opencdc.Record {
if d.Connector().NewDestination == nil {
t.Fatal("connector is missing the field NewDestination, either implement the destination or overwrite the driver method Write")
}
Expand Down Expand Up @@ -394,7 +396,7 @@ func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []
// the source. It is expected that the destination is writing to the same
// location the source is reading from. If the connector does not implement a
// source the function will fail the test.
func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []Record) []Record {
func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record {
if d.Connector().NewSource == nil {
t.Fatal("connector is missing the field NewSource, either implement the source or overwrite the driver method Read")
}
Expand Down Expand Up @@ -429,7 +431,7 @@ func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, reco
Max: time.Second, // 8 tries
}

output := make([]Record, 0, len(records))
output := make([]opencdc.Record, 0, len(records))
for i := 0; i < cap(output); i++ {
// now try to read from the source
readCtx, readCancel := context.WithTimeout(ctx, d.ReadTimeout())
Expand Down Expand Up @@ -458,7 +460,7 @@ func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, reco
readCtx, readCancel := context.WithTimeout(ctx, d.ReadTimeout())
defer readCancel()
r, err := src.Read(readCtx)
is.Equal(Record{}, r) // record should be empty
is.Equal(opencdc.Record{}, r) // record should be empty
is.True(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrBackoffRetry))

return output
Expand Down Expand Up @@ -585,9 +587,9 @@ func (a acceptanceTest) TestSource_Configure_RequiredParams(t *testing.T) {
origCfg := a.driver.SourceConfig(t)

for name, p := range srcSpec.Parameters() {
isRequired := p.Required
isRequired := false
for _, v := range p.Validations {
if _, ok := v.(ValidationRequired); ok {
if _, ok := v.(config.ValidationRequired); ok {
isRequired = true
break
}
Expand Down Expand Up @@ -620,7 +622,7 @@ func (a acceptanceTest) TestSource_Open_ResumeAtPositionSnapshot(t *testing.T) {
// Write expectations before source is started, this means the source will
// have to first read the existing data (i.e. snapshot), but we will
// interrupt it and try to resume.
want := a.driver.WriteToSource(t, a.generateRecords(t, OperationSnapshot, 10))
want := a.driver.WriteToSource(t, a.generateRecords(t, opencdc.OperationSnapshot, 10))

source, sourceCleanup := a.openSource(ctx, t, nil) // listen from beginning
defer sourceCleanup()
Expand Down Expand Up @@ -681,7 +683,7 @@ func (a acceptanceTest) TestSource_Open_ResumeAtPositionCDC(t *testing.T) {
// Write expectations after source is open, this means the source is already
// listening to ongoing changes (i.e. CDC), we will interrupt it and try to
// resume.
want := a.driver.WriteToSource(t, a.generateRecords(t, OperationCreate, 10))
want := a.driver.WriteToSource(t, a.generateRecords(t, opencdc.OperationCreate, 10))

// read all records, but ack only half of them
got, err := a.readMany(ctx, t, source, len(want))
Expand Down Expand Up @@ -718,7 +720,7 @@ func (a acceptanceTest) TestSource_Read_Success(t *testing.T) {
defer goleak.VerifyNone(t, a.driver.GoleakOptions(t)...)

positions := make(map[string]bool)
isUniquePositions := func(t *testing.T, records []Record) {
isUniquePositions := func(t *testing.T, records []opencdc.Record) {
is := is.New(t)
for _, r := range records {
is.True(!positions[string(r.Position)])
Expand All @@ -727,7 +729,7 @@ func (a acceptanceTest) TestSource_Read_Success(t *testing.T) {
}

// write expectation before source exists
want := a.driver.WriteToSource(t, a.generateRecords(t, OperationSnapshot, 10))
want := a.driver.WriteToSource(t, a.generateRecords(t, opencdc.OperationSnapshot, 10))

source, sourceCleanup := a.openSource(ctx, t, nil) // listen from beginning
defer sourceCleanup()
Expand All @@ -745,7 +747,7 @@ func (a acceptanceTest) TestSource_Read_Success(t *testing.T) {

// while connector is running write more data and make sure the connector
// detects it
want = a.driver.WriteToSource(t, a.generateRecords(t, OperationCreate, 20))
want = a.driver.WriteToSource(t, a.generateRecords(t, opencdc.OperationCreate, 20))

t.Run("cdc", func(t *testing.T) {
is := is.New(t)
Expand All @@ -771,7 +773,7 @@ func (a acceptanceTest) TestSource_Read_Timeout(t *testing.T) {
readCtx, cancel := context.WithTimeout(ctx, a.driver.ReadTimeout())
defer cancel()
r, err := a.readWithBackoffRetry(readCtx, t, source)
is.Equal(Record{}, r) // record should be empty
is.Equal(opencdc.Record{}, r) // record should be empty
is.True(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrBackoffRetry))
}

Expand Down Expand Up @@ -819,9 +821,9 @@ func (a acceptanceTest) TestDestination_Configure_RequiredParams(t *testing.T) {
origCfg := a.driver.DestinationConfig(t)

for name, p := range destSpec.Parameters() {
isRequired := p.Required
isRequired := false
for _, v := range p.Validations {
if _, ok := v.(ValidationRequired); ok {
if _, ok := v.(config.ValidationRequired); ok {
isRequired = true
break
}
Expand Down Expand Up @@ -854,7 +856,7 @@ func (a acceptanceTest) TestDestination_Write_Success(t *testing.T) {
dest, cleanup := a.openDestination(ctx, t)
defer cleanup()

want := a.generateRecords(t, OperationSnapshot, 20)
want := a.generateRecords(t, opencdc.OperationSnapshot, 20)

writeCtx, cancel := context.WithTimeout(ctx, a.driver.WriteTimeout())
defer cancel()
Expand Down Expand Up @@ -893,7 +895,7 @@ func (a acceptanceTest) cloneConfig(orig map[string]string) map[string]string {
return cloned
}

func (a acceptanceTest) openSource(ctx context.Context, t *testing.T, pos Position) (source Source, cleanup func()) {
func (a acceptanceTest) openSource(ctx context.Context, t *testing.T, pos opencdc.Position) (source Source, cleanup func()) {
is := is.New(t)

source = a.driver.Connector().NewSource()
Expand Down Expand Up @@ -941,16 +943,16 @@ func (a acceptanceTest) openDestination(ctx context.Context, t *testing.T) (dest
return dest, cleanup
}

func (a acceptanceTest) generateRecords(t *testing.T, op Operation, count int) []Record {
records := make([]Record, count)
func (a acceptanceTest) generateRecords(t *testing.T, op opencdc.Operation, count int) []opencdc.Record {
records := make([]opencdc.Record, count)
for i := range records {
records[i] = a.driver.GenerateRecord(t, op)
}
return records
}

func (a acceptanceTest) readMany(ctx context.Context, t *testing.T, source Source, limit int) ([]Record, error) {
var got []Record
func (a acceptanceTest) readMany(ctx context.Context, t *testing.T, source Source, limit int) ([]opencdc.Record, error) {
var got []opencdc.Record
for i := 0; i < limit; i++ {
readCtx, cancel := context.WithTimeout(ctx, a.driver.ReadTimeout())
defer cancel()
Expand All @@ -965,7 +967,7 @@ func (a acceptanceTest) readMany(ctx context.Context, t *testing.T, source Sourc
return got, nil
}

func (a acceptanceTest) readWithBackoffRetry(ctx context.Context, t *testing.T, source Source) (Record, error) {
func (a acceptanceTest) readWithBackoffRetry(ctx context.Context, t *testing.T, source Source) (opencdc.Record, error) {
b := &backoff.Backoff{
Factor: 2,
Min: time.Millisecond * 100,
Expand All @@ -979,7 +981,7 @@ func (a acceptanceTest) readWithBackoffRetry(ctx context.Context, t *testing.T,
select {
case <-ctx.Done():
// return error
return Record{}, ctx.Err()
return opencdc.Record{}, ctx.Err()
case <-time.After(b.Duration()):
continue
}
Expand All @@ -989,7 +991,7 @@ func (a acceptanceTest) readWithBackoffRetry(ctx context.Context, t *testing.T,
}

// isEqualRecords compares two record slices and disregards their order.
func (a acceptanceTest) isEqualRecords(is *is.I, want, got []Record) {
func (a acceptanceTest) isEqualRecords(is *is.I, want, got []opencdc.Record) {
is.Equal(len(want), len(got)) // record number did not match

if len(want) == 0 {
Expand All @@ -1004,14 +1006,14 @@ func (a acceptanceTest) isEqualRecords(is *is.I, want, got []Record) {
// retrieves them as a whole in the payload
// this is valid behavior, we need to adjust the expectations
for i, wantRec := range want {
want[i] = Record{
want[i] = opencdc.Record{
Position: nil,
Operation: OperationSnapshot, // we allow operations Snapshot or Create
Metadata: nil, // no expectation for metadata
Key: got[i].Key, // no expectation for key
Payload: Change{
Operation: opencdc.OperationSnapshot, // we allow operations Snapshot or Create
Metadata: nil, // no expectation for metadata
Key: got[i].Key, // no expectation for key
Payload: opencdc.Change{
Before: nil,
After: RawData(wantRec.Bytes()), // the payload should contain the whole expected record
After: opencdc.RawData(wantRec.Bytes()), // the payload should contain the whole expected record
},
}
}
Expand All @@ -1031,7 +1033,7 @@ func (a acceptanceTest) isEqualRecords(is *is.I, want, got []Record) {
// - It then sorts only the want slice using the output of Record.Bytes().
// This assumes that the destination writes whole records and not only the
// payload. It does not check if the first records match after this.
func (a acceptanceTest) sortMatchingRecords(want, got []Record) {
func (a acceptanceTest) sortMatchingRecords(want, got []opencdc.Record) {
sort.Slice(want, func(i, j int) bool {
return string(want[i].Payload.After.Bytes()) < string(want[j].Payload.After.Bytes())
})
Expand All @@ -1051,9 +1053,9 @@ func (a acceptanceTest) sortMatchingRecords(want, got []Record) {
})
}

func (a acceptanceTest) isEqualRecord(is *is.I, want, got Record) {
if want.Operation == OperationSnapshot &&
got.Operation == OperationCreate {
func (a acceptanceTest) isEqualRecord(is *is.I, want, got opencdc.Record) {
if want.Operation == opencdc.OperationSnapshot &&
got.Operation == opencdc.OperationCreate {
// This is a special case and we accept it. Not all connectors will
// create records with operation "snapshot", but they will still able to
// produce records that were written before the source was open in
Expand All @@ -1078,18 +1080,18 @@ func (a acceptanceTest) isEqualRecord(is *is.I, want, got Record) {
}
}

func (a acceptanceTest) isEqualChange(is *is.I, want, got Change) {
func (a acceptanceTest) isEqualChange(is *is.I, want, got opencdc.Change) {
a.isEqualData(is, want.Before, got.Before)
a.isEqualData(is, want.After, got.After)
}

// isEqualData will match the two data objects in their entirety if the types
// match or only the byte slice content if types differ.
func (a acceptanceTest) isEqualData(is *is.I, want, got Data) {
_, wantIsRaw := want.(RawData)
_, gotIsRaw := got.(RawData)
_, wantIsStructured := want.(StructuredData)
_, gotIsStructured := got.(StructuredData)
func (a acceptanceTest) isEqualData(is *is.I, want, got opencdc.Data) {
_, wantIsRaw := want.(opencdc.RawData)
_, gotIsRaw := got.(opencdc.RawData)
_, wantIsStructured := want.(opencdc.StructuredData)
_, gotIsStructured := got.(opencdc.StructuredData)

if (wantIsRaw && gotIsRaw) ||
(wantIsStructured && gotIsStructured) ||
Expand Down
6 changes: 4 additions & 2 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sync"
"testing"
"time"

"github.com/conduitio/conduit-commons/opencdc"
)

// BenchmarkSource is a benchmark that any source implementation can run to figure
Expand Down Expand Up @@ -84,7 +86,7 @@ func (bm *benchmarkSource) Run(b *testing.B) {
}
})

acks := make(chan Record, b.N) // huge buffer so we don't delay reads
acks := make(chan opencdc.Record, b.N) // huge buffer so we don't delay reads
var wg sync.WaitGroup
wg.Add(1)
go bm.acker(b, acks, &wg)
Expand Down Expand Up @@ -127,7 +129,7 @@ func (bm *benchmarkSource) Run(b *testing.B) {
bm.reportMetrics(b)
}

func (bm *benchmarkSource) acker(b *testing.B, c <-chan Record, wg *sync.WaitGroup) {
func (bm *benchmarkSource) acker(b *testing.B, c <-chan opencdc.Record, wg *sync.WaitGroup) {
defer wg.Done()
ctx := context.Background()

Expand Down
Loading