Skip to content

Commit

Permalink
Add support for merging chunks
Browse files Browse the repository at this point in the history
New `merge_chunks` procedures are introduced that can merge an
arbitrary number of chunks if the right conditions apply. Basic checks
are done to ensure that the chunks can be merged from a partitioning
perspective. Some more advanced cases that are potentially mergeable
are not supported at this time (e.g., complicated merges of chunks
with multi-dimensional partitioning), nor is merging of compressed
chunks.

Merging compressed chunks requires additional work, although the same
basic rewrite approach should work also on the internal compressed
relations. However, one needs to handle merge of a compressed chunk
with a non-compressed chunk, or two compressed chunks with different
compression settings, and so forth. This is left for a future
enhancement.

Currently, the merge defaults to taking an AccessExclusive lock on the
merged chunks to prevent deadlocks and concurrent
modifications. Weaker locking is supported via an anonymous settings
variable, but this is mostly to prove in tests that these approaches
can lead to deadlocks.

The actual merging is done by rewriting all the data from multiple
chunks into a (temporary) merged heap using the same approach as that
implemented to support VACUUM FULL and CLUSTER. Then this new heap is
swapped into one of the original relations while the rest are
dropped. This approach is MVCC compliant and implements correct
visibility under higher isolation levels, while also doing vacuum and
leaving no garbage tuples.
  • Loading branch information
erimatnor committed Jan 13, 2025
1 parent 3170014 commit 013e5dc
Show file tree
Hide file tree
Showing 23 changed files with 2,322 additions and 28 deletions.
1 change: 1 addition & 0 deletions .unreleased/pr_7433
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7433 Add support for merging chunks
8 changes: 8 additions & 0 deletions sql/maintenance_utils.sql
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ CREATE OR REPLACE PROCEDURE @[email protected]_to_rowstore(
if_columnstore BOOLEAN = true
) AS '@MODULE_PATHNAME@', 'ts_decompress_chunk' LANGUAGE C;

-- Two-argument form of merge_chunks; implemented by the C function
-- ts_merge_two_chunks.
CREATE OR REPLACE PROCEDURE @extschema@.merge_chunks(
    chunk1 REGCLASS, chunk2 REGCLASS
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_merge_two_chunks';

-- Array form of merge_chunks; implemented by the C function
-- ts_merge_chunks.
CREATE OR REPLACE PROCEDURE @extschema@.merge_chunks(
    chunks REGCLASS[]
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_merge_chunks';

CREATE OR REPLACE FUNCTION _timescaledb_functions.recompress_chunk_segmentwise(
uncompressed_chunk REGCLASS,
if_compressed BOOLEAN = true
Expand Down
9 changes: 9 additions & 0 deletions sql/updates/latest-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,12 @@ CREATE FUNCTION @[email protected]_columnstore_stats (hypertable REGCLASS)
STABLE STRICT
AS 'SELECT * FROM @[email protected]_compression_stats($1)'
SET search_path TO pg_catalog, pg_temp;

-- Merge chunks
-- Two-argument form of merge_chunks, bound to ts_update_placeholder in
-- this update script.
CREATE PROCEDURE @extschema@.merge_chunks(
    chunk1 REGCLASS, chunk2 REGCLASS
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder';

-- Array form of merge_chunks, bound to ts_update_placeholder in this
-- update script.
CREATE PROCEDURE @extschema@.merge_chunks(
    chunks REGCLASS[]
) LANGUAGE C AS '@MODULE_PATHNAME@', 'ts_update_placeholder';
3 changes: 3 additions & 0 deletions sql/updates/reverse-dev.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ DROP VIEW timescaledb_information.hypertable_columnstore_settings;
DROP VIEW timescaledb_information.chunk_columnstore_settings;

DROP PROCEDURE IF EXISTS _timescaledb_functions.cagg_migrate_update_watermark(INTEGER);
-- Merge chunks
-- Remove both merge_chunks signatures (two-argument and array forms).
DROP PROCEDURE IF EXISTS @extschema@.merge_chunks(chunk1 REGCLASS, chunk2 REGCLASS);
DROP PROCEDURE IF EXISTS @extschema@.merge_chunks(chunks REGCLASS[]);
13 changes: 13 additions & 0 deletions src/chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <catalog/pg_opfamily.h>
#include <catalog/pg_trigger.h>
#include <catalog/pg_type.h>
#include <catalog/pg_type_d.h>
#include <catalog/toasting.h>
#include <commands/defrem.h>
#include <commands/tablecmds.h>
Expand All @@ -31,6 +32,7 @@
#include <storage/lmgr.h>
#include <tcop/tcopprot.h>
#include <utils/acl.h>
#include <utils/array.h>
#include <utils/builtins.h>
#include <utils/datum.h>
#include <utils/hsearch.h>
Expand Down Expand Up @@ -5158,3 +5160,14 @@ ts_chunk_drop_osm_chunk(PG_FUNCTION_ARGS)
ts_cache_release(hcache);
PG_RETURN_BOOL(true);
}

TS_FUNCTION_INFO_V1(ts_merge_two_chunks);

/*
 * Entry point for the two-argument merge_chunks(chunk1, chunk2) procedure.
 *
 * Packs the two call arguments into a two-element regclass array and
 * forwards that array to ts_cm_functions->merge_chunks, returning whatever
 * that call returns.
 */
Datum
ts_merge_two_chunks(PG_FUNCTION_ARGS)
{
	Datum relids[2];
	ArrayType *relid_array;

	relids[0] = PG_GETARG_DATUM(0);
	relids[1] = PG_GETARG_DATUM(1);

	/* Elements are built as Oid-sized, pass-by-value, int-aligned values */
	relid_array = construct_array(relids, 2, REGCLASSOID, sizeof(Oid), true, TYPALIGN_INT);

	return DirectFunctionCall1(ts_cm_functions->merge_chunks, PointerGetDatum(relid_array));
}
3 changes: 2 additions & 1 deletion src/chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ extern bool ts_chunk_exists_relid(Oid relid);
extern TSDLLEXPORT bool ts_chunk_exists_with_compression(int32 hypertable_id);
extern void ts_chunk_recreate_all_constraints_for_dimension(Hypertable *ht, int32 dimension_id);
extern int ts_chunk_delete_by_hypertable_id(int32 hypertable_id);
extern int ts_chunk_delete_by_name(const char *schema, const char *table, DropBehavior behavior);
extern TSDLLEXPORT int ts_chunk_delete_by_name(const char *schema, const char *table,
DropBehavior behavior);
extern bool ts_chunk_set_name(Chunk *chunk, const char *newname);
extern bool ts_chunk_set_schema(Chunk *chunk, const char *newschema);
extern TSDLLEXPORT List *ts_chunk_get_window(int32 dimension_id, int64 point, int count,
Expand Down
9 changes: 5 additions & 4 deletions src/chunk_constraint.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ ts_chunk_constraints_add_from_tuple(ChunkConstraints *ccs, const TupleInfo *ti)
/*
* Create a dimensional CHECK constraint for a partitioning dimension.
*/
static Constraint *
create_dimension_check_constraint(const Dimension *dim, const DimensionSlice *slice,
const char *name)
Constraint *
ts_chunk_constraint_dimensional_create(const Dimension *dim, const DimensionSlice *slice,
const char *name)
{
Constraint *constr = NULL;
bool isvarlena;
Expand Down Expand Up @@ -489,7 +489,8 @@ ts_chunk_constraints_create(const Hypertable *ht, const Chunk *chunk)

dim = ts_hyperspace_get_dimension_by_id(ht->space, slice->fd.dimension_id);
Assert(dim);
constr = create_dimension_check_constraint(dim, slice, NameStr(cc->fd.constraint_name));
constr =
ts_chunk_constraint_dimensional_create(dim, slice, NameStr(cc->fd.constraint_name));

/* In some cases, a CHECK constraint is not needed. For instance,
* if the range is -INF to +INF. */
Expand Down
9 changes: 6 additions & 3 deletions src/chunk_constraint.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ extern int ts_chunk_constraint_scan_by_dimension_slice(const DimensionSlice *sli
ChunkScanCtx *ctx, MemoryContext mctx);
extern int ts_chunk_constraint_scan_by_dimension_slice_to_list(const DimensionSlice *slice,
List **list, MemoryContext mctx);
extern int ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id,
ChunkConstraints *ccs,
MemoryContext mctx);
extern int TSDLLEXPORT ts_chunk_constraint_scan_by_dimension_slice_id(int32 dimension_slice_id,
ChunkConstraints *ccs,
MemoryContext mctx);
extern ChunkConstraint *ts_chunk_constraints_add(ChunkConstraints *ccs, int32 chunk_id,
int32 dimension_slice_id,
const char *constraint_name,
Expand All @@ -58,6 +58,9 @@ extern TSDLLEXPORT int ts_chunk_constraints_add_inheritable_constraints(ChunkCon
extern TSDLLEXPORT int ts_chunk_constraints_add_inheritable_check_constraints(
ChunkConstraints *ccs, int32 chunk_id, const char chunk_relkind, Oid hypertable_oid);
extern TSDLLEXPORT void ts_chunk_constraints_insert_metadata(const ChunkConstraints *ccs);
extern TSDLLEXPORT Constraint *ts_chunk_constraint_dimensional_create(const Dimension *dim,
const DimensionSlice *slice,
const char *name);
extern TSDLLEXPORT void ts_chunk_constraints_create(const Hypertable *ht, const Chunk *chunk);
extern void ts_chunk_constraint_create_on_chunk(const Hypertable *ht, const Chunk *chunk,
Oid constraint_oid);
Expand Down
2 changes: 2 additions & 0 deletions src/cross_module_fn.c
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ CROSSMODULE_WRAPPER(chunk_create_empty_table);

CROSSMODULE_WRAPPER(recompress_chunk_segmentwise);
CROSSMODULE_WRAPPER(get_compressed_chunk_index_for_recompression);
CROSSMODULE_WRAPPER(merge_chunks);

/* hypercore */
CROSSMODULE_WRAPPER(is_compressed_tid);
Expand Down Expand Up @@ -407,6 +408,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.recompress_chunk_segmentwise = error_no_default_fn_pg_community,
.get_compressed_chunk_index_for_recompression = error_no_default_fn_pg_community,
.preprocess_query_tsl = preprocess_query_tsl_default_fn_community,
.merge_chunks = error_no_default_fn_pg_community,
};

TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default;
1 change: 1 addition & 0 deletions src/cross_module_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ typedef struct CrossModuleFunctions
PGFunction recompress_chunk_segmentwise;
PGFunction get_compressed_chunk_index_for_recompression;
void (*preprocess_query_tsl)(Query *parse, int *cursor_opts);
PGFunction merge_chunks;
} CrossModuleFunctions;

extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions;
Expand Down
3 changes: 2 additions & 1 deletion src/dimension.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ extern Hyperspace *ts_dimension_scan(int32 hypertable_id, Oid main_table_relid,
extern DimensionSlice *ts_dimension_calculate_default_slice(const Dimension *dim, int64 value);
extern TSDLLEXPORT Point *ts_hyperspace_calculate_point(const Hyperspace *h, TupleTableSlot *slot);
extern int ts_dimension_get_slice_ordinal(const Dimension *dim, const DimensionSlice *slice);
extern const Dimension *ts_hyperspace_get_dimension_by_id(const Hyperspace *hs, int32 id);
extern TSDLLEXPORT const Dimension *ts_hyperspace_get_dimension_by_id(const Hyperspace *hs,
int32 id);
extern TSDLLEXPORT const Dimension *ts_hyperspace_get_dimension(const Hyperspace *hs,
DimensionType type, Index n);
extern TSDLLEXPORT Dimension *ts_hyperspace_get_mutable_dimension(Hyperspace *hs,
Expand Down
12 changes: 7 additions & 5 deletions src/dimension_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ ts_dimension_slice_scan_range_limit(int32 dimension_id, StrategyNumber start_str
int limit, const ScanTupLock *tuplock);
extern DimensionVec *ts_dimension_slice_collision_scan_limit(int32 dimension_id, int64 range_start,
int64 range_end, int limit);
extern bool ts_dimension_slice_scan_for_existing(const DimensionSlice *slice,
const ScanTupLock *tuplock);
extern TSDLLEXPORT bool ts_dimension_slice_scan_for_existing(const DimensionSlice *slice,
const ScanTupLock *tuplock);
extern DimensionSlice *ts_dimension_slice_scan_by_id_and_lock(int32 dimension_slice_id,
const ScanTupLock *tuplock,
MemoryContext mctx,
Expand All @@ -70,18 +70,20 @@ extern DimensionVec *ts_dimension_slice_scan_by_dimension_before_point(int32 dim
ScanDirection scandir,
MemoryContext mctx);
extern int ts_dimension_slice_delete_by_dimension_id(int32 dimension_id, bool delete_constraints);
extern int ts_dimension_slice_delete_by_id(int32 dimension_slice_id, bool delete_constraints);
extern TSDLLEXPORT int ts_dimension_slice_delete_by_id(int32 dimension_slice_id,
bool delete_constraints);
extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_create(int dimension_id, int64 range_start,
int64 range_end);
extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_copy(const DimensionSlice *original);
extern TSDLLEXPORT bool ts_dimension_slices_collide(const DimensionSlice *slice1,
const DimensionSlice *slice2);
extern bool ts_dimension_slices_equal(const DimensionSlice *slice1, const DimensionSlice *slice2);
extern TSDLLEXPORT bool ts_dimension_slices_equal(const DimensionSlice *slice1,
const DimensionSlice *slice2);
extern bool ts_dimension_slice_cut(DimensionSlice *to_cut, const DimensionSlice *other,
int64 coord);
extern void ts_dimension_slice_free(DimensionSlice *slice);
extern int ts_dimension_slice_insert_multi(DimensionSlice **slice, Size num_slices);
extern void ts_dimension_slice_insert(DimensionSlice *slice);
extern TSDLLEXPORT void ts_dimension_slice_insert(DimensionSlice *slice);
extern int ts_dimension_slice_cmp(const DimensionSlice *left, const DimensionSlice *right);
extern int ts_dimension_slice_cmp_coordinate(const DimensionSlice *slice, int64 coord);

Expand Down
4 changes: 2 additions & 2 deletions src/hypercube.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ typedef struct Hypercube
(sizeof(Hypercube) + (sizeof(DimensionSlice *) * (num_dimensions)))

extern TSDLLEXPORT Hypercube *ts_hypercube_alloc(int16 num_dimensions);
extern void ts_hypercube_free(Hypercube *hc);
extern TSDLLEXPORT void ts_hypercube_free(Hypercube *hc);

extern TSDLLEXPORT DimensionSlice *
ts_hypercube_add_slice_from_range(Hypercube *hc, int32 dimension_id, int64 start, int64 end);
Expand All @@ -41,6 +41,6 @@ extern Hypercube *ts_hypercube_calculate_from_point(const Hyperspace *hs, const
extern bool ts_hypercubes_collide(const Hypercube *cube1, const Hypercube *cube2);
extern TSDLLEXPORT const DimensionSlice *ts_hypercube_get_slice_by_dimension_id(const Hypercube *hc,
int32 dimension_id);
extern Hypercube *ts_hypercube_copy(const Hypercube *hc);
extern TSDLLEXPORT Hypercube *ts_hypercube_copy(const Hypercube *hc);
extern bool ts_hypercube_equal(const Hypercube *hc1, const Hypercube *hc2);
extern void ts_hypercube_slice_sort(Hypercube *hc);
Loading

0 comments on commit 013e5dc

Please sign in to comment.