From 84d1318337df128c648d787fdfc2bd5dd26491c5 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Thu, 25 Jan 2018 14:05:04 +0400 Subject: [PATCH 01/24] Brings in a patch on having flusher not suppress errors. (#81) https://github.com/go-mgo/mgo/pull/360 --- txn/flusher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/txn/flusher.go b/txn/flusher.go index 3d1882d7f..5d1c1bdd8 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -759,7 +759,7 @@ func (f *flusher) checkpoint(t *transaction, revnos []int64) error { f.debugf("Ready to apply %s. Saving revnos %v: LOST RACE", t, debugRevnos) return f.reload(t) } - return nil + return err } func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) error { From e975147f50162cff7eaeecab95c036d08ec69eb9 Mon Sep 17 00:00:00 2001 From: Steve Gray Date: Wed, 31 Jan 2018 20:30:54 +1000 Subject: [PATCH 02/24] Fallback to JSON tags when BSON tag isn't present (#91) * Fallback to JSON tags when BSON tag isn't present Cleanup. * Add test to demonstrate tagging fallback. - Test coverage for tagging test. --- bson/bson.go | 16 +++++++++-- bson/compatability_test.go | 54 ++++++++++++++++++++++++++++++++++++++ bson/compatibility.go | 16 +++++++++++ 3 files changed, 84 insertions(+), 2 deletions(-) create mode 100644 bson/compatability_test.go create mode 100644 bson/compatibility.go diff --git a/bson/bson.go b/bson/bson.go index d960f7a37..31beab191 100644 --- a/bson/bson.go +++ b/bson/bson.go @@ -698,9 +698,21 @@ func getStructInfo(st reflect.Type) (*structInfo, error) { info := fieldInfo{Num: i} tag := field.Tag.Get("bson") - if tag == "" && strings.Index(string(field.Tag), ":") < 0 { - tag = string(field.Tag) + + // Fall-back to JSON struct tag, if feature flag is set. + if tag == "" && useJSONTagFallback { + tag = field.Tag.Get("json") } + + // If there's no bson/json tag available. + if tag == "" { + // If there's no tag, and also no tag: value splits (i.e. no colon) + // then assume the entire tag is the value + if strings.Index(string(field.Tag), ":") < 0 { + tag = string(field.Tag) + } + } + if tag == "-" { continue } diff --git a/bson/compatability_test.go b/bson/compatability_test.go new file mode 100644 index 000000000..743a00e8a --- /dev/null +++ b/bson/compatability_test.go @@ -0,0 +1,54 @@ +package bson_test + +import ( + "github.com/globalsign/mgo/bson" + . "gopkg.in/check.v1" +) + +type mixedTagging struct { + First string + Second string `bson:"second_field"` + Third string `json:"third_field"` + Fourth string `bson:"fourth_field" json:"alternate"` +} + +// TestTaggingFallback checks that tagging fallback can be used/works as expected. +func (s *S) TestTaggingFallback(c *C) { + initial := &mixedTagging{ + First: "One", + Second: "Two", + Third: "Three", + Fourth: "Four", + } + + // Take only testing.T, leave only footprints. + initialState := bson.JSONTagFallbackState() + defer bson.SetJSONTagFallback(initialState) + + // Marshal with the new mode applied. + bson.SetJSONTagFallback(true) + bsonState, errBSON := bson.Marshal(initial) + c.Assert(errBSON, IsNil) + + // Unmarshal into a generic map so that we can pick up the actual field names + // selected. + target := make(map[string]string) + errUnmarshal := bson.Unmarshal(bsonState, target) + c.Assert(errUnmarshal, IsNil) + + // No tag, so standard naming + _, firstExists := target["first"] + c.Assert(firstExists, Equals, true) + + // Just a BSON tag + _, secondExists := target["second_field"] + c.Assert(secondExists, Equals, true) + + // Just a JSON tag + _, thirdExists := target["third_field"] + c.Assert(thirdExists, Equals, true) + + // Should marshal 4th as fourth_field (since we have both tags) + _, fourthExists := target["fourth_field"] + c.Assert(fourthExists, Equals, true) +} diff --git a/bson/compatibility.go b/bson/compatibility.go new file mode 100644 index 000000000..6afecf53c --- /dev/null +++ b/bson/compatibility.go @@ -0,0 +1,16 @@ +package bson + +// Current state of the JSON tag fallback option. +var useJSONTagFallback = false + +// SetJSONTagFallback enables or disables the JSON-tag fallback for structure tagging. When this is enabled, structures +// without BSON tags on a field will fall-back to using the JSON tag (if present). +func SetJSONTagFallback(state bool) { + useJSONTagFallback = state +} + +// JSONTagFallbackState returns the current status of the JSON tag fallback compatability option. See SetJSONTagFallback +// for more information. +func JSONTagFallbackState() bool { + return useJSONTagFallback +} From ee987016113bafae028478fa6d783a7e18291c4e Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Tue, 13 Feb 2018 10:47:44 +0000 Subject: [PATCH 03/24] socket: only send client metadata once per socket Periodic cluster synchronisation calls isMaster() which currently resends the "client" metadata every call - the spec specifies: isMaster commands issued after the initial connection handshake MUST NOT contain handshake arguments https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst#connection-handshake This hotfix prevents subsequent isMaster calls from sending the client metadata again - fixes #101 and fixes #103. Thanks to @changwoo-nam @qhenkart @canthefason @jyoon17 for spotting the initial issue, opening tickets, and having the problem debugged with a PoC fix before I even woke up. --- cluster.go | 40 +++++++++++++++++++++++++++++++--------- socket.go | 1 + 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/cluster.go b/cluster.go index 7fc639c24..087da61e5 100644 --- a/cluster.go +++ b/cluster.go @@ -148,16 +148,38 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul session := newSession(Monotonic, cluster, 10*time.Second) session.setSocket(socket) - // provide some meta infos on the client, - // see https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst#connection-handshake - // for details - metaInfo := bson.M{"driver": bson.M{"name": "mgo", "version": "globalsign"}, - "os": bson.M{"type": runtime.GOOS, "architecture": runtime.GOARCH}} + var cmd = bson.D{{Name: "isMaster", Value: 1}} + + // Send client metadata to the server to identify this socket if this is + // the first isMaster call only. + // + // isMaster commands issued after the initial connection handshake MUST NOT contain handshake arguments + // https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst#connection-handshake + // + socket.sendMeta.Do(func() { + var meta = bson.M{ + "driver": bson.M{ + "name": "mgo", + "version": "globalsign", + }, + "os": bson.M{ + "type": runtime.GOOS, + "architecture": runtime.GOARCH, + }, + } - if cluster.appName != "" { - metaInfo["application"] = bson.M{"name": cluster.appName} - } - err := session.Run(bson.D{{Name: "isMaster", Value: 1}, {Name: "client", Value: metaInfo}}, result) + // Include the application name if set + if cluster.appName != "" { + meta["application"] = bson.M{"name": cluster.appName} + } + + cmd = append(cmd, bson.DocElem{ + Name: "client", + Value: meta, + }) + }) + + err := session.Run(cmd, result) session.Close() return err } diff --git a/socket.go b/socket.go index f6158189c..a9124b043 100644 --- a/socket.go +++ b/socket.go @@ -54,6 +54,7 @@ type mongoSocket struct { dead error serverInfo *mongoServerInfo closeAfterIdle bool + sendMeta sync.Once } type queryOpFlags uint32 From 93412b55399a37c5359b7e2f80ca239567db4262 Mon Sep 17 00:00:00 2001 From: John Arbash Meinel Date: Thu, 15 Feb 2018 14:08:31 +0400 Subject: [PATCH 04/24] Cluster abended test 254 (#100) * Add a test that mongo Server gets their abended reset as necessary. See https://github.com/go-mgo/mgo/issues/254 and https://github.com/go-mgo/mgo/pull/255/files * Include the patch from Issue 255. This brings in a test which fails without the patch, and passes with the patch. Still to be tested, manual tcpkill of a socket. --- .gitignore | 2 ++ cluster.go | 1 + export_test.go | 13 ++++++++++ harness/setup.sh | 2 +- server_test.go | 64 ++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 81 insertions(+), 1 deletion(-) create mode 100644 .gitignore create mode 100644 server_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..9f4fa6d20 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +_harness + diff --git a/cluster.go b/cluster.go index 087da61e5..ac461d5b9 100644 --- a/cluster.go +++ b/cluster.go @@ -682,6 +682,7 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Sleep(100 * time.Millisecond) continue } else { + // We've managed to successfully reconnect to the master, we are no longer abnormaly ended server.Lock() server.abended = false server.Unlock() diff --git a/export_test.go b/export_test.go index 690f84d38..998c7a2dd 100644 --- a/export_test.go +++ b/export_test.go @@ -1,6 +1,7 @@ package mgo import ( + "net" "time" ) @@ -31,3 +32,15 @@ func HackSyncSocketTimeout(newTimeout time.Duration) (restore func()) { syncSocketTimeout = newTimeout return } + +func (s *Session) Cluster() *mongoCluster { + return s.cluster() +} + +func (cluster *mongoCluster) Server(addr string) *mongoServer { + tcpaddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + panic(err) + } + return cluster.server(addr, tcpaddr) +} diff --git a/harness/setup.sh b/harness/setup.sh index e5db78a78..25ba562ec 100755 --- a/harness/setup.sh +++ b/harness/setup.sh @@ -30,7 +30,7 @@ start() { UP=$(svstat daemons/* | grep ' up ' | grep -v ' [0-3] seconds' | wc -l) echo "$UP processes up..." if [ x$COUNT = x$UP ]; then - echo "Running setup.js with mongo..." + echo "Running init.js with mongo..." mongo --nodb ../harness/mongojs/init.js exit 0 fi diff --git a/server_test.go b/server_test.go new file mode 100644 index 000000000..1d21ef08b --- /dev/null +++ b/server_test.go @@ -0,0 +1,64 @@ +// mgo - MongoDB driver for Go +// +// Copyright (c) 2018 Canonical Ltd +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this +// list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR +// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package mgo_test + +import ( + "time" + + . "gopkg.in/check.v1" + "github.com/globalsign/mgo" +) + +func (s *S) TestServerRecoversFromAbend(c *C) { + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + // Peek behind the scenes + cluster := session.Cluster() + server := cluster.Server("127.0.0.1:40001") + sock, abended, err := server.AcquireSocket(100, time.Second) + c.Assert(err, IsNil) + c.Assert(sock, NotNil) + sock.Release() + c.Check(abended, Equals, false) + // Forcefully abend this socket + sock.Close() + server.AbendSocket(sock) + // Next acquire notices the connection was abnormally ended + sock, abended, err = server.AcquireSocket(100, time.Second) + c.Assert(err, IsNil) + sock.Release() + c.Check(abended, Equals, true) + // cluster.AcquireSocket should fix the abended problems + sock, err = cluster.AcquireSocket(mgo.Primary, false, time.Minute, time.Second, nil, 100) + c.Assert(err, IsNil) + sock.Release() + sock, abended, err = server.AcquireSocket(100, time.Second) + c.Assert(err, IsNil) + c.Check(abended, Equals, false) + sock.Release() +} From 8a049e5e6fbc786e833729379462b8cfa5201c03 Mon Sep 17 00:00:00 2001 From: Pietro De Caro Date: Thu, 15 Feb 2018 11:35:26 +0100 Subject: [PATCH 05/24] changeStream support (#97) Add $changeStream support --- changestreams.go | 357 ++++++++++++++++++++++++++++++++ changestreams_test.go | 464 ++++++++++++++++++++++++++++++++++++++++++ harness/daemons/.env | 11 +- session.go | 64 +++++- 4 files changed, 885 insertions(+), 11 deletions(-) create mode 100644 changestreams.go create mode 100644 changestreams_test.go diff --git a/changestreams.go b/changestreams.go new file mode 100644 index 000000000..5c2279c66 --- /dev/null +++ b/changestreams.go @@ -0,0 +1,357 @@ +package mgo + +import ( + "errors" + "fmt" + "reflect" + "sync" + "time" + + "github.com/globalsign/mgo/bson" +) + +type FullDocument string + +const ( + Default = "default" + UpdateLookup = "updateLookup" +) + +type ChangeStream struct { + iter *Iter + isClosed bool + options ChangeStreamOptions + pipeline interface{} + resumeToken *bson.Raw + collection *Collection + readPreference *ReadPreference + err error + m sync.Mutex + sessionCopied bool +} + +type ChangeStreamOptions struct { + + // FullDocument controls the amount of data that the server will return when + // returning a changes document. + FullDocument FullDocument + + // ResumeAfter specifies the logical starting point for the new change stream. + ResumeAfter *bson.Raw + + // MaxAwaitTimeMS specifies the maximum amount of time for the server to wait + // on new documents to satisfy a change stream query. + MaxAwaitTimeMS time.Duration + + // BatchSize specifies the number of documents to return per batch. + BatchSize int + + // Collation specifies the way the server should collate returned data. + //TODO Collation *Collation +} + +var errMissingResumeToken = errors.New("resume token missing from result") + +// Watch constructs a new ChangeStream capable of receiving continuing data +// from the database. +func (coll *Collection) Watch(pipeline interface{}, + options ChangeStreamOptions) (*ChangeStream, error) { + + if pipeline == nil { + pipeline = []bson.M{} + } + + csPipe := constructChangeStreamPipeline(pipeline, options) + pipe := coll.Pipe(&csPipe) + if options.MaxAwaitTimeMS > 0 { + pipe.SetMaxTime(options.MaxAwaitTimeMS) + } + if options.BatchSize > 0 { + pipe.Batch(options.BatchSize) + } + pIter := pipe.Iter() + + // check that there was no issue creating the iterator. + // this will fail immediately with an error from the server if running against + // a standalone. + if err := pIter.Err(); err != nil { + return nil, err + } + + pIter.isChangeStream = true + return &ChangeStream{ + iter: pIter, + collection: coll, + resumeToken: nil, + options: options, + pipeline: pipeline, + }, nil +} + +// Next retrieves the next document from the change stream, blocking if necessary. +// Next returns true if a document was successfully unmarshalled into result, +// and false if an error occured. When Next returns false, the Err method should +// be called to check what error occurred during iteration. If there were no events +// available (ErrNotFound), the Err method returns nil so the user can retry the invocaton. +// +// For example: +// +// pipeline := []bson.M{} +// +// changeStream := collection.Watch(pipeline, ChangeStreamOptions{}) +// for changeStream.Next(&changeDoc) { +// fmt.Printf("Change: %v\n", changeDoc) +// } +// +// if err := changeStream.Close(); err != nil { +// return err +// } +// +// If the pipeline used removes the _id field from the result, Next will error +// because the _id field is needed to resume iteration when an error occurs. +// +func (changeStream *ChangeStream) Next(result interface{}) bool { + // the err field is being constantly overwritten and we don't want the user to + // attempt to read it at this point so we lock. + changeStream.m.Lock() + + defer changeStream.m.Unlock() + + // if we are in a state of error, then don't continue. + if changeStream.err != nil { + return false + } + + if changeStream.isClosed { + changeStream.err = fmt.Errorf("illegal use of a closed ChangeStream") + return false + } + + var err error + + // attempt to fetch the change stream result. + err = changeStream.fetchResultSet(result) + if err == nil { + return true + } + + // if we get no results we return false with no errors so the user can call Next + // again, resuming is not needed as the iterator is simply timed out as no events happened. + // The user will call Timeout in order to understand if this was the case. + if err == ErrNotFound { + return false + } + + // check if the error is resumable + if !isResumableError(err) { + // error is not resumable, give up and return it to the user. + changeStream.err = err + return false + } + + // try to resume. + err = changeStream.resume() + if err != nil { + // we've not been able to successfully resume and should only try once, + // so we give up. + changeStream.err = err + return false + } + + // we've successfully resumed the changestream. + // try to fetch the next result. + err = changeStream.fetchResultSet(result) + if err != nil { + changeStream.err = err + return false + } + + return true +} + +// Err returns nil if no errors happened during iteration, or the actual +// error otherwise. +func (changeStream *ChangeStream) Err() error { + changeStream.m.Lock() + defer changeStream.m.Unlock() + return changeStream.err +} + +// Close kills the server cursor used by the iterator, if any, and returns +// nil if no errors happened during iteration, or the actual error otherwise. +func (changeStream *ChangeStream) Close() error { + changeStream.m.Lock() + defer changeStream.m.Unlock() + changeStream.isClosed = true + err := changeStream.iter.Close() + if err != nil { + changeStream.err = err + } + if changeStream.sessionCopied { + changeStream.iter.session.Close() + changeStream.sessionCopied = false + } + return err +} + +// ResumeToken returns a copy of the current resume token held by the change stream. +// This token should be treated as an opaque token that can be provided to instantiate +// a new change stream. +func (changeStream *ChangeStream) ResumeToken() *bson.Raw { + changeStream.m.Lock() + defer changeStream.m.Unlock() + if changeStream.resumeToken == nil { + return nil + } + var tokenCopy = *changeStream.resumeToken + return &tokenCopy +} + +// Timeout returns true if the last call of Next returned false because of an iterator timeout. +func (changeStream *ChangeStream) Timeout() bool { + return changeStream.iter.Timeout() +} + +func constructChangeStreamPipeline(pipeline interface{}, + options ChangeStreamOptions) interface{} { + pipelinev := reflect.ValueOf(pipeline) + + // ensure that the pipeline passed in is a slice. + if pipelinev.Kind() != reflect.Slice { + panic("pipeline argument must be a slice") + } + + // construct the options to be used by the change notification + // pipeline stage. + changeStreamStageOptions := bson.M{} + + if options.FullDocument != "" { + changeStreamStageOptions["fullDocument"] = options.FullDocument + } + if options.ResumeAfter != nil { + changeStreamStageOptions["resumeAfter"] = options.ResumeAfter + } + + changeStreamStage := bson.M{"$changeStream": changeStreamStageOptions} + + pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1) + + // insert the change notification pipeline stage at the beginning of the + // aggregation. + pipeOfInterfaces[0] = changeStreamStage + + // convert the passed in slice to a slice of interfaces. + for i := 0; i < pipelinev.Len(); i++ { + pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface() + } + var pipelineAsInterface interface{} = pipeOfInterfaces + return pipelineAsInterface +} + +func (changeStream *ChangeStream) resume() error { + // copy the information for the new socket. + + // Thanks to Copy() future uses will acquire a new socket against the newly selected DB. + newSession := changeStream.iter.session.Copy() + + // fetch the cursor from the iterator and use it to run a killCursors + // on the connection. + cursorId := changeStream.iter.op.cursorId + err := runKillCursorsOnSession(newSession, cursorId) + if err != nil { + return err + } + + // change out the old connection to the database with the new connection. + if changeStream.sessionCopied { + changeStream.collection.Database.Session.Close() + } + changeStream.collection.Database.Session = newSession + changeStream.sessionCopied = true + + opts := changeStream.options + if changeStream.resumeToken != nil { + opts.ResumeAfter = changeStream.resumeToken + } + // make a new pipeline containing the resume token. + changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts) + + // generate the new iterator with the new connection. + newPipe := changeStream.collection.Pipe(changeStreamPipeline) + changeStream.iter = newPipe.Iter() + if err := changeStream.iter.Err(); err != nil { + return err + } + changeStream.iter.isChangeStream = true + return nil +} + +// fetchResumeToken unmarshals the _id field from the document, setting an error +// on the changeStream if it is unable to. +func (changeStream *ChangeStream) fetchResumeToken(rawResult *bson.Raw) error { + changeStreamResult := struct { + ResumeToken *bson.Raw `bson:"_id,omitempty"` + }{} + + err := rawResult.Unmarshal(&changeStreamResult) + if err != nil { + return err + } + + if changeStreamResult.ResumeToken == nil { + return errMissingResumeToken + } + + changeStream.resumeToken = changeStreamResult.ResumeToken + return nil +} + +func (changeStream *ChangeStream) fetchResultSet(result interface{}) error { + rawResult := bson.Raw{} + + // fetch the next set of documents from the cursor. + gotNext := changeStream.iter.Next(&rawResult) + err := changeStream.iter.Err() + if err != nil { + return err + } + + if !gotNext && err == nil { + // If the iter.Err() method returns nil despite us not getting a next batch, + // it is becuase iter.Err() silences this case. + return ErrNotFound + } + + // grab the resumeToken from the results + if err := changeStream.fetchResumeToken(&rawResult); err != nil { + return err + } + + // put the raw results into the data structure the user provided. + if err := rawResult.Unmarshal(result); err != nil { + return err + } + return nil +} + +func isResumableError(err error) bool { + _, isQueryError := err.(*QueryError) + // if it is not a database error OR it is a database error, + // but the error is a notMaster error + //and is not a missingResumeToken error (caused by the user provided pipeline) + return (!isQueryError || isNotMasterError(err)) && (err != errMissingResumeToken) +} + +func runKillCursorsOnSession(session *Session, cursorId int64) error { + socket, err := session.acquireSocket(true) + if err != nil { + return err + } + err = socket.Query(&killCursorsOp{[]int64{cursorId}}) + if err != nil { + return err + } + socket.Release() + + return nil +} diff --git a/changestreams_test.go b/changestreams_test.go new file mode 100644 index 000000000..792f5d6ef --- /dev/null +++ b/changestreams_test.go @@ -0,0 +1,464 @@ +package mgo_test + +import ( + mgo "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" + . "gopkg.in/check.v1" +) + +type updateDesc struct { + UpdatedFields map[string]interface{} `bson:"updatedFields"` + RemovedFields []string `bson:"removedFields"` +} + +type evNamespace struct { + DB string `bson:"db"` + Coll string `bson:"coll"` +} + +type changeEvent struct { + ID interface{} `bson:"_id"` + OperationType string `bson:"operationType"` + FullDocument *bson.Raw `bson:"fullDocument,omitempty"` + Ns evNamespace `bson:"ns"` + DocumentKey M `bson:"documentKey"` + UpdateDescription *updateDesc `bson:"updateDescription,omitempty"` +} + +func (s *S) TestStreamsWatch(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + coll := session.DB("mydb").C("mycoll") + //add a mock document + coll.Insert(M{"a": 0}) + + pipeline := []bson.M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{}) + c.Assert(err, IsNil) + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsInsert(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + err = coll.Insert(M{"a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + //get the _id for later check + type A struct { + ID bson.ObjectId `bson:"_id"` + A int `bson:"a"` + } + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "insert") + c.Assert(ev.FullDocument, NotNil) + a := A{} + err = ev.FullDocument.Unmarshal(&a) + c.Assert(err, IsNil) + c.Assert(a.A, Equals, 1) + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsNextNoEventTimeout(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //check we timeout correctly on no events + //we should get a false result and no error + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err(), IsNil) + c.Assert(changeStream.Timeout(), Equals, true) + + //test the same with default timeout (MaxTimeMS=1000) + //create the stream + changeStream, err = coll.Watch(pipeline, mgo.ChangeStreamOptions{}) + c.Assert(err, IsNil) + hasEvent = changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err(), IsNil) + c.Assert(changeStream.Timeout(), Equals, true) + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsNextTimeout(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document to trigger an event + id = bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + //ensure we get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check we timeout correctly on no subsequent events + //we should get a false result and no error + ev = changeEvent{} + hasEvent = changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err(), IsNil) + c.Assert(changeStream.Timeout(), Equals, true) + + //insert a new document to trigger an event + id = bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + //ensure we get the event + ev = changeEvent{} + hasEvent = changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsDelete(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0}) + c.Assert(err, IsNil) + + //create the changeStream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //delete the document + err = coll.Remove(M{"_id": id}) + c.Assert(err, IsNil) + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "delete") + c.Assert(ev.FullDocument, IsNil) + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsUpdate(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0, "toremove": 2}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //update document + err = coll.UpdateId(id, M{"$set": M{"a": 1}, "$unset": M{"toremove": ""}}) + c.Assert(err, IsNil) + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "update") + c.Assert(ev.FullDocument, IsNil) + c.Assert(len(ev.UpdateDescription.UpdatedFields), Equals, 1) + c.Assert(len(ev.UpdateDescription.RemovedFields), Equals, 1) + c.Assert(ev.UpdateDescription.UpdatedFields["a"], Equals, 1) + c.Assert(ev.UpdateDescription.RemovedFields[0], Equals, "toremove") + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsUpdateFullDocument(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0, "toremove": "bla"}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500, FullDocument: mgo.UpdateLookup}) + c.Assert(err, IsNil) + + //update document + err = coll.UpdateId(id, M{"$set": M{"a": 1}, "$unset": M{"toremove": ""}}) + c.Assert(err, IsNil) + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + type A struct { + A int `bson:"a"` + ToRemove *string `bson:"toremove"` + } + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "update") + c.Assert(len(ev.UpdateDescription.UpdatedFields), Equals, 1) + c.Assert(len(ev.UpdateDescription.RemovedFields), Equals, 1) + c.Assert(ev.UpdateDescription.UpdatedFields["a"], Equals, 1) + c.Assert(ev.UpdateDescription.RemovedFields[0], Equals, "toremove") + + c.Assert(ev.FullDocument, NotNil) + a := A{} + err = ev.FullDocument.Unmarshal(&a) + c.Assert(err, IsNil) + c.Assert(a.A, Equals, 1) + c.Assert(a.ToRemove, IsNil) + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsUpdateWithPipeline(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add two docs + id1 := bson.NewObjectId() + err = coll.Insert(M{"_id": id1, "a": 1}) + c.Assert(err, IsNil) + id2 := bson.NewObjectId() + err = coll.Insert(M{"_id": id2, "a": 2}) + c.Assert(err, IsNil) + + pipeline1 := []M{M{"$match": M{"documentKey._id": id1}}} + changeStream1, err := coll.Watch(pipeline1, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + pipeline2 := []M{M{"$match": M{"documentKey._id": id2}}} + changeStream2, err := coll.Watch(pipeline2, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //update documents + _, err = coll.UpdateAll(M{"_id": M{"$in": []bson.ObjectId{id1, id2}}}, M{"$inc": M{"a": 1}}) + c.Assert(err, IsNil) + + got1 := false + got2 := false + + //check we got the update for id1 (and no other) + for i := 0; i < 2; i++ { + ev := changeEvent{} + hasEvent := changeStream1.Next(&ev) + //we will accept only one event, the one that corresponds to our id1 + c.Assert(got1 && hasEvent, Equals, false) + if hasEvent { + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id1) + got1 = true + } + } + c.Assert(got1, Equals, true) + + //check we got the update for id2 (and no other) + for i := 0; i < 2; i++ { + ev := changeEvent{} + hasEvent := changeStream2.Next(&ev) + //we will accept only one event, the one that corresponds to our id2 + c.Assert(got2 && hasEvent, Equals, false) + if hasEvent { + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id2) + got2 = true + } + } + c.Assert(got2, Equals, true) + + err = changeStream1.Close() + c.Assert(err, IsNil) + err = changeStream2.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsResumeTokenMissingError(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + err = coll.Insert(M{"a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{{"$project": M{"_id": 0}}} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + //check we get the correct error + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err().Error(), Equals, "resume token missing from result") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsClosedStreamError(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + err = coll.Insert(M{"a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{{"$project": M{"_id": 0}}} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + err = changeStream.Close() + c.Assert(err, IsNil) + + //check we get the correct error + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err().Error(), Equals, "illegal use of a closed ChangeStream") +} diff --git a/harness/daemons/.env b/harness/daemons/.env index 7ba8cf599..70acb5b92 100644 --- a/harness/daemons/.env +++ b/harness/daemons/.env @@ -59,7 +59,16 @@ if versionAtLeast 3 2; then COMMONDOPTS="$(echo "$COMMONDOPTS" | sed '/--nohttpinterface/d')" COMMONCOPTS="$(echo "$COMMONCOPTS" | sed '/--nohttpinterface/d')" - # config server need to be started as replica set + + if versionAtLeast 3 6; then + #In version 3.6 --nojournal is deprecated for replica set members using WiredTiger + COMMONDOPTSNOIP="$(echo "$COMMONDOPTSNOIP" | sed '/--nojournal/d')" + COMMONDOPTS="$(echo "$COMMONDOPTS" | sed '/--nojournal/d')" + COMMONCOPTS="$(echo "$COMMONCOPTS" | sed '/--nojournal/d')" + fi + + # config server need to be started as replica set + CFG1OPTS="--replSet conf1" CFG2OPTS="--replSet conf2" CFG3OPTS="--replSet conf3" diff --git a/session.go b/session.go index b62707c84..561f79487 100644 --- a/session.go +++ b/session.go @@ -169,7 +169,9 @@ type Iter struct { timeout time.Duration limit int32 timedout bool - findCmd bool + isFindCmd bool + isChangeStream bool + maxTimeMS int64 } var ( @@ -1117,6 +1119,11 @@ func isAuthError(err error) bool { return ok && e.Code == 13 } +func isNotMasterError(err error) bool { + e, ok := err.(*QueryError) + return ok && strings.Contains(e.Message, "not master") +} + func (db *Database) runUserCmd(cmdName string, user *User) error { cmd := make(bson.D, 0, 16) cmd = append(cmd, bson.DocElem{Name: cmdName, Value: user.Username}) @@ -2423,6 +2430,7 @@ type Pipe struct { pipeline interface{} allowDisk bool batchSize int + maxTimeMS int64 } type pipeCmd struct { @@ -2431,6 +2439,7 @@ type pipeCmd struct { Cursor *pipeCmdCursor `bson:",omitempty"` Explain bool `bson:",omitempty"` AllowDisk bool `bson:"allowDiskUse,omitempty"` + MaxTimeMS int64 `bson:"maxTimeMS,omitempty"` } type pipeCmdCursor struct { @@ -2485,6 +2494,9 @@ func (p *Pipe) Iter() *Iter { AllowDisk: p.allowDisk, Cursor: &pipeCmdCursor{p.batchSize}, } + if p.maxTimeMS > 0 { + cmd.MaxTimeMS = p.maxTimeMS + } err := c.Database.Run(cmd, &result) if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` { cmd.Cursor = nil @@ -2495,7 +2507,11 @@ func (p *Pipe) Iter() *Iter { if firstBatch == nil { firstBatch = result.Cursor.FirstBatch } - return c.NewIter(p.session, firstBatch, result.Cursor.Id, err) + it := c.NewIter(p.session, firstBatch, result.Cursor.Id, err) + if p.maxTimeMS > 0 { + it.maxTimeMS = p.maxTimeMS + } + return it } // NewIter returns a newly created iterator with the provided parameters. Using @@ -2557,7 +2573,7 @@ func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId i } if socket.ServerInfo().MaxWireVersion >= 4 && c.FullName != "admin.$cmd" { - iter.findCmd = true + iter.isFindCmd = true } iter.gotReply.L = &iter.m @@ -2659,6 +2675,13 @@ func (p *Pipe) Batch(n int) *Pipe { return p } +// SetMaxTime sets the maximum amount of time to allow the query to run. +// +func (p *Pipe) SetMaxTime(d time.Duration) *Pipe { + p.maxTimeMS = int64(d / time.Millisecond) + return p +} + // LastError the error status of the preceding write operation on the current connection. // // Relevant documentation: @@ -3801,7 +3824,7 @@ func (q *Query) Iter() *Iter { op.replyFunc = iter.op.replyFunc if prepareFindOp(socket, &op, limit) { - iter.findCmd = true + iter.isFindCmd = true } iter.server = socket.Server() @@ -4015,7 +4038,8 @@ func (iter *Iter) Timeout() bool { // Next returns true if a document was successfully unmarshalled onto result, // and false at the end of the result set or if an error happened. // When Next returns false, the Err method should be called to verify if -// there was an error during iteration. +// there was an error during iteration, and the Timeout method to verify if the +// false return value was caused by a timeout (no available results). // // For example: // @@ -4031,7 +4055,16 @@ func (iter *Iter) Next(result interface{}) bool { iter.m.Lock() iter.timedout = false timeout := time.Time{} + // for a ChangeStream iterator we have to call getMore before the loop otherwise + // we'll always return false + if iter.isChangeStream { + iter.getMore() + } + // check should we expect more data. for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) { + // we should expect more data. + + // If we have yet to receive data, increment the timer until we timeout. if iter.docsToReceive == 0 { if iter.timeout >= 0 { if timeout.IsZero() { @@ -4043,6 +4076,13 @@ func (iter *Iter) Next(result interface{}) bool { return false } } + // for a ChangeStream one loop i enought to declare the timeout + if iter.isChangeStream { + iter.timedout = true + iter.m.Unlock() + return false + } + // run a getmore to fetch more data. iter.getMore() if iter.err != nil { break @@ -4050,7 +4090,7 @@ func (iter *Iter) Next(result interface{}) bool { } iter.gotReply.Wait() } - + // We have data from the getMore. // Exhaust available data before reporting any errors. if docData, ok := iter.docData.Pop().([]byte); ok { close := false @@ -4066,6 +4106,7 @@ func (iter *Iter) Next(result interface{}) bool { } } if iter.op.cursorId != 0 && iter.err == nil { + // we still have a live cursor and currently expect data. iter.docsBeforeMore-- if iter.docsBeforeMore == -1 { iter.getMore() @@ -4255,7 +4296,7 @@ func (iter *Iter) getMore() { } } var op interface{} - if iter.findCmd { + if iter.isFindCmd || iter.isChangeStream { op = iter.getMoreCmd() } else { op = &iter.op @@ -4278,6 +4319,9 @@ func (iter *Iter) getMoreCmd() *queryOp { Collection: iter.op.collection[nameDot+1:], BatchSize: iter.op.limit, } + if iter.maxTimeMS > 0 { + getMore.MaxTimeMS = iter.maxTimeMS + } var op queryOp op.collection = iter.op.collection[:nameDot] + ".$cmd" @@ -4882,7 +4926,7 @@ func (iter *Iter) replyFunc() replyFunc { } else { iter.err = ErrNotFound } - } else if iter.findCmd { + } else if iter.isFindCmd { debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, int(op.replyDocs), op.cursorId) var findReply struct { Ok bool @@ -4894,7 +4938,7 @@ func (iter *Iter) replyFunc() replyFunc { iter.err = err } else if !findReply.Ok && findReply.Errmsg != "" { iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg} - } else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 { + } else if !iter.isChangeStream && len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 { iter.err = ErrNotFound } else { batch := findReply.Cursor.FirstBatch @@ -5262,7 +5306,7 @@ func getRFC2253NameString(RDNElements *pkix.RDNSequence) string { var replacer = strings.NewReplacer(",", "\\,", "=", "\\=", "+", "\\+", "<", "\\<", ">", "\\>", ";", "\\;") //The elements in the sequence needs to be reversed when converting them for i := len(*RDNElements) - 1; i >= 0; i-- { - var nameAndValueList = make([]string,len((*RDNElements)[i])) + var nameAndValueList = make([]string, len((*RDNElements)[i])) for j, attribute := range (*RDNElements)[i] { var shortAttributeName = rdnOIDToShortName(attribute.Type) if len(shortAttributeName) <= 0 { From 91cf46c3ef1013857ec915403d99a9007f84d06b Mon Sep 17 00:00:00 2001 From: Dom Date: Mon, 19 Feb 2018 11:27:01 +0000 Subject: [PATCH 06/24] readme: credit @peterdeka and @steve-gray (#110) --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index c605e6bb0..cd5edff49 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,8 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Minimise socket connection timeouts due to excessive locking ([details](https://github.com/globalsign/mgo/pull/52)) * Natively support X509 client authentication ([details](https://github.com/globalsign/mgo/pull/55)) * Gracefully recover from a temporarily unreachable server ([details](https://github.com/globalsign/mgo/pull/69)) +* Use JSON tags when no explicit BSON are tags set ([details](https://github.com/globalsign/mgo/pull/91)) +* Support [$changeStream](https://docs.mongodb.com/manual/changeStreams/) tailing on 3.6+ ([details](https://github.com/globalsign/mgo/pull/97)) --- @@ -51,6 +53,8 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * @jameinel * @gazoon * @mapete94 +* @peterdeka * @Reenjii * @smoya +* @steve-gray * @wgallagher \ No newline at end of file From 860240ea96bd411b6f6165bb634ac6209981f2c5 Mon Sep 17 00:00:00 2001 From: Xu Wang Date: Fri, 2 Mar 2018 01:39:58 +0800 Subject: [PATCH 07/24] enable shrink the socket pool size (#116) * enable shrink the socket pool size we found the mgo will allocate the pool size during burst traffic but won't close the sockets any more until restart the client or server. And the mongo document defines two related query options - [minPoolSize](https://docs.mongodb.com/manual/reference/connection-string/#urioption.minPoolSize) - [maxIdleTimeMS](https://docs.mongodb.com/manual/reference/connection-string/#urioption.maxIdleTimeMS) By implementing these two options, it could shrink the pool to minPoolSize after the sockets introduced by burst traffic timeout. The idea comes from https://github.com/JodeZer/mgo , he investigated this issue and provide the initial commits. I found there are still some issue in sockets maintenance, and had a PR against his repo JodeZer/mgo#1 . This commit include JodeZer's commits and my fix, and I simplified the data structure. What's in this commit could be described as this figure: +------------------------+ | Session | <-------+ Add options here +------------------------+ +------------------------+ | Cluster | <-------+ Add options here +------------------------+ +------------------------+ | Server | <-------+*Add options here | | *add timestamp when recycle a socket +---+ | +-----------+ | +---+ *periodically check the unused sockets | | | shrinker <------+ and reclaim the timeout sockets. +---+ | +-----------+ | | | | | +------------------------+ | | +------------------------+ | | Socket | <-------+ Add a field for last used times+---------+ +------------------------+ Signed-off-by: Wang Xu * tests for shrink the socks pool Signed-off-by: Wang Xu --- cluster.go | 36 ++++++++++++--------- server.go | 71 +++++++++++++++++++++++++++++++++++----- session.go | 42 ++++++++++++++++++++++++ session_test.go | 86 +++++++++++++++++++++++++++++++++++++++++++++++++ socket.go | 1 + 5 files changed, 212 insertions(+), 24 deletions(-) diff --git a/cluster.go b/cluster.go index ac461d5b9..204f507bd 100644 --- a/cluster.go +++ b/cluster.go @@ -48,21 +48,23 @@ import ( type mongoCluster struct { sync.RWMutex - serverSynced sync.Cond - userSeeds []string - dynaSeeds []string - servers mongoServers - masters mongoServers - references int - syncing bool - direct bool - failFast bool - syncCount uint - setName string - cachedIndex map[string]bool - sync chan bool - dial dialer - appName string + serverSynced sync.Cond + userSeeds []string + dynaSeeds []string + servers mongoServers + masters mongoServers + references int + syncing bool + direct bool + failFast bool + syncCount uint + setName string + cachedIndex map[string]bool + sync chan bool + dial dialer + appName string + minPoolSize int + maxIdleTimeMS int } func newCluster(userSeeds []string, direct, failFast bool, dial dialer, setName string, appName string) *mongoCluster { @@ -437,11 +439,13 @@ func (cluster *mongoCluster) syncServersLoop() { func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer { cluster.RLock() server := cluster.servers.Search(tcpaddr.String()) + minPoolSize := cluster.minPoolSize + maxIdleTimeMS := cluster.maxIdleTimeMS cluster.RUnlock() if server != nil { return server } - return newServer(addr, tcpaddr, cluster.sync, cluster.dial) + return newServer(addr, tcpaddr, cluster.sync, cluster.dial, minPoolSize, maxIdleTimeMS) } func resolveAddr(addr string) (*net.TCPAddr, error) { diff --git a/server.go b/server.go index 7ad955255..f999b2407 100644 --- a/server.go +++ b/server.go @@ -55,6 +55,8 @@ type mongoServer struct { pingCount uint32 closed bool abended bool + minPoolSize int + maxIdleTimeMS int } type dialer struct { @@ -76,17 +78,22 @@ type mongoServerInfo struct { var defaultServerInfo mongoServerInfo -func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer { +func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer { server := &mongoServer{ - Addr: addr, - ResolvedAddr: tcpaddr.String(), - tcpaddr: tcpaddr, - sync: sync, - dial: dial, - info: &defaultServerInfo, - pingValue: time.Hour, // Push it back before an actual ping. + Addr: addr, + ResolvedAddr: tcpaddr.String(), + tcpaddr: tcpaddr, + sync: sync, + dial: dial, + info: &defaultServerInfo, + pingValue: time.Hour, // Push it back before an actual ping. + minPoolSize: minPoolSize, + maxIdleTimeMS: maxIdleTimeMS, } go server.pinger(true) + if maxIdleTimeMS != 0 { + go server.poolShrinker() + } return server } @@ -221,6 +228,7 @@ func (server *mongoServer) close(waitForIdle bool) { func (server *mongoServer) RecycleSocket(socket *mongoSocket) { server.Lock() if !server.closed { + socket.lastTimeUsed = time.Now() server.unusedSockets = append(server.unusedSockets, socket) } server.Unlock() @@ -346,6 +354,53 @@ func (server *mongoServer) pinger(loop bool) { } } +func (server *mongoServer) poolShrinker() { + ticker := time.NewTicker(1 * time.Minute) + for _ = range ticker.C { + if server.closed { + ticker.Stop() + return + } + server.Lock() + unused := len(server.unusedSockets) + if unused < server.minPoolSize { + server.Unlock() + continue + } + now := time.Now() + end := 0 + reclaimMap := map[*mongoSocket]struct{}{} + // Because the acquisition and recycle are done at the tail of array, + // the head is always the oldest unused socket. + for _, s := range server.unusedSockets[:unused-server.minPoolSize] { + if s.lastTimeUsed.Add(time.Duration(server.maxIdleTimeMS) * time.Millisecond).After(now) { + break + } + end++ + reclaimMap[s] = struct{}{} + } + tbr := server.unusedSockets[:end] + if end > 0 { + next := make([]*mongoSocket, unused-end) + copy(next, server.unusedSockets[end:]) + server.unusedSockets = next + remainSockets := []*mongoSocket{} + for _, s := range server.liveSockets { + if _, ok := reclaimMap[s]; !ok { + remainSockets = append(remainSockets, s) + } + } + server.liveSockets = remainSockets + stats.conn(-1*end, server.info.Master) + } + server.Unlock() + + for _, s := range tbr { + s.Close() + } + } +} + type mongoServerSlice []*mongoServer func (s mongoServerSlice) Len() int { diff --git a/session.go b/session.go index 561f79487..d1c88420e 100644 --- a/session.go +++ b/session.go @@ -271,6 +271,16 @@ const ( // Defines the per-server socket pool limit. Defaults to 4096. // See Session.SetPoolLimit for details. // +// minPoolSize= +// +// Defines the per-server socket pool minium size. Defaults to 0. +// +// maxIdleTimeMS= +// +// The maximum number of milliseconds that a connection can remain idle in the pool +// before being removed and closed. If maxIdleTimeMS is 0, connections will never be +// closed due to inactivity. +// // appName= // // The identifier of this client application. This parameter is used to @@ -322,6 +332,8 @@ func ParseURL(url string) (*DialInfo, error) { appName := "" readPreferenceMode := Primary var readPreferenceTagSets []bson.D + minPoolSize := 0 + maxIdleTimeMS := 0 for _, opt := range uinfo.options { switch opt.key { case "authSource": @@ -368,6 +380,22 @@ func ParseURL(url string) (*DialInfo, error) { doc = append(doc, bson.DocElem{Name: strings.TrimSpace(kvp[0]), Value: strings.TrimSpace(kvp[1])}) } readPreferenceTagSets = append(readPreferenceTagSets, doc) + case "minPoolSize": + minPoolSize, err = strconv.Atoi(opt.value) + if err != nil { + return nil, errors.New("bad value for minPoolSize: " + opt.value) + } + if minPoolSize < 0 { + return nil, errors.New("bad value (negtive) for minPoolSize: " + opt.value) + } + case "maxIdleTimeMS": + maxIdleTimeMS, err = strconv.Atoi(opt.value) + if err != nil { + return nil, errors.New("bad value for maxIdleTimeMS: " + opt.value) + } + if maxIdleTimeMS < 0 { + return nil, errors.New("bad value (negtive) for maxIdleTimeMS: " + opt.value) + } case "connect": if opt.value == "direct" { direct = true @@ -402,6 +430,8 @@ func ParseURL(url string) (*DialInfo, error) { TagSets: readPreferenceTagSets, }, ReplicaSetName: setName, + MinPoolSize: minPoolSize, + MaxIdleTimeMS: maxIdleTimeMS, } return &info, nil } @@ -475,6 +505,14 @@ type DialInfo struct { // cluster and establish connections with further servers too. Direct bool + // MinPoolSize defines The minimum number of connections in the connection pool. + // Defaults to 0. + MinPoolSize int + + //The maximum number of milliseconds that a connection can remain idle in the pool + // before being removed and closed. + MaxIdleTimeMS int + // DialServer optionally specifies the dial function for establishing // connections with the MongoDB servers. DialServer func(addr *ServerAddr) (net.Conn, error) @@ -554,6 +592,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) { if info.PoolLimit > 0 { session.poolLimit = info.PoolLimit } + + cluster.minPoolSize = info.MinPoolSize + cluster.maxIdleTimeMS = info.MaxIdleTimeMS + cluster.Release() // People get confused when we return a session that is not actually diff --git a/session_test.go b/session_test.go index eb2c812b3..c740af47a 100644 --- a/session_test.go +++ b/session_test.go @@ -30,11 +30,13 @@ import ( "flag" "fmt" "math" + "math/rand" "os" "runtime" "sort" "strconv" "strings" + "sync" "testing" "time" @@ -166,6 +168,90 @@ func (s *S) TestURLInvalidReadPreference(c *C) { } } +func (s *S) TestMinPoolSize(c *C) { + tests := []struct { + url string + size int + fail bool + }{ + {"localhost:40001?minPoolSize=0", 0, false}, + {"localhost:40001?minPoolSize=1", 1, false}, + {"localhost:40001?minPoolSize=-1", -1, true}, + {"localhost:40001?minPoolSize=-.", 0, true}, + } + for _, test := range tests { + info, err := mgo.ParseURL(test.url) + if test.fail { + c.Assert(err, NotNil) + } else { + c.Assert(err, IsNil) + c.Assert(info.MinPoolSize, Equals, test.size) + } + } +} + +func (s *S) TestMaxIdleTimeMS(c *C) { + tests := []struct { + url string + size int + fail bool + }{ + {"localhost:40001?maxIdleTimeMS=0", 0, false}, + {"localhost:40001?maxIdleTimeMS=1", 1, false}, + {"localhost:40001?maxIdleTimeMS=-1", -1, true}, + {"localhost:40001?maxIdleTimeMS=-.", 0, true}, + } + for _, test := range tests { + info, err := mgo.ParseURL(test.url) + if test.fail { + c.Assert(err, NotNil) + } else { + c.Assert(err, IsNil) + c.Assert(info.MaxIdleTimeMS, Equals, test.size) + } + } +} + +func (s *S) TestPoolShrink(c *C) { + if *fast { + c.Skip("-fast") + } + oldSocket := mgo.GetStats().SocketsAlive + + session, err := mgo.Dial("localhost:40001?minPoolSize=1&maxIdleTimeMS=1000") + c.Assert(err, IsNil) + defer session.Close() + + parallel := 10 + res := make(chan error, parallel+1) + wg := &sync.WaitGroup{} + for i := 1; i < parallel; i++ { + wg.Add(1) + go func() { + s := session.Copy() + defer s.Close() + result := struct{}{} + err := s.Run("ping", &result) + + //sleep random time to make the allocate and release in different sequence + time.Sleep(time.Duration(rand.Intn(parallel)*100) * time.Millisecond) + res <- err + wg.Done() + }() + } + wg.Wait() + stats := mgo.GetStats() + c.Logf("living socket: After queries: %d, before queries: %d", stats.SocketsAlive, oldSocket) + + // give some time for shrink the pool, the tick is set to 1 minute + c.Log("Sleeping... 1 minute to for pool shrinking") + time.Sleep(60 * time.Second) + + stats = mgo.GetStats() + c.Logf("living socket: After shrinking: %d, at the beginning of the test: %d", stats.SocketsAlive, oldSocket) + c.Assert(stats.SocketsAlive-oldSocket > 1, Equals, false) +} + func (s *S) TestURLReadPreferenceTags(c *C) { type test struct { url string diff --git a/socket.go b/socket.go index a9124b043..ae13e401f 100644 --- a/socket.go +++ b/socket.go @@ -54,6 +54,7 @@ type mongoSocket struct { dead error serverInfo *mongoServerInfo closeAfterIdle bool + lastTimeUsed time.Time // for time based idle socket release sendMeta sync.Once } From aa690cdf8874bb698b3089eebb9790745b257d6b Mon Sep 17 00:00:00 2001 From: tadukurow Date: Mon, 12 Mar 2018 12:09:33 +0000 Subject: [PATCH 08/24] Ignore dial error in dbtest after stopping server. (#122) * Ignore dial error when server is stopped. related to #117 * Print dbtest server starting error before panic. As seen in #117, dbtest start() throws a panic when it can't start mongo. This panic is picked up by a panichandler obscuring the actual problem. This PR simply prints the error start() encounters to stderr, before throwing the panic. --- dbtest/dbserver.go | 2 ++ dbtest/dbserver_test.go | 4 +--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/dbtest/dbserver.go b/dbtest/dbserver.go index b74280801..2fadaf764 100644 --- a/dbtest/dbserver.go +++ b/dbtest/dbserver.go @@ -69,6 +69,8 @@ func (dbs *DBServer) start() { dbs.server.Stderr = &dbs.output err = dbs.server.Start() if err != nil { + // print error to facilitate troubleshooting as the panic will be caught in a panic handler + fmt.Fprintf(os.Stderr, "mongod failed to start: %v\n",err) panic(err) } dbs.tomb.Go(dbs.monitor) diff --git a/dbtest/dbserver_test.go b/dbtest/dbserver_test.go index b3cc45a8a..e3abb1817 100644 --- a/dbtest/dbserver_test.go +++ b/dbtest/dbserver_test.go @@ -77,9 +77,7 @@ func (s *S) TestStop(c *C) { server.Stop() // Server should not be running anymore. - session, err = mgo.DialWithTimeout(addr, 500*time.Millisecond) - c.Assert(err, IsNil) - + session, _ = mgo.DialWithTimeout(addr, 500*time.Millisecond) if session != nil { session.Close() c.Fatalf("Stop did not stop the server") From e854ed5cdaeeaca5b861d72502be0ae2b33334ff Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sat, 10 Mar 2018 18:22:31 +1030 Subject: [PATCH 09/24] Add a test for globalsign/mgo#120 We've seen a deadlock happen occasionally where syncServers needs to acquire a socket to call isMaster, but the socket acquisition needs to know the server topology which isn't known yet. See globalsign/mgo#120 issue for a detailed breakdown. This replicates the issue by setting up a mongo "server" which closes sockets as soon as they're opened; about 20% of the time, this will trigger the deadlock because the acquired socket for ismaster() dies and needs to be reacquired. --- cluster_test.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/cluster_test.go b/cluster_test.go index 8945e0962..a0a197048 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1964,6 +1964,41 @@ func (s *S) TestConnectCloseConcurrency(c *C) { wg.Wait() } +func (s *S) TestNoDeadlockOnClose(c *C) { + if *fast { + // Unfortunately I seem to need quite a high dial timeout to get this to work + // on my machine. + c.Skip("-fast") + } + + var shouldStop int32 + atomic.StoreInt32(&shouldStop, 0) + + listener, err := net.Listen("tcp4", "127.0.0.1:") + c.Check(err, Equals, nil) + + go func() { + for atomic.LoadInt32(&shouldStop) == 0 { + sock, err := listener.Accept() + if err != nil { + // Probs just closed + continue + } + sock.Close() + } + }() + defer func() { + atomic.StoreInt32(&shouldStop, 1) + listener.Close() + }() + + session, err := mgo.DialWithTimeout(listener.Addr().String(), 10*time.Second) + // If execution reaches here, the deadlock did not happen and all is OK + if session != nil { + session.Close() + } +} + func (s *S) TestSelectServers(c *C) { if !s.versionAtLeast(2, 2) { c.Skip("read preferences introduced in 2.2") From dc0f590e7eb754a87132cb489688751f25032d41 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sat, 10 Mar 2018 18:27:53 +1030 Subject: [PATCH 10/24] Propose a fix for globalsign/mgo#120 As discussed in the issue globalsign/mgo#120, isMaster() can cause a deadlock with the topology scanner if the connection it makes dies before running the command; mgo automagically attempts to make another socket in acquireSocket, but this can't work without topology. This commit forces isMaster() to actually run on the intended socket. --- cluster.go | 2 +- session.go | 16 ++++++++++++++++ 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/cluster.go b/cluster.go index 204f507bd..9434647a7 100644 --- a/cluster.go +++ b/cluster.go @@ -181,7 +181,7 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul }) }) - err := session.Run(cmd, result) + err := session.RunOnSocket(socket, cmd, result) session.Close() return err } diff --git a/session.go b/session.go index d1c88420e..0bbd51621 100644 --- a/session.go +++ b/session.go @@ -848,6 +848,15 @@ func (db *Database) Run(cmd interface{}, result interface{}) error { return db.run(socket, cmd, result) } +// RunOnSocket does the same as Run, but guarantees that your command will be run +// on the provided socket instance; if it's unhealthy, you will receive the error +// from it. +func (db *Database) RunOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { + socket.Acquire() + defer socket.Release() + return db.run(socket, cmd, result) +} + // Credential holds details to authenticate with a MongoDB server. type Credential struct { // Username and Password hold the basic details for authentication. @@ -2312,6 +2321,13 @@ func (s *Session) Run(cmd interface{}, result interface{}) error { return s.DB("admin").Run(cmd, result) } +// RunOnSocket does the same as Run, but guarantees that your command will be run +// on the provided socket instance; if it's unhealthy, you will receive the error +// from it. +func (s *Session) RunOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { + return s.DB("admin").RunOnSocket(socket, cmd, result) +} + // SelectServers restricts communication to servers configured with the // given tags. For example, the following statement restricts servers // used for reading operations to those with both tag "disk" set to From 3838eb8b2417e360cf20a64bd096a18a0186de1a Mon Sep 17 00:00:00 2001 From: Damir Vandic Date: Wed, 21 Mar 2018 11:32:24 +0100 Subject: [PATCH 11/24] Make run on socket helper methods private --- cluster.go | 2 +- session.go | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/cluster.go b/cluster.go index 9434647a7..91a4e9ec2 100644 --- a/cluster.go +++ b/cluster.go @@ -181,7 +181,7 @@ func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResul }) }) - err := session.RunOnSocket(socket, cmd, result) + err := session.runOnSocket(socket, cmd, result) session.Close() return err } diff --git a/session.go b/session.go index 0bbd51621..3a27caf30 100644 --- a/session.go +++ b/session.go @@ -848,10 +848,10 @@ func (db *Database) Run(cmd interface{}, result interface{}) error { return db.run(socket, cmd, result) } -// RunOnSocket does the same as Run, but guarantees that your command will be run +// runOnSocket does the same as Run, but guarantees that your command will be run // on the provided socket instance; if it's unhealthy, you will receive the error // from it. -func (db *Database) RunOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { +func (db *Database) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { socket.Acquire() defer socket.Release() return db.run(socket, cmd, result) @@ -2321,11 +2321,11 @@ func (s *Session) Run(cmd interface{}, result interface{}) error { return s.DB("admin").Run(cmd, result) } -// RunOnSocket does the same as Run, but guarantees that your command will be run +// runOnSocket does the same as Run, but guarantees that your command will be run // on the provided socket instance; if it's unhealthy, you will receive the error // from it. -func (s *Session) RunOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { - return s.DB("admin").RunOnSocket(socket, cmd, result) +func (s *Session) runOnSocket(socket *mongoSocket, cmd interface{}, result interface{}) error { + return s.DB("admin").runOnSocket(socket, cmd, result) } // SelectServers restricts communication to servers configured with the From 76ea203bb49916471d69fa6868e025844b77bc26 Mon Sep 17 00:00:00 2001 From: Konstantinos Tsanaktsidis Date: Tue, 3 Apr 2018 19:02:44 +1000 Subject: [PATCH 12/24] Add signaling support for connection pool waiting (#115) * Add signaling support for connection pool waiting The current behaviour when the poolLimit is reached and a new connection is required is to poll every 100ms to see if there is now headroom to make a new connection. This adds tremendous latency to the limit-hit-path. This commit changes the checkout behaviour to watch on a condition variable for connections to become available, and the checkin behaviour to signal this variable. This should allow waiters to use connections immediately after they become available. A new parameter is also added to DialInfo, PoolTimeout, which is the maximum time that clients will wait for connection headroom to become available. By default this is unlimited. * Add stats for connection pool timings This exposes four new counters * The number of times a socket was successfully obtained from the connection pool * The number of times the connection pool needed to be waited on * The total time that has been spent waiting for a conneciton to become available * The number of times socket acquisition failed due to a pool timeout * Gitignore .vscode directory I'm using vscode and accidently committed the .vscode directroy; .gitignore this footgun. --- .gitignore | 2 +- cluster.go | 22 +++++++------ cluster_test.go | 42 ++++++++++++++++++++++++ server.go | 85 +++++++++++++++++++++++++++++++++++++++++++++---- session.go | 25 ++++++++++++++- stats.go | 45 ++++++++++++++++++++------ 6 files changed, 195 insertions(+), 26 deletions(-) diff --git a/.gitignore b/.gitignore index 9f4fa6d20..aae58a8c6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ _harness - +.vscode diff --git a/cluster.go b/cluster.go index 91a4e9ec2..4e54c5d81 100644 --- a/cluster.go +++ b/cluster.go @@ -618,9 +618,17 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) { // true, it will attempt to return a socket to a slave server. If it is // false, the socket will necessarily be to a master server. func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int) (s *mongoSocket, err error) { + return cluster.AcquireSocketWithPoolTimeout(mode, slaveOk, syncTimeout, socketTimeout, serverTags, poolLimit, 0) +} + +// AcquireSocketWithPoolTimeout returns a socket to a server in the cluster. If slaveOk is +// true, it will attempt to return a socket to a slave server. If it is +// false, the socket will necessarily be to a master server. +func (cluster *mongoCluster) AcquireSocketWithPoolTimeout( + mode Mode, slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D, poolLimit int, poolTimeout time.Duration, +) (s *mongoSocket, err error) { var started time.Time var syncCount uint - warnedLimit := false for { cluster.RLock() for { @@ -662,14 +670,10 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout continue } - s, abended, err := server.AcquireSocket(poolLimit, socketTimeout) - if err == errPoolLimit { - if !warnedLimit { - warnedLimit = true - log("WARNING: Per-server connection limit reached.") - } - time.Sleep(100 * time.Millisecond) - continue + s, abended, err := server.AcquireSocketWithBlocking(poolLimit, socketTimeout, poolTimeout) + if err == errPoolTimeout { + // No need to remove servers from the topology if acquiring a socket fails for this reason. + return nil, err } if err != nil { cluster.removeServer(server) diff --git a/cluster_test.go b/cluster_test.go index a0a197048..be11dc1a7 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -1583,6 +1583,9 @@ func (s *S) TestPoolLimitSimple(c *C) { } defer session.Close() + // So we can measure the stats for the blocking operation + mgo.ResetStats() + // Put one socket in use. c.Assert(session.Ping(), IsNil) @@ -1603,6 +1606,11 @@ func (s *S) TestPoolLimitSimple(c *C) { session.Refresh() delay := <-done c.Assert(delay > 300*time.Millisecond, Equals, true, Commentf("Delay: %s", delay)) + stats := mgo.GetStats() + c.Assert(stats.TimesSocketAcquired, Equals, 2) + c.Assert(stats.TimesWaitedForPool, Equals, 1) + c.Assert(stats.PoolTimeouts, Equals, 0) + c.Assert(stats.TotalPoolWaitTime > 300*time.Millisecond, Equals, true) } } @@ -1649,6 +1657,40 @@ func (s *S) TestPoolLimitMany(c *C) { c.Assert(delay < 6e9, Equals, true) } +func (s *S) TestPoolLimitTimeout(c *C) { + if *fast { + c.Skip("-fast") + } + + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + session.SetPoolTimeout(1 * time.Second) + session.SetPoolLimit(1) + + mgo.ResetStats() + + // Put one socket in use. + c.Assert(session.Ping(), IsNil) + + // Now block trying to get another one due to the pool limit. + copy := session.Copy() + defer copy.Close() + started := time.Now() + err = copy.Ping() + delay := time.Since(started) + + c.Assert(delay > 900*time.Millisecond, Equals, true, Commentf("Delay: %s", delay)) + c.Assert(delay < 1100*time.Millisecond, Equals, true, Commentf("Delay: %s", delay)) + c.Assert(strings.Contains(err.Error(), "could not acquire connection within pool timeout"), Equals, true, Commentf("Error: %s", err)) + stats := mgo.GetStats() + c.Assert(stats.PoolTimeouts, Equals, 1) + c.Assert(stats.TimesSocketAcquired, Equals, 1) + c.Assert(stats.TimesWaitedForPool, Equals, 1) + c.Assert(stats.TotalPoolWaitTime > 900*time.Millisecond, Equals, true) + c.Assert(stats.TotalPoolWaitTime < 1100*time.Millisecond, Equals, true) +} + func (s *S) TestSetModeEventualIterBug(c *C) { session1, err := mgo.Dial("localhost:40011") c.Assert(err, IsNil) diff --git a/server.go b/server.go index f999b2407..7832bec1b 100644 --- a/server.go +++ b/server.go @@ -57,6 +57,7 @@ type mongoServer struct { abended bool minPoolSize int maxIdleTimeMS int + poolWaiter *sync.Cond } type dialer struct { @@ -78,18 +79,19 @@ type mongoServerInfo struct { var defaultServerInfo mongoServerInfo -func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer { +func newServer(addr string, tcpaddr *net.TCPAddr, syncChan chan bool, dial dialer, minPoolSize, maxIdleTimeMS int) *mongoServer { server := &mongoServer{ Addr: addr, ResolvedAddr: tcpaddr.String(), tcpaddr: tcpaddr, - sync: sync, + sync: syncChan, dial: dial, info: &defaultServerInfo, pingValue: time.Hour, // Push it back before an actual ping. minPoolSize: minPoolSize, maxIdleTimeMS: maxIdleTimeMS, } + server.poolWaiter = sync.NewCond(server) go server.pinger(true) if maxIdleTimeMS != 0 { go server.poolShrinker() @@ -98,6 +100,7 @@ func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer, m } var errPoolLimit = errors.New("per-server connection limit reached") +var errPoolTimeout = errors.New("could not acquire connection within pool timeout") var errServerClosed = errors.New("server was closed") // AcquireSocket returns a socket for communicating with the server. @@ -109,6 +112,21 @@ var errServerClosed = errors.New("server was closed") // use in this server is greater than the provided limit, errPoolLimit is // returned. func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) { + return server.acquireSocketInternal(poolLimit, timeout, false, 0*time.Millisecond) +} + +// AcquireSocketWithBlocking wraps AcquireSocket, but if a socket is not available, it will _not_ +// return errPoolLimit. Instead, it will block waiting for a socket to become available. If poolTimeout +// should elapse before a socket is available, it will return errPoolTimeout. +func (server *mongoServer) AcquireSocketWithBlocking( + poolLimit int, socketTimeout time.Duration, poolTimeout time.Duration, +) (socket *mongoSocket, abended bool, err error) { + return server.acquireSocketInternal(poolLimit, socketTimeout, true, poolTimeout) +} + +func (server *mongoServer) acquireSocketInternal( + poolLimit int, timeout time.Duration, shouldBlock bool, poolTimeout time.Duration, +) (socket *mongoSocket, abended bool, err error) { for { server.Lock() abended = server.abended @@ -116,11 +134,58 @@ func (server *mongoServer) AcquireSocket(poolLimit int, timeout time.Duration) ( server.Unlock() return nil, abended, errServerClosed } - n := len(server.unusedSockets) - if poolLimit > 0 && len(server.liveSockets)-n >= poolLimit { - server.Unlock() - return nil, false, errPoolLimit + if poolLimit > 0 { + if shouldBlock { + // Beautiful. Golang conditions don't have a WaitWithTimeout, so I've implemented the timeout + // with a wait + broadcast. The broadcast will cause the loop here to re-check the timeout, + // and fail if it is blown. + // Yes, this is a spurious wakeup, but we can't do a directed signal without having one condition + // variable per waiter, which would involve loop traversal in the RecycleSocket + // method. + // We also can't use the approach of turning a condition variable into a channel outlined in + // https://github.com/golang/go/issues/16620, since the lock needs to be held in _this_ goroutine. + waitDone := make(chan struct{}) + timeoutHit := false + if poolTimeout > 0 { + go func() { + select { + case <-waitDone: + case <-time.After(poolTimeout): + // timeoutHit is part of the wait condition, so needs to be changed under mutex. + server.Lock() + defer server.Unlock() + timeoutHit = true + server.poolWaiter.Broadcast() + } + }() + } + timeSpentWaiting := time.Duration(0) + for len(server.liveSockets)-len(server.unusedSockets) >= poolLimit && !timeoutHit { + // We only count time spent in Wait(), and not time evaluating the entire loop, + // so that in the happy non-blocking path where the condition above evaluates true + // first time, we record a nice round zero wait time. + waitStart := time.Now() + // unlocks server mutex, waits, and locks again. Thus, the condition + // above is evaluated only when the lock is held. + server.poolWaiter.Wait() + timeSpentWaiting += time.Since(waitStart) + } + close(waitDone) + if timeoutHit { + server.Unlock() + stats.noticePoolTimeout(timeSpentWaiting) + return nil, false, errPoolTimeout + } + // Record that we fetched a connection of of a socket list and how long we spent waiting + stats.noticeSocketAcquisition(timeSpentWaiting) + } else { + if len(server.liveSockets)-len(server.unusedSockets) >= poolLimit { + server.Unlock() + return nil, false, errPoolLimit + } + } } + n := len(server.unusedSockets) if n > 0 { socket = server.unusedSockets[n-1] server.unusedSockets[n-1] = nil // Help GC. @@ -231,6 +296,14 @@ func (server *mongoServer) RecycleSocket(socket *mongoSocket) { socket.lastTimeUsed = time.Now() server.unusedSockets = append(server.unusedSockets, socket) } + // If anybody is waiting for a connection, they should try now. + // Note that this _has_ to be broadcast, not signal; the signature of AcquireSocket + // and AcquireSocketWithBlocking allow the caller to specify the max number of connections, + // rather than that being an intrinsic property of the connection pool (I assume to ensure + // that there is always a connection available for replset topology discovery). Thus, once + // a connection is returned to the pool, _every_ waiter needs to check if the connection count + // is underneath their particular value for poolLimit. + server.poolWaiter.Broadcast() server.Unlock() } diff --git a/session.go b/session.go index 3a27caf30..a9f3a8cb2 100644 --- a/session.go +++ b/session.go @@ -92,6 +92,7 @@ type Session struct { syncTimeout time.Duration sockTimeout time.Duration poolLimit int + poolTimeout time.Duration consistency Mode creds []Credential dialCred *Credential @@ -486,6 +487,11 @@ type DialInfo struct { // See Session.SetPoolLimit for details. PoolLimit int + // PoolTimeout defines max time to wait for a connection to become available + // if the pool limit is reaqched. Defaults to zero, which means forever. + // See Session.SetPoolTimeout for details + PoolTimeout time.Duration + // The identifier of the client application which ran the operation. AppName string @@ -596,6 +602,10 @@ func DialWithInfo(info *DialInfo) (*Session, error) { cluster.minPoolSize = info.MinPoolSize cluster.maxIdleTimeMS = info.MaxIdleTimeMS + if info.PoolTimeout > 0 { + session.poolTimeout = info.PoolTimeout + } + cluster.Release() // People get confused when we return a session that is not actually @@ -711,6 +721,7 @@ func copySession(session *Session, keepCreds bool) (s *Session) { syncTimeout: session.syncTimeout, sockTimeout: session.sockTimeout, poolLimit: session.poolLimit, + poolTimeout: session.poolTimeout, consistency: session.consistency, creds: creds, dialCred: session.dialCred, @@ -2051,6 +2062,16 @@ func (s *Session) SetPoolLimit(limit int) { s.m.Unlock() } +// SetPoolTimeout sets the maxinum time connection attempts will wait to reuse +// an existing connection from the pool if the PoolLimit has been reached. If +// the value is exceeded, the attempt to use a session will fail with an error. +// The default value is zero, which means to wait forever with no timeout. +func (s *Session) SetPoolTimeout(timeout time.Duration) { + s.m.Lock() + s.poolTimeout = timeout + s.m.Unlock() +} + // SetBypassValidation sets whether the server should bypass the registered // validation expressions executed when documents are inserted or modified, // in the interest of preserving invariants in the collection being modified. @@ -4908,7 +4929,9 @@ func (s *Session) acquireSocket(slaveOk bool) (*mongoSocket, error) { } // Still not good. We need a new socket. - sock, err := s.cluster().AcquireSocket(s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit) + sock, err := s.cluster().AcquireSocketWithPoolTimeout( + s.consistency, slaveOk && s.slaveOk, s.syncTimeout, s.sockTimeout, s.queryConfig.op.serverTags, s.poolLimit, s.poolTimeout, + ) if err != nil { return nil, err } diff --git a/stats.go b/stats.go index dcbd01045..8cf4ecec1 100644 --- a/stats.go +++ b/stats.go @@ -28,6 +28,7 @@ package mgo import ( "sync" + "time" ) var stats *Stats @@ -77,15 +78,19 @@ func ResetStats() { // // TODO outdated fields ? type Stats struct { - Clusters int - MasterConns int - SlaveConns int - SentOps int - ReceivedOps int - ReceivedDocs int - SocketsAlive int - SocketsInUse int - SocketRefs int + Clusters int + MasterConns int + SlaveConns int + SentOps int + ReceivedOps int + ReceivedDocs int + SocketsAlive int + SocketsInUse int + SocketRefs int + TimesSocketAcquired int + TimesWaitedForPool int + TotalPoolWaitTime time.Duration + PoolTimeouts int } func (stats *Stats) cluster(delta int) { @@ -155,3 +160,25 @@ func (stats *Stats) socketRefs(delta int) { statsMutex.Unlock() } } + +func (stats *Stats) noticeSocketAcquisition(waitTime time.Duration) { + if stats != nil { + statsMutex.Lock() + stats.TimesSocketAcquired++ + stats.TotalPoolWaitTime += waitTime + if waitTime > 0 { + stats.TimesWaitedForPool++ + } + statsMutex.Unlock() + } +} + +func (stats *Stats) noticePoolTimeout(waitTime time.Duration) { + if stats != nil { + statsMutex.Lock() + stats.TimesWaitedForPool++ + stats.PoolTimeouts++ + stats.TotalPoolWaitTime += waitTime + statsMutex.Unlock() + } +} From 69bef6a66da4f402c7ffa2221ec4eb5a4033e8bc Mon Sep 17 00:00:00 2001 From: "Mc.Spring" Date: Tue, 3 Apr 2018 18:10:53 +0800 Subject: [PATCH 13/24] What: add spec for array ops with []uint8 or []byte type (#128) Why: * mongodb aquires array for $in/$nin/$all ops How: * encode with array for spec --- bson/bson_test.go | 27 ++++++++++++++++++++++++++- bson/encode.go | 41 ++++++++++++++++++++++++++++++----------- 2 files changed, 56 insertions(+), 12 deletions(-) diff --git a/bson/bson_test.go b/bson/bson_test.go index db72d8a06..ebb8b96bd 100644 --- a/bson/bson_test.go +++ b/bson/bson_test.go @@ -34,9 +34,9 @@ import ( "errors" "net/url" "reflect" + "strings" "testing" "time" - "strings" "github.com/globalsign/mgo/bson" . "gopkg.in/check.v1" @@ -111,6 +111,10 @@ var sampleItems = []testItemType{ {bson.M{"BSON": []interface{}{"awesome", float64(5.05), 1986}}, "1\x00\x00\x00\x04BSON\x00&\x00\x00\x00\x020\x00\x08\x00\x00\x00" + "awesome\x00\x011\x00333333\x14@\x102\x00\xc2\x07\x00\x00\x00\x00"}, + {bson.M{"slice": []uint8{1, 2}}, + "\x13\x00\x00\x00\x05slice\x00\x02\x00\x00\x00\x00\x01\x02\x00"}, + {bson.M{"slice": []byte{1, 2}}, + "\x13\x00\x00\x00\x05slice\x00\x02\x00\x00\x00\x00\x01\x02\x00"}, } func (s *S) TestMarshalSampleItems(c *C) { @@ -343,6 +347,27 @@ func (s *S) TestOneWayMarshalItems(c *C) { } } +// -------------------------------------------------------------------------- +// Some ops marshaling operations which would encode []uint8 or []byte in array. + +var arrayOpsMarshalItems = []testItemType{ + {bson.M{"_": bson.M{"$in": []uint8{1, 2}}}, + "\x03_\x00\x1d\x00\x00\x00\x04$in\x00\x13\x00\x00\x00\x100\x00\x01\x00\x00\x00\x101\x00\x02\x00\x00\x00\x00\x00"}, + {bson.M{"_": bson.M{"$nin": []uint8{1, 2}}}, + "\x03_\x00\x1e\x00\x00\x00\x04$nin\x00\x13\x00\x00\x00\x100\x00\x01\x00\x00\x00\x101\x00\x02\x00\x00\x00\x00\x00"}, + {bson.M{"_": bson.M{"$all": []uint8{1, 2}}}, + "\x03_\x00\x1e\x00\x00\x00\x04$all\x00\x13\x00\x00\x00\x100\x00\x01\x00\x00\x00\x101\x00\x02\x00\x00\x00\x00\x00"}, +} + +func (s *S) TestArrayOpsMarshalItems(c *C) { + for i, item := range arrayOpsMarshalItems { + data, err := bson.Marshal(item.obj) + c.Assert(err, IsNil) + c.Assert(string(data), Equals, wrapInDoc(item.data), + Commentf("Failed on item %d", i)) + } +} + // -------------------------------------------------------------------------- // Two-way tests for user-defined structures using the samples // from bsonspec.org. diff --git a/bson/encode.go b/bson/encode.go index f307c31ec..5d5f452b0 100644 --- a/bson/encode.go +++ b/bson/encode.go @@ -60,6 +60,15 @@ var ( typeTimeDuration = reflect.TypeOf(time.Duration(0)) ) +var ( + // spec for []uint8 or []byte encoding + arrayOps = map[string]bool{ + "$in": true, + "$nin": true, + "$all": true, + } +) + const itoaCacheSize = 32 const ( @@ -423,8 +432,13 @@ func (e *encoder) addElem(name string, v reflect.Value, minSize bool) { vt := v.Type() et := vt.Elem() if et.Kind() == reflect.Uint8 { - e.addElemName(0x05, name) - e.addBinary(0x00, v.Bytes()) + if arrayOps[name] { + e.addElemName(0x04, name) + e.addDoc(v) + } else { + e.addElemName(0x05, name) + e.addBinary(0x00, v.Bytes()) + } } else if et == typeDocElem || et == typeRawDocElem { e.addElemName(0x03, name) e.addDoc(v) @@ -436,16 +450,21 @@ func (e *encoder) addElem(name string, v reflect.Value, minSize bool) { case reflect.Array: et := v.Type().Elem() if et.Kind() == reflect.Uint8 { - e.addElemName(0x05, name) - if v.CanAddr() { - e.addBinary(0x00, v.Slice(0, v.Len()).Interface().([]byte)) + if arrayOps[name] { + e.addElemName(0x04, name) + e.addDoc(v) } else { - n := v.Len() - e.addInt32(int32(n)) - e.addBytes(0x00) - for i := 0; i < n; i++ { - el := v.Index(i) - e.addBytes(byte(el.Uint())) + e.addElemName(0x05, name) + if v.CanAddr() { + e.addBinary(0x00, v.Slice(0, v.Len()).Interface().([]byte)) + } else { + n := v.Len() + e.addInt32(int32(n)) + e.addBytes(0x00) + for i := 0; i < n; i++ { + el := v.Index(i) + e.addBytes(byte(el.Uint())) + } } } } else { From b5611a5ea333c021ce0b9aaa62497de2bd11ac77 Mon Sep 17 00:00:00 2001 From: Max Noel Date: Fri, 6 Apr 2018 05:35:46 -0400 Subject: [PATCH 14/24] bson: Added Encoder and Decoder types for stream encoding/decoding. (#127) * bson: Added Encoder and Decoder types for stream encoding/decoding. Those types are analog to those found in json and yaml. They allow us to operate on io.Readers/io.Writers instead of raw byte slices. Streams are expected to be sequences of concatenated BSON documents: *.bson files from MongoDB dumps, for example. * Stream: NewEncoder and NewDecoder now return pointers. JSON and YAML do that too, so let's be consistent. * Stream decoder: added checks on document size limits, and panic handler. Strangely, the BSON spec defines the document size header as a signed int32, but: - No document can be smaller than 5 bytes (size header + null terminator). - MongoDB constrains BSON documents to 16 MiB at most. Therefore, documents whose header doesn't obey those limits are discarded and Decode returns ErrInvalidDocumentSize. In addition, we're reusing the handleErr panic handler in Decode to protect from unwanted panics in Unmarshal. * Exported MinDocumentSize and MaxDocumentSize consts. --- bson/stream.go | 90 +++++++++++++++++++++++++++++++++++++++++++++ bson/stream_test.go | 77 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+) create mode 100644 bson/stream.go create mode 100644 bson/stream_test.go diff --git a/bson/stream.go b/bson/stream.go new file mode 100644 index 000000000..466528457 --- /dev/null +++ b/bson/stream.go @@ -0,0 +1,90 @@ +package bson + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" +) + +const ( + // MinDocumentSize is the size of the smallest possible valid BSON document: + // an int32 size header + 0x00 (end of document). + MinDocumentSize = 5 + + // MaxDocumentSize is the largest possible size for a BSON document allowed by MongoDB, + // that is, 16 MiB (see https://docs.mongodb.com/manual/reference/limits/). + MaxDocumentSize = 16777216 +) + +// ErrInvalidDocumentSize is an error returned when a BSON document's header +// contains a size smaller than MinDocumentSize or greater than MaxDocumentSize. +type ErrInvalidDocumentSize struct { + DocumentSize int32 +} + +func (e ErrInvalidDocumentSize) Error() string { + return fmt.Sprintf("invalid document size %d", e.DocumentSize) +} + +// A Decoder reads and decodes BSON values from an input stream. +type Decoder struct { + source io.Reader +} + +// NewDecoder returns a new Decoder that reads from source. +// It does not add any extra buffering, and may not read data from source beyond the BSON values requested. +func NewDecoder(source io.Reader) *Decoder { + return &Decoder{source: source} +} + +// Decode reads the next BSON-encoded value from its input and stores it in the value pointed to by v. +// See the documentation for Unmarshal for details about the conversion of BSON into a Go value. +func (dec *Decoder) Decode(v interface{}) (err error) { + // BSON documents start with their size as a *signed* int32. + var docSize int32 + if err = binary.Read(dec.source, binary.LittleEndian, &docSize); err != nil { + return + } + + if docSize < MinDocumentSize || docSize > MaxDocumentSize { + return ErrInvalidDocumentSize{DocumentSize: docSize} + } + + docBuffer := bytes.NewBuffer(make([]byte, 0, docSize)) + if err = binary.Write(docBuffer, binary.LittleEndian, docSize); err != nil { + return + } + + // docSize is the *full* document's size (including the 4-byte size header, + // which has already been read). + if _, err = io.CopyN(docBuffer, dec.source, int64(docSize-4)); err != nil { + return + } + + // Let Unmarshal handle the rest. + defer handleErr(&err) + return Unmarshal(docBuffer.Bytes(), v) +} + +// An Encoder encodes and writes BSON values to an output stream. +type Encoder struct { + target io.Writer +} + +// NewEncoder returns a new Encoder that writes to target. +func NewEncoder(target io.Writer) *Encoder { + return &Encoder{target: target} +} + +// Encode encodes v to BSON, and if successful writes it to the Encoder's output stream. +// See the documentation for Marshal for details about the conversion of Go values to BSON. +func (enc *Encoder) Encode(v interface{}) error { + data, err := Marshal(v) + if err != nil { + return err + } + + _, err = enc.target.Write(data) + return err +} diff --git a/bson/stream_test.go b/bson/stream_test.go new file mode 100644 index 000000000..14acbe3c5 --- /dev/null +++ b/bson/stream_test.go @@ -0,0 +1,77 @@ +package bson_test + +import ( + "bytes" + + "github.com/globalsign/mgo/bson" + . "gopkg.in/check.v1" +) + +var invalidSizeDocuments = [][]byte{ + // Empty document + []byte{}, + // Incomplete header + []byte{0x04}, + // Negative size + []byte{0xff, 0xff, 0xff, 0xff}, + // Full, valid size header but too small (less than 5 bytes) + []byte{0x04, 0x00, 0x00, 0x00}, + // Valid header, valid size but incomplete document + []byte{0xff, 0x00, 0x00, 0x00, 0x00}, + // Too big + []byte{0xff, 0xff, 0xff, 0x7f}, +} + +// Reusing sampleItems from bson_test + +func (s *S) TestEncodeSampleItems(c *C) { + for i, item := range sampleItems { + buf := bytes.NewBuffer(nil) + enc := bson.NewEncoder(buf) + + err := enc.Encode(item.obj) + c.Assert(err, IsNil) + c.Assert(string(buf.Bytes()), Equals, item.data, Commentf("Failed on item %d", i)) + } +} + +func (s *S) TestDecodeSampleItems(c *C) { + for i, item := range sampleItems { + buf := bytes.NewBuffer([]byte(item.data)) + dec := bson.NewDecoder(buf) + + value := bson.M{} + err := dec.Decode(&value) + c.Assert(err, IsNil) + c.Assert(value, DeepEquals, item.obj, Commentf("Failed on item %d", i)) + } +} + +func (s *S) TestStreamRoundTrip(c *C) { + buf := bytes.NewBuffer(nil) + enc := bson.NewEncoder(buf) + + for _, item := range sampleItems { + err := enc.Encode(item.obj) + c.Assert(err, IsNil) + } + + // Ensure that everything that was encoded is decodable in the same order. + dec := bson.NewDecoder(buf) + for i, item := range sampleItems { + value := bson.M{} + err := dec.Decode(&value) + c.Assert(err, IsNil) + c.Assert(value, DeepEquals, item.obj, Commentf("Failed on item %d", i)) + } +} + +func (s *S) TestDecodeDocumentTooSmall(c *C) { + for i, item := range invalidSizeDocuments { + buf := bytes.NewBuffer(item) + dec := bson.NewDecoder(buf) + value := bson.M{} + err := dec.Decode(&value) + c.Assert(err, NotNil, Commentf("Failed on invalid size item %d", i)) + } +} From e6a7e81fdc642d7f3b7986f06e327c2564193781 Mon Sep 17 00:00:00 2001 From: Carl Dunham Date: Tue, 10 Apr 2018 02:12:59 -0700 Subject: [PATCH 15/24] Minor documentation updates (#141) * docs: updated readme and docs * docs: added minimal readme for bson package --- README.md | 19 ++++++++++++++----- bson/README.md | 9 +++++++++ doc.go | 12 ++++++++---- 3 files changed, 31 insertions(+), 9 deletions(-) create mode 100644 bson/README.md diff --git a/README.md b/README.md index cd5edff49..2990a3cdb 100644 --- a/README.md +++ b/README.md @@ -3,10 +3,19 @@ The MongoDB driver for Go ------------------------- -This fork has had a few improvements by ourselves as well as several PR's merged from the original mgo repo that are currently awaiting review. Changes are mostly geared towards performance improvements and bug fixes, though a few new features have been added. +This is the "official", maintained fork of `mgo` (pronounced as "mango"). +We are grateful for the great work that @niemeyer and the mgo contributors have made over the years. + +This fork has had a few improvements by ourselves as well as several PR's merged from the original mgo repo that are currently awaiting review. +Changes are mostly geared towards performance improvements and bug fixes, though a few new features have been added. Further PR's (with tests) are welcome, but please maintain backwards compatibility. +Detailed documentation of the API is available at +[GoDoc](https://godoc.org/github.com/globalsign/mgo). + +A [sub-package](https://godoc.org/github.com/globalsign/mgo/bson) that implements [BSON](http://bsonspec.org) is also included, and may be used independently of the driver. + ## Changes * Fixes attempting to authenticate before every query ([details](https://github.com/go-mgo/mgo/issues/254)) * Removes bulk update / delete batch size limitations ([details](https://github.com/go-mgo/mgo/issues/288)) @@ -15,13 +24,13 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Support majority read concerns ([details](https://github.com/globalsign/mgo/pull/2)) * Improved connection handling ([details](https://github.com/globalsign/mgo/pull/5)) * Hides SASL warnings ([details](https://github.com/globalsign/mgo/pull/7)) -* Support for partial indexes ([detials](https://github.com/domodwyer/mgo/commit/5efe8eccb028238d93c222828cae4806aeae9f51)) -* Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464)) +* Support for partial indexes ([details](https://github.com/domodwyer/mgo/commit/5efe8eccb028238d93c222828cae4806aeae9f51)) +* Fixes timezone handling ([details](https://github.com/go-mgo/mgo/pull/464)) * Integration tests run against MongoDB 3.2 & 3.4 releases ([details](https://github.com/globalsign/mgo/pull/4), [more](https://github.com/globalsign/mgo/pull/24), [more](https://github.com/globalsign/mgo/pull/35)) * Improved multi-document transaction performance ([details](https://github.com/globalsign/mgo/pull/10), [more](https://github.com/globalsign/mgo/pull/11), [more](https://github.com/globalsign/mgo/pull/16)) * Fixes cursor timeouts ([details](https://jira.mongodb.org/browse/SERVER-24899)) * Support index hints and timeouts for count queries ([details](https://github.com/globalsign/mgo/pull/17)) -* Don't panic when handling indexed `int64` fields ([detials](https://github.com/go-mgo/mgo/issues/475)) +* Don't panic when handling indexed `int64` fields ([details](https://github.com/go-mgo/mgo/issues/475)) * Supports dropping all indexes on a collection ([details](https://github.com/globalsign/mgo/pull/25)) * Annotates log entries/profiler output with optional appName on 3.4+ ([details](https://github.com/globalsign/mgo/pull/28)) * Support for read-only [views](https://docs.mongodb.com/manual/core/views/) in 3.4+ ([details](https://github.com/globalsign/mgo/pull/33)) @@ -57,4 +66,4 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * @Reenjii * @smoya * @steve-gray -* @wgallagher \ No newline at end of file +* @wgallagher diff --git a/bson/README.md b/bson/README.md new file mode 100644 index 000000000..ded89869b --- /dev/null +++ b/bson/README.md @@ -0,0 +1,9 @@ +[![GoDoc](https://godoc.org/github.com/globalsign/mgo/bson?status.svg)](https://godoc.org/github.com/globalsign/mgo/bson) + +An Implementation of BSON for Go +-------------------------------- + +Package bson is an implementation of the [BSON specification](http://bsonspec.org) for Go. + +It was created as part of the mgo MongoDB driver for Go, but is standalone +and may be used on its own without the driver. diff --git a/doc.go b/doc.go index 859fd9b8d..f3f373bf4 100644 --- a/doc.go +++ b/doc.go @@ -1,9 +1,8 @@ -// Package mgo offers a rich MongoDB driver for Go. +// Package mgo (pronounced as "mango") offers a rich MongoDB driver for Go. // -// Details about the mgo project (pronounced as "mango") are found -// in its web page: +// Detailed documentation of the API is available at GoDoc: // -// http://labix.org/mgo +// https://godoc.org/github.com/globalsign/mgo // // Usage of the driver revolves around the concept of sessions. To // get started, obtain a session using the Dial function: @@ -26,6 +25,11 @@ // of its life time, so its resources may be put back in the pool or // collected, depending on the case. // +// There is a sub-package that provides support for BSON, which can be +// used by itself as well: +// +// https://godoc.org/github.com/globalsign/mgo/bson +// // For more details, see the documentation for the types and methods. // package mgo From 32b18f7452258a572cc013c7b0a4b247ab8423c7 Mon Sep 17 00:00:00 2001 From: Grigory Date: Thu, 12 Apr 2018 19:34:38 +0300 Subject: [PATCH 16/24] Support decimal number type keys in maps (#140) * socket: only send client metadata once per socket (#105) Periodic cluster synchronisation calls isMaster() which currently resends the "client" metadata every call - the spec specifies: isMaster commands issued after the initial connection handshake MUST NOT contain handshake arguments https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst#connection-handshake This hotfix prevents subsequent isMaster calls from sending the client metadata again - fixes #101 and fixes #103. Thanks to @changwoo-nam @qhenkart @canthefason @jyoon17 for spotting the initial issue, opening tickets, and having the problem debugged with a PoC fix before I even woke up. * Merge Development (#111) * Brings in a patch on having flusher not suppress errors. (#81) https://github.com/go-mgo/mgo/pull/360 * Fallback to JSON tags when BSON tag isn't present (#91) * Fallback to JSON tags when BSON tag isn't present Cleanup. * Add test to demonstrate tagging fallback. - Test coverage for tagging test. * socket: only send client metadata once per socket Periodic cluster synchronisation calls isMaster() which currently resends the "client" metadata every call - the spec specifies: isMaster commands issued after the initial connection handshake MUST NOT contain handshake arguments https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst#connection-handshake This hotfix prevents subsequent isMaster calls from sending the client metadata again - fixes #101 and fixes #103. Thanks to @changwoo-nam @qhenkart @canthefason @jyoon17 for spotting the initial issue, opening tickets, and having the problem debugged with a PoC fix before I even woke up. * Cluster abended test 254 (#100) * Add a test that mongo Server gets their abended reset as necessary. See https://github.com/go-mgo/mgo/issues/254 and https://github.com/go-mgo/mgo/pull/255/files * Include the patch from Issue 255. This brings in a test which fails without the patch, and passes with the patch. Still to be tested, manual tcpkill of a socket. * changeStream support (#97) Add $changeStream support * readme: credit @peterdeka and @steve-gray (#110) * Hotfix #120 (#136) * cluster: fix deadlock in cluster synchronisation (#120) For a impressively thorough breakdown of the problem, see: https://github.com/globalsign/mgo/issues/120#issuecomment-371699575 Huge thanks to @dvic and @KJTsanaktsidis for the report and fix. * readme: credit @dvic and @KJTsanaktsidis * added support for marshalling/unmarshalling maps with non-string keys * refactor method receiver --- README.md | 3 +++ bson/bson_test.go | 34 ++++++++++++++++++++------------ bson/decode.go | 50 +++++++++++++++++++++++++++++++++++++++++++---- bson/encode.go | 2 +- 4 files changed, 72 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index 2990a3cdb..72717c808 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,7 @@ A [sub-package](https://godoc.org/github.com/globalsign/mgo/bson) that implement * Gracefully recover from a temporarily unreachable server ([details](https://github.com/globalsign/mgo/pull/69)) * Use JSON tags when no explicit BSON are tags set ([details](https://github.com/globalsign/mgo/pull/91)) * Support [$changeStream](https://docs.mongodb.com/manual/changeStreams/) tailing on 3.6+ ([details](https://github.com/globalsign/mgo/pull/97)) +* Fix deadlock in cluster synchronisation ([details](https://github.com/globalsign/mgo/issues/120)) --- @@ -55,11 +56,13 @@ A [sub-package](https://godoc.org/github.com/globalsign/mgo/bson) that implement * @carter2000 * @cezarsa * @drichelson +* @dvic * @eaglerayp * @feliixx * @fmpwizard * @idy * @jameinel +* @KJTsanaktsidis * @gazoon * @mapete94 * @peterdeka diff --git a/bson/bson_test.go b/bson/bson_test.go index ebb8b96bd..406ede6ae 100644 --- a/bson/bson_test.go +++ b/bson/bson_test.go @@ -607,6 +607,8 @@ func (s *S) TestMarshalOneWayItems(c *C) { // -------------------------------------------------------------------------- // One-way unmarshaling tests. +type intAlias int + var unmarshalItems = []testItemType{ // Field is private. Should not attempt to unmarshal it. {&struct{ priv byte }{}, @@ -661,6 +663,14 @@ var unmarshalItems = []testItemType{ // Decode a doc within a doc in to a slice within a doc; shouldn't error {&struct{ Foo []string }{}, "\x03\x66\x6f\x6f\x00\x05\x00\x00\x00\x00"}, + + // int key maps + {map[int]string{10: "s"}, + "\x0210\x00\x02\x00\x00\x00s\x00"}, + + //// event if type is alias to int + {map[intAlias]string{10: "s"}, + "\x0210\x00\x02\x00\x00\x00s\x00"}, } func (s *S) TestUnmarshalOneWayItems(c *C) { @@ -738,11 +748,6 @@ var unmarshalErrorItems = []unmarshalErrorType{ "\x10name\x00\x08\x00\x00\x00", "Duplicated key 'name' in struct bson_test.structWithDupKeys"}, - // Non-string map key. - {map[int]interface{}{}, - "\x10name\x00\x08\x00\x00\x00", - "BSON map must have string keys. Got: map\\[int\\]interface \\{\\}"}, - {nil, "\xEEname\x00", "Unknown element kind \\(0xEE\\)"}, @@ -758,6 +763,11 @@ var unmarshalErrorItems = []unmarshalErrorType{ {nil, "\x08\x62\x00\x02", "encoded boolean must be 1 or 0, found 2"}, + + // Non-string and not numeric map key. + {map[bool]interface{}{true: 1}, + "\x10true\x00\x01\x00\x00\x00", + "BSON map must have string or decimal keys. Got: map\\[bool\\]interface \\{\\}"}, } func (s *S) TestUnmarshalErrorItems(c *C) { @@ -1161,8 +1171,8 @@ type inlineBadKeyMap struct { M map[int]int `bson:",inline"` } type inlineUnexported struct { - M map[string]interface{} `bson:",inline"` - unexported `bson:",inline"` + M map[string]interface{} `bson:",inline"` + unexported `bson:",inline"` } type unexported struct { A int @@ -1219,11 +1229,11 @@ func (s ifaceSlice) GetBSON() (interface{}, error) { type ( MyString string - MyBytes []byte - MyBool bool - MyD []bson.DocElem - MyRawD []bson.RawDocElem - MyM map[string]interface{} + MyBytes []byte + MyBool bool + MyD []bson.DocElem + MyRawD []bson.RawDocElem + MyM map[string]interface{} ) var ( diff --git a/bson/decode.go b/bson/decode.go index e71eac23f..658856add 100644 --- a/bson/decode.go +++ b/bson/decode.go @@ -176,9 +176,6 @@ func (d *decoder) readDocTo(out reflect.Value) { switch outk { case reflect.Map: keyType = outt.Key() - if keyType.Kind() != reflect.String { - panic("BSON map must have string keys. Got: " + outt.String()) - } if keyType != typeString { convertKey = true } @@ -240,7 +237,42 @@ func (d *decoder) readDocTo(out reflect.Value) { if d.readElemTo(e, kind) { k := reflect.ValueOf(name) if convertKey { - k = k.Convert(keyType) + mapKeyType := out.Type().Key() + mapKeyKind := mapKeyType.Kind() + + switch mapKeyKind { + case reflect.Int: + fallthrough + case reflect.Int8: + fallthrough + case reflect.Int16: + fallthrough + case reflect.Int32: + fallthrough + case reflect.Int64: + fallthrough + case reflect.Uint: + fallthrough + case reflect.Uint8: + fallthrough + case reflect.Uint16: + fallthrough + case reflect.Uint32: + fallthrough + case reflect.Uint64: + fallthrough + case reflect.Float32: + fallthrough + case reflect.Float64: + parsed := d.parseMapKeyAsFloat(k, mapKeyKind) + k = reflect.ValueOf(parsed) + case reflect.String: + mapKeyType = keyType + default: + panic("BSON map must have string or decimal keys. Got: " + outt.String()) + } + + k = k.Convert(mapKeyType) } out.SetMapIndex(k, e) } @@ -276,6 +308,16 @@ func (d *decoder) readDocTo(out reflect.Value) { d.docType = docType } +func (decoder) parseMapKeyAsFloat(k reflect.Value, mapKeyKind reflect.Kind) float64 { + parsed, err := strconv.ParseFloat(k.String(), 64) + if err != nil { + panic("Map key is defined to be a decimal type (" + mapKeyKind.String() + ") but got error " + + err.Error()) + } + + return parsed +} + func (d *decoder) readArrayDocTo(out reflect.Value) { end := int(d.readInt32()) end += d.i - 4 diff --git a/bson/encode.go b/bson/encode.go index 5d5f452b0..7e0b84d77 100644 --- a/bson/encode.go +++ b/bson/encode.go @@ -203,7 +203,7 @@ func (e *encoder) addDoc(v reflect.Value) { func (e *encoder) addMap(v reflect.Value) { for _, k := range v.MapKeys() { - e.addElem(k.String(), v.MapIndex(k), false) + e.addElem(fmt.Sprint(k), v.MapIndex(k), false) } } From caed40134f6ace5eace025c8f512dcb16bd5729d Mon Sep 17 00:00:00 2001 From: Dom Date: Thu, 19 Apr 2018 11:49:51 +0100 Subject: [PATCH 17/24] socket: amortise cost of querying OS time counter (#149) #116 adds a much needed ability to shrink the connection pool, but requires tracking the last-used timestamp for each socket after every operation. Frequent calls to time.Now() in the hot-path reduced read throughput by ~6% and increased the latency (and variance) of socket operations as a whole. This PR adds a periodically updated time value to amortise the cost of the last- used bookkeeping, restoring the original throughput at the cost of approximate last-used values (configured to be ~25ms of potential skew). On some systems (currently including FreeBSD) querying the time counter also requires a syscall/context switch. Fixes #142. --- coarse_time.go | 62 +++++++++++++++++++++++++++++++++++++++++++++ coarse_time_test.go | 23 +++++++++++++++++ server.go | 14 +++++++++- 3 files changed, 98 insertions(+), 1 deletion(-) create mode 100644 coarse_time.go create mode 100644 coarse_time_test.go diff --git a/coarse_time.go b/coarse_time.go new file mode 100644 index 000000000..e54dd17cf --- /dev/null +++ b/coarse_time.go @@ -0,0 +1,62 @@ +package mgo + +import ( + "sync" + "sync/atomic" + "time" +) + +// coarseTimeProvider provides a periodically updated (approximate) time value to +// amortise the cost of frequent calls to time.Now. +// +// A read throughput increase of ~6% was measured when using coarseTimeProvider with the +// high-precision event timer (HPET) on FreeBSD 11.1 and Go 1.10.1 after merging +// #116. +// +// Calling Now returns a time.Time that is updated at the configured interval, +// however due to scheduling the value may be marginally older than expected. +// +// coarseTimeProvider is safe for concurrent use. +type coarseTimeProvider struct { + once sync.Once + stop chan struct{} + last atomic.Value +} + +// Now returns the most recently acquired time.Time value. +func (t *coarseTimeProvider) Now() time.Time { + return t.last.Load().(time.Time) +} + +// Close stops the periodic update of t. +// +// Any subsequent calls to Now will return the same value forever. +func (t *coarseTimeProvider) Close() { + t.once.Do(func() { + close(t.stop) + }) +} + +// newcoarseTimeProvider returns a coarseTimeProvider configured to update at granularity. +func newcoarseTimeProvider(granularity time.Duration) *coarseTimeProvider { + t := &coarseTimeProvider{ + stop: make(chan struct{}), + } + + t.last.Store(time.Now()) + + go func() { + ticker := time.NewTicker(granularity) + for { + select { + case <-t.stop: + ticker.Stop() + return + case <-ticker.C: + t.last.Store(time.Now()) + } + } + }() + + return t +} diff --git a/coarse_time_test.go b/coarse_time_test.go new file mode 100644 index 000000000..5c98fc65d --- /dev/null +++ b/coarse_time_test.go @@ -0,0 +1,23 @@ +package mgo + +import ( + "testing" + "time" +) + +func TestCoarseTimeProvider(t *testing.T) { + t.Parallel() + + const granularity = 50 * time.Millisecond + + ct := newcoarseTimeProvider(granularity) + defer ct.Close() + + start := ct.Now().Unix() + time.Sleep(time.Second) + + got := ct.Now().Unix() + if got <= start { + t.Fatalf("got %d, expected at least %d", got, start) + } +} diff --git a/server.go b/server.go index 7832bec1b..f34624f74 100644 --- a/server.go +++ b/server.go @@ -36,6 +36,18 @@ import ( "github.com/globalsign/mgo/bson" ) +// coarseTime is used to amortise the cost of querying the timecounter (possibly +// incurring a syscall too) when setting a socket.lastTimeUsed value which +// happens frequently in the hot-path. +// +// The lastTimeUsed value may be skewed by at least 25ms (see +// coarseTimeProvider). +var coarseTime *coarseTimeProvider + +func init() { + coarseTime = newcoarseTimeProvider(25 * time.Millisecond) +} + // --------------------------------------------------------------------------- // Mongo server encapsulation. @@ -293,7 +305,7 @@ func (server *mongoServer) close(waitForIdle bool) { func (server *mongoServer) RecycleSocket(socket *mongoSocket) { server.Lock() if !server.closed { - socket.lastTimeUsed = time.Now() + socket.lastTimeUsed = coarseTime.Now() // A rough approximation of the current time - see courseTime server.unusedSockets = append(server.unusedSockets, socket) } // If anybody is waiting for a connection, they should try now. From 78a812e604b8dbd7c581784d511676763b25c3eb Mon Sep 17 00:00:00 2001 From: John Harrison Date: Thu, 19 Apr 2018 09:44:47 -0700 Subject: [PATCH 18/24] Implement collation option for Collection.Pipe (#144) --- session.go | 20 ++++++++++++++++++++ session_test.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/session.go b/session.go index a9f3a8cb2..167a0375d 100644 --- a/session.go +++ b/session.go @@ -2510,6 +2510,7 @@ type Pipe struct { allowDisk bool batchSize int maxTimeMS int64 + collation *Collation } type pipeCmd struct { @@ -2519,6 +2520,7 @@ type pipeCmd struct { Explain bool `bson:",omitempty"` AllowDisk bool `bson:"allowDiskUse,omitempty"` MaxTimeMS int64 `bson:"maxTimeMS,omitempty"` + Collation *Collation `bson:"collation,omitempty"` } type pipeCmdCursor struct { @@ -2539,6 +2541,7 @@ type pipeCmdCursor struct { // http://docs.mongodb.org/manual/applications/aggregation // http://docs.mongodb.org/manual/tutorial/aggregation-examples // + func (c *Collection) Pipe(pipeline interface{}) *Pipe { session := c.Database.Session session.m.RLock() @@ -2572,6 +2575,7 @@ func (p *Pipe) Iter() *Iter { Pipeline: p.pipeline, AllowDisk: p.allowDisk, Cursor: &pipeCmdCursor{p.batchSize}, + Collation: p.collation, } if p.maxTimeMS > 0 { cmd.MaxTimeMS = p.maxTimeMS @@ -2761,6 +2765,22 @@ func (p *Pipe) SetMaxTime(d time.Duration) *Pipe { return p } +// Collation allows to specify language-specific rules for string comparison, +// such as rules for lettercase and accent marks. +// When specifying collation, the locale field is mandatory; all other collation +// fields are optional +// +// Relevant documentation: +// +// https://docs.mongodb.com/manual/reference/collation/ +// +func (p *Pipe) Collation(collation *Collation) *Pipe { + if collation != nil { + p.collation = collation + } + return p +} + // LastError the error status of the preceding write operation on the current connection. // // Relevant documentation: diff --git a/session_test.go b/session_test.go index c740af47a..14cb9b1a6 100644 --- a/session_test.go +++ b/session_test.go @@ -4617,6 +4617,36 @@ func (s *S) TestPipeExplain(c *C) { c.Assert(result.Ok, Equals, 1) } +func (s *S) TestPipeCollation(c *C) { + if !s.versionAtLeast(2, 1) { + c.Skip("Pipe only works on 2.1+") + } + if !s.versionAtLeast(3, 3, 12) { + c.Skip("collations being released with 3.4") + } + + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + beatles := []string{"John", "RINGO", "George", "Paul"} + for _, n := range beatles { + err := coll.Insert(M{"name": n}) + c.Assert(err, IsNil) + } + + collation := &mgo.Collation{ + Locale: "en", + Strength: 1, // ignore case/diacritics + } + var result []struct{ Name string } + err = coll.Pipe([]M{{"$match": M{"name": "ringo"}}}).Collation(collation).All(&result) + c.Assert(err, IsNil) + c.Assert(len(result), Equals, 1) + c.Assert(result[0].Name, Equals, "RINGO") +} + func (s *S) TestBatch1Bug(c *C) { session, err := mgo.Dial("localhost:40001") c.Assert(err, IsNil) From db12ee52971557d77e937dbab379358347e3a05c Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 23 Apr 2018 12:46:25 +0100 Subject: [PATCH 19/24] examples: add SSL auth This was taken from @ReeseWang's PR and turned into an example - thanks! --- example_test.go | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/example_test.go b/example_test.go index bf7982a46..d176d5f5c 100644 --- a/example_test.go +++ b/example_test.go @@ -90,12 +90,12 @@ func ExampleCredential_x509Authentication() { func ExampleSession_concurrency() { // This example shows the best practise for concurrent use of a mgo session. - // + // // Internally mgo maintains a connection pool, dialling new connections as - // required. - // + // required. + // // Some general suggestions: - // - Define a struct holding the original session, database name and + // - Define a struct holding the original session, database name and // collection name instead of passing them explicitly. // - Define an interface abstracting your data access instead of exposing // mgo to your application code directly. @@ -107,7 +107,7 @@ func ExampleSession_concurrency() { // Copy the session - if needed this will dial a new connection which // can later be reused. - // + // // Calling close returns the connection to the pool. conn := session.Copy() defer conn.Close() @@ -133,4 +133,31 @@ func ExampleSession_concurrency() { wg.Wait() session.Close() -} \ No newline at end of file +} + +func ExampleDial_usingSSL() { + // To connect via TLS/SSL (enforced for MongoDB Atlas for example) requires + // configuring the dialer to use a TLS connection: + url := "mongodb://localhost:40003" + + tlsConfig := &tls.Config{ + // This can be configured to use a private root CA - see the Credential + // x509 Authentication example. + // + // Please don't set InsecureSkipVerify to true - it makes using TLS + // pointless and is never the right answer! + } + + dialInfo, err := ParseURL(url) + dialInfo.DialServer = func(addr *ServerAddr) (net.Conn, error) { + return tls.Dial("tcp", addr.String(), tlsConfig) + } + + session, err := DialWithInfo(dialInfo) + if err != nil { + panic(err) + } + + // Use session as normal + session.Close() +} From 925782f51f6aaedc0201f9311e0d80b1f42b39fb Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 23 Apr 2018 12:53:46 +0100 Subject: [PATCH 20/24] build: use correct golint path --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 43cc9c2b6..335d527ec 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,7 +29,7 @@ install: - go get gopkg.in/check.v1 - go get gopkg.in/yaml.v2 - go get gopkg.in/tomb.v2 - - go get github.com/golang/lint/golint + - go get github.com/golang/lint before_script: - golint ./... | grep -v 'ID' | cat From 8bb407c9389e7d6715b32272be49eaa99c51d3d6 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 23 Apr 2018 12:54:50 +0100 Subject: [PATCH 21/24] build: test against Go 1.10.x --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 335d527ec..78991a8c1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,8 +3,8 @@ language: go go_import_path: github.com/globalsign/mgo go: - - 1.8.x - 1.9.x + - 1.10.x env: global: From c63ce54714d7ceee5a50a23e9edaaa13b569cf69 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 23 Apr 2018 13:07:27 +0100 Subject: [PATCH 22/24] bson: document mongo specific behaviour --- bson/README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/bson/README.md b/bson/README.md index ded89869b..5c5819e61 100644 --- a/bson/README.md +++ b/bson/README.md @@ -5,5 +5,8 @@ An Implementation of BSON for Go Package bson is an implementation of the [BSON specification](http://bsonspec.org) for Go. -It was created as part of the mgo MongoDB driver for Go, but is standalone -and may be used on its own without the driver. +While the BSON package implements the BSON spec as faithfully as possible, there +is some MongoDB specific behaviour (such as map keys `$in`, `$all`, etc) in the +`bson` package. The priority is for backwards compatibility for the `mgo` +driver, though fixes for obviously buggy behaviour is welcome (and features, etc +behind feature flags). From 13b14859b6d04443a2eb871b61ec0af987d34753 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 23 Apr 2018 13:14:22 +0100 Subject: [PATCH 23/24] readme: remove "official" claim - it's a community decision! --- README.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/README.md b/README.md index 72717c808..582924293 100644 --- a/README.md +++ b/README.md @@ -3,9 +3,6 @@ The MongoDB driver for Go ------------------------- -This is the "official", maintained fork of `mgo` (pronounced as "mango"). -We are grateful for the great work that @niemeyer and the mgo contributors have made over the years. - This fork has had a few improvements by ourselves as well as several PR's merged from the original mgo repo that are currently awaiting review. Changes are mostly geared towards performance improvements and bug fixes, though a few new features have been added. @@ -14,7 +11,7 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili Detailed documentation of the API is available at [GoDoc](https://godoc.org/github.com/globalsign/mgo). -A [sub-package](https://godoc.org/github.com/globalsign/mgo/bson) that implements [BSON](http://bsonspec.org) is also included, and may be used independently of the driver. +A [sub-package](https://godoc.org/github.com/globalsign/mgo/bson) that implements the [BSON](http://bsonspec.org) specification is also included, and may be used independently of the driver. ## Changes * Fixes attempting to authenticate before every query ([details](https://github.com/go-mgo/mgo/issues/254)) From ff16defa0d61b4fcb81ae7c32c61c3702eaaac42 Mon Sep 17 00:00:00 2001 From: Dom Dwyer Date: Mon, 23 Apr 2018 15:20:56 +0100 Subject: [PATCH 24/24] readme: credit everyone! * @aksentyev * @carldunham * @gnawux * @johnlawsharrison * @maxnoel * @mcspring --- README.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 582924293..6c87fa905 100644 --- a/README.md +++ b/README.md @@ -43,13 +43,21 @@ A [sub-package](https://godoc.org/github.com/globalsign/mgo/bson) that implement * Use JSON tags when no explicit BSON are tags set ([details](https://github.com/globalsign/mgo/pull/91)) * Support [$changeStream](https://docs.mongodb.com/manual/changeStreams/) tailing on 3.6+ ([details](https://github.com/globalsign/mgo/pull/97)) * Fix deadlock in cluster synchronisation ([details](https://github.com/globalsign/mgo/issues/120)) +* Implement `maxIdleTimeout` for pooled connections ([details](https://github.com/globalsign/mgo/pull/116)) +* Connection pool waiting improvements ([details](https://github.com/globalsign/mgo/pull/115)) +* Fixes BSON encoding for `$in` and friends ([details](https://github.com/globalsign/mgo/pull/128)) +* Add BSON stream encoders ([details](https://github.com/globalsign/mgo/pull/127)) +* Add integer map key support in the BSON encoder ([details](https://github.com/globalsign/mgo/pull/140)) +* Support aggregation [collations](https://docs.mongodb.com/manual/reference/collation/) ([details](https://github.com/globalsign/mgo/pull/144)) --- ### Thanks to +* @aksentyev * @bachue * @bozaro * @BenLubar +* @carldunham * @carter2000 * @cezarsa * @drichelson @@ -57,11 +65,15 @@ A [sub-package](https://godoc.org/github.com/globalsign/mgo/bson) that implement * @eaglerayp * @feliixx * @fmpwizard +* @gazoon +* @gnawux * @idy * @jameinel +* @johnlawsharrison * @KJTsanaktsidis -* @gazoon * @mapete94 +* @maxnoel +* @mcspring * @peterdeka * @Reenjii * @smoya