-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[temp] Temporary PR to observe diff between working and main branches #153
base: 2.5.2.Final
Are you sure you want to change the base?
Conversation
Initial changes required for the Debezium Connector for Postgres to work with YugabyteDB source.
…st YugabyteDB (#105) This PR includes the changes required for the tests so that they can work against YugabyteDB. YugabyteDB issue: yugabyte/yugabyte-db#21394
Modified Dockerfile to package custom log4j.properties so that the log files can be rolled over when their size exceeds 100MB. Also changed the Kafka connect JDBC jar being used - this new jar has a custom change to log every sink record going to the target database.
Changes in this PR: 1. Modification of Dockerfile to include transformers for aiven at the time of docker image compilation a. Aiven source: https://github.com/Aiven-Open/transforms-for-apache-kafka-connect
…BC driver (#107) ## Problem The Debezium connector for Postgres uses a single host model where the JDBC driver connects to a PG instance and continues execution. However, when we move to YugabyteDB where we have a multi node deployment, the current model can fail in case the node it has connected to goes down. ## Solution To address that, we have made changes in this PR and replaced the Postgres JDBC driver with [YugabyteDB smart driver](https://github.com/yugabyte/pgjdbc) which allows us to specify multiple hosts in the JDBC url so that the connector does not fail or run into any fatal error while maintaining the High Availability aspect of YugabyteDB. Changes in this PR include: 1. Changing of version in `pom.xml` from `2.5.2.Final` to `2.5.2.ybpg.20241-SNAPSHOT` a. This is done to ensure that upon image compilation, the changed code from Debezium Code is picked up. 2. Replacing of all packages from `org.postgresql.*` to `com.yugabyte.*` to comply with the new JDBC driver. 3. Masking the validator method in debezium-core which disallowed characters like `: (colon)` in the configuration property `database.hostname`
**Summary** This PR is to support consistent snapshot in the case of an existing slot. In this case, the consistent_point hybrid time is determined from the pg_replication_slots view, specifically from the yb_restart_commit_ht column. There is an assumption here that this slot has not been used for streaming till this point. If this holds, then the history retention barrier will be in place as of the consistent snapshot time (consistent_point). The snapshot query will be run as of the consistent_point and subsequent streaming will start from the consistent_point of the slot. **Test Plan** Added new test mvn -Dtest=PostgresConnectorIT#initialSnapshotWithExistingSlot test
**Changes:** 1. Providing JMX Exporter jar to KAFKA_OPTS to be further provided to java options. 2. Modifying `metrics.yaml` to include correct regex to be scraped as per Postgres connector.
…peruser (#115) **Summary** This PR adds the support for a non superuser to be configured as the connector user (database.user). Such a user is required to have the privileges listed in https://debezium.io/documentation/reference/2.5/connectors/postgresql.html#postgresql-permissions Specifically, the changes in this revision relate to how the consistent_point is specified to the YugabyteDB server in order to execute a consistent snapshot. **Test Plan** Added new test mvn -Dtest=PostgresConnectorIT#nonSuperUserSnapshotAndStreaming test
…nectorTask (#114) This PR is to add a higher level retry whenever there's a where while starting a PostgresConnectorTask, the failures can include but not limited to the following: 1. Failure of creating JDBC connection 2. Failure to execute query 3. Tserver/master restart 4. Node restart 5. Connection failure
…streaming (#116) ## Problem PG connector does not wait for acknowledgement of snapshot completion offset before transitioning to streaming. This can lead to an issue if there is a connector restart in the streaming phase and it goes for a snapshot on restart. In streaming phase, as soon as the 1st GetChanges call is made on the server, the retention barriers are lifted and so the server can no longer serve the snapshot records on a restart. Therefore it is important that the connector waits for acknowledgement of snapshot completion offset before it actually transitions to streaming. ## Solution This PR introduces a waiting mechanism for acknowledgement of snapshot completion offset before transitioning to streaming. We have introduced a custom heartbeat implementation that will dispatch heartbeat when forced heartbeat method is called but we'll dispatch nothing when a normal heartbeat method is called. With this PR, connector will dispatch heartbeats while waiting for the snapshot completion offset i.e during the transition phase. For these heartbeat calls, there is no need to set the `heartbeat.interval.ms` since we are making forced heartbeat calls which do not rely on this config. Note, this heartbeat call is only required to support applications using debezium engine/embedded engine. It is not required when the connector is run with kakfa-connect. ### Test Plan Manually deployed connector in a docker container and tested two scenarios: 0 snapshot records & non-zero snapshot records. Unit tests corresponding to these scenarios will be added in a separate PR.
#119) **Summary** This PR adds support for the INITIAL_ONLY snapshot mode for Yugabyte. In the case of Yugabyte also, the snapshot is consumed by executing a snapshot query (SELECT statement) . To ensure that the streaming phase continues exactly from where the snapshot left, this snapshot query is executed as of a specific database state. In YB, this database state is represented by a value of HybridTime. Changes due to transactions with commit_time strictly greater than this snapshot HybridTime will be consumed during the streaming phase. This value for HybridTime is the value of the "yb_restart_commit_ht" column of the pg_replication_slots view of the associated slot. Thus, in the case of Yugabyte, even for the INITIAL_ONLY snapshot mode, a slot needs to be created if one does not exist. With this approach, a connector can be deployed in INITIAL_ONLY mode to consume the initial snapshot. This can be followed by the deployment of another connector in NEVER mode. This connector will continue the streaming from exactly where the snapshot left. **Test Plan** 1. Added new test -` mvn -Dtest=PostgresConnectorIT#snapshotInitialOnlyFollowedByNever test ` 2. Enabled existing test - `mvn -Dtest=PostgresConnectorIT#shouldNotProduceEventsWithInitialOnlySnapshot test` 3. Enabled existing test - `mvn -Dtest=PostgresConnectorIT#shouldPerformSnapshotOnceForInitialOnlySnapshotMode test `
… image (#118) This PR adds the dependencies for the `AvroConverter` to function in the Kafka Connect environment. The dependencies will only be added at the time of building the docker image.
This PR adds a log which will be print the IP of the node every time a connection is created.
Retry in case of failures while task is restarting. Right now any kind of failure will lead to task throwing RetriableException exception causing Task restart.
…te source (#120) **Summary** This PR enables 30/33 tests in IncrementalSnapshotIT for Yugabyte source The tests that are excluded are 1. updates 2. updatesLargeChunk 3. updatesWithRestart **Test Plan** `mvn -Dtest=IncrementalSnapshotIT test`
This PR comments out the part in the init_database i.e. the startup script during tests where some extensions are being installed - it is taking more than 2 minutes at this stage and since we do not need it in the tests we use, it can be skipped.
Throw retry for all exceptions. In future, we will need to throw runtime exception for wrong configurations.
…r image (#129) This PR only changes the link in the `Dockerfile` to fetch the latest custom sink connector jar from GitHub. According to PR yugabyte/kafka-connect-jdbc#3, changes include the following: 1. Addition of 3 new configuration properties * `log.table.balance`: i. Default is `false` but when set to `true`, the sink connector will execute a query to get the table balance from the target table. ii. Note that this is only applicable for consistency related tests where the given query is applicable - it will fail if set in any other tests. * `expected.total.balance` i. Default is `1000000` (1M) which can be changed to whatever value we are expecting the total balance to be in the target table. * `tables.for.balance` i. This takes a comma separated string value for all the table names from which the sink connector is supposed to extract balances from. ii. This property will only be valid when `log.table.balance` is specified as `true` iii. There is no default for this property so if `log.table.balance` is set to `true` and `tables.for.balance` is not specified then we will throw a `RuntimeException` 2. Log additions to aid debugging.
…is set (#131) ## Problem PG connector filters out record based on its starting point (WAL position), which in turn depends on the offset received from Kafka. So, in case, the starting point corresponds to a record in the middle of a transaction, PG connector will filter out the records of that transaction with LSN < starting point. This creates a problem in the downstream pipeline expects consistency of data. Filtering of records leads to PG connector shipping transaction with missing records. When such a transaction is applied on the sink, consistency breaks. ## Solution When 'provide.transaction.metadata' connector configuration is set, PG connector ships transaction boundary records BEGIN/COMMIT. Based on these boundary records, sink connector writes data maintaining consistency. Therefore, when this connector config is set, we will disable filtering records based on WAL Resume position. ## Testing Manually testing - Ran the connector with this fix in our QA runs where the issue was first discovered. All 10 runs triggered passed. Unit testing - Cannot reproduce the above mentioned scenario in a unit test.
…rds with CHANGE (#106)
This PR adds a configuration to let the user disable consistent snapshot i.e. `yb.consistent.snapshot` to the connector which is enabled by default. Setting consistent snapshot means setting/establishing the boundary between snapshot records (records that existed at the time of stream creation) and streaming records. If `yb.consistent.snapshot` is disabled i.e. set to `false`: - We will not be setting a boundary for snapshot - If the connector restarts after taking a snapshot but before acknowledging the streaming LSN, the snapshot will be taken again. This can result in some records being received both during the snapshot phase and the streaming phase.
This PR enabled some of the replica identity related tests that were disabled earlier because of the underlying operation not being supported by `yugabyte-db`.
…ies (#130) This PR adds the changes to the connector required to support all kinds of operations supported by YugabyteDB when coupled with various replica identities. One change to note here would be that in case of `DEFUALT` replica identity, Postgres sends out the before image of the primary key column in case of `DELETE` and `UPDATE` operations. Meanwhile, YugabyteDB only sends out the before image of primary key in case of `DELETE` operations, and if a primary key itself is updated, YugabyteDB sends two events: 1. Delete for the existing row 2. Insert with the new value of the primary key Additionally to test out everything, we have a new test class `YugabyteReplicaIdentityIT`.
Currently the PG debezium connector makes use of smart driver for creating connections to YugabyteDB. The smart driver has in built support for load balancing. However to enable this, the property `load-balance=true` needs to be set in the connection URL. This PR adds the `load-balance=true` property to the multi host URL used for creating load-balanced connections.
Fixed and trimmed log messages. PG connector skips empty txns and these txns were being logged as a warning with the message "possible data loss", which is not true. Therefore, we are now removing the "possible data loss" log message.
Based on our testing so far, whenever the PG connector has skipped txns, those txns have always been empty txns.
…put (#140) This PR changes the default plugin in the connector to `yboutput`. This closes yugabyte/yugabyte-db#23153
…llow YB standards (#136) This PR also changes the versioning scheme of the connector where the version would now look like: * `dz.<DebeziumVersion>.yb.<YBVersion>` * For example: `dz.2.5.2.yb.2024.1` This partially addresses yugabyte/yugabyte-db#23081
…g convention (#142) The PR renamed the main class PostgresConnector to YBPostgresConnector to avoid name clashes while loading the classes. Additionally, this closes yugabyte/yugabyte-db#23081
…bezium-core changes (#137) This PR reverts the changes made in `debezium-core` and converts those changes to be in the local module only. Changes include: 1. Adding a custom field definition for `HOSTNAME` in `PostgresConnectorConfig` which uses a new validator `PostgresConnectorConfig#validateYBHostname` a. Currently the validator definition returns a dummy positive result but we can modify this in future to set a validation rule if needed. 2. Removing the method `SnapshotResult#isSkipped` and keeping it localised in `PostgresSnapshotChangeEventSource` 3. Modifying `Dockerfile` to not copy custom `debezium-core-*.jar` - this will enforce that the images use the prepackaged jar only. This closes yugabyte/yugabyte-db#23082
This PR fixes the flakiness and false negatives by changing the following: 1. Disabling the tests which are unsupported i. Tests with postgis are not supported 2. Filtering heartbeat records while consuming 3. Fixing other test issues with incorrect record value matching or value assertions
…s for datatypes (#144) * This PR adds a test class `YBRecordsStreamProducerIT` which is essentially a copy of `RecordsStreamProducerIT`. * The tests are modified in order to support the structure generated by the plugin `yboutput`. * Some tests are disabled owing to the fact that the underlying feature is not yet supported. * Altering not allowed for column under replication --> yugabyte/yugabyte-db#16625 * Altering the replica identity is not allowed
) Some of the tests were not working while working with YugabyteDB's plugin `yboutput` (default) and the default replica identity `CHANGE` because the record structure is different from the vanilla `PostgresConnector`, these tests required change on what is being asserted and the way values are being read from the source records being produced by the plugin `yboutput`. This PR enables the following tests by adding the changes described: a. `TransactionMetadataIT` b. `SourceInfoTest` c. `ReplicaIdentityTestMapperTest` d. `PostgresSkipMessagesWithoutChangeConfigIT` e. `PostgresOffsetContextTest` f. `PostgresMoneyIT` g. `PostgresMetricsIT` h. `PostgresErrorHandlerTest` i. `PostgresConnectorConfigDefTest` j. `FieldTest` k. `DebeziumEngineIT`
This PR changes the name of the jar file which gets generated once the connector is compiled. We have changed the name from `debezium-connector-postgres` to `debezium-connector-yugabytedb`. Additionally, this PR modifies the `name` tag in the `pom.xml` file with the name of the source connector.
…tity CHANGE (#147) ## Problem Suppose we create a table which has a non-key column which also has a constraint `NOT NULL` - when the connector will start and CDC service will send a `RELATION` message, the column will be marked as "required" since a `NOT NULL` column means that the value should be present. But for a table with replica identity `CHANGE` the column will not be present in `UPDATE` operations if the column is not updated. This will violate the assumption of the column being marked as "required" since the value will not be present and the connector will end up throwing error of a similar format as: ``` org.apache.kafka.connect.errors.DataException: Invalid value: null used for required field: "name", schema type: STRUCT ``` ## Solution This PR introduces a fix to the above problem by adding a check while decoding the relation message - the check essentially verifies that if the replica identity of the table is `CHANGE` then we explicitly mark the column as optional **if it is a non key column**.
## Problem The Postgres JDVC driver jar being used currently has a `CRITICAL` vulnerability `CVE-2024-1597` as identified by `trivy` in the version `42.6.0`. ## Solution For the fix, this PR upgrades the jar to a driver version `42.6.1` which does not have the vulnerability.
This PR adds a GitHub action to the repository which can be run manually to publish a Docker image to Quay and create a GitHub release draft with a fat jar.
This PR fixes the path for the connector jar file to pick up the correct artefact to be packaged in the Docker image.
Hi @vaibhav-yb. Thank you for your valuable contribution. |
Welcome as a new contributor to Debezium, @vaibhav-yb. Reviewers, please add missing author name(s) and alias name(s) to the COPYRIGHT.txt and Aliases.txt respectively. |
Hi @vaibhav-yb, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key. |
Hi @vaibhav-yb. Thank you for your valuable contribution. |
Hi @vaibhav-yb, thanks for your contribution. Please prefix the commit message(s) with the DBZ-xxx JIRA issue key. |
*/ | ||
@ThreadSafe | ||
@Immutable | ||
public class PGTableSchemaBuilder extends TableSchemaBuilder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This class will need comparison with the base class TableSchemaBuilder
* | ||
* see the {@code sslmode} Postgres JDBC driver option | ||
*/ | ||
* Establish a secure connection first. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This formatting was wrong in the base branch itself, the formatter fixed it.
try (PostgresConnection tempConnection = new PostgresConnection(connectorConfig.getJdbcConfig(), PostgresConnection.CONNECTION_GENERAL)) { | ||
databaseCharset = tempConnection.getDatabaseCharset(); | ||
} | ||
try { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The changes here are because of the try-catch
block we have added for retries.
No description provided.