[SPARK-51983][PS] Prepare the test environment for pandas API on Spark with ANSI mode enabled #50779

Closed
1 change: 1 addition & 0 deletions .github/workflows/build_non_ansi.yml
@@ -46,6 +46,7 @@ jobs:
"build": "true",
"docs": "true",
"pyspark": "true",
"pyspark-pandas": "true",
"sparkr": "true",
"tpcds-1g": "true",
"docker-integration-tests": "true",
4 changes: 4 additions & 0 deletions python/docs/source/tutorial/pandas_on_spark/options.rst
@@ -319,6 +319,10 @@ compute.isin_limit 80 'compute.isin_limit' set
better performance.
compute.pandas_fallback False 'compute.pandas_fallback' sets whether or not to
fallback automatically to Pandas' implementation.
compute.ansi_mode_support False 'compute.ansi_mode_support' sets whether or not to
support the ANSI mode of the underlying Spark. If
False, pandas API on Spark may hit unexpected results
or errors. The default is False.
plotting.max_rows 1000 'plotting.max_rows' sets the visual limit on top-n-
based plots such as `plot.bar` and `plot.pie`. If it
is set to 1000, the first 1000 data points will be
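For context on the documentation change above: compute.ansi_mode_support is a regular pandas-on-Spark option, so it can be read and toggled with the existing option helpers. A minimal sketch, assuming the standard ps.get_option / ps.set_option / ps.reset_option API; the printed value follows the default documented in the table above.

import pyspark.pandas as ps

# Off by default, as documented above.
print(ps.get_option("compute.ansi_mode_support"))  # False

# Opt in so pandas API on Spark accounts for the underlying ANSI mode.
ps.set_option("compute.ansi_mode_support", True)

# Revert to the documented default.
ps.reset_option("compute.ansi_mode_support")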
41 changes: 34 additions & 7 deletions python/pyspark/pandas/config.py
@@ -20,9 +20,10 @@
"""
from contextlib import contextmanager
import json
from typing import Any, Callable, Dict, Iterator, List, Tuple, Union
from typing import Any, Callable, Dict, Iterator, List, Tuple, Union, Optional

from pyspark._globals import _NoValue, _NoValueType
from pyspark.sql.session import SparkSession
from pyspark.pandas.utils import default_session


@@ -268,6 +269,17 @@ def validate(self, v: Any) -> None:
default=False,
types=bool,
),
Option(
key="compute.ansi_mode_support",
doc=(
"'compute.ansi_mode_support' sets whether or not to support the ANSI mode of "
"the underlying Spark. "
"If False, pandas API on Spark may hit unexpected results or errors. "
"The default is False."
),
default=False,
types=bool,
),
Option(
key="plotting.max_rows",
doc=(
@@ -351,7 +363,12 @@ def show_options() -> None:
print(row_format.format("=" * 31, "=" * 23, "=" * 53))


def get_option(key: str, default: Union[Any, _NoValueType] = _NoValue) -> Any:
def get_option(
key: str,
default: Union[Any, _NoValueType] = _NoValue,
*,
spark_session: Optional[SparkSession] = None,
) -> Any:
"""
Retrieves the value of the specified option.

@@ -361,6 +378,9 @@ def get_option(key: str, default: Union[Any, _NoValueType] = _NoValue) -> Any:
The key which should match a single option.
default : object
The default value if the option is not set yet. The value should be JSON serializable.
spark_session : :class:`SparkSession`, optional
The explicit :class:`SparkSession` object to get the option.
If not specified, the default session will be used.

Returns
-------
@@ -374,12 +394,12 @@ def get_option(key: str, default: Union[Any, _NoValueType] = _NoValue) -> Any:
if default is _NoValue:
default = _options_dict[key].default
_options_dict[key].validate(default)
spark_session = default_session()
spark_session = spark_session or default_session()

return json.loads(spark_session.conf.get(_key_format(key), default=json.dumps(default)))


def set_option(key: str, value: Any) -> None:
def set_option(key: str, value: Any, *, spark_session: Optional[SparkSession] = None) -> None:
"""
Sets the value of the specified option.

@@ -389,19 +409,22 @@ def set_option(key: str, value: Any) -> None:
The key which should match a single option.
value : object
New value of option. The value should be JSON serializable.
spark_session : :class:`SparkSession`, optional
The explicit :class:`SparkSession` object to set the option.
If not specified, the default session will be used.

Returns
-------
None
"""
_check_option(key)
_options_dict[key].validate(value)
spark_session = default_session()
spark_session = spark_session or default_session()

spark_session.conf.set(_key_format(key), json.dumps(value))


def reset_option(key: str) -> None:
def reset_option(key: str, *, spark_session: Optional[SparkSession] = None) -> None:
"""
Reset one option to their default value.

@@ -411,13 +434,17 @@ def reset_option(key: str) -> None:
----------
key : str
If specified only option will be reset.
spark_session : :class:`SparkSession`, optional
The explicit :class:`SparkSession` object to reset the option.
If not specified, the default session will be used.

Returns
-------
None
"""
_check_option(key)
default_session().conf.unset(_key_format(key))
spark_session = spark_session or default_session()
spark_session.conf.unset(_key_format(key))


@contextmanager
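The config.py change above makes get_option, set_option, and reset_option accept a keyword-only spark_session argument instead of always resolving default_session(). A hedged usage sketch follows; the session name and the assertion are illustrative only, not part of the patch.

from pyspark.sql import SparkSession
from pyspark.pandas.config import get_option, set_option, reset_option

# An explicit session to target; per the diff, omitting spark_session still
# falls back to default_session().
session = SparkSession.builder.master("local[1]").appName("options-demo").getOrCreate()

# Options are stored as JSON in the session's runtime conf under an internal
# key prefix (_key_format), so they are scoped to the session passed in.
set_option("compute.ansi_mode_support", True, spark_session=session)
assert get_option("compute.ansi_mode_support", spark_session=session) is True

# Restore the default for that session.
reset_option("compute.ansi_mode_support", spark_session=session)

Keeping the new argument keyword-only preserves the existing positional call sites of these helpers.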
21 changes: 21 additions & 0 deletions python/pyspark/pandas/frame.py
@@ -13778,11 +13778,32 @@ def _test() -> None:
import uuid
from pyspark.sql import SparkSession
import pyspark.pandas.frame
from pyspark.testing.utils import is_ansi_mode_test

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.pandas.frame.__dict__.copy()
globs["ps"] = pyspark.pandas

if is_ansi_mode_test:
del pyspark.pandas.frame.DataFrame.add.__doc__
del pyspark.pandas.frame.DataFrame.div.__doc__
del pyspark.pandas.frame.DataFrame.floordiv.__doc__
del pyspark.pandas.frame.DataFrame.melt.__doc__
del pyspark.pandas.frame.DataFrame.mod.__doc__
del pyspark.pandas.frame.DataFrame.mul.__doc__
del pyspark.pandas.frame.DataFrame.pow.__doc__
del pyspark.pandas.frame.DataFrame.radd.__doc__
del pyspark.pandas.frame.DataFrame.rdiv.__doc__
del pyspark.pandas.frame.DataFrame.rfloordiv.__doc__
del pyspark.pandas.frame.DataFrame.rmod.__doc__
del pyspark.pandas.frame.DataFrame.rmul.__doc__
del pyspark.pandas.frame.DataFrame.rpow.__doc__
del pyspark.pandas.frame.DataFrame.rsub.__doc__
del pyspark.pandas.frame.DataFrame.rtruediv.__doc__
del pyspark.pandas.frame.DataFrame.sub.__doc__
del pyspark.pandas.frame.DataFrame.truediv.__doc__

spark = (
SparkSession.builder.master("local[4]").appName("pyspark.pandas.frame tests").getOrCreate()
)
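The _test() change above skips doctests by deleting the __doc__ of arithmetic methods whose example output differs under ANSI mode: doctest only collects examples from docstrings, so removing the docstring removes those examples from the run. A standalone sketch of the mechanism, unrelated to Spark itself:

import doctest


class Demo:
    def add(self, a, b):
        """
        >>> Demo().add(1, 2)
        3
        """
        return a + b


if __name__ == "__main__":
    skip_examples = True  # stands in for is_ansi_mode_test
    if skip_examples:
        # Same trick as the diff: with the docstring gone, doctest.testmod
        # finds no examples on Demo.add.
        del Demo.add.__doc__
    print(doctest.testmod().attempted)  # 0 when the docstring was removed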
5 changes: 5 additions & 0 deletions python/pyspark/pandas/groupby.py
@@ -4595,12 +4595,17 @@ def _test() -> None:
import numpy
from pyspark.sql import SparkSession
import pyspark.pandas.groupby
from pyspark.testing.utils import is_ansi_mode_test

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.pandas.groupby.__dict__.copy()
globs["np"] = numpy
globs["ps"] = pyspark.pandas

if is_ansi_mode_test:
del pyspark.pandas.groupby.DataFrameGroupBy.corr.__doc__

spark = (
SparkSession.builder.master("local[4]")
.appName("pyspark.pandas.groupby tests")
6 changes: 6 additions & 0 deletions python/pyspark/pandas/namespace.py
@@ -3874,6 +3874,7 @@ def _test() -> None:
from pyspark.sql import SparkSession
import pyspark.pandas.namespace
from pandas.util.version import Version
from pyspark.testing.utils import is_ansi_mode_test

os.chdir(os.environ["SPARK_HOME"])

@@ -3886,6 +3887,11 @@ def _test() -> None:
globs = pyspark.pandas.namespace.__dict__.copy()
globs["ps"] = pyspark.pandas
globs["sf"] = F

if is_ansi_mode_test:
del pyspark.pandas.namespace.melt.__doc__
del pyspark.pandas.namespace.to_numeric.__doc__

spark = (
SparkSession.builder.master("local[4]")
.appName("pyspark.pandas.namespace tests")
5 changes: 5 additions & 0 deletions python/pyspark/pandas/series.py
@@ -7336,11 +7336,16 @@ def _test() -> None:
import sys
from pyspark.sql import SparkSession
import pyspark.pandas.series
from pyspark.testing.utils import is_ansi_mode_test

os.chdir(os.environ["SPARK_HOME"])

globs = pyspark.pandas.series.__dict__.copy()
globs["ps"] = pyspark.pandas

if is_ansi_mode_test:
del pyspark.pandas.series.Series.autocorr.__doc__

spark = (
SparkSession.builder.master("local[4]").appName("pyspark.pandas.series tests").getOrCreate()
)
2 changes: 2 additions & 0 deletions python/pyspark/pandas/tests/computation/test_corr.py
@@ -22,9 +22,11 @@
from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase, SPARK_CONF_ARROW_ENABLED
from pyspark.testing.sqlutils import SQLTestUtils
from pyspark.testing.utils import is_ansi_mode_test, ansi_mode_not_supported_message


class FrameCorrMixin:
@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_dataframe_corr(self):
pdf = pd.DataFrame(
index=[
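The test change above is the pattern repeated throughout this PR: tests whose expected results do not yet hold under ANSI mode are skipped with unittest.skipIf. A hedged, self-contained sketch; the environment-variable check is an assumption about how is_ansi_mode_test could be derived, not a copy of pyspark.testing.utils.

import os
import unittest

# Assumed stand-ins for pyspark.testing.utils.is_ansi_mode_test and
# ansi_mode_not_supported_message; the real flag is computed by PySpark's
# test utilities, not necessarily from this environment variable.
is_ansi_mode_test = os.environ.get("SPARK_ANSI_SQL_MODE", "false").lower() == "true"
ansi_mode_not_supported_message = "Pandas API on Spark does not fully support ANSI mode yet."


class CorrLikeTests(unittest.TestCase):
    @unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
    def test_result_that_differs_under_ansi(self):
        # Placeholder assertion; the real tests compare pandas-on-Spark
        # results against pandas.
        self.assertEqual(2 + 2, 4)


if __name__ == "__main__":
    unittest.main()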
2 changes: 2 additions & 0 deletions python/pyspark/pandas/tests/computation/test_missing_data.py
@@ -23,6 +23,7 @@
from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils
from pyspark.testing.utils import is_ansi_mode_test, ansi_mode_not_supported_message


# This file contains test cases for 'Missing data handling'
@@ -273,6 +274,7 @@ def test_fillna(self):
pdf.fillna({("x", "a"): -1, ("x", "b"): -2, ("y", "c"): -5}),
)

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_replace(self):
pdf = pd.DataFrame(
{
5 changes: 5 additions & 0 deletions python/pyspark/pandas/tests/data_type_ops/test_boolean_ops.py
@@ -25,6 +25,7 @@
from pyspark import pandas as ps
from pyspark.pandas import option_context
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.utils import is_ansi_mode_test, ansi_mode_not_supported_message
from pyspark.pandas.tests.data_type_ops.testing_utils import OpsTestBase
from pyspark.pandas.typedef.typehints import (
extension_float_dtypes_available,
@@ -99,6 +100,7 @@ def test_mul(self):
else:
self.assertRaises(TypeError, lambda: b_psser * psser)

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_truediv(self):
pdf, psdf = self.pdf, self.psdf

@@ -114,6 +116,7 @@ def test_truediv(self):
for col in self.non_numeric_df_cols:
self.assertRaises(TypeError, lambda: b_psser / psdf[col])

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_floordiv(self):
pdf, psdf = self.pdf, self.psdf

@@ -134,6 +137,7 @@ def test_floordiv(self):
for col in self.non_numeric_df_cols:
self.assertRaises(TypeError, lambda: b_psser // psdf[col])

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_mod(self):
pdf, psdf = self.pdf, self.psdf

@@ -233,6 +237,7 @@ def test_rpow(self):
self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) ** b_psser)
self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) ** b_psser)

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_rmod(self):
psdf = self.psdf

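Many of the arithmetic tests skipped above (test_truediv, test_floordiv, test_mod, and their reverse variants) can hit division by zero. A rough illustration of the underlying behavior gap, assuming the documented Spark SQL semantics where spark.sql.ansi.enabled turns divide-by-zero from a NULL result into an error; the exact error class may vary by Spark version.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("ansi-behavior-demo").getOrCreate()

# Non-ANSI (legacy) behavior: divide by zero yields NULL.
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("SELECT 1 / 0 AS r").show()

# ANSI behavior: the same query raises instead of returning NULL, which is
# why expectations written for the non-ANSI result no longer match.
spark.conf.set("spark.sql.ansi.enabled", "true")
try:
    spark.sql("SELECT 1 / 0 AS r").show()
except Exception as exc:  # e.g. a DIVIDE_BY_ZERO arithmetic error
    print(type(exc).__name__)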
@@ -21,6 +21,7 @@

from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.utils import is_ansi_mode_test, ansi_mode_not_supported_message
from pyspark.pandas.tests.data_type_ops.testing_utils import OpsTestBase


@@ -40,6 +41,7 @@ def float_pser(self):
def float_psser(self):
return ps.from_pandas(self.float_pser)

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_add(self):
pdf, psdf = self.pdf, self.psdf
for col in self.numeric_df_cols:
@@ -57,6 +59,7 @@ def test_add(self):
else:
self.assertRaises(TypeError, lambda: psser + psdf[n_col])

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_sub(self):
pdf, psdf = self.pdf, self.psdf
for col in self.numeric_df_cols:
2 changes: 2 additions & 0 deletions python/pyspark/pandas/tests/data_type_ops/test_num_mod.py
@@ -22,6 +22,7 @@

from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.utils import is_ansi_mode_test, ansi_mode_not_supported_message
from pyspark.pandas.tests.data_type_ops.testing_utils import OpsTestBase


@@ -34,6 +35,7 @@ def float_pser(self):
def float_psser(self):
return ps.from_pandas(self.float_pser)

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_mod(self):
pdf, psdf = self.pdf, self.psdf
for col in self.numeric_df_cols:
2 changes: 2 additions & 0 deletions python/pyspark/pandas/tests/data_type_ops/test_num_mul_div.py
@@ -22,6 +22,7 @@

from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.utils import is_ansi_mode_test, ansi_mode_not_supported_message
from pyspark.pandas.tests.data_type_ops.testing_utils import OpsTestBase


@@ -34,6 +35,7 @@ def float_pser(self):
def float_psser(self):
return ps.from_pandas(self.float_pser)

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_mul(self):
pdf, psdf = self.pdf, self.psdf
for col in self.numeric_df_cols:
5 changes: 5 additions & 0 deletions python/pyspark/pandas/tests/data_type_ops/test_num_reverse.py
@@ -22,6 +22,7 @@

from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.utils import is_ansi_mode_test, ansi_mode_not_supported_message
from pyspark.pandas.tests.data_type_ops.testing_utils import OpsTestBase


@@ -41,6 +42,7 @@ def float_pser(self):
def float_psser(self):
return ps.from_pandas(self.float_pser)

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_radd(self):
pdf, psdf = self.pdf, self.psdf
for col in self.numeric_df_cols:
@@ -53,6 +55,7 @@ def test_radd(self):
self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) + psser)
self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) + psser)

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_rsub(self):
pdf, psdf = self.pdf, self.psdf
for col in self.numeric_df_cols:
@@ -65,6 +68,7 @@ def test_rsub(self):
self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) - psser)
self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) - psser)

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_rmul(self):
pdf, psdf = self.pdf, self.psdf
for col in self.numeric_df_cols:
@@ -113,6 +117,7 @@ def test_rpow(self):
self.assertRaises(TypeError, lambda: datetime.date(1994, 1, 1) ** psser)
self.assertRaises(TypeError, lambda: datetime.datetime(1994, 1, 1) ** psser)

@unittest.skipIf(is_ansi_mode_test, ansi_mode_not_supported_message)
def test_rmod(self):
pdf, psdf = self.pdf, self.psdf
for col in self.numeric_df_cols: