feat: spark etl application #8

Merged: 40 commits, Sep 18, 2024
Commits
8b85d6e
feat: add base interface for spark app
kkiani Aug 6, 2024
7d97a11
chore(spark): pyright incompatibility with classproperty decorator
kkiani Aug 7, 2024
76bf772
feat(sparkle): structurizing the inputs field in pipeline decorator
kkiani Aug 7, 2024
b2078bf
feat: implementing the ARC software architecture pattern
kkiani Aug 8, 2024
694cc60
fix: circular import for type annotations
kkiani Aug 8, 2024
e3854d0
fix(object_storage): ARC new interface were not supporting
kkiani Aug 8, 2024
3c7a3c3
feat: spark added
kkiani Aug 8, 2024
01e7302
chore(test): add test for environment class
kkiani Aug 8, 2024
dcdd1ad
chore(spark): remove unused method
kkiani Aug 8, 2024
4006700
chore(spark): spark controller tests added
kkiani Aug 8, 2024
8b8632d
chore(spark): add tests for aws spark
kkiani Aug 8, 2024
28df5b0
feat: restructure to match the ARC Pattern
kkiani Aug 12, 2024
1a4ed38
fix(spark): creating glue job per pipeline
kkiani Aug 13, 2024
1ea18ce
chore: restructure the test folder to match the ARC Pattern
kkiani Aug 13, 2024
ce8b28c
chore(spark): code formatting
kkiani Aug 14, 2024
80be07b
fix: cloud connection creating fully connected dependency graph
kkiani Aug 20, 2024
4bbf8f1
chore(glue): add pulumi unit tests
kkiani Aug 20, 2024
ecd6beb
feat(spark): add azure implementations
kkiani Sep 2, 2024
b1e1f17
fix(factory): tag property not getting initialized properly
kkiani Sep 2, 2024
8d741bc
chore(sparkle): add examples
kkiani Sep 6, 2024
ce58c14
feat(pyspark): integrating with sparkle
kkiani Sep 6, 2024
7b25e27
fix(spark): make sparkle an optional dependency
kkiani Sep 6, 2024
23146a8
chore: update package description
kkiani Sep 6, 2024
7e6429c
fix(spark): azure controller not passing applications to parent
kkiani Sep 6, 2024
e9ccc0f
chore(nix): add spark group dependencies
kkiani Sep 6, 2024
8d00a87
Revert "fix(spark): make sparkle an optional dependency"
kkiani Sep 10, 2024
c81e06a
fix(sparkle): applications cant be added during controllers init
kkiani Sep 10, 2024
8d5c91c
chore(sparkle): update example to factory controller
kkiani Sep 10, 2024
8510201
feat(sparkle): revert to app_id to prevent confusions
kkiani Sep 11, 2024
e4af94e
chore(spark): fix typo in the docs
kkiani Sep 11, 2024
869041e
Merge pull request #19 from DataChefHQ/feature/refactor-sparkle-example
callarelli93 Sep 11, 2024
614ac83
fix(sparkle): pin sparkle to v0.3.1 version
kkiani Sep 12, 2024
b639fae
fix(sparkle): depricated application paramter for spark controller
kkiani Sep 12, 2024
e65279a
fix(factory): not passing region to aws controllers
kkiani Sep 12, 2024
3b4df37
fix(controller): auto provision causing resource creation before befo…
kkiani Sep 12, 2024
b4385e9
fix(core): deprecated id_ parameter leftover
kkiani Sep 12, 2024
8a06b65
chore(tests): fix depricated sparkle interface
kkiani Sep 12, 2024
966e5da
chore(sparkle): add tests
kkiani Sep 13, 2024
7081982
chore(sparkle): fix the example with the latest changes
kkiani Sep 13, 2024
1504d60
Merge pull request #21 from DataChefHQ/bugfix/core-api-controllers
kkiani Sep 13, 2024
11 changes: 11 additions & 0 deletions examples/sparkle/Pulumi.yaml
@@ -0,0 +1,11 @@
name: object_storage
runtime:
  name: python
  options:
    toolchain: pip
    virtualenv: venv
description: A minimal Azure Native Python Pulumi program
config:
  pulumi:tags:
    value:
      pulumi:template: azure-python
31 changes: 31 additions & 0 deletions examples/sparkle/__main__.py
@@ -0,0 +1,31 @@
from damavand.cloud.provider import AwsProvider
from damavand.factories import SparkControllerFactory

from applications.orders import CustomerOrders
from applications.products import Products


def main() -> None:
    spark_factory = SparkControllerFactory(
        provider=AwsProvider(
            app_name="my-app",
            region="us-west-2",
        ),
        tags={"env": "dev"},
    )

    spark_controller = spark_factory.new(
        name="my-spark",
    )

    spark_controller.applications = [
        Products(spark_controller.default_session()),
        CustomerOrders(spark_controller.default_session()),
    ]

    spark_controller.run_application("products")
    spark_controller.provision()


if __name__ == "__main__":
    main()
File renamed without changes.
31 changes: 31 additions & 0 deletions examples/sparkle/applications/orders.py
@@ -0,0 +1,31 @@
from sparkle.config import Config
from sparkle.writer.iceberg_writer import IcebergWriter
from sparkle.application import Sparkle

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession


class CustomerOrders(Sparkle):
    def __init__(self, spark_session: SparkSession):
        super().__init__(
            spark_session,
            config=Config(
                app_name="orders",
                app_id="orders-app",
                version="0.0.1",
                database_bucket="s3://test-bucket",
                checkpoints_bucket="s3://test-checkpoints",
            ),
            writers=[
                IcebergWriter(
                    database_name="default",
                    database_path="s3://bucket-name/warehouse",
                    table_name="products",
                    spark_session=spark_session,
                )
            ],
        )

    def process(self) -> DataFrame:
        return self.input["orders"].read()
31 changes: 31 additions & 0 deletions examples/sparkle/applications/products.py
@@ -0,0 +1,31 @@
from sparkle.application import Sparkle
from sparkle.config import Config
from sparkle.writer.iceberg_writer import IcebergWriter

from pyspark.sql import DataFrame
from pyspark.sql import SparkSession


class Products(Sparkle):
    def __init__(self, spark_session: SparkSession):
        super().__init__(
            spark_session,
            config=Config(
                app_name="products",
                app_id="products-app",
                version="0.0.1",
                database_bucket="s3://test-bucket",
                checkpoints_bucket="s3://test-checkpoints",
            ),
            writers=[
                IcebergWriter(
                    database_name="default",
                    database_path="s3://bucket-name/warehouse",
                    table_name="products",
                    spark_session=spark_session,
                )
            ],
        )

    def process(self) -> DataFrame:
        return self.input["products"].read()
3 changes: 3 additions & 0 deletions examples/sparkle/requirements.txt
@@ -0,0 +1,3 @@
-e ../../../damavand
pyspark==3.3.2
pulumi
430 changes: 279 additions & 151 deletions pdm.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "damavand"
-description = "Default template for PDM package"
+description = "Damavand is an opinionated cloud-agnostic pythonic implementation of ARC design pattern for developing cloud-native applications."
 authors = [
     {name = "Kiarash Kiani", email = "[email protected]"},
 ]
@@ -12,6 +12,8 @@ dependencies = [
     "pulumi>=3.127.0",
     "pulumi-aws>=6.47.0",
     "pulumi-azure-native>=2.51.0",
+    "pulumi-random>=4.16.3",
+    "sparkle @ git+https://github.com/DataChefHQ/[email protected]",
 ]
 requires-python = ">=3.11.0"
 readme = "README.md"
@@ -36,6 +38,7 @@ dev = [
     "pytest-coverage>=0.0",
     "pyright>=1.1.374",
     "moto>=5.0.11",
+    "pip>=24.2",
 ]
 [tool.commitizen]
 version = "1.0.0"
Empty file added src/damavand/base/__init__.py
11 changes: 11 additions & 0 deletions src/damavand/base/controllers/__init__.py
@@ -0,0 +1,11 @@
from .base_controller import ApplicationController, runtime, buildtime
from .object_storage import ObjectStorageController
from .spark import SparkController

__all__ = [
    "ApplicationController",
    "ObjectStorageController",
    "SparkController",
    "runtime",
    "buildtime",
]
66 changes: 66 additions & 0 deletions src/damavand/base/controllers/base_controller.py
@@ -0,0 +1,66 @@
import logging
from functools import cache
from pulumi import Resource as PulumiResource
import pulumi

from damavand import utils


logger = logging.getLogger(__name__)


def buildtime(func):
    def wrapper(self, *args, **kwargs):
        if not utils.is_building():
            logger.warning(
                f"Calling buildtime method `{func.__name__}` during runtime."
            )
            return None

        return func(self, *args, **kwargs)

    return wrapper


def runtime(func):
    def wrapper(self, *args, **kwargs):
        if utils.is_building():
            logger.warning(
                f"Calling runtime method `{func.__name__}` during buildtime."
            )
            return None

        return func(self, *args, **kwargs)

    return wrapper


class ApplicationController(object):
    def __init__(
        self,
        name: str,
        tags: dict[str, str] = {},
        **kwargs,
    ) -> None:
        self.name = name
        self.tags = tags
        self.extra_args = kwargs
        self._pulumi_object = None

    @property
    @buildtime
    @cache
    def build_config(self) -> pulumi.Config:
        return pulumi.Config()

    @buildtime
    @cache
    def resource(self) -> PulumiResource:
        """A lazy method that provisions the resource if it is not provisioned yet and returns the Pulumi object."""

        raise NotImplementedError()

    def provision(self) -> None:
        """Provision the resource if not provisioned yet."""

        _ = self.resource()
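The `buildtime` and `runtime` decorators above gate controller methods on the execution phase: calling a build-time method at runtime (or the reverse) only logs a warning and returns `None` instead of raising. A minimal sketch of that behavior, assuming the import paths from this PR; the subclass and method names below are illustrative, not part of the changeset:

```python
from damavand.base.controllers.base_controller import (
    ApplicationController,
    buildtime,
    runtime,
)


class IllustrativeController(ApplicationController):
    """Hypothetical subclass used only to illustrate the decorators."""

    @buildtime
    def describe_resource(self) -> str:
        # Only meaningful while Pulumi is building the stack.
        return f"resource for {self.name}"

    @runtime
    def handle_request(self) -> str:
        # Only meaningful inside the deployed application.
        return f"handling request for {self.name}"


controller = IllustrativeController(name="demo", tags={"env": "dev"})

# During `pulumi up` (build time), describe_resource() returns a value while
# handle_request() logs a warning and returns None; at runtime the roles flip,
# as decided by utils.is_building().
print(controller.describe_resource())
print(controller.handle_request())
```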
src/damavand/base/controllers/object_storage.py
@@ -1,20 +1,16 @@
-from typing import Iterable, Optional
+from typing import Iterable

-from damavand.resource import BaseResource
+from damavand.base.controllers import ApplicationController


-class BaseObjectStorage(BaseResource):
+class ObjectStorageController(ApplicationController):
     def __init__(
         self,
         name,
-        id_: Optional[str] = None,
         tags: dict[str, str] = {},
         **kwargs,
     ) -> None:
-        super().__init__(name, id_, tags, **kwargs)
-
-    def provision(self):
-        raise NotImplementedError
+        super().__init__(name, tags, **kwargs)

     def read(self, path: str) -> bytes:
         """Read an object from the storage."""
158 changes: 158 additions & 0 deletions src/damavand/base/controllers/spark.py
@@ -0,0 +1,158 @@
import os
import logging
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

from damavand.environment import Environment
from damavand.base.controllers import ApplicationController
from damavand.base.controllers.base_controller import runtime

from sparkle.application import Sparkle


logger = logging.getLogger(__name__)


class SparkController(ApplicationController):
    """
    The SparkController class is the base class for all Spark controller
    implementations for each cloud provider.

    ...

    Attributes
    ----------
    name : str
        the name of the controller.
    applications : list[Sparkle]
        the list of Spark applications.
    tags : dict[str, str]
        the tags of the controller.
    kwargs : dict
        the extra arguments.

    Methods
    -------
    default_local_session()
        Return the default local Spark session.
    default_cloud_session()
        Return the default cloud Spark session.
    default_session()
        Return the currently active Spark session.
    application_with_id(app_id)
        Return the Spark application with the given ID.
    run_application(app_id)
        Run the Spark application with the given ID.
    """

    def __init__(
        self,
        name,
        tags: dict[str, str] = {},
        **kwargs,
    ) -> None:
        ApplicationController.__init__(self, name, tags, **kwargs)
        self.applications: list[Sparkle] = []

    @property
    def _spark_extensions(self) -> list[str]:
        return [
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        ]

    @property
    def _spark_packages(self) -> list[str]:
        return [
            "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.3.1",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0",
            "org.apache.spark:spark-avro_2.12:3.3.0",
        ]

    def default_local_session(self) -> SparkSession:
        """Return the default local Spark session.

        Returns:
            SparkSession: The Spark session.
        """

        ivy_settings_path = os.environ.get("IVY_SETTINGS_PATH", None)
        LOCAL_CONFIG = {
            "spark.sql.extensions": ",".join(self._spark_extensions),
            "spark.jars.packages": ",".join(self._spark_packages),
            "spark.sql.jsonGenerator.ignoreNullFields": False,
            "spark.sql.session.timeZone": "UTC",
            "spark.sql.catalog.spark_catalog": "org.apache.iceberg.spark.SparkSessionCatalog",
            "spark.sql.catalog.spark_catalog.type": "hive",
            "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog",
            "spark.sql.catalog.local.type": "hadoop",
            "spark.sql.catalog.local.warehouse": "/tmp/warehouse",
            "spark.sql.defaultCatalog": "local",
        }

        spark_conf = SparkConf()

        for key, value in LOCAL_CONFIG.items():
            spark_conf.set(key, str(value))

        spark_session = (
            # NOTE: Pyright does not work with the `@classproperty` decorator used in `SparkSession`. This should, however, be fixed in pyspark v4.
            SparkSession.builder.master("local[*]")  # type: ignore
            .appName("LocalDataProductApp")
            .config(conf=spark_conf)
        )

        if ivy_settings_path:
            spark_session.config("spark.jars.ivySettings", ivy_settings_path)

        return spark_session.getOrCreate()

    def default_cloud_session(self) -> SparkSession:
        """Return the default Spark session provided by the cloud Spark machine.

        Returns:
            SparkSession: The Spark session.
        """

        raise NotImplementedError

    def default_session(self) -> SparkSession:
        """Return the currently active Spark session. If the environment is local, it
        returns the local session. Otherwise, it returns the cloud session.

        Returns:
            SparkSession: The Spark session.
        """
        env = Environment.from_system_env()
        match env:
            case Environment.LOCAL:
                return self.default_local_session()
            case _:
                return self.default_cloud_session()

    def application_with_id(self, app_id: str) -> Sparkle:
        """Return the Spark application with the given ID.

        Args:
            app_id (str): The application ID.

        Returns:
            Sparkle: The Spark application.
        """

        for app in self.applications:
            if app.config.app_id == app_id:
                return app

        raise ValueError(f"Application with ID {app_id} not found.")

    @runtime
    def run_application(self, app_id: str) -> None:
        """Run the Spark application with the given ID.

        Args:
            app_id (str): The application ID.
        """

        app = self.application_with_id(app_id)
        df = app.process()
        app.write(df)
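`default_cloud_session` is deliberately left unimplemented here and overridden by the provider-specific controllers this PR adds (AWS and Azure). A hypothetical, minimal sketch of such an override; the class name and the bare `getOrCreate()` call are illustrative assumptions, not the actual Glue or Azure implementations:

```python
from pyspark.sql import SparkSession

from damavand.base.controllers.spark import SparkController


class IllustrativeCloudSparkController(SparkController):
    """Hypothetical provider-specific controller, for illustration only."""

    def default_cloud_session(self) -> SparkSession:
        # On a managed Spark runtime (e.g. AWS Glue), a session is typically
        # already configured by the platform, so reusing it is enough here.
        return SparkSession.builder.getOrCreate()  # type: ignore


# With this override in place, default_session() picks the local Iceberg-enabled
# session when Environment.from_system_env() reports LOCAL, and the
# platform-provided session otherwise.
controller = IllustrativeCloudSparkController(name="demo-spark", tags={"env": "dev"})
session = controller.default_session()
```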