# Patch overview (commentary placed ahead of the first "diff --git" header;
# none of it belongs to a hunk).
#
# NOTE(review): this patch arrived with its line breaks and indentation
# collapsed. The layout below is reconstructed from the hunk headers' line
# counts and an assumed two-space Python indent; the blank context lines and
# the indent widths are assumptions -- confirm against the target files
# before applying.
#
# 1. frames_test.py: adds test_dataframe_column_fillna_constant_as_value.
#    The test creates a PCollection from [1.0, nan, -1.0], wraps each value
#    in a field named "x" with beam.Select, converts it with
#    convert.to_dataframe, calls fillna(0) on column "x", and asserts that
#    the resulting PCollection equals [1.0, 0.0, -1.0].
#
# 2. transforms.py, expr_to_stages: when no existing stage is available, the
#    new Stage is built from expr.args() with ConstantExpression instances
#    filtered out, instead of from every argument. The loop that follows
#    still iterates over all of expr.args().
#    Presumably this keeps constant arguments (such as the fillna value in
#    the new test) from being treated as stage inputs -- TODO confirm against
#    the Stage class.
diff --git a/sdks/python/apache_beam/dataframe/frames_test.py b/sdks/python/apache_beam/dataframe/frames_test.py
index 076ab504addec..55d9fc5f4dfbc 100644
--- a/sdks/python/apache_beam/dataframe/frames_test.py
+++ b/sdks/python/apache_beam/dataframe/frames_test.py
@@ -1025,6 +1025,17 @@ def test_series_fillna_series_as_value(self):
 
     self._run_test(lambda df, df2: df.A.fillna(df2.A), df, df2)
 
+  def test_dataframe_column_fillna_constant_as_value(self):
+    from apache_beam.dataframe import convert
+    from apache_beam.testing.util import assert_that
+    from apache_beam.testing.util import equal_to
+    with beam.Pipeline(None) as p:
+      pcoll = (
+          p | beam.Create([1.0, np.nan, -1.0]) | beam.Select(x=lambda x: x))
+      df = convert.to_dataframe(pcoll)
+      df_new = df['x'].fillna(0)
+      assert_that(convert.to_pcollection(df_new), equal_to([1.0, 0.0, -1.0]))
+
   @unittest.skipIf(PD_VERSION >= (2, 0), 'append removed in Pandas 2.0')
   def test_append_verify_integrity(self):
     df1 = pd.DataFrame({'A': range(10), 'B': range(10)}, index=range(10))
diff --git a/sdks/python/apache_beam/dataframe/transforms.py b/sdks/python/apache_beam/dataframe/transforms.py
index 852b49c4e2ed6..d0b5be4eb2a9b 100644
--- a/sdks/python/apache_beam/dataframe/transforms.py
+++ b/sdks/python/apache_beam/dataframe/transforms.py
@@ -395,7 +395,11 @@ def expr_to_stages(expr):
 
       if stage is None:
         # No stage available, compute this expression as part of a new stage.
-        stage = Stage(expr.args(), expr.requires_partition_by())
+        stage = Stage([
+            arg for arg in expr.args()
+            if not isinstance(arg, expressions.ConstantExpression)
+        ],
+                      expr.requires_partition_by())
         for arg in expr.args():
           # For each argument, declare that it is also available in
           # this new stage.