Skip to content

Commit

Permalink
Add Spark app configuration support in database and API routes
Browse files Browse the repository at this point in the history
  • Loading branch information
xuwenyihust committed Aug 19, 2024
1 parent d46421f commit 27f60a7
Show file tree
Hide file tree
Showing 6 changed files with 176 additions and 5 deletions.
19 changes: 19 additions & 0 deletions docker/postgres/init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ CREATE TABLE spark_apps (
created_at TIMESTAMP
);

CREATE TABLE spark_app_config (
id SERIAL PRIMARY KEY,
notebook_id INT REFERENCES notebooks(id),
driver_memory VARCHAR(100),
driver_memory_overhead VARCHAR(100),
driver_cores INT,
executor_memory VARCHAR(100),
executor_memory_overhead VARCHAR(100),
executor_memory_fraction FLOAT,
executor_cores INT,
executor_instances INT,
dynamic_allocation_enabled BOOLEAN,
executor_instances_min INT,
executor_instances_max INT,
shuffle_service_enabled BOOLEAN,
executor_idle_timeout INT,
queue VARCHAR(100)
);

GRANT ALL PRIVILEGES ON TABLE users TO server;
GRANT ALL PRIVILEGES ON SEQUENCE users_id_seq TO server;

Expand Down
65 changes: 65 additions & 0 deletions server/app/models/spark_app_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from database import db

class SparkAppConfigModel(db.Model):

__tablename__ = 'spark_app_config'

id = db.Column(db.Integer, primary_key=True, nullable=False)
notebook_id = db.Column(db.Integer, db.ForeignKey('notebooks.id'), nullable=False)
driver_memory = db.Column(db.Integer, nullable=True)
driver_memory_overhead = db.Column(db.Integer, nullable=True)
driver_cores = db.Column(db.Integer, nullable=True)

executor_memory = db.Column(db.Integer, nullable=True)
executor_memory_overhead = db.Column(db.Integer, nullable=True)
executor_memory_fraction = db.Column(db.Float, nullable=True)
executor_cores = db.Column(db.Integer, nullable=True)
executor_instances = db.Column(db.Integer, nullable=True)

dynamic_allocation_enabled = db.Column(db.Boolean, nullable=True)
executor_instances_min = db.Column(db.Integer, nullable=True)
executor_instances_max = db.Column(db.Integer, nullable=True)

shuffle_service_enabled = db.Column(db.Boolean, nullable=True)
executor_idle_timeout = db.Column(db.Integer, nullable=True)
queue = db.Column(db.String, nullable=True)

def __init__(self, notebook_id, driver_memory=None, driver_memory_overhead=None, driver_cores=None,
executor_memory=None, executor_memory_overhead=None, executor_memory_fraction=None,
executor_cores=None, executor_instances=None, dynamic_allocation_enabled=None,
executor_instances_min=None, executor_instances_max=None, shuffle_service_enabled=None,
executor_idle_timeout=None, queue=None):
self.notebook_id = notebook_id
self.driver_memory = driver_memory
self.driver_memory_overhead = driver_memory_overhead
self.driver_cores = driver_cores
self.executor_memory = executor_memory
self.executor_memory_overhead = executor_memory_overhead
self.executor_memory_fraction = executor_memory_fraction
self.executor_cores = executor_cores
self.executor_instances = executor_instances
self.dynamic_allocation_enabled = dynamic_allocation_enabled
self.executor_instances_min = executor_instances_min
self.executor_instances_max = executor_instances_max
self.shuffle_service_enabled = shuffle_service_enabled
self.executor_idle_timeout = executor_idle_timeout
self.queue = queue

def to_dict(self):
return {
'notebook_id': self.notebook_id,
'driver_memory': self.driver_memory,
'driver_memory_overhead': self.driver_memory_overhead,
'driver_cores': self.driver_cores,
'executor_memory': self.executor_memory,
'executor_memory_overhead': self.executor_memory_overhead,
'executor_memory_fraction': self.executor_memory_fraction,
'executor_cores': self.executor_cores,
'executor_instances': self.executor_instances,
'dynamic_allocation_enabled': self.dynamic_allocation_enabled,
'executor_instances_min': self.executor_instances_min,
'executor_instances_max': self.executor_instances_max,
'shuffle_service_enabled': self.shuffle_service_enabled,
'executor_idle_timeout': self.executor_idle_timeout,
'queue': self.queue
}
7 changes: 6 additions & 1 deletion server/app/routes/spark_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ def create_spark_app(spark_app_id):

@spark_app_blueprint.route('/spark_app/config', methods=['GET'])
def get_spark_app_config():
return SparkApp.get_spark_app_config()
return SparkApp.get_spark_app_config()

@spark_app_blueprint.route('/spark_app/<path:notbook_path>/config', methods=['POST'])
def update_spark_app_config(notebook_path):
data = request.get_json()
return SparkApp.update_spark_app_config(notebook_path=notebook_path, data=data)
8 changes: 8 additions & 0 deletions server/app/services/spark_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ def get_spark_app_config():
status=200
)

@staticmethod
def update_spark_app_config(notebook_path: str = None, data: dict = None):
logger.info(f"Updating spark app config for notebook path: {notebook_path} with data: {data}")

return Response(
response=json.dumps({'message': 'update_spark_app_config'}),
status=200)

@staticmethod
def create_spark_app(spark_app_id: str = None, notebook_path: str = None):
logger.info(f"Creating spark app with id: {spark_app_id} for notebook path: {notebook_path}")
Expand Down
57 changes: 57 additions & 0 deletions server/tests/models/test_spark_app_config_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import unittest
from flask_cors import CORS
from run import create_app
from database import db
from app.models.spark_app_config import SparkAppConfigModel

class SparkAppConfigModelTestCase(unittest.TestCase):

def setUp(self):
self.app = create_app()
self.client = self.app.test_client()
with self.app.app_context():
db.create_all()

def tearDown(self):
with self.app.app_context():
db.session.remove()
db.drop_all()

def test_spark_app_config_model(self):
with self.app.app_context():
spark_app_config = SparkAppConfigModel(
notebook_id=1,
driver_memory=1,
driver_memory_overhead=1,
driver_cores=1,
executor_memory=1,
executor_memory_overhead=1,
executor_memory_fraction=1.0,
executor_cores=1,
executor_instances=1,
dynamic_allocation_enabled=True,
executor_instances_min=1,
executor_instances_max=1,
shuffle_service_enabled=True,
executor_idle_timeout=1,
queue='test_queue'
)
db.session.add(spark_app_config)
db.session.commit()

spark_app_config_dict = spark_app_config.to_dict()
self.assertEqual(spark_app_config_dict['notebook_id'], 1)
self.assertEqual(spark_app_config_dict['driver_memory'], 1)
self.assertEqual(spark_app_config_dict['driver_memory_overhead'], 1)
self.assertEqual(spark_app_config_dict['driver_cores'], 1)
self.assertEqual(spark_app_config_dict['executor_memory'], 1)
self.assertEqual(spark_app_config_dict['executor_memory_overhead'], 1)
self.assertEqual(spark_app_config_dict['executor_memory_fraction'], 1.0)
self.assertEqual(spark_app_config_dict['executor_cores'], 1)
self.assertEqual(spark_app_config_dict['executor_instances'], 1)
self.assertEqual(spark_app_config_dict['dynamic_allocation_enabled'], True)
self.assertEqual(spark_app_config_dict['executor_instances_min'], 1)
self.assertEqual(spark_app_config_dict['executor_instances_max'], 1)
self.assertEqual(spark_app_config_dict['shuffle_service_enabled'], True)
self.assertEqual(spark_app_config_dict['executor_idle_timeout'], 1)
self.assertEqual(spark_app_config_dict['queue'], 'test_queue')
25 changes: 21 additions & 4 deletions webapp/src/components/notebook/content/Config.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ function Config({ }) {
setExecutorMemoryUnit(event.target.value);
};

const handleSave = () => {
console.log('executorCores:', executorCores);
console.log('executorInstances:', executorInstances);
console.log('executorMemory:', executorMemory + executorMemoryUnit);
}

return (
<Box sx={{
marginTop: 5,
Expand Down Expand Up @@ -47,8 +53,13 @@ function Config({ }) {
defaultValue={executorMemory}
variant="outlined"
size="small"
onChange={(e) => setExecutorMemory(e.target.value)}/>
onInput={(e) => e.target.value = e.target.value.replace(/[^0-9]/g, '')}
onChange={(e) => {
setExecutorMemory(e.target.value)
}}/>
<Box m={1} />
<Select value={executorMemoryUnit}
size="small"
onChange={handleExecutorMemoryUnitChange}>
<MenuItem value={'m'}>MB</MenuItem>
<MenuItem value={'g'}>GB</MenuItem>
Expand All @@ -74,11 +85,17 @@ function Config({ }) {
</ListItem>
</List>
</CardContent>
{/* <CardActions>
<Button variant="contained" color="primary" onClick={handleSave}>
<CardActions>
<Button
variant="outlined"
style={{
marginLeft: '20px',
borderColor: 'lightgrey',
color: 'grey' }}
onClick={handleSave}>
Save
</Button>
</CardActions> */}
</CardActions>
</Card>
</Box>
);
Expand Down

0 comments on commit 27f60a7

Please sign in to comment.