From 54777687602faa8991364c42466c6c0b0e0c3084 Mon Sep 17 00:00:00 2001 From: Mike Woofter Date: Mon, 9 Oct 2023 13:56:07 -0500 Subject: [PATCH 01/65] first test --- source/batch-mode.txt | 12 ++++++++++++ .../read.txt => batch-mode/batch-read-config.txt} | 2 -- .../batch-read.txt} | 5 ++++- .../write.txt => batch-mode/batch-write-config.txt} | 0 source/batch-mode/batch-write.txt | 0 5 files changed, 16 insertions(+), 3 deletions(-) create mode 100644 source/batch-mode.txt rename source/{configuration/read.txt => batch-mode/batch-read-config.txt} (99%) rename source/{read-from-mongodb.txt => batch-mode/batch-read.txt} (94%) rename source/{configuration/write.txt => batch-mode/batch-write-config.txt} (100%) create mode 100644 source/batch-mode/batch-write.txt diff --git a/source/batch-mode.txt b/source/batch-mode.txt new file mode 100644 index 00000000..83443fbd --- /dev/null +++ b/source/batch-mode.txt @@ -0,0 +1,12 @@ +========== +Batch Mode +========== + +.. meta:: + :description: Use the {+connector-long+} to read and write batches of data. + +.. toctree:: + :caption: Batch Mode + + /batch-mode/batch-read + /batch-mode/batch-write diff --git a/source/configuration/read.txt b/source/batch-mode/batch-read-config.txt similarity index 99% rename from source/configuration/read.txt rename to source/batch-mode/batch-read-config.txt index 649be7d0..b35ad9f7 100644 --- a/source/configuration/read.txt +++ b/source/batch-mode/batch-read-config.txt @@ -4,8 +4,6 @@ Read Configuration Options ========================== -.. default-domain:: mongodb - .. contents:: On this page :local: :backlinks: none diff --git a/source/read-from-mongodb.txt b/source/batch-mode/batch-read.txt similarity index 94% rename from source/read-from-mongodb.txt rename to source/batch-mode/batch-read.txt index 4fdca2db..4539834d 100644 --- a/source/read-from-mongodb.txt +++ b/source/batch-mode/batch-read.txt @@ -7,7 +7,10 @@ Read from MongoDB ================= -.. default-domain:: mongodb +.. toctree:: + :caption: Read Configuration Options + + /./batch-read-config.txt .. contents:: On this page :local: diff --git a/source/configuration/write.txt b/source/batch-mode/batch-write-config.txt similarity index 100% rename from source/configuration/write.txt rename to source/batch-mode/batch-write-config.txt diff --git a/source/batch-mode/batch-write.txt b/source/batch-mode/batch-write.txt new file mode 100644 index 00000000..e69de29b From 83ebbd1ab951a876d135628977d75ae3e83e6357 Mon Sep 17 00:00:00 2001 From: Mike Woofter Date: Mon, 9 Oct 2023 14:38:40 -0500 Subject: [PATCH 02/65] fix --- source/index.txt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/index.txt b/source/index.txt index 35c24014..5ef12bee 100644 --- a/source/index.txt +++ b/source/index.txt @@ -47,10 +47,8 @@ versions of Apache Spark and MongoDB: .. toctree:: :titlesonly: - configuration getting-started - write-to-mongodb - read-from-mongodb + batch-mode structured-streaming faq release-notes From 79cd1580743d613aa90929278221d62d796b43df Mon Sep 17 00:00:00 2001 From: Mike Woofter Date: Mon, 9 Oct 2023 14:44:44 -0500 Subject: [PATCH 03/65] fix --- source/configuration.txt | 6 ------ source/index.txt | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/source/configuration.txt b/source/configuration.txt index efe84e90..b4486ae4 100644 --- a/source/configuration.txt +++ b/source/configuration.txt @@ -156,9 +156,3 @@ properly: - :ref:`SparkConf ` - :ref:`Options maps ` - -.. 
toctree:: - :titlesonly: - - configuration/write - configuration/read diff --git a/source/index.txt b/source/index.txt index 5ef12bee..b70db4e2 100644 --- a/source/index.txt +++ b/source/index.txt @@ -48,7 +48,7 @@ versions of Apache Spark and MongoDB: :titlesonly: getting-started - batch-mode + /batch-mode structured-streaming faq release-notes From 7e1df0e181ac1ebab1457365ac5866c368f675b4 Mon Sep 17 00:00:00 2001 From: Mike Woofter Date: Mon, 9 Oct 2023 14:54:38 -0500 Subject: [PATCH 04/65] test --- source/batch-mode/batch-read.txt | 2 +- source/batch-mode/batch-write.txt | 49 +++++++++++++++++++++++++++++++ source/write-to-mongodb.txt | 49 ------------------------------- 3 files changed, 50 insertions(+), 50 deletions(-) delete mode 100644 source/write-to-mongodb.txt diff --git a/source/batch-mode/batch-read.txt b/source/batch-mode/batch-read.txt index 4539834d..e08918cb 100644 --- a/source/batch-mode/batch-read.txt +++ b/source/batch-mode/batch-read.txt @@ -10,7 +10,7 @@ Read from MongoDB .. toctree:: :caption: Read Configuration Options - /./batch-read-config.txt + /batch-mode/batch-read-config .. contents:: On this page :local: diff --git a/source/batch-mode/batch-write.txt b/source/batch-mode/batch-write.txt index e69de29b..31f44197 100644 --- a/source/batch-mode/batch-write.txt +++ b/source/batch-mode/batch-write.txt @@ -0,0 +1,49 @@ +.. _write-to-mongodb: +.. _scala-write: +.. _java-write: + +================ +Write to MongoDB +================ + +.. default-domain:: mongodb + +.. tabs-selector:: drivers + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/write-to-mongodb.txt + + - id: python + content: | + + .. include:: /python/write-to-mongodb.txt + + - id: scala + content: | + + .. include:: /scala/write-to-mongodb.txt + +.. warning:: Setting the Write Mode to ``overwrite`` + + If you specify the ``overwrite`` write mode, the connector drops the target + collection and creates a new collection that uses the + default collection options. + This behavior can affect collections that don't use the default options, + such as the following collection types: + + - Sharded collections + - Collections with non-default collations + - Time-series collections + +.. important:: + + If your write operation includes a field with a ``null`` value, + the connector writes the field name and ``null`` value to MongoDB. You can + change this behavior by setting the write configuration property + ``ignoreNullValues``. For more information about setting the connector's + write behavior, see :ref:`Write Configuration Options `. diff --git a/source/write-to-mongodb.txt b/source/write-to-mongodb.txt deleted file mode 100644 index 31f44197..00000000 --- a/source/write-to-mongodb.txt +++ /dev/null @@ -1,49 +0,0 @@ -.. _write-to-mongodb: -.. _scala-write: -.. _java-write: - -================ -Write to MongoDB -================ - -.. default-domain:: mongodb - -.. tabs-selector:: drivers - -.. tabs-drivers:: - - tabs: - - id: java-sync - content: | - - .. include:: /java/write-to-mongodb.txt - - - id: python - content: | - - .. include:: /python/write-to-mongodb.txt - - - id: scala - content: | - - .. include:: /scala/write-to-mongodb.txt - -.. warning:: Setting the Write Mode to ``overwrite`` - - If you specify the ``overwrite`` write mode, the connector drops the target - collection and creates a new collection that uses the - default collection options. 
- This behavior can affect collections that don't use the default options, - such as the following collection types: - - - Sharded collections - - Collections with non-default collations - - Time-series collections - -.. important:: - - If your write operation includes a field with a ``null`` value, - the connector writes the field name and ``null`` value to MongoDB. You can - change this behavior by setting the write configuration property - ``ignoreNullValues``. For more information about setting the connector's - write behavior, see :ref:`Write Configuration Options `. From b20a31abe209760b102f1c10214f44077e6d0e95 Mon Sep 17 00:00:00 2001 From: Mike Woofter Date: Mon, 9 Oct 2023 15:04:45 -0500 Subject: [PATCH 05/65] add streaming --- source/batch-mode/batch-write.txt | 5 +- source/streaming-mode.txt | 12 + .../streaming-mode/streaming-read-config.txt | 421 ++++++++++++++++++ source/streaming-mode/streaming-read.txt | 79 ++++ .../streaming-mode/streaming-write-config.txt | 203 +++++++++ source/streaming-mode/streaming-write.txt | 52 +++ 6 files changed, 771 insertions(+), 1 deletion(-) create mode 100644 source/streaming-mode.txt create mode 100644 source/streaming-mode/streaming-read-config.txt create mode 100644 source/streaming-mode/streaming-read.txt create mode 100644 source/streaming-mode/streaming-write-config.txt create mode 100644 source/streaming-mode/streaming-write.txt diff --git a/source/batch-mode/batch-write.txt b/source/batch-mode/batch-write.txt index 31f44197..5b074001 100644 --- a/source/batch-mode/batch-write.txt +++ b/source/batch-mode/batch-write.txt @@ -6,7 +6,10 @@ Write to MongoDB ================ -.. default-domain:: mongodb +.. toctree:: + :caption: Read Configuration Options + + /batch-mode/batch-write-config .. tabs-selector:: drivers diff --git a/source/streaming-mode.txt b/source/streaming-mode.txt new file mode 100644 index 00000000..931cc197 --- /dev/null +++ b/source/streaming-mode.txt @@ -0,0 +1,12 @@ +============== +Streaming Mode +============== + +.. meta:: + :description: Use the {+connector-long+} to stream data. + +.. toctree:: + :caption: Streaming Mode + + /streaming-mode/streaming-read + /streaming-mode/streaming-write diff --git a/source/streaming-mode/streaming-read-config.txt b/source/streaming-mode/streaming-read-config.txt new file mode 100644 index 00000000..cf99d709 --- /dev/null +++ b/source/streaming-mode/streaming-read-config.txt @@ -0,0 +1,421 @@ +.. _spark-streaming-read-conf: + +========================== +Read Configuration Options +========================== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +.. _spark-streaming-input-conf: + +Read Configuration +------------------ + +You can configure the following properties to read from MongoDB: + +.. note:: + + If you use ``SparkConf`` to set the connector's read configurations, + prefix ``spark.mongodb.read.`` to each property. + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``connection.uri`` + - | **Required.** + | The connection string configuration key. + | + | **Default:** ``mongodb://localhost:27017/`` + + * - ``database`` + - | **Required.** + | The database name configuration. + + * - ``collection`` + - | **Required.** + | The collection name configuration. + + * - ``comment`` + - | The comment to append to the read operation. Comments appear in the + :manual:`output of the Database Profiler. 
` + | + | **Default:** None + + * - ``mongoClientFactory`` + - | MongoClientFactory configuration key. + | You can specify a custom implementation which must implement the + ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` + interface. + | + | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` + + * - ``partitioner`` + - | The partitioner full class name. + + | You can specify a custom implementation which must implement the + ``com.mongodb.spark.sql.connector.read.partitioner.Partitioner`` + interface. + | See the + :ref:`Partitioner Configuration ` section for more + information on partitioners. + | + | **Default:** ``com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner`` + + * - ``partitioner.options.`` + - | Partitioner configuration prefix. + | See the + :ref:`Partitioner Configuration ` section for more + information on partitioners. + + * - ``sampleSize`` + - | The number of documents to sample from the collection when inferring + | the schema. + | + | **Default:** ``1000`` + + * - ``sql.inferSchema.mapTypes.enabled`` + - | Whether to enable Map types when inferring the schema. + | When enabled, large compatible struct types are inferred to a + ``MapType`` instead. + | + | **Default:** ``true`` + + * - ``sql.inferSchema.mapTypes.minimum.key.size`` + - | Minimum size of a ``StructType`` before inferring as a ``MapType``. + | + | **Default:** ``250`` + + * - ``aggregation.pipeline`` + - | Specifies a custom aggregation pipeline to apply to the collection + before sending data to Spark. + | The value must be either an extended JSON single document or list + of documents. + | A single document should resemble the following: + + .. code-block:: json + + {"$match": {"closed": false}} + + | A list of documents should resemble the following: + + .. code-block:: json + + [{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}] + + .. important:: + + Custom aggregation pipelines must be compatible with the + partitioner strategy. For example, aggregation stages such as + ``$group`` do not work with any partitioner that creates more than + one partition. + + * - ``aggregation.allowDiskUse`` + - | Specifies whether to allow storage to disk when running the + aggregation. + | + | **Default:** ``true`` + + * - ``outputExtendedJson`` + - | When ``true``, the connector converts BSON types not supported by Spark into + extended JSON strings. + When ``false``, the connector uses the original relaxed JSON format for + unsupported types. + | + | **Default:** ``false`` + +.. _partitioner-conf: + +Partitioner Configurations +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Partitioners change the read behavior for batch reads with the {+connector-short+}. +They do not affect Structured Streaming because the data stream processing +engine produces a single stream with Structured Streaming. + +This section contains configuration information for the following +partitioners: + +- :ref:`SamplePartitioner ` +- :ref:`ShardedPartitioner ` +- :ref:`PaginateBySizePartitioner ` +- :ref:`PaginateIntoPartitionsPartitioner ` +- :ref:`SinglePartitionPartitioner ` + +.. _conf-mongosamplepartitioner: +.. _conf-samplepartitioner: + +``SamplePartitioner`` Configuration +``````````````````````````````````` + +.. include:: /includes/sparkconf-partitioner-options-note.rst + +You must specify this partitioner using the full classname: +``com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner``. + + +.. 
list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``partitioner.options.partition.field`` + - The field to use for partitioning, which must be a unique field. + + **Default:** ``_id`` + + * - ``partitioner.options.partition.size`` + - The size (in MB) for each partition. Smaller partition sizes + create more partitions containing fewer documents. + + **Default:** ``64`` + + + * - ``partitioner.options.samples.per.partition`` + - The number of samples to take per partition. The total number of + samples taken is: + + .. code-block:: none + + samples per partition * ( count / number of documents per partition) + + **Default:** ``10`` + +.. example:: + + For a collection with 640 documents with an average document + size of 0.5 MB, the default SamplePartitioner configuration values creates + 5 partitions with 128 documents per partition. + + The MongoDB Spark Connector samples 50 documents (the default 10 + per intended partition) and defines 5 partitions by selecting + partition field ranges from the sampled documents. + +.. _conf-mongoshardedpartitioner: +.. _conf-shardedpartitioner: + +``ShardedPartitioner`` Configuration +````````````````````````````````````` + +The ``ShardedPartitioner`` automatically determines the partitions to use +based on your shard configuration. + +You must specify this partitioner using the full classname: +``com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner``. + +.. warning:: + + This partitioner is not compatible with hashed shard keys. + + +.. _conf-mongopaginatebysizepartitioner: +.. _conf-paginatebysizepartitioner: + +``PaginateBySizePartitioner`` Configuration +``````````````````````````````````````````` + +.. include:: /includes/sparkconf-partitioner-options-note.rst + +You must specify this partitioner using the full classname: +``com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner``. + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``partitioner.options.partition.field`` + - The field to use for partitioning, which must be a unique field. + + **Default:** ``_id`` + + * - ``partitioner.options.partition.size`` + - The size (in MB) for each partition. Smaller partition sizes + + create more partitions containing fewer documents. + + **Default:** ``64`` + +.. _conf-paginateintopartitionspartitioner: + +``PaginateIntoPartitionsPartitioner`` Configuration +``````````````````````````````````````````````````` + +.. include:: /includes/sparkconf-partitioner-options-note.rst + +You must specify this partitioner using the full classname: +``com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner``. + + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``partitioner.options.partition.field`` + - The field to use for partitioning, which must be a unique field. + + **Default:** ``_id`` + + * - ``partitioner.options.maxNumberOfPartitions`` + - The number of partitions to create. + + **Default:** ``64`` + +.. _conf-singlepartitionpartitioner: + +``SinglePartitionPartitioner`` Configuration +```````````````````````````````````````````` + +.. include:: /includes/sparkconf-partitioner-options-note.rst + +You must specify this partitioner using the full classname: +``com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner``. + +This partitioner creates a single partition. + +.. _spark-change-stream-conf: + +Change Streams +-------------- + +.. 
note:: + + If you use ``SparkConf`` to set the connector's change stream + configurations, prefix ``spark.mongodb.`` to each property. + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``change.stream.lookup.full.document`` + + - Determines what values your change stream returns on update + operations. + + The default setting returns the differences between the original + document and the updated document. + + The ``updateLookup`` setting returns the differences between the + original document and updated document as well as a copy of the + entire updated document. + + **Default:** "default" + + .. tip:: + + For more information on how this change stream option works, + see the MongoDB server manual guide + :manual:`Lookup Full Document for Update Operation `. + + * - ``change.stream.micro.batch.max.partition.count`` + - | The maximum number of partitions the {+connector-short+} divides each + micro-batch into. Spark workers can process these partitions in parallel. + | + | This setting applies only when using micro-batch streams. + | + | **Default**: ``1`` + + .. warning:: Event Order + + Specifying a value larger than ``1`` can alter the order in which + the {+connector-short+} processes change events. Avoid this setting + if out-of-order processing could create data inconsistencies downstream. + + * - ``change.stream.publish.full.document.only`` + - | Specifies whether to publish the changed document or the full + change stream document. + | + | When this setting is ``true``, the connector exhibits the following behavior: + + - The connector filters out messages that + omit the ``fullDocument`` field and only publishes the value of the + field. + - If you don't specify a schema, the connector infers the schema + from the change stream document rather than from the underlying collection. + + **Default**: ``false`` + + .. note:: + + This setting overrides the ``change.stream.lookup.full.document`` + setting. + + * - ``change.stream.startup.mode`` + - | Specifies how the connector starts up when no offset is available. + + | This setting accepts the following values: + + - ``latest``: The connector begins processing + change events starting with the most recent event. + It will not process any earlier unprocessed events. + - ``timestamp``: The connector begins processing change events at a specified time. + + To use the ``timestamp`` option, you must specify a time by using the + ``change.stream.startup.mode.timestamp.start.at.operation.time`` setting. + This setting accepts timestamps in the following formats: + + - An integer representing the number of seconds since the + :wikipedia:`Unix epoch ` + - A date and time in + `ISO-8601 `__ + format with one-second precision + - An extended JSON ``BsonTimestamp`` + + **Default**: ``latest`` + +.. _configure-input-uri: + +``connection.uri`` Configuration Setting +---------------------------------------- + +You can set all :ref:`spark-input-conf` via the read ``connection.uri`` setting. + +For example, consider the following example which sets the read +``connection.uri`` setting: + +.. note:: + + If you use ``SparkConf`` to set the connector's read configurations, + prefix ``spark.mongodb.read.`` to the setting. + +.. code:: cfg + + spark.mongodb.read.connection.uri=mongodb://127.0.0.1/databaseName.collectionName?readPreference=primaryPreferred + + +The configuration corresponds to the following separate configuration +settings: + +.. 
code:: cfg + + spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ + spark.mongodb.read.database=databaseName + spark.mongodb.read.collection=collectionName + spark.mongodb.read.readPreference.name=primaryPreferred + +If you specify a setting both in the ``connection.uri`` and in a separate +configuration, the ``connection.uri`` setting overrides the separate +setting. For example, given the following configuration, the +database for the connection is ``foobar``: + +.. code:: cfg + + spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar + spark.mongodb.read.database=bar diff --git a/source/streaming-mode/streaming-read.txt b/source/streaming-mode/streaming-read.txt new file mode 100644 index 00000000..f5d2d992 --- /dev/null +++ b/source/streaming-mode/streaming-read.txt @@ -0,0 +1,79 @@ +.. _read-from-mongodb: +.. _scala-read: +.. _java-read: +.. _scala-dataset-filters: + +================= +Read from MongoDB +================= + +.. toctree:: + :caption: Read Configuration Options + + /streaming-mode/streaming-read-config + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +Overview +-------- + +.. tabs-selector:: drivers + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/read-from-mongodb.txt + + - id: python + content: | + + .. include:: /python/read-from-mongodb.txt + + .. include:: /python/filters.txt + + - id: scala + content: | + + .. include:: /scala/read-from-mongodb.txt + + .. include:: /scala/filters.txt + +.. important:: Inferring the Schema of a Change Stream + + When the {+connector-short+} infers the schema of a data frame + read from a change stream, by default, + it will use the schema of the underlying collection rather than that + of the change stream. If you set the ``change.stream.publish.full.document.only`` + option to ``true``, the connector uses the schema of the + change stream instead. + + For more information on configuring a read operation, see the + :ref:`spark-change-stream-conf` section of the Read Configuration Options guide. + +SQL Queries +----------- + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/sql.txt + + - id: python + content: | + + .. include:: /python/sql.txt + + - id: scala + content: | + + .. include:: /scala/sql.txt diff --git a/source/streaming-mode/streaming-write-config.txt b/source/streaming-mode/streaming-write-config.txt new file mode 100644 index 00000000..ce4404ce --- /dev/null +++ b/source/streaming-mode/streaming-write-config.txt @@ -0,0 +1,203 @@ +.. _spark-write-conf: + +=========================== +Write Configuration Options +=========================== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +.. _spark-output-conf: + +Write Configuration +------------------- + +The following options for writing to MongoDB are available: + +.. note:: + + If you use ``SparkConf`` to set the connector's write configurations, + prefix ``spark.mongodb.write.`` to each property. + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``connection.uri`` + - | **Required.** + | The connection string configuration key. + | + | **Default:** ``mongodb://localhost:27017/`` + + * - ``database`` + - | **Required.** + | The database name configuration. + + * - ``collection`` + - | **Required.** + | The collection name configuration. + + * - ``comment`` + - | The comment to append to the write operation. 
Comments appear in the + :manual:`output of the Database Profiler. ` + | + | **Default:** None + + * - ``convertJson`` + - | Specifies whether the connector parses the string and converts extended JSON + into BSON. + | + | This setting accepts the following values: + + - ``any``: The connector converts all JSON values to BSON. + + - ``"{a: 1}"`` becomes ``{a: 1}``. + - ``"[1, 2, 3]"`` becomes ``[1, 2, 3]``. + - ``"true"`` becomes ``true``. + - ``"01234"`` becomes ``1234``. + - ``"{a:b:c}"`` doesn't change. + + - ``objectOrArrayOnly``: The connector converts only JSON objects and arrays to + BSON. + + - ``"{a: 1}"`` becomes ``{a: 1}``. + - ``"[1, 2, 3]"`` becomes ``[1, 2, 3]``. + - ``"true"`` doesn't change. + - ``"01234"`` doesn't change. + - ``"{a:b:c}"`` doesn't change. + + - ``false``: The connector leaves all values as strings. + + | **Default:** ``false`` + + * - ``idFieldList`` + - | Field or list of fields by which to split the collection data. To + specify more than one field, separate them using a comma as shown + in the following example: + + .. code-block:: none + :copyable: false + + "fieldName1,fieldName2" + + | **Default:** ``_id`` + + * - ``ignoreNullValues`` + - | When ``true``, the connector ignores any ``null`` values when writing, + including ``null`` values in arrays and nested documents. + | + | **Default:** ``false`` + + * - ``maxBatchSize`` + - | Specifies the maximum number of operations to batch in bulk + operations. + | + | **Default:** ``512`` + + * - ``mongoClientFactory`` + - | MongoClientFactory configuration key. + | You can specify a custom implementation which must implement the + ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` + interface. + | + | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` + + * - ``operationType`` + - | Specifies the type of write operation to perform. You can set + this to one of the following values: + + - ``insert``: Insert the data. + - ``replace``: Replace an existing document that matches the + ``idFieldList`` value with the new data. If no match exists, the + value of ``upsertDocument`` indicates whether the connector + inserts a new document. + - ``update``: Update an existing document that matches the + ``idFieldList`` value with the new data. If no match exists, the + value of ``upsertDocument`` indicates whether the connector + inserts a new document. + + | + | **Default:** ``replace`` + + * - ``ordered`` + - | Specifies whether to perform ordered bulk operations. + | + | **Default:** ``true`` + + * - ``upsertDocument`` + - | When ``true``, replace and update operations will insert the data + if no match exists. + | + | For time series collections, you must set ``upsertDocument`` to + ``false``. + | + | **Default:** ``true`` + + * - ``writeConcern.journal`` + - | Specifies ``j``, a write-concern option to enable request for + acknowledgment that the data is confirmed on on-disk journal for + the criteria specified in the ``w`` option. You can specify + either ``true`` or ``false``. + | + | For more information on ``j`` values, see the MongoDB server + guide on the + :manual:`WriteConcern j option `. + + * - ``writeConcern.w`` + - | Specifies ``w``, a write-concern option to request acknowledgment + that the write operation has propagated to a specified number of + MongoDB nodes. For a list + of allowed values for this option, see :manual:`WriteConcern + ` in the MongoDB manual. 
+ | + | **Default:** ``1`` + + * - ``writeConcern.wTimeoutMS`` + - | Specifies ``wTimeoutMS``, a write-concern option to return an error + when a write operation exceeds the number of milliseconds. If you + use this optional setting, you must specify a nonnegative integer. + | + | For more information on ``wTimeoutMS`` values, see the MongoDB server + guide on the + :manual:`WriteConcern wtimeout option `. + +.. _configure-output-uri: + +``connection.uri`` Configuration Setting +---------------------------------------- + +You can set all :ref:`spark-output-conf` via the write ``connection.uri``. + +.. note:: + + If you use ``SparkConf`` to set the connector's write configurations, + prefix ``spark.mongodb.write.`` to the setting. + +.. code:: cfg + + spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection + +The configuration corresponds to the following separate configuration +settings: + +.. code:: cfg + + spark.mongodb.write.connection.uri=mongodb://127.0.0.1/ + spark.mongodb.write.database=test + spark.mongodb.write.collection=myCollection + +If you specify a setting both in the ``connection.uri`` and in a separate +configuration, the ``connection.uri`` setting overrides the separate +setting. For example, in the following configuration, the +database for the connection is ``foobar``: + +.. code:: cfg + + spark.mongodb.write.connection.uri=mongodb://127.0.0.1/foobar + spark.mongodb.write.database=bar diff --git a/source/streaming-mode/streaming-write.txt b/source/streaming-mode/streaming-write.txt new file mode 100644 index 00000000..b44aee73 --- /dev/null +++ b/source/streaming-mode/streaming-write.txt @@ -0,0 +1,52 @@ +.. _write-to-mongodb: +.. _scala-write: +.. _java-write: + +================ +Write to MongoDB +================ + +.. toctree:: + :caption: Read Configuration Options + + /streaming-mode/streaming-write-config + +.. tabs-selector:: drivers + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/write-to-mongodb.txt + + - id: python + content: | + + .. include:: /python/write-to-mongodb.txt + + - id: scala + content: | + + .. include:: /scala/write-to-mongodb.txt + +.. warning:: Setting the Write Mode to ``overwrite`` + + If you specify the ``overwrite`` write mode, the connector drops the target + collection and creates a new collection that uses the + default collection options. + This behavior can affect collections that don't use the default options, + such as the following collection types: + + - Sharded collections + - Collections with non-default collations + - Time-series collections + +.. important:: + + If your write operation includes a field with a ``null`` value, + the connector writes the field name and ``null`` value to MongoDB. You can + change this behavior by setting the write configuration property + ``ignoreNullValues``. For more information about setting the connector's + write behavior, see :ref:`Write Configuration Options `. 
From d7292d7123d623152615e4bdd8081572aa4b0be9 Mon Sep 17 00:00:00 2001 From: Mike Woofter Date: Tue, 10 Oct 2023 16:53:44 -0500 Subject: [PATCH 06/65] cache config --- source/configuration.txt | 38 ++++++++------------------------------ source/faq.txt | 29 +++++++++++++++++++++++++---- source/index.txt | 1 + 3 files changed, 34 insertions(+), 34 deletions(-) diff --git a/source/configuration.txt b/source/configuration.txt index b4486ae4..9442acee 100644 --- a/source/configuration.txt +++ b/source/configuration.txt @@ -1,8 +1,8 @@ -===================== -Configuration Options -===================== +.. _configuration: -.. default-domain:: mongodb +================= +Configuring Spark +================= .. contents:: On this page :local: @@ -118,34 +118,12 @@ specifying an option key string. Using a System Property ~~~~~~~~~~~~~~~~~~~~~~~ -The connector provides a cache for ``MongoClients`` which can only be -configured with a System Property. See :ref:`cache-configuration`. +The {+connector-short+} reads some configuration settings before ``SparkConf`` is +available. You must specify these settings by using a JVM system property. -.. _cache-configuration: +.. tip:: System Properties -Cache Configuration -------------------- - -The MongoConnector includes a cache for MongoClients, so workers can -share the MongoClient across threads. - -.. important:: - - As the cache is setup before the Spark Configuration is available, - the cache can only be configured with a System Property. - -.. list-table:: - :header-rows: 1 - :widths: 35 65 - - * - System Property name - - Description - - * - ``mongodb.keep_alive_ms`` - - The length of time to keep a ``MongoClient`` available for - sharing. - - **Default:** ``5000`` + For more information on Java system properties, see the `Java documentation. `__ ``ConfigException``\s --------------------- diff --git a/source/faq.txt b/source/faq.txt index 6fefa7a4..d3d3c270 100644 --- a/source/faq.txt +++ b/source/faq.txt @@ -49,7 +49,28 @@ To use mTLS, include the following options when you run ``spark-submit``: --driver-java-options -Djavax.net.ssl.trustStorePassword= \ --driver-java-options -Djavax.net.ssl.keyStore= \ --driver-java-options -Djavax.net.ssl.keyStorePassword= \ - -conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore= \ - -conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStorePassword= \ - -conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStore= \ - -conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStorePassword= \ \ No newline at end of file + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore= \ + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStorePassword= \ + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStore= \ + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStorePassword= \ + +.. _cache-configuration: + +How can I share a MongoClient instance across threads? +------------------------------------------------------ + +The MongoConnector includes a cache that lets workers +share a single ``MongoClient`` across threads. To specify the length of time to keep a +``MongoClient`` available, include the ``mongodb.keep_alive_ms`` option when you run +``spark-submit``: + +.. code-block:: bash + + --driver-java-options -Dmongodb.keep_alive_ms= + +By default, this property has a value of ``5000``. + +.. note:: + + Because the cache is set up before the Spark Configuration is available, + you must use a system property to configure it. 
\ No newline at end of file diff --git a/source/index.txt b/source/index.txt index b70db4e2..1880e87a 100644 --- a/source/index.txt +++ b/source/index.txt @@ -48,6 +48,7 @@ versions of Apache Spark and MongoDB: :titlesonly: getting-started + configuration /batch-mode structured-streaming faq From 4a4f5813b1487845c779feabc7d95e66120f096f Mon Sep 17 00:00:00 2001 From: Mike Woofter Date: Tue, 10 Oct 2023 16:55:53 -0500 Subject: [PATCH 07/65] remove configexception --- source/configuration.txt | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/source/configuration.txt b/source/configuration.txt index 9442acee..8ecf00fb 100644 --- a/source/configuration.txt +++ b/source/configuration.txt @@ -123,14 +123,4 @@ available. You must specify these settings by using a JVM system property. .. tip:: System Properties - For more information on Java system properties, see the `Java documentation. `__ - -``ConfigException``\s ---------------------- - -A configuration error throws a ``ConfigException``. Confirm that any of -the following methods of configuration that you use are configured -properly: - -- :ref:`SparkConf ` -- :ref:`Options maps ` + For more information on Java system properties, see the `Java documentation. `__ \ No newline at end of file From 2c921baec74df6e5134e15a7d533464634dae0f1 Mon Sep 17 00:00:00 2001 From: Mike Woofter Date: Tue, 10 Oct 2023 16:59:04 -0500 Subject: [PATCH 08/65] add streaming to index --- source/index.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/source/index.txt b/source/index.txt index 1880e87a..f8e6c808 100644 --- a/source/index.txt +++ b/source/index.txt @@ -50,6 +50,7 @@ versions of Apache Spark and MongoDB: getting-started configuration /batch-mode + /streaming-mode structured-streaming faq release-notes From 7332e54f3a4f7b8bf68aa798b23f3c117419672a Mon Sep 17 00:00:00 2001 From: Mike Woofter Date: Tue, 10 Oct 2023 17:09:47 -0500 Subject: [PATCH 09/65] streaming mode --- source/streaming-mode.txt | 17 + source/streaming-mode/streaming-read.txt | 394 ++++++++++- source/streaming-mode/streaming-write.txt | 325 +++++++++- source/structured-streaming.txt | 755 ---------------------- 4 files changed, 694 insertions(+), 797 deletions(-) delete mode 100644 source/structured-streaming.txt diff --git a/source/streaming-mode.txt b/source/streaming-mode.txt index 931cc197..b0973255 100644 --- a/source/streaming-mode.txt +++ b/source/streaming-mode.txt @@ -2,6 +2,12 @@ Streaming Mode ============== +.. contents:: On this page + :local: + :backlinks: none + :depth: 2 + :class: singlecol + .. meta:: :description: Use the {+connector-long+} to stream data. @@ -10,3 +16,14 @@ Streaming Mode /streaming-mode/streaming-read /streaming-mode/streaming-write + +Spark Structured Streaming is a data stream processing engine you can +use through the Dataset or DataFrame API. The MongoDB Spark Connector +enables you to stream to and from MongoDB using Spark Structured +Streaming. + +.. include:: includes/streaming-distinction.rst + +To learn more about Structured Streaming, see the +`Spark Programming Guide +`__. diff --git a/source/streaming-mode/streaming-read.txt b/source/streaming-mode/streaming-read.txt index f5d2d992..e9f43358 100644 --- a/source/streaming-mode/streaming-read.txt +++ b/source/streaming-mode/streaming-read.txt @@ -2,6 +2,12 @@ .. _scala-read: .. _java-read: .. _scala-dataset-filters: +.. _streaming-read-from-mongodb: +.. _streaming-scala-read: +.. _streaming-java-read: +.. _streaming-scala-dataset-filters: +.. 
_read-structured-stream: +.. _continuous-processing: ================= Read from MongoDB @@ -21,59 +27,407 @@ Read from MongoDB Overview -------- -.. tabs-selector:: drivers +When reading a stream from a MongoDB database, the {+connector-long+} supports both +*micro-batch processing* and +*continuous processing*. Micro-batch processing is the default processing engine, while +continuous processing is an experimental feature introduced in +Spark version 2.3. To learn +more about continuous processing, see the `Spark documentation `__. + +.. include:: /includes/fact-read-from-change-stream + +.. note:: + + Since Structured Streaming produces a single partition, it ignores + :ref:`partitioner configurations `. Partitioner + configuration only apply when there are multiple partitions. This is true + for both micro-batch processing and continuous processing streams. .. tabs-drivers:: tabs: + - id: java-sync content: | - .. include:: /java/read-from-mongodb.txt + To read data from MongoDB, specify the following read-stream configuration settings on + `DataStreamReader `__: + + .. list-table:: + :header-rows: 1 + :stub-columns: 1 + :widths: 10 40 + + * - Setting + - Description + + * - ``readStream.format()`` + - The format to use for read stream data. Use ``mongodb``. + + * - ``writeStream.trigger()`` + - Specifies how often results should be + written to the streaming sink. + + To use continuous processing, pass ``Trigger.Continuous(