From ec4761a3094726c52657fbef0a5b08429d04adaf Mon Sep 17 00:00:00 2001 From: drew2a Date: Tue, 12 Sep 2023 14:37:07 +0200 Subject: [PATCH] Add a title to the knowledge processing --- .../knowledge/db/tests/test_knowledge_db.py | 15 ++++++++++ .../rules/knowledge_rules_processor.py | 28 ++++++++++++++++--- 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/tribler/core/components/knowledge/db/tests/test_knowledge_db.py b/src/tribler/core/components/knowledge/db/tests/test_knowledge_db.py index b35a638cc73..66716f22044 100644 --- a/src/tribler/core/components/knowledge/db/tests/test_knowledge_db.py +++ b/src/tribler/core/components/knowledge/db/tests/test_knowledge_db.py @@ -167,6 +167,21 @@ def test_add_auto_generated_tag(self): assert self.db.instance.Statement.get().added_count == SHOW_THRESHOLD assert self.db.instance.Peer.get().public_key == PUBLIC_KEY_FOR_AUTO_GENERATED_OPERATIONS + @db_session + def test_double_add_auto_generated_tag(self): + """ Test that adding the same auto-generated tag twice will not create a new Statement entity.""" + kwargs = { + 'subject_type': ResourceType.TORRENT, + 'subject': 'infohash', + 'predicate': ResourceType.TAG, + 'obj': 'tag' + } + self.db.add_auto_generated(**kwargs) + self.db.add_auto_generated(**kwargs) + + assert len(self.db.instance.Statement.select()) == 1 + assert self.db.instance.Statement.get().added_count == SHOW_THRESHOLD + @db_session def test_multiple_tags(self): self.add_operation_set( diff --git a/src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py b/src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py index 50c16812d97..8bf3594b82d 100644 --- a/src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py +++ b/src/tribler/core/components/knowledge/rules/knowledge_rules_processor.py @@ -2,8 +2,10 @@ import queue import time from dataclasses import dataclass +from datetime import timedelta from typing import Optional, Set +import human_readable from ipv8.taskmanager import TaskManager from pony.orm import db_session @@ -37,7 +39,7 @@ class TorrentTitle: class KnowledgeRulesProcessor(TaskManager): # this value must be incremented in the case of new rules set has been applied - version: int = 4 + version: int = 5 def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore, batch_size: int = DEFAULT_BATCH_SIZE, batch_interval: float = DEFAULT_BATCH_INTERVAL, @@ -61,6 +63,8 @@ def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore self.queue_max_size = queue_max_size self._last_warning_time = 0 + self._start_rowid_in_current_session = 0 + self._start_time_in_current_session = 0 # this queue is used to be able to process entities supplied from another thread. self.queue: queue.Queue[TorrentTitle] = queue.Queue(maxsize=self.queue_max_size) @@ -82,9 +86,12 @@ def start_batch_processing(self): self.set_rules_processor_version(self.version) max_row_id = self.mds.get_max_rowid() - is_finished = self.get_last_processed_torrent_id() >= max_row_id + last_processed_torrent_id = self.get_last_processed_torrent_id() + is_finished = last_processed_torrent_id >= max_row_id if not is_finished: + self._start_rowid_in_current_session = last_processed_torrent_id + self._start_time_in_current_session = time.time() self.logger.info(f'Register process_batch task with interval: {self.batch_interval} sec') self.register_task( name=self.process_batch.__name__, @@ -137,7 +144,16 @@ def query(_start, _end): self.set_last_processed_torrent_id(end) duration = time.time() - start_time - message = f'[Batch] Processed: {processed} titles. Added: {added} tags. Duration: {duration:.3f} seconds.' + + def calculate_eta(): + processed_in_the_current_session = end - self._start_rowid_in_current_session + remaining = max_row_id - end + duration_in_tne_current_session = time.time() - self._start_time_in_current_session + eta = remaining * duration_in_tne_current_session / processed_in_the_current_session + return f'{human_readable.time_delta(timedelta(seconds=eta))} ({remaining} torrents left)' + + message = f'[Batch] Processed: {processed} titles. Added: {added} statements. Duration: {duration:.3f} ' \ + f'seconds. Estimated time for full processing: {calculate_eta()}' self.logger.info(message) is_finished = end >= max_row_id @@ -187,6 +203,10 @@ async def process_torrent_title(self, infohash: Optional[bytes] = None, title: O if not infohash or not title: return 0 infohash_str = hexlify(infohash) + + self.save_statements(subject_type=ResourceType.TORRENT, subject=infohash_str, predicate=ResourceType.TITLE, + objects={title}) + if tags := set(extract_only_valid_tags(title, rules=general_rules)): self.save_statements(subject_type=ResourceType.TORRENT, subject=infohash_str, predicate=ResourceType.TAG, objects=tags) @@ -195,7 +215,7 @@ async def process_torrent_title(self, infohash: Optional[bytes] = None, title: O self.save_statements(subject_type=ResourceType.TORRENT, subject=infohash_str, predicate=ResourceType.CONTENT_ITEM, objects=content_items) - return len(tags) + len(content_items) + return len(tags) + len(content_items) + 1 @db_session def save_statements(self, subject_type: ResourceType, subject: str, predicate: ResourceType, objects: Set[str]):