From c102cdd54df3336b99ca382dec9f03b3cff0b17b Mon Sep 17 00:00:00 2001 From: darken Date: Fri, 13 Sep 2024 16:12:44 +0200 Subject: [PATCH] Parallelize Google drive sync --- .../gdrive/core/GDriveAppDataConnector.kt | 86 ++++++++++++------- 1 file changed, 57 insertions(+), 29 deletions(-) diff --git a/syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/core/GDriveAppDataConnector.kt b/syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/core/GDriveAppDataConnector.kt index 37570be2..4bf372b8 100644 --- a/syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/core/GDriveAppDataConnector.kt +++ b/syncs-gdrive/src/main/java/eu/darken/octi/syncs/gdrive/core/GDriveAppDataConnector.kt @@ -29,6 +29,7 @@ import eu.darken.octi.sync.core.SyncSettings import eu.darken.octi.sync.core.SyncWrite import eu.darken.octi.syncs.gdrive.core.GDriveEnvironment.Companion.APPDATAFOLDER import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred import kotlinx.coroutines.NonCancellable import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll @@ -52,7 +53,7 @@ import com.google.api.services.drive.model.File as GDriveFile class GDriveAppDataConnector @AssistedInject constructor( @Assisted private val client: GoogleClient, @AppScope private val scope: CoroutineScope, - dispatcherProvider: DispatcherProvider, + private val dispatcherProvider: DispatcherProvider, @ApplicationContext private val context: Context, private val networkStateProvider: NetworkStateProvider, private val supportedModuleIds: Set<@JvmSuppressWildcards ModuleId>, @@ -163,11 +164,11 @@ class GDriveAppDataConnector @AssistedInject constructor( } val deviceFetchJobs = validDeviceDirs.map { deviceDir -> - scope.async deviceFetch@{ + scope.async(dispatcherProvider.IO) deviceFetch@{ log(TAG, DEBUG) { "readDrive(): Reading module data for device: $deviceDir" } val moduleDirs = deviceDir.listFiles().filter { supportedModuleIds.contains(ModuleId(it.name)) } val moduleFetchJobs = moduleDirs.map { moduleFile -> - scope.async moduleFetch@{ + scope.async(dispatcherProvider.IO) moduleFetch@{ log(TAG, VERBOSE) { "readDrive(): Reading ${moduleFile.name} for ${deviceDir.name}" } val payload = moduleFile.readData() @@ -210,6 +211,7 @@ class GDriveAppDataConnector @AssistedInject constructor( override suspend fun sync(options: SyncOptions) { log(TAG) { "sync(options=$options)" } + val start = System.currentTimeMillis() if (!isInternetAvailable()) { log(TAG, WARN) { "sync(): Skipping, we are offline." } @@ -218,51 +220,77 @@ class GDriveAppDataConnector @AssistedInject constructor( syncLock.withLock { if (isSyncing) { - log(TAG, WARN) { "Already syncing, skipping" } + log(TAG, WARN) { "sync(): Already syncing, skipping" } return } else { isSyncing = true - log(TAG, VERBOSE) { "Starting sync, acquiring" } + log(TAG, VERBOSE) { "sync(): Starting sync, acquiring" } } } - if (options.stats) { - try { - val deviceDirs = runDriveAction("sync-devicelist") { - appDataRoot.child(DEVICE_DATA_DIR_NAME) + runDriveAction("sync-sync") { + val jobs = mutableSetOf>() + + + scope.async(dispatcherProvider.IO) { + if (!options.stats) return@async + + try { + val deviceDirs = appDataRoot.child(DEVICE_DATA_DIR_NAME) ?.listFiles() ?.filter { it.isDirectory } ?.map { DeviceId(id = it.name) } + + _state.updateBlocking { copy(devices = deviceDirs) } + } catch (e: Exception) { + log(TAG, ERROR) { "sync(): Failed to list of known devices: ${e.asLog()}" } } - _state.updateBlocking { copy(devices = deviceDirs) } - } catch (e: Exception) { - log(TAG, ERROR) { "sync(): Failed to list of known devices: ${e.asLog()}" } - } - try { - val newQuota = runDriveAction("sync-quota") { getStorageQuota() } - _state.updateBlocking { copy(quota = newQuota) } - } catch (e: Exception) { - log(TAG, ERROR) { "sync(): Failed to update storage quota: ${e.asLog()}" } + val stop = System.currentTimeMillis() + log(TAG, VERBOSE) { "sync(...): devices finished (took ${stop - start}ms)" } + }.run { jobs.add(this) } + + scope.async(dispatcherProvider.IO) { + if (!options.stats) return@async + + try { + val newQuota = getStorageQuota() + _state.updateBlocking { copy(quota = newQuota) } + } catch (e: Exception) { + log(TAG, ERROR) { "sync(): Failed to update storage quota: ${e.asLog()}" } + } + val stop = System.currentTimeMillis() + log(TAG, VERBOSE) { "sync(...): quota finished (took ${stop - start}ms)" } + }.run { jobs.add(this) } + + + if (options.writeData) { + // TODO Attempt to write data if we were offline and are now online? } - } - if (options.writeData) { - // TODO Attempt to write data if we were offline and are now online? - } - if (options.readData) { - try { - runDriveAction("sync-readData") { + scope.async(dispatcherProvider.IO) { + if (!options.readData) return@async + + try { _data.value = readDrive() + } catch (e: Exception) { + log(TAG, ERROR) { "sync(): Failed to read: ${e.asLog()}" } } - } catch (e: Exception) { - log(TAG, ERROR) { "sync(): Failed to read: ${e.asLog()}" } - } + val stop = System.currentTimeMillis() + log(TAG, VERBOSE) { "sync(...): readData finished (took ${stop - start}ms)" } + }.run { jobs.add(this) } + + + log(TAG) { "sync(...): Waiting for jobs to finish..." } + jobs.awaitAll() + log(TAG) { "sync(...): ... jobs finished." } } + syncLock.withLock { isSyncing = false - log(TAG, VERBOSE) { "Sync done, releasing" } + val stop = System.currentTimeMillis() + log(TAG, VERBOSE) { "sync(): Sync done, releasing (took ${stop - start}ms)" } } }