From 6bee35f5ae22b2dab91b4533acf73b74483db74d Mon Sep 17 00:00:00 2001 From: agl29 Date: Mon, 27 Jan 2025 17:46:36 +0530 Subject: [PATCH] [Trino] Enhance the logic to efficiently retrieve the rows --- .../src/desktop/js/apps/notebook/snippet.js | 3 + .../notebook/src/notebook/connectors/trino.py | 29 ++++---- .../src/notebook/connectors/trino_tests.py | 73 ++++++++++++++++--- 3 files changed, 80 insertions(+), 25 deletions(-) diff --git a/desktop/core/src/desktop/js/apps/notebook/snippet.js b/desktop/core/src/desktop/js/apps/notebook/snippet.js index 273dd2a4f7c..132a4da26d0 100644 --- a/desktop/core/src/desktop/js/apps/notebook/snippet.js +++ b/desktop/core/src/desktop/js/apps/notebook/snippet.js @@ -1864,6 +1864,7 @@ class Snippet { if (self.type() === 'trino') { const existing_handle = self.result.handle(); existing_handle.row_count = data.handle.row_count; + existing_handle.rows_remaining = data.handle.rows_remaining; existing_handle.next_uri = data.handle.next_uri; } self.showLogs(true); @@ -2195,6 +2196,7 @@ class Snippet { if (self.type() === 'trino') { const existing_handle = self.result.handle(); existing_handle.row_count = data.result.row_count; + existing_handle.rows_remaining = data.result.rows_remaining; existing_handle.next_uri = data.result.next_uri; } } else { @@ -2369,6 +2371,7 @@ class Snippet { if (self.type() === 'trino') { const existing_handle = self.result.handle(); existing_handle.row_count = 0; + existing_handle.rows_remaining = 0; existing_handle.next_uri = data.query_status.next_uri; } const delay = self.result.executionTime() > 45000 ? 5000 : 1000; // 5s if more than 45s diff --git a/desktop/libs/notebook/src/notebook/connectors/trino.py b/desktop/libs/notebook/src/notebook/connectors/trino.py index 94e93ece070..f94181c2760 100644 --- a/desktop/libs/notebook/src/notebook/connectors/trino.py +++ b/desktop/libs/notebook/src/notebook/connectors/trino.py @@ -170,6 +170,7 @@ def execute(self, notebook, snippet): response = { 'row_count': 0, + 'rows_remaining': 0, 'next_uri': status.next_uri, 'sync': None, 'has_result_set': status.next_uri is not None, @@ -219,10 +220,11 @@ def fetch_result(self, notebook, snippet, rows, start_over): data = [] columns = [] next_uri = snippet['result']['handle']['next_uri'] - processed_rows = snippet['result']['handle'].get('row_count', 0) + row_count = snippet['result']['handle'].get('row_count', 0) + rows_remaining = snippet['result']['handle'].get('rows_remaining', 0) status = False - if processed_rows == 0: + if row_count == 0: data = snippet['result']['handle']['result']['data'] while next_uri: @@ -235,25 +237,25 @@ def fetch_result(self, notebook, snippet, rows, start_over): data += status.rows columns = status.columns - if len(data) >= processed_rows + 100: - if processed_rows < 0: - data = data[:100] - else: - data = data[processed_rows:processed_rows + 100] + if rows_remaining: + data = data[-rows_remaining:] # Trim the data to only include the remaining rows + rows_remaining = 0 # Reset rows_remaining since we've handled the trimming + + if len(data) > 100: + rows_remaining = len(data) - 100 # no of rows remaining to fetch in the present uri break + rows_remaining = 0 next_uri = status.next_uri - current_length = len(data) - if processed_rows < 0: - processed_rows = 0 - data = data[processed_rows:processed_rows + 100] - processed_rows -= current_length + + data = data[:100] properties = self.trino_session.properties self._set_session_info_to_user(properties) return { - 'row_count': 100 + processed_rows, + 'row_count': len(data) + row_count, + 'rows_remaining': rows_remaining, 'next_uri': next_uri, 'has_more': bool(status.next_uri) if status else False, 'data': data or [], @@ -456,6 +458,7 @@ def fetch(self, handle, start_over=None, rows=None): else: result = self.api.fetch_result(self.notebook, self.snippet, rows, start_over) self.snippet['result']['handle']['row_count'] = result['row_count'] + self.snippet['result']['handle']['rows_remaining'] = result['rows_remaining'] self.snippet['result']['handle']['next_uri'] = result['next_uri'] return ResultWrapper(result.get('meta'), result.get('data'), result.get('has_more')) diff --git a/desktop/libs/notebook/src/notebook/connectors/trino_tests.py b/desktop/libs/notebook/src/notebook/connectors/trino_tests.py index 673e0cc192a..c6a619ef607 100644 --- a/desktop/libs/notebook/src/notebook/connectors/trino_tests.py +++ b/desktop/libs/notebook/src/notebook/connectors/trino_tests.py @@ -178,6 +178,7 @@ def test_execute(self): expected_result = { 'row_count': 0, + 'rows_remaining': 0, 'next_uri': 'http://url', 'sync': None, 'has_result_set': True, @@ -204,6 +205,7 @@ def test_execute(self): expected_result = { 'row_count': 0, + 'rows_remaining': 0, 'next_uri': 'http://url', 'sync': None, 'has_result_set': True, @@ -220,7 +222,7 @@ def test_execute(self): } assert result == expected_result - def test_fetch_result(self): + def test_fetch_result_more_than_100(self): # Mock TrinoRequest object and its methods mock_trino_request = MagicMock() self.trino_api.trino_request = mock_trino_request @@ -229,18 +231,67 @@ def test_fetch_result(self): mock_trino_request.get.return_value = MagicMock() _columns = [{'comment': '', 'name': 'test_column1', 'type': 'str'}, {'comment': '', 'name': 'test_column2', 'type': 'str'}] + # Generate more than 100 rows of mock data + mock_data = [[f'value{i}', f'value{i + 1}'] for i in range(1, 201, 1)] + mock_trino_request.process.side_effect = [ MagicMock( - stats={'state': 'FINISHED'}, next_uri='http://url', id=123, - rows=[['value1', 'value2'], ['value3', 'value4']], columns=_columns + stats={'state': 'FINISHED'}, next_uri='http://url1', id=123, + rows=mock_data[:57], columns=_columns ), MagicMock( - stats={'state': 'FINISHED'}, next_uri='http://url1', id=124, - rows=[['value5', 'value6'], ['value7', 'value8']], columns=_columns + stats={'state': 'FINISHED'}, next_uri='http://url2', id=124, + rows=mock_data[57:105], columns=_columns ), MagicMock( stats={'state': 'FINISHED'}, next_uri=None, id=125, - rows=[['value9', 'value10'], ['value11', 'value12']], columns=_columns + rows=mock_data[105:], columns=_columns + ) + ] + + # Call the fetch_result method + result = self.trino_api.fetch_result( + notebook={}, snippet={'result': {'handle': {'next_uri': 'http://url', 'result': {'data': []}}}}, rows=0, start_over=False + ) + + expected_result = { + 'row_count': 100, + 'rows_remaining': 5, + 'next_uri': 'http://url1', + 'has_more': True, + 'data': mock_data[:100], + 'meta': [{ + 'name': column['name'], + 'type': column['type'], + 'comment': '' + } for column in _columns], + 'type': 'table' + } + + assert result == expected_result + assert len(result['data']) == 100 + assert len(result['meta']) == 2 + + def test_fetch_result_less_than_100(self): + # Mock TrinoRequest object and its methods + mock_trino_request = MagicMock() + self.trino_api.trino_request = mock_trino_request + + # Configure the MagicMock object to return expected responses + mock_trino_request.get.return_value = MagicMock() + _columns = [{'comment': '', 'name': 'test_column1', 'type': 'str'}, {'comment': '', 'name': 'test_column2', 'type': 'str'}] + + # Generate 100 rows of mock data + mock_data = [[f'value{i}', f'value{i + 1}'] for i in range(1, 90, 1)] + + mock_trino_request.process.side_effect = [ + MagicMock( + stats={'state': 'FINISHED'}, next_uri='http://url1', id=123, + rows=mock_data[:57], columns=_columns + ), + MagicMock( + stats={'state': 'FINISHED'}, next_uri=None, id=124, + rows=mock_data[57:], columns=_columns ) ] @@ -250,13 +301,11 @@ def test_fetch_result(self): ) expected_result = { - 'row_count': 94, + 'row_count': 89, + 'rows_remaining': 0, 'next_uri': None, 'has_more': False, - 'data': [ - ['value1', 'value2'], ['value3', 'value4'], ['value5', 'value6'], - ['value7', 'value8'], ['value9', 'value10'], ['value11', 'value12'] - ], + 'data': mock_data[:90], 'meta': [{ 'name': column['name'], 'type': column['type'], @@ -266,7 +315,7 @@ def test_fetch_result(self): } assert result == expected_result - assert len(result['data']) == 6 + assert len(result['data']) == 89 assert len(result['meta']) == 2 def test_get_select_query(self):