Skip to content

Commit

Permalink
Parallelize Google drive sync
Browse files Browse the repository at this point in the history
  • Loading branch information
d4rken committed Sep 13, 2024
1 parent c017070 commit c102cdd
Showing 1 changed file with 57 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import eu.darken.octi.sync.core.SyncSettings
import eu.darken.octi.sync.core.SyncWrite
import eu.darken.octi.syncs.gdrive.core.GDriveEnvironment.Companion.APPDATAFOLDER
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.NonCancellable
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
Expand All @@ -52,7 +53,7 @@ import com.google.api.services.drive.model.File as GDriveFile
class GDriveAppDataConnector @AssistedInject constructor(
@Assisted private val client: GoogleClient,
@AppScope private val scope: CoroutineScope,
dispatcherProvider: DispatcherProvider,
private val dispatcherProvider: DispatcherProvider,
@ApplicationContext private val context: Context,
private val networkStateProvider: NetworkStateProvider,
private val supportedModuleIds: Set<@JvmSuppressWildcards ModuleId>,
Expand Down Expand Up @@ -163,11 +164,11 @@ class GDriveAppDataConnector @AssistedInject constructor(
}

val deviceFetchJobs = validDeviceDirs.map { deviceDir ->
scope.async deviceFetch@{
scope.async(dispatcherProvider.IO) deviceFetch@{
log(TAG, DEBUG) { "readDrive(): Reading module data for device: $deviceDir" }
val moduleDirs = deviceDir.listFiles().filter { supportedModuleIds.contains(ModuleId(it.name)) }
val moduleFetchJobs = moduleDirs.map { moduleFile ->
scope.async moduleFetch@{
scope.async(dispatcherProvider.IO) moduleFetch@{
log(TAG, VERBOSE) { "readDrive(): Reading ${moduleFile.name} for ${deviceDir.name}" }
val payload = moduleFile.readData()

Expand Down Expand Up @@ -210,6 +211,7 @@ class GDriveAppDataConnector @AssistedInject constructor(

override suspend fun sync(options: SyncOptions) {
log(TAG) { "sync(options=$options)" }
val start = System.currentTimeMillis()

if (!isInternetAvailable()) {
log(TAG, WARN) { "sync(): Skipping, we are offline." }
Expand All @@ -218,51 +220,77 @@ class GDriveAppDataConnector @AssistedInject constructor(

syncLock.withLock {
if (isSyncing) {
log(TAG, WARN) { "Already syncing, skipping" }
log(TAG, WARN) { "sync(): Already syncing, skipping" }
return
} else {
isSyncing = true
log(TAG, VERBOSE) { "Starting sync, acquiring" }
log(TAG, VERBOSE) { "sync(): Starting sync, acquiring" }
}
}

if (options.stats) {
try {
val deviceDirs = runDriveAction("sync-devicelist") {
appDataRoot.child(DEVICE_DATA_DIR_NAME)
runDriveAction("sync-sync") {
val jobs = mutableSetOf<Deferred<*>>()


scope.async(dispatcherProvider.IO) {
if (!options.stats) return@async

try {
val deviceDirs = appDataRoot.child(DEVICE_DATA_DIR_NAME)
?.listFiles()
?.filter { it.isDirectory }
?.map { DeviceId(id = it.name) }

_state.updateBlocking { copy(devices = deviceDirs) }
} catch (e: Exception) {
log(TAG, ERROR) { "sync(): Failed to list of known devices: ${e.asLog()}" }
}
_state.updateBlocking { copy(devices = deviceDirs) }
} catch (e: Exception) {
log(TAG, ERROR) { "sync(): Failed to list of known devices: ${e.asLog()}" }
}
try {
val newQuota = runDriveAction("sync-quota") { getStorageQuota() }
_state.updateBlocking { copy(quota = newQuota) }
} catch (e: Exception) {
log(TAG, ERROR) { "sync(): Failed to update storage quota: ${e.asLog()}" }
val stop = System.currentTimeMillis()
log(TAG, VERBOSE) { "sync(...): devices finished (took ${stop - start}ms)" }
}.run { jobs.add(this) }

scope.async(dispatcherProvider.IO) {
if (!options.stats) return@async

try {
val newQuota = getStorageQuota()
_state.updateBlocking { copy(quota = newQuota) }
} catch (e: Exception) {
log(TAG, ERROR) { "sync(): Failed to update storage quota: ${e.asLog()}" }
}
val stop = System.currentTimeMillis()
log(TAG, VERBOSE) { "sync(...): quota finished (took ${stop - start}ms)" }
}.run { jobs.add(this) }


if (options.writeData) {
// TODO Attempt to write data if we were offline and are now online?
}
}

if (options.writeData) {
// TODO Attempt to write data if we were offline and are now online?
}

if (options.readData) {
try {
runDriveAction("sync-readData") {
scope.async(dispatcherProvider.IO) {
if (!options.readData) return@async

try {
_data.value = readDrive()
} catch (e: Exception) {
log(TAG, ERROR) { "sync(): Failed to read: ${e.asLog()}" }
}
} catch (e: Exception) {
log(TAG, ERROR) { "sync(): Failed to read: ${e.asLog()}" }
}
val stop = System.currentTimeMillis()
log(TAG, VERBOSE) { "sync(...): readData finished (took ${stop - start}ms)" }
}.run { jobs.add(this) }


log(TAG) { "sync(...): Waiting for jobs to finish..." }
jobs.awaitAll()
log(TAG) { "sync(...): ... jobs finished." }
}


syncLock.withLock {
isSyncing = false
log(TAG, VERBOSE) { "Sync done, releasing" }
val stop = System.currentTimeMillis()
log(TAG, VERBOSE) { "sync(): Sync done, releasing (took ${stop - start}ms)" }
}
}

Expand Down

0 comments on commit c102cdd

Please sign in to comment.