Skip to content

Commit

Permalink
Add observeTorrentPeers for the /sync/torrentPeers?hash= endpoint
Browse files Browse the repository at this point in the history
- Refactor MainDataSync into reusable `DataSync<T>` utility
  • Loading branch information
DrewCarlson committed Jul 31, 2022
1 parent 9ac56c9 commit cf1deac
Show file tree
Hide file tree
Showing 9 changed files with 374 additions and 119 deletions.
1 change: 1 addition & 0 deletions client/api/qbittorrent-client.api
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class qbittorrent/QBittorrentClient {
public final fun observeMainData ()Lkotlinx/coroutines/flow/Flow;
public final fun observeTorrent (Ljava/lang/String;Z)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun observeTorrent$default (Lqbittorrent/QBittorrentClient;Ljava/lang/String;ZILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public final fun observeTorrentPeers (Ljava/lang/String;)Lkotlinx/coroutines/flow/Flow;
public final fun pauseTorrents (Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun pauseTorrents$default (Lqbittorrent/QBittorrentClient;Ljava/util/List;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public final fun reannounceTorrents (Ljava/util/List;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
56 changes: 51 additions & 5 deletions client/src/commonMain/kotlin/QBittorrentClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import kotlinx.coroutines.Dispatchers.Default
import kotlinx.coroutines.flow.*
import kotlinx.serialization.json.*
import qbittorrent.internal.*
import qbittorrent.internal.AtomicReference
import qbittorrent.internal.FileReader
import qbittorrent.internal.MainDataSync
import qbittorrent.internal.RawCookiesStorage
Expand Down Expand Up @@ -111,6 +112,27 @@ class QBittorrentClient(
}
private val syncScope = CoroutineScope(SupervisorJob() + dispatcher + http.coroutineContext)
private val mainDataSync = MainDataSync(http, config, syncScope)
private val peerDataSyncMapAtomic = AtomicReference(emptyMap<String, TorrentPeersSync>())
private var peerDataSyncMap: Map<String, TorrentPeersSync>
get() = peerDataSyncMapAtomic.value
set(value) {
peerDataSyncMapAtomic.value = value
}

private fun getPeersSync(hash: String): TorrentPeersSync? {
return peerDataSyncMap[hash.lowercase()]
}

private fun createPeersSync(hash: String): TorrentPeersSync {
return TorrentPeersSync(hash.lowercase(), http, config, syncScope).also { peersSync ->
peerDataSyncMap = peerDataSyncMap + (hash to peersSync)
}
}

private fun removePeersSync(hash: String) {
peerDataSyncMap[hash]?.close()
peerDataSyncMap = peerDataSyncMap - hash
}

/**
* Create a session with the username and password provided
Expand Down Expand Up @@ -152,7 +174,10 @@ class QBittorrentClient(
* only once, no matter how many times you invoke [observeMainData].
*/
fun observeMainData(): Flow<MainData> {
return mainDataSync.observeMainData()
return mainDataSync.observeData().transform { (mainData, error) ->
error?.let { throw it }
mainData?.let { emit(it) }
}
}

/**
Expand All @@ -165,14 +190,35 @@ class QBittorrentClient(
*/
fun observeTorrent(hash: String, waitIfMissing: Boolean = false): Flow<Torrent> {
return if (waitIfMissing) {
mainDataSync.observeMainData()
.takeWhile { mainData -> !mainData.torrentsRemoved.contains(hash) }
observeMainData().takeWhile { mainData -> !mainData.torrentsRemoved.contains(hash) }
} else {
mainDataSync.observeMainData()
.takeWhile { mainData -> mainData.torrents.contains(hash) }
observeMainData().takeWhile { mainData -> mainData.torrents.contains(hash) }
}.mapNotNull { mainData -> mainData.torrents[hash] }
}

/**
* Emits the latest [TorrentPeers] data for the [hash].
* If the torrent is removed or not found, the flow will complete.
*
* @param hash The info hash of the torrent to observe.
*/
fun observeTorrentPeers(hash: String): Flow<TorrentPeers> {
val peersSync = getPeersSync(hash) ?: createPeersSync(hash)
return peersSync.observeData()
.takeWhile { (_, error) -> (error?.response?.status != HttpStatusCode.NotFound) }
.transform { (mainData, error) ->
error?.let { throw it }
mainData?.let { emit(it) }
}
.onCompletion {
syncScope.launch {
if (getPeersSync(hash)?.isSyncing() == false) {
removePeersSync(hash)
}
}
}
}

/**
* This method can add torrents from server local file or from URLs.
* http://, https://, magnet: and bc://bt/ links are supported.
Expand Down
154 changes: 154 additions & 0 deletions client/src/commonMain/kotlin/internal/DataSync.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package qbittorrent.internal

import io.ktor.client.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.util.reflect.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.SharingStarted.Companion.Eagerly
import kotlinx.coroutines.launch
import kotlinx.serialization.json.*
import kotlinx.serialization.serializer
import qbittorrent.QBittorrentClient
import qbittorrent.QBittorrentException
import qbittorrent.json
import qbittorrent.models.MainData
import qbittorrent.models.TorrentPeers

typealias DataStatePair<T> = Pair<T?, QBittorrentException?>

internal abstract class DataSync<T>(
private val typeInfo: TypeInfo,
private val http: HttpClient,
private val config: QBittorrentClient.Config,
syncScope: CoroutineScope,
) {

abstract val endpointUrl: String
abstract val nestedObjectKeys: List<String>
open fun HttpRequestBuilder.configureRequest() = Unit

private val serializer = serializer(requireNotNull(typeInfo.kotlinType))
private val state = MutableStateFlow<DataStatePair<T>>(null to null)
private val isSyncingState = state.subscriptionCount.map { it > 0 }.stateIn(syncScope, Eagerly, false)
private val atomicSyncRid = AtomicReference(0L)
private var syncRid: Long
get() = atomicSyncRid.value
set(value) {
atomicSyncRid.value = value
}
private val syncLoopJob = syncScope.launch {
while (true) {
// Wait for the first subscribers
isSyncingState.first { it }
syncData()
}
}

fun isSyncing(): Boolean {
return isSyncingState.value
}

fun observeData(): Flow<DataStatePair<T>> {
return state
}

fun close() {
syncLoopJob.cancel()
}

private suspend fun syncData() {
try {
// Get the current MainData value, fetching the initial data if required
val (initialMainData, _) = state.updateAndGet { (mainData, error) ->
if (error == null) {
(mainData ?: fetchData(0).bodyOrThrow(typeInfo)) to null
} else {
// Last request produced an error, try it again
fetchData(syncRid).bodyOrThrow<T>(typeInfo) to null
}
}

delay(config.syncInterval)

val mainDataJson = json.encodeToJsonElement(serializer, initialMainData).mutateJson()
// Patch MainData while there is at least one subscriber
while (isSyncingState.value) {
if (syncRid == Long.MAX_VALUE) syncRid = 0

// Fetch the next MainData patch and merge into existing model, remove any error
state.value = mainDataJson.applyPatch(fetchData(++syncRid).bodyOrThrow()) to null

delay(config.syncInterval)
}
} catch (e: QBittorrentException) {
// Failed to fetch patch, keep current MainData and add the error
state.update { (mainData, _) -> mainData to e }
}
}

private suspend fun fetchData(rid: Long): HttpResponse {
return http.get("${config.baseUrl}/${endpointUrl.trimStart('/')}") {
parameter("rid", rid)
configureRequest()
}
}

private fun MutableMap<String, JsonElement>.applyPatch(newObject: JsonObject): T {
merge(newObject, nestedObjectKeys)
nestedObjectKeys.forEach { key -> dropRemoved(key) }
dropRemovedStrings("tags")

// Note: Create new model instance here so that one update event includes
// identifiers of removed data
@Suppress("UNCHECKED_CAST")
val mainData: T = json.decodeFromJsonElement(serializer, JsonObject(this)) as T

nestedObjectKeys.forEach { key -> resetRemoved(key) }
resetRemoved("tags")
return mainData
}
}

/**
* Manages a single [MainData] instance and updates it periodically
* when it is being observed.
*/
internal class MainDataSync(
http: HttpClient,
config: QBittorrentClient.Config,
syncScope: CoroutineScope,
) : DataSync<MainData>(
typeInfo = typeInfo<MainData>(),
http = http,
config = config,
syncScope = syncScope
) {
override val endpointUrl: String = "/api/v2/sync/maindata"
override val nestedObjectKeys: List<String> = listOf("torrents", "categories")
}

/**
* Manages a single [TorrentPeers] instance and updates it periodically
* when it is being observed.
*/
internal class TorrentPeersSync(
private val hash: String,
http: HttpClient,
config: QBittorrentClient.Config,
syncScope: CoroutineScope,
) : DataSync<TorrentPeers>(
typeInfo = typeInfo<TorrentPeers>(),
http = http,
config = config,
syncScope = syncScope,
) {
override val endpointUrl: String = "/api/v2/sync/torrentPeers"
override val nestedObjectKeys: List<String> = listOf("peers")

override fun HttpRequestBuilder.configureRequest() {
parameter("hash", hash)
}
}
17 changes: 15 additions & 2 deletions client/src/commonMain/kotlin/internal/HttpUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package qbittorrent.internal
import io.ktor.client.call.*
import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.util.reflect.*
import qbittorrent.QBittorrentException

internal suspend fun HttpResponse.orThrow() {
if (!status.isSuccess()) {
throw call.attributes.takeOrNull(ErrorTransformer.KEY_INTERNAL_ERROR)?.run(::QBittorrentException)
throw call.attributes.takeOrNull(ErrorTransformer.KEY_INTERNAL_ERROR)
?.run(::QBittorrentException)
?: QBittorrentException(this, bodyAsText())
}
}
Expand All @@ -19,7 +21,18 @@ internal suspend inline fun <reified T> HttpResponse.bodyOrThrow(): T {
else -> body()
}
} else {
throw call.attributes.takeOrNull(ErrorTransformer.KEY_INTERNAL_ERROR)?.run(::QBittorrentException)
throw call.attributes.takeOrNull(ErrorTransformer.KEY_INTERNAL_ERROR)
?.run(::QBittorrentException)
?: QBittorrentException(this, bodyAsText())
}
}

internal suspend fun <T> HttpResponse.bodyOrThrow(typeInfo: TypeInfo): T {
return if (status.isSuccess()) {
body(typeInfo)
} else {
throw call.attributes.takeOrNull(ErrorTransformer.KEY_INTERNAL_ERROR)
?.run(::QBittorrentException)
?: QBittorrentException(this, bodyAsText())
}
}
13 changes: 10 additions & 3 deletions client/src/commonMain/kotlin/internal/JsonUtils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@ package qbittorrent.internal

import kotlinx.serialization.json.*

internal val emptyArray = buildJsonArray { }

internal fun MutableMap<String, JsonElement>.resetRemoved(key: String) {
put("${key}_removed", emptyArray)
}

internal fun MutableMap<String, JsonElement>.dropRemoved(key: String) {
val removeKeys = get("${key}_removed").toStringList()
if (removeKeys.isNotEmpty()) {
Expand All @@ -23,19 +29,20 @@ internal fun MutableMap<String, JsonElement>.dropRemovedStrings(key: String) {
}

internal fun MutableMap<String, JsonElement>.merge(
newJson: JsonObject
newJson: JsonObject,
nestedObjectKeys: List<String>,
): MutableMap<String, JsonElement> {
forEach { (key, currentValue) ->
val update = when (val newValue = newJson[key] ?: return@forEach) {
is JsonPrimitive, is JsonArray -> newValue
is JsonObject -> {
val actualObject = (if (currentValue is JsonNull) newValue else currentValue).mutateJson()
if (key == "torrents" || key == "categories") {
if (nestedObjectKeys.contains(key)) {
(newValue.keys - actualObject.keys).forEach { newHash ->
actualObject[newHash] = checkNotNull(newValue[newHash]).jsonObject
}
}
JsonObject(actualObject.merge(newValue.jsonObject))
JsonObject(actualObject.merge(newValue.jsonObject, emptyList()))
}
}

Expand Down
Loading

0 comments on commit cf1deac

Please sign in to comment.