Skip to content

Commit

Permalink
Fix cursor types and implement BatchCursor
Browse files Browse the repository at this point in the history
GODRIVER-3
GODRIVER-759
GODRIVER-791

Change-Id: I7d4121e7fffcfadd7427a6fc64d97d4c131acbbe
  • Loading branch information
skriptble committed Jan 31, 2019
1 parent 42e68e3 commit d23c3a5
Show file tree
Hide file tree
Showing 52 changed files with 1,429 additions and 1,244 deletions.
3 changes: 2 additions & 1 deletion .errcheck-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
(*github.com/mongodb/mongo-go-driver/x/mongo/driver/topology.Server).Close
(*github.com/mongodb/mongo-go-driver/x/network/connection.pool).closeConnection
(github.com/mongodb/mongo-go-driver/x/network/wiremessage.ReadWriteCloser).Close
(github.com/mongodb/mongo-go-driver/mongo.Cursor).Close
(*github.com/mongodb/mongo-go-driver/mongo.Cursor).Close
(*github.com/mongodb/mongo-go-driver/mongo.ChangeStream).Close
(net.Conn).Close
encoding/pem.Encode
fmt.Fprintf
Expand Down
5 changes: 1 addition & 4 deletions benchmark/multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,7 @@ func MultiFindMany(ctx context.Context, tm TimerManager, iters int) error {
return err
}
var r bson.Raw
r, err = cursor.DecodeBytes()
if err != nil {
return err
}
r = cursor.DecodeBytes()
if len(r) == 0 {
return errors.New("error retrieving document")
}
Expand Down
2 changes: 1 addition & 1 deletion examples/documentation_examples/examples.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/stretchr/testify/require"
)

func requireCursorLength(t *testing.T, cursor mongo.Cursor, length int) {
func requireCursorLength(t *testing.T, cursor *mongo.Cursor, length int) {
i := 0
for cursor.Next(context.Background()) {
i++
Expand Down
4 changes: 3 additions & 1 deletion examples/documentation_examples/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (
"testing"

"github.com/mongodb/mongo-go-driver/examples/documentation_examples"
"github.com/mongodb/mongo-go-driver/internal/testutil"
"github.com/mongodb/mongo-go-driver/mongo"
"github.com/stretchr/testify/require"
)

func TestDocumentationExamples(t *testing.T) {
client, err := mongo.Connect(context.Background(), "mongodb://localhost:27017", nil)
cs := testutil.ConnString(t)
client, err := mongo.Connect(context.Background(), cs.String(), nil)
require.NoError(t, err)

db := client.Database("documentation_examples")
Expand Down
30 changes: 30 additions & 0 deletions mongo/batch_cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package mongo

import (
"context"
)

// batchCursor is the interface implemented by types that can provide batches of document results.
// The Cursor type is built on top of this type.
type batchCursor interface {
// ID returns the ID of the cursor.
ID() int64

// Next returns true if there is a batch available.
Next(context.Context) bool

// Batch appends the current batch of documents to dst. RequiredBytes can be used to determine
// the length of the current batch of documents.
//
// If there is no batch available, this method should do nothing.
Batch(dst []byte) []byte

// RequiredBytes returns the number of bytes required fo rthe current batch.
RequiredBytes() int

// Err returns the last error encountered.
Err() error

// Close closes the cursor.
Close(context.Context) error
}
98 changes: 52 additions & 46 deletions mongo/change_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/mongodb/mongo-go-driver/mongo/readconcern"
"github.com/mongodb/mongo-go-driver/mongo/readpref"
"github.com/mongodb/mongo-go-driver/x/bsonx"
"github.com/mongodb/mongo-go-driver/x/bsonx/bsoncore"
"github.com/mongodb/mongo-go-driver/x/mongo/driver"
"github.com/mongodb/mongo-go-driver/x/mongo/driver/session"
"github.com/mongodb/mongo-go-driver/x/network/command"
Expand All @@ -34,14 +35,25 @@ var ErrMissingResumeToken = errors.New("cannot provide resume functionality when
// ErrNilCursor indicates that the cursor for the change stream is nil.
var ErrNilCursor = errors.New("cursor is nil")

type changeStream struct {
cmd bsonx.Doc // aggregate command to run to create stream and rebuild cursor
pipeline bsonx.Arr
options *options.ChangeStreamOptions
coll *Collection
db *Database
ns command.Namespace
cursor Cursor
// ChangeStream instances iterate a stream of change documents. Each document can be decoded via the
// Decode method. Resume tokens should be retrieved via the ResumeToken method and can be stored to
// resume the change stream at a specific point in time.
//
// A typical usage of the ChangeStream type would be:
type ChangeStream struct {
// Current is the BSON bytes of the current change document. This property is only valid until
// the next call to Next or Close. If continued access is required to the bson.Raw, you must
// make a copy of it.
Current bson.Raw

cmd bsonx.Doc // aggregate command to run to create stream and rebuild cursor
pipeline bsonx.Arr
options *options.ChangeStreamOptions
coll *Collection
db *Database
ns command.Namespace
cursor *Cursor
cursorOpts bsonx.Doc

resumeToken bsonx.Doc
err error
Expand All @@ -53,7 +65,7 @@ type changeStream struct {
registry *bsoncodec.Registry
}

func (cs *changeStream) replaceOptions(desc description.SelectedServer) {
func (cs *ChangeStream) replaceOptions(desc description.SelectedServer) {
// if cs has not received any changes and resumeAfter not specified and max wire version >= 7, run known agg cmd
// with startAtOperationTime set to startAtOperationTime provided by user or saved from initial agg
// must not send resumeAfter key
Expand Down Expand Up @@ -156,7 +168,7 @@ func parseOptions(csType StreamType, opts *options.ChangeStreamOptions, registry
return pipelineDoc, cursorDoc, optsDoc, nil
}

func (cs *changeStream) runCommand(ctx context.Context, replaceOptions bool) error {
func (cs *ChangeStream) runCommand(ctx context.Context, replaceOptions bool) error {
ss, err := cs.client.topology.SelectServer(ctx, cs.db.writeSelector)
if err != nil {
return err
Expand Down Expand Up @@ -198,7 +210,12 @@ func (cs *changeStream) runCommand(ctx context.Context, replaceOptions bool) err
return err
}

cursor, err := ss.BuildCursor(rdr, readCmd.Session, readCmd.Clock)
batchCursor, err := driver.NewBatchCursor(bsoncore.Document(rdr), readCmd.Session, readCmd.Clock, ss.Server)
if err != nil {
cs.sess.EndSession(ctx)
return err
}
cursor, err := newCursor(batchCursor, cs.registry)
if err != nil {
cs.sess.EndSession(ctx)
return err
Expand All @@ -216,7 +233,7 @@ func (cs *changeStream) runCommand(ctx context.Context, replaceOptions bool) err
}

func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{},
opts ...*options.ChangeStreamOptions) (*changeStream, error) {
opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {

pipelineArr, err := transformAggregatePipeline(coll.registry, pipeline)
if err != nil {
Expand Down Expand Up @@ -245,7 +262,7 @@ func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{}
}
cmd = append(cmd, optsDoc...)

cs := &changeStream{
cs := &ChangeStream{
client: coll.client,
sess: sess,
cmd: cmd,
Expand All @@ -257,6 +274,7 @@ func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{}
readConcern: coll.readConcern,
options: csOpts,
registry: coll.registry,
cursorOpts: cursorDoc,
}

err = cs.runCommand(ctx, false)
Expand All @@ -268,7 +286,7 @@ func newChangeStream(ctx context.Context, coll *Collection, pipeline interface{}
}

func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
opts ...*options.ChangeStreamOptions) (*changeStream, error) {
opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {

pipelineArr, err := transformAggregatePipeline(db.registry, pipeline)
if err != nil {
Expand Down Expand Up @@ -297,7 +315,7 @@ func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
}
cmd = append(cmd, optsDoc...)

cs := &changeStream{
cs := &ChangeStream{
client: db.client,
db: db,
sess: sess,
Expand All @@ -308,6 +326,7 @@ func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
readConcern: db.readConcern,
options: csOpts,
registry: db.registry,
cursorOpts: cursorDoc,
}

err = cs.runCommand(ctx, false)
Expand All @@ -319,7 +338,7 @@ func newDbChangeStream(ctx context.Context, db *Database, pipeline interface{},
}

func newClientChangeStream(ctx context.Context, client *Client, pipeline interface{},
opts ...*options.ChangeStreamOptions) (*changeStream, error) {
opts ...*options.ChangeStreamOptions) (*ChangeStream, error) {

pipelineArr, err := transformAggregatePipeline(client.registry, pipeline)
if err != nil {
Expand Down Expand Up @@ -348,7 +367,7 @@ func newClientChangeStream(ctx context.Context, client *Client, pipeline interfa
}
cmd = append(cmd, optsDoc...)

cs := &changeStream{
cs := &ChangeStream{
client: client,
db: client.Database("admin"),
sess: sess,
Expand All @@ -359,6 +378,7 @@ func newClientChangeStream(ctx context.Context, client *Client, pipeline interfa
readConcern: client.readConcern,
options: csOpts,
registry: client.registry,
cursorOpts: cursorDoc,
}

err = cs.runCommand(ctx, false)
Expand All @@ -369,13 +389,8 @@ func newClientChangeStream(ctx context.Context, client *Client, pipeline interfa
return cs, nil
}

func (cs *changeStream) storeResumeToken() error {
br, err := cs.cursor.DecodeBytes()
if err != nil {
return err
}

idVal, err := br.LookupErr("_id")
func (cs *ChangeStream) storeResumeToken() error {
idVal, err := cs.cursor.Current.LookupErr("_id")
if err != nil {
_ = cs.Close(context.Background())
return ErrMissingResumeToken
Expand All @@ -397,15 +412,18 @@ func (cs *changeStream) storeResumeToken() error {
return nil
}

func (cs *changeStream) ID() int64 {
// ID returns the cursor ID for this change stream.
func (cs *ChangeStream) ID() int64 {
if cs.cursor == nil {
return 0
}

return cs.cursor.ID()
}

func (cs *changeStream) Next(ctx context.Context) bool {
// Next gets the next result from this change stream. Returns true if there were no errors and the next
// result is available for decoding.
func (cs *ChangeStream) Next(ctx context.Context) bool {
// execute in a loop to retry resume-able errors and advance the underlying cursor
for {
if cs.cursor == nil {
Expand All @@ -419,6 +437,7 @@ func (cs *changeStream) Next(ctx context.Context) bool {
return false
}

cs.Current = cs.cursor.Current
return true
}

Expand Down Expand Up @@ -447,31 +466,17 @@ func (cs *changeStream) Next(ctx context.Context) bool {
}
}

func (cs *changeStream) Decode(out interface{}) error {
// Decode will decode the current document into val.
func (cs *ChangeStream) Decode(out interface{}) error {
if cs.cursor == nil {
return ErrNilCursor
}

br, err := cs.DecodeBytes()
if err != nil {
return err
}

return bson.UnmarshalWithRegistry(cs.registry, br, out)
}

func (cs *changeStream) DecodeBytes() (bson.Raw, error) {
if cs.cursor == nil {
return nil, ErrNilCursor
}
if cs.err != nil {
return nil, cs.err
}

return cs.cursor.DecodeBytes()
return bson.UnmarshalWithRegistry(cs.registry, cs.Current, out)
}

func (cs *changeStream) Err() error {
// Err returns the current error.
func (cs *ChangeStream) Err() error {
if cs.err != nil {
return cs.err
}
Expand All @@ -482,7 +487,8 @@ func (cs *changeStream) Err() error {
return cs.cursor.Err()
}

func (cs *changeStream) Close(ctx context.Context) error {
// Close closes this cursor.
func (cs *ChangeStream) Close(ctx context.Context) error {
if cs.cursor == nil {
return nil // cursor is already closed
}
Expand Down
4 changes: 2 additions & 2 deletions mongo/change_stream_spec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func TestChangeStreamSpec(t *testing.T) {
}
}

func closeCursor(stream Cursor) {
func closeCursor(stream *ChangeStream) {
_ = stream.Close(ctx)
}

Expand Down Expand Up @@ -214,7 +214,7 @@ func runCsTestFile(t *testing.T, globalClient *Client, path string) {

drainChannels()
opts := getStreamOptions(&test)
var cursor Cursor
var cursor *ChangeStream
switch test.Target {
case "collection":
cursor, err = clientColl.Watch(ctx, test.Pipeline, opts)
Expand Down
Loading

0 comments on commit d23c3a5

Please sign in to comment.