Skip to content

Commit

Permalink
Merge pull request #7874 from drew2a/fix/7808
Browse files Browse the repository at this point in the history
Remove code related to `num_entries`
  • Loading branch information
drew2a authored Feb 1, 2024
2 parents 5d09538 + e14bf2f commit d488a0b
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,26 @@
from unittest.mock import AsyncMock, Mock, patch

import pytest

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.messaging.payload import IntroductionRequestPayload
from pony.orm import OperationalError, db_session

from ipv8.messaging.serialization import default_serializer
from ipv8.test.base import TestBase
from ipv8.test.mocking.ipv8 import MockIPv8
from pony.orm import OperationalError, db_session

from tribler.core import notifications
from tribler.core.components.content_discovery.community.payload import SelectResponsePayload, VersionResponse, \
TorrentsHealthPayload, PopularTorrentsRequest
from tribler.core.components.content_discovery.community.content_discovery_community import ContentDiscoveryCommunity
from tribler.core.components.content_discovery.community.payload import PopularTorrentsRequest, SelectResponsePayload, \
TorrentsHealthPayload, VersionResponse
from tribler.core.components.content_discovery.community.settings import ContentDiscoverySettings
from tribler.core.components.database.db.layers.knowledge_data_access_layer import KnowledgeDataAccessLayer, \
ResourceType, SHOW_THRESHOLD
from tribler.core.components.database.db.layers.tests.test_knowledge_data_access_layer_base import \
Resource, TestKnowledgeAccessLayerBase
from tribler.core.components.database.db.tribler_database import TriblerDatabase
from tribler.core.components.database.db.orm_bindings.torrent_metadata import LZ4_EMPTY_ARCHIVE, NEW
from tribler.core.components.database.db.serialization import CHANNEL_THUMBNAIL, NULL_KEY, REGULAR_TORRENT
from tribler.core.components.database.db.serialization import NULL_KEY, REGULAR_TORRENT
from tribler.core.components.database.db.store import MetadataStore
from tribler.core.components.content_discovery.community.content_discovery_community import ContentDiscoveryCommunity
from tribler.core.components.database.db.tribler_database import TriblerDatabase
from tribler.core.components.torrent_checker.torrent_checker.torrent_checker import TorrentChecker
from tribler.core.components.torrent_checker.torrent_checker.torrentchecker_session import HealthInfo
from tribler.core.tests.tools.base_test import MockObject
Expand Down Expand Up @@ -163,15 +162,15 @@ async def test_torrents_health_gossip_no_checker(self):
"""
self.overlay(0).composition.torrent_checker = None

with self.assertReceivedBy(1, [], message_filter = [TorrentsHealthPayload]):
with self.assertReceivedBy(1, [], message_filter=[TorrentsHealthPayload]):
self.overlay(0).gossip_random_torrents_health()
await self.deliver_messages()

async def test_torrents_health_gossip_no_live(self):
"""
Test whether torrent health information is spread when no live torrents are known
"""
with self.assertReceivedBy(1, [TorrentsHealthPayload], message_filter = [TorrentsHealthPayload]) as received:
with self.assertReceivedBy(1, [TorrentsHealthPayload], message_filter=[TorrentsHealthPayload]) as received:
self.overlay(0).gossip_random_torrents_health()
await self.deliver_messages()
message, = received
Expand Down Expand Up @@ -489,7 +488,6 @@ def callback(_, results):
assert obj.title == 'title1'
assert obj.health.seeders == 0


async def test_remote_select_packets_limit(self):
"""
Test dropping packets that go over the response limit for a remote select.
Expand All @@ -502,6 +500,7 @@ async def test_remote_select_packets_limit(self):

def add_result(request, processing_results):
add_result.result_count += 1

add_result.result_count = 0

expected = [SelectResponsePayload]
Expand All @@ -516,7 +515,7 @@ def add_result(request, processing_results):
while add_result.result_count < 10:
await asyncio.sleep(0.1)

assert [] == self.overlay(1).request_cache.get_tasks() # The list of outstanding requests should be empty
assert [] == self.overlay(1).request_cache.get_tasks() # The list of outstanding requests should be empty
assert add_result.result_count == 10 # The packet limit is 10

def test_sanitize_query(self):
Expand Down Expand Up @@ -629,7 +628,7 @@ async def test_remote_query_big_response(self):
with db_session:
add_random_torrent(self.metadata_store(1).TorrentMetadata, name=hexlify(value).decode())

kwargs_dict = {"metadata_type": [CHANNEL_THUMBNAIL]}
kwargs_dict = {"metadata_type": [REGULAR_TORRENT]}
callback = Mock()
self.overlay(0).send_remote_select(self.peer(1), **kwargs_dict, processing_callback=callback)

Expand Down Expand Up @@ -734,8 +733,8 @@ def slow_get_entries(self, *args, **kwargs):
time.sleep(0.1)
return original_get_entries(self, *args, **kwargs)

with patch.object(self.overlay(0), 'logger') as logger,\
patch.object(MetadataStore, 'get_entries', slow_get_entries):
with patch.object(self.overlay(0), 'logger') as logger, \
patch.object(MetadataStore, 'get_entries', slow_get_entries):
await self.deliver_messages(timeout=0.5)

torrents1 = list(self.metadata_store(1).get_entries(**kwargs1))
Expand Down Expand Up @@ -772,7 +771,6 @@ async def test_deprecated_popular_torrents_request_no_live(self):
assert message.random_torrents == []
assert message.torrents_checked == []


async def test_deprecated_popular_torrents_request_live(self):
"""
The new protocol no longer uses PopularTorrentsRequest but still supports it.
Expand All @@ -788,4 +786,4 @@ async def test_deprecated_popular_torrents_request_live(self):
assert message.random_torrents_length == 0
assert message.torrents_checked_length == 1
assert message.random_torrents == []
assert message.torrents_checked[0] == (b'00000000000000000000', 200, 0, message.torrents_checked[0][3])
assert message.torrents_checked[0] == (b'00000000000000000000', 200, 0, message.torrents_checked[0][3])
84 changes: 10 additions & 74 deletions src/tribler/core/components/database/db/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from ipv8.keyvault.crypto import default_eccrypto
from ipv8.messaging.lazy_payload import VariablePayload, vp_compile
from ipv8.messaging.serialization import default_serializer, VarLenUtf8
from ipv8.messaging.serialization import VarLenUtf8, default_serializer

from tribler.core.utilities.unicode import hexlify

Expand All @@ -18,18 +18,9 @@
NULL_SIG = b'\x00' * 64
NULL_KEY = b'\x00' * 64

# Metadata types. Should have been an enum, but in Python it's unwieldy.
# These integer tags are written on the wire / into blobs to identify the
# payload class of a serialized metadata entry.
# NOTE(review): after the channels removal, only REGULAR_TORRENT appears to be
# accepted by read_payload_with_offset — the other values identify legacy blob
# types kept for backward compatibility; confirm before removing any of them.
TYPELESS = 100
CHANNEL_NODE = 200
METADATA_NODE = 210
COLLECTION_NODE = 220
JSON_NODE = 230
CHANNEL_DESCRIPTION = 231
BINARY_NODE = 240
CHANNEL_THUMBNAIL = 241
REGULAR_TORRENT = 300
CHANNEL_TORRENT = 400
DELETED = 500
SNIPPET = 600


Expand Down Expand Up @@ -67,14 +58,13 @@ class UnknownBlobTypeException(Exception):
def read_payload_with_offset(data, offset=0):
    """Deserialize a single signed metadata payload from a raw blob.

    The first two bytes at ``offset`` (big-endian unsigned short) encode the
    metadata type, which selects the payload class to deserialize.

    :param data: the raw bytes-like blob containing serialized payloads.
    :param offset: position in ``data`` at which the payload starts.
    :returns: a ``(payload, next_offset)`` tuple, where ``next_offset`` points
        past the payload and its trailing 64-byte signature.
    :raises UnknownBlobTypeException: if the metadata type is not recognized.
    """
    # First we have to determine the actual payload type
    metadata_type = struct.unpack_from('>H', data, offset=offset)[0]
    payload_class = METADATA_TYPE_TO_PAYLOAD_CLASS.get(metadata_type)
    if payload_class is None:
        # Unknown metadata type, raise exception
        raise UnknownBlobTypeException

    payload, offset = default_serializer.unpack_serializable(payload_class, data, offset=offset)
    # The 64-byte signature trails the serialized payload body.
    payload.signature = data[offset: offset + 64]
    return payload, offset + 64


@vp_compile
Expand Down Expand Up @@ -110,9 +100,9 @@ def has_signature(self):

def check_signature(self):
return default_eccrypto.is_valid_signature(
default_eccrypto.key_from_public_bin(b"LibNaCLPK:" + self.public_key),
self.serialized(),
self.signature
default_eccrypto.key_from_public_bin(b"LibNaCLPK:" + self.public_key),
self.serialized(),
self.signature
)


Expand All @@ -122,30 +112,6 @@ class ChannelNodePayload(SignedPayload):
format_list = SignedPayload.format_list + ['Q', 'Q', 'Q']


@vp_compile
class MetadataNodePayload(ChannelNodePayload):
    """Channel node extended with a human-readable title and a tag string."""

    names = [*ChannelNodePayload.names, 'title', 'tags']
    format_list = [*ChannelNodePayload.format_list, 'varlenIutf8', 'varlenIutf8']


@vp_compile
class JsonNodePayload(ChannelNodePayload):
    """Channel node carrying an arbitrary UTF-8 JSON text blob."""

    names = [*ChannelNodePayload.names, 'json_text']
    format_list = [*ChannelNodePayload.format_list, 'varlenIutf8']


@vp_compile
class BinaryNodePayload(ChannelNodePayload):
    """Channel node carrying raw binary data plus a textual type descriptor."""

    names = [*ChannelNodePayload.names, 'binary_data', 'data_type']
    format_list = [*ChannelNodePayload.format_list, 'varlenI', 'varlenIutf8']


@vp_compile
class CollectionNodePayload(MetadataNodePayload):
    """Metadata node that additionally tracks the number of contained entries."""

    names = [*MetadataNodePayload.names, 'num_entries']
    format_list = [*MetadataNodePayload.format_list, 'Q']


@vp_compile
class TorrentMetadataPayload(ChannelNodePayload):
"""
Expand All @@ -170,36 +136,6 @@ def get_magnet(self):
)


@vp_compile
class ChannelMetadataPayload(TorrentMetadataPayload):
    """Payload for metadata that stores a channel."""

    names = [*TorrentMetadataPayload.names, 'num_entries', 'start_timestamp']
    format_list = [*TorrentMetadataPayload.format_list, 'Q', 'Q']


@vp_compile
class DeletedMetadataPayload(SignedPayload):
    """Payload for metadata that stores deleted metadata."""

    # The 64-byte signature of the entry being deleted.
    names = [*SignedPayload.names, 'delete_signature']
    format_list = [*SignedPayload.format_list, '64s']


# Maps an on-wire metadata type id (see the constants above) to the payload
# class used to (de)serialize entries of that type. Consulted by
# read_payload_with_offset; types absent from this mapping are rejected with
# UnknownBlobTypeException.
METADATA_TYPE_TO_PAYLOAD_CLASS = {
    REGULAR_TORRENT: TorrentMetadataPayload,
    CHANNEL_TORRENT: ChannelMetadataPayload,
    COLLECTION_NODE: CollectionNodePayload,
    CHANNEL_THUMBNAIL: BinaryNodePayload,
    CHANNEL_DESCRIPTION: JsonNodePayload,
    DELETED: DeletedMetadataPayload,
}


@vp_compile
class HealthItemsPayload(VariablePayload):
"""
Expand Down
27 changes: 11 additions & 16 deletions src/tribler/core/components/database/db/store.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import enum
from dataclasses import dataclass, field
import logging
import re
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from time import sleep, time
from typing import Optional, Union
Expand All @@ -15,15 +15,8 @@
from tribler.core.components.database.db.orm_bindings import misc, torrent_metadata, torrent_state as torrent_state_, \
tracker_state
from tribler.core.components.database.db.orm_bindings.torrent_metadata import NULL_KEY_SUBST
from tribler.core.components.database.db.serialization import (
CHANNEL_TORRENT,
COLLECTION_NODE,
HealthItemsPayload,
REGULAR_TORRENT,
read_payload_with_offset,
NULL_KEY
)

from tribler.core.components.database.db.serialization import (CHANNEL_TORRENT, COLLECTION_NODE, HealthItemsPayload,
NULL_KEY, REGULAR_TORRENT, read_payload_with_offset)
from tribler.core.components.torrent_checker.torrent_checker.dataclasses import HealthInfo
from tribler.core.utilities.db_corruption_handling.base import DatabaseIsCorrupted, handle_db_if_corrupted
from tribler.core.utilities.notifier import Notifier
Expand Down Expand Up @@ -255,13 +248,13 @@ async def process_compressed_mdblob_threaded(self, compressed_data, **kwargs):
try:
return await run_threaded(self.db, self.process_compressed_mdblob, compressed_data, **kwargs)
except DatabaseIsCorrupted:
raise # re-raise this exception and terminate the Core process
raise # re-raise this exception and terminate the Core process
except Exception as e: # pylint: disable=broad-except # pragma: no cover
self._logger.exception("DB transaction error when tried to process compressed mdblob: "
f"{e.__class__.__name__}: {e}", exc_info=e)
return []

def process_compressed_mdblob(self, compressed_data, **kwargs):
def process_compressed_mdblob(self, compressed_data, skip_personal_metadata_payload=True):
try:
with LZ4FrameDecompressor() as decompressor:
decompressed_data = decompressor.decompress(compressed_data)
Expand All @@ -278,7 +271,8 @@ def process_compressed_mdblob(self, compressed_data, **kwargs):
self._logger.warning(f"Unable to parse health information: {type(e).__name__}: {str(e)}")
raise

return self.process_squashed_mdblob(decompressed_data, health_info=health_info, **kwargs)
return self.process_squashed_mdblob(decompressed_data, health_info=health_info,
skip_personal_metadata_payload=skip_personal_metadata_payload)

def process_torrent_health(self, health: HealthInfo) -> bool:
"""
Expand All @@ -305,7 +299,8 @@ def process_torrent_health(self, health: HealthInfo) -> bool:

return False

def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None, **kwargs):
def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info=None,
skip_personal_metadata_payload=True):
"""
Process raw concatenated payloads blob. This routine breaks the database access into smaller batches.
It uses a congestion-control like algorithm to determine the optimal batch size, targeting the
Expand Down Expand Up @@ -344,7 +339,7 @@ def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info
# We separate the sessions to minimize database locking.
with db_session(immediate=True):
for payload in batch:
result.extend(self.process_payload(payload, **kwargs))
result.extend(self.process_payload(payload, skip_personal_metadata_payload))

# Batch size adjustment
batch_end_time = datetime.now() - batch_start_time
Expand Down Expand Up @@ -374,7 +369,7 @@ def process_squashed_mdblob(self, chunk_data, external_thread=False, health_info
return result

@db_session
def process_payload(self, payload, skip_personal_metadata_payload=True, **kwargs):
def process_payload(self, payload, skip_personal_metadata_payload=True):
# Don't process our own torrents
if skip_personal_metadata_payload and payload.public_key == self.my_public_key_bin:
return []
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from datetime import datetime
from unittest.mock import Mock, patch

from tribler.core.components.database.db.serialization import TorrentMetadataPayload
import pytest

from tribler.core.components.database.db.serialization import TorrentMetadataPayload, UnknownBlobTypeException, \
read_payload_with_offset


def test_fix_torrent_metadata_payload():
Expand All @@ -24,3 +28,10 @@ def test_torrent_metadata_payload_magnet():
expected = "magnet:?xt=urn:btih:000102030405060708090a0b0c0d0e0f10111213&dn=b'title'&tr=b'tracker_info'"

assert expected == payload.get_magnet()


def test_read_payload_with_offset_exception():
    """Reading a blob whose metadata type is unknown must raise."""
    # Force the unpacked metadata type to 301, which is not REGULAR_TORRENT.
    with patch('struct.unpack_from', Mock(return_value=(301,))):
        with pytest.raises(UnknownBlobTypeException):
            read_payload_with_offset(b'')
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
'date': "torrent_date",
'created': "torrent_date",
'status': 'status',
'torrents': 'num_entries',
'votes': 'votes',
'subscribed': 'subscribed',
'health': 'HEALTH',
Expand Down

0 comments on commit d488a0b

Please sign in to comment.