feat: add `DataFrame.combine` and `DataFrame.combine_first` (googleapis#27)

feat: add `DataFrame.skew` and `GroupBy.skew`
test: remove unneeded mock
perf: add `bigframes-api` label to I/O query jobs
fix: `remote_function` uses same credentials as other APIs
test: BQML golden SQL unit tests
feat: add `DataFrame.pct_change` and `Series.pct_change`
test: disable `remote_function` reuse in tests
test: fix flaky repr_cache tests
test: add unit tests for private `ArrayValue` class
feat: add `DataFrame.to_dict`, `to_excel`, `to_latex`, `to_records`, `to_string`, `to_markdown`, `to_pickle`, `to_orc`
fix: use `Int64Dtype` for literals in `cut`
feat: add `DataFrame.nlargest`, `nsmallest`
chore: refactor PCA tests
feat: add `bfill` and `ffill` to `DataFrame` and `Series`
feat: add `reindex_like` to `DataFrame` and `Series`
fix: use lowercase strings for parameter literals in `bigframes.ml` (**breaking change**)
feat: support `DataFrame.loc[bool_series, column] = scalar`
fix: support column joins with "None indexer"
docs: document region logic in README
feat: add partial support for `Series.replace`
fix: add type hints to models
test: add more unit tests for internal `ArrayValue`
feat: add `filter` and `reindex` to `Series` and `DataFrame`
docs: document possible parameter values for PaLM2TextGenerator
test: mark generate_text test as flaky
feat: support a persistent `name` in `remote_function`
fix: raise error when ARIMAPlus is used with Pipeline
feat: add `swaplevel` to `DataFrame` and `Series`
feat: add `axis` parameter to `droplevel` and `reorder_levels`
docs: fix OneHotEncoder sample
fix: remove `transforms` parameter in `model.fit` (**breaking change**)
feat: add `diff` method to `DataFrame` and `GroupBy`
tswast authored Sep 16, 2023
1 parent ffad464 commit 7c6b0dd
Showing 63 changed files with 3,487 additions and 417 deletions.
23 changes: 23 additions & 0 deletions .kokoro/docs/docs-presubmit-gerrit.cfg
@@ -0,0 +1,23 @@
# Format: //devtools/kokoro/config/proto/build.proto

env_vars: {
key: "V2_STAGING_BUCKET"
value: "gcloud-python-test"
}

# We only upload the image in the main `docs` build.
env_vars: {
key: "TRAMPOLINE_IMAGE_UPLOAD"
value: "false"
}

env_vars: {
key: "TRAMPOLINE_BUILD_FILE"
value: ".kokoro/build.sh"
}

# Only run this nox session.
env_vars: {
key: "NOX_SESSION"
value: "docfx"
}
7 changes: 7 additions & 0 deletions .kokoro/presubmit/e2e-gerrit.cfg
@@ -0,0 +1,7 @@
# Format: //devtools/kokoro/config/proto/build.proto

# Only run this nox session.
env_vars: {
key: "NOX_SESSION"
value: "system_noextras e2e notebook samples"
}
1 change: 1 addition & 0 deletions .kokoro/presubmit/presubmit-gerrit.cfg
@@ -0,0 +1 @@
# Format: //devtools/kokoro/config/proto/build.proto
1 change: 1 addition & 0 deletions OWNERS
@@ -1,3 +1,4 @@
[email protected]
[email protected]
[email protected]
[email protected]
11 changes: 10 additions & 1 deletion README.rst
@@ -57,9 +57,13 @@ internally to manage metadata on the service side. This session is tied to a
BigQuery DataFrames uses the US multi-region as the default location, but you
can use ``session_options.location`` to set a different location. Every query
in a session is executed in the location where the session was created.
+BigQuery DataFrames
+auto-populates ``bf.options.bigquery.location`` if the user starts with
+``read_gbq/read_gbq_table/read_gbq_query()`` and specifies a table, either
+directly or in a SQL statement.

If you want to reset the location of the created DataFrame or Series objects,
-can reset the session by executing ``bigframes.pandas.reset_session()``.
+you can reset the session by executing ``bigframes.pandas.reset_session()``.
After that, you can reuse ``bigframes.pandas.options.bigquery.location`` to
specify another location.
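For example, the location can be set before the first read and changed after a reset (a minimal sketch; the region and table ID are placeholders):

import bigframes.pandas as bpd

bpd.options.bigquery.location = "us-central1"  # placeholder region
df = bpd.read_gbq("my-project.my_dataset.my_table")  # placeholder table ID

bpd.reset_session()  # discard the session so a new location can take effect
bpd.options.bigquery.location = "EU"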

@@ -68,6 +72,11 @@ specify another location.
querying is not in the US multi-region. If you try to read a table from another
location, you get a NotFound exception.

+Project
+-------
+If ``bf.options.bigquery.project`` is not set, the ``$GOOGLE_CLOUD_PROJECT``
+environment variable is used; it is set automatically in the notebook runtime
+serving BigQuery Studio/Vertex Notebooks.
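A sketch of the fallback described above (``my-project`` is a placeholder):

import bigframes.pandas as bpd

# Outside BigQuery Studio/Vertex Notebooks, $GOOGLE_CLOUD_PROJECT may be
# unset, so the billing project can be set explicitly instead:
bpd.options.bigquery.project = "my-project"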

ML Capabilities
---------------
2 changes: 1 addition & 1 deletion bigframes/core/__init__.py
@@ -518,8 +518,8 @@ def aggregate(
"""
Apply aggregations to the expression.
Arguments:
-by_column_id: column id of the aggregation key, this is preserved through the transform
aggregations: input_column_id, operation, output_column_id tuples
+by_column_id: column id of the aggregation key, this is preserved through the transform
dropna: whether null keys should be dropped
"""
table = self.to_ibis_expr(ordering_mode="unordered")
176 changes: 176 additions & 0 deletions bigframes/core/block_transforms.py
@@ -117,6 +117,25 @@ def value_counts(
return block.select_column(count_id).with_column_labels(["count"])


def pct_change(block: blocks.Block, periods: int = 1) -> blocks.Block:
column_labels = block.column_labels
window_spec = core.WindowSpec(
preceding=periods if periods > 0 else None,
following=-periods if periods < 0 else None,
)

original_columns = block.value_columns
block, shift_columns = block.multi_apply_window_op(
original_columns, agg_ops.ShiftOp(periods), window_spec=window_spec
)
result_ids = []
for original_col, shifted_col in zip(original_columns, shift_columns):
block, change_id = block.apply_binary_op(original_col, shifted_col, ops.sub_op)
block, pct_change_id = block.apply_binary_op(change_id, shifted_col, ops.div_op)
result_ids.append(pct_change_id)
return block.select_columns(result_ids).with_column_labels(column_labels)
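The shift-subtract-divide pipeline above is the standard percent-change definition; a quick pandas check of the intended semantics (pandas stands in for the block-level ops here):

import pandas as pd

s = pd.Series([2.0, 4.0, 3.0])
manual = (s - s.shift(1)) / s.shift(1)  # shift, subtract, divide, as above
assert manual.equals(s.pct_change())    # NaN, 1.0, -0.25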


def rank(
block: blocks.Block,
method: str = "average",
@@ -229,3 +248,160 @@ def dropna(block: blocks.Block, how: typing.Literal["all", "any"] = "any"):
filtered_block = filtered_block.filter(predicate)
filtered_block = filtered_block.select_columns(block.value_columns)
return filtered_block


def nsmallest(
block: blocks.Block,
n: int,
column_ids: typing.Sequence[str],
keep: str,
) -> blocks.Block:
if keep not in ("first", "last", "all"):
raise ValueError("'keep must be one of 'first', 'last', or 'all'")
if keep == "last":
block = block.reversed()
order_refs = [
ordering.OrderingColumnReference(
col_id, direction=ordering.OrderingDirection.ASC
)
for col_id in column_ids
]
block = block.order_by(order_refs, stable=True)
if keep in ("first", "last"):
return block.slice(0, n)
else: # keep == "all":
block, counter = block.apply_window_op(
column_ids[0],
agg_ops.rank_op,
window_spec=core.WindowSpec(ordering=order_refs),
)
block, condition = block.apply_unary_op(
counter, ops.partial_right(ops.le_op, n)
)
block = block.filter(condition)
return block.drop_columns([counter, condition])


def nlargest(
block: blocks.Block,
n: int,
column_ids: typing.Sequence[str],
keep: str,
) -> blocks.Block:
if keep not in ("first", "last", "all"):
raise ValueError("'keep must be one of 'first', 'last', or 'all'")
if keep == "last":
block = block.reversed()
order_refs = [
ordering.OrderingColumnReference(
col_id, direction=ordering.OrderingDirection.DESC
)
for col_id in column_ids
]
block = block.order_by(order_refs, stable=True)
if keep in ("first", "last"):
return block.slice(0, n)
else: # keep == "all":
block, counter = block.apply_window_op(
column_ids[0],
agg_ops.rank_op,
window_spec=core.WindowSpec(ordering=order_refs),
)
block, condition = block.apply_unary_op(
counter, ops.partial_right(ops.le_op, n)
)
block = block.filter(condition)
return block.drop_columns([counter, condition])
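The `keep` handling mirrors pandas: "first" and "last" resolve ties by position via the stable sort and slice, while "all" keeps every row tied with the cut-off value, which is why the rank-based filter may return more than `n` rows. In pandas terms:

import pandas as pd

df = pd.DataFrame({"x": [1, 3, 3, 2]})
print(df.nlargest(1, "x"))              # one row: the first x == 3
print(df.nlargest(1, "x", keep="all"))  # two rows: every tie at x == 3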


def skew(
block: blocks.Block,
skew_column_ids: typing.Sequence[str],
grouping_column_ids: typing.Sequence[str] = (),
) -> blocks.Block:

original_columns = skew_column_ids
column_labels = block.select_columns(original_columns).column_labels

block, delta3_ids = _mean_delta_to_power(
block, 3, original_columns, grouping_column_ids
)
# counts, moment3 for each column
aggregations = []
for i, col in enumerate(original_columns):
count_agg = (col, agg_ops.count_op)
moment3_agg = (delta3_ids[i], agg_ops.mean_op)
variance_agg = (col, agg_ops.PopVarOp())
aggregations.extend([count_agg, moment3_agg, variance_agg])

block, agg_ids = block.aggregate(
by_column_ids=grouping_column_ids, aggregations=aggregations
)

skew_ids = []
for i, col in enumerate(original_columns):
# Corresponds to order of aggregations in preceding loop
count_id, moment3_id, var_id = agg_ids[i * 3 : (i * 3) + 3]
block, skew_id = _skew_from_moments_and_count(
block, count_id, moment3_id, var_id
)
skew_ids.append(skew_id)

block = block.select_columns(skew_ids).with_column_labels(column_labels)
if not grouping_column_ids:
        # When ungrouped, stack everything into a single column so it can be returned as a series
block = block.stack()
block = block.drop_levels([block.index_columns[0]])
return block


def _mean_delta_to_power(
block: blocks.Block,
n_power,
column_ids: typing.Sequence[str],
grouping_column_ids: typing.Sequence[str],
) -> typing.Tuple[blocks.Block, typing.Sequence[str]]:
"""Calculate (x-mean(x))^n. Useful for calculating moment statistics such as skew and kurtosis."""
window = core.WindowSpec(grouping_keys=grouping_column_ids)
block, mean_ids = block.multi_apply_window_op(column_ids, agg_ops.mean_op, window)
delta_ids = []
cube_op = ops.partial_right(ops.pow_op, n_power)
for val_id, mean_val_id in zip(column_ids, mean_ids):
block, delta_id = block.apply_binary_op(val_id, mean_val_id, ops.sub_op)
block, delta_power_id = block.apply_unary_op(delta_id, cube_op)
block = block.drop_columns(delta_id)
delta_ids.append(delta_power_id)
return block, delta_ids


def _skew_from_moments_and_count(
block: blocks.Block, count_id: str, moment3_id: str, var_id: str
) -> typing.Tuple[blocks.Block, str]:
# Calculate skew using count, third moment and population variance
# See G1 estimator:
# https://en.wikipedia.org/wiki/Skewness#Sample_skewness
block, denominator_id = block.apply_unary_op(
var_id, ops.partial_right(ops.pow_op, 3 / 2)
)
block, base_id = block.apply_binary_op(moment3_id, denominator_id, ops.div_op)
block, countminus1_id = block.apply_unary_op(
count_id, ops.partial_right(ops.sub_op, 1)
)
block, countminus2_id = block.apply_unary_op(
count_id, ops.partial_right(ops.sub_op, 2)
)
block, adjustment_id = block.apply_binary_op(count_id, countminus1_id, ops.mul_op)
block, adjustment_id = block.apply_unary_op(
adjustment_id, ops.partial_right(ops.pow_op, 1 / 2)
)
block, adjustment_id = block.apply_binary_op(
adjustment_id, countminus2_id, ops.div_op
)
block, skew_id = block.apply_binary_op(base_id, adjustment_id, ops.mul_op)

    # Need to produce NA if there are fewer than 3 data points
block, na_cond_id = block.apply_unary_op(count_id, ops.partial_right(ops.ge_op, 3))
block, skew_id = block.apply_binary_op(
skew_id, na_cond_id, ops.partial_arg3(ops.where_op, None)
)
return block, skew_id
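As a sanity check, the G1 estimator assembled by these helpers can be computed directly from the moments; scipy's `skew(..., bias=False)` applies the same correction:

import math

xs = [1.0, 2.0, 4.0, 8.0]
n = len(xs)
mean = sum(xs) / n
m3 = sum((x - mean) ** 3 for x in xs) / n   # third central moment
var = sum((x - mean) ** 2 for x in xs) / n  # population variance
base = m3 / var ** 1.5                      # moment3 / var^(3/2), as above
adjustment = math.sqrt(n * (n - 1)) / (n - 2)
g1 = base * adjustment  # equals scipy.stats.skew(xs, bias=False); NA needs n >= 3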
16 changes: 10 additions & 6 deletions bigframes/core/blocks.py
@@ -709,8 +709,9 @@ def multi_apply_window_op(
window_spec: core.WindowSpec,
*,
skip_null_groups: bool = False,
-) -> Block:
+) -> typing.Tuple[Block, typing.Sequence[str]]:
block = self
+result_ids = []
for i, col_id in enumerate(columns):
label = self.col_id_to_label[col_id]
block, result_id = block.apply_window_op(
@@ -721,9 +722,8 @@ def multi_apply_window_op(
result_label=label,
skip_null_groups=skip_null_groups,
)
-block = block.copy_values(result_id, col_id)
-block = block.drop_columns([result_id])
-return block
+result_ids.append(result_id)
+return block, result_ids

def multi_apply_unary_op(
self,
@@ -1123,7 +1123,9 @@ def promote_offsets(self, label: Label = None) -> typing.Tuple[Block, str]:
)

def add_prefix(self, prefix: str, axis: str | int | None = None) -> Block:
-axis_number = bigframes.core.utils.get_axis_number(axis)
+axis_number = bigframes.core.utils.get_axis_number(
+    "rows" if (axis is None) else axis
+)
if axis_number == 0:
expr = self._expr
for index_col in self._index_columns:
@@ -1140,7 +1142,9 @@ def add_prefix(self, prefix: str, axis: str | int | None = None) -> Block:
return self.rename(columns=lambda label: f"{prefix}{label}")

def add_suffix(self, suffix: str, axis: str | int | None = None) -> Block:
-axis_number = bigframes.core.utils.get_axis_number(axis)
+axis_number = bigframes.core.utils.get_axis_number(
+    "rows" if (axis is None) else axis
+)
if axis_number == 0:
expr = self._expr
for index_col in self._index_columns:
44 changes: 40 additions & 4 deletions bigframes/core/groupby/__init__.py
@@ -20,6 +20,7 @@

import bigframes.constants as constants
import bigframes.core as core
+import bigframes.core.block_transforms as block_ops
import bigframes.core.blocks as blocks
import bigframes.core.ordering as order
import bigframes.core.utils as utils
@@ -145,6 +146,16 @@ def var(
self._raise_on_non_numeric("var")
return self._aggregate_all(agg_ops.var_op, numeric_only=True)

def skew(
self,
*,
numeric_only: bool = False,
) -> df.DataFrame:
if not numeric_only:
self._raise_on_non_numeric("skew")
block = block_ops.skew(self._block, self._selected_cols, self._by_col_ids)
return df.DataFrame(block)

def all(self) -> df.DataFrame:
return self._aggregate_all(agg_ops.all_op)

@@ -168,6 +179,22 @@ def cummax(self, *args, numeric_only: bool = False, **kwargs) -> df.DataFrame:
def cumprod(self, *args, **kwargs) -> df.DataFrame:
return self._apply_window_op(agg_ops.product_op, numeric_only=True)

def shift(self, periods=1) -> df.DataFrame:
window = core.WindowSpec(
grouping_keys=self._by_col_ids,
preceding=periods if periods > 0 else None,
following=-periods if periods < 0 else None,
)
return self._apply_window_op(agg_ops.ShiftOp(periods), window=window)

def diff(self, periods=1) -> df.DataFrame:
window = core.WindowSpec(
grouping_keys=self._by_col_ids,
preceding=periods if periods > 0 else None,
following=-periods if periods < 0 else None,
)
return self._apply_window_op(agg_ops.DiffOp(periods), window=window)

def agg(self, func=None, **kwargs) -> df.DataFrame:
if func:
if isinstance(func, str):
@@ -323,10 +350,10 @@ def _apply_window_op(
grouping_keys=self._by_col_ids, following=0
)
columns = self._aggregated_columns(numeric_only=numeric_only)
-block = self._block.multi_apply_window_op(
+block, result_ids = self._block.multi_apply_window_op(
    columns, op, window_spec=window_spec, skip_null_groups=self._dropna
)
-block = block.select_columns(columns)
+block = block.select_columns(result_ids)
return df.DataFrame(block)

def _resolve_label(self, label: blocks.Label) -> str:
@@ -391,6 +418,10 @@ def std(self, *args, **kwargs) -> series.Series:
def var(self, *args, **kwargs) -> series.Series:
return self._aggregate(agg_ops.var_op)

def skew(self, *args, **kwargs) -> series.Series:
block = block_ops.skew(self._block, [self._value_column], self._by_col_ids)
return series.Series(block)

def prod(self, *args) -> series.Series:
return self._aggregate(agg_ops.product_op)

@@ -459,8 +490,13 @@ def shift(self, periods=1) -> series.Series:
)
return self._apply_window_op(agg_ops.ShiftOp(periods), window=window)

-def diff(self) -> series.Series:
-    return self._ungroup() - self.shift(1)
+def diff(self, periods=1) -> series.Series:
+    window = core.WindowSpec(
+        grouping_keys=self._by_col_ids,
+        preceding=periods if periods > 0 else None,
+        following=-periods if periods < 0 else None,
+    )
+    return self._apply_window_op(agg_ops.DiffOp(periods), window=window)

def rolling(self, window: int, min_periods=None) -> windows.Window:
# To get n size window, need current row and n-1 preceding rows.
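Taken together, the grouped `skew`, `shift`, and `diff` methods added above follow the pandas GroupBy API. A usage sketch (project, table, and column names are placeholders, and pandas-style column selection on the groupby is assumed):

import bigframes.pandas as bpd

df = bpd.read_gbq("my-project.my_dataset.my_table")  # placeholder table
grouped = df.groupby("category")["value"]

skewness = grouped.skew()   # per-group G1 skew; NA for groups with < 3 rows
stepped = grouped.shift(2)  # value from 2 rows earlier within each group
deltas = grouped.diff(2)    # difference against the value 2 rows earlier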