Skip to content

Commit

Permalink
[Trino] Enhance the logic to efficiently retrieve the rows
Browse files Browse the repository at this point in the history
  • Loading branch information
agl29 committed Jan 31, 2025
1 parent 4a863dc commit f24bebd
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 25 deletions.
3 changes: 3 additions & 0 deletions desktop/core/src/desktop/js/apps/notebook/snippet.js
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,7 @@ class Snippet {
if (self.type() === 'trino') {
const existing_handle = self.result.handle();
existing_handle.row_count = data.handle.row_count;
existing_handle.rows_left = data.handle.rows_left;
existing_handle.next_uri = data.handle.next_uri;
}
self.showLogs(true);
Expand Down Expand Up @@ -2195,6 +2196,7 @@ class Snippet {
if (self.type() === 'trino') {
const existing_handle = self.result.handle();
existing_handle.row_count = data.result.row_count;
existing_handle.rows_left = data.result.rows_left;
existing_handle.next_uri = data.result.next_uri;
}
} else {
Expand Down Expand Up @@ -2369,6 +2371,7 @@ class Snippet {
if (self.type() === 'trino') {
const existing_handle = self.result.handle();
existing_handle.row_count = 0;
existing_handle.rows_left = 0;
existing_handle.next_uri = data.query_status.next_uri;
}
const delay = self.result.executionTime() > 45000 ? 5000 : 1000; // 5s if more than 45s
Expand Down
29 changes: 16 additions & 13 deletions desktop/libs/notebook/src/notebook/connectors/trino.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ def execute(self, notebook, snippet):

response = {
'row_count': 0,
'rows_left': 0,
'next_uri': status.next_uri,
'sync': None,
'has_result_set': status.next_uri is not None,
Expand Down Expand Up @@ -219,10 +220,11 @@ def fetch_result(self, notebook, snippet, rows, start_over):
data = []
columns = []
next_uri = snippet['result']['handle']['next_uri']
processed_rows = snippet['result']['handle'].get('row_count', 0)
row_count = snippet['result']['handle'].get('row_count', 0)
rows_left = snippet['result']['handle'].get('rows_left', 0)
status = False

if processed_rows == 0:
if row_count == 0:
data = snippet['result']['handle']['result']['data']

while next_uri:
Expand All @@ -235,25 +237,25 @@ def fetch_result(self, notebook, snippet, rows, start_over):
data += status.rows
columns = status.columns

if len(data) >= processed_rows + 100:
if processed_rows < 0:
data = data[:100]
else:
data = data[processed_rows:processed_rows + 100]
if rows_left:
data = data[-rows_left:] # Trim the data to only include the lefted rows
rows_left = 0 # Reset rows_left since we've handled the trimming

if len(data) > 100:
rows_left = len(data) - 100 # no of rows left to fetch in the present uri
break
rows_left = 0

next_uri = status.next_uri
current_length = len(data)
if processed_rows < 0:
processed_rows = 0
data = data[processed_rows:processed_rows + 100]
processed_rows -= current_length

data = data[:100]

properties = self.trino_session.properties
self._set_session_info_to_user(properties)

return {
'row_count': 100 + processed_rows,
'row_count': len(data) + row_count,
'rows_left': rows_left,
'next_uri': next_uri,
'has_more': bool(status.next_uri) if status else False,
'data': data or [],
Expand Down Expand Up @@ -456,6 +458,7 @@ def fetch(self, handle, start_over=None, rows=None):
else:
result = self.api.fetch_result(self.notebook, self.snippet, rows, start_over)
self.snippet['result']['handle']['row_count'] = result['row_count']
self.snippet['result']['handle']['rows_left'] = result['rows_left']
self.snippet['result']['handle']['next_uri'] = result['next_uri']

return ResultWrapper(result.get('meta'), result.get('data'), result.get('has_more'))
Expand Down
73 changes: 61 additions & 12 deletions desktop/libs/notebook/src/notebook/connectors/trino_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ def test_execute(self):

expected_result = {
'row_count': 0,
'rows_left': 0,
'next_uri': 'http://url',
'sync': None,
'has_result_set': True,
Expand All @@ -204,6 +205,7 @@ def test_execute(self):

expected_result = {
'row_count': 0,
'rows_left': 0,
'next_uri': 'http://url',
'sync': None,
'has_result_set': True,
Expand All @@ -220,7 +222,7 @@ def test_execute(self):
}
assert result == expected_result

def test_fetch_result(self):
def test_fetch_result_more_than_100(self):
# Mock TrinoRequest object and its methods
mock_trino_request = MagicMock()
self.trino_api.trino_request = mock_trino_request
Expand All @@ -229,18 +231,67 @@ def test_fetch_result(self):
mock_trino_request.get.return_value = MagicMock()
_columns = [{'comment': '', 'name': 'test_column1', 'type': 'str'}, {'comment': '', 'name': 'test_column2', 'type': 'str'}]

# Generate more than 100 rows of mock data
mock_data = [[f'value{i}', f'value{i + 1}'] for i in range(1, 201, 1)]

mock_trino_request.process.side_effect = [
MagicMock(
stats={'state': 'FINISHED'}, next_uri='http://url', id=123,
rows=[['value1', 'value2'], ['value3', 'value4']], columns=_columns
stats={'state': 'FINISHED'}, next_uri='http://url1', id=123,
rows=mock_data[:57], columns=_columns
),
MagicMock(
stats={'state': 'FINISHED'}, next_uri='http://url1', id=124,
rows=[['value5', 'value6'], ['value7', 'value8']], columns=_columns
stats={'state': 'FINISHED'}, next_uri='http://url2', id=124,
rows=mock_data[57:105], columns=_columns
),
MagicMock(
stats={'state': 'FINISHED'}, next_uri=None, id=125,
rows=[['value9', 'value10'], ['value11', 'value12']], columns=_columns
rows=mock_data[105:], columns=_columns
)
]

# Call the fetch_result method
result = self.trino_api.fetch_result(
notebook={}, snippet={'result': {'handle': {'next_uri': 'http://url', 'result': {'data': []}}}}, rows=0, start_over=False
)

expected_result = {
'row_count': 100,
'rows_left': 5,
'next_uri': 'http://url1',
'has_more': True,
'data': mock_data[:100],
'meta': [{
'name': column['name'],
'type': column['type'],
'comment': ''
} for column in _columns],
'type': 'table'
}

assert result == expected_result
assert len(result['data']) == 100
assert len(result['meta']) == 2

def test_fetch_result_less_than_100(self):
# Mock TrinoRequest object and its methods
mock_trino_request = MagicMock()
self.trino_api.trino_request = mock_trino_request

# Configure the MagicMock object to return expected responses
mock_trino_request.get.return_value = MagicMock()
_columns = [{'comment': '', 'name': 'test_column1', 'type': 'str'}, {'comment': '', 'name': 'test_column2', 'type': 'str'}]

# Generate 100 rows of mock data
mock_data = [[f'value{i}', f'value{i + 1}'] for i in range(1, 90, 1)]

mock_trino_request.process.side_effect = [
MagicMock(
stats={'state': 'FINISHED'}, next_uri='http://url1', id=123,
rows=mock_data[:57], columns=_columns
),
MagicMock(
stats={'state': 'FINISHED'}, next_uri=None, id=124,
rows=mock_data[57:], columns=_columns
)
]

Expand All @@ -250,13 +301,11 @@ def test_fetch_result(self):
)

expected_result = {
'row_count': 94,
'row_count': 89,
'rows_left': 0,
'next_uri': None,
'has_more': False,
'data': [
['value1', 'value2'], ['value3', 'value4'], ['value5', 'value6'],
['value7', 'value8'], ['value9', 'value10'], ['value11', 'value12']
],
'data': mock_data[:90],
'meta': [{
'name': column['name'],
'type': column['type'],
Expand All @@ -266,7 +315,7 @@ def test_fetch_result(self):
}

assert result == expected_result
assert len(result['data']) == 6
assert len(result['data']) == 89
assert len(result['meta']) == 2

def test_get_select_query(self):
Expand Down

0 comments on commit f24bebd

Please sign in to comment.