Potential data consistency issues #381
I think this may be related to #333, but I have seen 2.6 as an RC since September. Do you folks have a due date for the release?
Hello @abdelhakimbendjabeur. We are facing a similar issue with our BigQuery Sink connector deployment. I'm interested in this topic.
Hi @andrelu
We have not confirmed it yet, but it is most probably related to this one: https://issues.apache.org/jira/browse/KAFKA-18073
Hello 👋
I have been experiencing some data consistency issues when sinking a single Kafka topic to a BigQuery table using the connector.
Version
wepay/kafka-connect-bigquery 2.5.0
cp-kafka-connect-base:7.4.0
Source Topic
The source topic contains CDC data from a PG table. It goes through a Flink pipeline that adds a few extra columns and filters rows based on some criteria.
- `timestamp` → refers to the moment when the PG transaction occurred (insert, update, delete)
- `event_id` → a UUID that is unique per payload; it is hashed to identify unique events.
Connector configuration
I use 2 custom SMTs that do not ignore records:
- CopyFieldFromKeyToValue → adds a new field to the payload, copied from the record key.
- SetToEpoch → if the value is negative, replaces it with 0.
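For clarity, here is a rough sketch of what the two transforms do. This is illustrative only: the real SMTs are Java Kafka Connect transforms, and the field names (`ticket_id`, `timestamp`) are assumptions, not taken from the actual config.

```python
# Illustrative sketch of the two custom SMTs' logic (assumed behavior;
# the real transforms are Java Kafka Connect SMTs, field names hypothetical).

def copy_field_from_key_to_value(record_key: dict, record_value: dict,
                                 field: str = "ticket_id") -> dict:
    """CopyFieldFromKeyToValue: copy one field from the record key into the value."""
    new_value = dict(record_value)          # do not mutate the original payload
    new_value[field] = record_key[field]
    return new_value

def set_to_epoch(record_value: dict, field: str = "timestamp") -> dict:
    """SetToEpoch: clamp a negative timestamp to 0 (the epoch)."""
    new_value = dict(record_value)
    if new_value.get(field, 0) < 0:
        new_value[field] = 0
    return new_value
```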
Deployment on Kubernetes
PS. I deliberately requested high resources because I had already seen the data consistency issues and thought they were related to multiple restarts caused by CPU/memory throttling when resources were insufficient.
Bug description
After deploying the connector, I waited for it to reach the tail of the topic before running some checks. I noticed some records missing.
How do I know? → I have another pipeline that sinks the same records to ClickHouse for analytics purposes, and the records are present there.
How I proceeded
PS. Note that I did not clean the table, so there will be many duplicates. This is okay because I count unique events/tickets/messages.
After confirming that the consumer group is at the tail of the topic, run the same query on both the table with duplicates and the backup.
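Since the table holds duplicates, correctness is judged by distinct counts rather than row counts. A minimal sketch of that dedup counting in pure Python (illustrative only; the actual check is a BigQuery query along the lines of `COUNT(DISTINCT event_id)`):

```python
# Count unique events/tickets in a table that may contain duplicate rows.
# Illustrative stand-in for a COUNT(DISTINCT ...) query against BigQuery.

def unique_counts(rows: list) -> dict:
    """Return distinct counts for the identifying columns of the exported rows."""
    return {
        "events": len({r["event_id"] for r in rows}),
        "tickets": len({r["ticket_id"] for r in rows}),
    }
```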
```
$ ./bin/kafka-consumer-groups.sh \
    --command-config=... \
    --bootstrap-server=... \
    --group="connect-analytics-cdc-filtered-bq-sink" \
    --describe --timeout=100000

GROUP                                   TOPIC                         PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  3          20576030        20576030        0
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  4          20617505        20617506        1
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  5          20633814        20633814        0
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  6          20606746        20606747        1
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  7          20570934        20570935        1
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  8          20627953        20627953        0
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  9          20641628        20641633        5
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  10         20610793        20610795        2
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  11         20586025        20586027        2
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  0          20625081        20625083        2
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  1          20642950        20642958        8
connect-analytics-cdc-filtered-bq-sink  analytics.cdc.ticket-message  2          20593419        20593429        10
```
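To treat "at the tail" quantitatively rather than eyeballing the table, the describe output can be parsed and the LAG column summed (a small illustrative helper, not part of the connector or the Kafka tooling):

```python
# Sum the LAG column of `kafka-consumer-groups.sh --describe` output.
# Assumes the default column layout, where LAG is the last field on each data row.

def total_lag(describe_output: str) -> int:
    """Return the summed consumer lag across all partitions."""
    lag = 0
    for line in describe_output.splitlines():
        parts = line.split()
        # Skip the header and blank lines; data rows end in a numeric LAG value.
        if parts and parts[-1].isdigit():
            lag += int(parts[-1])
    return lag
```

A total lag of a few messages on a topic receiving live traffic is expected; what matters for the consistency check is that the group keeps up with the log end offsets.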
What I discovered is that the second run brought in new records, meaning the first run somehow skipped them, which is very concerning...
Note the rows circled in red in the screenshot: they show more unique tickets/messages/events in the
run_after_offset_reset
data. The fact that it has more unique tickets/messages/events means these records are indeed in the topic and were somehow missed during the first sink.
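One way to pin down exactly which records were skipped is a set difference of `event_id`s between the ClickHouse sink (assumed complete) and the BigQuery table. A sketch, assuming the id columns have been exported from each store (the export step itself is not shown):

```python
# Identify records present in the ClickHouse sink but missing from BigQuery.
# Illustrative: inputs are iterables of event_id strings exported from each store.

def missing_event_ids(clickhouse_ids, bigquery_ids) -> list:
    """Return sorted event_ids seen by the ClickHouse pipeline but not by BigQuery."""
    return sorted(set(clickhouse_ids) - set(bigquery_ids))
```

Joining the resulting ids back to the topic (by partition/offset) would show whether the skipped records cluster around a rebalance or restart.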
I am having trouble understanding where the problem comes from.
No unusual error logs were observed.
If anybody has experienced something similar, or if there is something wrong with my config, I'd love to hear about it.
Thank you 🙏