forked from ConduitIO/conduit-connector-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsource.go
100 lines (86 loc) · 4.03 KB
/
source.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package connectorname
//go:generate paramgen -output=paramgen_src.go SourceConfig
import (
"context"
"fmt"
"github.com/conduitio/conduit-commons/config"
"github.com/conduitio/conduit-commons/opencdc"
sdk "github.com/conduitio/conduit-connector-sdk"
)
// Source is the source side of the connector. It embeds
// sdk.UnimplementedSource and stores the parsed source configuration.
type Source struct {
sdk.UnimplementedSource
// config is the target that Configure hands to sdk.Util.ParseConfig.
config SourceConfig
// lastPositionRead is not read or written by any method in this file.
lastPositionRead opencdc.Position //nolint:unused // this is just an example
}
// SourceConfig is the configuration type of Source. The go:generate directive
// at the top of this file runs paramgen against this type.
type SourceConfig struct {
// Config includes parameters that are the same in the source and destination.
Config
// SourceConfigParam is named foo and must be provided by the user.
SourceConfigParam string `json:"foo" validate:"required"`
}
// NewSource constructs an empty Source and returns it wrapped in the SDK's
// default source middleware.
func NewSource() sdk.Source {
	src := &Source{}
	middleware := sdk.DefaultSourceMiddleware()
	return sdk.SourceWithMiddleware(src, middleware...)
}
// Parameters returns the named parameters that describe how the Source can be
// configured. The map comes from SourceConfig.Parameters, which can be
// generated with paramgen.
func (s *Source) Parameters() config.Parameters {
	params := s.config.Parameters()
	return params
}
// Configure is the first method called on a connector. It receives the raw
// configuration, which it parses into s.config; an invalid configuration is
// reported as an error.
//
// Checking whether the configured data source is reachable belongs in Open,
// not here. The SDK validates the configuration and fills in default values
// before Configure runs, so only more complex, manual validations need to be
// added to this method.
func (s *Source) Configure(ctx context.Context, cfg config.Config) error {
	sdk.Logger(ctx).Info().Msg("Configuring Source...")

	// Parse against the parameters of a freshly built, middleware-wrapped
	// source rather than against s directly.
	params := NewSource().Parameters()
	if err := sdk.Util.ParseConfig(ctx, cfg, &s.config, params); err != nil {
		return fmt.Errorf("invalid config: %w", err)
	}
	return nil
}
// Open is invoked after Configure to tell the plugin it may get ready to
// produce records; connections, if any are needed, should be opened here.
//
// The position argument carries the position of the last record that was
// processed successfully, so the Source is expected to resume producing
// records after it. The context handed to Open is cancelled once the plugin
// receives a stop signal from Conduit.
//
// This template implementation ignores both arguments and returns nil.
func (s *Source) Open(_ context.Context, _ opencdc.Position) error {
	return nil
}
// Read produces the next Record. It is meant to block until a record is
// available or the context is cancelled, and it may return ErrBackoffRetry to
// ask the SDK to call Read again with a backoff retry.
//
// When the context is (or becomes) cancelled, Read must stop fetching new
// records from the source system and hand out the ones already buffered. Once
// the buffer is empty it must return the context error to signal a graceful
// stop; returning ErrBackoffRetry while the context is cancelled likewise
// signals that nothing is left, and Read will not be called again.
//
// After Read returns any error other than ErrBackoffRetry it is not called
// again. Read can be called concurrently with Ack.
//
// This template implementation ignores the context and immediately returns a
// zero-value record with a nil error.
func (s *Source) Read(_ context.Context) (opencdc.Record, error) {
	var rec opencdc.Record
	return rec, nil
}
// Ack tells the implementation that the record at the given position was
// processed successfully. Because acks may still be outstanding, Ack can be
// called after the context of Read has already been cancelled. Once Teardown
// is called, no further calls to Ack are made.
//
// Ack can be called concurrently with Read.
//
// This template implementation ignores both arguments and returns nil.
func (s *Source) Ack(_ context.Context, _ opencdc.Position) error {
	return nil
}
// Teardown tells the plugin that no other method will be called anymore. When
// Teardown returns, the plugin should be ready for a graceful shutdown.
//
// This template implementation has nothing to release and returns nil.
func (s *Source) Teardown(_ context.Context) error {
	return nil
}