Add test and refactor
drew2a committed Jan 31, 2024
1 parent 6cacc4d commit 5237bb3
Showing 3 changed files with 24 additions and 18 deletions.
2 changes: 1 addition & 1 deletion src/tribler/core/components/database/db/serialization.py
@@ -60,7 +60,7 @@ def read_payload_with_offset(data, offset=0):
     metadata_type = struct.unpack_from('>H', data, offset=offset)[0]

     if metadata_type != REGULAR_TORRENT:
-        return
+        raise UnknownBlobTypeException

     payload, offset = default_serializer.unpack_serializable(TorrentMetadataPayload, data, offset=offset)
     payload.signature = data[offset: offset + 64]
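The change above is behavioral, not cosmetic: read_payload_with_offset used to bail out with a bare `return` (i.e. None) on unknown blob types, forcing callers to None-check; it now fails loudly. A minimal, self-contained sketch of the new control flow; the REGULAR_TORRENT value (300) and the exception stub are assumptions standing in for what serialization.py defines:

```python
import struct

REGULAR_TORRENT = 300  # assumed value of the metadata_type tag in serialization.py


class UnknownBlobTypeException(Exception):
    """Stub for the exception this commit starts raising in serialization.py."""


def read_metadata_type(data: bytes, offset: int = 0) -> int:
    metadata_type = struct.unpack_from('>H', data, offset=offset)[0]
    if metadata_type != REGULAR_TORRENT:
        # Before this commit: a bare `return` (None) that callers had to check.
        # After: unsupported blobs fail loudly instead of slipping through.
        raise UnknownBlobTypeException
    return metadata_type


try:
    read_metadata_type(struct.pack('>H', 301))  # 301 mirrors the mocked value in the new test
except UnknownBlobTypeException:
    print("unsupported blob type, skipping")
```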
27 changes: 11 additions & 16 deletions src/tribler/core/components/database/db/store.py
@@ -1,7 +1,7 @@
 import enum
-from dataclasses import dataclass, field
 import logging
 import re
+from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 from time import sleep, time
 from typing import Optional, Union
@@ -15,15 +15,8 @@
 from tribler.core.components.database.db.orm_bindings import misc, torrent_metadata, torrent_state as torrent_state_, \
     tracker_state
 from tribler.core.components.database.db.orm_bindings.torrent_metadata import NULL_KEY_SUBST
-from tribler.core.components.database.db.serialization import (
-    CHANNEL_TORRENT,
-    COLLECTION_NODE,
-    HealthItemsPayload,
-    REGULAR_TORRENT,
-    read_payload_with_offset,
-    NULL_KEY
-)
-
+from tribler.core.components.database.db.serialization import (CHANNEL_TORRENT, COLLECTION_NODE, HealthItemsPayload,
+                                                               NULL_KEY, REGULAR_TORRENT, read_payload_with_offset)
 from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
 from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted, handle_db_if_corrupted
 from tribler.core.utilities.notifier import Notifier
@@ -255,13 +248,13 @@ async def process_compressed_mdblob_threaded(self, compressed_data, **kwargs):
         try:
             return await run_threaded(self.db, self.process_compressed_mdblob, compressed_data, **kwargs)
         except DatabaseIsCorrupted:
-            raise # re-raise this exception and terminate the Core process
+            raise  # re-raise this exception and terminate the Core process
         except Exception as e:  # pylint: disable=broad-except  # pragma: no cover
             self._logger.exception("DB transaction error when tried to process compressed mdblob: "
                                    f"{e.__class__.__name__}: {e}", exc_info=e)
             return []

-    def process_compressed_mdblob(self, compressed_data, **kwargs):
+    def process_compressed_mdblob(self, compressed_data, skip_personal_metadata_payload=True):
         try:
             with LZ4FrameDecompressor() as decompressor:
                 decompressed_data = decompressor.decompress(compressed_data)
@@ -278,7 +271,8 @@ def process_compressed_mdblob(self, compressed_data, **kwargs):
             self._logger.warning(f"Unable to parse health information: {type(e).__name__}: {str(e)}")
             raise

-        return self.process_squashed_mdblob(decompressed_data, health_info=health_info, **kwargs)
+        return self.process_squashed_mdblob(decompressed_data, health_info=health_info,
+                                            skip_personal_metadata_payload=skip_personal_metadata_payload)

     def process_torrent_health(self, health: HealthInfo) -> bool:
         """
@@ -305,7 +299,8 @@ def process_torrent_health(self, health: HealthInfo) -> bool:

         return False

-    def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None, **kwargs):
+    def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None,
+                                skip_personal_metadata_payload=True):
         """
         Process raw concatenated payloads blob. This routine breaks the database access into smaller batches.
         It uses a congestion-control like algorithm to determine the optimal batch size, targeting the
@@ -344,7 +339,7 @@ def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info
             # We separate the sessions to minimize database locking.
             with db_session(immediate=True):
                 for payload in batch:
-                    result.extend(self.process_payload(payload, **kwargs))
+                    result.extend(self.process_payload(payload, skip_personal_metadata_payload))

             # Batch size adjustment
             batch_end_time = datetime.now() - batch_start_time
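The docstring above (cut off by the diff fold) describes a congestion-control-like scheme for picking the batch size. A hedged sketch of the general idea, with an assumed target duration and grow/shrink factors rather than Tribler's actual constants:

```python
from datetime import timedelta


def adjust_batch_size(batch_size: int, batch_duration: timedelta,
                      target: timedelta = timedelta(seconds=1.0)) -> int:
    """Grow the batch while processing stays under the target; back off when it overshoots."""
    if batch_duration < target:
        return int(batch_size * 1.5)  # assumed growth factor
    return max(1, int(batch_size * 0.5))  # assumed back-off factor, never below 1


size = 10
for seconds in (0.2, 0.4, 1.8):  # simulated per-batch processing times
    size = adjust_batch_size(size, timedelta(seconds=seconds))
print(size)  # grew on the two fast batches, then halved on the slow one: 11
```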
@@ -374,7 +369,7 @@ def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info
         return result

     @db_session
-    def process_payload(self, payload, skip_personal_metadata_payload=True, **kwargs):
+    def process_payload(self, payload, skip_personal_metadata_payload=True):
         # Don't process our own torrents
         if skip_personal_metadata_payload and payload.public_key == self.my_public_key_bin:
             return []
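Taken together, the store.py hunks are one refactor: the opaque **kwargs bag is replaced by an explicit skip_personal_metadata_payload=True threaded through process_compressed_mdblob, process_squashed_mdblob, and process_payload. A small sketch of why the explicit keyword is safer, using hypothetical stand-in names rather than Tribler's real classes:

```python
from typing import List


class MetadataStoreSketch:
    """Hypothetical stand-in; not Tribler's real MetadataStore."""

    def process_payload(self, payload: bytes, skip_personal_metadata_payload: bool = True) -> List[bytes]:
        # With an explicit keyword, a caller's typo such as
        # skip_personal_metdata_payload=False raises TypeError immediately;
        # under **kwargs it would have been silently swallowed.
        if skip_personal_metadata_payload:
            return []
        return [payload]

    def process_squashed_mdblob(self, chunk_data: bytes,
                                skip_personal_metadata_payload: bool = True) -> List[bytes]:
        # The flag is threaded through by name, so its default (True) is
        # visible at every level of the chain instead of hiding in **kwargs.
        return self.process_payload(chunk_data, skip_personal_metadata_payload)


store = MetadataStoreSketch()
assert store.process_squashed_mdblob(b"blob", skip_personal_metadata_payload=False) == [b"blob"]
```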
13 changes: 12 additions & 1 deletion
@@ -1,6 +1,10 @@
 from datetime import datetime
 from unittest.mock import Mock, patch

-from tribler.core.components.database.db.serialization import TorrentMetadataPayload
+import pytest
+
+from tribler.core.components.database.db.serialization import TorrentMetadataPayload, UnknownBlobTypeException, \
+    read_payload_with_offset


 def test_fix_torrent_metadata_payload():
@@ -24,3 +28,10 @@ def test_torrent_metadata_payload_magnet():
     expected = "magnet:?xt=urn:btih:000102030405060708090a0b0c0d0e0f10111213&dn=b'title'&tr=b'tracker_info'"

     assert expected == payload.get_magnet()
+
+
+@patch('struct.unpack_from', Mock(return_value=(301,)))
+def test_read_payload_with_offset_exception():
+    # Test that an exception is raised when metadata_type != REGULAR_TORRENT
+    with pytest.raises(UnknownBlobTypeException):
+        read_payload_with_offset(b'')
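Note the technique in the new test: patching struct.unpack_from at module level makes read_payload_with_offset see a metadata_type of 301 (any value other than REGULAR_TORRENT would do) without having to craft a valid serialized blob, which is why an empty b'' suffices as input.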
