Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated libtorrent session creation to be async #8112

Merged
merged 1 commit into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/tribler/core/libtorrent/download_manager/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,20 +517,20 @@ def on_performance_alert(self, alert: lt.performance_alert) -> None:

# When the send buffer watermark is too low, double the buffer size to a
# maximum of 50MiB. This is the same mechanism as Deluge uses.
lt_session = self.download_manager.get_session(self.config.get_hops())
lt_session = self.download_manager.get_session(self.config.get_hops()).result()
settings = self.download_manager.get_session_settings(lt_session)
if alert.message().endswith("send buffer watermark too low (upload rate will suffer)"):
if settings["send_buffer_watermark"] <= 26214400:
self._logger.info("Setting send_buffer_watermark to %s", 2 * settings["send_buffer_watermark"])
settings["send_buffer_watermark"] *= 2
self.download_manager.set_session_settings(self.download_manager.get_session(), settings)
self.download_manager.set_session_settings(self.download_manager.get_session().result(), settings)
# When the write cache is too small, double the buffer size to a maximum
# of 64MiB. Again, this is the same mechanism as Deluge uses.
elif (alert.message().endswith("max outstanding disk writes reached")
and settings["max_queued_disk_bytes"] <= 33554432):
self._logger.info("Setting max_queued_disk_bytes to %s", 2 * settings["max_queued_disk_bytes"])
settings["max_queued_disk_bytes"] *= 2
self.download_manager.set_session_settings(self.download_manager.get_session(), settings)
self.download_manager.set_session_settings(self.download_manager.get_session().result(), settings)

def on_torrent_removed_alert(self, alert: lt.torrent_removed_alert) -> None:
"""
Expand Down Expand Up @@ -771,7 +771,7 @@ def get_tracker_status(self) -> dict[str, tuple[int, str]]:
if info.source & info.pex:
pex_peers += 1

ltsession = self.download_manager.get_session(self.config.get_hops())
ltsession = self.download_manager.get_session(self.config.get_hops()).result()
public = self.tdef and not self.tdef.is_private()

result = self.tracker_status.copy()
Expand Down Expand Up @@ -864,7 +864,7 @@ def set_def(self, tdef: TorrentDef) -> None:
self.tdef = tdef

@check_handle(None)
def add_trackers(self, trackers: list[str]) -> None:
def add_trackers(self, trackers: list[bytes]) -> None:
"""
Add the given trackers to the handle.
"""
Expand Down
82 changes: 41 additions & 41 deletions src/tribler/core/libtorrent/download_manager/download_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import logging
import os
import time
from asyncio import CancelledError, gather, iscoroutine, shield, sleep, wait_for
from asyncio import CancelledError, Future, gather, iscoroutine, shield, sleep, wait_for
from binascii import hexlify, unhexlify
from collections import defaultdict
from copy import deepcopy
Expand Down Expand Up @@ -94,16 +94,16 @@ def __init__(self, config: TriblerConfigManager, notifier: Notifier,

self.state_dir = Path(config.get_version_state_dir())
self.ltsettings: dict[lt.session, dict] = {} # Stores a copy of the settings dict for each libtorrent session
self.ltsessions: dict[int, lt.session] = {}
self.ltsessions: dict[int, Future[lt.session]] = {}
self.dht_health_manager: DHTHealthManager | None = None
self.listen_ports: dict[int, dict[str, int]] = defaultdict(dict)

self.socks_listen_ports = config.get("libtorrent/socks_listen_ports")

self.notifier = notifier

self.set_upload_rate_limit(0)
self.set_download_rate_limit(0)
self.register_task("Set default upload rate limit", self.set_upload_rate_limit, 0)
self.register_task("Set default download rate limit", self.set_download_rate_limit, 0)

self.downloads: Dict[bytes, Download] = {}

Expand Down Expand Up @@ -171,20 +171,16 @@ async def _check_dht_ready(self, min_dht_peers: int = 60) -> None:

See https://github.com/Tribler/tribler/issues/5319
"""
while not (self.get_session() and self.get_session().status().dht_nodes > min_dht_peers):
while (await self.get_session()).status().dht_nodes < min_dht_peers:
await asyncio.sleep(1)

def initialize(self) -> None:
async def initialize(self) -> None:
"""
Initialize the directory structure, launch the periodic tasks and start libtorrent background processes.
"""
# Create the checkpoints directory
self.checkpoint_directory.mkdir(exist_ok=True, parents=True)

# Start upnp
if self.config.get("libtorrent/upnp"):
self.get_session().start_upnp()

# Register tasks
self.register_task("process_alerts", self._task_process_alerts, interval=1, ignore=(Exception, ))
if self.dht_readiness_timeout > 0 and self.config.get("libtorrent/dht"):
Expand All @@ -194,6 +190,10 @@ def initialize(self) -> None:

self.set_download_states_callback(self.sesscb_states_callback)

# Start upnp
if self.config.get("libtorrent/upnp"):
(await self.get_session()).start_upnp()

def start(self) -> None:
"""
Start loading the checkpoints from disk.
Expand Down Expand Up @@ -248,13 +248,14 @@ async def shutdown(self, timeout: int = 30) -> None:
if self.has_session():
logger.info("Saving state...")
self.notify_shutdown_state("Writing session state to disk.")
session = await self.get_session()
with open(self.state_dir / LTSTATE_FILENAME, "wb") as ltstate_file: # noqa: ASYNC230
ltstate_file.write(lt.bencode(self.get_session().save_state()))
ltstate_file.write(lt.bencode(session.save_state()))

if self.has_session() and self.config.get("libtorrent/upnp"):
logger.info("Stopping upnp...")
self.notify_shutdown_state("Stopping UPnP.")
self.get_session().stop_upnp()
(await self.get_session()).stop_upnp()

# Remove metadata temporary directory
if self.metadata_tmpdir:
Expand Down Expand Up @@ -360,12 +361,12 @@ def has_session(self, hops: int = 0) -> bool:
"""
return hops in self.ltsessions

def get_session(self, hops: int = 0) -> lt.session:
def get_session(self, hops: int = 0) -> Future[lt.session]:
"""
Get the session for the given number of anonymization hops.
"""
if hops not in self.ltsessions:
self.ltsessions[hops] = self.create_session(hops)
self.ltsessions[hops] = self.register_executor_task(f"Create session {hops}", self.create_session, hops)

return self.ltsessions[hops]

Expand All @@ -392,7 +393,7 @@ def set_max_connections(self, conns: int, hops: int | None = None) -> None:
"""
self._map_call_on_ltsessions(hops, "set_max_connections", conns)

def set_upload_rate_limit(self, rate: int) -> None:
async def set_upload_rate_limit(self, rate: int) -> None:
"""
Set the upload rate limit for the given session.
"""
Expand All @@ -403,18 +404,19 @@ def set_upload_rate_limit(self, rate: int) -> None:
# Pass outgoing_port and num_outgoing_ports to dict due to bug in libtorrent 0.16.18
settings_dict = {"upload_rate_limit": libtorrent_rate, "outgoing_port": 0, "num_outgoing_ports": 1}
for session in self.ltsessions.values():
self.set_session_settings(session, settings_dict)
self.set_session_settings(await session, settings_dict)

def get_upload_rate_limit(self, hops: int = 0) -> int:
async def get_upload_rate_limit(self, hops: int = 0) -> int:
"""
Get the upload rate limit for the session with the given hop count.
"""
# Rate conversion due to the fact that we had a different system with Swift
# and the old python BitTorrent core: unlimited == 0, stop == -1, else rate in kbytes
libtorrent_rate = self.get_session(hops).upload_rate_limit()
session = await self.get_session(hops)
libtorrent_rate = session.upload_rate_limit()
return self.reverse_convert_rate(rate=libtorrent_rate)

def set_download_rate_limit(self, rate: int) -> None:
async def set_download_rate_limit(self, rate: int) -> None:
"""
Set the download rate limit for the given session.
"""
Expand All @@ -423,13 +425,14 @@ def set_download_rate_limit(self, rate: int) -> None:
# Pass outgoing_port and num_outgoing_ports to dict due to bug in libtorrent 0.16.18
settings_dict = {"download_rate_limit": libtorrent_rate}
for session in self.ltsessions.values():
self.set_session_settings(session, settings_dict)
self.set_session_settings(await session, settings_dict)

def get_download_rate_limit(self, hops: int = 0) -> int:
async def get_download_rate_limit(self, hops: int = 0) -> int:
"""
Get the download rate limit for the session with the given hop count.
"""
libtorrent_rate = self.get_session(hops=hops).download_rate_limit()
session = await self.get_session(hops)
libtorrent_rate = session.download_rate_limit()
return self.reverse_convert_rate(rate=libtorrent_rate)

def process_alert(self, alert: lt.alert, hops: int = 0) -> None: # noqa: C901, PLR0912
Expand Down Expand Up @@ -598,23 +601,21 @@ def _task_cleanup_metainfo_cache(self) -> None:
if last_time < oldest_time:
del self.metainfo_cache[info_hash]

def _request_torrent_updates(self) -> None:
async def _request_torrent_updates(self) -> None:
for ltsession in self.ltsessions.values():
if ltsession:
ltsession.post_torrent_updates(0xffffffff)
(await ltsession).post_torrent_updates(0xffffffff)

def _task_process_alerts(self) -> None:
async def _task_process_alerts(self) -> None:
for hops, ltsession in list(self.ltsessions.items()):
if ltsession:
for alert in ltsession.pop_alerts():
self.process_alert(alert, hops=hops)
for alert in (await ltsession).pop_alerts():
self.process_alert(alert, hops=hops)

def _map_call_on_ltsessions(self, hops: int | None, funcname: str, *args: Any, **kwargs) -> None: # noqa: ANN401
if hops is None:
for session in self.ltsessions.values():
getattr(session, funcname)(*args, **kwargs)
session.add_done_callback(lambda s: getattr(s.result(), funcname)(*args, **kwargs))
else:
getattr(self.get_session(hops), funcname)(*args, **kwargs)
self.get_session(hops).add_done_callback(lambda s: getattr(s.result(), funcname)(*args, **kwargs))

async def start_download_from_uri(self, uri: str, config: DownloadConfig | None = None) -> Download:
"""
Expand Down Expand Up @@ -732,7 +733,7 @@ async def start_handle(self, download: Download, atp: dict) -> None:
if resume_data:
logger.debug("Download resume data: %s", str(atp["resume_data"]))

ltsession = self.get_session(download.config.get_hops())
ltsession = await self.get_session(download.config.get_hops())
infohash = download.get_def().get_infohash()

if infohash in self.metainfo_requests and self.metainfo_requests[infohash].download != download:
Expand Down Expand Up @@ -813,20 +814,20 @@ def update_max_rates_from_config(self) -> None:
This is the extra step necessary to apply a new maximum download/upload rate setting.
:return:
"""
rate = DownloadManager.get_libtorrent_max_upload_rate(self.config)
download_rate = DownloadManager.get_libtorrent_max_download_rate(self.config)
settings = {"download_rate_limit": download_rate,
"upload_rate_limit": rate}
for lt_session in self.ltsessions.values():
rate = DownloadManager.get_libtorrent_max_upload_rate(self.config)
download_rate = DownloadManager.get_libtorrent_max_download_rate(self.config)
settings = {"download_rate_limit": download_rate,
"upload_rate_limit": rate}
self.set_session_settings(lt_session, settings)
lt_session.add_done_callback(lambda s: self.set_session_settings(s.result(), settings))

def post_session_stats(self) -> None:
"""
Gather statistics and cause a ``session_stats_alert``.
"""
logger.info("Post session stats")
for session in self.ltsessions.values():
session.post_session_stats()
session.add_done_callback(lambda s: s.result().post_session_stats())

async def remove_download(self, download: Download, remove_content: bool = False,
remove_checkpoint: bool = True) -> None:
Expand All @@ -844,8 +845,7 @@ async def remove_download(self, download: Download, remove_content: bool = False
if download.stream is not None:
download.stream.disable()
logger.debug("Removing handle %s", hexlify(infohash))
ltsession = self.get_session(download.config.get_hops())
ltsession.remove_torrent(handle, int(remove_content))
(await self.get_session(download.config.get_hops())).remove_torrent(handle, int(remove_content))
else:
logger.debug("Cannot remove handle %s because it does not exists", hexlify(infohash))
await download.shutdown()
Expand Down Expand Up @@ -892,7 +892,7 @@ async def update_hops(self, download: Download, new_hops: int) -> None:

await self.start_download(tdef=download.tdef, config=config)

def update_trackers(self, infohash: bytes, trackers: list[str]) -> None:
def update_trackers(self, infohash: bytes, trackers: list[bytes]) -> None:
"""
Update the trackers for a download.

Expand Down
6 changes: 3 additions & 3 deletions src/tribler/core/libtorrent/restapi/libtorrent_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def get_libtorrent_settings(self, request: Request) -> RESTResponse:
if hop not in self.download_manager.ltsessions:
return RESTResponse({"hop": hop, "settings": {}})

lt_session = self.download_manager.ltsessions[hop]
lt_session = await self.download_manager.ltsessions[hop]
if hop == 0:
lt_settings = self.download_manager.get_session_settings(lt_session)
lt_settings["peer_fingerprint"] = hexlify(lt_settings["peer_fingerprint"].encode()).decode()
Expand Down Expand Up @@ -107,10 +107,10 @@ def on_session_stats_alert_received(alert: libtorrent.session_stats_alert) -> No
hop = int(args["hop"])

if hop not in self.download_manager.ltsessions or \
not hasattr(self.download_manager.ltsessions[hop], "post_session_stats"):
not hasattr(self.download_manager.ltsessions[hop].result(), "post_session_stats"):
return RESTResponse({"hop": hop, "session": {}})

self.download_manager.session_stats_callback = on_session_stats_alert_received
self.download_manager.ltsessions[hop].post_session_stats()
(await self.download_manager.ltsessions[hop]).post_session_stats()
stats = await session_stats
return RESTResponse({"hop": hop, "session": stats})
2 changes: 1 addition & 1 deletion src/tribler/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async def start(self) -> None:
for server in self.socks_servers:
await server.start()
self.download_manager.socks_listen_ports = [s.port for s in self.socks_servers]
self.download_manager.initialize()
await self.download_manager.initialize()
self.download_manager.start()

# IPv8
Expand Down
2 changes: 1 addition & 1 deletion src/tribler/test_integration/test_anon_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ async def add_mock_download_config(self, manager: DownloadManager, hops: int) ->
manager.metadata_tmpdir = Mock(name=config.get_dest_dir())
manager.checkpoint_directory = config.get_dest_dir()
manager.peer_mid = b"0000"
manager.initialize()
await manager.initialize()
manager.start()
await sleep(0)

Expand Down
4 changes: 2 additions & 2 deletions src/tribler/test_integration/test_hidden_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ async def add_mock_download_config(self, manager: DownloadManager, hops: int) ->
manager.metadata_tmpdir = Mock(name=config.get_dest_dir())
manager.checkpoint_directory = config.get_dest_dir()
manager.peer_mid = b"0000"
manager.initialize()
await manager.initialize()
manager.start()
await sleep(0)

Expand All @@ -206,7 +206,7 @@ async def start_seeding(self) -> bytes:
"""
config = await self.add_mock_download_config(self.download_manager_seeder, 1)

with open(config.get_dest_dir() / "ubuntu-15.04-desktop-amd64.iso", "wb") as f: # noqa: ASYNC101
with open(config.get_dest_dir() / "ubuntu-15.04-desktop-amd64.iso", "wb") as f: # noqa: ASYNC230
f.write(bytes([0] * 524288))

metainfo = create_torrent_file([config.get_dest_dir() / "ubuntu-15.04-desktop-amd64.iso"], {})["metainfo"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -669,15 +669,17 @@ def test_on_save_resume_data_alert_permission_denied(self) -> None:
self.assertTrue(download.config.config["TEST_CRASH"])
self.assertEqual("name", download.config.config["download_defaults"]["name"])

def test_get_tracker_status_unicode_decode_error(self) -> None:
async def test_get_tracker_status_unicode_decode_error(self) -> None:
"""
Test if a tracker status is returned when getting trackers leads to a UnicodeDecodeError.

See: https://github.com/Tribler/tribler/issues/7036
"""
download = Download(TorrentDefNoMetainfo(b"\x01" * 20, b"name"), None, checkpoint_disabled=True,
config=self.create_mock_download_config())
download.download_manager = Mock(get_session=Mock(return_value=Mock(is_dht_running=Mock(return_value=False))))
fut = Future()
fut.set_result(Mock(is_dht_running=Mock(return_value=False)))
download.download_manager = Mock(get_session=Mock(return_value=fut))
download.handle = Mock(is_valid=Mock(return_value=True),
get_peer_info=Mock(
return_value=[Mock(source=1, dht=1, pex=0)] * 42 + [Mock(source=1, pex=1, dht=0)] * 7
Expand Down
Loading