332 improve spark integration (#333)
* Refactor demo notebook structure by converting a code cell to markdown for improved documentation. Remove the redundant markdown cell and update execution metadata for clarity. This gives the notebook better context and organization.

* Refactor Spark session creation in notebook and JavaScript model

- Renamed `create_spark_dev` to `create_spark` in `startup.py` to simplify the function name.
- Made `create_spark` available in IPython's global namespace for easier access (see the sketch after this list).
- Removed the default Spark instance creation to allow for manual session management.
- Updated `SparkModel.js` to use the new `create_spark` function for initializing Spark sessions, enhancing integration with the backend API.
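A minimal sketch of the namespace registration this refers to, assuming the script runs inside an IPython kernel; the body of `create_spark` is elided here, and the full version appears in the `startup.py` diff below:

```python
# Sketch: expose a helper in IPython's interactive namespace from a startup script.
# Assumes get_ipython() returns a live shell, i.e. the script runs inside a kernel.
from IPython import get_ipython

def create_spark(notebook_path=None):
    """Build and return a SparkSession (full body shown in the startup.py diff)."""
    ...

ip = get_ipython()
if ip is not None:
    # After this, notebook cells can call create_spark() directly,
    # without importing anything from the startup script.
    ip.user_global_ns['create_spark'] = create_spark
```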

* Enhance Spark session creation and configuration handling

- Updated the `create_spark` function in `startup.py` to accept an optional `notebook_path` parameter, allowing for dynamic configuration retrieval based on the notebook context (see the sketch after this list).
- Improved error handling in `create_spark` to log errors and use default Spark configuration when the API request fails.
- Modified `SparkModel.js` to pass the `notebookPath` to the `create_spark` function, ensuring proper session initialization.
- Cleaned up the demo notebook by removing outdated code cells, enhancing clarity and usability.
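A sketch of the resulting call pattern from a notebook cell; the path string below is illustrative, and in practice the value is supplied by the webapp through `SparkModel.js`:

```python
# Sketch: per-notebook session creation (the path is a placeholder).
spark = create_spark("examples/[email protected]/demo.ipynb")
spark  # renders the PawMarkSparkSession HTML summary

# create_spark() without a path queries the generic config endpoint instead,
# and any failed request falls back to the built-in default executor settings.
```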

* Refactor Spark configuration keys in create_spark function

- Updated the configuration keys in the `create_spark` function within `startup.py` to use a more concise naming convention, changing 'spark.executor.memory', 'spark.executor.cores', and 'spark.executor.instances' to 'executor_memory', 'executor_cores', and 'executor_instances', respectively.
- Adjusted the corresponding references in the Spark session creation logic to align with the new key names, improving consistency and readability of the configuration handling.

* Refactor Spark configuration keys in create_spark function

Updated the configuration keys in the `create_spark` function within `startup.py` to use the correct Spark naming convention, changing 'executor_memory', 'executor_cores', and 'executor_instances' back to 'spark.executor.memory', 'spark.executor.cores', and 'spark.executor.instances'. Adjusted the corresponding references in the Spark session creation logic for improved consistency and clarity.
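For reference, a sketch of the configuration shape after this final change, using the default values from the diff; a server-provided payload would use the same `spark.*` keys:

```python
from pyspark.sql import SparkSession

# Fallback defaults keyed by the standard Spark property names.
default_config = {
    'spark.executor.memory': '1g',
    'spark.executor.cores': 1,
    'spark.executor.instances': 1,
}

# The same keys are read back when building the session.
builder = (
    SparkSession.builder
    .config("spark.executor.memory", default_config['spark.executor.memory'])
    .config("spark.executor.cores", default_config['spark.executor.cores'])
    .config("spark.executor.instances", default_config['spark.executor.instances'])
)
```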
xuwenyihust authored Dec 10, 2024
1 parent 0cf6d19 commit 02e3486
Showing 3 changed files with 23 additions and 181 deletions.
28 changes: 20 additions & 8 deletions docker/notebook/startup.py
@@ -106,13 +106,21 @@ def _repr_html_(self):
</div>
"""

def create_spark_dev():
def create_spark(notebook_path=None):
logger.info("Creating Spark session")
try:
config_json = requests.get("http://server:5002/spark_app/config").json()
if notebook_path:
config_json = requests.get(f"http://server:5002/spark_app/{notebook_path}/config").json()
else:
config_json = requests.get("http://server:5002/spark_app/config").json()
except Exception as e:
config_json = 'Error loading config: ' + str(e)

logger.error(f"Error loading config: {str(e)}. Using defaults.")
config_json = {
'spark.executor.memory': '1g',
'spark.executor.cores': 1,
'spark.executor.instances': 1
}

spark = PawMarkSparkSession(
config_json,
SparkSession.builder \
@@ -125,12 +133,16 @@ def create_spark_dev():
.config("spark.eventLog.dir", "/opt/data/spark-events") \
.config("spark.history.fs.logDirectory", "/opt/data/spark-events") \
.config("spark.sql.warehouse.dir", "/opt/data/spark-warehouse") \
.config("executor.memory", config_json['executor.memory']) \
.config("executor.cores", config_json['executor.cores']) \
.config("spark.executor.memory", config_json['spark.executor.memory']) \
.config("spark.executor.cores", config_json['spark.executor.cores']) \
.config("spark.executor.instances", config_json['spark.executor.instances']) \
.getOrCreate()
)
)

return spark

spark = create_spark_dev()
# Make create_spark_dev available in IPython's global namespace
ip = get_ipython()
if ip is not None:
# Add to global namespace
ip.user_global_ns['create_spark'] = create_spark
150 changes: 0 additions & 150 deletions examples/[email protected]/demo.ipynb
@@ -1,70 +1,5 @@
{
"cells": [
{
"cell_type": "code",
"isExecuted": true,
"lastExecutionResult": "success",
"lastExecutionTime": "2024-12-10 03:27:50",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
" <div>\n",
" <p><b>SparkSession - in-memory</b></p>\n",
" \n",
" <div>\n",
" <p><b>SparkContext</b></p>\n",
"\n",
" <p><a href=\"http://8e207d700c27:4040\">Spark UI</a></p>\n",
"\n",
" <dl>\n",
" <dt>Version</dt>\n",
" <dd><code>v3.5.0</code></dd>\n",
" <dt>Master</dt>\n",
" <dd><code>spark://spark-master:7077</code></dd>\n",
" <dt>AppName</dt>\n",
" <dd><code>spark-1733801092185</code></dd>\n",
" </dl>\n",
" </div>\n",
" \n",
" </div>\n",
" "
],
"text/plain": [
"<pyspark.sql.session.SparkSession at 0x7fffe2dc9ed0>"
]
},
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"\n",
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder\\\n",
" .appName(\"spark-1733801270245\")\\\n",
" .master(\"spark://spark-master:7077\")\\\n",
" .config(\"spark.jars.packages\", \"io.delta:delta-spark_2.12:3.0.0\")\\\n",
" .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\\\n",
" .config(\"spark.sql.catalog.spark_catalog\", \"org.apache.spark.sql.delta.catalog.DeltaCatalog\")\\\n",
" .config(\"spark.eventLog.enabled\", \"true\")\\\n",
" .config(\"spark.eventLog.dir\", \"/opt/data/spark-events\")\\\n",
" .config(\"spark.history.fs.logDirectory\", \"/opt/data/spark-events\")\\\n",
" .config(\"spark.sql.warehouse.dir\", \"/opt/data/spark-warehouse\")\\\n",
" .config(\"spark.executor.memory\", \"1g\")\\\n",
" .config(\"spark.executor.cores\", 1)\\\n",
" .config(\"spark.executor.instances\", 1)\\\n",
" .config(\"spark.driver.memory\", \"1g\")\\\n",
" .config(\"spark.driver.cores\", 1)\\\n",
" .getOrCreate()\n",
"\n",
"spark\n"
]
},
{
"cell_type": "markdown",
"isExecuted": true,
Expand All @@ -75,91 +10,6 @@
"- This is just a demo notebook\n",
"- For testing only"
]
},
{
"cell_type": "code",
"execution_count": null,
"isExecuted": false,
"lastExecutionResult": "success",
"lastExecutionTime": "2024-08-04 15:29:17",
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
" <div style=\"border: 1px solid #e8e8e8; padding: 10px;\">\n",
" <h3>Spark Session Information</h3>\n",
" <p><strong>Application ID:</strong> app-20240804152430-0000</p>\n",
" <p><strong>Spark UI:</strong> <a href=\"http://localhost:18080/history/app-20240804152430-0000\">http://localhost:18080/history/app-20240804152430-0000</a></p>\n",
" </div>\n",
" "
],
"text/plain": [
"Custom Spark Session (App ID: app-20240804152430-0000) - UI: http://66eef2d0ade3:4040"
]
},
"execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# SparkSession is already defined in `spark` variable\n",
"spark"
]
},
{
"cell_type": "code",
"execution_count": 1,
"isExecuted": false,
"metadata": {},
"outputs": [
{
"ename": "NameError",
"evalue": "name 'a' is not defined",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[1], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[43ma\u001b[49m \u001b[38;5;241m+\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m2233666777888\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n",
"\u001b[0;31mNameError\u001b[0m: name 'a' is not defined"
]
}
],
"source": [
"print(a + \"2233666777888\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"isExecuted": false,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"66\n",
"77\n"
]
}
],
"source": [
"print(66)\n",
"print(77)"
]
},
{
"cell_type": "code",
"execution_count": null,
"isExecuted": false,
"metadata": {},
"outputs": [],
"source": [
"spark.stop()"
]
}
],
"metadata": {
26 changes: 3 additions & 23 deletions webapp/src/models/SparkModel.js
@@ -60,29 +60,9 @@ class SparkModel {
// Generate a unique spark app ID
const sparkAppId = `spark-${Date.now()}`;

// Create a cell with Spark initialization code that uses the config
const sparkInitCode = `
from pyspark.sql import SparkSession
spark = SparkSession.builder\\
.appName("${sparkAppId}")\\
.master("spark://spark-master:7077")\\
.config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")\\
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\\
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\\
.config("spark.eventLog.enabled", "true")\\
.config("spark.eventLog.dir", "/opt/data/spark-events")\\
.config("spark.history.fs.logDirectory", "/opt/data/spark-events")\\
.config("spark.sql.warehouse.dir", "/opt/data/spark-warehouse")\\
.config("spark.executor.memory", "${sparkConfig['spark.executor.memory']}")\\
.config("spark.executor.cores", ${sparkConfig['spark.executor.cores']})\\
.config("spark.executor.instances", ${sparkConfig['spark.executor.instances']})\\
.config("spark.driver.memory", "${sparkConfig['spark.driver.memory']}")\\
.config("spark.driver.cores", ${sparkConfig['spark.driver.cores']})\\
.getOrCreate()
spark
`;
// Create a cell with Spark initialization code that uses the existing spark instance
const sparkInitCode = `spark = create_spark("${notebookPath}")
spark`;

// Create the Spark session with this config
const response = await fetch(`${config.serverBaseUrl}/spark_app/${sparkAppId}`, {
