
HTTP Sink Connector - not batching the messages #1476

Open
mpromny opened this issue Nov 5, 2024 · 2 comments


mpromny commented Nov 5, 2024

Issue

I am using the HTTP Sink connector and it sends what appear to be random records. It is supposed to send data in batches of 5 messages.

What version of the Stream Reactor are you reporting this issue for?

8.1.15

Are you running the correct version of Kafka/Confluent for the Stream reactor release?

NO - 3.6.2

Have you read the docs?

Yes

What is the expected behaviour?

It is supposed to send data in batches. The number of messages per batch should be controlled by connect.http.batch.count.
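As a sketch of the batching behaviour I expect (the function and names below are illustrative, not the connector's actual classes), records should accumulate in a queue and be sent only once a full batch of connect.http.batch.count records is available:

```python
# Illustrative model of count-based batching: records queue up and a
# batch is emitted only once the queue holds `batch_count` records.
# A trailing partial batch stays queued for the next poll or trigger.

def take_batches(records, batch_count):
    """Group records into batches of `batch_count` full batches,
    returning (full_batches, still_queued_remainder)."""
    batches = []
    queue = []
    for record in records:
        queue.append(record)
        if len(queue) >= batch_count:
            batches.append(queue)
            queue = []
    return batches, queue

# With batch_count=5 and 7 records, one full batch of 5 is sent and
# two records remain queued.
batches, pending = take_batches(["r1", "r2", "r3", "r4", "r5", "r6", "r7"], 5)
```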

What was observed?

When I set connect.http.batch.count, the connector doesn't send all messages. Debugging showed that the batch queue holds only 1 or 2 messages.

What is your Connect cluster configuration?

  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1

What is your connector properties configuration?

    config: 
      topics: "sinkTest"
      connect.http.batch.count: 5
      connect.http.endpoint: >-
        https://echo:8080
      connect.http.method: POST
      connect.http.request.content: '{"topic":"{{ "{{topic}}" }}","value":{{ "{{value}}" }}}'
      key.converter: org.apache.kafka.connect.storage.StringConverter
      key.converter.schemas.enable: 'false'
      value.converter: org.apache.kafka.connect.storage.StringConverter
      value.converter.schemas.enable: 'false'

Logs

DEBUG [kafka-http-sink-connector|task-0] [Consumer clientId=connector-consumer-kafka-http-sink-connector-0, groupId=connect-kafka-http-sink-connector] Added READ_UNCOMMITTED fetch request for partition sinkTest-0 at position FetchPosition{offset=11, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[XXXkafka-kafka-defaultXXX (id: 0 rack: null)], epoch=0}} to node XXXkafka-kafka-defaultXXX (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractFetch) [task-thread-kafka-http-sink-connector-0]
DEBUG [kafka-http-sink-connector|task-0] [Consumer clientId=connector-consumer-kafka-http-sink-connector-0, groupId=connect-kafka-http-sink-connector] Built incremental fetch (sessionId=85595006, epoch=93) for node 0. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s), replaced 0 partition(s) out of 3 partition(s) (org.apache.kafka.clients.FetchSessionHandler) [task-thread-kafka-http-sink-connector-0]
DEBUG [kafka-http-sink-connector|task-0] [Consumer clientId=connector-consumer-kafka-http-sink-connector-0, groupId=connect-kafka-http-sink-connector] Adding pending request for node XXXkafka-kafka-defaultXXX (id: 0 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractFetch) [task-thread-kafka-http-sink-connector-0]
DEBUG [kafka-http-sink-connector|task-0] [Consumer clientId=connector-consumer-kafka-http-sink-connector-0, groupId=connect-kafka-http-sink-connector] Sending FETCH request with header RequestHeader(apiKey=FETCH, apiVersion=15, clientId=connector-consumer-kafka-http-sink-connector-0, correlationId=299, headerVersion=2) and timeout 30000 to node 0: FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=85595006, sessionEpoch=93, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient) [task-thread-kafka-http-sink-connector-0]
DEBUG [kafka-http-sink-connector|task-0] 1 records taken from cats.effect.kernel.SyncRef@4d73f33f (io.lenses.streamreactor.connect.http.sink.RecordsQueue) [io-compute-1]
DEBUG [kafka-http-sink-connector|task-0] [kafka-http-sink-connector] No batch yet, queue size: 1 (io.lenses.streamreactor.connect.http.sink.HttpWriter) [io-compute-1]
DEBUG [kafka-http-sink-connector|task-0] 1 records taken from cats.effect.kernel.SyncRef@64c6b210 (io.lenses.streamreactor.connect.http.sink.RecordsQueue) [io-compute-7]
DEBUG [kafka-http-sink-connector|task-0] [kafka-http-sink-connector] No batch yet, queue size: 1 (io.lenses.streamreactor.connect.http.sink.HttpWriter) [io-compute-7]
DEBUG [kafka-http-sink-connector|task-0] 1 records taken from cats.effect.kernel.SyncRef@3c9a46b2 (io.lenses.streamreactor.connect.http.sink.RecordsQueue) [io-compute-1]
DEBUG [kafka-http-sink-connector|task-0] [kafka-http-sink-connector] No batch yet, queue size: 1 (io.lenses.streamreactor.connect.http.sink.HttpWriter) [io-compute-1]
DEBUG [kafka-http-sink-connector|task-0] [kafka-http-sink-connector] All writer processes completed successfully (io.lenses.streamreactor.connect.http.sink.HttpWriterManager) [io-compute-1]
davidsloan (Collaborator) commented:

Hi @mpromny,

Today's new release, 8.1.16, addresses this along with other stability fixes.

Let us know if you spot any further issues.


mpromny commented Nov 12, 2024

Hi @davidsloan

I tested the new version. It is more stable, but I still have a problem with batches.
Please find the logs and details from the connector below:

  1. Connector configuration:
  config:
    connect.http.batch.count: 2
    connect.http.time.interval: 300
    connect.http.batch.size: 1000
    connect.http.endpoint: http://echo:8080/
    connect.http.method: POST
    connect.http.request.content: '{{value}}'
    key.converter: org.apache.kafka.connect.storage.StringConverter
    key.converter.schemas.enable: 'false'
    value.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter.schemas.enable: 'false'
    topics: >-
      mptest3
  2. Messages sent to the echo service by the connector:
--------  127.0.0.6:48527 | POST /
00000000  7b 22 74 65 73 74 33 22  3a 20 22 74 31 22 7d     |{"test3": "t1"}|
--------  127.0.0.6:48527 | POST /
00000000  7b 22 74 65 73 74 33 22  3a 20 22 74 33 22 7d     |{"test3": "t3"}|

Note: the connector sent only the first message of each batch (t1 and t3); the second message of each pair of gathered records never reaches the endpoint.

  3. Logs from the connector
2024-11-12 18:53:22,274 INFO [kafka-http-sink-connector|task-0] Not Flushing for {count: '1/2', fileSize: '15/1000', interval: {frequency:300s, in:296s, lastFlush:2024-11-12T18:53:18, nextFlush:2024-11-12T18:58:18}} (io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy$) [io-compute-3]
2024-11-12 18:53:22,278 INFO [kafka-http-sink-connector|task-0] Flushing for {count*: '2/2', fileSize: '30/1000', interval: {frequency:300s, in:296s, lastFlush:2024-11-12T18:53:18, nextFlush:2024-11-12T18:58:18}} (io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy$) [io-compute-3]
2024-11-12 18:53:25,774 INFO [kafka-http-sink-connector|task-0] Not Flushing for {count: '1/2', fileSize: '15/1000', interval: {frequency:300s, in:297s, lastFlush:2024-11-12T18:53:22, nextFlush:2024-11-12T18:58:22}} (io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy$) [io-compute-5]
...
2024-11-12 18:53:46,373 INFO [kafka-http-sink-connector|task-0] Not Flushing for {count: '1/2', fileSize: '15/1000', interval: {frequency:300s, in:276s, lastFlush:2024-11-12T18:53:22, nextFlush:2024-11-12T18:58:22}} (io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy$) [io-compute-1]
2024-11-12 18:53:46,373 INFO [kafka-http-sink-connector|task-0] Flushing for {count*: '2/2', fileSize: '30/1000', interval: {frequency:300s, in:276s, lastFlush:2024-11-12T18:53:22, nextFlush:2024-11-12T18:58:22}} (io.lenses.streamreactor.connect.http.sink.commit.BatchPolicy$) [io-compute-1]
  4. Messages published to the topic

(screenshot of the messages published to the topic)
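For reference, the BatchPolicy log lines above ("Not Flushing for {count: '1/2', ...}" vs. "Flushing for {count*: '2/2', ...}") suggest a flush fires as soon as any one of the count, byte-size, or time-interval thresholds is reached. A minimal sketch of that decision, assuming this OR-semantics (field and function names here are my own, not the connector's real API):

```python
from dataclasses import dataclass

# Illustrative model of the flush decision implied by the BatchPolicy
# logs: flush when ANY of the count, size, or interval thresholds is met.

@dataclass
class BatchState:
    count: int                 # records accumulated since last flush
    size_bytes: int            # bytes accumulated since last flush
    seconds_since_flush: float # time elapsed since last flush

def should_flush(state: BatchState,
                 batch_count: int,
                 batch_size: int,
                 interval_s: float) -> bool:
    return (
        state.count >= batch_count
        or state.size_bytes >= batch_size
        or state.seconds_since_flush >= interval_s
    )

# Mirrors the logged states with batch.count=2, batch.size=1000, interval=300:
should_flush(BatchState(1, 15, 4), 2, 1000, 300)   # count 1/2 -> no flush
should_flush(BatchState(2, 30, 4), 2, 1000, 300)   # count 2/2 -> flush
```

Under this model the connector should still deliver both records of a flushed batch, which makes the missing second message in the echo output look like a bug in the writer rather than in the flush trigger.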
