diff --git a/src/tribler/core/components/content_discovery/community/tests/test_content_discovery_community.py b/src/tribler/core/components/content_discovery/community/tests/test_content_discovery_community.py
index 31e45eb1b9f..caff772d662 100644
--- a/src/tribler/core/components/content_discovery/community/tests/test_content_discovery_community.py
+++ b/src/tribler/core/components/content_discovery/community/tests/test_content_discovery_community.py
@@ -13,27 +13,26 @@
 from unittest.mock import AsyncMock, Mock, patch
 
 import pytest
-
 from ipv8.keyvault.crypto import default_eccrypto
 from ipv8.messaging.payload import IntroductionRequestPayload
-from pony.orm import OperationalError, db_session
-
 from ipv8.messaging.serialization import default_serializer
 from ipv8.test.base import TestBase
 from ipv8.test.mocking.ipv8 import MockIPv8
+from pony.orm import OperationalError, db_session
+
 from tribler.core import notifications
-from tribler.core.components.content_discovery.community.payload import SelectResponsePayload, VersionResponse, \
-    TorrentsHealthPayload, PopularTorrentsRequest
+from tribler.core.components.content_discovery.community.content_discovery_community import ContentDiscoveryCommunity
+from tribler.core.components.content_discovery.community.payload import PopularTorrentsRequest, SelectResponsePayload, \
+    TorrentsHealthPayload, VersionResponse
 from tribler.core.components.content_discovery.community.settings import ContentDiscoverySettings
 from tribler.core.components.database.db.layers.knowledge_data_access_layer import KnowledgeDataAccessLayer, \
     ResourceType, SHOW_THRESHOLD
 from tribler.core.components.database.db.layers.tests.test_knowledge_data_access_layer_base import \
     Resource, TestKnowledgeAccessLayerBase
-from tribler.core.components.database.db.tribler_database import TriblerDatabase
 from tribler.core.components.database.db.orm_bindings.torrent_metadata import LZ4_EMPTY_ARCHIVE, NEW
-from tribler.core.components.database.db.serialization import CHANNEL_THUMBNAIL, NULL_KEY, REGULAR_TORRENT
+from tribler.core.components.database.db.serialization import NULL_KEY, REGULAR_TORRENT
 from tribler.core.components.database.db.store import MetadataStore
-from tribler.core.components.content_discovery.community.content_discovery_community import ContentDiscoveryCommunity
+from tribler.core.components.database.db.tribler_database import TriblerDatabase
 from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker
 from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo
 from tribler.core.tests.tools.base_test import MockObject
@@ -163,7 +162,7 @@ async def test_torrents_health_gossip_no_checker(self):
         """
         self.overlay(0).composition.torrent_checker = None
 
-        with self.assertReceivedBy(1, [], message_filter = [TorrentsHealthPayload]):
+        with self.assertReceivedBy(1, [], message_filter=[TorrentsHealthPayload]):
             self.overlay(0).gossip_random_torrents_health()
             await self.deliver_messages()
 
@@ -171,7 +170,7 @@ async def test_torrents_health_gossip_no_live(self):
         """
         Test whether torrent health information is spread when no live torrents are known
         """
-        with self.assertReceivedBy(1, [TorrentsHealthPayload], message_filter = [TorrentsHealthPayload]) as received:
+        with self.assertReceivedBy(1, [TorrentsHealthPayload], message_filter=[TorrentsHealthPayload]) as received:
             self.overlay(0).gossip_random_torrents_health()
             await self.deliver_messages()
         message, = received
@@ -489,7 +488,6 @@ def callback(_, results):
             assert obj.title == 'title1'
             assert obj.health.seeders == 0
 
-
     async def test_remote_select_packets_limit(self):
         """
         Test dropping packets that go over the response limit for a remote select.
@@ -502,6 +500,7 @@ async def test_remote_select_packets_limit(self):
 
         def add_result(request, processing_results):
             add_result.result_count += 1
+        add_result.result_count = 0
 
         expected = [SelectResponsePayload]
 
@@ -516,7 +515,7 @@ def add_result(request, processing_results):
         while add_result.result_count < 10:
             await asyncio.sleep(0.1)
 
-        assert [] == self.overlay(1).request_cache.get_tasks() # The list of outstanding requests should be empty
+        assert [] == self.overlay(1).request_cache.get_tasks()  # The list of outstanding requests should be empty
         assert add_result.result_count == 10  # The packet limit is 10
 
     def test_sanitize_query(self):
@@ -629,7 +628,7 @@ async def test_remote_query_big_response(self):
         with db_session:
             add_random_torrent(self.metadata_store(1).TorrentMetadata, name=hexlify(value).decode())
 
-        kwargs_dict = {"metadata_type": [CHANNEL_THUMBNAIL]}
+        kwargs_dict = {"metadata_type": [REGULAR_TORRENT]}
         callback = Mock()
         self.overlay(0).send_remote_select(self.peer(1), **kwargs_dict, processing_callback=callback)
 
@@ -734,8 +733,8 @@ def slow_get_entries(self, *args, **kwargs):
             time.sleep(0.1)
             return original_get_entries(self, *args, **kwargs)
 
-        with patch.object(self.overlay(0), 'logger') as logger,\
-                patch.object(MetadataStore, 'get_entries', slow_get_entries):
+        with patch.object(self.overlay(0), 'logger') as logger, \
+             patch.object(MetadataStore, 'get_entries', slow_get_entries):
             await self.deliver_messages(timeout=0.5)
 
         torrents1 = list(self.metadata_store(1).get_entries(**kwargs1))
@@ -772,7 +771,6 @@ async def test_deprecated_popular_torrents_request_no_live(self):
         assert message.random_torrents == []
         assert message.torrents_checked == []
 
-
     async def test_deprecated_popular_torrents_request_live(self):
         """
         The new protocol no longer uses PopularTorrentsRequest but still supports it.
@@ -788,4 +786,4 @@ async def test_deprecated_popular_torrents_request_live(self):
         assert message.random_torrents_length == 0
         assert message.torrents_checked_length == 1
         assert message.random_torrents == []
-        assert message.torrents_checked[0] == (b'00000000000000000000', 200, 0, message.torrents_checked[0][3])
\ No newline at end of file
+        assert message.torrents_checked[0] == (b'00000000000000000000', 200, 0, message.torrents_checked[0][3])
diff --git a/src/tribler/core/components/database/db/serialization.py b/src/tribler/core/components/database/db/serialization.py
index 301e0474719..733a218bd29 100644
--- a/src/tribler/core/components/database/db/serialization.py
+++ b/src/tribler/core/components/database/db/serialization.py
@@ -6,7 +6,7 @@
 
 from ipv8.keyvault.crypto import default_eccrypto
 from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
-from ipv8.messaging.serialization import default_serializer, VarLenUtf8
+from ipv8.messaging.serialization import VarLenUtf8, default_serializer
 
 from tribler.core.utilities.unicode import hexlify
 
@@ -18,18 +18,9 @@
 NULL_SIG = b'\x00' * 64
 NULL_KEY = b'\x00' * 64
 
-# Metadata types. Should have been an enum, but in Python its unwieldy.
-TYPELESS = 100
-CHANNEL_NODE = 200
-METADATA_NODE = 210
 COLLECTION_NODE = 220
-JSON_NODE = 230
-CHANNEL_DESCRIPTION = 231
-BINARY_NODE = 240
-CHANNEL_THUMBNAIL = 241
 REGULAR_TORRENT = 300
 CHANNEL_TORRENT = 400
-DELETED = 500
 SNIPPET = 600
 
 
@@ -67,14 +58,13 @@ class UnknownBlobTypeException(Exception):
 
 def read_payload_with_offset(data, offset=0):
     # First we have to determine the actual payload type
     metadata_type = struct.unpack_from('>H', data, offset=offset)[0]
-    payload_class = METADATA_TYPE_TO_PAYLOAD_CLASS.get(metadata_type)
-    if payload_class is not None:
-        payload, offset = default_serializer.unpack_serializable(payload_class, data, offset=offset)
-        payload.signature = data[offset: offset + 64]
-        return payload, offset + 64
-    # Unknown metadata type, raise exception
-    raise UnknownBlobTypeException
+    if metadata_type != REGULAR_TORRENT:
+        raise UnknownBlobTypeException
+
+    payload, offset = default_serializer.unpack_serializable(TorrentMetadataPayload, data, offset=offset)
+    payload.signature = data[offset: offset + 64]
+    return payload, offset + 64
 
 
 @vp_compile
@@ -110,9 +100,9 @@ def has_signature(self):
 
     def check_signature(self):
         return default_eccrypto.is_valid_signature(
-            default_eccrypto.key_from_public_bin(b"LibNaCLPK:" + self.public_key),
-            self.serialized(),
-            self.signature
+                default_eccrypto.key_from_public_bin(b"LibNaCLPK:" + self.public_key),
+                self.serialized(),
+                self.signature
         )
 
 
@@ -122,30 +112,6 @@ class ChannelNodePayload(SignedPayload):
     format_list = SignedPayload.format_list + ['Q', 'Q', 'Q']
 
 
-@vp_compile
-class MetadataNodePayload(ChannelNodePayload):
-    names = ChannelNodePayload.names + ['title', 'tags']
-    format_list = ChannelNodePayload.format_list + ['varlenIutf8', 'varlenIutf8']
-
-
-@vp_compile
-class JsonNodePayload(ChannelNodePayload):
-    names = ChannelNodePayload.names + ['json_text']
-    format_list = ChannelNodePayload.format_list + ['varlenIutf8']
-
-
-@vp_compile
-class BinaryNodePayload(ChannelNodePayload):
-    names = ChannelNodePayload.names + ['binary_data', 'data_type']
-    format_list = ChannelNodePayload.format_list + ['varlenI', 'varlenIutf8']
-
-
-@vp_compile
-class CollectionNodePayload(MetadataNodePayload):
-    names = MetadataNodePayload.names + ['num_entries']
-    format_list = MetadataNodePayload.format_list + ['Q']
-
-
 @vp_compile
 class TorrentMetadataPayload(ChannelNodePayload):
     """
@@ -170,36 +136,6 @@ def get_magnet(self):
         )
 
 
-@vp_compile
-class ChannelMetadataPayload(TorrentMetadataPayload):
-    """
-    Payload for metadata that stores a channel.
-    """
-
-    names = TorrentMetadataPayload.names + ['num_entries', 'start_timestamp']
-    format_list = TorrentMetadataPayload.format_list + ['Q'] + ['Q']
-
-
-@vp_compile
-class DeletedMetadataPayload(SignedPayload):
-    """
-    Payload for metadata that stores deleted metadata.
-    """
-
-    names = SignedPayload.names + ['delete_signature']
-    format_list = SignedPayload.format_list + ['64s']
-
-
-METADATA_TYPE_TO_PAYLOAD_CLASS = {
-    REGULAR_TORRENT: TorrentMetadataPayload,
-    CHANNEL_TORRENT: ChannelMetadataPayload,
-    COLLECTION_NODE: CollectionNodePayload,
-    CHANNEL_THUMBNAIL: BinaryNodePayload,
-    CHANNEL_DESCRIPTION: JsonNodePayload,
-    DELETED: DeletedMetadataPayload,
-}
-
-
 @vp_compile
 class HealthItemsPayload(VariablePayload):
     """
diff --git a/src/tribler/core/components/database/db/store.py b/src/tribler/core/components/database/db/store.py
index 36e4d18fbbf..858a71f8f83 100644
--- a/src/tribler/core/components/database/db/store.py
+++ b/src/tribler/core/components/database/db/store.py
@@ -1,7 +1,7 @@
 import enum
-from dataclasses import dataclass, field
 import logging
 import re
+from dataclasses import dataclass, field
 from datetime import datetime, timedelta
 from time import sleep, time
 from typing import Optional, Union
@@ -15,15 +15,8 @@
 from tribler.core.components.database.db.orm_bindings import misc, torrent_metadata, torrent_state as torrent_state_, \
     tracker_state
 from tribler.core.components.database.db.orm_bindings.torrent_metadata import NULL_KEY_SUBST
-from tribler.core.components.database.db.serialization import (
-    CHANNEL_TORRENT,
-    COLLECTION_NODE,
-    HealthItemsPayload,
-    REGULAR_TORRENT,
-    read_payload_with_offset,
-    NULL_KEY
-)
-
+from tribler.core.components.database.db.serialization import (CHANNEL_TORRENT, COLLECTION_NODE, HealthItemsPayload,
+                                                                NULL_KEY, REGULAR_TORRENT, read_payload_with_offset)
 from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
 from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted, handle_db_if_corrupted
 from tribler.core.utilities.notifier import Notifier
@@ -255,13 +248,13 @@ async def process_compressed_mdblob_threaded(self, compressed_data, **kwargs):
         try:
             return await run_threaded(self.db, self.process_compressed_mdblob, compressed_data, **kwargs)
         except DatabaseIsCorrupted:
-            raise # re-raise this exception and terminate the Core process
+            raise  # re-raise this exception and terminate the Core process
         except Exception as e:  # pylint: disable=broad-except  # pragma: no cover
             self._logger.exception("DB transaction error when tried to process compressed mdblob: "
                                    f"{e.__class__.__name__}: {e}", exc_info=e)
             return []
 
-    def process_compressed_mdblob(self, compressed_data, **kwargs):
+    def process_compressed_mdblob(self, compressed_data, skip_personal_metadata_payload=True):
         try:
             with LZ4FrameDecompressor() as decompressor:
                 decompressed_data = decompressor.decompress(compressed_data)
@@ -278,7 +271,8 @@ def process_compressed_mdblob(self, compressed_data, **kwargs):
                 self._logger.warning(f"Unable to parse health information: {type(e).__name__}: {str(e)}")
                 raise
 
-        return self.process_squashed_mdblob(decompressed_data, health_info=health_info, **kwargs)
+        return self.process_squashed_mdblob(decompressed_data, health_info=health_info,
+                                            skip_personal_metadata_payload=skip_personal_metadata_payload)
 
     def process_torrent_health(self, health: HealthInfo) -> bool:
         """
@@ -305,7 +299,8 @@ def process_torrent_health(self, health: HealthInfo) -> bool:
 
         return False
 
-    def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None, **kwargs):
+    def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None,
+                                skip_personal_metadata_payload=True):
         """
         Process raw concatenated payloads blob. This routine breaks the database access into smaller batches.
         It uses a congestion-control like algorithm to determine the optimal batch size, targeting the
@@ -344,7 +339,7 @@ def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info
             # We separate the sessions to minimize database locking.
             with db_session(immediate=True):
                 for payload in batch:
-                    result.extend(self.process_payload(payload, **kwargs))
+                    result.extend(self.process_payload(payload, skip_personal_metadata_payload))
 
             # Batch size adjustment
             batch_end_time = datetime.now() - batch_start_time
@@ -374,7 +369,7 @@ def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info
         return result
 
     @db_session
-    def process_payload(self, payload, skip_personal_metadata_payload=True, **kwargs):
+    def process_payload(self, payload, skip_personal_metadata_payload=True):
         # Don't process our own torrents
         if skip_personal_metadata_payload and payload.public_key == self.my_public_key_bin:
             return []
diff --git a/src/tribler/core/components/database/db/tests/test_serialization.py b/src/tribler/core/components/database/db/tests/test_serialization.py
index 68aa2c510b5..30b3da4e1b7 100644
--- a/src/tribler/core/components/database/db/tests/test_serialization.py
+++ b/src/tribler/core/components/database/db/tests/test_serialization.py
@@ -1,6 +1,10 @@
 from datetime import datetime
+from unittest.mock import Mock, patch
 
-from tribler.core.components.database.db.serialization import TorrentMetadataPayload
+import pytest
+
+from tribler.core.components.database.db.serialization import TorrentMetadataPayload, UnknownBlobTypeException, \
+    read_payload_with_offset
 
 
 def test_fix_torrent_metadata_payload():
@@ -24,3 +28,10 @@ def test_torrent_metadata_payload_magnet():
 
     expected = "magnet:?xt=urn:btih:000102030405060708090a0b0c0d0e0f10111213&dn=b'title'&tr=b'tracker_info'"
     assert expected == payload.get_magnet()
+
+
+@patch('struct.unpack_from', Mock(return_value=(301,)))
+def test_read_payload_with_offset_exception():
+    # Test that an exception is raised when metadata_type != REGULAR_TORRENT
+    with pytest.raises(UnknownBlobTypeException):
+        read_payload_with_offset(b'')
diff --git a/src/tribler/core/components/database/restapi/database_endpoint.py b/src/tribler/core/components/database/restapi/database_endpoint.py
index 5d1e5e2e57b..877fd588ef6 100644
--- a/src/tribler/core/components/database/restapi/database_endpoint.py
+++ b/src/tribler/core/components/database/restapi/database_endpoint.py
@@ -39,7 +39,6 @@
     'date': "torrent_date",
     'created': "torrent_date",
     'status': 'status',
-    'torrents': 'num_entries',
    'votes': 'votes',
     'subscribed': 'subscribed',
     'health': 'HEALTH',