Skip to content

Commit

Permalink
GODRIVER-3172 Read response in the background after an op timeout. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewdale authored Apr 12, 2024
1 parent 722a2f2 commit b605d09
Show file tree
Hide file tree
Showing 16 changed files with 868 additions and 39 deletions.
6 changes: 4 additions & 2 deletions internal/csot/csot.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ type timeoutKey struct{}
// TODO default behavior.
func MakeTimeoutContext(ctx context.Context, to time.Duration) (context.Context, context.CancelFunc) {
// Only use the passed in Duration as a timeout on the Context if it
// is non-zero.
// is non-zero and if the Context doesn't already have a timeout.
cancelFunc := func() {}
if to != 0 {
if _, deadlineSet := ctx.Deadline(); to != 0 && !deadlineSet {
ctx, cancelFunc = context.WithTimeout(ctx, to)
}

// Add timeoutKey either way to indicate CSOT is enabled.
return context.WithValue(ctx, timeoutKey{}, true), cancelFunc
}

Expand Down
8 changes: 4 additions & 4 deletions mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,10 @@ func (cs *ChangeStream) executeOperation(ctx context.Context, resuming bool) err
cs.aggregate.Pipeline(plArr)
}

// If no deadline is set on the passed-in context, cs.client.timeout is set, and context is not already
// a Timeout context, honor cs.client.timeout in new Timeout context for change stream operation execution
// and potential retry.
if _, deadlineSet := ctx.Deadline(); !deadlineSet && cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) {
// If cs.client.timeout is set and context is not already a Timeout context,
// honor cs.client.timeout in new Timeout context for change stream
// operation execution and potential retry.
if cs.client.timeout != nil && !csot.IsTimeoutContext(ctx) {
newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *cs.client.timeout)
// Redefine ctx to be the new timeout-derived context.
ctx = newCtx
Expand Down
30 changes: 28 additions & 2 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,15 @@ func aggregate(a aggregateParams) (cur *Cursor, err error) {
Timeout(a.client.timeout).
MaxTime(ao.MaxTime)

// Omit "maxTimeMS" from operations that return a user-managed cursor to
// prevent confusing "cursor not found" errors. To maintain existing
// behavior for users who set "timeoutMS" with no context deadline, only
// omit "maxTimeMS" when a context deadline is set.
//
// See DRIVERS-2722 for more detail.
_, deadlineSet := a.ctx.Deadline()
op.OmitCSOTMaxTimeMS(deadlineSet)

if ao.AllowDiskUse != nil {
op.AllowDiskUse(*ao.AllowDiskUse)
}
Expand Down Expand Up @@ -1191,6 +1200,22 @@ func (coll *Collection) Distinct(ctx context.Context, fieldName string, filter i
// For more information about the command, see https://www.mongodb.com/docs/manual/reference/command/find/.
func (coll *Collection) Find(ctx context.Context, filter interface{},
opts ...*options.FindOptions) (cur *Cursor, err error) {
// Omit "maxTimeMS" from operations that return a user-managed cursor to
// prevent confusing "cursor not found" errors. To maintain existing
// behavior for users who set "timeoutMS" with no context deadline, only
// omit "maxTimeMS" when a context deadline is set.
//
// See DRIVERS-2722 for more detail.
_, deadlineSet := ctx.Deadline()
return coll.find(ctx, filter, deadlineSet, opts...)
}

func (coll *Collection) find(
ctx context.Context,
filter interface{},
omitCSOTMaxTimeMS bool,
opts ...*options.FindOptions,
) (cur *Cursor, err error) {

if ctx == nil {
ctx = context.Background()
Expand Down Expand Up @@ -1230,7 +1255,8 @@ func (coll *Collection) Find(ctx context.Context, filter interface{},
CommandMonitor(coll.client.monitor).ServerSelector(selector).
ClusterClock(coll.client.clock).Database(coll.db.name).Collection(coll.name).
Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).ServerAPI(coll.client.serverAPI).
Timeout(coll.client.timeout).MaxTime(fo.MaxTime).Logger(coll.client.logger)
Timeout(coll.client.timeout).MaxTime(fo.MaxTime).Logger(coll.client.logger).
OmitCSOTMaxTimeMS(omitCSOTMaxTimeMS)

cursorOpts := coll.client.createBaseCursorOptions()

Expand Down Expand Up @@ -1408,7 +1434,7 @@ func (coll *Collection) FindOne(ctx context.Context, filter interface{},
// by the server.
findOpts = append(findOpts, options.Find().SetLimit(-1))

cursor, err := coll.Find(ctx, filter, findOpts...)
cursor, err := coll.find(ctx, filter, false, findOpts...)
return &SingleResult{
ctx: ctx,
cur: cursor,
Expand Down
12 changes: 6 additions & 6 deletions mongo/gridfs/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ func (b *Bucket) Delete(fileID interface{}) error {
//
// Use the context parameter to time-out or cancel the delete operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) DeleteContext(ctx context.Context, fileID interface{}) error {
// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
// If Timeout is set on the Client and context is not already a Timeout
// context, honor Timeout in new Timeout context for operation execution to
// be shared by both delete operations.
if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
// Redefine ctx to be the new timeout-derived context.
ctx = newCtx
Expand Down Expand Up @@ -384,10 +384,10 @@ func (b *Bucket) Drop() error {
//
// Use the context parameter to time-out or cancel the drop operation. The deadline set by SetWriteDeadline is ignored.
func (b *Bucket) DropContext(ctx context.Context) error {
// If no deadline is set on the passed-in context, Timeout is set on the Client, and context is
// not already a Timeout context, honor Timeout in new Timeout context for operation execution to
// If Timeout is set on the Client and context is not already a Timeout
// context, honor Timeout in new Timeout context for operation execution to
// be shared by both drop operations.
if _, deadlineSet := ctx.Deadline(); !deadlineSet && b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
if b.db.Client().Timeout() != nil && !csot.IsTimeoutContext(ctx) {
newCtx, cancelFunc := csot.MakeTimeoutContext(ctx, *b.db.Client().Timeout())
// Redefine ctx to be the new timeout-derived context.
ctx = newCtx
Expand Down
Loading

0 comments on commit b605d09

Please sign in to comment.