Commit

Add a title to the knowledge processing
drew2a committed Sep 12, 2023
1 parent 0fc1140 commit ec4761a
Showing 2 changed files with 39 additions and 4 deletions.
@@ -167,6 +167,21 @@ def test_add_auto_generated_tag(self):
         assert self.db.instance.Statement.get().added_count == SHOW_THRESHOLD
         assert self.db.instance.Peer.get().public_key == PUBLIC_KEY_FOR_AUTO_GENERATED_OPERATIONS

+    @db_session
+    def test_double_add_auto_generated_tag(self):
+        """ Test that adding the same auto-generated tag twice will not create a new Statement entity."""
+        kwargs = {
+            'subject_type': ResourceType.TORRENT,
+            'subject': 'infohash',
+            'predicate': ResourceType.TAG,
+            'obj': 'tag'
+        }
+        self.db.add_auto_generated(**kwargs)
+        self.db.add_auto_generated(**kwargs)
+
+        assert len(self.db.instance.Statement.select()) == 1
+        assert self.db.instance.Statement.get().added_count == SHOW_THRESHOLD
+
     @db_session
     def test_multiple_tags(self):
         self.add_operation_set(
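The new test expects add_auto_generated to be idempotent for an identical statement: the second call must reuse the existing Statement row (keeping added_count at SHOW_THRESHOLD) rather than create a duplicate. Below is a minimal, self-contained sketch of that deduplication idea using a plain dict; the class, the SHOW_THRESHOLD value, and the string arguments are illustrative and are not the KnowledgeDatabase implementation.

SHOW_THRESHOLD = 1  # assumed value, for illustration only


class ToyStatementStore:
    """Toy store keyed by the full (subject_type, subject, predicate, obj) tuple."""

    def __init__(self):
        self.statements = {}

    def add_auto_generated(self, subject_type, subject, predicate, obj):
        key = (subject_type, subject, predicate, obj)
        if key in self.statements:
            # An identical auto-generated statement already exists: do not add another row.
            return
        self.statements[key] = {'added_count': SHOW_THRESHOLD}


store = ToyStatementStore()
store.add_auto_generated('torrent', 'infohash', 'tag', 'tag')
store.add_auto_generated('torrent', 'infohash', 'tag', 'tag')
assert len(store.statements) == 1  # mirrors the assertion in the new test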
@@ -2,8 +2,10 @@
 import queue
 import time
 from dataclasses import dataclass
+from datetime import timedelta
 from typing import Optional, Set

+import human_readable
 from ipv8.taskmanager import TaskManager
 from pony.orm import db_session

@@ -37,7 +39,7 @@ class TorrentTitle:

 class KnowledgeRulesProcessor(TaskManager):
     # this value must be incremented whenever a new rules set is applied
-    version: int = 4
+    version: int = 5

     def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore,
                  batch_size: int = DEFAULT_BATCH_SIZE, batch_interval: float = DEFAULT_BATCH_INTERVAL,
@@ -61,6 +63,8 @@ def __init__(self, notifier: Notifier, db: KnowledgeDatabase, mds: MetadataStore
         self.queue_max_size = queue_max_size

         self._last_warning_time = 0
+        self._start_rowid_in_current_session = 0
+        self._start_time_in_current_session = 0

         # this queue is used to be able to process entities supplied from another thread.
         self.queue: queue.Queue[TorrentTitle] = queue.Queue(maxsize=self.queue_max_size)
@@ -82,9 +86,12 @@ def start_batch_processing(self):
         self.set_rules_processor_version(self.version)

         max_row_id = self.mds.get_max_rowid()
-        is_finished = self.get_last_processed_torrent_id() >= max_row_id
+        last_processed_torrent_id = self.get_last_processed_torrent_id()
+        is_finished = last_processed_torrent_id >= max_row_id

         if not is_finished:
+            self._start_rowid_in_current_session = last_processed_torrent_id
+            self._start_time_in_current_session = time.time()
             self.logger.info(f'Register process_batch task with interval: {self.batch_interval} sec')
             self.register_task(
                 name=self.process_batch.__name__,
@@ -137,7 +144,16 @@ def query(_start, _end):
         self.set_last_processed_torrent_id(end)

         duration = time.time() - start_time
-        message = f'[Batch] Processed: {processed} titles. Added: {added} tags. Duration: {duration:.3f} seconds.'
+
+        def calculate_eta():
+            processed_in_the_current_session = end - self._start_rowid_in_current_session
+            remaining = max_row_id - end
+            duration_in_the_current_session = time.time() - self._start_time_in_current_session
+            eta = remaining * duration_in_the_current_session / processed_in_the_current_session
+            return f'{human_readable.time_delta(timedelta(seconds=eta))} ({remaining} torrents left)'
+
+        message = f'[Batch] Processed: {processed} titles. Added: {added} statements. Duration: {duration:.3f} ' \
+                  f'seconds. Estimated time for full processing: {calculate_eta()}'
         self.logger.info(message)

         is_finished = end >= max_row_id
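For context, calculate_eta above is a plain linear extrapolation: it assumes the remaining rows will be processed at the same average rate as the rows already handled in the current session. A minimal standalone sketch of the same formula; the function name and the sample numbers are illustrative and not part of the commit.

from datetime import timedelta

import human_readable


def estimate_remaining_time(start_rowid: int, current_rowid: int, max_rowid: int, elapsed_seconds: float) -> str:
    # Rows handled since the session started and rows still to go.
    processed = current_rowid - start_rowid
    remaining = max_rowid - current_rowid
    # Linear extrapolation; the caller must have processed at least one row.
    eta_seconds = remaining * elapsed_seconds / processed
    return f'{human_readable.time_delta(timedelta(seconds=eta_seconds))} ({remaining} torrents left)'


# 10 000 of 100 000 rows done in 120 seconds -> roughly "18 minutes (90000 torrents left)"
print(estimate_remaining_time(start_rowid=0, current_rowid=10_000, max_rowid=100_000, elapsed_seconds=120))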
@@ -187,6 +203,10 @@ async def process_torrent_title(self, infohash: Optional[bytes] = None, title: O
         if not infohash or not title:
             return 0
         infohash_str = hexlify(infohash)
+
+        self.save_statements(subject_type=ResourceType.TORRENT, subject=infohash_str, predicate=ResourceType.TITLE,
+                             objects={title})
+
         if tags := set(extract_only_valid_tags(title, rules=general_rules)):
             self.save_statements(subject_type=ResourceType.TORRENT, subject=infohash_str, predicate=ResourceType.TAG,
                                  objects=tags)
@@ -195,7 +215,7 @@ async def process_torrent_title(self, infohash: Optional[bytes] = None, title: O
             self.save_statements(subject_type=ResourceType.TORRENT, subject=infohash_str,
                                  predicate=ResourceType.CONTENT_ITEM, objects=content_items)

-        return len(tags) + len(content_items)
+        return len(tags) + len(content_items) + 1

     @db_session
     def save_statements(self, subject_type: ResourceType, subject: str, predicate: ResourceType, objects: Set[str]):
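The "+ 1" in the return value accounts for the title statement that process_torrent_title now saves in addition to the extracted tags and content items. A small worked example with hypothetical extraction results:

tags = {'ubuntu', 'linux', 'x64'}   # hypothetical extracted tags
content_items = {'Ubuntu 22.04'}    # hypothetical extracted content item
assert len(tags) + len(content_items) + 1 == 5  # "+ 1" is the title statement itself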
