Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick of #28624 into release 2.51.0 (Bigtable Python timestamp bug fix) #28634

Merged
merged 9 commits into from
Sep 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,13 @@ public KV<ByteString, Iterable<Mutation>> apply(Row row) {
.setColumnQualifier(
ByteString.copyFrom(ofNullable(mutation.get("column_qualifier")).get()))
.setFamilyNameBytes(
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()));
if (mutation.containsKey("timestamp_micros")) {
setMutation =
setMutation.setTimestampMicros(
Longs.fromByteArray(ofNullable(mutation.get("timestamp_micros")).get()));
}
ByteString.copyFrom(ofNullable(mutation.get("family_name")).get()))
// Use timestamp if provided, else default to -1 (current Bigtable server time)
.setTimestampMicros(
mutation.containsKey("timestamp_micros")
? Longs.fromByteArray(
ofNullable(mutation.get("timestamp_micros")).get())
: -1);
bigtableMutation = Mutation.newBuilder().setSetCell(setMutation.build()).build();
break;
case "DeleteFromColumn":
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ public void tearDown() {
public void testSetMutationsExistingColumn() {
RowMutation rowMutation =
RowMutation.create(tableId, "key-1")
.setCell(COLUMN_FAMILY_NAME_1, "col_a", "val-1-a")
.setCell(COLUMN_FAMILY_NAME_2, "col_c", "val-1-c");
.setCell(COLUMN_FAMILY_NAME_1, "col_a", 1000, "val-1-a")
.setCell(COLUMN_FAMILY_NAME_2, "col_c", 1000, "val-1-c");
dataClient.mutateRow(rowMutation);

List<Map<String, byte[]>> mutations = new ArrayList<>();
Expand All @@ -165,13 +165,15 @@ public void testSetMutationsExistingColumn() {
"type", "SetCell".getBytes(StandardCharsets.UTF_8),
"value", "new-val-1-a".getBytes(StandardCharsets.UTF_8),
"column_qualifier", "col_a".getBytes(StandardCharsets.UTF_8),
"family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8)));
"family_name", COLUMN_FAMILY_NAME_1.getBytes(StandardCharsets.UTF_8),
"timestamp_micros", Longs.toByteArray(2000)));
mutations.add(
ImmutableMap.of(
"type", "SetCell".getBytes(StandardCharsets.UTF_8),
"value", "new-val-1-c".getBytes(StandardCharsets.UTF_8),
"column_qualifier", "col_c".getBytes(StandardCharsets.UTF_8),
"family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8)));
"family_name", COLUMN_FAMILY_NAME_2.getBytes(StandardCharsets.UTF_8),
"timestamp_micros", Longs.toByteArray(2000)));
Row mutationRow =
Row.withSchema(SCHEMA)
.withFieldValue("key", "key-1".getBytes(StandardCharsets.UTF_8))
Expand Down Expand Up @@ -202,10 +204,11 @@ public void testSetMutationsExistingColumn() {
.collect(Collectors.toList());
assertEquals(2, cellsColA.size());
assertEquals(2, cellsColC.size());
System.out.println(cellsColA);
System.out.println(cellsColC);
assertEquals("new-val-1-a", cellsColA.get(1).getValue().toStringUtf8());
assertEquals("new-val-1-c", cellsColC.get(1).getValue().toStringUtf8());
// Bigtable keeps cell history ordered by descending timestamp
assertEquals("new-val-1-a", cellsColA.get(0).getValue().toStringUtf8());
assertEquals("new-val-1-c", cellsColC.get(0).getValue().toStringUtf8());
assertEquals("val-1-a", cellsColA.get(1).getValue().toStringUtf8());
assertEquals("val-1-c", cellsColC.get(1).getValue().toStringUtf8());
}

@Test
Expand Down
7 changes: 3 additions & 4 deletions sdks/python/apache_beam/io/gcp/bigtableio.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,10 @@ def process(self, direct_row):
"type": b'SetCell',
"family_name": mutation.set_cell.family_name.encode('utf-8'),
"column_qualifier": mutation.set_cell.column_qualifier,
"value": mutation.set_cell.value
"value": mutation.set_cell.value,
"timestamp_micros": struct.pack(
'>q', mutation.set_cell.timestamp_micros)
}
micros = mutation.set_cell.timestamp_micros
if micros > -1:
mutation_dict['timestamp_micros'] = struct.pack('>q', micros)
elif mutation.__contains__("delete_from_column"):
mutation_dict = {
"type": b'DeleteFromColumn',
Expand Down
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/io/gcp/bigtableio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ def test_set_mutation(self):
row1_col2_cell = Cell(b'val1-2', 200_000_000)
row2_col1_cell = Cell(b'val2-1', 100_000_000)
row2_col2_cell = Cell(b'val2-2', 200_000_000)
# When setting this cell, we won't set a timestamp. We expect the timestamp
# to default to -1, and Bigtable will set it to system time at insertion.
row2_col1_no_timestamp = Cell(b'val2-2-notimestamp', time.time())
# rows sent to write transform
row1.set_cell(
'col_fam', b'col-1', row1_col1_cell.value, row1_col1_cell.timestamp)
Expand All @@ -232,6 +235,8 @@ def test_set_mutation(self):
'col_fam', b'col-1', row2_col1_cell.value, row2_col1_cell.timestamp)
row2.set_cell(
'col_fam', b'col-2', row2_col2_cell.value, row2_col2_cell.timestamp)
# Don't set a timestamp here; it should default to -1.
row2.set_cell('col_fam', b'col-no-timestamp', row2_col1_no_timestamp.value)

self.run_pipeline([row1, row2])

Expand All @@ -249,6 +254,19 @@ def test_set_mutation(self):
self.assertEqual(
row2_col2_cell, actual_row2.find_cells('col_fam', b'col-2')[0])

# check mutation that doesn't have a timestamp set is handled properly:
self.assertEqual(
row2_col1_no_timestamp.value,
actual_row2.find_cells('col_fam', b'col-no-timestamp')[0].value)
# Bigtable sets timestamp as insertion time, which is later than the
# time.time() we set when creating this test case
cell_timestamp = actual_row2.find_cells('col_fam',
b'col-no-timestamp')[0].timestamp
self.assertTrue(
row2_col1_no_timestamp.timestamp < cell_timestamp,
msg="Expected cell with unset timestamp to have ingestion time "
f"attached, but was {cell_timestamp}")

def test_delete_cells_mutation(self):
col_fam = self.table.column_family('col_fam')
col_fam.create()
Expand Down
Loading