This repository has been archived by the owner on Mar 30, 2021. It is now read-only.

Understanding Number of records emitted to S3 #85

Open
sudharshanPLT opened this issue Aug 22, 2017 · 3 comments

sudharshanPLT commented Aug 22, 2017

Hello All,
I have configured my properties file to write each record to a separate file in S3:

 # 1MB = 1024*1024 = 1048576
 bufferByteSizeLimit = 4096 
 bufferRecordCountLimit = 1
 bufferMillisecondsLimit = 3600000

Each of my records is smaller than about 4 KB, yet the log shows:

Successfully emitted 713 records to Amazon S3
Successfully emitted 745 records to Amazon S3
Successfully emitted 644 records to Amazon S3

Each file in S3 is 1.6 MB or 1.7 MB in size. Can someone shed some light on what might be happening here?

@sahilpalvia

@sudharshanPLT This is only initial speculation about your issue, based on the information provided. A more concrete and detailed explanation would require more details.

It seems that all of your records are being buffered and emitted to S3 at once. For your use case of writing a single record per S3 object, you want to transform each record and emit it to S3 immediately; you do not need to buffer the records. If you do plan to use the buffer, you need to flush it after transforming each record. This is not recommended, though, since S3 recommends larger object sizes.

@sudharshanPLT

@sahilpalvia Thanks for your input. When I set bufferByteSizeLimit = 4096, doesn't that mean my buffer size is limited to 4 KB, which is actually one record in my case? Am I missing something here? Please let me know what additional details you need to dig deeper.

sahilpalvia commented Aug 25, 2017

@sudharshanPLT bufferByteSizeLimit, bufferRecordCountLimit and bufferMillisecondsLimit are used to check if the buffer needs to be flushed. You can check the logic behind the shouldFlush method here.
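
For readers without the linked source at hand, a check of this kind can be sketched roughly as follows. This is a simplified stand-in, not the library's code: the class name, the method signature, and the rule that an empty buffer never flushes are assumptions made for illustration.

```java
/** Hypothetical stand-in for a buffer's flush check; not the library's source. */
final class FlushCheckSketch {

    /**
     * Returns true once a non-empty buffer has reached at least one of the
     * three limits. Nothing here caps how far past a limit the buffer may grow.
     */
    static boolean shouldFlush(int recordCount, long byteCount, long elapsedMillis,
                               int recordCountLimit, long byteSizeLimit, long millisecondsLimit) {
        return recordCount > 0
                && (recordCount >= recordCountLimit
                    || byteCount >= byteSizeLimit
                    || elapsedMillis >= millisecondsLimit);
    }

    public static void main(String[] args) {
        // Limits taken from the properties in the question: 1 record, 4096 bytes, 3600000 ms.
        System.out.println(shouldFlush(1, 3000L, 10L, 1, 4096L, 3_600_000L));        // true
        System.out.println(shouldFlush(713, 1_700_000L, 10L, 1, 4096L, 3_600_000L)); // true
        System.out.println(shouldFlush(0, 0L, 4_000_000L, 1, 4096L, 3_600_000L));    // false
    }
}
```

Note that a buffer holding 713 records passes the same check as a buffer holding one: the limits only say when a flush is allowed, not how large the buffer can get before someone asks.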

As you can see, they are only used to check whether the buffer meets the conditions to be flushed, and those conditions are minimums ("at least"), not maximums ("at most"). The buffer does not automatically flush the records to the downstream service once it gets full; that logic needs to be implemented in your processRecords method. Have you implemented KinesisConnectorRecordProcessor and overridden processRecords? If not, you need to implement it and override the processRecords method. There you could transform each record and emit it without buffering. If you implement KinesisConnectorRecordProcessor, make sure to also implement KinesisConnectorRecordProcessorFactory, override its createKinesisConnectorRecordProcessor method to use your new record processor, and provide your new factory in the KinesisConnectorExecutor.
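
The difference between checking the limits once per batch and checking them after every record can be illustrated with a toy model. Everything in it is hypothetical (class, method names, and the use of plain strings as records); it does not call the connector library, and it assumes the flush check runs once per delivered batch, which would be consistent with the log lines in the question.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of flush timing; all names are hypothetical, not the library's API. */
final class FlushTimingSketch {
    static final int RECORD_COUNT_LIMIT = 1; // mirrors bufferRecordCountLimit = 1

    /** Buffers the whole batch, then checks the limit once. Returns the size of each emit. */
    static List<Integer> flushPerBatch(List<String> batch) {
        List<String> buffer = new ArrayList<>();
        List<Integer> emitSizes = new ArrayList<>();
        for (String record : batch) {
            buffer.add(record);
        }
        if (!buffer.isEmpty() && buffer.size() >= RECORD_COUNT_LIMIT) {
            emitSizes.add(buffer.size()); // one emit containing everything buffered
            buffer.clear();
        }
        return emitSizes;
    }

    /** Checks the limit after every record, so each emit holds a single record. */
    static List<Integer> flushPerRecord(List<String> batch) {
        List<String> buffer = new ArrayList<>();
        List<Integer> emitSizes = new ArrayList<>();
        for (String record : batch) {
            buffer.add(record);
            if (buffer.size() >= RECORD_COUNT_LIMIT) {
                emitSizes.add(buffer.size());
                buffer.clear();
            }
        }
        return emitSizes;
    }

    public static void main(String[] args) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 713; i++) {
            batch.add("record-" + i);
        }
        System.out.println(flushPerBatch(batch));         // [713]
        System.out.println(flushPerRecord(batch).size()); // 713
    }
}
```

With the per-batch check, a limit of 1 still yields a single emit of 713 records; only the per-record check produces 713 separate emits, which is the behavior an overridden processRecords would need to provide.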
