Skip to content

Commit

Permalink
321 support getting spark config from be (#325)
Browse files Browse the repository at this point in the history
* Refactor Notebook.js, SparkAppConfigModel.js, Config.js, spark_app.py, and spark_app.py to support saving Spark configuration to the database.

* Refactor SparkAppConfigModel, Notebook.js, and spark_app.py to update Spark app configuration handling

* Fix missing newline at end of file in test_spark_app_route.py

* Refactor test_spark_app_route.py to use spark_app_blueprint in server/tests/routes/test_spark_app_route.py

* Refactor test_spark_app_route.py to use POST method instead of GET in server/tests/routes/test_spark_app_route.py

* Refactor test_spark_app_route.py to use POST method instead of GET in server/tests/routes/test_spark_app_route.py

* Refactor test_spark_app_route.py to use POST method instead of GET in server/tests/routes/test_spark_app_route.py

* Refactor test_spark_app_route.py to use POST method instead of GET in server/tests/routes/test_spark_app_route.py

* Refactor test_spark_app_route.py to use POST method instead of GET in server/tests/routes/test_spark_app_route.py

* Refactor spark_app.py and test_spark_app_route.py to add spark_app endpoint and update test case in server/tests/routes/test_spark_app_route.py

* Refactor SparkAppConfigModel, Notebook.js, Config.js, and spark_app.py to update Spark app configuration handling
  • Loading branch information
xuwenyihust authored Aug 23, 2024
1 parent 2f15c64 commit bed3350
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 110 deletions.
1 change: 1 addition & 0 deletions docker/postgres/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ GRANT ALL PRIVILEGES ON SEQUENCE directories_id_seq TO server;
GRANT ALL PRIVILEGES ON TABLE spark_apps TO server;

GRANT ALL PRIVILEGES ON TABLE spark_app_config TO server;
GRANT ALL PRIVILEGES ON SEQUENCE spark_app_config_id_seq TO server;

-- Add some initial data
-- user_0 -12345A
Expand Down
20 changes: 12 additions & 8 deletions server/app/routes/spark_app.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from flask import Blueprint, jsonify, request
from app.services.spark_app import SparkApp
from flask_jwt_extended import jwt_required
from app.auth.auth import identify_user
import logging

spark_app_blueprint = Blueprint('spark_app', __name__)
Expand All @@ -12,13 +14,15 @@ def create_spark_app(spark_app_id):
notebook_path = data.get('notebookPath', None)
return SparkApp.create_spark_app(spark_app_id=spark_app_id, notebook_path=notebook_path)

# @jwt_required()
# @identify_user
@spark_app_blueprint.route('/spark_app/<path:notebook_path>/config', methods=['GET'])
def get_spark_app_config(notebook_path):
    """Return the stored Spark configuration for the notebook at *notebook_path*.

    The path is captured with Flask's ``<path:...>`` converter so notebook
    paths containing slashes are matched. Delegates to the service layer,
    which resolves the notebook by path and returns a JSON Response.
    """
    logging.info(f"Getting spark app config for notebook path: {notebook_path}")
    return SparkApp.get_spark_app_config_by_notebook_path(notebook_path)

@spark_app_blueprint.route('/spark_app/<path:notebook_path>/config', methods=['POST'])
def update_spark_app_config(notebook_path):
    """Create or update the Spark configuration for the notebook at *notebook_path*.

    Expects a JSON body of Spark properties (e.g. ``spark.driver.memory``);
    the service layer resolves the notebook by path, persists the config,
    and returns a JSON Response (404 if the notebook is not found).
    """
    logging.info(f"Updating spark app config for notebook path: {notebook_path}")
    data = request.get_json()
    return SparkApp.update_spark_app_config_by_notebook_path(notebook_path, data)
46 changes: 35 additions & 11 deletions server/app/services/spark_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,35 @@ def get_spark_app_by_id(spark_app_id: str = None):
)

@staticmethod
def get_spark_app_config_by_notebook_id(notebook_id: str = None):
logger.info(f"Getting spark app config for notebook id: {notebook_id}")
def get_spark_app_config_by_notebook_path(notbook_path: str = None):
# Get notebook id from path
notebook = NotebookModel.query.filter_by(path=notbook_path).first()
notebook_id = notebook.id

# Get the spark app config
spark_app_config = SparkAppConfigModel.query.filter_by(notebook_id=notebook_id).first()

if (spark_app_config is not None):
spark_app_config_dict = spark_app_config.to_dict()
spark_app_config_dict_transformed = {
'spark.driver.memory': spark_app_config_dict['driver_memory'],
'spark.driver.memoryOverhead': spark_app_config_dict['driver_memory_overhead'],
'spark.driver.cores': spark_app_config_dict['driver_cores'],
'spark.executor.memory': spark_app_config_dict['executor_memory'],
'spark.executor.memoryOverhead': spark_app_config_dict['executor_memory_overhead'],
'spark.executor.memoryFraction': spark_app_config_dict['executor_memory_fraction'],
'spark.executor.cores': spark_app_config_dict['executor_cores'],
'spark.executor.instances': spark_app_config_dict['executor_instances'],
'spark.dynamicAllocation.enabled': spark_app_config_dict['dynamic_allocation_enabled'],
'spark.dynamicAllocation.minExecutors': spark_app_config_dict['executor_instances_min'],
'spark.dynamicAllocation.maxExecutors': spark_app_config_dict['executor_instances_max'],
'spark.shuffle.service.enabled': spark_app_config_dict['shuffle_service_enabled'],
'spark.executor.idleTimeout': spark_app_config_dict['executor_idle_timeout'],
'spark.queue': spark_app_config_dict['queue']
}

return Response(
response=json.dumps(spark_app_config.to_dict()),
response=json.dumps(spark_app_config_dict_transformed),
status=200
)
else:
Expand All @@ -69,23 +90,25 @@ def get_spark_app_config_by_notebook_id(notebook_id: str = None):
)

@staticmethod
def update_spark_app_config_by_notebook_id(notebook_id: str = None, data: dict = None):
logger.info(f"Updating spark app config for notebook id: {notebook_id} with data: {data}")
def update_spark_app_config_by_notebook_path(notebook_path: str = None, data: dict = None):
logger.info(f"Updating spark app config for notebook path: {notebook_path} with data: {data}")

if notebook_id is None:
logger.error("Notebook id is None")
if notebook_path is None:
logger.error("Notebook path is None")
return Response(
response=json.dumps({'message': 'Notebook id is None'}),
response=json.dumps({'message': 'Notebook path is None'}),
status=404)

# Get the notebook id
notebook = NotebookModel.query.filter_by(id=notebook_id).first()
notebook = NotebookModel.query.filter_by(path=notebook_path).first()
if notebook is None:
logger.error("Notebook not found")
return Response(
response=json.dumps({'message': 'Notebook not found'}),
status=404)

notebook_id = notebook.id

# Transform data
transformed_data = {
'driver_memory': data.get('spark.driver.memory', None),
Expand All @@ -97,8 +120,8 @@ def update_spark_app_config_by_notebook_id(notebook_id: str = None, data: dict =
'executor_cores': data.get('spark.executor.cores', None),
'executor_instances': data.get('spark.executor.instances', None),
'dynamic_allocation_enabled': data.get('spark.dynamicAllocation.enabled', None),
'executor_instances_min': data.get('spark.executor.instancesMin', None),
'executor_instances_max': data.get('spark.executor.instancesMax', None),
'executor_instances_min': data.get('spark.dynamicAllocation.minExecutors', None),
'executor_instances_max': data.get('spark.dynamicAllocation.maxExecutors', None),
'shuffle_service_enabled': data.get('spark.shuffle.service.enabled', None),
'executor_idle_timeout': data.get('spark.executor.idleTimeout', None),
'queue': data.get('spark.queue', None)
Expand All @@ -113,6 +136,7 @@ def update_spark_app_config_by_notebook_id(notebook_id: str = None, data: dict =
setattr(config, key, value)

db.session.commit()
db.session.refresh(config)

return Response(
response=json.dumps({'message': 'Updated spark app config'}),
Expand Down
82 changes: 82 additions & 0 deletions server/tests/routes/test_spark_app_route.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# import unittest
# import json
# from flask_cors import CORS
# from flask import g
# from database import db
# from run import create_app
# from app.routes.spark_app import spark_app_blueprint
# from app.routes.login import login_blueprint
# from app.services.directory import Directory
# from app.models.user import UserModel
# from app.services.user import User
# from app.models.spark_app import SparkAppModel
# from app.models.notebook import NotebookModel

# class SparkAppRouteTestCase(unittest.TestCase):

# def setUp(self):
# self.app = create_app()
# self.app.register_blueprint(spark_app_blueprint)
# self.app.register_blueprint(login_blueprint)
# self.client = self.app.test_client()
# with self.app.app_context():
# db.create_all()
# user = UserModel(name='test_user', email='test_email')
# user.set_password('test_password')
# db.session.add(user)
# db.session.commit()

# def tearDown(self):
# with self.app.app_context():
# db.session.remove()
# db.drop_all()

# def login_and_get_token(self):
# with self.app.app_context():
# response = self.client.post('/login', auth=('test_user', 'test_password'))
# return json.loads(response.data)['access_token']

# # def test_create_spark_app(self):
# # with self.app.app_context():
# # # Create Notebook
# # notebook = NotebookModel(name='Test Notebook', path='/path/to/notebook', user_id=1)
# # db.session.add(notebook)
# # db.session.commit()

# # # Create Spark App
# # spark_app_id = 'app_0001'
# # path = f'/spark-app/app_0001'

# # # data = {
# # # 'notebookPath': notebook.path
# # # }

# # # token = self.login_and_get_token()
# # # headers = {
# # # 'Authorization': f'Bearer {token}',
# # # }

# # response = self.client.post(
# # path,
# # # headers=headers,
# # # json=json.dumps(data),
# # )

# # print(response.data)
# # # self.assertEqual(response.status_code, 200)
# # # self.assertEqual(json.loads(response.data)['spark_app_id'], spark_app_id)
# # # self.assertEqual(json.loads(response.data)['notebook_id'], notebook.id)
# # # self.assertEqual(json.loads(response.data)['user_id'], notebook.user_id)

# def test_get_spark_app_config_by_notebook_path(self):
# with self.app.app_context():
# token = self.login_and_get_token()
# headers = {
# 'Authorization': f'Bearer {token}',
# }

# # response = self.client.get('/spark-app/path_to_notebook/config', headers=headers)
# # print(response.data)

# response = self.client.get('/spark-app')
# print(response.data)
16 changes: 9 additions & 7 deletions server/tests/services/test_spark_app_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def test_get_spark_by_id(self):
spark_app_dict = json.loads(response.data)
self.assertEqual(spark_app_dict['spark_app_id'], '1234')

def test_get_spark_app_config_by_notebook_id(self):
def test_get_spark_app_config_by_notebook_path(self):
with self.app.app_context():
# Create User
user_0 = UserModel(name='testuser0', email='[email protected]')
Expand All @@ -65,7 +65,7 @@ def test_get_spark_app_config_by_notebook_id(self):
db.session.commit()

# Get spark app config by notebook path
response = SparkApp.get_spark_app_config_by_notebook_id(notebook_0.id)
response = SparkApp.get_spark_app_config_by_notebook_path('/path/to/notebook')
spark_app_config_dict = json.loads(response.data)

self.assertEqual(spark_app_config_dict['spark.driver.memory'], '1g')
Expand All @@ -75,7 +75,7 @@ def test_get_spark_app_config_by_notebook_id(self):
self.assertEqual(spark_app_config_dict['spark.executor.instances'], 1)
self.assertEqual(spark_app_config_dict['spark.dynamicAllocation.enabled'], False)

def test_update_spark_app_config_by_notebook_id(self):
def test_update_spark_app_config(self):
with self.app.app_context():
# Create User
user_0 = UserModel(name='testuser0', email='[email protected]')
Expand All @@ -100,15 +100,15 @@ def test_update_spark_app_config_by_notebook_id(self):
'spark.dynamicAllocation.enabled': True,
}

response_0 = SparkApp.update_spark_app_config_by_notebook_id(None, data=data)
response_0 = SparkApp.update_spark_app_config_by_notebook_path(None, data=data)
self.assertEqual(response_0.status_code, 404)
self.assertEqual(json.loads(response_0.data)['message'], 'Notebook id is None')
self.assertEqual(json.loads(response_0.data)['message'], 'Notebook path is None')

response_1 = SparkApp.update_spark_app_config_by_notebook_id(999, data=data)
response_1 = SparkApp.update_spark_app_config_by_notebook_path('path_not_found', data=data)
self.assertEqual(response_1.status_code, 404)
self.assertEqual(json.loads(response_1.data)['message'], 'Notebook not found')

response_2 = SparkApp.update_spark_app_config_by_notebook_id(notebook_0.id, data=data)
response_2 = SparkApp.update_spark_app_config_by_notebook_path('/path/to/notebook', data=data)
self.assertEqual(response_2.status_code, 200)
self.assertEqual(json.loads(response_2.data)['message'], 'Updated spark app config')

Expand All @@ -121,6 +121,8 @@ def test_update_spark_app_config_by_notebook_id(self):
self.assertEqual(spark_app_config.executor_cores, 2)
self.assertEqual(spark_app_config.executor_instances, 2)
self.assertEqual(spark_app_config.dynamic_allocation_enabled, True)



def test_create_spark_app(self):
with self.app.app_context():
Expand Down
5 changes: 4 additions & 1 deletion webapp/src/components/notebook/Notebook.js
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,10 @@ function Notebook({
saveNotebook={handleUpdateNotebook}
deleteNotebook={handleDeleteNotebook}
/> : contentType === ContentType.Config ?
<Config /> :
<Config
notebook={notebook}
notebookPath={notebook.path}
/> :
<Runs
notebook={notebook}
contentType={contentType}
Expand Down
Loading

0 comments on commit bed3350

Please sign in to comment.