diff --git a/config/redirects b/config/redirects index 8c5cf780..0834af9f 100644 --- a/config/redirects +++ b/config/redirects @@ -61,4 +61,9 @@ raw: ${prefix}/sparkR -> ${base}/v3.0/r-api/ [*-v3.0]: ${prefix}/${version}/configuration/read -> ${base}/${version}/ [*-v3.0]: ${prefix}/${version}/write-to-mongodb -> ${base}/${version}/ [*-v3.0]: ${prefix}/${version}/read-from-mongodb -> ${base}/${version}/ -[*-v3.0]: ${prefix}/${version}/structured-streaming -> ${base}/${version}/ \ No newline at end of file +[*-v3.0]: ${prefix}/${version}/structured-streaming -> ${base}/${version}/ +[v10.0-*]: ${prefix}/${version}/configuration/write -> ${base}/${version}/batch-mode/batch-write-config/ +[v10.0-*]: ${prefix}/${version}/configuration/read -> ${base}/${version}/batch-mode/batch-read-config/ +[v10.0-*]: ${prefix}/${version}/write-to-mongodb -> ${base}/${version}/batch-mode/batch-write/ +[v10.0-*]: ${prefix}/${version}/read-from-mongodb -> ${base}/${version}/batch-mode/batch-read/ +[v10.0-*]: ${prefix}/${version}/structured-streaming -> ${base}/${version}/streaming-mode/ \ No newline at end of file diff --git a/snooty.toml b/snooty.toml index eb224baa..f24da682 100644 --- a/snooty.toml +++ b/snooty.toml @@ -3,7 +3,15 @@ title = "Spark Connector" intersphinx = ["https://www.mongodb.com/docs/manual/objects.inv"] -toc_landing_pages = ["configuration"] +toc_landing_pages = [ + "configuration", + "/batch-mode", + "/streaming-mode", + "/streaming-mode/streaming-read", + "/streaming-mode/streaming-write", + "/batch-mode/batch-write", + "/batch-mode/batch-read", +] [constants] connector-short = "Spark Connector" diff --git a/source/batch-mode.txt b/source/batch-mode.txt new file mode 100644 index 00000000..5f5119a2 --- /dev/null +++ b/source/batch-mode.txt @@ -0,0 +1,32 @@ +========== +Batch Mode +========== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +.. toctree:: + + /batch-mode/batch-read + /batch-mode/batch-write + +Overview +-------- + +In batch mode, you can use the Spark Dataset and DataFrame APIs to process data at +a specified time interval. + +The following sections show you how to use the {+connector-short+} to read data from +MongoDB and write data to MongoDB in batch mode: + +- :ref:`batch-read-from-mongodb` +- :ref:`batch-write-to-mongodb` + +.. tip:: Apache Spark Documentation + + To learn more about using Spark to process batches of data, see the + `Spark Programming Guide + `__. \ No newline at end of file diff --git a/source/configuration/read.txt b/source/batch-mode/batch-read-config.txt similarity index 50% rename from source/configuration/read.txt rename to source/batch-mode/batch-read-config.txt index 649be7d0..fbf2e593 100644 --- a/source/configuration/read.txt +++ b/source/batch-mode/batch-read-config.txt @@ -1,10 +1,8 @@ -.. _spark-read-conf: +.. _spark-batch-read-conf: -========================== -Read Configuration Options -========================== - -.. default-domain:: mongodb +================================ +Batch Read Configuration Options +================================ .. contents:: On this page :local: @@ -12,17 +10,14 @@ Read Configuration Options :depth: 1 :class: singlecol -.. _spark-input-conf: - -Read Configuration ------------------- +.. _spark-batch-input-conf: -You can configure the following properties to read from MongoDB: +Overview +-------- -.. note:: +You can configure the following properties when reading data from MongoDB in batch mode. 
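+
+For example, the following PySpark sketch shows one way to set several of the
+properties described on this page by using the ``option()`` method on a
+``DataFrameReader``. This sketch assumes an existing ``SparkSession`` named
+``spark``; the connection string, database, and collection values are placeholders:
+
+.. code-block:: python
+
+   # Placeholder values; replace the URI, database, and collection
+   # with values from your own deployment.
+   dataFrame = (
+       spark.read.format("mongodb")
+       .option("connection.uri", "mongodb://127.0.0.1/")
+       .option("database", "myDB")
+       .option("collection", "myCollection")
+       .load()
+   )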
- If you use ``SparkConf`` to set the connector's read configurations, - prefix ``spark.mongodb.read.`` to each property. +.. include:: /includes/conf-read-prefix.rst .. list-table:: :header-rows: 1 @@ -62,12 +57,12 @@ You can configure the following properties to read from MongoDB: * - ``partitioner`` - | The partitioner full class name. - | You can specify a custom implementation which must implement the + | You can specify a custom implementation that must implement the ``com.mongodb.spark.sql.connector.read.partitioner.Partitioner`` interface. | See the :ref:`Partitioner Configuration ` section for more - information on partitioners. + information about partitioners. | | **Default:** ``com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner`` @@ -75,7 +70,7 @@ You can configure the following properties to read from MongoDB: - | Partitioner configuration prefix. | See the :ref:`Partitioner Configuration ` section for more - information on partitioners. + information about partitioners. * - ``sampleSize`` - | The number of documents to sample from the collection when inferring @@ -100,13 +95,13 @@ You can configure the following properties to read from MongoDB: before sending data to Spark. | The value must be either an extended JSON single document or list of documents. - | A single document should resemble the following: + | A single document resembles the following: .. code-block:: json {"$match": {"closed": false}} - | A list of documents should resemble the following: + | A list of documents resembles the following: .. code-block:: json @@ -136,11 +131,10 @@ You can configure the following properties to read from MongoDB: .. _partitioner-conf: Partitioner Configurations -~~~~~~~~~~~~~~~~~~~~~~~~~~ +-------------------------- -Partitioners change the read behavior for batch reads with the {+connector-short+}. -They do not affect Structured Streaming because the data stream processing -engine produces a single stream with Structured Streaming. +Partitioners change the read behavior of batch reads that use the {+connector-short+}. By +dividing the data into partitions, you can run transformations in parallel. This section contains configuration information for the following partitioners: @@ -151,18 +145,23 @@ partitioners: - :ref:`PaginateIntoPartitionsPartitioner ` - :ref:`SinglePartitionPartitioner ` +.. note:: Batch Reads Only + + Because the data-stream-processing engine produces a single data stream, + partitioners do not affect streaming reads. + .. _conf-mongosamplepartitioner: .. _conf-samplepartitioner: ``SamplePartitioner`` Configuration -``````````````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. include:: /includes/sparkconf-partitioner-options-note.rst +``SamplePartitioner`` is the default partitioner configuration. This configuration +lets you specify a partition field, partition size, and number of samples per partition. -You must specify this partitioner using the full classname: +To use this configuration, set the ``partitioner`` configuration option to ``com.mongodb.spark.sql.connector.read.partitioner.SamplePartitioner``. - .. list-table:: :header-rows: 1 :widths: 35 65 @@ -181,7 +180,6 @@ You must specify this partitioner using the full classname: **Default:** ``64`` - * - ``partitioner.options.samples.per.partition`` - The number of samples to take per partition. The total number of samples taken is: @@ -195,10 +193,10 @@ You must specify this partitioner using the full classname: .. 
example:: For a collection with 640 documents with an average document - size of 0.5 MB, the default SamplePartitioner configuration values creates + size of 0.5 MB, the default ``SamplePartitioner`` configuration creates 5 partitions with 128 documents per partition. - The MongoDB Spark Connector samples 50 documents (the default 10 + The {+connector-short+} samples 50 documents (the default 10 per intended partition) and defines 5 partitions by selecting partition field ranges from the sampled documents. @@ -206,28 +204,28 @@ You must specify this partitioner using the full classname: .. _conf-shardedpartitioner: ``ShardedPartitioner`` Configuration -````````````````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The ``ShardedPartitioner`` automatically determines the partitions to use +The ``ShardedPartitioner`` configuration automatically partitions the data based on your shard configuration. -You must specify this partitioner using the full classname: +To use this configuration, set the ``partitioner`` configuration option to ``com.mongodb.spark.sql.connector.read.partitioner.ShardedPartitioner``. .. warning:: This partitioner is not compatible with hashed shard keys. - .. _conf-mongopaginatebysizepartitioner: .. _conf-paginatebysizepartitioner: ``PaginateBySizePartitioner`` Configuration -``````````````````````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. include:: /includes/sparkconf-partitioner-options-note.rst +The ``PaginateBySizePartitioner`` configuration paginates the data by using the +average document size to split the collection into average-sized chunks. -You must specify this partitioner using the full classname: +To use this configuration, set the ``partitioner`` configuration option to ``com.mongodb.spark.sql.connector.read.partitioner.PaginateBySizePartitioner``. .. list-table:: @@ -252,14 +250,14 @@ You must specify this partitioner using the full classname: .. _conf-paginateintopartitionspartitioner: ``PaginateIntoPartitionsPartitioner`` Configuration -``````````````````````````````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. include:: /includes/sparkconf-partitioner-options-note.rst +The ``PaginateIntoPartitionsPartitioner`` configuration paginates the data by dividing +the count of documents in the collection by the maximum number of allowable partitions. -You must specify this partitioner using the full classname: +To use this configuration, set the ``partitioner`` configuration option to ``com.mongodb.spark.sql.connector.read.partitioner.PaginateIntoPartitionsPartitioner``. - .. list-table:: :header-rows: 1 :widths: 35 65 @@ -280,144 +278,14 @@ You must specify this partitioner using the full classname: .. _conf-singlepartitionpartitioner: ``SinglePartitionPartitioner`` Configuration -```````````````````````````````````````````` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -.. include:: /includes/sparkconf-partitioner-options-note.rst +The ``SinglePartitionPartitioner`` configuration creates a single partition. -You must specify this partitioner using the full classname: +To use this configuration, set the ``partitioner`` configuration option to ``com.mongodb.spark.sql.connector.read.partitioner.SinglePartitionPartitioner``. -This partitioner creates a single partition. - -.. _spark-change-stream-conf: - -Change Streams --------------- - -.. note:: - - If you use ``SparkConf`` to set the connector's change stream - configurations, prefix ``spark.mongodb.`` to each property. - -.. 
list-table:: - :header-rows: 1 - :widths: 35 65 +Specifying Properties in ``connection.uri`` +------------------------------------------- - * - Property name - - Description - - * - ``change.stream.lookup.full.document`` - - - Determines what values your change stream returns on update - operations. - - The default setting returns the differences between the original - document and the updated document. - - The ``updateLookup`` setting returns the differences between the - original document and updated document as well as a copy of the - entire updated document. - - **Default:** "default" - - .. tip:: - - For more information on how this change stream option works, - see the MongoDB server manual guide - :manual:`Lookup Full Document for Update Operation `. - - * - ``change.stream.micro.batch.max.partition.count`` - - | The maximum number of partitions the {+connector-short+} divides each - micro-batch into. Spark workers can process these partitions in parallel. - | - | This setting applies only when using micro-batch streams. - | - | **Default**: ``1`` - - .. warning:: Event Order - - Specifying a value larger than ``1`` can alter the order in which - the {+connector-short+} processes change events. Avoid this setting - if out-of-order processing could create data inconsistencies downstream. - - * - ``change.stream.publish.full.document.only`` - - | Specifies whether to publish the changed document or the full - change stream document. - | - | When this setting is ``true``, the connector exhibits the following behavior: - - - The connector filters out messages that - omit the ``fullDocument`` field and only publishes the value of the - field. - - If you don't specify a schema, the connector infers the schema - from the change stream document rather than from the underlying collection. - - **Default**: ``false`` - - .. note:: - - This setting overrides the ``change.stream.lookup.full.document`` - setting. - - * - ``change.stream.startup.mode`` - - | Specifies how the connector starts up when no offset is available. - - | This setting accepts the following values: - - - ``latest``: The connector begins processing - change events starting with the most recent event. - It will not process any earlier unprocessed events. - - ``timestamp``: The connector begins processing change events at a specified time. - - To use the ``timestamp`` option, you must specify a time by using the - ``change.stream.startup.mode.timestamp.start.at.operation.time`` setting. - This setting accepts timestamps in the following formats: - - - An integer representing the number of seconds since the - :wikipedia:`Unix epoch ` - - A date and time in - `ISO-8601 `__ - format with one-second precision - - An extended JSON ``BsonTimestamp`` - - **Default**: ``latest`` - -.. _configure-input-uri: - -``connection.uri`` Configuration Setting ----------------------------------------- - -You can set all :ref:`spark-input-conf` via the read ``connection.uri`` setting. - -For example, consider the following example which sets the read -``connection.uri`` setting: - -.. note:: - - If you use ``SparkConf`` to set the connector's read configurations, - prefix ``spark.mongodb.read.`` to the setting. - -.. code:: cfg - - spark.mongodb.read.connection.uri=mongodb://127.0.0.1/databaseName.collectionName?readPreference=primaryPreferred - - -The configuration corresponds to the following separate configuration -settings: - -.. 
code:: cfg - - spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ - spark.mongodb.read.database=databaseName - spark.mongodb.read.collection=collectionName - spark.mongodb.read.readPreference.name=primaryPreferred - -If you specify a setting both in the ``connection.uri`` and in a separate -configuration, the ``connection.uri`` setting overrides the separate -setting. For example, given the following configuration, the -database for the connection is ``foobar``: - -.. code:: cfg - - spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar - spark.mongodb.read.database=bar +.. include:: /includes/connection-read-config.rst \ No newline at end of file diff --git a/source/batch-mode/batch-read.txt b/source/batch-mode/batch-read.txt new file mode 100644 index 00000000..1da915e4 --- /dev/null +++ b/source/batch-mode/batch-read.txt @@ -0,0 +1,123 @@ +.. _batch-read-from-mongodb: + +=============================== +Read from MongoDB in Batch Mode +=============================== + +.. toctree:: + :caption: Batch Read Configuration Options + + /batch-mode/batch-read-config + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +Overview +-------- + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/read-from-mongodb.txt + + - id: python + content: | + + .. include:: /python/read-from-mongodb.txt + + - id: scala + content: | + + .. include:: /scala/read-from-mongodb.txt + +Schema Inference +---------------- + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/schema-inference.rst + + - id: python + content: | + + .. include:: /python/schema-inference.rst + + - id: scala + content: | + + .. include:: /scala/schema-inference.rst + +Filters +------- + +.. tabs-drivers:: + + tabs: + + - id: python + content: | + + .. include:: /python/filters.txt + + - id: scala + content: | + + .. include:: /scala/filters.txt + +SQL Queries +----------- + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/sql.txt + + - id: python + content: | + + .. include:: /python/sql.txt + + - id: scala + content: | + + .. include:: /scala/sql.txt + +API Documentation +----------------- + +To learn more about the types used in these examples, see the following Apache Spark +API documentation: + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + - `Dataset `__ + - `DataFrameReader `__ + + - id: python + content: | + + - `DataFrame `__ + - `DataFrameReader `__ + + - id: scala + content: | + + - `Dataset[T] `__ + - `DataFrameReader `__ \ No newline at end of file diff --git a/source/configuration/write.txt b/source/batch-mode/batch-write-config.txt similarity index 77% rename from source/configuration/write.txt rename to source/batch-mode/batch-write-config.txt index 2efa0a62..4d8361d1 100644 --- a/source/configuration/write.txt +++ b/source/batch-mode/batch-write-config.txt @@ -1,10 +1,8 @@ -.. _spark-write-conf: +.. _spark-batch-write-conf: -=========================== -Write Configuration Options -=========================== - -.. default-domain:: mongodb +================================= +Batch Write Configuration Options +================================= .. contents:: On this page :local: @@ -12,17 +10,14 @@ Write Configuration Options :depth: 1 :class: singlecol -.. _spark-output-conf: - -Write Configuration -------------------- +.. _spark-batch-output-conf: -The following options for writing to MongoDB are available: +Overview +-------- -.. 
note:: +You can configure the following properties when writing data to MongoDB in batch mode. - If you use ``SparkConf`` to set the connector's write configurations, - prefix ``spark.mongodb.write.`` to each property. +.. include:: /includes/conf-write-prefix.rst .. list-table:: :header-rows: 1 @@ -51,6 +46,14 @@ The following options for writing to MongoDB are available: | | **Default:** None + * - ``mongoClientFactory`` + - | MongoClientFactory configuration key. + | You can specify a custom implementation that must implement the + ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` + interface. + | + | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` + * - ``convertJson`` - | Specifies whether the connector parses the string and converts extended JSON into BSON. @@ -102,14 +105,6 @@ The following options for writing to MongoDB are available: | | **Default:** ``512`` - * - ``mongoClientFactory`` - - | MongoClientFactory configuration key. - | You can specify a custom implementation which must implement the - ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` - interface. - | - | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` - * - ``operationType`` - | Specifies the type of write operation to perform. You can set this to one of the following values: @@ -169,37 +164,7 @@ The following options for writing to MongoDB are available: guide on the :manual:`WriteConcern wtimeout option `. -.. _configure-output-uri: - -``connection.uri`` Configuration Setting ----------------------------------------- - -You can set all :ref:`spark-output-conf` via the write ``connection.uri``. - -.. note:: - - If you use ``SparkConf`` to set the connector's write configurations, - prefix ``spark.mongodb.write.`` to the setting. - -.. code:: cfg - - spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection - -The configuration corresponds to the following separate configuration -settings: - -.. code:: cfg - - spark.mongodb.write.connection.uri=mongodb://127.0.0.1/ - spark.mongodb.write.database=test - spark.mongodb.write.collection=myCollection - -If you specify a setting both in the ``connection.uri`` and in a separate -configuration, the ``connection.uri`` setting overrides the separate -setting. For example, in the following configuration, the -database for the connection is ``foobar``: - -.. code:: cfg +Specifying Properties in ``connection.uri`` +------------------------------------------- - spark.mongodb.write.connection.uri=mongodb://127.0.0.1/foobar - spark.mongodb.write.database=bar +.. include:: /includes/connection-write-config.rst diff --git a/source/batch-mode/batch-write.txt b/source/batch-mode/batch-write.txt new file mode 100644 index 00000000..0730ed66 --- /dev/null +++ b/source/batch-mode/batch-write.txt @@ -0,0 +1,88 @@ +.. _batch-write-to-mongodb: + +============================== +Write to MongoDB in Batch Mode +============================== + +.. toctree:: + :caption: Batch Write Configuration Options + + /batch-mode/batch-write-config + +Overview +-------- + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + .. include:: /java/write-to-mongodb.txt + + - id: python + content: | + + .. include:: /python/write-to-mongodb.txt + + - id: scala + content: | + + .. include:: /scala/write-to-mongodb.txt + +.. 
warning:: Save Modes + + The {+connector-long+} supports the following save modes: + + - ``append`` + - ``overwrite`` + + If you specify the ``overwrite`` write mode, the connector drops the target + collection and creates a new collection that uses the + default collection options. + This behavior can affect collections that don't use the default options, + such as the following collection types: + + - Sharded collections + - Collections with nondefault collations + - Time-series collections + + To learn more about save modes, see the + `Spark SQL Guide `__. + +.. important:: + + If your write operation includes a field with a ``null`` value, + the connector writes the field name and ``null`` value to MongoDB. You can + change this behavior by setting the write configuration property + ``ignoreNullValues``. + + For more information about setting the connector's + write behavior, see :ref:`Write Configuration Options `. + +API Documentation +----------------- + +To learn more about the types used in these examples, see the following Apache Spark +API documentation: + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + - `Dataset `__ + - `DataFrameWriter `__ + + - id: python + content: | + + - `DataFrame `__ + - `DataFrameReader `__ + + - id: scala + content: | + + - `Dataset[T] `__ + - `DataFrameReader `__ \ No newline at end of file diff --git a/source/configuration.txt b/source/configuration.txt index d4df8401..1b57b62e 100644 --- a/source/configuration.txt +++ b/source/configuration.txt @@ -1,8 +1,8 @@ -===================== -Configuration Options -===================== +.. _configuration: -.. default-domain:: mongodb +================= +Configuring Spark +================= .. contents:: On this page :local: @@ -10,9 +10,17 @@ Configuration Options :depth: 1 :class: singlecol -Various configuration options are available for the MongoDB Spark -Connector. To learn more about the options you can set, see -:ref:`spark-write-conf` and :ref:`spark-read-conf`. +Overview +-------- + +You can configure read and write operations in both batch and streaming mode. +To learn more about the available configuration options, see the following +pages: + +- :ref:`spark-batch-read-conf` +- :ref:`spark-batch-write-conf` +- :ref:`spark-streaming-read-conf` +- :ref:`spark-streaming-write-conf` Specify Configuration --------------------- @@ -25,8 +33,6 @@ Using ``SparkConf`` You can specify configuration options with ``SparkConf`` using any of the following approaches: -.. tabs-selector:: drivers - .. tabs-drivers:: tabs: @@ -54,51 +60,19 @@ the following approaches: The MongoDB Spark Connector will use the settings in ``SparkConf`` as defaults. -.. important:: - - When setting configurations with ``SparkConf``, you must prefix the - configuration options. Refer to :ref:`spark-write-conf` and - :ref:`spark-read-conf` for the specific prefixes. - .. _options-map: Using an Options Map ~~~~~~~~~~~~~~~~~~~~ -In the Spark API, the DataFrameReader and DataFrameWriter methods -accept options in the form of a ``Map[String, String]``. Options -specified this way override any corresponding settings in ``SparkConf``. +In the Spark API, the ``DataFrameReader``, ``DataFrameWriter``, ``DataStreamReader``, +and ``DataStreamWriter`` classes each contain an ``option()`` method. You can use +this method to specify options for the underlying read or write operation. -.. tabs-drivers:: - - tabs: - - id: java-sync - content: | +.. 
note:: + + Options specified in this way override any corresponding settings in ``SparkConf``. - To learn more about specifying options with - `DataFrameReader `__ and - `DataFrameWriter `__, - refer to the Java Spark documentation for the ``.option()`` - method. - - - id: python - content: | - - To learn more about specifying options with - `DataFrameReader `__ and - `DataFrameWriter `__, - refer to the Java Spark documentation for the ``.option()`` - method. - - - id: scala - content: | - - To learn more about specifying options with - `DataFrameReader `__ and - `DataFrameWriter `__, - refer to the Java Spark documentation for the ``.option()`` - method. - Short-Form Syntax ````````````````` @@ -115,50 +89,45 @@ specifying an option key string. - ``dfw.option("collection", "myCollection").save()`` -Using a System Property -~~~~~~~~~~~~~~~~~~~~~~~ - -The connector provides a cache for ``MongoClients`` which can only be -configured with a System Property. See :ref:`cache-configuration`. - -.. _cache-configuration: - -Cache Configuration -------------------- +To learn more about the ``option()`` method, see the following Spark +documentation pages: -The MongoConnector includes a cache for MongoClients, so workers can -share the MongoClient across threads. - -.. important:: +.. tabs-drivers:: - As the cache is set up before the Spark Configuration is available, - the cache can only be configured with a System Property. + tabs: + - id: java-sync + content: | -.. list-table:: - :header-rows: 1 - :widths: 35 65 + - `DataFrameReader `__ + - `DataFrameWriter `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ - * - System Property name - - Description + - id: python + content: | - * - ``mongodb.keep_alive_ms`` - - The length of time to keep a ``MongoClient`` available for - sharing. + - `DataFrameReader `__ + - `DataFrameWriter `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ - **Default:** ``5000`` + - id: scala + content: | -``ConfigException``\s ---------------------- + - `DataFrameReader `__ + - `DataFrameWriter `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ + +Using a System Property +~~~~~~~~~~~~~~~~~~~~~~~ -A configuration error throws a ``ConfigException``. Confirm that any of -the following methods of configuration that you use are configured -properly: +The {+connector-short+} reads some configuration settings before ``SparkConf`` is +available. You must specify these settings by using a JVM system property. -- :ref:`SparkConf ` -- :ref:`Options maps ` +For more information on Java system properties, see the `Java documentation. `__ -.. toctree:: - :titlesonly: +.. tip:: Configuration Exceptions - configuration/write - configuration/read + If the {+connector-short+} throws a ``ConfigException``, confirm that your ``SparkConf`` + or options map uses correct syntax and contains only valid configuration options. \ No newline at end of file diff --git a/source/faq.txt b/source/faq.txt index 6fefa7a4..b4791301 100644 --- a/source/faq.txt +++ b/source/faq.txt @@ -2,31 +2,29 @@ FAQ === -.. default-domain:: mongodb - How can I achieve data locality? -------------------------------- -For any MongoDB deployment, the Mongo Spark Connector sets the -preferred location for a DataFrame or Dataset to be where the data is: +For any MongoDB deployment, the {+connector-short+} sets the +preferred location for a DataFrame or Dataset to be where the data is. 
-- For a non sharded system, it sets the preferred location to be the +- For a nonsharded system, it sets the preferred location to be the hostname(s) of the standalone or the replica set. - For a sharded system, it sets the preferred location to be the hostname(s) of the shards. -To promote data locality, +To promote data locality, we recommend taking the following actions: -- Ensure there is a Spark Worker on one of the hosts for non-sharded +- Ensure there is a Spark Worker on one of the hosts for nonsharded system or one per shard for sharded systems. - Use a :readmode:`nearest` read preference to read from the local :binary:`~bin.mongod`. -- For a sharded cluster, you should have a :binary:`~bin.mongos` on the - same nodes and use :ref:`localThreshold ` - configuration to connect to the nearest :binary:`~bin.mongos`. +- For a sharded cluster, have a :binary:`~bin.mongos` on the + same nodes and use the ``localThreshold`` + configuration setting to connect to the nearest :binary:`~bin.mongos`. To partition the data by shard use the :ref:`conf-shardedpartitioner`. @@ -36,7 +34,7 @@ How do I resolve ``Unrecognized pipeline stage name`` Error? In MongoDB deployments with mixed versions of :binary:`~bin.mongod`, it is possible to get an ``Unrecognized pipeline stage name: '$sample'`` error. To mitigate this situation, explicitly configure the partitioner -to use and define the Schema when using DataFrames. +to use and define the schema when using DataFrames. How can I use mTLS for authentication? -------------------------------------- @@ -49,7 +47,28 @@ To use mTLS, include the following options when you run ``spark-submit``: --driver-java-options -Djavax.net.ssl.trustStorePassword= \ --driver-java-options -Djavax.net.ssl.keyStore= \ --driver-java-options -Djavax.net.ssl.keyStorePassword= \ - -conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore= \ - -conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStorePassword= \ - -conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStore= \ - -conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStorePassword= \ \ No newline at end of file + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore= \ + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStorePassword= \ + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStore= \ + --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStorePassword= \ + +.. _cache-configuration: + +How can I share a MongoClient instance across threads? +------------------------------------------------------ + +The MongoConnector includes a cache that lets workers +share a single ``MongoClient`` across threads. To specify the length of time to keep a +``MongoClient`` available, include the ``mongodb.keep_alive_ms`` option when you run +``spark-submit``: + +.. code-block:: bash + + --driver-java-options -Dmongodb.keep_alive_ms= + +By default, this property has a value of ``5000``. + +.. note:: + + Because the cache is set up before the Spark Configuration is available, + you must use a system property to configure it. \ No newline at end of file diff --git a/source/getting-started.txt b/source/getting-started.txt index c6f71d55..1acfc3fe 100644 --- a/source/getting-started.txt +++ b/source/getting-started.txt @@ -22,8 +22,6 @@ Prerequisites Getting Started --------------- -.. tabs-selector:: drivers - .. 
tabs-drivers:: tabs: @@ -45,7 +43,8 @@ Getting Started Tutorials --------- -- :doc:`write-to-mongodb` -- :doc:`read-from-mongodb` -- :doc:`structured-streaming` +- :ref:`batch-write-to-mongodb` +- :ref:`batch-read-from-mongodb` +- :ref:`streaming-write-to-mongodb` +- :ref:`streaming-read-from-mongodb` diff --git a/source/includes/batch-read-settings.rst b/source/includes/batch-read-settings.rst new file mode 100644 index 00000000..b5252527 --- /dev/null +++ b/source/includes/batch-read-settings.rst @@ -0,0 +1,23 @@ +You must specify the following configuration settings to read from MongoDB: + +.. list-table:: + :header-rows: 1 + :stub-columns: 1 + :widths: 10 40 + + * - Setting + - Description + + * - ``dataFrame.read.format()`` + - Specifies the format of the underlying input data source. Use ``mongodb`` + to read from MongoDB. + + * - ``dataFrame.read.option()`` + - Use the ``option`` method to configure batch read settings, including the + MongoDB deployment + :manual:`connection string `, + MongoDB database and collection, and + partitioner configuration. + + For a list of batch read configuration options, see + the :ref:`spark-batch-read-conf` guide. diff --git a/source/includes/batch-write-settings.rst b/source/includes/batch-write-settings.rst new file mode 100644 index 00000000..d138f77d --- /dev/null +++ b/source/includes/batch-write-settings.rst @@ -0,0 +1,23 @@ +You must specify the following configuration settings to write to MongoDB: + +.. list-table:: + :header-rows: 1 + :stub-columns: 1 + :widths: 10 40 + + * - Setting + - Description + + * - ``dataFrame.write.format()`` + - Specifies the format of the underlying output data source. Use ``mongodb`` + to write to MongoDB. + + * - ``dataFrame.write.option()`` + - Use the ``option`` method to configure batch write settings, including the + MongoDB deployment + :manual:`connection string `, + MongoDB database and collection, and + destination directory. + + For a list of batch write configuration options, see + the :ref:`spark-batch-write-conf` guide. diff --git a/source/includes/conf-read-prefix.rst b/source/includes/conf-read-prefix.rst new file mode 100644 index 00000000..6ba8d7cd --- /dev/null +++ b/source/includes/conf-read-prefix.rst @@ -0,0 +1,4 @@ +.. note:: + + If you use ``SparkConf`` to set the connector's read configurations, + prefix ``spark.mongodb.read.`` to each property. diff --git a/source/includes/conf-write-prefix.rst b/source/includes/conf-write-prefix.rst new file mode 100644 index 00000000..b6d0256c --- /dev/null +++ b/source/includes/conf-write-prefix.rst @@ -0,0 +1,4 @@ +.. note:: + + If you use ``SparkConf`` to set the connector's write configurations, + prefix ``spark.mongodb.write.`` to each property. diff --git a/source/includes/connection-read-config.rst b/source/includes/connection-read-config.rst new file mode 100644 index 00000000..04c0d3e6 --- /dev/null +++ b/source/includes/connection-read-config.rst @@ -0,0 +1,31 @@ +If you use :ref:`SparkConf ` to specify any of the previous settings, you can +either include them in the ``connection.uri`` setting or list them individually. + +The following code example shows how to specify the +database, collection, and read preference as part of the ``connection.uri`` setting: + +.. code:: cfg + + spark.mongodb.read.connection.uri=mongodb://127.0.0.1/myDB.myCollection?readPreference=primaryPreferred + +To keep the ``connection.uri`` shorter and make the settings easier to read, you can +specify them individually instead: + +.. 
code:: cfg + + spark.mongodb.read.connection.uri=mongodb://127.0.0.1/ + spark.mongodb.read.database=myDB + spark.mongodb.read.collection=myCollection + spark.mongodb.read.readPreference.name=primaryPreferred + +.. important:: + + If you specify a setting in both the ``connection.uri`` and on its own line, + the ``connection.uri`` setting takes precedence. + For example, in the following configuration, the connection + database is ``foobar``, because it's the value in the ``connection.uri`` setting: + + .. code:: cfg + + spark.mongodb.read.connection.uri=mongodb://127.0.0.1/foobar + spark.mongodb.read.database=bar diff --git a/source/includes/connection-write-config.rst b/source/includes/connection-write-config.rst new file mode 100644 index 00000000..892a63df --- /dev/null +++ b/source/includes/connection-write-config.rst @@ -0,0 +1,32 @@ +If you use :ref:`SparkConf ` to specify any of the previous settings, you can +either include them in the ``connection.uri`` setting or list them individually. + +The following code example shows how to specify the +database, collection, and ``convertJson`` setting as part of the ``connection.uri`` +setting: + +.. code:: cfg + + spark.mongodb.write.connection.uri=mongodb://127.0.0.1/myDB.myCollection?convertJson=any + +To keep the ``connection.uri`` shorter and make the settings easier to read, you can +specify them individually instead: + +.. code:: cfg + + spark.mongodb.write.connection.uri=mongodb://127.0.0.1/ + spark.mongodb.write.database=myDB + spark.mongodb.write.collection=myCollection + spark.mongodb.write.convertJson=any + +.. important:: + + If you specify a setting in both the ``connection.uri`` and on its own line, + the ``connection.uri`` setting takes precedence. + For example, in the following configuration, the connection + database is ``foobar``: + + .. code:: cfg + + spark.mongodb.write.connection.uri=mongodb://127.0.0.1/foobar + spark.mongodb.write.database=bar \ No newline at end of file diff --git a/source/includes/extracts-command-line.yaml b/source/includes/extracts-command-line.yaml index 21c584be..fe284431 100644 --- a/source/includes/extracts-command-line.yaml +++ b/source/includes/extracts-command-line.yaml @@ -1,79 +1,78 @@ ref: list-command-line-specification content: | - - the ``--packages`` option to download the MongoDB Spark Connector - package. The following package is available: + - the ``--packages`` option to download the MongoDB Spark Connector + package. The following package is available: - - ``mongo-spark-connector`` + - ``mongo-spark-connector`` - - the ``--conf`` option to configure the MongoDB Spark Connnector. - These settings configure the ``SparkConf`` object. + - the ``--conf`` option to configure the MongoDB Spark Connnector. + These settings configure the ``SparkConf`` object. - .. note:: + .. note:: - When specifying the Connector configuration via ``SparkConf``, you - must prefix the settings appropriately. For details and other - available MongoDB Spark Connector options, see the - :doc:`/configuration`. + If you use ``SparkConf`` to configure the {+connector-short+}, you + must prefix the settings appropriately. For details and other + available MongoDB Spark Connector options, see the + :doc:`/configuration` guide. --- ref: list-configuration-explanation content: | - - The :ref:`spark.mongodb.read.connection.uri ` specifies the - MongoDB server address (``127.0.0.1``), the database to connect - (``test``), and the collection (``myCollection``) from which to read - data, and the read preference. 
- - The :ref:`spark.mongodb.write.connection.uri ` specifies the - MongoDB server address (``127.0.0.1``), the database to connect - (``test``), and the collection (``myCollection``) to which to write - data. Connects to port ``27017`` by default. - - The ``packages`` option specifies the Spark Connector's - Maven coordinates, in the format ``groupId:artifactId:version``. + - The ``spark.mongodb.read.connection.uri`` specifies the + MongoDB server address (``127.0.0.1``), the database to connect + (``test``), and the collection (``myCollection``) from which to read + data, and the read preference. + - The ``spark.mongodb.write.connection.uri`` specifies the + MongoDB server address (``127.0.0.1``), the database to connect + (``test``), and the collection (``myCollection``) to which to write + data. Connects to port ``27017`` by default. + - The ``packages`` option specifies the Spark Connector's + Maven coordinates, in the format ``groupId:artifactId:version``. --- ref: command-line-start-spark-shell content: | - .. include:: /includes/extracts/list-command-line-specification.rst + .. include:: /includes/extracts/list-command-line-specification.rst - For example, + For example, - .. code-block:: sh + .. code-block:: sh - ./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ - --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ - --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} - - .. include:: /includes/extracts/list-configuration-explanation.rst + ./bin/spark-shell --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ + --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ + --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} + + .. include:: /includes/extracts/list-configuration-explanation.rst --- ref: command-line-start-pyspark content: | - .. include:: /includes/extracts/list-command-line-specification.rst + .. include:: /includes/extracts/list-command-line-specification.rst + + The following example starts the ``pyspark`` shell from the command + line: - The following example starts the ``pyspark`` shell from the command - line: + .. code-block:: sh - .. code-block:: sh + ./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ + --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ + --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} - ./bin/pyspark --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ - --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ - --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} - - .. include:: /includes/extracts/list-configuration-explanation.rst + .. include:: /includes/extracts/list-configuration-explanation.rst - The examples in this tutorial will use this database and collection. + The examples in this tutorial will use this database and collection. --- ref: command-line-start-sparkR content: | - .. include:: /includes/extracts/list-command-line-specification.rst + .. include:: /includes/extracts/list-command-line-specification.rst - For example, + For example, - .. code-block:: sh + .. 
code-block:: sh - ./bin/sparkR --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ - --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ - --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} + ./bin/sparkR --conf "spark.mongodb.read.connection.uri=mongodb://127.0.0.1/test.myCollection?readPreference=primaryPreferred" \ + --conf "spark.mongodb.write.connection.uri=mongodb://127.0.0.1/test.myCollection" \ + --packages org.mongodb.spark:{+artifact-id-2-12+}:{+current-version+} - .. include:: /includes/extracts/list-configuration-explanation.rst -... + .. include:: /includes/extracts/list-configuration-explanation.rst diff --git a/source/includes/java-dataframe-tip.rst b/source/includes/java-dataframe-tip.rst new file mode 100644 index 00000000..ce92ed3d --- /dev/null +++ b/source/includes/java-dataframe-tip.rst @@ -0,0 +1,4 @@ +.. tip:: DataFrame Type + + ``DataFrame`` doesn't exist as a class in the Java API. Use + ``Dataset`` to reference a DataFrame. diff --git a/source/includes/note-trigger-method.rst b/source/includes/note-trigger-method.rst index 747f11fc..f9ad2d1d 100644 --- a/source/includes/note-trigger-method.rst +++ b/source/includes/note-trigger-method.rst @@ -1,4 +1,4 @@ .. note:: - Call the ``trigger`` method on the ``DataStreamWriter`` you create + Call the ``trigger()`` method on the ``DataStreamWriter`` you create from the ``DataStreamReader`` you configure. diff --git a/source/includes/scala-java-explicit-schema.rst b/source/includes/scala-java-explicit-schema.rst index 3e1109a5..3b682cb1 100644 --- a/source/includes/scala-java-explicit-schema.rst +++ b/source/includes/scala-java-explicit-schema.rst @@ -10,4 +10,4 @@ queries needed for sampling. wire. The following statement creates a ``Character`` |class| and then -uses it to define the schema for the ``DataFrame``: +uses it to define the schema for the DataFrame: diff --git a/source/includes/scala-java-sparksession-config.rst b/source/includes/scala-java-sparksession-config.rst index 665aebc7..839bd561 100644 --- a/source/includes/scala-java-sparksession-config.rst +++ b/source/includes/scala-java-sparksession-config.rst @@ -1,4 +1,4 @@ When specifying the Connector configuration via ``SparkSession``, you must prefix the settings appropriately. For details and other available MongoDB Spark Connector options, see the -:doc:`/configuration`. +:doc:`/configuration` guide. diff --git a/source/includes/scala-java-sql-register-table.rst b/source/includes/scala-java-sql-register-table.rst index 8eb0705a..5bcbe95b 100644 --- a/source/includes/scala-java-sql-register-table.rst +++ b/source/includes/scala-java-sql-register-table.rst @@ -1,5 +1,5 @@ -Before running SQL queries on your dataset, you must register a -temporary view for the dataset. +Before running SQL queries on your Dataset, you must register a +temporary view for the Dataset. 
The following operation registers a ``characters`` table and then queries it to find all characters that diff --git a/source/includes/characters-example-collection.rst b/source/includes/schema-inference-intro.rst similarity index 73% rename from source/includes/characters-example-collection.rst rename to source/includes/schema-inference-intro.rst index 578a1732..24c5ca5e 100644 --- a/source/includes/characters-example-collection.rst +++ b/source/includes/schema-inference-intro.rst @@ -1,3 +1,8 @@ +When you load a Dataset or DataFrame without a schema, Spark samples +the records to infer the schema of the collection. + +Suppose that the MongoDB collection ``people.contacts`` contains the following documents: + .. code-block:: javascript { "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } @@ -10,3 +15,7 @@ { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" } + +The following operation loads data from ``people.contacts`` +and infers the schema of the DataFrame: + diff --git a/source/includes/stream-read-settings.rst b/source/includes/stream-read-settings.rst new file mode 100644 index 00000000..162772b5 --- /dev/null +++ b/source/includes/stream-read-settings.rst @@ -0,0 +1,24 @@ +You must specify the following configuration settings to read from MongoDB: + +.. list-table:: + :header-rows: 1 + :stub-columns: 1 + :widths: 10 40 + + * - Setting + - Description + + * - ``readStream.format()`` + - Specifies the format of the underlying input data source. Use ``mongodb`` + to read from MongoDB. + + * - ``readStream.option()`` + - Specifies stream settings, including the MongoDB deployment + :manual:`connection string `, + MongoDB database and collection, and aggregation pipeline stages. + + For a list of read stream configuration options, see + the :ref:`spark-streaming-read-conf` guide. + + * - ``readStream.schema()`` + - Specifies the input schema. \ No newline at end of file diff --git a/source/includes/streaming-distinction.rst b/source/includes/streaming-distinction.rst index dc7fa4c2..2448decd 100644 --- a/source/includes/streaming-distinction.rst +++ b/source/includes/streaming-distinction.rst @@ -1,3 +1,10 @@ .. important:: - `Spark Structured Streaming `__ and `Spark Streaming with DStreams `__ are different. + Apache Spark contains two different stream-processing engines: + + - `Spark Streaming with DStreams `__, + now an unsupported legacy engine + + - `Spark Structured Streaming `__. + + This guide pertains only to Spark Structured Streaming. \ No newline at end of file diff --git a/source/includes/warn-console-stream.rst b/source/includes/warn-console-stream.rst index b46d586d..4bf16d7d 100644 --- a/source/includes/warn-console-stream.rst +++ b/source/includes/warn-console-stream.rst @@ -1,4 +1,4 @@ .. important:: - Avoid streaming large datasets to your console. Streaming to your + Avoid streaming large Datasets to your console. Streaming to your console is memory intensive and intended only for testing purposes. diff --git a/source/index.txt b/source/index.txt index 35c24014..df17674b 100644 --- a/source/index.txt +++ b/source/index.txt @@ -2,19 +2,17 @@ MongoDB Connector for Spark =========================== -.. default-domain:: mongodb - The `MongoDB Connector for Spark `_ provides integration between MongoDB and Apache Spark. .. 
note:: - Version 10.x of the MongoDB Connector for Spark is an all-new + Version 10.x of the {+connector-long+} is an all-new connector based on the latest Spark API. Install and migrate to version 10.x to take advantage of new capabilities, such as tighter integration with - :ref:`Spark Structured Streaming `. + :ref:`Spark Structured Streaming `. Version 10.x uses the new namespace ``com.mongodb.spark.sql.connector.MongoTableProvider``. @@ -25,11 +23,11 @@ integration between MongoDB and Apache Spark. `MongoDB announcement blog post `__. With the connector, you have access to all Spark libraries for use with -MongoDB datasets: Datasets for analysis with SQL (benefiting from +MongoDB datasets: ``Dataset`` for analysis with SQL (benefiting from automatic schema inference), streaming, machine learning, and graph APIs. You can also use the connector with the Spark Shell. -The MongoDB Connector for Spark is compatible with the following +The {+connector-long+} is compatible with the following versions of Apache Spark and MongoDB: .. list-table:: @@ -47,11 +45,10 @@ versions of Apache Spark and MongoDB: .. toctree:: :titlesonly: - configuration getting-started - write-to-mongodb - read-from-mongodb - structured-streaming + configuration + /batch-mode + /streaming-mode faq release-notes api-docs diff --git a/source/java/api.txt b/source/java/api.txt index c9a1cede..4377f9fb 100644 --- a/source/java/api.txt +++ b/source/java/api.txt @@ -73,12 +73,12 @@ Configuration } } -- The :ref:`spark.mongodb.read.connection.uri ` specifies the +- The ``spark.mongodb.read.connection.uri`` specifies the MongoDB server address(``127.0.0.1``), the database to connect (``test``), and the collection (``myCollection``) from which to read data, and the read preference. -- The :ref:`spark.mongodb.write.connection.uri ` specifies the +- The ``spark.mongodb.write.connection.uri`` specifies the MongoDB server address(``127.0.0.1``), the database to connect (``test``), and the collection (``myCollection``) to which to write data. diff --git a/source/java/read-from-mongodb.txt b/source/java/read-from-mongodb.txt index d8b976a5..a241bec8 100644 --- a/source/java/read-from-mongodb.txt +++ b/source/java/read-from-mongodb.txt @@ -1,71 +1,19 @@ -Use your local SparkSession's ``read`` method to create a DataFrame -representing a collection. +To read data from MongoDB, call the ``read()`` method on your +``SparkSession`` object. This method returns a +``DataFrameReader`` object, which you can use to specify the format and other +configuration settings for your batch read operation. -.. note:: - - ``DataFrame`` does not exist as a class in the Java API. Use - ``Dataset`` to reference a DataFrame. +.. include:: /includes/batch-read-settings.rst -The following example loads the collection specified in the -``SparkConf``: +The following code example shows how to use the previous +configuration settings to read data from ``people.contacts`` in MongoDB: .. code-block:: java - Dataset df = spark.read().format("mongodb").load(); // Uses the SparkConf for configuration + Dataset dataFrame = spark.read() + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load(); -To specify a different collection, database, and other :ref:`read -configuration settings `, use the ``option`` method: - -.. code-block:: java - - Dataset df = spark.read().format("mongodb").option("database", "").option("collection", "").load(); - -.. 
_java-implicit-schema: - -Schema Inference ----------------- - -When you load a Dataset or DataFrame without a schema, Spark samples -the records to infer the schema of the collection. - -Consider a collection named ``characters``: - -.. include:: /includes/characters-example-collection.rst - -The following operation loads data from the MongoDB collection -specified in ``SparkConf`` and infers the schema: - -.. code-block:: java - - Dataset implicitDS = spark.read().format("mongodb").load(); - implicitDS.printSchema(); - implicitDS.show(); - -``implicitDS.printSchema()`` outputs the following schema to the console: - -.. code-block:: sh - - root - |-- _id: struct (nullable = true) - | |-- oid: string (nullable = true) - |-- age: integer (nullable = true) - |-- name: string (nullable = true) - -``implicitDS.show()`` outputs the following to the console: - -.. code-block:: sh - - +--------------------+----+-------------+ - | _id| age| name| - +--------------------+----+-------------+ - |[585024d558bef808...| 50|Bilbo Baggins| - |[585024d558bef808...|1000| Gandalf| - |[585024d558bef808...| 195| Thorin| - |[585024d558bef808...| 178| Balin| - |[585024d558bef808...| 77| Kíli| - |[585024d558bef808...| 169| Dwalin| - |[585024d558bef808...| 167| Óin| - |[585024d558bef808...| 158| Glóin| - |[585024d558bef808...| 82| Fíli| - |[585024d558bef808...|null| Bombur| - +--------------------+----+-------------+ +.. include:: /includes/java-dataframe-tip.rst diff --git a/source/java/schema-inference.rst b/source/java/schema-inference.rst new file mode 100644 index 00000000..e2c75d16 --- /dev/null +++ b/source/java/schema-inference.rst @@ -0,0 +1,62 @@ +.. _java-schema-inference: + +.. include:: /includes/schema-inference-intro.rst + +.. code-block:: java + + Dataset dataFrame = spark.read() + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load(); + +To see the inferred schema, use the ``printSchema()`` method on your ``Dataset`` +object, as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: java + + dataFrame.printSchema(); + + .. output:: + :language: none + :visible: false + + root + |-- _id: struct (nullable = true) + | |-- oid: string (nullable = true) + |-- age: integer (nullable = true) + |-- name: string (nullable = true) + +To see the data in the DataFrame, use the ``show()`` method on your ``DataFrame`` object, +as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: java + + dataFrame.show(); + + .. 
output:: + :language: none + :visible: false + + +--------------------+----+-------------+ + | _id| age| name| + +--------------------+----+-------------+ + |[585024d558bef808...| 50|Bilbo Baggins| + |[585024d558bef808...|1000| Gandalf| + |[585024d558bef808...| 195| Thorin| + |[585024d558bef808...| 178| Balin| + |[585024d558bef808...| 77| Kíli| + |[585024d558bef808...| 169| Dwalin| + |[585024d558bef808...| 167| Óin| + |[585024d558bef808...| 158| Glóin| + |[585024d558bef808...| 82| Fíli| + |[585024d558bef808...|null| Bombur| + +--------------------+----+-------------+ diff --git a/source/java/write-to-mongodb.txt b/source/java/write-to-mongodb.txt index 7a8b4bbe..c47d9225 100644 --- a/source/java/write-to-mongodb.txt +++ b/source/java/write-to-mongodb.txt @@ -1,16 +1,23 @@ -The following example creates a DataFrame from a ``json`` file and -saves it to the MongoDB collection specified in ``SparkConf``: +To write data to MongoDB, call the ``write()`` method on your +``Dataset`` object. This method returns a +``DataFrameWriter`` +object, which you can use to specify the format and other configuration settings for your +batch write operation. -.. code-block:: java +.. include:: /includes/batch-write-settings.rst - Dataset df = spark.read().format("json").load("example.json"); +The following example creates a DataFrame from a ``json`` file and +saves it to the ``people.contacts`` collection in MongoDB: - df.write().format("mongodb").mode("overwrite").save(); +.. code-block:: java -The MongoDB Connector for Spark supports the following save modes: + Dataset dataFrame = spark.read().format("json") + .load("example.json"); -- ``append`` -- ``overwrite`` + dataFrame.write().format("mongodb") + .mode("overwrite") + .option("database", "people") + .option("collection", "contacts") + .save(); -To learn more about save modes, see the `Spark SQL Guide `__. - \ No newline at end of file +.. include:: /includes/java-dataframe-tip.rst \ No newline at end of file diff --git a/source/python/filters.txt b/source/python/filters.txt index 124ef886..55022cd2 100644 --- a/source/python/filters.txt +++ b/source/python/filters.txt @@ -1,13 +1,10 @@ -Filters -------- - .. include:: includes/pushed-filters.rst Use ``filter()`` to read a subset of data from your MongoDB collection. .. include:: /includes/example-load-dataframe.rst -First, set up a DataFrame to connect with your default MongoDB data +First, set up a ``DataFrame`` object to connect with your default MongoDB data source: .. code-block:: python diff --git a/source/python/read-from-mongodb.txt b/source/python/read-from-mongodb.txt index d0ce8b4d..aa6f7958 100644 --- a/source/python/read-from-mongodb.txt +++ b/source/python/read-from-mongodb.txt @@ -1,38 +1,18 @@ -You can create a Spark DataFrame to hold data from the MongoDB -collection specified in the -:ref:`spark.mongodb.read.connection.uri ` option which your -``SparkSession`` option is using. +To read data from MongoDB, call the ``read`` function on your +``SparkSession`` object. This function returns a +``DataFrameReader`` +object, which you can use to specify the format and other configuration settings for your +batch read operation. -.. include:: /includes/example-load-dataframe.rst +.. include:: /includes/batch-read-settings.rst -Assign the collection to a DataFrame with ``spark.read()`` -from within the ``pyspark`` shell. +The following code example shows how to use the previous +configuration settings to read data from ``people.contacts`` in MongoDB: .. 
code-block:: python - df = spark.read.format("mongodb").load() - -Spark samples the records to infer the schema of the collection. - -.. code-block:: python - - df.printSchema() - -The above operation produces the following shell output: - -.. code-block:: none - - root - |-- _id: double (nullable = true) - |-- qty: double (nullable = true) - |-- type: string (nullable = true) - -If you need to read from a different MongoDB collection, -use the ``.option`` method when reading data into a DataFrame. - -To read from a collection called ``contacts`` in a database called -``people``, specify ``people.contacts`` in the input URI option. - -.. code-block:: python - - df = spark.read.format("mongodb").option("uri", "mongodb://127.0.0.1/people.contacts").load() + dataFrame = spark.read + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load() diff --git a/source/python/schema-inference.rst b/source/python/schema-inference.rst new file mode 100644 index 00000000..c66e2858 --- /dev/null +++ b/source/python/schema-inference.rst @@ -0,0 +1,62 @@ +.. _python-schema-inference: + +.. include:: /includes/schema-inference-intro.rst + +.. code-block:: python + + dataFrame = spark.read + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load() + +To see the inferred schema, use the ``printSchema()`` function on your ``DataFrame`` +object, as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: python + + dataFrame.printSchema() + + .. output:: + :language: none + :visible: false + + root + |-- _id: struct (nullable = true) + | |-- oid: string (nullable = true) + |-- age: integer (nullable = true) + |-- name: string (nullable = true) + +To see the data in the DataFrame, use the ``show()`` function on your ``DataFrame`` +object, as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: python + + dataFrame.show() + + .. output:: + :language: none + :visible: false + + +--------------------+----+-------------+ + | _id| age| name| + +--------------------+----+-------------+ + |[585024d558bef808...| 50|Bilbo Baggins| + |[585024d558bef808...|1000| Gandalf| + |[585024d558bef808...| 195| Thorin| + |[585024d558bef808...| 178| Balin| + |[585024d558bef808...| 77| Kíli| + |[585024d558bef808...| 169| Dwalin| + |[585024d558bef808...| 167| Óin| + |[585024d558bef808...| 158| Glóin| + |[585024d558bef808...| 82| Fíli| + |[585024d558bef808...|null| Bombur| + +--------------------+----+-------------+ diff --git a/source/python/write-to-mongodb.txt b/source/python/write-to-mongodb.txt index fbb83e2a..61beb5f9 100644 --- a/source/python/write-to-mongodb.txt +++ b/source/python/write-to-mongodb.txt @@ -1,73 +1,23 @@ -To create a DataFrame, first create a :ref:`SparkSession object -`, then use the object's ``createDataFrame()`` function. -In the following example, ``createDataFrame()`` takes -a list of tuples containing names and ages, and a list of column names: +To write data to MongoDB, call the ``write`` function on your +``DataFrame`` object. This function returns a +``DataFrameWriter`` +object, which you can use to specify the format and other configuration settings for your +batch write operation. -.. 
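+For example, the following minimal sketch shows one way to supply the write
+connection string through the ``SparkSession`` configuration before writing.
+The connection string is a placeholder, and the example assumes that the
+connector is available to your Spark environment:
+
+.. code-block:: python
+
+   from pyspark.sql import SparkSession
+
+   # Replace the placeholder with the connection string for your deployment.
+   spark = (SparkSession.builder
+       .appName("batchWriteExample")
+       .config("spark.mongodb.write.connection.uri", "<connection-string>")
+       .getOrCreate())
+
+   # A small DataFrame to write; any existing DataFrame works the same way.
+   dataFrame = spark.createDataFrame([("Gandalf", 1000)], ["name", "age"])
+
+   (dataFrame.write.format("mongodb")
+       .mode("append")
+       .option("database", "people")
+       .option("collection", "contacts")
+       .save())
+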
code-block:: python - - people = spark.createDataFrame([("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77), - ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"]) - -Write the ``people`` DataFrame to the MongoDB database and collection -specified in the :ref:`spark.mongodb.write.connection.uri` option -by using the ``write`` method: - -.. code-block:: python +.. include:: /includes/batch-write-settings.rst - people.write.format("mongodb").mode("append").save() - -The above operation writes to the MongoDB database and collection -specified in the :ref:`spark.mongodb.write.connection.uri` option -when you connect to the ``pyspark`` shell. - -To read the contents of the DataFrame, use the ``show()`` method. +The following example uses the ``createDataFrame()`` function on the ``SparkSession`` +object to create a ``DataFrame`` object from a list of tuples containing names +and ages and a list of column names. The example then writes this ``DataFrame`` to the +``people.contacts`` collection in MongoDB. .. code-block:: python - people.show() - -In the ``pyspark`` shell, the operation prints the following output: - -.. code-block:: none - - +-------------+----+ - | name| age| - +-------------+----+ - |Bilbo Baggins| 50| - | Gandalf|1000| - | Thorin| 195| - | Balin| 178| - | Kili| 77| - | Dwalin| 169| - | Oin| 167| - | Gloin| 158| - | Fili| 82| - | Bombur|null| - +-------------+----+ - -The ``printSchema()`` method prints out the DataFrame's schema: - -.. code-block:: python - - people.printSchema() - -In the ``pyspark`` shell, the operation prints the following output: - -.. code-block:: none - - root - |-- _id: struct (nullable = true) - | |-- oid: string (nullable = true) - |-- age: long (nullable = true) - |-- name: string (nullable = true) - -If you need to write to a different MongoDB collection, -use the ``.option()`` method with ``.write()``. - -To write to a collection called ``contacts`` in a database called -``people``, specify the collection and database with ``.option()``: - -.. code-block:: python + dataFrame = spark.createDataFrame([("Bilbo Baggins", 50), ("Gandalf", 1000), ("Thorin", 195), ("Balin", 178), ("Kili", 77), + ("Dwalin", 169), ("Oin", 167), ("Gloin", 158), ("Fili", 82), ("Bombur", None)], ["name", "age"]) - people.write.format("mongodb").mode("append").option("database", - "people").option("collection", "contacts").save() + dataFrame.write.format("mongodb") + .mode("append") + .option("database", "people") + .option("collection", "contacts") + .save() \ No newline at end of file diff --git a/source/read-from-mongodb.txt b/source/read-from-mongodb.txt deleted file mode 100644 index 5844f280..00000000 --- a/source/read-from-mongodb.txt +++ /dev/null @@ -1,76 +0,0 @@ -.. _read-from-mongodb: -.. _scala-read: -.. _java-read: -.. _scala-dataset-filters: - -================= -Read from MongoDB -================= - -.. default-domain:: mongodb - -.. contents:: On this page - :local: - :backlinks: none - :depth: 1 - :class: singlecol - -Overview --------- - -.. tabs-selector:: drivers - -.. tabs-drivers:: - - tabs: - - id: java-sync - content: | - - .. include:: /java/read-from-mongodb.txt - - - id: python - content: | - - .. include:: /python/read-from-mongodb.txt - - .. include:: /python/filters.txt - - - id: scala - content: | - - .. include:: /scala/read-from-mongodb.txt - - .. include:: /scala/filters.txt - -.. 
important:: Inferring the Schema of a Change Stream - - When the {+connector-short+} infers the schema of a DataFrame - read from a change stream, by default, - it will use the schema of the underlying collection rather than that - of the change stream. If you set the ``change.stream.publish.full.document.only`` - option to ``true``, the connector uses the schema of the - change stream instead. - - For more information on configuring a read operation, see the - :ref:`spark-change-stream-conf` section of the Read Configuration Options guide. - -SQL Queries ------------ - -.. tabs-drivers:: - - tabs: - - id: java-sync - content: | - - .. include:: /java/sql.txt - - - id: python - content: | - - .. include:: /python/sql.txt - - - id: scala - content: | - - .. include:: /scala/sql.txt diff --git a/source/scala/api.txt b/source/scala/api.txt index 9197cddc..eb327227 100644 --- a/source/scala/api.txt +++ b/source/scala/api.txt @@ -10,8 +10,8 @@ When starting the Spark shell, specify: Import the MongoDB Connector Package ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Enable MongoDB Connector specific functions and implicits for your -``SparkSession`` and Datasets by importing the following +Enable MongoDB Connector-specific functions and implicits for your +``SparkSession`` and ``Dataset`` objects by importing the following package in the Spark shell: .. code-block:: scala @@ -21,9 +21,9 @@ package in the Spark shell: Connect to MongoDB ~~~~~~~~~~~~~~~~~~ -Connection to MongoDB happens automatically when a Dataset -action requires a :ref:`read ` from MongoDB or a -:ref:`write ` to MongoDB. +Connection to MongoDB happens automatically when a Dataset +action requires a read from MongoDB or a +write to MongoDB. .. _scala-app: diff --git a/source/scala/filters.txt b/source/scala/filters.txt index 924de663..5208f29c 100644 --- a/source/scala/filters.txt +++ b/source/scala/filters.txt @@ -1,6 +1,3 @@ -Filters -------- - .. include:: /includes/pushed-filters.rst The following example filters and output the characters with ages under diff --git a/source/scala/read-from-mongodb.txt b/source/scala/read-from-mongodb.txt index f5213dcf..0ca38aa8 100644 --- a/source/scala/read-from-mongodb.txt +++ b/source/scala/read-from-mongodb.txt @@ -1,52 +1,23 @@ -Use your local SparkSession's ``read`` method to create a DataFrame -representing a collection. +To read data from MongoDB, call the ``read`` method on your +``SparkSession`` object. This method returns a +``DataFrameReader`` +object, which you can use to specify the format and other configuration settings for your +batch read operation. -.. note:: - - A ``DataFrame`` is represented by a ``Dataset`` of - ``Rows``. It is an alias of ``Dataset[Row]``. - -The following example loads the collection specified in the -``SparkConf``: - -.. code-block:: scala +.. include:: /includes/batch-read-settings.rst - val df = spark.read.format("mongodb").load() // Uses the SparkConf for configuration - -To specify a different collection, database, and other :ref:`read -configuration settings `, use the ``option`` method: +The following code example shows how to use the previous +configuration settings to read data from ``people.contacts`` in MongoDB: .. code-block:: scala - val df = spark.read.format("mongodb").option("database", "").option("collection", "").load() - -.. _scala-implicit-schema: - -Schema Inference ----------------- - -When you load a Dataset or DataFrame without a schema, Spark samples -the records to infer the schema of the collection. 
- -Consider a collection named ``characters``: - -.. include:: /includes/characters-example-collection.rst - -The following operation loads data from the MongoDB collection -specified in ``SparkConf`` and infers the schema: - -.. code-block:: scala - - val df = MongoSpark.load(spark) // Uses the SparkSession - df.printSchema() // Prints DataFrame schema - -``df.printSchema()`` outputs the following schema to the console: + val dataFrame = spark.read + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load() -.. code-block:: sh +.. tip:: DataFrame Type - root - |-- _id: struct (nullable = true) - | |-- oid: string (nullable = true) - |-- age: integer (nullable = true) - |-- name: string (nullable = true) - \ No newline at end of file + A DataFrame is represented by a ``Dataset`` of ``Row`` objects. + The ``DataFrame`` type is an alias for ``Dataset[Row]``. diff --git a/source/scala/schema-inference.rst b/source/scala/schema-inference.rst new file mode 100644 index 00000000..8bc3e18c --- /dev/null +++ b/source/scala/schema-inference.rst @@ -0,0 +1,62 @@ +.. _scala-schema-inference: + +.. include:: /includes/schema-inference-intro.rst + +.. code-block:: scala + + val dataFrame = spark.read() + .format("mongodb") + .option("database", "people") + .option("collection", "contacts") + .load() + +To see the inferred schema, use the ``printSchema()`` method on your ``DataFrame`` +object, as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: scala + + dataFrame.printSchema() + + .. output:: + :language: none + :visible: false + + root + |-- _id: struct (nullable = true) + | |-- oid: string (nullable = true) + |-- age: integer (nullable = true) + |-- name: string (nullable = true) + +To see the data in the DataFrame, use the ``show()`` method on your ``DataFrame`` object, +as shown in the following example: + +.. io-code-block:: + :copyable: true + + .. input:: + :language: scala + + dataFrame.show() + + .. output:: + :language: none + :visible: false + + +--------------------+----+-------------+ + | _id| age| name| + +--------------------+----+-------------+ + |[585024d558bef808...| 50|Bilbo Baggins| + |[585024d558bef808...|1000| Gandalf| + |[585024d558bef808...| 195| Thorin| + |[585024d558bef808...| 178| Balin| + |[585024d558bef808...| 77| Kíli| + |[585024d558bef808...| 169| Dwalin| + |[585024d558bef808...| 167| Óin| + |[585024d558bef808...| 158| Glóin| + |[585024d558bef808...| 82| Fíli| + |[585024d558bef808...|null| Bombur| + +--------------------+----+-------------+ diff --git a/source/scala/write-to-mongodb.txt b/source/scala/write-to-mongodb.txt index f5ad4670..d07ba358 100644 --- a/source/scala/write-to-mongodb.txt +++ b/source/scala/write-to-mongodb.txt @@ -1,15 +1,21 @@ -The following example creates a DataFrame from a ``json`` file and -saves it to the MongoDB collection specified in ``SparkConf``: - -.. code-block:: scala +To write data to MongoDB, call the ``write()`` method on your +``DataFrame`` object. This method returns a +``DataFrameWriter`` +object, which you can use to specify the format and other configuration settings for your +batch write operation. - val df = spark.read.format("json").load("example.json") +.. 
include:: /includes/batch-write-settings.rst - df.write.format("mongodb").mode("overwrite").save() +The following example creates a DataFrame from a ``json`` file and +saves it to the ``people.contacts`` collection in MongoDB: -The MongoDB Connector for Spark supports the following save modes: +.. code-block:: scala -- ``append`` -- ``overwrite`` + val dataFrame = spark.read.format("json") + .load("example.json") -To learn more about save modes, see the `Spark SQL Guide `__. + dataFrame.write.format("mongodb") + .mode("overwrite") + .option("database", "people") + .option("collection", "contacts") + .save() \ No newline at end of file diff --git a/source/streaming-mode.txt b/source/streaming-mode.txt new file mode 100644 index 00000000..456695f6 --- /dev/null +++ b/source/streaming-mode.txt @@ -0,0 +1,38 @@ +.. _streaming-mode: + +============== +Streaming Mode +============== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +.. toctree:: + + /streaming-mode/streaming-read + /streaming-mode/streaming-write + +Overview +-------- + +The {+connector-short+} supports streaming mode, which uses Spark Structured Streaming +to process data as soon as it's available instead of waiting for a time interval to pass. +Spark Structured Streaming is a data-stream-processing engine that you can access by using +the Dataset or DataFrame API. + +.. include:: includes/streaming-distinction.rst + +The following sections show you how to use the {+connector-short+} to read data from +MongoDB and write data to MongoDB in streaming mode: + +- :ref:`streaming-read-from-mongodb` +- :ref:`streaming-write-to-mongodb` + +.. tip:: Apache Spark Documentation + + To learn more about using Spark to process streams of data, see the + `Spark Programming Guide + `__. diff --git a/source/streaming-mode/streaming-read-config.txt b/source/streaming-mode/streaming-read-config.txt new file mode 100644 index 00000000..a81330d2 --- /dev/null +++ b/source/streaming-mode/streaming-read-config.txt @@ -0,0 +1,194 @@ +.. _spark-streaming-read-conf: + +==================================== +Streaming Read Configuration Options +==================================== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +.. _spark-streaming-input-conf: + +Overview +-------- + +You can configure the following properties when reading data from MongoDB in streaming mode. + +.. include:: /includes/conf-read-prefix.rst + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``connection.uri`` + - | **Required.** + | The connection string configuration key. + | + | **Default:** ``mongodb://localhost:27017/`` + + * - ``database`` + - | **Required.** + | The database name configuration. + + * - ``collection`` + - | **Required.** + | The collection name configuration. + + * - ``comment`` + - | The comment to append to the read operation. Comments appear in the + :manual:`output of the Database Profiler. ` + | + | **Default:** None + + * - ``mongoClientFactory`` + - | MongoClientFactory configuration key. + | You can specify a custom implementation, which must implement the + ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` + interface. + | + | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` + + * - ``aggregation.pipeline`` + - | Specifies a custom aggregation pipeline to apply to the collection + before sending data to Spark. 
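+      | For example, a PySpark read stream might pass a single ``$match`` stage
+        as an extended JSON string. This is a sketch that assumes a
+        ``SparkSession`` named ``spark``; the database and collection names are
+        placeholders:
+
+      .. code-block:: python
+
+         pipeline = '[{"$match": {"closed": false}}]'
+
+         dataFrame = (spark.readStream
+             .format("mongodb")
+             .option("database", "<database>")
+             .option("collection", "<collection>")
+             .option("aggregation.pipeline", pipeline)
+             .load())
+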
+ | The value must be either an extended JSON single document or list + of documents. + | A single document resembles the following: + + .. code-block:: json + + {"$match": {"closed": false}} + + | A list of documents resembles the following: + + .. code-block:: json + + [{"$match": {"closed": false}}, {"$project": {"status": 1, "name": 1, "description": 1}}] + + .. important:: + + Custom aggregation pipelines must be compatible with the + partitioner strategy. For example, aggregation stages such as + ``$group`` do not work with any partitioner that creates more than + one partition. + + * - ``aggregation.allowDiskUse`` + - | Specifies whether to allow storage to disk when running the + aggregation. + | + | **Default:** ``true`` + + * - ``change.stream.`` + - | Change stream configuration prefix. + | See the + :ref:`Change Stream Configuration ` section for more + information about change streams. + + * - ``outputExtendedJson`` + - | When ``true``, the connector converts BSON types not supported by Spark into + extended JSON strings. + When ``false``, the connector uses the original relaxed JSON format for + unsupported types. + | + | **Default:** ``false`` + +.. _change-stream-conf: + +Change Stream Configuration +--------------------------- + +You can configure the following properties when reading a change stream from MongoDB: + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``change.stream.lookup.full.document`` + + - Determines what values your change stream returns on update + operations. + + The default setting returns the differences between the original + document and the updated document. + + The ``updateLookup`` setting also returns the differences between the + original document and updated document, but it also includes a copy of the + entire updated document. + + **Default:** "default" + + .. tip:: + + For more information on how this change stream option works, + see the MongoDB server manual guide + :manual:`Lookup Full Document for Update Operation `. + + * - ``change.stream.micro.batch.max.partition.count`` + - | The maximum number of partitions the {+connector-short+} divides each + micro-batch into. Spark workers can process these partitions in parallel. + | + | This setting applies only when using micro-batch streams. + | + | **Default**: ``1`` + + .. warning:: Event Order + + Specifying a value larger than ``1`` can alter the order in which + the {+connector-short+} processes change events. Avoid this setting + if out-of-order processing could create data inconsistencies downstream. + + * - ``change.stream.publish.full.document.only`` + - | Specifies whether to publish the changed document or the full + change stream document. + | + | When this setting is ``true``, the connector exhibits the following behavior: + + - The connector filters out messages that + omit the ``fullDocument`` field and only publishes the value of the + field. + - If you don't specify a schema, the connector infers the schema + from the change stream document rather than from the underlying collection. + + **Default**: ``false`` + + .. note:: + + This setting overrides the ``change.stream.lookup.full.document`` + setting. + + * - ``change.stream.startup.mode`` + - | Specifies how the connector starts up when no offset is available. + + | This setting accepts the following values: + + - ``latest``: The connector begins processing + change events starting with the most recent event. + It will not process any earlier unprocessed events. 
+ - ``timestamp``: The connector begins processing change events at a specified time. + + To use the ``timestamp`` option, you must specify a time by using the + ``change.stream.startup.mode.timestamp.start.at.operation.time`` setting. + This setting accepts timestamps in the following formats: + + - An integer representing the number of seconds since the + :wikipedia:`Unix epoch ` + - A date and time in + `ISO-8601 `__ + format with one-second precision + - An extended JSON ``BsonTimestamp`` + + **Default**: ``latest`` + +Specifying Properties in ``connection.uri`` +------------------------------------------- + +.. include:: /includes/connection-read-config.rst \ No newline at end of file diff --git a/source/streaming-mode/streaming-read.txt b/source/streaming-mode/streaming-read.txt new file mode 100644 index 00000000..d7433cc7 --- /dev/null +++ b/source/streaming-mode/streaming-read.txt @@ -0,0 +1,386 @@ +.. _streaming-read-from-mongodb: + +=================================== +Read from MongoDB in Streaming Mode +=================================== + +.. toctree:: + :caption: Streaming Read Configuration Options + + /streaming-mode/streaming-read-config + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +Overview +-------- + +When reading a stream from a MongoDB database, the {+connector-long+} supports both +*micro-batch processing* and +*continuous processing*. Micro-batch processing, the default processing engine, achieves +end-to-end latencies as low as 100 milliseconds with exactly-once fault-tolerance +guarantees. Continuous processing is an experimental feature introduced in +Spark version 2.3 that achieves end-to-end latencies as low as 1 millisecond with +at-least-once guarantees. + +To learn more about continuous processing, see the +`Spark documentation `__. + +.. include:: /includes/fact-read-from-change-stream + +.. tabs-drivers:: + + tabs: + + - id: java-sync + content: | + + To read data from MongoDB, call the ``readStream()`` method on your + ``SparkSession`` object. This method returns a + ``DataStreamReader`` object, which you can use to specify the format and other + configuration settings for your streaming read operation. + + .. include:: /includes/stream-read-settings.rst + + The following code snippet shows how to use the preceding + configuration settings to continuously process data streamed from MongoDB. + The connector appends all new data to the existing data and asynchronously + writes checkpoints to ``/tmp/checkpointDir`` once per second. Passing the + ``Trigger.Continuous`` parameter to the ``trigger()`` method enables continuous + processing. + + .. code-block:: java + :copyable: true + :emphasize-lines: 1, 4, 8, 13 + + import org.apache.spark.sql.streaming.Trigger; + + Dataset streamingDataset = .readStream() + .format("mongodb") + .load(); + + DataStreamWriter dataStreamWriter = streamingDataset.writeStream() + .trigger(Trigger.Continuous("1 second")) + .format("memory") + .option("checkpointLocation", "/tmp/checkpointDir") + .outputMode("append"); + + StreamingQuery query = dataStreamWriter.start(); + + .. note:: + + Spark does not begin streaming until you call the + ``start()`` method on a streaming query. + + For a complete list of methods, see the + `Java Structured Streaming reference `__. + + - id: python + content: | + + To read data from MongoDB, call the ``readStream`` function on your + ``SparkSession`` object. 
This function returns a + ``DataStreamReader`` + object, which you can use to specify the format and other configuration settings for your + streaming read operation. + + .. include:: /includes/stream-read-settings.rst + + The following code snippet shows how to use the preceding + configuration settings to continuously process data streamed from MongoDB. + The connector appends all new data to the existing data and asynchronously + writes checkpoints to ``/tmp/checkpointDir`` once per second. Passing the + ``continuous`` parameter to the ``trigger()`` method enables continuous + processing. + + .. code-block:: python + :copyable: true + :emphasize-lines: 2, 7, 13 + + streamingDataFrame = (.readStream + .format("mongodb") + .load() + ) + + dataStreamWriter = (streamingDataFrame.writeStream + .trigger(continuous="1 second") + .format("memory") + .option("checkpointLocation", "/tmp/checkpointDir") + .outputMode("append") + ) + + query = dataStreamWriter.start() + + .. note:: + + Spark does not begin streaming until you call the + ``start()`` method on a streaming query. + + For a complete list of methods, see the + `pyspark Structured Streaming reference `__. + + - id: scala + content: | + + To read data from MongoDB, call the ``readStream`` method on your + ``SparkSession`` object. This method returns a + ``DataStreamReader`` + object, which you can use to specify the format and other configuration settings for your + streaming read operation. + + .. include:: /includes/stream-read-settings.rst + + The following code snippet shows how to use the preceding + configuration settings to continuously process data streamed from MongoDB. + The connector appends all new data to the existing data and asynchronously + writes checkpoints to ``/tmp/checkpointDir`` once per second. Passing the + ``Trigger.Continuous`` parameter to the ``trigger()`` method enables continuous + processing. + + .. code-block:: scala + :copyable: true + :emphasize-lines: 1, 4, 8, 13 + + import org.apache.spark.sql.streaming.Trigger + + val streamingDataFrame = .readStream + .format("mongodb") + .load() + + val dataStreamWriter = streamingDataFrame.writeStream + .trigger(Trigger.Continuous("1 second")) + .format("memory") + .option("checkpointLocation", "/tmp/checkpointDir") + .outputMode("append") + + val query = dataStreamWriter.start() + + .. note:: + + Spark does not begin streaming until you call the + ``start()`` method on a streaming query. + + For a complete list of methods, see the + `Scala Structured Streaming reference `__. + +Example +------- + +The following example shows how to stream data from MongoDB to your console. + +.. tabs-drivers:: + + tabs: + + - id: java-sync + content: | + + 1. Create a ``DataStreamReader`` object that reads from MongoDB. + + #. Create a + ``DataStreamWriter`` object + by calling the ``writeStream()`` method on the streaming + ``Dataset`` object that you created with a + ``DataStreamReader``. Specify the format ``console`` using + the ``format()`` method. + + #. Call the ``start()`` method on the ``DataStreamWriter`` + instance to begin the stream. + + As new data is inserted into MongoDB, MongoDB streams that + data out to your console according to the ``outputMode`` + you specify. + + .. include:: /includes/warn-console-stream.txt + + .. 
code-block:: java + :copyable: true + + // create a local SparkSession + SparkSession spark = SparkSession.builder() + .appName("readExample") + .master("spark://spark-master:") + .config("spark.jars", "") + .getOrCreate(); + + // define the schema of the source collection + StructType readSchema = new StructType() + .add("company_symbol", DataTypes.StringType) + .add("company_name", DataTypes.StringType) + .add("price", DataTypes.DoubleType) + .add("tx_time", DataTypes.TimestampType); + + // define a streaming query + DataStreamWriter dataStreamWriter = spark.readStream() + .format("mongodb") + .option("spark.mongodb.connection.uri", "") + .option("spark.mongodb.database", "") + .option("spark.mongodb.collection", "") + .schema(readSchema) + .load() + // manipulate your streaming data + .writeStream() + .format("console") + .trigger(Trigger.Continuous("1 second")) + .outputMode("append"); + + // run the query + StreamingQuery query = dataStreamWriter.start(); + + - id: python + content: | + + 1. Create a ``DataStreamReader`` object that reads from MongoDB. + + #. Create a + ``DataStreamWriter`` object + by calling the ``writeStream()`` method on the streaming + ``DataFrame`` that you created with a ``DataStreamReader``. + Specify the format ``console`` by using the ``format()`` method. + + #. Call the ``start()`` method on the ``DataStreamWriter`` + instance to begin the stream. + + As new data is inserted into MongoDB, MongoDB streams that + data out to your console according to the ``outputMode`` + you specify. + + .. include:: /includes/warn-console-stream.txt + + .. code-block:: python + :copyable: true + + # create a local SparkSession + spark = SparkSession.builder \ + .appName("readExample") \ + .master("spark://spark-master:") \ + .config("spark.jars", "") \ + .getOrCreate() + + # define the schema of the source collection + readSchema = (StructType() + .add('company_symbol', StringType()) + .add('company_name', StringType()) + .add('price', DoubleType()) + .add('tx_time', TimestampType()) + ) + + # define a streaming query + dataStreamWriter = (spark.readStream + .format("mongodb") + .option("spark.mongodb.connection.uri", ) + .option('spark.mongodb.database', ) + .option('spark.mongodb.collection', ) + .schema(readSchema) + .load() + # manipulate your streaming data + .writeStream + .format("console") + .trigger(continuous="1 second") + .outputMode("append") + ) + + # run the query + query = dataStreamWriter.start() + + - id: scala + content: | + + 1. Create a + ``DataStreamReader`` object that reads from MongoDB. + + #. Create a + ``DataStreamWriter`` object + by calling the ``writeStream()`` method on the streaming + ``DataFrame`` object that you created by using the + ``DataStreamReader``. Specify the format ``console`` by using + the ``format()`` method. + + #. Call the ``start()`` method on the ``DataStreamWriter`` + instance to begin the stream. + + As new data is inserted into MongoDB, MongoDB streams that + data out to your console according to the ``outputMode`` + you specify. + + .. include:: /includes/warn-console-stream.txt + + .. 
code-block:: scala + :copyable: true + + // create a local SparkSession + val spark = SparkSession.builder + .appName("readExample") + .master("spark://spark-master:") + .config("spark.jars", "") + .getOrCreate() + + // define the schema of the source collection + val readSchema = StructType() + .add("company_symbol", StringType()) + .add("company_name", StringType()) + .add("price", DoubleType()) + .add("tx_time", TimestampType()) + + // define a streaming query + val dataStreamWriter = spark.readStream + .format("mongodb") + .option("spark.mongodb.connection.uri", ) + .option("spark.mongodb.database", ) + .option("spark.mongodb.collection", ) + .schema(readSchema) + .load() + // manipulate your streaming data + .writeStream + .format("console") + .trigger(Trigger.Continuous("1 second")) + .outputMode("append") + + // run the query + val query = dataStreamWriter.start() + +.. important:: Inferring the Schema of a Change Stream + + When the {+connector-short+} infers the schema of a DataFrame + read from a change stream, by default, + it uses the schema of the underlying collection rather than that + of the change stream. If you set the ``change.stream.publish.full.document.only`` + option to ``true``, the connector uses the schema of the + change stream instead. + + For more information about this setting, and to see a full list of change stream + configuration options, see the + :ref:`Read Configuration Options ` guide. + +API Documentation +----------------- + +To learn more about the types used in these examples, see the following Apache Spark +API documentation: + +.. tabs-drivers:: + + tabs: + - id: java-sync + content: | + + - `Dataset `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ + + - id: python + content: | + + - `DataFrame `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ + + - id: scala + content: | + + - `Dataset[T] `__ + - `DataStreamReader `__ + - `DataStreamWriter `__ \ No newline at end of file diff --git a/source/streaming-mode/streaming-write-config.txt b/source/streaming-mode/streaming-write-config.txt new file mode 100644 index 00000000..4b540b4e --- /dev/null +++ b/source/streaming-mode/streaming-write-config.txt @@ -0,0 +1,74 @@ +.. _spark-streaming-write-conf: + +===================================== +Streaming Write Configuration Options +===================================== + +.. contents:: On this page + :local: + :backlinks: none + :depth: 1 + :class: singlecol + +.. _spark-streaming-output-conf: + +Overview +-------- + +You can configure the following properties when writing data to MongoDB in streaming mode. + +.. include:: /includes/conf-write-prefix.rst + +.. list-table:: + :header-rows: 1 + :widths: 35 65 + + * - Property name + - Description + + * - ``connection.uri`` + - | **Required.** + | The connection string configuration key. + | + | **Default:** ``mongodb://localhost:27017/`` + + * - ``database`` + - | **Required.** + | The database name configuration. + + * - ``collection`` + - | **Required.** + | The collection name configuration. + + * - ``comment`` + - | The comment to append to the write operation. Comments appear in the + :manual:`output of the Database Profiler. ` + | + | **Default:** None + + * - ``mongoClientFactory`` + - | MongoClientFactory configuration key. + | You can specify a custom implementation that must implement the + ``com.mongodb.spark.sql.connector.connection.MongoClientFactory`` + interface. 
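+      | For example, a PySpark write stream might name a custom implementation
+        as follows. This is a sketch: ``com.example.CustomMongoClientFactory``
+        is a hypothetical class, and ``streamingDataFrame`` is an existing
+        streaming ``DataFrame``:
+
+      .. code-block:: python
+
+         query = (streamingDataFrame.writeStream
+             .format("mongodb")
+             .option("database", "<database>")
+             .option("collection", "<collection>")
+             .option("mongoClientFactory", "com.example.CustomMongoClientFactory")
+             .option("checkpointLocation", "/tmp/checkpointDir")
+             .outputMode("append")
+             .start())
+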
+ | + | **Default:** ``com.mongodb.spark.sql.connector.connection.DefaultMongoClientFactory`` + + * - ``checkpointLocation`` + - | The absolute file path of the directory to which the connector writes checkpoint + information. + | + | For more information about checkpoints, see the + `Spark Structured Streaming Programming Guide `__ + | + | **Default:** None + + * - ``forceDeleteTempCheckpointLocation`` + - | A Boolean value that specifies whether to delete existing checkpoint data. + | + | **Default:** ``false`` + +Specifying Properties in ``connection.uri`` +------------------------------------------- + +.. include:: /includes/connection-write-config.rst diff --git a/source/streaming-mode/streaming-write.txt b/source/streaming-mode/streaming-write.txt new file mode 100644 index 00000000..60a6aa3f --- /dev/null +++ b/source/streaming-mode/streaming-write.txt @@ -0,0 +1,393 @@ +.. _streaming-write-to-mongodb: + +================================== +Write to MongoDB in Streaming Mode +================================== + +.. toctree:: + :caption: Streaming Write Configuration Options + + /streaming-mode/streaming-write-config + +.. tabs-drivers:: + + tabs: + + - id: java-sync + content: | + + To write data to MongoDB, call the ``writeStream()`` method on your + ``Dataset`` object. This method returns a + ``DataStreamWriter`` + object, which you can use to specify the format and other configuration settings + for your streaming write operation. + + You must specify the following configuration settings to write to MongoDB: + + .. list-table:: + :header-rows: 1 + :stub-columns: 1 + :widths: 10 40 + + * - Setting + - Description + + * - ``writeStream.format()`` + - Specifies the format of the underlying output data source. Use ``mongodb`` + to write to MongoDB. + + * - ``writeStream.option()`` + - Specifies stream settings, including the MongoDB deployment + :manual:`connection string `, + MongoDB database and collection, and checkpoint directory. + + For a list of write stream configuration options, see + the :ref:`spark-streaming-write-conf` guide. + + * - ``writeStream.outputMode()`` + - Specifies how data of a streaming DataFrame is + written to a streaming sink. To view a list of all + supported output modes, see the `Java outputMode documentation `__. + + * - ``writeStream.trigger()`` + - Specifies how often the {+connector-short+} writes results + to the streaming sink. + + To use continuous processing, pass ``Trigger.Continuous(