Skip to content

Commit

Permalink
FEAT-#6440: Use different HDK parameters for different queries
Browse files Browse the repository at this point in the history
Disable lazy_fetch and columnar_output for groupby queries and enable
them for join queries.

Signed-off-by: Andrey Pavlenko <[email protected]>
  • Loading branch information
AndreyPavlenko committed Aug 1, 2023
1 parent 5af3318 commit 0654adb
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,8 @@ def __exit__(self, type, value, traceback):

def __init__(self):
    # Flags recording which kinds of operations were met while translating
    # a plan. Both start cleared; `_process_groupby` raises `has_groupby`
    # and `_process_join` raises `has_join`.
    self.has_groupby = False
    self.has_join = False
    # Stack of input contexts, empty on construction.
    self._input_ctx_stack = []

def build(self, op):
"""
Expand Down Expand Up @@ -880,6 +882,7 @@ def _process_groupby(self, op):
op : GroupbyAggNode
An operation to translate.
"""
self.has_groupby = True
frame = op.input[0]

# Aggregation's input should always be a projection and
Expand Down Expand Up @@ -957,6 +960,7 @@ def _process_join(self, op):
op : JoinNode
An operation to translate.
"""
self.has_join = True
node = CalciteJoinNode(
left_id=self._input_node(0).id,
right_id=self._input_node(1).id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,15 @@ def executeDML(cls, query: str):
return cls.executeRA(query, True)

@classmethod
def executeRA(cls, query: str, exec_calcite=False, **exec_args):
    """
    Execute a relational algebra query and wrap the result in an ``HdkTable``.

    Parameters
    ----------
    query : str
        Query text. It is passed through Calcite first when `exec_calcite`
        is true or when the text starts with ``"execute calcite"``;
        otherwise it is handed to the executor unchanged.
    exec_calcite : bool, default: False
        Force Calcite processing of `query` regardless of its prefix.
    **exec_args : dict
        Extra keyword arguments forwarded as-is to
        ``RelAlgExecutor.execute``.

    Returns
    -------
    HdkTable
        Wrapper around the table produced by the executor.
    """
    hdk = cls._hdk()
    if exec_calcite or query.startswith("execute calcite"):
        ra = hdk._calcite.process(query, db_name="hdk", legacy_syntax=True)
    else:
        ra = query
    ra_executor = RelAlgExecutor(hdk._executor, hdk._schema_mgr, hdk._data_mgr, ra)
    # Per-query execution options are passed alongside the preferred device.
    table = ra_executor.execute(device_type=cls._preferred_device, **exec_args)
    return HdkTable(table)

@classmethod
def import_arrow_table(cls, table: pa.Table, name: Optional[str] = None):
Expand Down Expand Up @@ -163,9 +164,7 @@ def _hdk(cls) -> HDK:
HDK
"""
params = HdkLaunchParameters.get()
cls._preferred_device = (
"CPU" if bool(HdkLaunchParameters.get()["cpu_only"]) else "GPU"
)
cls._preferred_device = "CPU" if bool(params["cpu_only"]) else "GPU"
cls._hdk_instance = HDK(**params)
cls._hdk = cls._get_hdk_instance
return cls._hdk()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,21 @@ def run_exec_plan(cls, plan):
for frame in frames:
cls.import_table(frame, worker)

calcite_plan = CalciteBuilder().build(plan)
builder = CalciteBuilder()
calcite_plan = builder.build(plan)
calcite_json = CalciteSerializer().serialize(calcite_plan)
if DoUseCalcite.get():
exec_calcite = True
calcite_json = "execute calcite " + calcite_json
table = worker.executeRA(calcite_json)
else:
exec_calcite = False

Check warning on line 258 in modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py#L258

Added line #L258 was not covered by tests
# Groupby and join queries get opposite HDK settings. When the plan
# contains both kinds or neither, pass no overrides at all.
if builder.has_groupby == builder.has_join:
    exec_args = {}
elif builder.has_groupby:
    exec_args = {"enable_lazy_fetch": 0, "enable_columnar_output": 0}
else:
    # Only the join-only case is left: the equality test above already
    # handled "both" and "neither". A plain `else` also guarantees that
    # `exec_args` is bound on every path.
    exec_args = {"enable_lazy_fetch": 1, "enable_columnar_output": 1}
table = worker.executeRA(calcite_json, exec_calcite, **exec_args)

Check failure

Code scanning / CodeQL

Potentially uninitialized local variable Error

Local variable 'exec_args' may be used before it is initialized.

res = np.empty((1, 1), dtype=np.dtype(object))
res[0][0] = cls._partition_class(table)
Expand Down

0 comments on commit 0654adb

Please sign in to comment.