Skip to content

Commit

Permalink
Merge branch 'main' into Fix-modin-project#7346--Handle-execution-on-…
Browse files Browse the repository at this point in the history
…Dask-workers-to-avoid-creating-conflicting-Clients
  • Loading branch information
data-makerman committed Sep 17, 2024
2 parents 036b91e + 05f5e7d commit 9ff6261
Show file tree
Hide file tree
Showing 60 changed files with 8,979 additions and 81 deletions.
1 change: 1 addition & 0 deletions .github/actions/upload-coverage/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ runs:
with:
name: coverage-data-${{ env.COVERAGE_UUID }}
path: .coverage*
include-hidden-files: true
70 changes: 66 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ jobs:
with:
name: Benchmarks log
path: asv_bench/benchmarks.log
include-hidden-files: true
if: failure()

execution-filter:
Expand All @@ -246,12 +247,16 @@ jobs:
unidist: ${{ steps.filter.outputs.unidist }}
engines: ${{ steps.engines.outputs.engines }}
experimental: ${{ steps.experimental.outputs.experimental }}
test-native-dataframe-mode: ${{ steps.filter.outputs.test-native-dataframe-mode }}
steps:
- uses: actions/checkout@v4
- uses: dorny/paths-filter@v3
id: filter
with:
filters: |
test-native-dataframe-mode:
- 'modin/core/storage_formats/pandas/native_query_compiler.py'
- 'modin/core/storage_formats/base/query_compiler.py'
shared: &shared
- 'modin/core/execution/dispatching/**'
ray:
Expand Down Expand Up @@ -293,7 +298,7 @@ jobs:
name: test-ubuntu (engine unidist ${{matrix.unidist-backend}}, python ${{matrix.python-version}})
services:
moto:
image: motoserver/moto
image: motoserver/moto:5.0.13
ports:
- 5000:5000
env:
Expand Down Expand Up @@ -382,7 +387,7 @@ jobs:
# Using workaround https://github.com/actions/runner/issues/822#issuecomment-1524826092
moto:
# we only need moto service on Ubuntu and for group_4 task or python engine
image: ${{ (matrix.os == 'ubuntu' && (matrix.engine == 'python' || matrix.test_task == 'group_4')) && 'motoserver/moto' || '' }}
image: ${{ (matrix.os == 'ubuntu' && (matrix.engine == 'python' || matrix.test_task == 'group_4')) && 'motoserver/moto:5.0.13' || '' }}
ports:
- 5000:5000
env:
Expand Down Expand Up @@ -439,12 +444,28 @@ jobs:
- run: python -m pytest -n 2 modin/tests/experimental/test_pipeline.py
if: matrix.engine == 'python' || matrix.test_task == 'group_1'
- uses: ./.github/actions/run-core-tests/group_1
with:
# When running with Ray engine on Windows using 2 pytest workers tests are failing in CI.
# See https://github.com/modin-project/modin/issues/7387.
parallel: ${{ matrix.engine == 'ray' && matrix.os == 'windows' && '-n 1' || '-n 2' }}
if: matrix.engine == 'python' || matrix.test_task == 'group_1'
- uses: ./.github/actions/run-core-tests/group_2
with:
# When running with Ray engine on Windows using 2 pytest workers tests are failing in CI.
# See https://github.com/modin-project/modin/issues/7387.
parallel: ${{ matrix.engine == 'ray' && matrix.os == 'windows' && '-n 1' || '-n 2' }}
if: matrix.engine == 'python' || matrix.test_task == 'group_2'
- uses: ./.github/actions/run-core-tests/group_3
with:
# When running with Ray engine on Windows using 2 pytest workers tests are failing in CI.
# See https://github.com/modin-project/modin/issues/7387.
parallel: ${{ matrix.engine == 'ray' && matrix.os == 'windows' && '-n 1' || '-n 2' }}
if: matrix.engine == 'python' || matrix.test_task == 'group_3'
- uses: ./.github/actions/run-core-tests/group_4
with:
# When running with Ray engine on Windows using 2 pytest workers tests are failing in CI.
# See https://github.com/modin-project/modin/issues/7387.
parallel: ${{ matrix.engine == 'ray' && matrix.os == 'windows' && '-n 1' || '-n 2' }}
if: matrix.engine == 'python' || matrix.test_task == 'group_4'
- run: python -m pytest -n 2 modin/tests/numpy
if: matrix.engine == 'python' || matrix.test_task == 'group_4'
Expand All @@ -462,6 +483,7 @@ jobs:
if: matrix.engine == 'python' || matrix.test_task == 'group_4'
- run: python -m pytest modin/tests/interchange/dataframe_protocol/pandas/test_protocol.py
if: matrix.engine == 'python' || matrix.test_task == 'group_4'
- run: python -m pytest modin/tests/polars/test_dataframe.py
- run: |
python -m pip install lazy_import
python -m pytest modin/tests/pandas/integrations/
Expand Down Expand Up @@ -507,7 +529,7 @@ jobs:
name: test-${{ matrix.os }}-sanity (engine ${{ matrix.execution.name }}, python ${{matrix.python-version}})
services:
moto:
image: ${{ matrix.os != 'windows' && 'motoserver/moto' || '' }}
image: ${{ matrix.os != 'windows' && 'motoserver/moto:5.0.13' || '' }}
ports:
- 5000:5000
env:
Expand Down Expand Up @@ -622,7 +644,7 @@ jobs:
name: test experimental
services:
moto:
image: motoserver/moto
image: motoserver/moto:5.0.13
ports:
- 5000:5000
env:
Expand Down Expand Up @@ -664,6 +686,45 @@ jobs:
python-version: ${{matrix.python-version}}
- run: python -m pytest modin/tests/experimental/spreadsheet/test_general.py

test-native-dataframe-mode:
  # Runs the pandas dataframe test suites with MODIN_NATIVE_DATAFRAME_MODE set to "Pandas".
  # The job is gated on the `test-native-dataframe-mode` output of the execution-filter job,
  # so it only runs when that path filter matched.
  needs: [lint-flake8, execution-filter]
  if: ${{ needs.execution-filter.outputs.test-native-dataframe-mode == 'true' }}
  runs-on: ubuntu-latest
  defaults:
    run:
      shell: bash -l {0}
  strategy:
    matrix:
      python-version: ["3.9"]
  env:
    MODIN_NATIVE_DATAFRAME_MODE: "Pandas"
  # Display name; parentheses are balanced to match the naming of the other test jobs.
  name: test-native-dataframe-mode (python ${{matrix.python-version}})
  steps:
    - uses: actions/checkout@v4
    - uses: ./.github/actions/mamba-env
      with:
        environment-file: environment-dev.yml
        python-version: ${{matrix.python-version}}
    # Regular dataframe test suites, executed under the native dataframe mode.
    - run: python -m pytest modin/tests/pandas/dataframe/test_binary.py
    - run: python -m pytest modin/tests/pandas/dataframe/test_default.py
    - run: python -m pytest modin/tests/pandas/dataframe/test_indexing.py
    - run: python -m pytest modin/tests/pandas/dataframe/test_iter.py
    - run: python -m pytest modin/tests/pandas/dataframe/test_join_sort.py
    - run: python -m pytest modin/tests/pandas/dataframe/test_map_metadata.py
    - run: python -m pytest modin/tests/pandas/dataframe/test_pickle.py
    - run: python -m pytest modin/tests/pandas/dataframe/test_reduce.py
    - run: python -m pytest modin/tests/pandas/dataframe/test_udf.py
    - run: python -m pytest modin/tests/pandas/dataframe/test_window.py
    # Test suites located in the dedicated native_df_mode test directory.
    - run: python -m pytest modin/tests/pandas/native_df_mode/test_binary.py
    - run: python -m pytest modin/tests/pandas/native_df_mode/test_default.py
    - run: python -m pytest modin/tests/pandas/native_df_mode/test_indexing.py
    - run: python -m pytest modin/tests/pandas/native_df_mode/test_iter.py
    - run: python -m pytest modin/tests/pandas/native_df_mode/test_join_sort.py
    - run: python -m pytest modin/tests/pandas/native_df_mode/test_map_metadata.py
    - run: python -m pytest modin/tests/pandas/native_df_mode/test_pickle.py
    - run: python -m pytest modin/tests/pandas/native_df_mode/test_window.py
    - uses: ./.github/actions/upload-coverage

merge-coverage-artifacts:
needs: [test-internals, test-api-and-no-engine, test-defaults, test-all-unidist, test-all, test-experimental, test-sanity]
if: always() # we need to run it regardless of some job being skipped, like in PR
Expand All @@ -677,6 +738,7 @@ jobs:
with:
name: coverage-data
pattern: coverage-data-*
include-hidden-files: true
delete-merged: true

upload-coverage:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/fuzzydata-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ jobs:
name: fuzzydata-test-workflow-${{matrix.engine}}
path: /tmp/fuzzydata-test-wf-${{matrix.engine}}/* # Must match output dir in test_fuzzydata.py
if-no-files-found: error
include-hidden-files: true
1 change: 1 addition & 0 deletions .github/workflows/publish-to-pypi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
with:
name: modin-wheel-and-source-tarball
path: ./dist/
include-hidden-files: true

- name: Publish Modin wheel to PyPI
if: github.event_name == 'push'
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/push-to-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
shell: bash -l {0}
services:
moto:
image: motoserver/moto
image: motoserver/moto:5.0.13
ports:
- 5000:5000
env:
Expand Down
3 changes: 1 addition & 2 deletions docs/_static/custom.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ document.addEventListener("DOMContentLoaded", function () {
script.type = "module";
script.id = "runllm-widget-script"

script.src = "https://cdn.jsdelivr.net/npm/@runllm/search-widget@stable/dist/run-llm-search-widget.es.js";
script.src = "https://widget.runllm.com";

script.setAttribute("version", "stable");
script.setAttribute("runllm-keyboard-shortcut", "Mod+j"); // cmd-j or ctrl-j to open the widget.
script.setAttribute("runllm-name", "Modin");
script.setAttribute("runllm-position", "BOTTOM_RIGHT");
Expand Down
64 changes: 64 additions & 0 deletions docs/usage_guide/optimization_notes/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,38 @@ Range-partitioning is not a silver bullet, meaning that enabling it is not alway
a link to the list of operations that have support for range-partitioning and practical advice on when one should
enable it: :doc:`operations that support range-partitioning </usage_guide/optimization_notes/range_partitioning_ops>`.

Dynamic-partitioning in Modin
"""""""""""""""""""""""""""""

The Ray engine experiences slowdowns when running a large number of small remote tasks at the same time. Ray Core recommends that users `avoid tiny task`_.
When a Modin DataFrame has a large number of partitions, some functions produce a large number of remote tasks, which can cause slowdowns.
To solve this problem, Modin suggests using dynamic partitioning. This approach reduces the number of remote tasks
by combining multiple partitions into a single virtual partition and performing a common remote task on them.

Dynamic partitioning is typically used for operations that are fully or partially executed on all partitions separately.

.. code-block:: python

    import modin.pandas as pd
    from modin.config import context

    df = pd.DataFrame(...)

    with context(DynamicPartitioning=True):
        df.abs()

Dynamic partitioning is also not always useful, and this approach is usually used for medium-sized DataFrames with a large number of columns.
If the number of columns is small, the number of partitions will be close to the number of CPUs, and Ray will not have this problem.
If the DataFrame has too many rows, this is also not a good case for using Dynamic-partitioning, since each task is no longer tiny and performing
the combined tasks carries more overhead than assigning them separately.

Unfortunately, the benefit of Dynamic-partitioning depends on various factors such as data size, number of CPUs, and operations performed,
and it is up to the user to determine whether Dynamic-partitioning will give a boost in their case or not.

..
  TODO: Define heuristics to automatically enable dynamic partitioning without performance penalty.
  `Issue #7370 <https://github.com/modin-project/modin/issues/7370>`_

Understanding Modin's partitioning mechanism
""""""""""""""""""""""""""""""""""""""""""""

Expand Down Expand Up @@ -282,6 +314,37 @@ Copy-pastable example, showing how mixing pandas and Modin DataFrames in a singl
# Possible output: TypeError
Execute DataFrame operations using NativeQueryCompiler
""""""""""""""""""""""""""""""""""""""""""""""""""""""

By default, Modin distributes data across partitions and performs operations
using the ``PandasQueryCompiler``. However, for certain scenarios such as handling small or empty DataFrames,
distributing them may introduce unnecessary overhead. In such cases, it's more efficient to default
to pandas at the query compiler layer. This can be achieved by setting the ``cfg.NativeDataframeMode``
:doc:`configuration variable </flow/modin/config>` to ``Pandas``. When set to ``Pandas``, all operations in Modin default to pandas, and the DataFrames are not distributed,
avoiding additional overhead. This configuration can be toggled on or off depending on whether
DataFrame distribution is required.

DataFrames created while the ``NativeDataframeMode`` is active will continue to use the ``NativeQueryCompiler``
even after the config is disabled. Modin supports interoperability between distributed Modin DataFrames and
those using the ``NativeQueryCompiler``.

.. code-block:: python

    import modin.pandas as pd
    import modin.config as cfg

    # This dataframe will be distributed and use `PandasQueryCompiler` by default
    df_distributed = pd.DataFrame(...)

    # Set mode to "Pandas" to avoid distribution and use `NativeQueryCompiler`
    cfg.NativeDataframeMode.put("Pandas")
    df_native_qc = pd.DataFrame(...)

    # Revert to default settings for distributed dataframes
    cfg.NativeDataframeMode.put("Default")
    df_distributed = pd.DataFrame(...)

Operation-specific optimizations
""""""""""""""""""""""""""""""""

Expand Down Expand Up @@ -311,3 +374,4 @@ an inner join you may want to swap left and right DataFrames.
Note that result columns order may differ for first and second ``merge``.

.. _range-partitioning: https://www.techopedia.com/definition/31994/range-partitioning
.. _`avoid tiny task`: https://docs.ray.io/en/latest/ray-core/tips-for-first-time.html#tip-2-avoid-tiny-tasks
1 change: 1 addition & 0 deletions environment-dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,4 @@ dependencies:
- git+https://github.com/modin-project/modin-spreadsheet.git@49ffd89f683f54c311867d602c55443fb11bf2a5
# The `numpydoc` version should match the version installed in the `lint-pydocstyle` job of the CI.
- numpydoc==1.6.0
- polars
4 changes: 4 additions & 0 deletions modin/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
CpuCount,
DaskThreadsPerWorker,
DocModule,
DynamicPartitioning,
Engine,
EnvironmentVariable,
GithubCI,
Expand All @@ -39,6 +40,7 @@
MinPartitionSize,
MinRowPartitionSize,
ModinNumpy,
NativeDataframeMode,
NPartitions,
PersistentPickle,
ProgressBar,
Expand Down Expand Up @@ -68,6 +70,7 @@
"CpuCount",
"GpuCount",
"Memory",
"NativeDataframeMode",
# Ray specific
"IsRayCluster",
"RayRedisAddress",
Expand Down Expand Up @@ -95,6 +98,7 @@
"AsyncReadMode",
"ReadSqlEngine",
"IsExperimental",
"DynamicPartitioning",
# For tests
"TrackFileLeaks",
"TestReadFromSqlServer",
Expand Down
52 changes: 52 additions & 0 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,24 @@ class CpuCount(EnvironmentVariable, type=int):

varname = "MODIN_CPUS"

@classmethod
def _put(cls, value: int) -> None:
    """
    Store `value` as the CpuCount, but only if a user hasn't set CpuCount yet.

    Parameters
    ----------
    value : int
        Config value to set.

    Notes
    -----
    This is an internal hook used to set CpuCount from cluster resources;
    it should not be called by a user.
    """
    # A non-default value source means the config was already set: keep that value.
    if cls.get_value_source() != ValueSource.DEFAULT:
        return
    cls.put(value)

@classmethod
def _get_default(cls) -> int:
"""
Expand Down Expand Up @@ -874,6 +892,18 @@ class DaskThreadsPerWorker(EnvironmentVariable, type=int):
default = 1


class DynamicPartitioning(EnvironmentVariable, type=bool):
    """
    Set to true to use Modin's dynamic-partitioning implementation where possible.

    Please refer to documentation for cases where enabling this option would be beneficial:
    https://modin.readthedocs.io/en/stable/usage_guide/optimization_notes/index.html#dynamic-partitioning-in-modin
    """

    # Name of the environment variable backing this config.
    varname = "MODIN_DYNAMIC_PARTITIONING"
    # Dynamic partitioning is opt-in.
    default = False


def _check_vars() -> None:
"""
Check validity of environment variables.
Expand Down Expand Up @@ -913,4 +943,26 @@ def _check_vars() -> None:
)


class NativeDataframeMode(EnvironmentVariable, type=str):
    """
    Configures the query compiler to process Modin data.

    When this config is set to ``Default``, ``PandasQueryCompiler`` is used,
    which leads to Modin executing dataframes in distributed fashion.
    When set to ``Pandas``, ``NativeQueryCompiler`` is used,
    which handles the dataframes without distributing,
    falling back to native library functions (e.g., ``pandas``).

    This could be beneficial for handling relatively small dataframes
    without involving additional overhead of communication between processes.
    """

    # Name of the environment variable backing this config.
    varname = "MODIN_NATIVE_DATAFRAME_MODE"
    # Accepted values for this config.
    choices = (
        "Default",
        "Pandas",
    )
    default = "Default"


_check_vars()
8 changes: 5 additions & 3 deletions modin/core/dataframe/algebra/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -655,9 +655,11 @@ def aggregate_on_dict(grp_obj, *args, **kwargs):
)

native_res_part = [] if native_agg_res is None else [native_agg_res]
result = pandas.concat(
[*native_res_part, *custom_results], axis=1, copy=False
)
parts = [*native_res_part, *custom_results]
if parts:
result = pandas.concat(parts, axis=1, copy=False)
else:
result = pandas.DataFrame(columns=result_columns)

# The order is naturally preserved if there's no custom aggregations
if preserve_aggregation_order and len(custom_aggs):
Expand Down
Loading

0 comments on commit 9ff6261

Please sign in to comment.