Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply sort transform optimizations to compressed chunks #7528

Merged
merged 19 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions .github/gh_matrix_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def build_debug_config(overrides):
# builds. This will capture some cases where warnings are generated
# for release builds but not for debug builds.
def build_release_config(overrides):
base_config = build_debug_config({})
release_config = dict(
{
"name": "Release",
Expand All @@ -102,26 +101,24 @@ def build_release_config(overrides):
"coverage": False,
}
)
base_config.update(release_config)
base_config.update(overrides)
return base_config
release_config.update(overrides)
return build_debug_config(release_config)


def build_without_telemetry(overrides):
config = build_release_config({})
config.update(
config = dict(
{
"name": "ReleaseWithoutTelemetry",
"tsdb_build_args": config["tsdb_build_args"] + " -DUSE_TELEMETRY=OFF",
"coverage": False,
}
)
config.update(overrides)
config = build_release_config(config)
config["tsdb_build_args"] += " -DUSE_TELEMETRY=OFF"
return config


def build_apache_config(overrides):
base_config = build_debug_config({})
apache_config = dict(
{
"name": "ApacheOnly",
Expand All @@ -130,9 +127,8 @@ def build_apache_config(overrides):
"coverage": False,
}
)
base_config.update(apache_config)
base_config.update(overrides)
return base_config
apache_config.update(overrides)
return build_debug_config(apache_config)

akuzm marked this conversation as resolved.
Show resolved Hide resolved

def macos_config(overrides):
Expand Down
1 change: 1 addition & 0 deletions .unreleased/compressed-sort-transform
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7528 Transform sorting on `time_bucket` to sorting on time for compressed chunks in some cases.
42 changes: 37 additions & 5 deletions src/planner/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
#include "partitioning.h"
#include "planner/partialize.h"
#include "planner/planner.h"
#include "sort_transform.h"
#include "utils.h"

#include "compat/compat.h"
Expand Down Expand Up @@ -935,8 +936,6 @@ ts_classify_relation(const PlannerInfo *root, const RelOptInfo *rel, Hypertable
return TS_REL_OTHER;
}

extern void ts_sort_transform_optimization(PlannerInfo *root, RelOptInfo *rel);

static inline bool
should_chunk_append(Hypertable *ht, PlannerInfo *root, RelOptInfo *rel, Path *path, bool ordered,
int order_attno)
Expand Down Expand Up @@ -1183,14 +1182,47 @@ apply_optimizations(PlannerInfo *root, TsRelType reltype, RelOptInfo *rel, Range
break;
case TS_REL_CHUNK_STANDALONE:
case TS_REL_CHUNK_CHILD:
ts_sort_transform_optimization(root, rel);
{
/*
* Since the sort optimization adds new paths to the rel it has
* to happen before any optimizations that replace pathlist.
*/
if (ts_cm_functions->set_rel_pathlist_query != NULL)
ts_cm_functions->set_rel_pathlist_query(root, rel, rel->relid, rte, ht);
List *transformed_query_pathkeys = ts_sort_transform_get_pathkeys(root, rel, rte, ht);
if (transformed_query_pathkeys != NIL)
{
List *orig_query_pathkeys = root->query_pathkeys;
root->query_pathkeys = transformed_query_pathkeys;

/* Create index paths with transformed pathkeys */
create_index_paths(root, rel);

/*
* Call the TSL hooks with the transformed pathkeys as well, so
* that the decompression paths also use this optimization.
*/
if (ts_cm_functions->set_rel_pathlist_query != NULL)
ts_cm_functions->set_rel_pathlist_query(root, rel, rel->relid, rte, ht);

root->query_pathkeys = orig_query_pathkeys;

/*
* change returned paths to use original pathkeys. have to go through
* all paths since create_index_paths might have modified existing
* pathkey. Always safe to do transform since ordering of
* transformed_query_pathkey implements ordering of
* orig_query_pathkeys.
*/
ts_sort_transform_replace_pathkeys(rel->pathlist,
transformed_query_pathkeys,
orig_query_pathkeys);
}
else
{
if (ts_cm_functions->set_rel_pathlist_query != NULL)
ts_cm_functions->set_rel_pathlist_query(root, rel, rel->relid, rte, ht);
}
break;
}
default:
break;
}
Expand Down
95 changes: 69 additions & 26 deletions src/sort_transform.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
#include <utils/lsyscache.h>

#include "compat/compat.h"
#include "cross_module_fn.h"
#include "func_cache.h"
#include "hypertable.h"
#include "import/allpaths.h"
#include "sort_transform.h"

/* This optimizations allows GROUP BY clauses that transform time in
Expand All @@ -29,8 +32,6 @@
* to an ordering on time.
*/

extern void ts_sort_transform_optimization(PlannerInfo *root, RelOptInfo *rel);

static Expr *
transform_timestamp_cast(FuncExpr *func)
{
Expand Down Expand Up @@ -384,8 +385,9 @@
* For example: an ORDER BY date_trunc('minute', time) can be implemented by
* an ordering of time.
*/
void
ts_sort_transform_optimization(PlannerInfo *root, RelOptInfo *rel)
List *
ts_sort_transform_get_pathkeys(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte,
Hypertable *ht)
{
/*
* We attack this problem in three steps:
Expand All @@ -399,17 +401,16 @@
*
*/
ListCell *lc;
List *transformed_query_pathkey = NIL;
List *orig_query_pathkeys = root->query_pathkeys;
List *transformed_query_pathkeys = NIL;
PathKey *last_pk;
PathKey *new_pk;
EquivalenceClass *transformed;

/*
* nothing to do for empty pathkeys
*/
if (orig_query_pathkeys == NIL)
return;
if (root->query_pathkeys == NIL)
return NIL;

/*
* These sort transformations are only safe for single member ORDER BY
Expand All @@ -420,7 +421,7 @@
transformed = sort_transform_ec(root, last_pk->pk_eclass);

if (transformed == NULL)
return;
return NIL;

new_pk = make_canonical_pathkey(root,
transformed,
Expand All @@ -434,30 +435,72 @@
foreach (lc, root->query_pathkeys)
{
if (lfirst(lc) != last_pk)
transformed_query_pathkey = lappend(transformed_query_pathkey, lfirst(lc));
transformed_query_pathkeys = lappend(transformed_query_pathkeys, lfirst(lc));
else
transformed_query_pathkey = lappend(transformed_query_pathkey, new_pk);
transformed_query_pathkeys = lappend(transformed_query_pathkeys, new_pk);
}

/* search for indexes on transformed pathkeys */
root->query_pathkeys = transformed_query_pathkey;
create_index_paths(root, rel);
root->query_pathkeys = orig_query_pathkeys;
return transformed_query_pathkeys;
}

/*
* change returned paths to use original pathkeys. have to go through
* all paths since create_index_paths might have modified existing
* pathkey. Always safe to do transform since ordering of
* transformed_query_pathkey implements ordering of
* orig_query_pathkeys.
*/
foreach (lc, rel->pathlist)
/*
* After we have created new paths with transformed pathkeys, replace them back
* with the original pathkeys.
*/
void
ts_sort_transform_replace_pathkeys(void *node, List *transformed_pathkeys, List *original_pathkeys)
{
if (node == NULL)
{
Path *path = lfirst(lc);
return;
}

if (compare_pathkeys(path->pathkeys, transformed_query_pathkey) == PATHKEYS_EQUAL)
if (IsA(node, List))
{
List *list = castNode(List, node);
ListCell *lc;
foreach (lc, list)
{
path->pathkeys = orig_query_pathkeys;
ts_sort_transform_replace_pathkeys(lfirst(lc), transformed_pathkeys, original_pathkeys);
}
return;
}

Path *path = (Path *) node;
if (compare_pathkeys(path->pathkeys, transformed_pathkeys) == PATHKEYS_EQUAL)
{
path->pathkeys = original_pathkeys;
}

if (IsA(path, CustomPath))
{
/*
* We should only see ChunkAppend here.
*/
CustomPath *custom = castNode(CustomPath, path);
ts_sort_transform_replace_pathkeys(custom->custom_paths,
transformed_pathkeys,
original_pathkeys);
}
else if (IsA(path, MergeAppendPath))
{
MergeAppendPath *append = castNode(MergeAppendPath, path);
ts_sort_transform_replace_pathkeys(append->subpaths,
transformed_pathkeys,
original_pathkeys);
}
else if (IsA(path, AppendPath))
{
AppendPath *append = castNode(AppendPath, path);
ts_sort_transform_replace_pathkeys(append->subpaths,
transformed_pathkeys,
original_pathkeys);
}
else if (IsA(path, ProjectionPath))
{
ProjectionPath *projection = castNode(ProjectionPath, path);
ts_sort_transform_replace_pathkeys(projection->subpath,

Check warning on line 502 in src/sort_transform.c

View check run for this annotation

Codecov / codecov/patch

src/sort_transform.c#L501-L502

Added lines #L501 - L502 were not covered by tests
transformed_pathkeys,
original_pathkeys);
}
}
8 changes: 8 additions & 0 deletions src/sort_transform.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,12 @@

#include <postgres.h>

#include "hypertable.h"

extern Expr *ts_sort_transform_expr(Expr *expr);

extern List *ts_sort_transform_get_pathkeys(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte,
Hypertable *ht);

extern void ts_sort_transform_replace_pathkeys(void *node, List *transformed_pathkeys,
List *original_pathkeys);
4 changes: 2 additions & 2 deletions tsl/src/chunkwise_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
copy_append_like_path(root,
partially_compressed_append,
partially_compressed_sorted,
subpath->pathtarget));
partial_grouping_target));
}

if (can_hash)
Expand All @@ -446,7 +446,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
copy_append_like_path(root,
partially_compressed_append,
partially_compressed_hashed,
subpath->pathtarget));
partial_grouping_target));
}
}
else
Expand Down
64 changes: 56 additions & 8 deletions tsl/src/nodes/decompress_chunk/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,50 @@
plan->plan.parallel_safe = lefttree->parallel_safe;
}

/*
* Find a variable of the given relation somewhere in the expression tree.
* Currently we use this to find the Var argument of time_bucket, when we prepare
* the batch sorted merge parameters after using the monotonous sorting transform
* optimization.
*/
static Var *
find_var_subexpression(void *expr, Index varno)
{
if (IsA(expr, Var))
{
Var *var = castNode(Var, expr);
if ((Index) var->varno == (Index) varno)
{
return var;
}

return NULL;

Check warning on line 975 in tsl/src/nodes/decompress_chunk/planner.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/decompress_chunk/planner.c#L975

Added line #L975 was not covered by tests
}

if (IsA(expr, List))
{
List *list = castNode(List, expr);
ListCell *lc;
foreach (lc, list)
{
Var *var = find_var_subexpression(lfirst(lc), varno);
if (var != NULL)
{
return var;
}
}

return NULL;

Check warning on line 991 in tsl/src/nodes/decompress_chunk/planner.c

View check run for this annotation

Codecov / codecov/patch

tsl/src/nodes/decompress_chunk/planner.c#L991

Added line #L991 was not covered by tests
}

if (IsA(expr, FuncExpr))
{
return find_var_subexpression(castNode(FuncExpr, expr)->args, varno);
}

return NULL;
}

Plan *
decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *path,
List *output_targetlist, List *clauses, List *custom_plans)
Expand Down Expand Up @@ -1130,18 +1174,22 @@
continue;
}

Ensure(IsA(em->em_expr, Var),
"non-Var pathkey not expected for compressed batch sorted merge");

/*
* We found a Var equivalence member that belongs to the
* decompressed relation. We have to convert its varattno which
* is the varattno of the uncompressed chunk tuple, to the
* decompressed scan tuple varattno.
* The equivalence member expression might be a monotonous
* expression of the decompressed relation Var, so recurse to
* find it.
*/
Var *var = castNode(Var, em->em_expr);
Var *var = find_var_subexpression(em->em_expr, em_relid);
Ensure(var != NULL,
"non-Var pathkey not expected for compressed batch sorted merge");

Assert((Index) var->varno == (Index) em_relid);

/*
* Convert its varattno which is the varattno of the
* uncompressed chunk tuple, to the decompressed scan tuple
* varattno.
*/
const int decompressed_scan_attno =
context.uncompressed_attno_info[var->varattno].custom_scan_attno;
Assert(decompressed_scan_attno > 0);
Expand Down
Loading
Loading