diff --git a/docs/changelog/next_release/154.improvement.rst b/docs/changelog/next_release/154.improvement.rst new file mode 100644 index 000000000..745989a87 --- /dev/null +++ b/docs/changelog/next_release/154.improvement.rst @@ -0,0 +1,4 @@ +Drastically improve ``Greenplum`` documentation: +* Added information about network ports, grants, ``pg_hba.conf`` and so on. +* Added interaction schemas for reading, writing and executing statements in Greenplum. +* Added recommendations about reading data from views and ``JOIN`` results from Greenplum. diff --git a/docs/conf.py b/docs/conf.py index 87d6fd17b..06a5b08aa 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -56,6 +56,7 @@ "sphinx.ext.autosummary", "sphinxcontrib.autodoc_pydantic", "sphinxcontrib.towncrier", # provides `towncrier-draft-entries` directive + "sphinxcontrib.plantuml", ] numpydoc_show_class_members = True autodoc_pydantic_model_show_config = False diff --git a/docs/connection/db_connection/greenplum/execute.rst b/docs/connection/db_connection/greenplum/execute.rst index b0833b213..e2179a4ec 100644 --- a/docs/connection/db_connection/greenplum/execute.rst +++ b/docs/connection/db_connection/greenplum/execute.rst @@ -3,6 +3,47 @@ Executing statements in Greenplum ================================== +Interaction schema +------------------ + +Unlike reading & writing, executing statements in Greenplum is done **only** through Greenplum master node, +without any interaction between Greenplum segments and Spark executors. More than that, Spark executors are not used in this case. + +The only port used while interacting with Greenplum in this case is ``5432`` (Greenplum master port). + +.. dropdown:: Spark <-> Greenplum interaction during Greenplum.execute()/Greenplum.fetch() + + .. 
plantuml:: + + @startuml + title Greenplum master <-> Spark driver + box "Spark" + participant "Spark driver" + end box + + box "Greenplum" + participant "Greenplum master" + end box + + == Greenplum.check() == + + activate "Spark driver" + "Spark driver" -> "Greenplum master" ++ : CONNECT + + == Greenplum.execute(statement) == + "Spark driver" --> "Greenplum master" : EXECUTE statement + "Greenplum master" -> "Spark driver" : RETURN result + + == Greenplum.close() == + "Spark driver" --> "Greenplum master" : CLOSE CONNECTION + + deactivate "Greenplum master" + deactivate "Spark driver" + @enduml + +Options +------- + .. currentmodule:: onetl.connection.db_connection.greenplum.connection .. automethod:: Greenplum.fetch diff --git a/docs/connection/db_connection/greenplum/prerequisites.rst b/docs/connection/db_connection/greenplum/prerequisites.rst index a545fdc27..f7b8e9d32 100644 --- a/docs/connection/db_connection/greenplum/prerequisites.rst +++ b/docs/connection/db_connection/greenplum/prerequisites.rst @@ -43,3 +43,179 @@ There are several ways to do that. See :ref:`java-packages` for details. If you're uploading package to private package repo, use ``groupId=io.pivotal`` and ``artifactoryId=greenplum-spark_2.12`` (``2.12`` is Scala version) to give uploaded package a proper name. + +Connecting to Greenplum +----------------------- + +Interaction schema +~~~~~~~~~~~~~~~~~~ + +Spark executors open ports to listen for incoming requests. +Greenplum segments are initiating connections to Spark executors using `EXTERNAL TABLE `_ +functionality, and send/read data using `gpfdist `_ protocol. + +Data is **not** sent through Greenplum master. +Greenplum master only receives commands to start reading/writing process, and manages all the metadata (external table location, schema and so on). + +More details can be found in `official documentation `_. + +Number of parallel connections +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +.. warning:: + + This is very important!!! 
+ + If you don't limit the number of connections, you can exceed the `max_connections `_ + limit set on the Greenplum side. It's usually not so high, e.g. 500-1000 connections max, + depending on your Greenplum instance settings and using connection balancers like ``pgbouncer``. + + Consuming all available connections means **nobody** (even admin users) can connect to Greenplum. + +Each job on the Spark executor makes its own connection to Greenplum master node, +so you need to limit number of connections to avoid opening too many of them. + +* Reading about ``5-10Gb`` of data requires about ``3-5`` parallel connections. +* Reading about ``20-30Gb`` of data requires about ``5-10`` parallel connections. +* Reading about ``50Gb`` of data requires ~ ``10-20`` parallel connections. +* Reading about ``100+Gb`` of data requires ``20-30`` parallel connections. +* Opening more than ``30-50`` connections is not recommended. + +Number of connections can be limited in the following ways: + +* By limiting number of Spark executors and number of cores per executor. Max number of parallel jobs is ``executors * cores``. + +.. tabs:: + + .. code-tab:: py Spark with master=local + + ( + SparkSession.builder + # Spark will start EXACTLY 10 executors with 1 core each, so max number of parallel jobs is 10 + .config("spark.master", "local[10]") + .config("spark.executor.cores", 1) + ) + + .. code-tab:: py Spark with master=yarn or master=k8s, dynamic allocation + + ( + SparkSession.builder + .config("spark.master", "yarn") + # Spark will start MAX 10 executors with 1 core each (dynamically), so max number of parallel jobs is 10 + .config("spark.dynamicAllocation.maxExecutors", 10) + .config("spark.executor.cores", 1) + ) + + .. 
code-tab:: py Spark with master=yarn or master=k8s, static allocation + + ( + SparkSession.builder + .config("spark.master", "yarn") + # Spark will start EXACTLY 10 executors with 1 core each, so max number of parallel jobs is 10 + .config("spark.executor.instances", 10) + .config("spark.executor.cores", 1) + ) + +* By limiting connection pool size used by Spark (**only** for Spark with ``master=local``): + +.. code:: python + + spark = SparkSession.builder.config("spark.master", "local[*]").getOrCreate() + + # No matter how many executors are started and how many cores they have, + # number of connections cannot exceed pool size: + Greenplum( + ..., + extra={ + "pool.maxSize": 10, + }, + ) + +See `connection pooling `_ +documentation. + + +* By setting :obj:`num_partitions ` + and :obj:`partition_column ` (not recommended). + +Allowing connection to Greenplum master +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Ask your Greenplum cluster administrator to allow your user to connect to Greenplum master node, +e.g. by updating ``pg_hba.conf`` file. + +More details can be found in `official documentation `_. + +Network ports +~~~~~~~~~~~~~ + +To read data from Greenplum using Spark, the following ports should be opened in firewall between Spark and Greenplum: + +* Spark driver and all Spark executors -> port ``5432`` on Greenplum master node. + + This port number should be set while connecting to Greenplum: + + .. code:: python + + Greenplum(host="master.host", port=5432, ...) + +* Greenplum segments -> some port range (e.g. ``41000-42000``) **listened by Spark executor**. + + This range should be set in ``extra`` option: + + .. code:: python + + Greenplum( + ..., + extra={ + "server.port": "41000-42000", + }, + ) + + Number of ports in this range is ``number of parallel running Spark sessions`` * ``number of parallel connections per session``. + + Number of connections per session is usually less than ``30`` (see above). 
+ + Number of sessions depends on your environment: + * For ``master=local`` only a few (up to tens of) sessions can be started on the same host, depending on available RAM and CPU. + + * For ``master=yarn`` / ``master=k8s`` hundreds or thousands of sessions can be started simultaneously, + but they are executing on different cluster nodes, so one port can be opened on different nodes at the same time. + +More details can be found in official documentation: + * `port requirements `_ + * `format of server.port value `_ + * `port troubleshooting `_ + +Required grants +~~~~~~~~~~~~~~~ + +Ask your Greenplum cluster administrator to set the following grants for a user, +used for creating a connection: + +.. tabs:: + + .. code-tab:: sql Reading & writing + + GRANT USAGE ON SCHEMA myschema TO username; + GRANT CREATE ON SCHEMA myschema TO username; + GRANT SELECT, INSERT ON TABLE myschema.mytable TO username; + ALTER USER username CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist') CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist'); + + .. code-tab:: sql Reading from Greenplum + + GRANT USAGE ON SCHEMA schema_to_read TO username; + GRANT CREATE ON SCHEMA schema_to_read TO username; + GRANT SELECT ON TABLE schema_to_read.table_to_read TO username; + -- yes, ``writable``, because data is written from Greenplum to Spark executor. + ALTER USER username CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist'); + + .. code-tab:: sql Writing to Greenplum + + GRANT USAGE ON SCHEMA schema_to_write TO username; + GRANT CREATE ON SCHEMA schema_to_write TO username; + GRANT SELECT, INSERT ON TABLE schema_to_write.table_to_write TO username; + -- yes, ``readable``, because data is read from Spark executor to Greenplum. + ALTER USER username CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist'); + +More details can be found in `official documentation `_. 
diff --git a/docs/connection/db_connection/greenplum/read.rst b/docs/connection/db_connection/greenplum/read.rst index 2640f7e6c..97790bfac 100644 --- a/docs/connection/db_connection/greenplum/read.rst +++ b/docs/connection/db_connection/greenplum/read.rst @@ -8,20 +8,145 @@ For reading data from Greenplum, use :obj:`DBReader `, - and drop staging table after reading is finished. +Interaction schema +------------------ - In this case data will be read directly from Greenplum segment nodes in a distributed way. +High-level schema is described in :ref:`greenplum-prerequisites`. You can find detailed interaction schema below. + +.. dropdown:: Spark <-> Greenplum interaction during DBReader.run() + + .. plantuml:: + + @startuml + title Greenplum master <-> Spark driver + box "Spark" + participant "Spark driver" + participant "Spark executor1" + participant "Spark executor2" + participant "Spark executorN" + end box + + box "Greenplum" + participant "Greenplum master" + participant "Greenplum segment1" + participant "Greenplum segment2" + participant "Greenplum segmentN" + end box + + == Greenplum.check() == + + activate "Spark driver" + "Spark driver" -> "Greenplum master" ++ : CONNECT + + "Spark driver" --> "Greenplum master" : CHECK IF TABLE EXISTS gp_table + "Greenplum master" --> "Spark driver" : TABLE EXISTS + "Spark driver" -> "Greenplum master" : SHOW SCHEMA FOR gp_table + "Greenplum master" --> "Spark driver" : (id bigint, col1 int, col2 text, ...) + + == DBReader.run() == + + "Spark driver" -> "Spark executor1" ++ : START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 1 + "Spark driver" -> "Spark executor2" ++ : START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 2 + "Spark driver" -> "Spark executorN" ++ : START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) 
PARTITION N + + note right of "Spark driver" : This is done in parallel,\nexecutors are independent\n|\n|\n|\nV + "Spark executor1" -> "Greenplum master" ++ : CREATE WRITABLE EXTERNAL TABLE spark_executor1 (id bigint, col1 int, col2 text, ...) USING address=executor1_host:executor1_port;\nINSERT INTO EXTERNAL TABLE spark_executor1 FROM gp_table WHERE gp_segment_id = 1 + note right of "Greenplum master" : Each white vertical line here is an opened connection to master.\nUsually, **N+1** connections are created from Spark to Greenplum master + "Greenplum master" --> "Greenplum segment1" ++ : SELECT DATA FROM gp_table_data_on_segment1 TO spark_executor1 + note right of "Greenplum segment1" : No direct requests from Spark to Greenplum segments.\nData transfer is always initiated by Greenplum segments. + + "Spark executor2" -> "Greenplum master" ++ : CREATE WRITABLE EXTERNAL TABLE spark_executor2 (id bigint, col1 int, col2 text, ...) USING address=executor2_host:executor2_port;\nINSERT INTO EXTERNAL TABLE spark_executor2 FROM gp_table WHERE gp_segment_id = 2 + "Greenplum master" --> "Greenplum segment2" ++ : SELECT DATA FROM gp_table_data_on_segment2 TO spark_executor2 + + "Spark executorN" -> "Greenplum master" ++ : CREATE WRITABLE EXTERNAL TABLE spark_executorN (id bigint, col1 int, col2 text, ...) 
USING address=executorN_host:executorN_port;\nINSERT INTO EXTERNAL TABLE spark_executorN FROM gp_table WHERE gp_segment_id = N + "Greenplum master" --> "Greenplum segmentN" ++ : SELECT DATA FROM gp_table_data_on_segmentN TO spark_executorN + + "Greenplum segment1" ->o "Spark executor1" -- : INITIALIZE CONNECTION TO spark_executor1\nPUSH DATA TO spark_executor1 + note left of "Spark executor1" : Circle is an open GPFDIST port,\nlistened by executor + + "Greenplum segment2" ->o "Spark executor2" -- : INITIALIZE CONNECTION TO spark_executor2\nPUSH DATA TO spark_executor2 + "Greenplum segmentN" ->o "Spark executorN" -- : INITIALIZE CONNECTION TO spark_executorN\nPUSH DATA TO spark_executorN + + == Spark.stop() == + + "Spark executor1" --> "Greenplum master" : DROP TABLE spark_executor1 + deactivate "Greenplum master" + "Spark executor2" --> "Greenplum master" : DROP TABLE spark_executor2 + deactivate "Greenplum master" + "Spark executorN" --> "Greenplum master" : DROP TABLE spark_executorN + deactivate "Greenplum master" + + "Spark executor1" --> "Spark driver" -- : DONE + "Spark executor2" --> "Spark driver" -- : DONE + "Spark executorN" --> "Spark driver" -- : DONE + + "Spark driver" --> "Greenplum master" : CLOSE CONNECTION + deactivate "Greenplum master" + deactivate "Spark driver" + @enduml + +Recommendations +--------------- + +Reading from views +~~~~~~~~~~~~~~~~~~ + +This connector is **NOT** designed to read data from views. + +You can technically read data from a view which has +`gp_segment_id `_ column. +But this is **not** recommended because each Spark executor will run the same query, which may lead to running duplicated calculations +and sending data between segments only to skip most of the result and select only a small part. + +Prefer the following option: + * Create staging table to store result data, using :obj:`Greenplum.execute ` + * Use the same ``.execute`` method to run a query ``INSERT INTO staging_table AS SELECT FROM some_view``. 
This will be done on Greenplum segments side, query will be run only once. + * Read data from staging table to Spark executor using :obj:`DBReader `. + * Drop staging table using ``.execute`` method. + +Using ``JOIN`` on Greenplum side +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If you need to get the result of joining 2 tables in Greenplum, you should: + * Create staging table to store result data, using ``Greenplum.execute`` + * Use the same ``Greenplum.execute`` to run a query ``INSERT INTO staging_table AS SELECT FROM table1 JOIN table2``. This will be done on Greenplum segments side, in a distributed way. + * Read data from staging table to Spark executor using ``DBReader``. + * Drop staging table using ``Greenplum.execute``. .. warning:: - Greenplum connection does **NOT** support reading data from views which does not have ``gp_segment_id`` column. - Either add this column to a view, or use stating table solution (see above). + Do **NOT** try to read data from ``table1`` and ``table2`` using ``DBReader``, and then join the resulting dataframes! + + This will lead to sending all the data from both tables to Spark executor memory, and then ``JOIN`` + will be performed on Spark side, not Greenplum. This is **very** inefficient. + +Using ``TEMPORARY`` tables +~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Someone could think that writing data from ``VIEW`` or result of ``JOIN`` to ``TEMPORARY`` table, +and then passing it to DBReader, is an efficient way to read data from Greenplum, because temp tables are not generating WAL files, +and are automatically deleted after finishing the transaction. + +That will **not** work. Each Spark executor establishes its own connection to Greenplum, +and thus reads its own temporary table, which does not contain any data. + +You should use `UNLOGGED `_ tables +to write data to staging table without generating useless WAL logs. 
+ +Troubleshooting +--------------- + +Mapping of Greenplum types to Spark types +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +See `official documentation `_ +for more details. onETL does not perform any additional casting of types while reading data. + +Options +------- .. currentmodule:: onetl.connection.db_connection.greenplum.options diff --git a/docs/connection/db_connection/greenplum/write.rst b/docs/connection/db_connection/greenplum/write.rst index aeb688ac5..c4edb1a11 100644 --- a/docs/connection/db_connection/greenplum/write.rst +++ b/docs/connection/db_connection/greenplum/write.rst @@ -5,6 +5,100 @@ Writing to Greenplum For writing data to Greenplum, use :obj:`DBWriter ` with options below. + +Interaction schema +------------------ + +High-level schema is described in :ref:`greenplum-prerequisites`. You can find detailed interaction schema below. + +.. dropdown:: Spark <-> Greenplum interaction during DBWriter.run() + + .. plantuml:: + + @startuml + title Greenplum master <-> Spark driver + box "Spark" + participant "Spark driver" + participant "Spark executor1" + participant "Spark executor2" + participant "Spark executorN" + end box + + box "Greenplum" + participant "Greenplum master" + participant "Greenplum segment1" + participant "Greenplum segment2" + participant "Greenplum segmentN" + end box + + == Greenplum.check() == + + activate "Spark driver" + "Spark driver" -> "Greenplum master" ++ : CONNECT + "Spark driver" --> "Greenplum master" ++ : CHECK IF TABLE EXISTS gp_table + "Greenplum master" --> "Spark driver" : TABLE NOT EXISTS + + == DBWriter.run(df) == + + "Spark driver" -> "Spark executor1" ++ : START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 1 + "Spark driver" -> "Spark executor2" ++ : START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) PARTITION 2 + "Spark driver" -> "Spark executorN" ++ : START EXECUTOR FOR df(id bigint, col1 int, col2 text, ...) 
PARTITION N + + note right of "Spark driver" : This is done in parallel,\nexecutors are independent\n|\n|\n|\nV + "Spark executor1" -> "Greenplum master" ++ : CREATE READABLE EXTERNAL TABLE spark_executor1 (id bigint, col1 int, col2 text, ...) USING address=executor1_host:executor1_port;\nINSERT INTO gp_table FROM spark_executor1 + note right of "Greenplum master" : Each white vertical line here is an opened connection to master.\nUsually, **N+1** connections are created from Spark to Greenplum master + "Greenplum master" --> "Greenplum segment1" ++ : SELECT DATA FROM spark_executor1 TO gp_table_data_on_segment1 + note right of "Greenplum segment1" : No direct requests from Spark to Greenplum segments.\nData transfer is always initiated by Greenplum segments. + + "Spark executor2" -> "Greenplum master" ++ : CREATE READABLE EXTERNAL TABLE spark_executor2 (id bigint, col1 int, col2 text, ...) USING address=executor2_host:executor2_port;\nINSERT INTO gp_table FROM spark_executor2 + "Greenplum master" --> "Greenplum segment2" ++ : SELECT DATA FROM spark_executor2 TO gp_table_data_on_segment2 + + "Spark executorN" -> "Greenplum master" ++ : CREATE READABLE EXTERNAL TABLE spark_executorN (id bigint, col1 int, col2 text, ...) 
USING address=executorN_host:executorN_port;\nINSERT INTO gp_table FROM spark_executorN + "Greenplum master" --> "Greenplum segmentN" ++ : SELECT DATA FROM spark_executorN TO gp_table_data_on_segmentN + + "Greenplum segment1" -->o "Spark executor1" : INITIALIZE CONNECTION TO spark_executor1 + "Spark executor1" -> "Greenplum segment1" : READ DATA FROM spark_executor1 + note left of "Spark executor1" : Circle is an open GPFDIST port,\nlistened by executor + deactivate "Greenplum segment1" + + "Greenplum segment2" -->o "Spark executor2" : INITIALIZE CONNECTION TO spark_executor2 + "Spark executor2" -> "Greenplum segment2" : READ DATA FROM spark_executor2 + deactivate "Greenplum segment2" + + "Greenplum segmentN" -->o "Spark executorN" : INITIALIZE CONNECTION TO spark_executorN + "Spark executorN" -> "Greenplum segmentN" : READ DATA FROM spark_executorN + deactivate "Greenplum segmentN" + + == Finished == + + "Spark executor1" --> "Greenplum master" : DROP TABLE spark_executor1 + deactivate "Greenplum master" + "Spark executor2" --> "Greenplum master" : DROP TABLE spark_executor2 + deactivate "Greenplum master" + "Spark executorN" --> "Greenplum master" : DROP TABLE spark_executorN + deactivate "Greenplum master" + + "Spark executor1" --> "Spark driver" -- : DONE + "Spark executor2" --> "Spark driver" -- : DONE + "Spark executorN" --> "Spark driver" -- : DONE + + "Spark driver" --> "Greenplum master" : CLOSE CONNECTION + deactivate "Greenplum master" + deactivate "Spark driver" + @enduml + +Troubleshooting +--------------- + +Mapping of Spark types to Greenplum types +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +See `official documentation `_ +for more details. onETL does not perform any additional casting of types while writing data. + +Options +------- + .. currentmodule:: onetl.connection.db_connection.greenplum.options .. 
autopydantic_model:: GreenplumWriteOptions diff --git a/onetl/connection/db_connection/greenplum/connection.py b/onetl/connection/db_connection/greenplum/connection.py index 99de7d90c..42e56fcb3 100644 --- a/onetl/connection/db_connection/greenplum/connection.py +++ b/onetl/connection/db_connection/greenplum/connection.py @@ -124,35 +124,24 @@ class Greenplum(JDBCMixin, DBConnection): from onetl.connection import Greenplum from pyspark.sql import SparkSession - # Please ask your DevOps and Greenplum admin what port range - # on Spark side can be used to accept requests from Greenplum segments + # See "Prerequisites" -> "Network ports" extra = { - "server.port": "49152-65535", + "server.port": "41000-42000", } # Create Spark session with Greenplum connector loaded # See Prerequisites page for more details maven_packages = Greenplum.get_packages(spark_version="3.2") spark = ( - SparkSession.builder.appName("spark-app-name") - .config("spark.jars.packages", ",".join(maven_packages)) - .config("spark.dynamicAllocation.maxExecutors", 10) - .config("spark.executor.cores", 1) + SparkSession.builder.appName("spark-app-name").config( + "spark.jars.packages", ",".join(maven_packages) + ) + # IMPORTANT!!! + # Set number of executors according to "Prerequisites" -> "Number of parallel connections" .getOrCreate() ) - # IMPORTANT!!! - # Each job on the Spark executor make its own connection to Greenplum master node, - # so we need to limit number of connections to avoid opening too many of them. - # - # Table size ~20Gb requires about 10 executors * cores, - # ~50Gb requires ~ 20 executors * cores, - # 100Gb+ requires 30 executors * cores. - # - # Cores number can be increased, but executors count should be reduced - # to keep the same number of executors * cores. 
- # Create connection greenplum = Greenplum( host="master.host.or.ip", diff --git a/onetl/connection/db_connection/greenplum/options.py b/onetl/connection/db_connection/greenplum/options.py index 86785155e..7d4638412 100644 --- a/onetl/connection/db_connection/greenplum/options.py +++ b/onetl/connection/db_connection/greenplum/options.py @@ -107,7 +107,9 @@ class Config: .. warning:: - You should not change this option, unless you know what you're doing + You should not change this option, unless you know what you're doing. + + It's preferable to use default values to read data in parallel by number of segments in Greenplum cluster. Possible values: * ``None`` (default): diff --git a/requirements/docs.txt b/requirements/docs.txt index d3fc9555e..4ff1db3e9 100644 --- a/requirements/docs.txt +++ b/requirements/docs.txt @@ -3,10 +3,10 @@ furo importlib-resources<6 numpydoc pygments-csv-lexer -# https://github.com/sphinx-doc/sphinx/issues/11662 -sphinx<7.2.5 +sphinx sphinx-copybutton sphinx-design +sphinx-plantuml sphinx-tabs sphinx-toolbox sphinx_substitution_extensions