Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#6440: Use different HDK parameters for different queries #6441

Merged
merged 3 commits into from
Aug 4, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,8 @@ def __exit__(self, type, value, traceback):

# Set up the builder's initial state: an empty input-context stack and two
# flags that start out False.
def __init__(self):
self._input_ctx_stack = []
# Flipped to True in _process_join when a join operation is translated.
self.has_join = False
# Flipped to True in _process_groupby when a group-by operation is translated.
self.has_groupby = False

def build(self, op):
"""
Expand Down Expand Up @@ -880,6 +882,7 @@ def _process_groupby(self, op):
op : GroupbyAggNode
An operation to translate.
"""
self.has_groupby = True
frame = op.input[0]

# Aggregation's input should always be a projection and
Expand Down Expand Up @@ -957,6 +960,7 @@ def _process_join(self, op):
op : JoinNode
An operation to translate.
"""
self.has_join = True
node = CalciteJoinNode(
left_id=self._input_node(0).id,
right_id=self._input_node(1).id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,15 @@ def executeDML(cls, query: str):
return cls.executeRA(query, True)

# Execute a query with a RelAlgExecutor and wrap the result in an HdkTable.
# NOTE(review): this span is a rendered diff with the +/- markers stripped, so
# the old and new versions of two statements appear back to back below.
@classmethod
# NOTE(review): presumably the pre-change signature — confirm against the diff.
def executeRA(cls, query: str, exec_calcite=False):
# NOTE(review): presumably the post-change signature; extra keyword arguments
# are collected in exec_args.
def executeRA(cls, query: str, exec_calcite=False, **exec_args):
hdk = cls._hdk()
# When exec_calcite is set, or the query text starts with "execute calcite",
# the query is first run through hdk._calcite.process; otherwise it is used
# unchanged.
if exec_calcite or query.startswith("execute calcite"):
ra = hdk._calcite.process(query, db_name="hdk", legacy_syntax=True)
else:
ra = query
ra_executor = RelAlgExecutor(hdk._executor, hdk._schema_mgr, hdk._data_mgr, ra)
# NOTE(review): presumably the pre-change return, which passes only the
# preferred device type to execute() — confirm against the diff.
return HdkTable(ra_executor.execute(device_type=cls._preferred_device))
# exec_args are forwarded unchanged to execute() alongside device_type.
table = ra_executor.execute(device_type=cls._preferred_device, **exec_args)
return HdkTable(table)

@classmethod
def import_arrow_table(cls, table: pa.Table, name: Optional[str] = None):
Expand Down Expand Up @@ -163,9 +164,7 @@ def _hdk(cls) -> HDK:
HDK
"""
params = HdkLaunchParameters.get()
cls._preferred_device = (
"CPU" if bool(HdkLaunchParameters.get()["cpu_only"]) else "GPU"
)
cls._preferred_device = "CPU" if bool(params["cpu_only"]) else "GPU"
AndreyPavlenko marked this conversation as resolved.
Show resolved Hide resolved
cls._hdk_instance = HDK(**params)
cls._hdk = cls._get_hdk_instance
return cls._hdk()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,11 +248,20 @@
for frame in frames:
cls.import_table(frame, worker)

calcite_plan = CalciteBuilder().build(plan)
builder = CalciteBuilder()
calcite_plan = builder.build(plan)
calcite_json = CalciteSerializer().serialize(calcite_plan)
if DoUseCalcite.get():
exec_calcite = True
calcite_json = "execute calcite " + calcite_json
table = worker.executeRA(calcite_json)
else:
exec_calcite = False

Check warning on line 258 in modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py

View check run for this annotation

Codecov / codecov/patch

modin/experimental/core/execution/native/implementations/hdk_on_native/partitioning/partition_manager.py#L258

Added line #L258 was not covered by tests
exec_args = {}
if builder.has_groupby and not builder.has_join:
exec_args = {"enable_lazy_fetch": 0, "enable_columnar_output": 0}
elif not builder.has_groupby and builder.has_join:
exec_args = {"enable_lazy_fetch": 1, "enable_columnar_output": 1}
table = worker.executeRA(calcite_json, exec_calcite, **exec_args)
Fixed Show fixed Hide fixed

res = np.empty((1, 1), dtype=np.dtype(object))
res[0][0] = cls._partition_class(table)
Expand Down
Loading