Skip to content

Commit

Permalink
Use cpluginv2 and conduit commons (#130)
Browse files Browse the repository at this point in the history
* use cpluginv2 and conduit commons

* organize imports

* update to new cplugin types
  • Loading branch information
lovromazgon authored May 24, 2024
1 parent 2924827 commit 28feaf4
Show file tree
Hide file tree
Showing 48 changed files with 743 additions and 3,755 deletions.
100 changes: 51 additions & 49 deletions acceptance_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"testing"
"time"

"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
"github.com/jpillora/backoff"
"github.com/matryer/is"
"github.com/rs/zerolog"
Expand Down Expand Up @@ -88,7 +90,7 @@ type AcceptanceTestDriver interface {
// The generated record will contain mixed data types in the field Key and
// Payload (i.e. RawData and StructuredData), unless configured otherwise
// (see ConfigurableAcceptanceTestDriverConfig.GenerateDataType).
GenerateRecord(*testing.T, Operation) Record
GenerateRecord(*testing.T, opencdc.Operation) opencdc.Record

// WriteToSource receives a slice of records that should be prepared in the
// 3rd party system so that the source will read them. The returned slice
Expand All @@ -97,15 +99,15 @@ type AcceptanceTestDriver interface {
// It is encouraged for the driver to return the same slice, unless there is
// no way to write the records to the 3rd party system, then the returning
// slice should contain the expected records a source should read.
WriteToSource(*testing.T, []Record) []Record
WriteToSource(*testing.T, []opencdc.Record) []opencdc.Record
// ReadFromDestination should return a slice with the records that were
// written to the destination. The slice will be used to verify the
// destination has successfully executed writes.
// The parameter contains records that were actually written to the
// destination. These will be compared to the returned slice of records. It
// is encouraged for the driver to only touch the input records to change
// the order of records and to not change the records themselves.
ReadFromDestination(*testing.T, []Record) []Record
ReadFromDestination(*testing.T, []opencdc.Record) []opencdc.Record

// ReadTimeout controls the time the test should wait for a read operation
// to return before it considers the operation as failed.
Expand Down Expand Up @@ -230,20 +232,20 @@ func (d ConfigurableAcceptanceTestDriver) GoleakOptions(_ *testing.T) []goleak.O
return d.Config.GoleakOptions
}

func (d ConfigurableAcceptanceTestDriver) GenerateRecord(t *testing.T, op Operation) Record {
func (d ConfigurableAcceptanceTestDriver) GenerateRecord(t *testing.T, op opencdc.Operation) opencdc.Record {
// TODO we currently only generate records with operation "create" and
// "snapshot", because this is the only operation we know all connectors
// should be able to handle. We should make acceptance tests more
// sophisticated and also check how the connector handles other operations,
// specifically "update" and "delete".
// Once we generate different operations we need to adjust how we compare
// records!
return Record{
Position: Position(d.randString(32)), // position doesn't matter, as long as it's unique
return opencdc.Record{
Position: opencdc.Position(d.randString(32)), // position doesn't matter, as long as it's unique
Operation: op,
Metadata: map[string]string{d.randString(32): d.randString(32)},
Key: d.GenerateData(t),
Payload: Change{
Payload: opencdc.Change{
Before: nil,
After: d.GenerateData(t),
},
Expand All @@ -253,7 +255,7 @@ func (d ConfigurableAcceptanceTestDriver) GenerateRecord(t *testing.T, op Operat
// GenerateData generates either RawData or StructuredData depending on the
// configured data type (see
// ConfigurableAcceptanceTestDriverConfig.GenerateDataType).
func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) Data {
func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) opencdc.Data {
rand := d.getRand()

gen := d.Config.GenerateDataType
Expand All @@ -263,9 +265,9 @@ func (d ConfigurableAcceptanceTestDriver) GenerateData(t *testing.T) Data {

switch gen {
case GenerateRawData:
return RawData(d.randString(rand.Intn(1024) + 32))
return opencdc.RawData(d.randString(rand.Intn(1024) + 32))
case GenerateStructuredData:
data := StructuredData{}
data := opencdc.StructuredData{}
for {
data[d.randString(rand.Intn(1024)+32)] = d.GenerateValue(t)
if rand.Int63()%2 == 0 {
Expand Down Expand Up @@ -355,7 +357,7 @@ func (d ConfigurableAcceptanceTestDriver) randString(n int) string {
// destination. It is expected that the destination is writing to the same
// location the source is reading from. If the connector does not implement a
// destination the function will fail the test.
func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []Record) []Record {
func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []opencdc.Record) []opencdc.Record {
if d.Connector().NewDestination == nil {
t.Fatal("connector is missing the field NewDestination, either implement the destination or overwrite the driver method Write")
}
Expand Down Expand Up @@ -394,7 +396,7 @@ func (d ConfigurableAcceptanceTestDriver) WriteToSource(t *testing.T, records []
// the source. It is expected that the destination is writing to the same
// location the source is reading from. If the connector does not implement a
// source the function will fail the test.
func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []Record) []Record {
func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, records []opencdc.Record) []opencdc.Record {
if d.Connector().NewSource == nil {
t.Fatal("connector is missing the field NewSource, either implement the source or overwrite the driver method Read")
}
Expand Down Expand Up @@ -429,7 +431,7 @@ func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, reco
Max: time.Second, // 8 tries
}

output := make([]Record, 0, len(records))
output := make([]opencdc.Record, 0, len(records))
for i := 0; i < cap(output); i++ {
// now try to read from the source
readCtx, readCancel := context.WithTimeout(ctx, d.ReadTimeout())
Expand Down Expand Up @@ -458,7 +460,7 @@ func (d ConfigurableAcceptanceTestDriver) ReadFromDestination(t *testing.T, reco
readCtx, readCancel := context.WithTimeout(ctx, d.ReadTimeout())
defer readCancel()
r, err := src.Read(readCtx)
is.Equal(Record{}, r) // record should be empty
is.Equal(opencdc.Record{}, r) // record should be empty
is.True(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrBackoffRetry))

return output
Expand Down Expand Up @@ -585,9 +587,9 @@ func (a acceptanceTest) TestSource_Configure_RequiredParams(t *testing.T) {
origCfg := a.driver.SourceConfig(t)

for name, p := range srcSpec.Parameters() {
isRequired := p.Required
isRequired := false
for _, v := range p.Validations {
if _, ok := v.(ValidationRequired); ok {
if _, ok := v.(config.ValidationRequired); ok {
isRequired = true
break
}
Expand Down Expand Up @@ -620,7 +622,7 @@ func (a acceptanceTest) TestSource_Open_ResumeAtPositionSnapshot(t *testing.T) {
// Write expectations before source is started, this means the source will
// have to first read the existing data (i.e. snapshot), but we will
// interrupt it and try to resume.
want := a.driver.WriteToSource(t, a.generateRecords(t, OperationSnapshot, 10))
want := a.driver.WriteToSource(t, a.generateRecords(t, opencdc.OperationSnapshot, 10))

source, sourceCleanup := a.openSource(ctx, t, nil) // listen from beginning
defer sourceCleanup()
Expand Down Expand Up @@ -681,7 +683,7 @@ func (a acceptanceTest) TestSource_Open_ResumeAtPositionCDC(t *testing.T) {
// Write expectations after source is open, this means the source is already
// listening to ongoing changes (i.e. CDC), we will interrupt it and try to
// resume.
want := a.driver.WriteToSource(t, a.generateRecords(t, OperationCreate, 10))
want := a.driver.WriteToSource(t, a.generateRecords(t, opencdc.OperationCreate, 10))

// read all records, but ack only half of them
got, err := a.readMany(ctx, t, source, len(want))
Expand Down Expand Up @@ -718,7 +720,7 @@ func (a acceptanceTest) TestSource_Read_Success(t *testing.T) {
defer goleak.VerifyNone(t, a.driver.GoleakOptions(t)...)

positions := make(map[string]bool)
isUniquePositions := func(t *testing.T, records []Record) {
isUniquePositions := func(t *testing.T, records []opencdc.Record) {
is := is.New(t)
for _, r := range records {
is.True(!positions[string(r.Position)])
Expand All @@ -727,7 +729,7 @@ func (a acceptanceTest) TestSource_Read_Success(t *testing.T) {
}

// write expectation before source exists
want := a.driver.WriteToSource(t, a.generateRecords(t, OperationSnapshot, 10))
want := a.driver.WriteToSource(t, a.generateRecords(t, opencdc.OperationSnapshot, 10))

source, sourceCleanup := a.openSource(ctx, t, nil) // listen from beginning
defer sourceCleanup()
Expand All @@ -745,7 +747,7 @@ func (a acceptanceTest) TestSource_Read_Success(t *testing.T) {

// while connector is running write more data and make sure the connector
// detects it
want = a.driver.WriteToSource(t, a.generateRecords(t, OperationCreate, 20))
want = a.driver.WriteToSource(t, a.generateRecords(t, opencdc.OperationCreate, 20))

t.Run("cdc", func(t *testing.T) {
is := is.New(t)
Expand All @@ -771,7 +773,7 @@ func (a acceptanceTest) TestSource_Read_Timeout(t *testing.T) {
readCtx, cancel := context.WithTimeout(ctx, a.driver.ReadTimeout())
defer cancel()
r, err := a.readWithBackoffRetry(readCtx, t, source)
is.Equal(Record{}, r) // record should be empty
is.Equal(opencdc.Record{}, r) // record should be empty
is.True(errors.Is(err, context.DeadlineExceeded) || errors.Is(err, ErrBackoffRetry))
}

Expand Down Expand Up @@ -819,9 +821,9 @@ func (a acceptanceTest) TestDestination_Configure_RequiredParams(t *testing.T) {
origCfg := a.driver.DestinationConfig(t)

for name, p := range destSpec.Parameters() {
isRequired := p.Required
isRequired := false
for _, v := range p.Validations {
if _, ok := v.(ValidationRequired); ok {
if _, ok := v.(config.ValidationRequired); ok {
isRequired = true
break
}
Expand Down Expand Up @@ -854,7 +856,7 @@ func (a acceptanceTest) TestDestination_Write_Success(t *testing.T) {
dest, cleanup := a.openDestination(ctx, t)
defer cleanup()

want := a.generateRecords(t, OperationSnapshot, 20)
want := a.generateRecords(t, opencdc.OperationSnapshot, 20)

writeCtx, cancel := context.WithTimeout(ctx, a.driver.WriteTimeout())
defer cancel()
Expand Down Expand Up @@ -893,7 +895,7 @@ func (a acceptanceTest) cloneConfig(orig map[string]string) map[string]string {
return cloned
}

func (a acceptanceTest) openSource(ctx context.Context, t *testing.T, pos Position) (source Source, cleanup func()) {
func (a acceptanceTest) openSource(ctx context.Context, t *testing.T, pos opencdc.Position) (source Source, cleanup func()) {
is := is.New(t)

source = a.driver.Connector().NewSource()
Expand Down Expand Up @@ -941,16 +943,16 @@ func (a acceptanceTest) openDestination(ctx context.Context, t *testing.T) (dest
return dest, cleanup
}

func (a acceptanceTest) generateRecords(t *testing.T, op Operation, count int) []Record {
records := make([]Record, count)
func (a acceptanceTest) generateRecords(t *testing.T, op opencdc.Operation, count int) []opencdc.Record {
records := make([]opencdc.Record, count)
for i := range records {
records[i] = a.driver.GenerateRecord(t, op)
}
return records
}

func (a acceptanceTest) readMany(ctx context.Context, t *testing.T, source Source, limit int) ([]Record, error) {
var got []Record
func (a acceptanceTest) readMany(ctx context.Context, t *testing.T, source Source, limit int) ([]opencdc.Record, error) {
var got []opencdc.Record
for i := 0; i < limit; i++ {
readCtx, cancel := context.WithTimeout(ctx, a.driver.ReadTimeout())
defer cancel()
Expand All @@ -965,7 +967,7 @@ func (a acceptanceTest) readMany(ctx context.Context, t *testing.T, source Sourc
return got, nil
}

func (a acceptanceTest) readWithBackoffRetry(ctx context.Context, t *testing.T, source Source) (Record, error) {
func (a acceptanceTest) readWithBackoffRetry(ctx context.Context, t *testing.T, source Source) (opencdc.Record, error) {
b := &backoff.Backoff{
Factor: 2,
Min: time.Millisecond * 100,
Expand All @@ -979,7 +981,7 @@ func (a acceptanceTest) readWithBackoffRetry(ctx context.Context, t *testing.T,
select {
case <-ctx.Done():
// return error
return Record{}, ctx.Err()
return opencdc.Record{}, ctx.Err()
case <-time.After(b.Duration()):
continue
}
Expand All @@ -989,7 +991,7 @@ func (a acceptanceTest) readWithBackoffRetry(ctx context.Context, t *testing.T,
}

// isEqualRecords compares two record slices and disregards their order.
func (a acceptanceTest) isEqualRecords(is *is.I, want, got []Record) {
func (a acceptanceTest) isEqualRecords(is *is.I, want, got []opencdc.Record) {
is.Equal(len(want), len(got)) // record number did not match

if len(want) == 0 {
Expand All @@ -1004,14 +1006,14 @@ func (a acceptanceTest) isEqualRecords(is *is.I, want, got []Record) {
// retrieves them as a whole in the payload
// this is valid behavior, we need to adjust the expectations
for i, wantRec := range want {
want[i] = Record{
want[i] = opencdc.Record{
Position: nil,
Operation: OperationSnapshot, // we allow operations Snapshot or Create
Metadata: nil, // no expectation for metadata
Key: got[i].Key, // no expectation for key
Payload: Change{
Operation: opencdc.OperationSnapshot, // we allow operations Snapshot or Create
Metadata: nil, // no expectation for metadata
Key: got[i].Key, // no expectation for key
Payload: opencdc.Change{
Before: nil,
After: RawData(wantRec.Bytes()), // the payload should contain the whole expected record
After: opencdc.RawData(wantRec.Bytes()), // the payload should contain the whole expected record
},
}
}
Expand All @@ -1031,7 +1033,7 @@ func (a acceptanceTest) isEqualRecords(is *is.I, want, got []Record) {
// - It then sorts only the want slice using the output of Record.Bytes().
// This assumes that the destination writes whole records and not only the
// payload. It does not check if the first records match after this.
func (a acceptanceTest) sortMatchingRecords(want, got []Record) {
func (a acceptanceTest) sortMatchingRecords(want, got []opencdc.Record) {
sort.Slice(want, func(i, j int) bool {
return string(want[i].Payload.After.Bytes()) < string(want[j].Payload.After.Bytes())
})
Expand All @@ -1051,9 +1053,9 @@ func (a acceptanceTest) sortMatchingRecords(want, got []Record) {
})
}

func (a acceptanceTest) isEqualRecord(is *is.I, want, got Record) {
if want.Operation == OperationSnapshot &&
got.Operation == OperationCreate {
func (a acceptanceTest) isEqualRecord(is *is.I, want, got opencdc.Record) {
if want.Operation == opencdc.OperationSnapshot &&
got.Operation == opencdc.OperationCreate {
// This is a special case and we accept it. Not all connectors will
// create records with operation "snapshot", but they will still able to
// produce records that were written before the source was open in
Expand All @@ -1078,18 +1080,18 @@ func (a acceptanceTest) isEqualRecord(is *is.I, want, got Record) {
}
}

func (a acceptanceTest) isEqualChange(is *is.I, want, got Change) {
func (a acceptanceTest) isEqualChange(is *is.I, want, got opencdc.Change) {
a.isEqualData(is, want.Before, got.Before)
a.isEqualData(is, want.After, got.After)
}

// isEqualData will match the two data objects in their entirety if the types
// match or only the byte slice content if types differ.
func (a acceptanceTest) isEqualData(is *is.I, want, got Data) {
_, wantIsRaw := want.(RawData)
_, gotIsRaw := got.(RawData)
_, wantIsStructured := want.(StructuredData)
_, gotIsStructured := got.(StructuredData)
func (a acceptanceTest) isEqualData(is *is.I, want, got opencdc.Data) {
_, wantIsRaw := want.(opencdc.RawData)
_, gotIsRaw := got.(opencdc.RawData)
_, wantIsStructured := want.(opencdc.StructuredData)
_, gotIsStructured := got.(opencdc.StructuredData)

if (wantIsRaw && gotIsRaw) ||
(wantIsStructured && gotIsStructured) ||
Expand Down
6 changes: 4 additions & 2 deletions benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"sync"
"testing"
"time"

"github.com/conduitio/conduit-commons/opencdc"
)

// BenchmarkSource is a benchmark that any source implementation can run to figure
Expand Down Expand Up @@ -84,7 +86,7 @@ func (bm *benchmarkSource) Run(b *testing.B) {
}
})

acks := make(chan Record, b.N) // huge buffer so we don't delay reads
acks := make(chan opencdc.Record, b.N) // huge buffer so we don't delay reads
var wg sync.WaitGroup
wg.Add(1)
go bm.acker(b, acks, &wg)
Expand Down Expand Up @@ -127,7 +129,7 @@ func (bm *benchmarkSource) Run(b *testing.B) {
bm.reportMetrics(b)
}

func (bm *benchmarkSource) acker(b *testing.B, c <-chan Record, wg *sync.WaitGroup) {
func (bm *benchmarkSource) acker(b *testing.B, c <-chan opencdc.Record, wg *sync.WaitGroup) {
defer wg.Done()
ctx := context.Background()

Expand Down
Loading

0 comments on commit 28feaf4

Please sign in to comment.