Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding concurrent batch writes #96

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

davidjumani
Copy link

Added RunConcurrently() to concurrently execute BatchWriteItem()

Copy link
Contributor

@greggjs greggjs left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no power over if this gets merged or not, I'm just waiting for PR in the same repo to get merged myself. I thought I'd provide some feedback on your PR. I think you need some big changes to it if you want this thing merged.

batchwrite.go Outdated

channel := make(chan BatchResponse)

batchCounter := 0
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need this variable when you could just do

for _, batch := range totalBatch {
        go bw.writeBatch(ctx, batch, channel)
}

and I don't think you even need the batch counter. It doesn't really seem like you're using it for any reason.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was using this for testing and debugging. Forgot to remove it!

batchwrite.go Outdated
}

if batchResponse.Error != nil {
err = batchResponse.Error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you rather use some kind of multi error thing here? To me you could have multiple errors that occur from all your batches that you'd want to record. I'd suggest something like hashicorp/go-multierror

batchwrite.go Outdated
}

// Receive
for j := 0; j < end; j++ {
Copy link
Contributor

@greggjs greggjs May 17, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you don't need to do it this way. I'd think a better way to do this would be

for {
        select {
        case batchResponse, ok := <- channel:
                // logic for handing errors / unexpected stream close / updating written records
                // when all batches responded, return the wrote count.
        case <-ctx.Done():
                // do some error logic because we timed out the ctx, return the currently recorded wrote records
        }
}

Copy link
Author

@davidjumani davidjumani May 20, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will try this. However the final writeBatch does not close the channel. Would it suffice to count and break within the case or is there a smarter way about it??

batchwrite.go Outdated
batchCounter := 0
wrote = 0
err = nil
for i := 0; i < iterations; i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you take those suggestions, you could get rid of this outer for loop. and just have those two.

batchwrite.go Outdated
}
}

close(channel)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would make this a deferred operation, as I wouldn't want to prematurely close the channel. I think with your current code this is a real possibility.

return 0, bw.err
}

cfg := new(batchWriteConfig)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're not really using the notion of your threads really. I think it's overly complicating your operation as it's currently implemented. All you are really doing here is spinning up a go routine for each batch you're running, and you'll make too many operation if your threads number is larger than your total batches.

If you were truly concerned about how many go routines were concurrently running, you'd be making a worker pool, like https://gobyexample.com/worker-pools and send your batches to be processed by the worker pool. I'd also not call these things "Threads" as a common Go convention would be to refer to it as "Pool Size".

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will try this. Thanks!

batchwrite.go Outdated
}

// Sets the number of threads to process the request
func WithThreards(threads int) BatchWriteOption {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use the word thread, use pool size.

@davidjumani
Copy link
Author

davidjumani commented May 20, 2019

@greggjs Thanks for your suggestions. I've made the changes! Let me know what you think!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants