Skip to content

Commit

Permalink
Merge pull request #19 from DataChefHQ/feature/refactor-sparkle-example
Browse files Browse the repository at this point in the history
feat: refactor sparkle example
  • Loading branch information
callarelli93 authored Sep 11, 2024
2 parents 8d00a87 + e4af94e commit 869041e
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 38 deletions.
51 changes: 26 additions & 25 deletions examples/sparkle/__main__.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
import pulumi
from argparse import ArgumentParser, Namespace
from damavand.cloud.provider import AwsProvider
from damavand.factories import SparkControllerFactory

from damavand.cloud.azure.resources import SynapseComponent, SynapseComponentArgs
from applications.orders import CustomerOrders
from examples.sparkle.applications.products import Products


# def main():
# spark_factory = SparkControllerFactory(
# provider=AwsProvider(
# app_name="my-app",
# region="us-west-2",
# ),
# tags={"env": "dev"},
# )
#
# spark = spark_factory.new(name="my-spark")


def main() -> None:
spark = SynapseComponent(
name="my-spark",
args=SynapseComponentArgs(
jobs=[],
sql_admin_username="kiarashk",
sql_admin_password="lkjsf@123",
def main(args: Namespace) -> None:
spark_factory = SparkControllerFactory(
provider=AwsProvider(
app_name="my-app",
region="us-west-2",
),
tags={"env": "dev"},
)

pulumi.export(
"resource_group",
pulumi.Output.all(spark.resource_group).apply(lambda x: x[0].name),
spark_controller = spark_factory.new(
name="my-spark",
)

spark_controller.applications = [
Products(spark_controller.default_session()),
CustomerOrders(spark_controller.default_session()),
]

spark_controller.run_application(args.app_id)


if __name__ == "__main__":
main()
arg_parser = ArgumentParser()
arg_parser.add_argument("--app_id", type=str, required=True)

args = arg_parser.parse_args()

main(args)
Empty file.
34 changes: 34 additions & 0 deletions examples/sparkle/applications/orders.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from sparkle.config import Config
from sparkle.writer.iceberg_writer import IcebergWriter
from sparkle.application import Sparkle

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession


class CustomerOrders(Sparkle):
    """Sparkle application that reads the customer orders feed and persists
    it to an Iceberg table in the warehouse bucket.

    The application performs no transformation: ``process`` simply reads the
    ``orders`` input source and hands the resulting DataFrame to the
    configured writers.
    """

    def __init__(self, spark_session: SparkSession):
        """Configure the application and its Iceberg writer.

        Args:
            spark_session: Active Spark session used to run this application.
        """
        super().__init__(
            spark_session,
            config=Config(
                app_name="customer-orders",
                app_id="customer_orders",
                version="0.1",
                database_bucket="s3://bucket-name",
                kafka=None,
                input_database=None,
                output_database=None,
                iceberg_config=None,
                spark_trigger='{"once": True}',
            ),
            writers=[
                IcebergWriter(
                    database_name="default",
                    database_path="s3://bucket-name/warehouse",
                    # BUG FIX: was "products" (copy-paste from the Products
                    # app); this application processes orders, so it must
                    # write to the "orders" table.
                    table_name="orders",
                )
            ],
        )

    def process(self) -> DataFrame:
        """Return the ``orders`` input as a DataFrame, unmodified."""
        return self.input["orders"].read()
34 changes: 34 additions & 0 deletions examples/sparkle/applications/products.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from sparkle.application import Sparkle
from sparkle.config import Config
from sparkle.writer.iceberg_writer import IcebergWriter

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession


class Products(Sparkle):
    """Sparkle application that materializes the products feed into Iceberg.

    ``process`` reads the ``products`` input source as-is; the configured
    Iceberg writer persists the result under the warehouse bucket.
    """

    def __init__(self, spark_session: SparkSession):
        """Wire up configuration and writers for the products pipeline.

        Args:
            spark_session: Active Spark session driving this application.
        """
        app_config = Config(
            app_name="products",
            app_id="products",
            version="0.1",
            database_bucket="s3://bucket-name",
            kafka=None,
            input_database=None,
            output_database=None,
            iceberg_config=None,
            spark_trigger='{"once": True}',
        )
        iceberg_sink = IcebergWriter(
            database_name="default",
            database_path="s3://bucket-name/warehouse",
            table_name="products",
        )
        super().__init__(spark_session, config=app_config, writers=[iceberg_sink])

    def process(self) -> DataFrame:
        """Read and return the raw ``products`` input without transformation."""
        products_source = self.input["products"]
        return products_source.read()
14 changes: 3 additions & 11 deletions src/damavand/base/controllers/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,24 +42,21 @@ class SparkController(ApplicationController):
Return the default cloud Spark session.
default_session()
Return the currently active Spark session.
applications()
Return the list of Spark applications.
application_with_id(app_id)
Return the Spark application with the given ID.
run(app_id)
run_application(app_id)
Run the Spark application with the given ID.
"""

def __init__(
self,
name,
applications: list[Sparkle] = [],
id_: Optional[str] = None,
tags: dict[str, str] = {},
**kwargs,
) -> None:
ApplicationController.__init__(self, name, id_, tags, **kwargs)
self.__applications = applications
self.applications: list[Sparkle]

@property
def _spark_extensions(self) -> list[str]:
Expand Down Expand Up @@ -136,11 +133,6 @@ def default_session(self) -> SparkSession:
case _:
return self.default_cloud_session()

@property
def applications(self) -> list[Sparkle]:
"""Return the list of Spark applications."""
return self.__applications

def application_with_id(self, app_id: str) -> Sparkle:
"""Return the Spark application with the given ID.
Expand All @@ -158,7 +150,7 @@ def application_with_id(self, app_id: str) -> Sparkle:
raise ValueError(f"Application with ID {app_id} not found.")

@runtime
def run(self, app_id: str) -> None:
def run_application(self, app_id: str) -> None:
"""Run the Spark application with the given ID.
Args:
Expand Down
4 changes: 2 additions & 2 deletions src/damavand/cloud/azure/controllers/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ def __init__(
self,
name,
region: str,
applications: list[Sparkle] = [],
id_: Optional[str] = None,
tags: dict[str, str] = {},
**kwargs,
) -> None:
super().__init__(name, applications, id_, tags, **kwargs)
super().__init__(name, id_, tags, **kwargs)
self.applications: list[Sparkle]

@buildtime
def admin_username(self) -> str:
Expand Down

0 comments on commit 869041e

Please sign in to comment.