Skip to content

Commit

Permalink
Modify dimension slice catalog update API
Browse files Browse the repository at this point in the history
Use the same logic as PR 6773 while updating dimension slice catalog
tuples. PR 6773 addresses chunk catalog updates. We first lock the
tuple and then modify the values and update the locked tuple. Replace
ts_dimension_slice_update_by_id with field specific APIs and use
dimension_slice_update_catalog_tuple calls consistently.
  • Loading branch information
antekresic committed Jun 3, 2024
1 parent 5b1c28d commit 7694c4a
Show file tree
Hide file tree
Showing 3 changed files with 213 additions and 67 deletions.
275 changes: 211 additions & 64 deletions src/dimension_slice.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,167 @@ dimension_slice_from_slot(TupleTableSlot *slot)
return slice;
}

static HeapTuple
dimension_slice_formdata_make_tuple(const FormData_dimension_slice *fd, TupleDesc desc)
{
Datum values[Natts_dimension_slice];
bool nulls[Natts_dimension_slice] = { false };

memset(values, 0, sizeof(Datum) * Natts_dimension_slice);

values[AttrNumberGetAttrOffset(Anum_dimension_slice_id)] = Int32GetDatum(fd->id);
values[AttrNumberGetAttrOffset(Anum_dimension_slice_dimension_id)] =
Int32GetDatum(fd->dimension_id);
values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_start)] =
Int64GetDatum(fd->range_start);
values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_end)] = Int64GetDatum(fd->range_end);

return heap_form_tuple(desc, values, nulls);
}

static inline void
dimension_slice_formdata_fill(FormData_dimension_slice *fd, const TupleInfo *ti)
{
bool nulls[Natts_dimension_slice];
Datum values[Natts_dimension_slice];
bool should_free;
HeapTuple tuple;

tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
heap_deform_tuple(tuple, ts_scanner_get_tupledesc(ti), values, nulls);

Assert(!nulls[AttrNumberGetAttrOffset(Anum_dimension_slice_id)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_dimension_slice_dimension_id)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_dimension_slice_range_start)]);
Assert(!nulls[AttrNumberGetAttrOffset(Anum_dimension_slice_range_end)]);

fd->id = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_dimension_slice_id)]);
fd->dimension_id =
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_dimension_slice_dimension_id)]);
fd->range_start =
DatumGetInt64(values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_start)]);
fd->range_end = DatumGetInt64(values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_end)]);

if (should_free)
heap_freetuple(tuple);
}

static bool
lock_dimension_slice_tuple(int32 dimension_slice_id, ItemPointer tid,
FormData_dimension_slice *form)
{
bool success = false;
ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
};
ScanIterator iterator =
ts_scan_iterator_create(DIMENSION_SLICE, RowShareLock, CurrentMemoryContext);
iterator.ctx.index =
catalog_get_index(ts_catalog_get(), DIMENSION_SLICE, DIMENSION_SLICE_ID_IDX);
iterator.ctx.tuplock = &scantuplock;
/* Keeping the lock since we presumably want to update the tuple */
iterator.ctx.flags = SCANNER_F_KEEPLOCK;

/* see table_tuple_lock for details about flags that are set in TupleExclusive mode */
scantuplock.lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
if (!IsolationUsesXactSnapshot())
{
/* in read committed mode, we follow all updates to this tuple */
scantuplock.lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
}

ts_scan_iterator_scan_key_init(&iterator,
Anum_dimension_slice_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(dimension_slice_id));

ts_scanner_foreach(&iterator)
{
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
if (ti->lockresult != TM_Ok)
{
if (IsolationUsesXactSnapshot())
{
/* For Repeatable Read and Serializable isolation level report error
* if we cannot lock the tuple
*/
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("unable to lock hypertable catalog tuple, lock result is %d for "
"hypertable "
"ID (%d)",
ti->lockresult,
dimension_slice_id)));
}
}
dimension_slice_formdata_fill(form, ti);
ItemPointer result_tid = ts_scanner_get_tuple_tid(ti);
tid->ip_blkid = result_tid->ip_blkid;
tid->ip_posid = result_tid->ip_posid;
success = true;
break;
}
ts_scan_iterator_close(&iterator);
return success;
}

/* update the tuple at this tid. The assumption is that we already hold a
* tuple exclusive lock and no other transaction can modify this tuple
* The sequence of operations for any update is:
* lock the tuple using lock_hypertable_tuple.
* then update the required fields
* call dimension_slice_update_catalog_tuple to complete the update.
* This ensures correct tuple locking and tuple updates in the presence of
* concurrent transactions. Failure to follow this results in catalog corruption
*/
static void
dimension_slice_update_catalog_tuple(ItemPointer tid, FormData_dimension_slice *update)
{
HeapTuple new_tuple;
CatalogSecurityContext sec_ctx;
Catalog *catalog = ts_catalog_get();
Oid table = catalog_get_table_id(catalog, DIMENSION_SLICE);
Relation dimension_slice_rel = relation_open(table, RowExclusiveLock);

new_tuple = dimension_slice_formdata_make_tuple(update, dimension_slice_rel->rd_att);

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_update_tid(dimension_slice_rel, tid, new_tuple);
ts_catalog_restore_user(&sec_ctx);
heap_freetuple(new_tuple);
relation_close(dimension_slice_rel, NoLock);
}

/* delete the tuple at this tid. The assumption is that we already hold a
* tuple exclusive lock and no other transaction can modify this tuple
* The sequence of operations for any delete is:
* lock the tuple using lock_hypertable_tuple.
* call dimension_slice_delete_catalog_tuple to complete the delete.
* This ensures correct tuple locking and tuple deletes in the presence of
* concurrent transactions. Failure to follow this results in catalog corruption
*/
static void
dimension_slice_delete_catalog_tuple(ItemPointer tid)
{
CatalogSecurityContext sec_ctx;
Catalog *catalog = ts_catalog_get();
Oid table = catalog_get_table_id(catalog, DIMENSION_SLICE);
Relation dimension_slice_rel = relation_open(table, RowExclusiveLock);

ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
ts_catalog_delete_tid(dimension_slice_rel, tid);
ts_catalog_restore_user(&sec_ctx);
relation_close(dimension_slice_rel, NoLock);
}

DimensionSlice *
ts_dimension_slice_create(int dimension_id, int64 range_start, int64 range_end)
{
Expand Down Expand Up @@ -596,6 +757,30 @@ dimension_slice_tuple_delete(TupleInfo *ti, void *data)
{
bool isnull;
Datum dimension_slice_id = slot_getattr(ti->slot, Anum_dimension_slice_id, &isnull);

if (ti->lockresult != TM_Ok)
{
if (IsolationUsesXactSnapshot())
{
/* For Repeatable Read and Serializable isolation level report error
* if we cannot lock the tuple
*/
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
errmsg("unable to lock hypertable catalog tuple, lock result is %d for "
"hypertable "
"ID (%d)",
ti->lockresult,
DatumGetInt32(dimension_slice_id))));
}
}

bool *delete_constraints = data;
CatalogSecurityContext sec_ctx;

Expand Down Expand Up @@ -623,6 +808,11 @@ ts_dimension_slice_delete_by_dimension_id(int32 dimension_id, bool delete_constr
F_INT4EQ,
Int32GetDatum(dimension_id));

ScanTupLock scantuplock = {
.waitpolicy = LockWaitBlock,
.lockmode = LockTupleExclusive,
};

return dimension_slice_scan_limit_internal(
DIMENSION_SLICE_DIMENSION_ID_RANGE_START_RANGE_END_IDX,
scankey,
Expand All @@ -631,30 +821,21 @@ ts_dimension_slice_delete_by_dimension_id(int32 dimension_id, bool delete_constr
&delete_constraints,
0,
RowExclusiveLock,
NULL,
&scantuplock,
CurrentMemoryContext);
}

int
ts_dimension_slice_delete_by_id(int32 dimension_slice_id, bool delete_constraints)
{
ScanKeyData scankey[1];
FormData_dimension_slice form;
ItemPointerData tid;
/* lock the tuple entry in the catalog table */
bool found = lock_dimension_slice_tuple(dimension_slice_id, &tid, &form);
Ensure(found, "hypertable id %d not found", slice->fd.id);

ScanKeyInit(&scankey[0],
Anum_dimension_slice_id_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(dimension_slice_id));

return dimension_slice_scan_limit_internal(DIMENSION_SLICE_ID_IDX,
scankey,
1,
dimension_slice_tuple_delete,
&delete_constraints,
1,
RowExclusiveLock,
NULL,
CurrentMemoryContext);
dimension_slice_delete_catalog_tuple(&tid);
return true;
}

static ScanTupleResult
Expand Down Expand Up @@ -1179,54 +1360,20 @@ ts_osm_chunk_range_overlaps(int32 osm_dimension_slice_id, int32 dimension_id, in
return res;
}

static ScanTupleResult
dimension_slice_tuple_update(TupleInfo *ti, void *data)
{
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
FormData_dimension_slice *fd = (FormData_dimension_slice *) data;

Datum values[Natts_dimension_slice] = { 0 };
bool isnull[Natts_dimension_slice] = { 0 };
bool doReplace[Natts_dimension_slice] = { 0 };

values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_start)] =
Int64GetDatum(fd->range_start);
doReplace[AttrNumberGetAttrOffset(Anum_dimension_slice_range_start)] = true;

values[AttrNumberGetAttrOffset(Anum_dimension_slice_range_end)] = Int64GetDatum(fd->range_end);
doReplace[AttrNumberGetAttrOffset(Anum_dimension_slice_range_end)] = true;

HeapTuple new_tuple =
heap_modify_tuple(tuple, ts_scanner_get_tupledesc(ti), values, isnull, doReplace);

ts_catalog_update(ti->scanrel, new_tuple);

heap_freetuple(new_tuple);
if (should_free)
heap_freetuple(tuple);

return SCAN_DONE;
}

int
ts_dimension_slice_update_by_id(int32 dimension_slice_id, FormData_dimension_slice *fd_slice)
ts_dimension_slice_range_update(DimensionSlice *slice)
{
ScanKeyData scankey[1];

ScanKeyInit(&scankey[0],
Anum_dimension_slice_id_idx_id,
BTEqualStrategyNumber,
F_INT4EQ,
Int32GetDatum(dimension_slice_id));
FormData_dimension_slice form;
ItemPointerData tid;
/* lock the tuple entry in the catalog table */
bool found = lock_dimension_slice_tuple(slice->fd.id, &tid, &form);
Ensure(found, "hypertable id %d not found", slice->fd.id);

return dimension_slice_scan_limit_internal(DIMENSION_SLICE_ID_IDX,
scankey,
1,
dimension_slice_tuple_update,
fd_slice,
1,
RowExclusiveLock,
NULL,
CurrentMemoryContext);
if (form.range_start != slice->fd.range_start || form.range_end != slice->fd.range_end)
{
form.range_start = slice->fd.range_start;
form.range_end = slice->fd.range_end;
dimension_slice_update_catalog_tuple(&tid, &form);
}
return true;
}
3 changes: 1 addition & 2 deletions src/dimension_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ extern int ts_dimension_slice_scan_iterator_set_range(ScanIterator *it, int32 di
extern bool ts_osm_chunk_range_overlaps(int32 osm_dimension_slice_id, int32 dimension_id,
int64 range_start, int64 range_end);

extern int ts_dimension_slice_update_by_id(int32 dimension_slice_id,
FormData_dimension_slice *fd_slice);
extern int ts_dimension_slice_range_update(DimensionSlice *slice);

#define dimension_slice_insert(slice) ts_dimension_slice_insert_multi(&(slice), 1)

Expand Down
2 changes: 1 addition & 1 deletion src/hypertable.c
Original file line number Diff line number Diff line change
Expand Up @@ -2598,7 +2598,7 @@ ts_hypertable_osm_range_update(PG_FUNCTION_ARGS)

slice->fd.range_start = range_start_internal;
slice->fd.range_end = range_end_internal;
ts_dimension_slice_update_by_id(dimension_slice_id, &slice->fd);
ts_dimension_slice_range_update(slice);

PG_RETURN_BOOL(overlap);
}

0 comments on commit 7694c4a

Please sign in to comment.