
Potential data consistency issues #381

Open
abdelhakimbendjabeur opened this issue Feb 7, 2024 · 6 comments

abdelhakimbendjabeur commented Feb 7, 2024

Hello 👋

I have been experiencing some data consistency issues when sinking a single Kafka topic to a BigQuery table using the connector.

Version

wepay/kafka-connect-bigquery 2.5.0

cp-kafka-connect-base:7.4.0

...
ARG KC_BQ_SINK_VERSION=2.5.0

RUN confluent-hub install --no-prompt wepay/kafka-connect-bigquery:${KC_BQ_SINK_VERSION}
...
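
To double-check which plugin version a worker actually loaded (not part of the original report, just a quick sanity check; jq is optional):

$ curl -s localhost:8083/connector-plugins | jq '.[] | select(.class | contains("bigquery"))'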

Source Topic

The source topic contains CDC data from a PG table. It goes through a Flink pipeline that adds a few extra columns and filters rows based on some criteria.
-> timestamp refers to the moment when the PG transaction occurred (insert, update, delete)
-> event_id is a UUID that is unique per payload; it is used to identify unique events.

"number_of_partitions" = 12
"cleanup.policy"       = "compact"
"retention.ms"         = 25 * 30 * 24 * 3600 * 1000 # 25 months
"retention.bytes"      = "-1"
"segment.bytes"        = 100 * 1024 * 1024    # 100MB
"segment.ms"           = 7 * 24 * 3600 * 1000 # 7 days

Connector configuration

connect-distributed.properties

group.id=kc-analytics-cdc-filtered-bq-sink
bootstrap.servers=SASL_SSL://xxx.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='uuu'   password='xxx';
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='uuu'   password='xxx';
consumer.auto.offset.reset=earliest
consumer.max.poll.records=500
consumer.max.partition.fetch.bytes=1048576
consumer.fetch.min.bytes=1
consumer.fetch.max.wait.ms=500
    
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
    
value.converter=io.apicurio.registry.utils.converter.AvroConverter
value.converter.schemas.enable=true


value.converter.apicurio.registry.url=http://apicurio-registry.apicurio.svc.cluster.local:8080/apis/registry/v2
value.converter.apicurio.auth.username=uu1
value.converter.apicurio.auth.password=xxx
value.converter.apicurio.registry.as-confluent=false


offset.storage.topic=kc.internal.analytics-cdc-filtered.offsets
offset.storage.replication.factor=3
config.storage.topic=kc.internal.analytics-cdc-filtered.configs
config.storage.replication.factor=3
status.storage.topic=kc.internal.analytics-cdc-filtered.status
status.storage.replication.factor=3
offset.flush.interval.ms=10000
plugin.path=/usr/share/java,/usr/share/confluent-hub-components,/usr/share/java/kafka-connect-plugins/

sink-connector.json

{
    "allowNewBigQueryFields": "true",
    "autoCreateTables": "false",
    "bigQueryPartitionDecorator": "false",
    "bigQueryRetry": "3",
    "bigQueryRetryWait": "1000",
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "defaultDataset": "analytics",
    "keyfile": "/opt/kafka/config/gcp_credentials.json",
    "project": "xxx-production",
    "tasks.max": 4,
    "topics": "analytics.cdc.ticket-message",
    "transforms": "tombstoneHandler,copyFieldsFromKey,renameFields,routeTicketMessage,fixEpochTs,addMetadata,timestampConverterEventTs",
    "transforms.addMetadata.offset.field": "__kafka_offset",
    "transforms.addMetadata.partition.field": "__kafka_partition",
    "transforms.addMetadata.static.field": "__cluster",
    "transforms.addMetadata.static.value": "aus-xxx",
    "transforms.addMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",

    "transforms.copyFieldsFromKey.fields": "event_id:STRING:__event_id",
    "transforms.copyFieldsFromKey.type": "com.xxx.kafka.connect.transforms.CopyFieldFromKeyToValue",
    
    "transforms.fixEpochTs.fields": "filtered_ticket_message:__event_timestamp",
    "transforms.fixEpochTs.type": "com.xxx.kafka.connect.transforms.SetToEpoch",

    "transforms.renameFields.renames": "timestamp:__event_timestamp",
    "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",

    "transforms.routeTicketMessage.regex": "analytics.cdc.ticket-message",
    "transforms.routeTicketMessage.replacement": "filtered_ticket_message",
    "transforms.routeTicketMessage.type": "org.apache.kafka.connect.transforms.RegexRouter",

    "transforms.timestampConverterEventTs.field": "__event_timestamp",
    "transforms.timestampConverterEventTs.target.type": "Timestamp",
    "transforms.timestampConverterEventTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",

    "transforms.tombstoneHandler.behavior": "ignore",
    "transforms.tombstoneHandler.type": "io.confluent.connect.transforms.TombstoneHandler"
}

I use 2 custom SMTs, neither of which drops records:

CopyFieldFromKeyToValue -> adds a new field to the payload from the record key.
SetToEpoch -> if the value is negative, replaces it with 0.

Deployment on Kubernetes

replica_count     = 4 
resource_requests = { cpu = "2", memory = "4Gi" }
resource_limits   = { cpu = "2", memory = "4Gi" }
number_of_tasks   = 4

PS. I deliberately set the resources high because I had already seen the data consistency issues and thought they were related to multiple restarts caused by CPU/memory throttling when resources were insufficient.
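
For reference, a sketch of how restarts can be checked on the pods (the namespace and label selector here are assumptions about this deployment):

$ kubectl get pods -n kafka-connect -l app=kafka-connect \
  -o custom-columns=NAME:.metadata.name,RESTARTS:.status.containerStatuses[0].restartCount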

Bug description

After deploying the connector, I waited for it to reach the tail of the topic before running some checks. I noticed that some records were missing.
How did I notice? -> I have another pipeline that sinks the same records to ClickHouse for analytics purposes, and the records are there.

How I proceeded:

  • Create a backup of the BQ table
  • Delete the connector
curl -X DELETE localhost:8083/connectors/analytics-cdc-filtered-bq-sink
  • Reset the offsets of the consumer group to the earliest offset using the Kafka CLI tools
$ ./bin/kafka-consumer-groups.sh \
  --command-config=... \
  --bootstrap-server=... \
  --group="connect-analytics-cdc-filtered-bq-sink" \
  --topic="analytics.cdc.ticket-message" \
  --reset-offsets --to-earliest --execute  --timeout=100000

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 9          0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 11         0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 3          0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 2          0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 5          0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 8          0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 6          0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 1          0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 7          0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 10         0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 0          0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 4          0
  • Recreate the connector
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/analytics-cdc-json-bq-sink/config -d @sink-connector.json
  • Wait for it to finish...
    PS. Note that I did not clean the table, so there will be a lot of duplicates. This is okay because I will be counting unique events/tickets/messages.

After confirming that the consumer group had reached the tail, I ran the same query on both the table with duplicates and the backup.

$ ./bin/kafka-consumer-groups.sh \
  --command-config=... \
  --bootstrap-server=... \
  --group="connect-analytics-cdc-filtered-bq-sink" \
  --describe --timeout=100000

GROUP                                  TOPIC                                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 3          20576030        20576030        0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 4          20617505        20617506        1
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 5          20633814        20633814        0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 6          20606746        20606747        1
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 7          20570934        20570935        1
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 8          20627953        20627953        0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 9          20641628        20641633        5
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 10         20610793        20610795        2
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 11         20586025        20586027        2
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 0          20625081        20625083        2
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 1          20642950        20642958        8
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 2          20593419        20593429        10
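
A sketch of the kind of uniqueness count used for such a comparison, via the bq CLI (the target table filtered_ticket_message comes from the RegexRouter config above; the backup table name is an assumption):

$ bq query --use_legacy_sql=false \
  'SELECT COUNT(DISTINCT __event_id) AS unique_events FROM `analytics.filtered_ticket_message`'

$ bq query --use_legacy_sql=false \
  'SELECT COUNT(DISTINCT __event_id) AS unique_events FROM `analytics.filtered_ticket_message_backup`'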

[Screenshot 2024-02-07 at 17 26 24: query results comparing unique tickets/messages/events between the backup and the run after the offset reset]

What I discovered is that the second run brought in new records, meaning the first run had somehow skipped them, which is very concerning...
Note the rows circled in red: they show more unique tickets/messages/events in the run_after_offset_reset data.

The fact that it has more unique tickets/messages/events means that these records are indeed in the topic and were somehow missed during the first sink.

I am having trouble understanding where the problem comes from.
No unusual error logs were noticed.
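
For anyone trying to reproduce this, one way to rule out silently failed or restarted tasks is the Connect REST status endpoint (jq is optional):

$ curl -s localhost:8083/connectors/analytics-cdc-filtered-bq-sink/status | jq '.tasks[] | {id, state, worker_id}'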

Has anybody experienced something similar? If there is something wrong with the config, I'd love to hear about it.

Thank you 🙏

@pedromazala

I think this may be related to #333

But I see 2.6 as an RC since September. Do you folks have a target date for releasing it?

@abdelhakimbendjabeur
Author

#333 seems to have been reverted in #357.

@abdelhakimbendjabeur
Author

@sp-gupta @b-goyal Sorry to ping you, do you have any insight regarding this issue?

andrelu commented Jul 4, 2024

Hello @abdelhakimbendjabeur. We are facing a similar issue with our BigQuery Sink connector deployment. I'm interested in this topic.
Have you found any way of mitigating this?

@abdelhakimbendjabeur
Author

Hi @andrelu
No progress on our side on this one. We had to rerun the connector to cover the missing data. This is not ideal as it's more expensive.

@hamsterready

We have not confirmed it yet, but it is most probably related to this one: https://issues.apache.org/jira/browse/KAFKA-18073
