Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Last position handling #504

Merged
merged 62 commits into from
Jul 26, 2022
Merged
Show file tree
Hide file tree
Changes from 61 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
1c57687
implement ticket queue
lovromazgon Jun 13, 2022
8bf3f66
experiment with ordered semaphore
lovromazgon Jun 14, 2022
424d889
ticketqueue benchmarks
lovromazgon Jun 14, 2022
8a852db
reduce allocations
lovromazgon Jun 17, 2022
ec7249e
remove ticketqueue (semaphore implementation is more performant)
lovromazgon Jun 21, 2022
b288e21
optimize semaphore for our use case
lovromazgon Jun 21, 2022
0471fbe
fix linter warnings, better benchmarks
lovromazgon Jun 21, 2022
83f8184
better docs
lovromazgon Jun 21, 2022
f4cfe81
Merge branch 'main' into lovro/ticketqueue
lovromazgon Jun 21, 2022
83c97e0
go mod tidy
lovromazgon Jun 21, 2022
c0ded08
rename AckerNode to DestinationAckerNode
lovromazgon Jun 10, 2022
c66c273
remove message status change middleware to ensure all message handler…
lovromazgon Jun 22, 2022
68cd7c7
implement SourceAckerNode
lovromazgon Jun 22, 2022
6bdc893
add todo note about possible deadlock
lovromazgon Jun 23, 2022
8b6dc73
source acker node test
lovromazgon Jun 23, 2022
d6c9641
remove message status dropped
lovromazgon Jun 23, 2022
f3273f3
document behavior, fanout node return nacked message error
lovromazgon Jun 24, 2022
c6be9e8
Merge branch 'main' into lovro/ticketqueue
lovromazgon Jun 28, 2022
94b4f43
don't forward acks after a failed ack/nack
lovromazgon Jun 28, 2022
7d3749b
Merge branch 'lovro/ticketqueue' into lovro/source-acker-node
lovromazgon Jun 28, 2022
3e283dd
use cerrors
lovromazgon Jun 28, 2022
cb70142
Merge branch 'lovro/source-acker-node' into lovro/remove-message-dropped
lovromazgon Jun 29, 2022
ce14149
update plugin interface
lovromazgon Jun 29, 2022
3bdec6c
update standalone plugin implementation
lovromazgon Jun 29, 2022
6ab51fc
update builtin plugin implementation
lovromazgon Jun 29, 2022
08115fe
update connector
lovromazgon Jun 29, 2022
65e087a
update nodes
lovromazgon Jun 29, 2022
ffd3637
change plugin semantics, close stream on teardown
lovromazgon Jun 29, 2022
03cbb0c
refactor stream, reuse it in source and destination
lovromazgon Jun 29, 2022
eb69500
lock stream when stopping
lovromazgon Jun 29, 2022
ef06990
create control message for source stop
lovromazgon Jun 30, 2022
9e50b1f
forward last position to destination
lovromazgon Jun 30, 2022
c46310d
update connector SDK, fix race condition in source node
lovromazgon Jun 30, 2022
a6a898b
make Conduit in charge of closing connector streams
lovromazgon Jul 1, 2022
8c534b1
destination acker tests
lovromazgon Jul 4, 2022
aa5f17b
Merge branch 'main' into lovro/ticketqueue
lovromazgon Jul 5, 2022
1bf9a7b
Merge branch 'main' into lovro/ticketqueue
lovromazgon Jul 6, 2022
f43fe35
use cerrors.New
lovromazgon Jul 7, 2022
19d6123
Merge branch 'lovro/stability' into lovro/ticketqueue
lovromazgon Jul 7, 2022
c36b19a
Merge branch 'lovro/ticketqueue' into lovro/source-acker-node
lovromazgon Jul 7, 2022
befaf44
use LogOrReplace
lovromazgon Jul 7, 2022
874bb86
Merge branch 'lovro/source-acker-node' into lovro/remove-message-dropped
lovromazgon Jul 7, 2022
054d6c7
Merge branch 'lovro/remove-message-dropped' into lovro/last-position
lovromazgon Jul 7, 2022
297f553
use LogOrReplace
lovromazgon Jul 7, 2022
f18f96c
make signal channel buffered
lovromazgon Jul 8, 2022
64b7d25
improve benchmarks
lovromazgon Jul 11, 2022
24c3386
fix linter error
lovromazgon Jul 11, 2022
d4dd111
add comments
lovromazgon Jul 11, 2022
3ebb744
simplify implementation
lovromazgon Jul 11, 2022
b628a7b
Merge branch 'lovro/ticketqueue' into lovro/source-acker-node
lovromazgon Jul 11, 2022
dc31319
update semaphore
lovromazgon Jul 11, 2022
5a6e8a3
update param name
lovromazgon Jul 11, 2022
54a65d1
remove redundant if clause
lovromazgon Jul 11, 2022
a668a02
Merge branch 'lovro/stability' into lovro/source-acker-node
lovromazgon Jul 11, 2022
25c75bc
Merge branch 'lovro/source-acker-node' into lovro/remove-message-dropped
lovromazgon Jul 11, 2022
9bf1b74
Merge branch 'lovro/stability' into lovro/remove-message-dropped
lovromazgon Jul 11, 2022
540cc66
Merge branch 'lovro/remove-message-dropped' into lovro/last-position
lovromazgon Jul 22, 2022
8ad3b02
Merge branch 'lovro/stability' into lovro/last-position
lovromazgon Jul 22, 2022
e9b8c3b
make it possible only to inject control messages
lovromazgon Jul 25, 2022
7d088d6
improve destination acker caching test
lovromazgon Jul 25, 2022
d9a0087
remove TODO comment
lovromazgon Jul 25, 2022
6099542
update comment
lovromazgon Jul 26, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ require (
github.com/conduitio/conduit-connector-generator v0.1.0
github.com/conduitio/conduit-connector-kafka v0.1.1
github.com/conduitio/conduit-connector-postgres v0.1.0
github.com/conduitio/conduit-connector-protocol v0.2.0
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220608133528-f466a956bd4d
github.com/conduitio/conduit-connector-s3 v0.1.1
github.com/conduitio/conduit-connector-sdk v0.2.0
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220622151135-47f1a8905435
github.com/dgraph-io/badger/v3 v3.2103.2
github.com/dop251/goja v0.0.0-20210225094849-f3cfc97811c0
github.com/golang/mock v1.6.0
Expand All @@ -33,7 +33,7 @@ require (
go.buf.build/library/go-grpc/conduitio/conduit-connector-protocol v1.4.2
golang.org/x/tools v0.1.11
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad
google.golang.org/genproto v0.0.0-20220630150403-404d0664e509
google.golang.org/grpc v1.47.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.2.0
google.golang.org/protobuf v1.28.0
Expand Down Expand Up @@ -73,6 +73,7 @@ require (
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/gammazero/deque v0.2.0 // indirect
github.com/go-chi/chi/v5 v5.0.7 // indirect
github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect
github.com/gofrs/flock v0.8.1 // indirect
Expand Down Expand Up @@ -123,13 +124,14 @@ require (
github.com/xitongsys/parquet-go-source v0.0.0-20220315005136-aec0fe3e777c // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.21.0 // indirect
golang.org/x/crypto v0.0.0-20220511200225-c6db032c6c88 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e // indirect
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c // indirect
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b // indirect
golang.org/x/term v0.0.0-20220526004731-065cf7ba2467 // indirect
golang.org/x/text v0.3.7 // indirect
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
Expand Down
26 changes: 15 additions & 11 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,12 @@ github.com/conduitio/conduit-connector-kafka v0.1.1 h1:RgN4nafEWjpA4VvXLdSQBNrEO
github.com/conduitio/conduit-connector-kafka v0.1.1/go.mod h1:+CbMUq4fIMxFnrINtjxuVTW5TZYa549WJXQFb63GaIU=
github.com/conduitio/conduit-connector-postgres v0.1.0 h1:Dj2S1NrwnJaUOgQqb9MjGSl2vv2gre0mSFE2Ne/5OSE=
github.com/conduitio/conduit-connector-postgres v0.1.0/go.mod h1:ug4N+2pGKDbG5UN++w7xRqb0A5Ua2J5Ld5wUzLbU1Q0=
github.com/conduitio/conduit-connector-protocol v0.2.0 h1:gwYXVKEMgTtU67ephQ5WwTGIDbT/eTLA9Mdr9Bnbqxc=
github.com/conduitio/conduit-connector-protocol v0.2.0/go.mod h1:udCU2AkLcYQoLjAO06tHVL2iFJPw+DamK+wllnj50hk=
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220608133528-f466a956bd4d h1:f3R0yPiH45hDZwNcYMSzKJP6LOGQPELCqW9OkZmd2lA=
github.com/conduitio/conduit-connector-protocol v0.2.1-0.20220608133528-f466a956bd4d/go.mod h1:1nmTaD+l3mvq3PnMmPPx8UxHPM53Xk8zGT3URu2Xx2M=
github.com/conduitio/conduit-connector-s3 v0.1.1 h1:10uIakNmF65IN5TNJB1qPWC6vbdGgrHEMg8r+dxDrc8=
github.com/conduitio/conduit-connector-s3 v0.1.1/go.mod h1:xpfBzOGjZkkglTmF1444qEjXuEx+do1PTYZNroPFcSE=
github.com/conduitio/conduit-connector-sdk v0.2.0 h1:yReJT3SOAGqJIlk59WC5FPgpv0Gg+NG4NFj6FJ89XnM=
github.com/conduitio/conduit-connector-sdk v0.2.0/go.mod h1:zZ/YJqhIZyXdVmFJS55zqkukpBmB+ohbX2kDduoj8Z0=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220622151135-47f1a8905435 h1:/bjfGf/vG8vV5WjDb7vcsluVxPZVvfsYRF4nhzJg8q4=
github.com/conduitio/conduit-connector-sdk v0.2.1-0.20220622151135-47f1a8905435/go.mod h1:RVVcsR1JBSyN8cxzjBVMyTKDym3KS6MXD2Lons/Wsw4=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand Down Expand Up @@ -205,6 +205,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/fsnotify/fsnotify v1.5.1 h1:mZcQUHVQUQWoPXXtuf9yuEXKudkV2sx1E06UadKWpgI=
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA=
github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi/v5 v5.0.7 h1:rDTPXLDHGATaeHvVlLcR4Qe0zftYethFucbjVQ1PxU8=
github.com/go-chi/chi/v5 v5.0.7/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
Expand Down Expand Up @@ -610,8 +612,9 @@ go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
Expand Down Expand Up @@ -676,6 +679,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
Expand Down Expand Up @@ -726,8 +730,8 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 h1:Yqz/iviulwKwAREEeUd3nbBFn0XuyJqkoft2IlrvOhc=
golang.org/x/net v0.0.0-20220617184016-355a448f1bc9/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e h1:TsQ7F31D3bUCLeqPT0u+yjp1guoArKaNKmCr22PYgTQ=
golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down Expand Up @@ -805,8 +809,8 @@ golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c h1:aFV+BgZ4svzjfabn8ERpuB4JI4N6/rdy1iusx77G3oU=
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b h1:2n253B2r0pYSmEV+UNCQoPfU/FiaizQEK5Gu4Bq4JE8=
golang.org/x/sys v0.0.0-20220627191245-f75cf1eec38b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
Expand Down Expand Up @@ -948,8 +952,8 @@ google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad h1:kqrS+lhvaMHCxul6sKQvKJ8nAAhlVItmZV822hYFH/U=
google.golang.org/genproto v0.0.0-20220617124728-180714bec0ad/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/genproto v0.0.0-20220630150403-404d0664e509 h1:eUofWZEQ3SqKIW6WImdM2sxVVjnL0ahOYuIYC6WEYI8=
google.golang.org/genproto v0.0.0-20220630150403-404d0664e509/go.mod h1:KEWEmljWE5zPzLBa/oHl6DaEt9LmfH6WtH1OHIvleBA=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
Expand Down
12 changes: 8 additions & 4 deletions pkg/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ type Source interface {
// processed and can be acknowledged.
Ack(context.Context, record.Position) error

// Stop signals to the source to stop producing records. Note that after
// this call Read can still produce records that have been cached by the
// connector.
Stop(context.Context) error
// Stop signals to the source to stop producing records. After this call
// Read will produce records until the record with the last position has
// been read (Conduit might have already received that record).
Stop(context.Context) (record.Position, error)
}

// Destination is a connector that can write records to a destination.
Expand All @@ -111,6 +111,10 @@ type Destination interface {
// processed and returns the position of that record. If the record wasn't
// successfully processed the function returns the position and an error.
Ack(context.Context) (record.Position, error)

// Stop signals to the destination that no more records will be produced
// after record with the last position.
Stop(context.Context, record.Position) error
}

// Config collects common data stored for a connector.
Expand Down
46 changes: 37 additions & 9 deletions pkg/connector/destination.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/foundation/multierror"
"github.com/conduitio/conduit/pkg/plugin"
"github.com/conduitio/conduit/pkg/record"
)
Expand Down Expand Up @@ -54,12 +53,17 @@ type destination struct {
// plugin is the running instance of the destination plugin.
plugin plugin.DestinationPlugin

// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// m can lock a destination from concurrent access (e.g. in connector persister).
m sync.Mutex
// wg tracks the number of in flight calls to the plugin.
wg sync.WaitGroup
}

var _ Destination = (*destination)(nil)

func (d *destination) ID() string {
return d.XID
}
Expand Down Expand Up @@ -146,19 +150,41 @@ func (d *destination) Open(ctx context.Context) error {
return err
}

err = dest.Start(ctx)
streamCtx, cancelStreamCtx := context.WithCancel(ctx)
err = dest.Start(streamCtx)
if err != nil {
cancelStreamCtx()
_ = dest.Teardown(ctx)
return err
}

d.logger.Info(ctx).Msg("destination connector plugin successfully started")

d.plugin = dest
d.stopStream = cancelStreamCtx
d.persister.ConnectorStarted()
return nil
}

func (d *destination) Stop(ctx context.Context, lastPosition record.Position) error {
cleanup, err := d.preparePluginCall()
defer cleanup()
if err != nil {
return err
}

d.logger.Debug(ctx).
Bytes(log.RecordPositionField, lastPosition).
Msg("sending stop signal to destination connector plugin")
err = d.plugin.Stop(ctx, lastPosition)
if err != nil {
return cerrors.Errorf("could not stop destination plugin: %w", err)
}

d.logger.Debug(ctx).Msg("destination connector plugin successfully responded to stop signal")
return nil
}

func (d *destination) Teardown(ctx context.Context) error {
// lock destination as we are about to mutate the plugin field
d.m.Lock()
Expand All @@ -167,23 +193,25 @@ func (d *destination) Teardown(ctx context.Context) error {
return plugin.ErrPluginNotRunning
}

d.logger.Debug(ctx).Msg("stopping destination connector plugin")
err := d.plugin.Stop(ctx)
// close stream
if d.stopStream != nil {
d.stopStream()
d.stopStream = nil
}

// wait for any calls to the plugin to stop running first (e.g. Ack or Write)
// wait for any calls to the plugin to stop running first (e.g. Stop, Ack or Write)
d.wg.Wait()

d.logger.Debug(ctx).Msg("tearing down destination connector plugin")
err = multierror.Append(err, d.plugin.Teardown(ctx))

err := d.plugin.Teardown(ctx)
d.plugin = nil
d.persister.ConnectorStopped()

if err != nil {
return cerrors.Errorf("could not tear down plugin: %w", err)
return cerrors.Errorf("could not tear down destination connector plugin: %w", err)
}

d.logger.Info(ctx).Msg("connector plugin successfully torn down")
d.logger.Info(ctx).Msg("destination connector plugin successfully torn down")
return nil
}

Expand Down
21 changes: 18 additions & 3 deletions pkg/connector/mock/connector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 25 additions & 12 deletions pkg/connector/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ type source struct {
// plugin is the running instance of the source plugin.
plugin plugin.SourcePlugin

// stopStream is a function that closes the context of the stream
stopStream context.CancelFunc

// m can lock a source from concurrent access (e.g. in connector persister).
m sync.Mutex
// wg tracks the number of in flight calls to the plugin.
wg sync.WaitGroup
}

// not running -> running -> stopping -> not running
var _ Source = (*source)(nil)

func (s *source) ID() string {
return s.XID
Expand Down Expand Up @@ -146,34 +149,39 @@ func (s *source) Open(ctx context.Context) error {
return err
}

err = src.Start(ctx, s.XState.Position)
streamCtx, cancelStreamCtx := context.WithCancel(ctx)
err = src.Start(streamCtx, s.XState.Position)
if err != nil {
cancelStreamCtx()
_ = src.Teardown(ctx)
return err
}

s.logger.Info(ctx).Msg("source connector plugin successfully started")

s.plugin = src
s.stopStream = cancelStreamCtx
s.persister.ConnectorStarted()
return nil
}

func (s *source) Stop(ctx context.Context) error {
func (s *source) Stop(ctx context.Context) (record.Position, error) {
cleanup, err := s.preparePluginCall()
defer cleanup()
if err != nil {
return err
return nil, err
}

s.logger.Debug(ctx).Msg("stopping source connector plugin")
err = s.plugin.Stop(ctx)
s.logger.Debug(ctx).Msg("sending stop signal to source connector plugin")
lastPosition, err := s.plugin.Stop(ctx)
if err != nil {
return cerrors.Errorf("could not stop plugin: %w", err)
return nil, cerrors.Errorf("could not stop source plugin: %w", err)
}

s.logger.Info(ctx).Msg("connector plugin successfully stopped")
return nil
s.logger.Info(ctx).
Bytes(log.RecordPositionField, lastPosition).
Msg("source connector plugin successfully responded to stop signal")
return lastPosition, nil
}

func (s *source) Teardown(ctx context.Context) error {
Expand All @@ -184,21 +192,26 @@ func (s *source) Teardown(ctx context.Context) error {
return plugin.ErrPluginNotRunning
}

// close stream
if s.stopStream != nil {
s.stopStream()
s.stopStream = nil
}

// wait for any calls to the plugin to stop running first (e.g. Stop, Ack or Read)
s.wg.Wait()

s.logger.Debug(ctx).Msg("tearing down source connector plugin")

err := s.plugin.Teardown(ctx)

s.plugin = nil
s.persister.ConnectorStopped()

if err != nil {
return cerrors.Errorf("could not tear down plugin: %w", err)
return cerrors.Errorf("could not tear down source connector plugin: %w", err)
}

s.logger.Info(ctx).Msg("connector plugin successfully torn down")
s.logger.Info(ctx).Msg("source connector plugin successfully torn down")
return nil
}

Expand Down
Loading