diff --git a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/synchronization/ChangeEvent.kt b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/synchronization/ChangeEvent.kt index 3dab2ae..f07b300 100644 --- a/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/synchronization/ChangeEvent.kt +++ b/mongo-migration-stream-core/src/main/kotlin/pl/allegro/tech/mongomigrationstream/core/synchronization/ChangeEvent.kt @@ -2,7 +2,6 @@ package pl.allegro.tech.mongomigrationstream.core.synchronization import com.mongodb.DBRefCodecProvider import com.mongodb.client.model.DeleteOneModel -import com.mongodb.client.model.Filters import com.mongodb.client.model.ReplaceOneModel import com.mongodb.client.model.ReplaceOptions import com.mongodb.client.model.UpdateOneModel @@ -42,7 +41,6 @@ internal sealed class ChangeEvent( null } - protected fun idFilter() = Filters.eq("_id", documentKey.getValue("_id")) protected abstract fun toWriteModelImpl(): WriteModel companion object { @@ -89,7 +87,7 @@ internal data class InsertReplaceChangeEvent( } override fun toWriteModelImpl(): WriteModel = ReplaceOneModel( - idFilter(), + documentKey, document!!, ReplaceOptions().upsert(true) ) @@ -107,7 +105,7 @@ internal data class DeleteChangeEvent( ) } - override fun toWriteModelImpl(): WriteModel = DeleteOneModel(idFilter()) + override fun toWriteModelImpl(): WriteModel = DeleteOneModel(documentKey) } internal data class UpdateChangeEvent( @@ -132,7 +130,7 @@ internal data class UpdateChangeEvent( } override fun toWriteModelImpl(): WriteModel = UpdateOneModel( - idFilter(), + documentKey, Updates.combine( *updatedFields.entries.map { Updates.set(it.key, it.value) }.toTypedArray(), *removedFields.map { Updates.unset(it) }.toTypedArray()