
Prep for 0.1.0rc2 #86


Merged · 8 commits · Apr 2, 2025
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
@@ -131,5 +131,5 @@ jobs:
--concurrency 3 \
--partitions-per-processor 2 \
--batch-size=8192 \
--worker-pool-min=20 \
--processor-pool-min=20 \
--validate
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -21,7 +21,7 @@ description = "DataFusion on Ray"
homepage = "https://github.com/apache/datafusion-ray"
repository = "https://github.com/apache/datafusion-ray"
authors = ["Apache DataFusion <[email protected]>"]
version = "0.1.0"
version = "0.1.0-rc2"
edition = "2024"
readme = "README.md"
license = "Apache-2.0"
10 changes: 5 additions & 5 deletions README.md
@@ -17,7 +17,7 @@
under the License.
-->

# DataFusion Ray
# DataFusion for Ray

[![Apache licensed][license-badge]][license-url]
[![Python Tests][actions-badge]][actions-url]
@@ -32,13 +32,13 @@

## Overview

DataFusion Ray is a distributed execution framework that enables DataFusion DataFrame and SQL queries to run on a
DataFusion for Ray is a distributed execution framework that enables DataFusion DataFrame and SQL queries to run on a
Ray cluster. This integration allows users to leverage Ray's dynamic scheduling capabilities while executing
queries in a distributed fashion.

## Execution Modes

DataFusion Ray supports two execution modes:
DataFusion for Ray supports two execution modes:

### Streaming Execution

@@ -54,7 +54,7 @@ intermediate shuffle files that are persisted and used as input for the next stage

## Getting Started

See the [contributor guide] for instructions on building DataFusion Ray.
See the [contributor guide] for instructions on building DataFusion for Ray.

Once installed, you can run queries using DataFusion's familiar API while leveraging the distributed execution
capabilities of Ray.
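
As a rough sketch of what that looks like in practice — based on the `DFRayContext` methods visible in this PR's `datafusion_ray/core.py` changes, with a made-up table name, file path, and query, and assuming a Ray cluster is already available:

```python
from datafusion_ray.core import DFRayContext

# Sketch only: API names come from this PR's core.py; data and query are illustrative.
ctx = DFRayContext()  # defaults: batch_size=8192, processor_pool_min=1, processor_pool_max=100
ctx.register_csv("trips", "data/trips.csv")  # hypothetical local CSV

df = ctx.sql("SELECT count(*) AS n FROM trips")  # returns a DFRayDataFrame
batches = df.collect()  # list of pyarrow RecordBatch
print(batches)
```
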
@@ -84,6 +84,6 @@ Contributions are welcome! Please open an issue or submit a pull request if you

## License

DataFusion Ray is licensed under Apache 2.0.
DataFusion for Ray is licensed under Apache 2.0.

[contributor guide]: docs/contributing.md
85 changes: 53 additions & 32 deletions datafusion_ray/core.py
@@ -86,7 +86,9 @@ async def wait_for(coros, name=""):
# wrap the coro in a task to work with python 3.10 and 3.11+ where asyncio.wait semantics
# changed to not accept any awaitable
start = time.time()
done, _ = await asyncio.wait([asyncio.create_task(_ensure_coro(c)) for c in coros])
done, _ = await asyncio.wait(
[asyncio.create_task(_ensure_coro(c)) for c in coros]
)
end = time.time()
log.info(f"waiting for {name} took {end - start}s")
for d in done:
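
The comment above refers to a real asyncio change: from Python 3.11 onward, `asyncio.wait` rejects bare coroutines, so each one is wrapped in a `Task` first. A standalone sketch of the same pattern, with illustrative names:

```python
import asyncio

async def work(n: int) -> int:
    # stand-in for a remote call or any other coroutine
    await asyncio.sleep(0)
    return n * 2

async def main() -> None:
    coros = [work(i) for i in range(3)]
    # Wrapping each coroutine in a Task works on 3.10 and on 3.11+,
    # where asyncio.wait() no longer accepts bare coroutines.
    done, _pending = await asyncio.wait([asyncio.create_task(c) for c in coros])
    print(sorted(task.result() for task in done))

asyncio.run(main())
```
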
@@ -108,9 +110,9 @@ class DFRayProcessorPool:
#
# This is simple though and will suffice for now

def __init__(self, min_workers: int, max_workers: int):
self.min_workers = min_workers
self.max_workers = max_workers
def __init__(self, min_processors: int, max_processors: int):
self.min_processors = min_processors
self.max_processors = max_processors

# a map of processor_key (a random identifier) to stage actor reference
self.pool = {}
@@ -137,11 +139,11 @@ def __init__(self, min_workers: int, max_workers: int):
# processors available
self.available = set()

for _ in range(min_workers):
for _ in range(min_processors):
self._new_processor()

log.info(
f"created ray processor pool (min_workers: {min_workers}, max_workers: {max_workers})"
f"created ray processor pool (min_processors: {min_processors}, max_processors: {max_processors})"
)

async def start(self):
@@ -159,12 +161,14 @@ async def acquire(self, need=1):

have = len(self.available)
total = len(self.available) + len(self.acquired)
can_make = self.max_workers - total
can_make = self.max_processors - total

need_to_make = need - have

if need_to_make > can_make:
raise Exception(f"Cannot allocate workers above {self.max_workers}")
raise Exception(
f"Cannot allocate processors above {self.max_processors}"
)

if need_to_make > 0:
log.debug(f"creating {need_to_make} additional processors")
@@ -193,9 +197,9 @@ def _new_processor(self):
self.processors_ready.clear()
processor_key = new_friendly_name()
log.debug(f"starting processor: {processor_key}")
processor = DFRayProcessor.options(name=f"Processor : {processor_key}").remote(
processor_key
)
processor = DFRayProcessor.options(
name=f"Processor : {processor_key}"
).remote(processor_key)
self.pool[processor_key] = processor
self.processors_started.add(processor.start_up.remote())
self.available.add(processor_key)
@@ -244,7 +248,9 @@ async def _wait_for_serve(self):

async def all_done(self):
log.info("calling processor all done")
refs = [processor.all_done.remote() for processor in self.pool.values()]
refs = [
processor.all_done.remote() for processor in self.pool.values()
]
await wait_for(refs, "processors to be all done")
log.info("all processors shutdown")

@@ -287,7 +293,9 @@ async def update_plan(
)

async def serve(self):
log.info(f"[{self.processor_key}] serving on {self.processor_service.addr()}")
log.info(
f"[{self.processor_key}] serving on {self.processor_service.addr()}"
)
await self.processor_service.serve()
log.info(f"[{self.processor_key}] done serving")

@@ -321,11 +329,13 @@ def __str__(self):
class DFRayContextSupervisor:
def __init__(
self,
worker_pool_min: int,
worker_pool_max: int,
processor_pool_min: int,
processor_pool_max: int,
) -> None:
log.info(f"Creating DFRayContextSupervisor worker_pool_min: {worker_pool_min}")
self.pool = DFRayProcessorPool(worker_pool_min, worker_pool_max)
log.info(
f"Creating DFRayContextSupervisor processor_pool_min: {processor_pool_min}"
)
self.pool = DFRayProcessorPool(processor_pool_min, processor_pool_max)
self.stages: dict[str, InternalStageData] = {}
log.info("Created DFRayContextSupervisor")

@@ -337,7 +347,9 @@ async def wait_for_ready(self):

async def get_stage_addrs(self, stage_id: int):
addrs = [
sd.remote_addr for sd in self.stages.values() if sd.stage_id == stage_id
sd.remote_addr
for sd in self.stages.values()
if sd.stage_id == stage_id
]
return addrs

@@ -387,7 +399,10 @@ async def new_query(
refs.append(
isd.remote_processor.update_plan.remote(
isd.stage_id,
{stage_id: val["child_addrs"] for (stage_id, val) in kid.items()},
{
stage_id: val["child_addrs"]
for (stage_id, val) in kid.items()
},
isd.partition_group,
isd.plan_bytes,
)
@@ -419,7 +434,9 @@ async def sort_out_addresses(self):
]

# sanity check
assert all([op == output_partitions[0] for op in output_partitions])
assert all(
[op == output_partitions[0] for op in output_partitions]
)
output_partitions = output_partitions[0]

for child_stage_isd in child_stage_datas:
@@ -452,15 +469,15 @@ def __init__(
internal_df: DFRayDataFrameInternal,
supervisor, # ray.actor.ActorHandle[DFRayContextSupervisor],
batch_size=8192,
partitions_per_worker: int | None = None,
partitions_per_processor: int | None = None,
prefetch_buffer_size=0,
):
self.df = internal_df
self.supervisor = supervisor
self._stages = None
self._batches = None
self.batch_size = batch_size
self.partitions_per_worker = partitions_per_worker
self.partitions_per_processor = partitions_per_processor
self.prefetch_buffer_size = prefetch_buffer_size

def stages(self):
@@ -469,7 +486,7 @@ def stages(self):
self._stages = self.df.stages(
self.batch_size,
self.prefetch_buffer_size,
self.partitions_per_worker,
self.partitions_per_processor,
)

return self._stages
@@ -503,7 +520,9 @@ def collect(self) -> list[pa.RecordBatch]:
)
log.debug(f"last stage addrs {last_stage_addrs}")

reader = self.df.read_final_stage(last_stage_id, last_stage_addrs[0])
reader = self.df.read_final_stage(
last_stage_id, last_stage_addrs[0]
)
log.debug("got reader")
self._batches = list(reader)
return self._batches
@@ -541,20 +560,20 @@ def __init__(
self,
batch_size: int = 8192,
prefetch_buffer_size: int = 0,
partitions_per_worker: int | None = None,
worker_pool_min: int = 1,
worker_pool_max: int = 100,
partitions_per_processor: int | None = None,
processor_pool_min: int = 1,
processor_pool_max: int = 100,
) -> None:
self.ctx = DFRayContextInternal()
self.batch_size = batch_size
self.partitions_per_worker = partitions_per_worker
self.partitions_per_processor = partitions_per_processor
self.prefetch_buffer_size = prefetch_buffer_size

self.supervisor = DFRayContextSupervisor.options(
name="RayContextSupersisor",
).remote(
worker_pool_min,
worker_pool_max,
processor_pool_min,
processor_pool_max,
)

# start up our super visor and don't check in on it until its
@@ -603,7 +622,9 @@ def register_csv(self, name: str, path: str):
"""
self.ctx.register_csv(name, path)

def register_listing_table(self, name: str, path: str, file_extention="parquet"):
def register_listing_table(
self, name: str, path: str, file_extention="parquet"
):
"""
Register a directory of parquet files with the given name.
The path can be a local filesystem path, absolute filesystem path, or a url.
@@ -629,7 +650,7 @@ def sql(self, query: str) -> DFRayDataFrame:
df,
self.supervisor,
self.batch_size,
self.partitions_per_worker,
self.partitions_per_processor,
self.prefetch_buffer_size,
)

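To see the renamed settings end to end — a hedged sketch that mirrors the values from the workflow change above, using the constructor keywords shown in this diff (`processor_pool_min`, `processor_pool_max`, `partitions_per_processor`); the table name, path, and query are made up:

```python
from datafusion_ray.core import DFRayContext

# Values mirror .github/workflows/main.yml above; table/path/query are illustrative.
ctx = DFRayContext(
    batch_size=8192,
    partitions_per_processor=2,
    processor_pool_min=20,
    processor_pool_max=100,
)
ctx.register_listing_table("lineitem", "s3://my-bucket/lineitem/")  # hypothetical parquet directory
print(ctx.sql("SELECT count(*) FROM lineitem").collect())
```
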
16 changes: 8 additions & 8 deletions dev/release/README.md
@@ -17,7 +17,7 @@
under the License.
-->

# DataFusion Ray Release Process
# DataFusion for Ray Release Process

Development happens on the `main` branch, and most of the time, we depend on DataFusion using GitHub dependencies
rather than using an official release from crates.io. This allows us to pick up new features and bug fixes frequently
@@ -43,7 +43,7 @@ You will need a GitHub Personal Access Token. Follow
[these instructions](https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/creating-a-personal-access-token)
to generate one if you do not already have one.

You will need a PyPI API token. Create one at https://test.pypi.org/manage/account/#api-tokens, setting the “Scope” to
You will need a PyPI API token. Create one at <https://test.pypi.org/manage/account/#api-tokens>, setting the “Scope” to
“Entire account”.

You will also need access to the [datafusion-ray](https://test.pypi.org/project/datafusion-ray/) project on testpypi.
@@ -63,7 +63,7 @@ We maintain a `CHANGELOG.md` so our users know what has been changed between releases
The changelog is generated using a Python script:

```bash
$ GITHUB_TOKEN=<TOKEN> ./dev/release/generate-changelog.py 0.1.0 HEAD 0.2.0 > dev/changelog/0.2.0.md
GITHUB_TOKEN=<TOKEN> ./dev/release/generate-changelog.py 0.1.0 HEAD 0.2.0 > dev/changelog/0.2.0.md
```

This script creates a changelog from GitHub PRs based on the labels associated with them as well as looking for
@@ -91,7 +91,7 @@ git push apache 0.2.0-rc1
./dev/release/create-tarball.sh 0.2.0 1
```

This will also create the email template to send to the mailing list.
This will also create the email template to send to the mailing list.

Create a draft email using this content, but do not send until after completing the next step.

@@ -104,7 +104,7 @@ This section assumes some familiarity with publishing Python packages to PyPi. F

Pushing an `rc` tag to the release branch will cause a GitHub Workflow to run that will build the Python wheels.

Go to https://github.com/apache/datafusion-ray/actions and look for an action named "Python Release Build"
Go to <https://github.com/apache/datafusion-ray/actions> and look for an action named "Python Release Build"
that has run against the pushed tag.

Click on the action and scroll down to the bottom of the page titled "Artifacts". Download `dist.zip`. It should
@@ -207,10 +207,10 @@ git push apache 0.2.0

### Add the release to Apache Reporter

Add the release to https://reporter.apache.org/addrelease.html?datafusion with a version name prefixed with `DATAFUSION-RAY`,
Add the release to <https://reporter.apache.org/addrelease.html?datafusion> with a version name prefixed with `DATAFUSION-RAY`,
for example `DATAFUSION-RAY-0.2.0`.

The release information is used to generate a template for a board report (see example from Apache Arrow
The release information is used to generate a template for a board report (see example from Apache Arrow
[here](https://github.com/apache/arrow/pull/14357)).

### Delete old RCs and Releases
@@ -222,7 +222,7 @@ for more information.

Release candidates should be deleted once the release is published.

Get a list of DataFusion release candidates:
Get a list of DataFusion for Ray release candidates:

```bash
svn ls https://dist.apache.org/repos/dist/dev/datafusion | grep datafusion-ray
4 changes: 3 additions & 1 deletion dev/release/check-rat-report.py
@@ -23,7 +23,9 @@
import xml.etree.ElementTree as ET

if len(sys.argv) != 3:
sys.stderr.write("Usage: %s exclude_globs.lst rat_report.xml\n" % sys.argv[0])
sys.stderr.write(
"Usage: %s exclude_globs.lst rat_report.xml\n" % sys.argv[0]
)
sys.exit(1)

exclude_globs_filename = sys.argv[1]