diff --git a/gtm.go b/gtm.go index 30a74fb..c2d18e7 100644 --- a/gtm.go +++ b/gtm.go @@ -113,6 +113,7 @@ type Op struct { Doc interface{} `json:"doc,omitempty"` UpdateDescription map[string]interface{} `json:"updateDescription,omitempty"` ResumeToken OpResumeToken `json:"-"` + RawFullDoc interface{} `json:"-"` } type ReplStatus struct { @@ -144,6 +145,11 @@ type ChangeDoc struct { UpdateDescription map[string]interface{} "updateDescription" } +// extract fullDocument as bson.Raw +type fullDocOnly struct { + RawFullDoc bson.Raw "fullDocument" +} + func (cd *ChangeDoc) docId() interface{} { return cd.DocKey["_id"] } @@ -1325,6 +1331,19 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options ctx.ErrC <- errors.Wrap(err, "Error decoding change doc") break } + + // get raw full document + var fullDoc fullDocOnly + if err = stream.Decode(&fullDoc); err != nil { + ctx.ErrC <- errors.Wrap(err, "Error decoding change doc") + break + } + + if len(changeDoc.FullDoc) > 0 && len(fullDoc.RawFullDoc) <= 0 { + ctx.ErrC <- errors.New("extract raw fullDocument fail") + break + } + resumeAfter = changeDoc.Id startAt = nil startAfter = nil @@ -1371,6 +1390,12 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options Timestamp: changeDoc.mapTimestamp(), ResumeToken: token, } + + // output raw full document + if len(fullDoc.RawFullDoc) > 0 { + op.RawFullDoc = fullDoc.RawFullDoc + } + if op.matchesNsFilter(o) { if changeDoc.hasUpdate() { op.UpdateDescription = changeDoc.UpdateDescription