pipeline that includes $project doesn't seem to work #737

Open
dhyeymistri opened this issue Nov 20, 2024 · 3 comments

Comments

@dhyeymistri

Hi,
I'm trying to set up change streams with a pipeline that includes a $project stage, but the initial sync doesn't seem to work.

Versions:
monstache: 6.7.17
opensearch: 8.16.1
mongoDB: 5.0.30

The config.toml file:

#The following shows how to specify options in a TOML config file.
# connection settings
# connect to MongoDB using the following URL
mongo-url = "mongodb://localhost:27017/my-database"

# connect to the Elasticsearch REST API at the following node URLs
elasticsearch-urls = ["http://localhost:9200"]

# if you want to use MongoDB change streams instead of legacy oplog tailing use change-stream-namespaces
# change streams require at least MongoDB API 3.6+
# if you have MongoDB 4+ you can listen for changes to an entire database or entire deployment
# in this case you usually don't need regexes in your config to filter collections unless you target the deployment.
# to listen to an entire db use only the database name.  For a deployment use an empty string.
change-stream-namespaces = ["my-database.users"]
#verbose = true

#[logs]
#error = "/var/log/monstache/error.log"
#info = "/var/log/monstache/info.log"
#warn = "/var/log/monstache/warn.log"
#trace = "/var/log/monstache/trace.log"


[[mapping]]
namespace = "my-database.users"
index = "test-users"

#[[pipeline]]
#namespace = "my-database.users"
#path =  "/path/to/test.so"
#routing = true

Working test.go:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/rwynn/monstache/v6/monstachemap"
	"go.mongodb.org/mongo-driver/bson"
)

func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error) {
	fmt.Println("Map function")
	indentedJSON, _ := json.MarshalIndent(input, "", "   ")
	fmt.Println("Input Document: ", string(indentedJSON))
	transformedDoc := map[string]interface{}{}
	transformedDoc["doc_id"] = "TEST_ID"
	output = &monstachemap.MapperPluginOutput{Document: transformedDoc}
	return
}
func Pipeline(ns string, changeStream bool) (stages []interface{}, err error) {
	fmt.Println("Changestream: ", changeStream)
	fmt.Println("ns: ", ns)
	// pipeline := []bson.D{}
	if changeStream {
		return []interface{}{
			bson.D{
				{
					Key: "$match",
					Value: bson.D{
						{
							Key: "operationType",
							Value: bson.D{
								{
									Key:   "$in",
									Value: bson.A{"insert", "update", "delete"},
								},
							},
						},
					},
				},
			},
		}, nil
	}
	return []interface{}{
		bson.D{},
		bson.D{
			{
				Key: "$project",
				Value: bson.D{
					{
						Key:   "_id",
						Value: 1,
					},
				},
			},
		},
	}, nil
}

Failing test.go:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/rwynn/monstache/v6/monstachemap"
	"go.mongodb.org/mongo-driver/bson"
)

func Map(input *monstachemap.MapperPluginInput) (output *monstachemap.MapperPluginOutput, err error) {
	fmt.Println("Map function")
	indentedJSON, _ := json.MarshalIndent(input, "", "   ")
	fmt.Println("Input Document: ", string(indentedJSON))
	transformedDoc := map[string]interface{}{}
	transformedDoc["doc_id"] = "TEST_ID"
	output = &monstachemap.MapperPluginOutput{Document: transformedDoc}
	return
}
func Pipeline(ns string, changeStream bool) (stages []interface{}, err error) {
	fmt.Println("Changestream: ", changeStream)
	fmt.Println("ns: ", ns)
	if changeStream {
		return []interface{}{
			bson.D{
				{
					Key: "$match",
					Value: bson.D{
						{
							Key: "operationType",
							Value: bson.D{
								{
									Key:   "$in",
									Value: bson.A{"insert", "update", "delete"},
								},
							},
						},
					},
				},
			},
			// Only difference from the working version: this extra $project stage.
			bson.D{
				{
					Key: "$project",
					Value: bson.D{
						{
							Key:   "fullDocument",
							Value: 1,
						},
					},
				},
			},
		}, nil
	}
	return []interface{}{
		bson.D{},
		bson.D{
			{
				Key: "$project",
				Value: bson.D{
					{
						Key:   "_id",
						Value: 1,
					},
				},
			},
		},
	}, nil
}

With the working version, everything works as expected:

  • full import is performed initially
  • additional updates are propagated to Elasticsearch

With the failing version, the initial sync is not performed and monstache reports no errors:

INFO 2024/11/20 15:16:50 Started monstache version 6.7.17
INFO 2024/11/20 15:16:50 Go version go1.23.3
INFO 2024/11/20 15:16:50 MongoDB go driver v1.13.1
INFO 2024/11/20 15:16:50 Elasticsearch go driver 7.0.31
INFO 2024/11/20 15:16:50 Successfully connected to MongoDB version 5.0.30
INFO 2024/11/20 15:16:50 Successfully connected to Elasticsearch version 8.16.0
INFO 2024/11/20 15:16:50 Listening for events
INFO 2024/11/20 15:16:50 Watching changes on collection my-database.users
Changestream:  true
ns:  my-database.users

I expected at least the Map function to be called, but nothing is printed in the terminal.
Any advice on what I'm doing wrong, or have I found an actual issue?

@rwynn
Owner

rwynn commented Nov 20, 2024

Hi, $project should work in the pipeline for documents read as a result of direct-read-namespaces, but would not work for documents coming from change streams in monstache.

The reason is that monstache uses change event fields like operationType, ns, etc. to determine how to handle the event. Additionally, in the case of a delete event the fullDocument field is omitted by MongoDB. Monstache uses the operationType to determine whether or not to even look at the fullDocument field.

Monstache uses a library, gtm, to read the change stream, and you can see below why projecting only the fullDocument field would leave gtm very confused.

https://github.com/rwynn/gtm/blob/master/gtm.go#L1342
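To make the failure mode more concrete, here is a deliberately simplified Go model of how a change-stream consumer might dispatch events. It is a hypothetical sketch, not gtm's actual code (that is at the link above): the function `handleEvent` and its `"index"`/`"delete"` results are invented for illustration, and events are modeled as plain `map[string]interface{}` values so that only the standard library is needed.

```go
package main

import (
	"errors"
	"fmt"
)

// handleEvent is a hypothetical, simplified dispatcher (not gtm's code).
// Like the behavior described above, it decides what to do from
// operationType before it ever looks at fullDocument.
func handleEvent(ev map[string]interface{}) (string, error) {
	op, ok := ev["operationType"].(string)
	if !ok {
		// A {$project: {fullDocument: 1}} stage drops operationType,
		// so the consumer cannot tell an insert from a delete.
		return "", errors.New("event has no operationType")
	}
	switch op {
	case "insert", "replace", "update":
		// In this simplified model these events must carry fullDocument.
		if _, ok := ev["fullDocument"]; !ok {
			return "", fmt.Errorf("%s event has no fullDocument", op)
		}
		return "index", nil
	case "delete":
		// MongoDB omits fullDocument on delete events; documentKey identifies the document.
		if _, ok := ev["documentKey"]; !ok {
			return "", errors.New("delete event has no documentKey")
		}
		return "delete", nil
	default:
		return "ignore", nil
	}
}

func main() {
	full := map[string]interface{}{
		"operationType": "insert",
		"ns":            map[string]interface{}{"db": "my-database", "coll": "users"},
		"documentKey":   map[string]interface{}{"_id": 1},
		"fullDocument":  map[string]interface{}{"_id": 1, "name": "a"},
	}
	// Roughly what is left after {$project: {fullDocument: 1}}: the _id
	// (resume token) is kept by default, everything else except fullDocument is gone.
	projected := map[string]interface{}{
		"_id":          "resume-token",
		"fullDocument": map[string]interface{}{"_id": 1, "name": "a"},
	}
	for _, ev := range []map[string]interface{}{full, projected} {
		action, err := handleEvent(ev)
		fmt.Println(action, err)
	}
}
```

Run as-is, the insert event resolves to `index`, while the projected event is rejected because `operationType` is gone, so the consumer has no safe way to choose between indexing and deleting.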

@dhyeymistri
Author

OK, my requirement is to take only the changed fields from my MongoDB collection and update them in Elasticsearch. Is there a way to do this with monstache?

@rwynn
Owner

rwynn commented Dec 7, 2024

I think what you would need to do for monstache is implement the Process function in your plugin and then patch Elasticsearch based on the UpdateDescription field.

UpdateDescription map[string]interface{} // map describing changes to the document

https://www.mongodb.com/docs/manual/reference/change-events/update/
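As a rough sketch of the patching step, assume the `UpdateDescription` map has the shape described in the MongoDB page above: `updatedFields` maps (possibly dotted) field paths to their new values, and `removedFields` lists deleted paths. The helper `partialUpdateBody` below is hypothetical. It turns `updatedFields` into the `{"doc": {...}}` body accepted by Elasticsearch's `_update` API, expanding dotted paths such as `address.city` into nested objects so the merged `_source` keeps a nested shape. It uses reflection because the decoded values may arrive as named map types (for example `bson.M`) rather than plain `map[string]interface{}`; that typing is an assumption, not something verified against monstache. Array-index paths like `tags.0` are not handled, and `removedFields` is ignored here; removals would need separate handling, for example a scripted update that calls `ctx._source.remove(...)`.

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
	"strings"
)

// toStringMap accepts any map with string keys (a plain map[string]interface{}
// or a named type such as bson.M) and copies it into a map[string]interface{}.
func toStringMap(v interface{}) (map[string]interface{}, bool) {
	rv := reflect.ValueOf(v)
	if !rv.IsValid() || rv.Kind() != reflect.Map || rv.Type().Key().Kind() != reflect.String {
		return nil, false
	}
	out := make(map[string]interface{}, rv.Len())
	iter := rv.MapRange()
	for iter.Next() {
		out[iter.Key().String()] = iter.Value().Interface()
	}
	return out, true
}

// setPath stores val under a dotted path, creating nested maps as needed.
func setPath(dst map[string]interface{}, path string, val interface{}) {
	parts := strings.Split(path, ".")
	cur := dst
	for _, p := range parts[:len(parts)-1] {
		next, ok := cur[p].(map[string]interface{})
		if !ok {
			next = map[string]interface{}{}
			cur[p] = next
		}
		cur = next
	}
	cur[parts[len(parts)-1]] = val
}

// partialUpdateBody (hypothetical) builds an Elasticsearch _update body from
// the updatedFields of an update event's updateDescription.
func partialUpdateBody(updateDescription map[string]interface{}) ([]byte, error) {
	fields, ok := toStringMap(updateDescription["updatedFields"])
	if !ok || len(fields) == 0 {
		return nil, fmt.Errorf("no updatedFields to apply")
	}
	doc := map[string]interface{}{}
	for path, val := range fields {
		setPath(doc, path, val)
	}
	return json.Marshal(map[string]interface{}{"doc": doc})
}

func main() {
	desc := map[string]interface{}{
		"updatedFields": map[string]interface{}{"email": "new@example.com", "address.city": "Berlin"},
		"removedFields": []interface{}{"nickname"}, // ignored by this sketch
	}
	body, err := partialUpdateBody(desc)
	if err != nil {
		panic(err)
	}
	// prints {"doc":{"address":{"city":"Berlin"},"email":"new@example.com"}}
	fmt.Println(string(body))
}
```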

For a newly created document in MongoDB, the changed fields would probably be empty, so you would just use the full document to update Elasticsearch. The changed fields only apply to update operations.
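The per-operation choice described above can be sketched as a hypothetical helper that renders one change event as Elasticsearch `_bulk` NDJSON. The name `bulkLines`, its parameters, and the use of the raw `_bulk` format are assumptions for illustration; inside a real Process plugin you would presumably send equivalent requests through whatever Elasticsearch client the plugin has access to. Insert events carry no `updateDescription`, so inserts (and replaces) index the full document; updates send only `updatedFields` as a partial `doc`; deletes need only the `_id`. Dotted paths in `updatedFields` are passed through unchanged in this sketch.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// bulkLines (hypothetical) renders one MongoDB change event as Elasticsearch
// _bulk NDJSON: full document for insert/replace, partial doc for update,
// action line only for delete.
func bulkLines(index, id, op string, fullDoc, updatedFields map[string]interface{}) (string, error) {
	meta := map[string]interface{}{"_index": index, "_id": id}
	var lines []interface{}
	switch op {
	case "insert", "replace":
		lines = []interface{}{map[string]interface{}{"index": meta}, fullDoc}
	case "update":
		lines = []interface{}{map[string]interface{}{"update": meta}, map[string]interface{}{"doc": updatedFields}}
	case "delete":
		lines = []interface{}{map[string]interface{}{"delete": meta}}
	default:
		return "", fmt.Errorf("unsupported operationType %q", op)
	}
	var sb strings.Builder
	for _, l := range lines {
		b, err := json.Marshal(l)
		if err != nil {
			return "", err
		}
		sb.Write(b)
		sb.WriteByte('\n') // every NDJSON line, including the last, ends with a newline
	}
	return sb.String(), nil
}

func main() {
	out, err := bulkLines("test-users", "42", "update", nil, map[string]interface{}{"email": "new@example.com"})
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}
```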
