diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py
index bd56ac7bab0..ba8a465f960 100644
--- a/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py
+++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/calcite_builder.py
@@ -583,6 +583,8 @@ def __exit__(self, type, value, traceback):
 
     def __init__(self):
         self._input_ctx_stack = []
+        self.has_join = False
+        self.has_groupby = False
 
     def build(self, op):
         """
@@ -880,6 +882,7 @@ def _process_groupby(self, op):
         op : GroupbyAggNode
             An operation to translate.
         """
+        self.has_groupby = True
         frame = op.input[0]
 
         # Aggregation's input should always be a projection and
@@ -957,6 +960,7 @@ def _process_join(self, op):
         op : JoinNode
             An operation to translate.
         """
+        self.has_join = True
         node = CalciteJoinNode(
             left_id=self._input_node(0).id,
             right_id=self._input_node(1).id,
diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/hdk_worker.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/hdk_worker.py
index 619e9359c7c..47038d85702 100644
--- a/modin/experimental/core/execution/native/implementations/hdk_on_native/hdk_worker.py
+++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/hdk_worker.py
@@ -104,14 +104,15 @@ def executeDML(cls, query: str):
         return cls.executeRA(query, True)
 
     @classmethod
-    def executeRA(cls, query: str, exec_calcite=False):
+    def executeRA(cls, query: str, exec_calcite=False, **exec_args):
         hdk = cls._hdk()
         if exec_calcite or query.startswith("execute calcite"):
             ra = hdk._calcite.process(query, db_name="hdk", legacy_syntax=True)
         else:
             ra = query
         ra_executor = RelAlgExecutor(hdk._executor, hdk._schema_mgr, hdk._data_mgr, ra)
-        return HdkTable(ra_executor.execute(device_type=cls._preferred_device))
+        table = ra_executor.execute(device_type=cls._preferred_device, **exec_args)
+        return HdkTable(table)
 
     @classmethod
     def import_arrow_table(cls, table: pa.Table, name: Optional[str] = None):
@@ -163,9 +164,7 @@ def _hdk(cls) -> HDK:
         HDK
         """
         params = HdkLaunchParameters.get()
-        cls._preferred_device = (
-            "CPU" if bool(HdkLaunchParameters.get()["cpu_only"]) else "GPU"
-        )
+        cls._preferred_device = "CPU" if bool(params["cpu_only"]) else "GPU"
         cls._hdk_instance = HDK(**params)
         cls._hdk = cls._get_hdk_instance
         return cls._hdk()
diff --git a/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py b/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py
index ffe4db6327b..3b982a7c4ca 100644
--- a/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py
+++ b/modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py
@@ -248,11 +248,21 @@ def run_exec_plan(cls, plan):
         for frame in frames:
             cls.import_table(frame, worker)
 
-        calcite_plan = CalciteBuilder().build(plan)
+        builder = CalciteBuilder()
+        calcite_plan = builder.build(plan)
         calcite_json = CalciteSerializer().serialize(calcite_plan)
         if DoUseCalcite.get():
+            exec_calcite = True
             calcite_json = "execute calcite " + calcite_json
-        table = worker.executeRA(calcite_json)
+        else:
+            exec_calcite = False
+        if builder.has_groupby == builder.has_join:
+            exec_args = {}
+        elif builder.has_groupby:
+            exec_args = {"enable_lazy_fetch": 0, "enable_columnar_output": 0}
+        else:
+            exec_args = {"enable_lazy_fetch": 1, "enable_columnar_output": 1}
+        table = worker.executeRA(calcite_json, exec_calcite, **exec_args)
 
         res = np.empty((1, 1), dtype=np.dtype(object))
         res[0][0] = cls._partition_class(table)