Skip to content

Commit

Permalink
[Bug] fix fillna function on a single column fail (#32594)
Browse files Browse the repository at this point in the history
* fix bug all arg add as inputs

* fix bug for fillna

* Revert "fix bug for fillna"

This reverts commit 2a5736c.

* fix bug for fillna

* add test for fillna a column

* add test for fillna a column

* add test for fillna a column

* revert add test to frames_test

* Move test from transforms to frames
  • Loading branch information
DKER2 authored Oct 9, 2024
1 parent 42cad40 commit b781b82
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 1 deletion.
11 changes: 11 additions & 0 deletions sdks/python/apache_beam/dataframe/frames_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,17 @@ def test_series_fillna_series_as_value(self):

self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2)

def test_dataframe_column_fillna_constant_as_value(self):
from apache_beam.dataframe import convert
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
with beam.Pipeline(None) as p:
pcoll = (
p | beam.Create([1.0, np.nan, -1.0]) | beam.Select(x=lambda x: x))
df = convert.to_dataframe(pcoll)
df_new = df['x'].fillna(0)
assert_that(convert.to_pcollection(df_new), equal_to([1.0, 0.0, -1.0]))

@unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
def test_append_verify_integrity(self):
df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
Expand Down
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/dataframe/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,11 @@ def expr_to_stages(expr):

if stage is None:
# No stage available, compute this expression as part of a new stage.
stage = Stage(expr.args(), expr.requires_partition_by())
stage = Stage([
arg for arg in expr.args()
if not isinstance(arg, expressions.ConstantExpression)
],
expr.requires_partition_by())
for arg in expr.args():
# For each argument, declare that it is also available in
# this new stage.
Expand Down

0 comments on commit b781b82

Please sign in to comment.