From 493029a6f06a1b2de750b7be470297f0ccf003c4 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 8 Dec 2023 15:58:07 -0800 Subject: [PATCH 01/14] added lock to _MutatuionsBatchQueue --- google/cloud/bigtable/batcher.py | 122 ++++++++++++++++++------------- 1 file changed, 70 insertions(+), 52 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index a6eb806e9..a563e5613 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -51,36 +51,54 @@ def __init__(self, max_mutation_bytes=MAX_MUTATION_SIZE, flush_count=FLUSH_COUNT self.total_size = 0 self.max_mutation_bytes = max_mutation_bytes self.flush_count = flush_count + self.lock = threading.Lock() def get(self): - """Retrieve an item from the queue. Recalculate queue size.""" - row = self._queue.get() - mutation_size = row.get_mutations_size() - self.total_mutation_count -= len(row._get_mutations()) - self.total_size -= mutation_size - return row + """ + Retrieve an item from the queue. Recalculate queue size. + + If the queue is empty, return None. + """ + with self.lock: + try: + row = self._queue.get_nowait() + mutation_size = row.get_mutations_size() + self.total_mutation_count -= len(row._get_mutations()) + self.total_size -= mutation_size + return row + except queue.Empty: + return None + + def get_all(self): + """Get all items from the queue.""" + with self.lock: + items = [] + while not self._queue.empty(): + items.append(self._queue.get_nowait()) + self.total_mutation_count = 0 + self.total_size = 0 + return items def put(self, item): """Insert an item to the queue. Recalculate queue size.""" mutation_count = len(item._get_mutations()) - self._queue.put(item) + with self.lock: + self._queue.put(item) - self.total_size += item.get_mutations_size() - self.total_mutation_count += mutation_count + self.total_size += item.get_mutations_size() + self.total_mutation_count += mutation_count def full(self): """Check if the queue is full.""" - if ( - self.total_mutation_count >= self.flush_count - or self.total_size >= self.max_mutation_bytes - ): - return True - return False - - def empty(self): - return self._queue.empty() + with self.lock: + if ( + self.total_mutation_count >= self.flush_count + or self.total_size >= self.max_mutation_bytes + ): + return True + return False @dataclass @@ -291,9 +309,7 @@ def flush(self): :raises: * :exc:`.batcherMutationsBatchError` if there's any error in the mutations. """ - rows_to_flush = [] - while not self._rows.empty(): - rows_to_flush.append(self._rows.get()) + rows_to_flush = self._rows.get_all() response = self._flush_rows(rows_to_flush) return response @@ -310,38 +326,40 @@ def _flush_async(self): rows_count = 0 batch_info = _BatchInfo() - while not self._rows.empty(): - row = self._rows.get() - mutations_count += len(row._get_mutations()) - mutations_size += row.get_mutations_size() - rows_count += 1 - rows_to_flush.append(row) - batch_info.mutations_count = mutations_count - batch_info.rows_count = rows_count - batch_info.mutations_size = mutations_size - - if ( - rows_count >= self.flush_count - or mutations_size >= self.max_row_bytes - or mutations_count >= self.flow_control.max_mutations - or mutations_size >= self.flow_control.max_mutation_bytes - or self._rows.empty() # submit when it reached the end of the queue + row = self._rows.get() + while row is not None: + while ( + row is not None + and rows_count < self.flush_count + and mutations_size < self.max_row_bytes + and mutations_count < self.flow_control.max_mutations + and mutations_size < self.flow_control.max_mutation_bytes ): - # wait for resources to become available, before submitting any new batch - self.flow_control.wait() - # once unblocked, submit a batch - # event flag will be set by control_flow to block subsequent thread, but not blocking this one - self.flow_control.control_flow(batch_info) - future = self._executor.submit(self._flush_rows, rows_to_flush) - self.futures_mapping[future] = batch_info - future.add_done_callback(self._batch_completed_callback) - - # reset and start a new batch - rows_to_flush = [] - mutations_size = 0 - rows_count = 0 - mutations_count = 0 - batch_info = _BatchInfo() + # build a batch + mutations_count += len(row._get_mutations()) + mutations_size += row.get_mutations_size() + rows_count += 1 + rows_to_flush.append(row) + batch_info.mutations_count = mutations_count + batch_info.rows_count = rows_count + batch_info.mutations_size = mutations_size + row = self._rows.get() + # wait for resources to become available, before submitting any new batch + self.flow_control.wait() + # once unblocked, submit the batch + # event flag will be set by control_flow to block subsequent thread, but not blocking this one + self.flow_control.control_flow(batch_info) + future = self._executor.submit(self._flush_rows, rows_to_flush) + self.futures_mapping[future] = batch_info + future.add_done_callback(self._batch_completed_callback) + + # reset and start a new batch + rows_to_flush = [] + mutations_size = 0 + rows_count = 0 + mutations_count = 0 + batch_info = _BatchInfo() + row = self._rows.get() def _batch_completed_callback(self, future): """Callback for when the mutation has finished to clean up the current batch From 2393b2ab91e3a8b69bdb8f469aee1770190db84c Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 8 Dec 2023 16:39:35 -0800 Subject: [PATCH 02/14] refactored _flush_async --- google/cloud/bigtable/batcher.py | 73 +++++++++++++++++--------------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index a563e5613..5da16dc82 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -319,32 +319,25 @@ def _flush_async(self): :raises: * :exc:`.batcherMutationsBatchError` if there's any error in the mutations. """ - - rows_to_flush = [] - mutations_count = 0 - mutations_size = 0 - rows_count = 0 - batch_info = _BatchInfo() - - row = self._rows.get() - while row is not None: - while ( - row is not None - and rows_count < self.flush_count - and mutations_size < self.max_row_bytes - and mutations_count < self.flow_control.max_mutations - and mutations_size < self.flow_control.max_mutation_bytes - ): - # build a batch - mutations_count += len(row._get_mutations()) - mutations_size += row.get_mutations_size() - rows_count += 1 - rows_to_flush.append(row) - batch_info.mutations_count = mutations_count - batch_info.rows_count = rows_count - batch_info.mutations_size = mutations_size - row = self._rows.get() - # wait for resources to become available, before submitting any new batch + next_row = self._rows.get() + while next_row is not None: + # start a new batch + rows_to_flush = [next_row] + batch_info = _BatchInfo( + mutations_count=len(next_row._get_mutations()), + rows_count=1, + mutations_size=next_row.get_mutations_size(), + ) + # fill up batch with rows + next_row = self._rows.get() + while next_row is not None and self._row_fits_in_batch(next_row, batch_info): + rows_to_flush.append(next_row) + batch_info.mutations_count += len(next_row._get_mutations()) + batch_info.rows_count += 1 + batch_info.mutations_size += next_row.get_mutations_size() + next_row = self._rows.get() + # send batch over network + # wait for resources to become available self.flow_control.wait() # once unblocked, submit the batch # event flag will be set by control_flow to block subsequent thread, but not blocking this one @@ -353,13 +346,27 @@ def _flush_async(self): self.futures_mapping[future] = batch_info future.add_done_callback(self._batch_completed_callback) - # reset and start a new batch - rows_to_flush = [] - mutations_size = 0 - rows_count = 0 - mutations_count = 0 - batch_info = _BatchInfo() - row = self._rows.get() + def _row_fits_in_batch(self, row, batch_info): + """Checks if a row can fit in the current batch. + + :type row: class + :param row: :class:`~google.cloud.bigtable.row.DirectRow`. + + :type batch_info: :class:`_BatchInfo` + :param batch_info: Information about the current batch. + + :rtype: bool + :returns: True if the row can fit in the current batch. + """ + new_rows_count = batch_info.rows_count + 1 + new_mutations_count = batch_info.mutations_count + len(row._get_mutations()) + new_mutations_size = batch_info.mutations_size + row.get_mutations_size() + return ( + new_rows_count <= self.flush_count + and new_mutations_size <= self.max_row_bytes + and new_mutations_count <= self.flow_control.max_mutations + and new_mutations_size <= self.flow_control.max_mutation_bytes + ) def _batch_completed_callback(self, future): """Callback for when the mutation has finished to clean up the current batch From 106d1a4d8367504baa21b874a96cd14376e6096f Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Fri, 8 Dec 2023 16:40:29 -0800 Subject: [PATCH 03/14] ran black --- google/cloud/bigtable/batcher.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index 5da16dc82..2715929fe 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -330,7 +330,9 @@ def _flush_async(self): ) # fill up batch with rows next_row = self._rows.get() - while next_row is not None and self._row_fits_in_batch(next_row, batch_info): + while next_row is not None and self._row_fits_in_batch( + next_row, batch_info + ): rows_to_flush.append(next_row) batch_info.mutations_count += len(next_row._get_mutations()) batch_info.rows_count += 1 From 829eb045eac14cf2540b0d9b36e219ef2d972acf Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 11:32:11 -0800 Subject: [PATCH 04/14] made lock private --- google/cloud/bigtable/batcher.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index 2715929fe..06693fe2f 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -51,7 +51,7 @@ def __init__(self, max_mutation_bytes=MAX_MUTATION_SIZE, flush_count=FLUSH_COUNT self.total_size = 0 self.max_mutation_bytes = max_mutation_bytes self.flush_count = flush_count - self.lock = threading.Lock() + self._lock = threading.Lock() def get(self): """ @@ -59,7 +59,7 @@ def get(self): If the queue is empty, return None. """ - with self.lock: + with self._lock: try: row = self._queue.get_nowait() mutation_size = row.get_mutations_size() @@ -71,7 +71,7 @@ def get(self): def get_all(self): """Get all items from the queue.""" - with self.lock: + with self._lock: items = [] while not self._queue.empty(): items.append(self._queue.get_nowait()) @@ -84,7 +84,7 @@ def put(self, item): mutation_count = len(item._get_mutations()) - with self.lock: + with self._lock: self._queue.put(item) self.total_size += item.get_mutations_size() @@ -92,7 +92,7 @@ def put(self, item): def full(self): """Check if the queue is full.""" - with self.lock: + with self._lock: if ( self.total_mutation_count >= self.flush_count or self.total_size >= self.max_mutation_bytes From ee4e515c9070c5bf08a30ae8e36077d00a3ce9dc Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 11:37:51 -0800 Subject: [PATCH 05/14] simplify flow control release --- google/cloud/bigtable/batcher.py | 18 ++---------------- 1 file changed, 2 insertions(+), 16 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index 06693fe2f..9944152c4 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -18,7 +18,6 @@ import concurrent.futures import atexit - from google.api_core.exceptions import from_grpc_status from dataclasses import dataclass @@ -237,7 +236,6 @@ def __init__( max_mutations=MAX_OUTSTANDING_ELEMENTS, max_mutation_bytes=MAX_OUTSTANDING_BYTES, ) - self.futures_mapping = {} self.exceptions = queue.Queue() self._user_batch_completed_callback = batch_completed_callback @@ -345,8 +343,8 @@ def _flush_async(self): # event flag will be set by control_flow to block subsequent thread, but not blocking this one self.flow_control.control_flow(batch_info) future = self._executor.submit(self._flush_rows, rows_to_flush) - self.futures_mapping[future] = batch_info - future.add_done_callback(self._batch_completed_callback) + # schedule release of resources from flow control + future.add_done_callback(lambda f: self.flow_control.release(batch_info)) def _row_fits_in_batch(self, row, batch_info): """Checks if a row can fit in the current batch. @@ -370,18 +368,6 @@ def _row_fits_in_batch(self, row, batch_info): and new_mutations_size <= self.flow_control.max_mutation_bytes ) - def _batch_completed_callback(self, future): - """Callback for when the mutation has finished to clean up the current batch - and release items from the flow controller. - - Raise exceptions if there's any. - Release the resources locked by the flow control and allow enqueued tasks to be run. - """ - - processed_rows = self.futures_mapping[future] - self.flow_control.release(processed_rows) - del self.futures_mapping[future] - def _flush_rows(self, rows_to_flush): """Mutate the specified rows. From 5761191a30ddc656688b6b928fcc9b35256cce5c Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 12:14:25 -0800 Subject: [PATCH 06/14] added system test for mutations batcher --- tests/system/test_data_api.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/system/test_data_api.py b/tests/system/test_data_api.py index 2ca7e1504..9db4441da 100644 --- a/tests/system/test_data_api.py +++ b/tests/system/test_data_api.py @@ -381,3 +381,29 @@ def test_access_with_non_admin_client(data_client, data_instance_id, data_table_ instance = data_client.instance(data_instance_id) table = instance.table(data_table_id) assert table.read_row("nonesuch") is None # no raise + + +def test_mutations_batcher_threading(data_table, rows_to_delete): + """ + Test the mutations batcher by sending a bunch of mutations using different + flush methods + """ + import time + from google.cloud.bigtable.batcher import MutationsBatcher + + num_sent = 20 + all_results = [] + + def callback(results): + all_results.extend(results) + + with MutationsBatcher(data_table, flush_count=5, flush_interval=0.07, batch_completed_callback=callback) as batcher: + # send mutations in a way that timed flushes and count flushes interleave + for i in range(num_sent): + row = data_table.direct_row("row{}".format(i)) + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8")) + rows_to_delete.append(row) + batcher.mutate(row) + time.sleep(0.01) + # ensure all mutations were sent + assert len(all_results) == num_sent From 4b451fd110aee1a3ccd9e04b4ef56d4ff46aef13 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 12:53:08 -0800 Subject: [PATCH 07/14] fix issue with flow_control deadlock --- google/cloud/bigtable/batcher.py | 13 ++++++++++++- tests/system/test_data_api.py | 24 +++++++++++++++--------- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index 9944152c4..21fb99345 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -18,6 +18,9 @@ import concurrent.futures import atexit +from functools import partial +from copy import copy + from google.api_core.exceptions import from_grpc_status from dataclasses import dataclass @@ -344,7 +347,15 @@ def _flush_async(self): self.flow_control.control_flow(batch_info) future = self._executor.submit(self._flush_rows, rows_to_flush) # schedule release of resources from flow control - future.add_done_callback(lambda f: self.flow_control.release(batch_info)) + future.add_done_callback(partial(self._batch_completed_callback, copy(batch_info))) + + def _batch_completed_callback(self, batch_info, future): + """Callback for when the mutation has finished to clean up the current batch + and release items from the flow controller. + Raise exceptions if there's any. + Release the resources locked by the flow control and allow enqueued tasks to be run. + """ + self.flow_control.release(batch_info) def _row_fits_in_batch(self, row, batch_info): """Checks if a row can fit in the current batch. diff --git a/tests/system/test_data_api.py b/tests/system/test_data_api.py index 9db4441da..9608f1e54 100644 --- a/tests/system/test_data_api.py +++ b/tests/system/test_data_api.py @@ -388,22 +388,28 @@ def test_mutations_batcher_threading(data_table, rows_to_delete): Test the mutations batcher by sending a bunch of mutations using different flush methods """ + import mock import time from google.cloud.bigtable.batcher import MutationsBatcher num_sent = 20 all_results = [] + max_elements_per_batch = 2 + def callback(results): all_results.extend(results) - - with MutationsBatcher(data_table, flush_count=5, flush_interval=0.07, batch_completed_callback=callback) as batcher: - # send mutations in a way that timed flushes and count flushes interleave - for i in range(num_sent): - row = data_table.direct_row("row{}".format(i)) - row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8")) - rows_to_delete.append(row) - batcher.mutate(row) - time.sleep(0.01) + assert results <= max_elements_per_batch + + # override flow control max elements + with mock.patch("google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", max_elements_per_batch): + with MutationsBatcher(data_table, flush_count=5, flush_interval=0.07, batch_completed_callback=callback) as batcher: + # send mutations in a way that timed flushes and count flushes interleave + for i in range(num_sent): + row = data_table.direct_row("row{}".format(i)) + row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8")) + rows_to_delete.append(row) + batcher.mutate(row) + time.sleep(0.01) # ensure all mutations were sent assert len(all_results) == num_sent From 6a52214f11504c6b3ff9830eebe5bf3f3def680b Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 12:53:34 -0800 Subject: [PATCH 08/14] change number of test nodes --- tests/system/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/conftest.py b/tests/system/conftest.py index f39fcba88..910c20970 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -58,7 +58,7 @@ def location_id(): @pytest.fixture(scope="session") def serve_nodes(): - return 3 + return 1 @pytest.fixture(scope="session") From 96222ba806fdf5fd414ede687c506fa158f6ab0a Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 13:22:21 -0800 Subject: [PATCH 09/14] ran blacken --- google/cloud/bigtable/batcher.py | 4 +++- tests/system/test_data_api.py | 15 ++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index 21fb99345..f826cea77 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -347,7 +347,9 @@ def _flush_async(self): self.flow_control.control_flow(batch_info) future = self._executor.submit(self._flush_rows, rows_to_flush) # schedule release of resources from flow control - future.add_done_callback(partial(self._batch_completed_callback, copy(batch_info))) + future.add_done_callback( + partial(self._batch_completed_callback, copy(batch_info)) + ) def _batch_completed_callback(self, batch_info, future): """Callback for when the mutation has finished to clean up the current batch diff --git a/tests/system/test_data_api.py b/tests/system/test_data_api.py index 9608f1e54..c833b0a77 100644 --- a/tests/system/test_data_api.py +++ b/tests/system/test_data_api.py @@ -402,12 +402,21 @@ def callback(results): assert results <= max_elements_per_batch # override flow control max elements - with mock.patch("google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", max_elements_per_batch): - with MutationsBatcher(data_table, flush_count=5, flush_interval=0.07, batch_completed_callback=callback) as batcher: + with mock.patch( + "google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", max_elements_per_batch + ): + with MutationsBatcher( + data_table, + flush_count=5, + flush_interval=0.07, + batch_completed_callback=callback, + ) as batcher: # send mutations in a way that timed flushes and count flushes interleave for i in range(num_sent): row = data_table.direct_row("row{}".format(i)) - row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8")) + row.set_cell( + COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8") + ) rows_to_delete.append(row) batcher.mutate(row) time.sleep(0.01) From a0b5975f3d7e58ee3cf526716e25b99caa3fdda3 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Mon, 11 Dec 2023 21:23:04 +0000 Subject: [PATCH 10/14] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- google/cloud/bigtable/batcher.py | 4 +++- tests/system/test_data_api.py | 15 ++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index 21fb99345..f826cea77 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -347,7 +347,9 @@ def _flush_async(self): self.flow_control.control_flow(batch_info) future = self._executor.submit(self._flush_rows, rows_to_flush) # schedule release of resources from flow control - future.add_done_callback(partial(self._batch_completed_callback, copy(batch_info))) + future.add_done_callback( + partial(self._batch_completed_callback, copy(batch_info)) + ) def _batch_completed_callback(self, batch_info, future): """Callback for when the mutation has finished to clean up the current batch diff --git a/tests/system/test_data_api.py b/tests/system/test_data_api.py index 9608f1e54..c833b0a77 100644 --- a/tests/system/test_data_api.py +++ b/tests/system/test_data_api.py @@ -402,12 +402,21 @@ def callback(results): assert results <= max_elements_per_batch # override flow control max elements - with mock.patch("google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", max_elements_per_batch): - with MutationsBatcher(data_table, flush_count=5, flush_interval=0.07, batch_completed_callback=callback) as batcher: + with mock.patch( + "google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", max_elements_per_batch + ): + with MutationsBatcher( + data_table, + flush_count=5, + flush_interval=0.07, + batch_completed_callback=callback, + ) as batcher: # send mutations in a way that timed flushes and count flushes interleave for i in range(num_sent): row = data_table.direct_row("row{}".format(i)) - row.set_cell(COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8")) + row.set_cell( + COLUMN_FAMILY_ID1, COL_NAME1, "val{}".format(i).encode("utf-8") + ) rows_to_delete.append(row) batcher.mutate(row) time.sleep(0.01) From 31a14d33c427403c84ff8842e2a98782316f5db7 Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 14:04:26 -0800 Subject: [PATCH 11/14] removed lock --- google/cloud/bigtable/batcher.py | 54 +++++++++++++------------------- 1 file changed, 22 insertions(+), 32 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index f826cea77..57a23a26c 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -53,7 +53,6 @@ def __init__(self, max_mutation_bytes=MAX_MUTATION_SIZE, flush_count=FLUSH_COUNT self.total_size = 0 self.max_mutation_bytes = max_mutation_bytes self.flush_count = flush_count - self._lock = threading.Lock() def get(self): """ @@ -61,46 +60,33 @@ def get(self): If the queue is empty, return None. """ - with self._lock: - try: - row = self._queue.get_nowait() - mutation_size = row.get_mutations_size() - self.total_mutation_count -= len(row._get_mutations()) - self.total_size -= mutation_size - return row - except queue.Empty: - return None - - def get_all(self): - """Get all items from the queue.""" - with self._lock: - items = [] - while not self._queue.empty(): - items.append(self._queue.get_nowait()) - self.total_mutation_count = 0 - self.total_size = 0 - return items + try: + row = self._queue.get_nowait() + mutation_size = row.get_mutations_size() + self.total_mutation_count -= len(row._get_mutations()) + self.total_size -= mutation_size + return row + except queue.Empty: + return None def put(self, item): """Insert an item to the queue. Recalculate queue size.""" mutation_count = len(item._get_mutations()) - with self._lock: - self._queue.put(item) + self._queue.put(item) - self.total_size += item.get_mutations_size() - self.total_mutation_count += mutation_count + self.total_size += item.get_mutations_size() + self.total_mutation_count += mutation_count def full(self): """Check if the queue is full.""" - with self._lock: - if ( - self.total_mutation_count >= self.flush_count - or self.total_size >= self.max_mutation_bytes - ): - return True - return False + if ( + self.total_mutation_count >= self.flush_count + or self.total_size >= self.max_mutation_bytes + ): + return True + return False @dataclass @@ -310,7 +296,11 @@ def flush(self): :raises: * :exc:`.batcherMutationsBatchError` if there's any error in the mutations. """ - rows_to_flush = self._rows.get_all() + rows_to_flush = [] + row = self._rows.get() + while row is not None: + rows_to_flush.append(row) + row = self._rows.get() response = self._flush_rows(rows_to_flush) return response From 68e6eb582c42b783da0b5c1d8d9e8fbb879766df Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 14:08:39 -0800 Subject: [PATCH 12/14] reverted flow control release pattern --- google/cloud/bigtable/batcher.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/google/cloud/bigtable/batcher.py b/google/cloud/bigtable/batcher.py index 57a23a26c..8f0cabadd 100644 --- a/google/cloud/bigtable/batcher.py +++ b/google/cloud/bigtable/batcher.py @@ -18,8 +18,6 @@ import concurrent.futures import atexit -from functools import partial -from copy import copy from google.api_core.exceptions import from_grpc_status from dataclasses import dataclass @@ -225,6 +223,7 @@ def __init__( max_mutations=MAX_OUTSTANDING_ELEMENTS, max_mutation_bytes=MAX_OUTSTANDING_BYTES, ) + self.futures_mapping = {} self.exceptions = queue.Queue() self._user_batch_completed_callback = batch_completed_callback @@ -337,17 +336,18 @@ def _flush_async(self): self.flow_control.control_flow(batch_info) future = self._executor.submit(self._flush_rows, rows_to_flush) # schedule release of resources from flow control - future.add_done_callback( - partial(self._batch_completed_callback, copy(batch_info)) - ) + self.futures_mapping[future] = batch_info + future.add_done_callback(self._batch_completed_callback) - def _batch_completed_callback(self, batch_info, future): + def _batch_completed_callback(self, future): """Callback for when the mutation has finished to clean up the current batch and release items from the flow controller. Raise exceptions if there's any. Release the resources locked by the flow control and allow enqueued tasks to be run. """ - self.flow_control.release(batch_info) + processed_rows = self.futures_mapping[future] + self.flow_control.release(processed_rows) + del self.futures_mapping[future] def _row_fits_in_batch(self, row, batch_info): """Checks if a row can fit in the current batch. From 9fac3a2a78000e576fe8830680aae9e10907cbdf Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 14:17:14 -0800 Subject: [PATCH 13/14] fixed assertion --- tests/system/test_data_api.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system/test_data_api.py b/tests/system/test_data_api.py index c833b0a77..7c02e3395 100644 --- a/tests/system/test_data_api.py +++ b/tests/system/test_data_api.py @@ -399,7 +399,7 @@ def test_mutations_batcher_threading(data_table, rows_to_delete): def callback(results): all_results.extend(results) - assert results <= max_elements_per_batch + assert len(results) <= max_elements_per_batch # override flow control max elements with mock.patch( From e265baadc69909a15083de2af098c8ac9cc2e59f Mon Sep 17 00:00:00 2001 From: Daniel Sanche Date: Mon, 11 Dec 2023 14:21:09 -0800 Subject: [PATCH 14/14] loosened test requirement --- tests/system/test_data_api.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/system/test_data_api.py b/tests/system/test_data_api.py index 7c02e3395..579837e34 100644 --- a/tests/system/test_data_api.py +++ b/tests/system/test_data_api.py @@ -395,16 +395,11 @@ def test_mutations_batcher_threading(data_table, rows_to_delete): num_sent = 20 all_results = [] - max_elements_per_batch = 2 - def callback(results): all_results.extend(results) - assert len(results) <= max_elements_per_batch # override flow control max elements - with mock.patch( - "google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", max_elements_per_batch - ): + with mock.patch("google.cloud.bigtable.batcher.MAX_OUTSTANDING_ELEMENTS", 2): with MutationsBatcher( data_table, flush_count=5,