From 456aaf9beaac4b553cfd9b36113502a5f7b54bf8 Mon Sep 17 00:00:00 2001 From: Matthias Urhahn Date: Wed, 22 Jan 2025 19:34:43 +0100 Subject: [PATCH] Replace RxShell with `FlowShell` (#1529) * FlowShell * `FlowProcess` implementation with tests TODO: root handling? * Wip * wip * wip * Further work on process and shell WIP: Next up is a command shell * Working implementation Remove rxshell dependency * Fix cancellation behavior If we prematurely cancel a command and submit another one, the previous commends output may confuse the next one. * Fix output and errors being missed due to `onStart` triggering too early, we need `onSubscription` * Clean up debug output * Fix flaky tests * Fix shared shell closing behavior --- .../eu/darken/sdmse/common/MountMaster.kt | 14 +- .../common/adb/service/AdbServiceHost.kt | 9 +- .../common/pkgs/pkgops/ipc/PkgOpsHost.kt | 20 +- .../common/root/service/RootServiceHost.kt | 9 +- .../eu/darken/sdmse/common/shell/ShellOps.kt | 14 +- .../sdmse/common/shell/ipc/ShellOpsHost.kt | 7 +- .../sdmse/common/shell/ipc/ShellOpsResult.kt | 3 +- .../service/internal/RootHostCmdBuilder.kt | 12 +- .../root/service/internal/RootHostLauncher.kt | 106 ++++--- .../common/root/service/internal/RootIPC.kt | 17 +- app-common-shell/build.gradle.kts | 3 - .../eu/darken/flowshell/core/FlowShell.kt | 111 ++++++++ .../darken/flowshell/core/FlowShellDebug.kt | 7 + .../flowshell/core/FlowShellException.kt | 8 + .../eu/darken/flowshell/core/cmd/FlowCmd.kt | 23 ++ .../flowshell/core/cmd/FlowCmdExtensions.kt | 34 +++ .../darken/flowshell/core/cmd/FlowCmdShell.kt | 166 +++++++++++ .../core/cmd/FlowCmdShellException.kt | 8 + .../flowshell/core/process/FlowProcess.kt | 124 ++++++++ .../core/process/FlowProcessExtensions.kt | 142 ++++++++++ .../darken/sdmse/common/shell/SharedShell.kt | 50 ++-- .../eu/darken/flowshell/core/FlowShellTest.kt | 154 ++++++++++ .../flowshell/core/cmd/FlowCmdShellTest.kt | 188 ++++++++++++ .../flowshell/core/process/FlowProcessTest.kt | 268 ++++++++++++++++++ .../test/app/app_shell/ExampleUnitTest.kt | 16 -- app-common/build.gradle.kts | 2 - .../eu/darken/sdmse/common/JUnitLogger.kt | 7 +- .../ValueBasedPolyJsonAdapterFactory.kt | 2 +- app/build.gradle.kts | 1 + 29 files changed, 1386 insertions(+), 139 deletions(-) create mode 100644 app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShell.kt create mode 100644 app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShellDebug.kt create mode 100644 app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShellException.kt create mode 100644 app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmd.kt create mode 100644 app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdExtensions.kt create mode 100644 app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdShell.kt create mode 100644 app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdShellException.kt create mode 100644 app-common-shell/src/main/java/eu/darken/flowshell/core/process/FlowProcess.kt create mode 100644 app-common-shell/src/main/java/eu/darken/flowshell/core/process/FlowProcessExtensions.kt create mode 100644 app-common-shell/src/test/java/eu/darken/flowshell/core/FlowShellTest.kt create mode 100644 app-common-shell/src/test/java/eu/darken/flowshell/core/cmd/FlowCmdShellTest.kt create mode 100644 app-common-shell/src/test/java/eu/darken/flowshell/core/process/FlowProcessTest.kt delete mode 100644 app-common-shell/src/test/java/test/app/app_shell/ExampleUnitTest.kt diff --git a/app-common-io/src/main/java/eu/darken/sdmse/common/MountMaster.kt b/app-common-io/src/main/java/eu/darken/sdmse/common/MountMaster.kt index 4656cf972..3bc8c785b 100644 --- a/app-common-io/src/main/java/eu/darken/sdmse/common/MountMaster.kt +++ b/app-common-io/src/main/java/eu/darken/sdmse/common/MountMaster.kt @@ -1,8 +1,10 @@ package eu.darken.sdmse.common import android.os.Build -import eu.darken.rxshell.cmd.Cmd -import eu.darken.rxshell.cmd.RxCmdShell +import eu.darken.flowshell.core.cmd.FlowCmd +import eu.darken.flowshell.core.cmd.FlowCmdShell +import eu.darken.flowshell.core.cmd.execute +import eu.darken.flowshell.core.process.FlowProcess import eu.darken.sdmse.common.debug.logging.Logging.Priority.INFO import eu.darken.sdmse.common.debug.logging.log import eu.darken.sdmse.common.debug.logging.logTag @@ -38,13 +40,13 @@ class MountMaster @Inject constructor( } log(TAG) { "Checking for mount-master support..." } - val result = Cmd.builder("su --help").execute(RxCmdShell.builder().root(true).build()) - if (result.exitCode != Cmd.ExitCode.OK) { - log(TAG, INFO) { "mount-master check failed: ${result.merge()}" } + val result = FlowCmd("su --help").execute(FlowCmdShell("su")) + if (result.exitCode != FlowProcess.ExitCode.OK) { + log(TAG, INFO) { "mount-master check failed: ${result.merged}" } return false } - val supported = result.merge().any { it.contains("--mount-master") } + val supported = result.merged.any { it.contains("--mount-master") } log(TAG, INFO) { "mount-master is required and current support status is supported=$supported" } return supported } diff --git a/app-common-io/src/main/java/eu/darken/sdmse/common/adb/service/AdbServiceHost.kt b/app-common-io/src/main/java/eu/darken/sdmse/common/adb/service/AdbServiceHost.kt index 4cbb4aa7a..7ab3993b8 100644 --- a/app-common-io/src/main/java/eu/darken/sdmse/common/adb/service/AdbServiceHost.kt +++ b/app-common-io/src/main/java/eu/darken/sdmse/common/adb/service/AdbServiceHost.kt @@ -4,8 +4,8 @@ import android.content.Context import androidx.annotation.Keep import dagger.Lazy import dagger.hilt.android.qualifiers.ApplicationContext -import eu.darken.rxshell.cmd.Cmd -import eu.darken.rxshell.cmd.RxCmdShell +import eu.darken.flowshell.core.cmd.FlowCmd +import eu.darken.flowshell.core.cmd.execute import eu.darken.sdmse.common.adb.AdbServiceConnection import eu.darken.sdmse.common.debug.logging.Logging.Priority.INFO import eu.darken.sdmse.common.debug.logging.log @@ -16,6 +16,7 @@ import eu.darken.sdmse.common.pkgs.pkgops.ipc.PkgOpsConnection import eu.darken.sdmse.common.pkgs.pkgops.ipc.PkgOpsHost import eu.darken.sdmse.common.shell.ipc.ShellOpsConnection import eu.darken.sdmse.common.shell.ipc.ShellOpsHost +import kotlinx.coroutines.runBlocking import javax.inject.Inject import javax.inject.Singleton @@ -35,8 +36,8 @@ class AdbServiceHost @Inject constructor( override fun checkBase(): String { val sb = StringBuilder() sb.append("Our pkg: ${context.packageName}\n") - val ids = Cmd.builder("id").submit(RxCmdShell.Builder().build()).blockingGet() - sb.append("Shell ids are: ${ids.merge()}\n") + val ids = runBlocking { FlowCmd("id").execute() } + sb.append("Shell ids are: ${ids.merged}\n") val result = sb.toString() log(TAG) { "checkBase(): $result" } return result diff --git a/app-common-io/src/main/java/eu/darken/sdmse/common/pkgs/pkgops/ipc/PkgOpsHost.kt b/app-common-io/src/main/java/eu/darken/sdmse/common/pkgs/pkgops/ipc/PkgOpsHost.kt index c18a5d7f2..0aad86b98 100644 --- a/app-common-io/src/main/java/eu/darken/sdmse/common/pkgs/pkgops/ipc/PkgOpsHost.kt +++ b/app-common-io/src/main/java/eu/darken/sdmse/common/pkgs/pkgops/ipc/PkgOpsHost.kt @@ -5,7 +5,9 @@ import android.content.Context import android.content.pm.PackageInfo import android.content.pm.PackageManager import dagger.hilt.android.qualifiers.ApplicationContext -import eu.darken.rxshell.cmd.Cmd +import eu.darken.flowshell.core.cmd.FlowCmd +import eu.darken.flowshell.core.cmd.execute +import eu.darken.flowshell.core.process.FlowProcess import eu.darken.sdmse.common.debug.Bugs import eu.darken.sdmse.common.debug.Bugs.isDryRun import eu.darken.sdmse.common.debug.logging.Logging.Priority.ERROR @@ -65,8 +67,8 @@ class PkgOpsHost @Inject constructor( log(TAG, ERROR) { "isRunning($packageName): runningAppProcesses failed due to $e " } runBlocking { sharedShell.useRes { - Cmd.builder("pidof $packageName").execute(it) - }.exitCode == Cmd.ExitCode.OK + FlowCmd("pidof $packageName").execute(it) + }.exitCode == FlowProcess.ExitCode.OK } } log(TAG, VERBOSE) { "isRunning(packageName=$packageName)=$result" } @@ -162,10 +164,10 @@ class PkgOpsHost @Inject constructor( log(TAG, VERBOSE) { "grantPermission($packageName, $handleId, $permissionId)..." } val result = runBlocking { sharedShell.useRes { - Cmd.builder("pm grant --user $handleId $packageName $permissionId").execute(it) + FlowCmd("pm grant --user $handleId $packageName $permissionId").execute(it) } } - result.exitCode == Cmd.ExitCode.OK + result.exitCode == FlowProcess.ExitCode.OK } catch (e: Exception) { log(TAG, ERROR) { "grantPermission($packageName, $handleId, $permissionId) failed: ${e.asLog()}" } throw e.wrapToPropagate() @@ -175,10 +177,10 @@ class PkgOpsHost @Inject constructor( log(TAG, VERBOSE) { "revokePermission($packageName, $handleId, $permissionId)..." } val result = runBlocking { sharedShell.useRes { - Cmd.builder("pm revoke --user $handleId $packageName $permissionId").execute(it) + FlowCmd("pm revoke --user $handleId $packageName $permissionId").execute(it) } } - result.exitCode == Cmd.ExitCode.OK + result.exitCode == FlowProcess.ExitCode.OK } catch (e: Exception) { log(TAG, ERROR) { "revokePermission($packageName, $handleId, $permissionId) failed: ${e.asLog()}" } throw e.wrapToPropagate() @@ -188,10 +190,10 @@ class PkgOpsHost @Inject constructor( log(TAG, VERBOSE) { "setAppOps($packageName, $handleId, $key, $value)..." } val result = runBlocking { sharedShell.useRes { - Cmd.builder("appops set --user $handleId $packageName $key $value ").execute(it) + FlowCmd("appops set --user $handleId $packageName $key $value ").execute(it) } } - result.exitCode == Cmd.ExitCode.OK + result.exitCode == FlowProcess.ExitCode.OK } catch (e: Exception) { log(TAG, ERROR) { "setAppOps($packageName, $handleId, $key, $value) failed: ${e.asLog()}" } throw e.wrapToPropagate() diff --git a/app-common-io/src/main/java/eu/darken/sdmse/common/root/service/RootServiceHost.kt b/app-common-io/src/main/java/eu/darken/sdmse/common/root/service/RootServiceHost.kt index 52e79ae7c..e004d99a7 100644 --- a/app-common-io/src/main/java/eu/darken/sdmse/common/root/service/RootServiceHost.kt +++ b/app-common-io/src/main/java/eu/darken/sdmse/common/root/service/RootServiceHost.kt @@ -4,8 +4,8 @@ import android.content.Context import androidx.annotation.Keep import dagger.Lazy import dagger.hilt.android.qualifiers.ApplicationContext -import eu.darken.rxshell.cmd.Cmd -import eu.darken.rxshell.cmd.RxCmdShell +import eu.darken.flowshell.core.cmd.FlowCmd +import eu.darken.flowshell.core.cmd.execute import eu.darken.sdmse.common.debug.logging.log import eu.darken.sdmse.common.debug.logging.logTag import eu.darken.sdmse.common.files.local.ipc.FileOpsConnection @@ -14,6 +14,7 @@ import eu.darken.sdmse.common.pkgs.pkgops.ipc.PkgOpsConnection import eu.darken.sdmse.common.pkgs.pkgops.ipc.PkgOpsHost import eu.darken.sdmse.common.shell.ipc.ShellOpsConnection import eu.darken.sdmse.common.shell.ipc.ShellOpsHost +import kotlinx.coroutines.runBlocking import javax.inject.Inject import javax.inject.Singleton @@ -29,8 +30,8 @@ class RootServiceHost @Inject constructor( override fun checkBase(): String { val sb = StringBuilder() sb.append("Our pkg: ${context.packageName}\n") - val ids = Cmd.builder("id").submit(RxCmdShell.Builder().build()).blockingGet() - sb.append("Shell ids are: ${ids.merge()}\n") + val ids = runBlocking { FlowCmd("id").execute() } + sb.append("Shell ids are: ${ids.merged}\n") val result = sb.toString() log(TAG) { "checkBase(): $result" } return result diff --git a/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ShellOps.kt b/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ShellOps.kt index f5a9df1ec..a7dd6e3f7 100644 --- a/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ShellOps.kt +++ b/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ShellOps.kt @@ -1,7 +1,7 @@ package eu.darken.sdmse.common.shell -import eu.darken.rxshell.cmd.Cmd -import eu.darken.rxshell.cmd.RxCmdShell +import eu.darken.flowshell.core.cmd.FlowCmd +import eu.darken.flowshell.core.cmd.execute import eu.darken.sdmse.common.adb.AdbManager import eu.darken.sdmse.common.adb.AdbUnavailableException import eu.darken.sdmse.common.adb.canUseAdbNow @@ -66,9 +66,7 @@ class ShellOps @Inject constructor( var result: ShellOpsResult? = null if (mode == Mode.NORMAL) { log(TAG, VERBOSE) { "execute(mode->NORMAL): $cmd" } - result = cmd.toRxCmdBuilder() - .execute(RxCmdShell.builder().build()) - .toShellOpsResult() + result = cmd.toFlowCmd().execute().toShellOpsResult() } if (result == null && rootManager.canUseRootNow() && mode == Mode.ROOT) { @@ -94,10 +92,10 @@ class ShellOps @Inject constructor( } } - private fun ShellOpsCmd.toRxCmdBuilder() = Cmd.builder(cmds) + private fun ShellOpsCmd.toFlowCmd() = FlowCmd(cmds) - private fun Cmd.Result.toShellOpsResult() = ShellOpsResult( - exitCode = exitCode, + private fun FlowCmd.Result.toShellOpsResult() = ShellOpsResult( + exitCode = exitCode.value, output = output, errors = errors ) diff --git a/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ipc/ShellOpsHost.kt b/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ipc/ShellOpsHost.kt index bc9507cb6..9b38cc956 100644 --- a/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ipc/ShellOpsHost.kt +++ b/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ipc/ShellOpsHost.kt @@ -1,6 +1,7 @@ package eu.darken.sdmse.common.shell.ipc -import eu.darken.rxshell.cmd.Cmd +import eu.darken.flowshell.core.cmd.FlowCmd +import eu.darken.flowshell.core.cmd.execute import eu.darken.sdmse.common.coroutine.AppScope import eu.darken.sdmse.common.coroutine.DispatcherProvider import eu.darken.sdmse.common.debug.Bugs @@ -25,10 +26,10 @@ class ShellOpsHost @Inject constructor( override fun execute(cmd: ShellOpsCmd): ShellOpsResult = try { runBlocking { val result = sharedShell.useRes { - Cmd.builder(cmd.cmds).execute(it) + FlowCmd(cmd.cmds).execute(it) } ShellOpsResult( - exitCode = result.exitCode, + exitCode = result.exitCode.value, output = result.output, errors = result.errors ) diff --git a/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ipc/ShellOpsResult.kt b/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ipc/ShellOpsResult.kt index a1cbe880f..df8752972 100644 --- a/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ipc/ShellOpsResult.kt +++ b/app-common-io/src/main/java/eu/darken/sdmse/common/shell/ipc/ShellOpsResult.kt @@ -1,7 +1,6 @@ package eu.darken.sdmse.common.shell.ipc import android.os.Parcelable -import eu.darken.rxshell.cmd.Cmd import kotlinx.parcelize.Parcelize @Parcelize @@ -12,5 +11,5 @@ data class ShellOpsResult( ) : Parcelable { val isSuccess: Boolean - get() = exitCode == Cmd.ExitCode.OK + get() = exitCode == 0 } \ No newline at end of file diff --git a/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootHostCmdBuilder.kt b/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootHostCmdBuilder.kt index 15b926108..155a7fbda 100644 --- a/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootHostCmdBuilder.kt +++ b/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootHostCmdBuilder.kt @@ -6,7 +6,7 @@ import android.os.Build import android.os.Process import android.util.Base64 import androidx.annotation.RequiresApi -import eu.darken.rxshell.cmd.Cmd +import eu.darken.flowshell.core.cmd.FlowCmd import eu.darken.sdmse.common.debug.logging.Logging.Priority.ERROR import eu.darken.sdmse.common.debug.logging.Logging.Priority.WARN import eu.darken.sdmse.common.debug.logging.asLog @@ -17,7 +17,7 @@ import eu.darken.sdmse.common.parcel.marshall import java.io.File import java.io.FileNotFoundException import java.io.IOException -import java.util.* +import java.util.UUID import kotlin.reflect.KClass /** @@ -28,7 +28,7 @@ import kotlin.reflect.KClass */ @SuppressLint("PrivateApi") -class RootHostCmdBuilder constructor( +class RootHostCmdBuilder( private val context: Context, private val rootHost: KClass, ) { @@ -43,14 +43,12 @@ class RootHostCmdBuilder constructor( Class.forName("android.os.SystemProperties") } - @get:RequiresApi(26) private val isVndkLite by lazy { classSystemProperties .getDeclaredMethod("getBoolean", String::class.java, Boolean::class.java) .invoke(null, "ro.vndk.lite", false) as Boolean } - @get:RequiresApi(26) private val vndkVersion by lazy { classSystemProperties .getDeclaredMethod("get", String::class.java, String::class.java) @@ -185,7 +183,7 @@ class RootHostCmdBuilder constructor( fun build( withRelocation: Boolean, initialOptions: RootHostInitArgs, - ): Cmd.Builder { + ): FlowCmd { log { "build(relocate=$withRelocation, ${initialOptions})" } val cmds = mutableListOf() @@ -209,7 +207,7 @@ class RootHostCmdBuilder constructor( cmds.add(launchCmd) - return Cmd.builder(cmds) + return FlowCmd(cmds) } private fun buildLaunchCmd(isDebug: Boolean, processPath: String): String { diff --git a/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootHostLauncher.kt b/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootHostLauncher.kt index 6f5bbad60..f93bc4b0c 100644 --- a/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootHostLauncher.kt +++ b/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootHostLauncher.kt @@ -4,8 +4,11 @@ import android.content.Context import android.os.Debug import android.os.IInterface import dagger.hilt.android.qualifiers.ApplicationContext -import eu.darken.rxshell.cmd.Cmd -import eu.darken.rxshell.cmd.RxCmdShell +import eu.darken.flowshell.core.cmd.FlowCmd +import eu.darken.flowshell.core.cmd.FlowCmdShell +import eu.darken.flowshell.core.cmd.execute +import eu.darken.flowshell.core.cmd.openSession +import eu.darken.sdmse.common.debug.logging.Logging.Priority.DEBUG import eu.darken.sdmse.common.debug.logging.Logging.Priority.INFO import eu.darken.sdmse.common.debug.logging.Logging.Priority.WARN import eu.darken.sdmse.common.debug.logging.asLog @@ -13,10 +16,14 @@ import eu.darken.sdmse.common.debug.logging.log import eu.darken.sdmse.common.debug.logging.logTag import eu.darken.sdmse.common.ipc.getInterface import eu.darken.sdmse.common.root.RootException -import io.reactivex.rxjava3.schedulers.Schedulers +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.channels.trySendBlocking import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeoutOrNull import java.util.UUID import javax.inject.Inject import kotlin.reflect.KClass @@ -42,8 +49,8 @@ class RootHostLauncher @Inject constructor( log(TAG) { "createConnection($serviceClass, $hostClass, $useMountMaster, $options)" } log(TAG, INFO) { "Initiating connection to host($hostClass) via binder($serviceClass)" } - val rootSession = try { - RxCmdShell.builder().root(true).build().open().blockingGet() + val (rootSession, _) = try { + FlowCmdShell("su").openSession(this) } catch (e: Exception) { throw RootException("Failed to open root session.", e.cause) } @@ -65,61 +72,70 @@ class RootHostLauncher @Inject constructor( trySendBlocking(ConnectionWrapper(userConnection, connection)) } - override fun onDisconnect(ipc: RootConnection) { - log(TAG) { "onDisconnect(ipc=$ipc)" } + override fun onDisconnect(connection: RootConnection) { + log(TAG) { "onDisconnect(ipc=$connection)" } close() } } - invokeOnClose { - log(TAG) { "Canceling!" } - ipcReceiver.release() - // TODO timeout until we CANCEL? - rootSession.close().subscribe() - } - ipcReceiver.connect(context) - val result = try { - val cmdBuilder = RootHostCmdBuilder(context, hostClass) + launch { + try { + val cmdBuilder = RootHostCmdBuilder(context, hostClass) - if (useMountMaster) { - Cmd.builder("su --mount-master").submit(rootSession).observeOn(Schedulers.io()).blockingGet() - } + if (useMountMaster) { + FlowCmd("su --mount-master").execute(rootSession) + } - val initArgs = RootHostInitArgs( - pairingCode = pairingCode, - packageName = context.packageName, - waitForDebugger = options.isTrace && Debug.isDebuggerConnected(), - isDebug = options.isDebug, - isTrace = options.isTrace, - isDryRun = options.isDryRun, - recorderPath = options.recorderPath - ) + val initArgs = RootHostInitArgs( + pairingCode = pairingCode, + packageName = context.packageName, + waitForDebugger = options.isTrace && Debug.isDebuggerConnected(), + isDebug = options.isDebug, + isTrace = options.isTrace, + isDryRun = options.isDryRun, + recorderPath = options.recorderPath + ) + + try { + val cmd = cmdBuilder.build(withRelocation = false, initialOptions = initArgs).also { + log { "RootHost launch command is $it" } + } + + // Doesn't return until root host has quit + cmd.execute(rootSession).also { + log(TAG) { "Session (WITH-relocation) has ended: $it" } + } + } catch (e: CancellationException) { + log(TAG, DEBUG) { "Session was cancelled: ${e.asLog()}" } + } catch (e: Exception) { + log(TAG, WARN) { "Launch without relocation failed: ${e.asLog()}" } - try { - val cmd = cmdBuilder.build(withRelocation = false, initialOptions = initArgs) - log { "RootHost launch command is $cmd" } + val cmd = cmdBuilder.build(withRelocation = true, initialOptions = initArgs).also { + log { "RootHost launch command is $it" } + } - // Doesn't return until root host has quit - cmd.submit(rootSession).observeOn(Schedulers.io()).blockingGet() + cmd.execute(rootSession).also { + log(TAG) { "Session (without-relocation) has ended: $it" } + } + } } catch (e: Exception) { - log(TAG, WARN) { "Launch without relocation failed: ${e.asLog()}" } - - val cmd = cmdBuilder.build(withRelocation = true, initialOptions = initArgs) - log { "RootHost launch command is $cmd" } - - cmd.submit(rootSession).observeOn(Schedulers.io()).blockingGet() + log(TAG, WARN) { "Launch completely failed failed: ${e.asLog()}" } + throw RootException("Failed to launch java root host.", e.cause) } - } catch (e: Exception) { - throw RootException("Failed to launch java root host.", e.cause) + + log(TAG, INFO) { "Root host has quit" } } - log(TAG) { "Root host launch result was: $result" } + log(TAG) { "Waiting for session to close..." } + awaitClose { + log(TAG) { "Session is closing..." } + ipcReceiver.release() - // Check exitcode - if (result.exitCode == Cmd.ExitCode.SHELL_DIED) { - throw RootException("Shell died launching the java root host.") + runBlocking { + withTimeoutOrNull(10 * 1000) { rootSession.close() } ?: rootSession.cancel() + } } } diff --git a/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootIPC.kt b/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootIPC.kt index aacd36964..5ec7d5f0c 100644 --- a/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootIPC.kt +++ b/app-common-root/src/main/java/eu/darken/sdmse/common/root/service/internal/RootIPC.kt @@ -8,8 +8,9 @@ import android.os.RemoteException import dagger.assisted.Assisted import dagger.assisted.AssistedFactory import dagger.assisted.AssistedInject -import eu.darken.rxshell.cmd.Cmd -import eu.darken.rxshell.cmd.RxCmdShell +import eu.darken.flowshell.core.cmd.FlowCmd +import eu.darken.flowshell.core.cmd.execute +import eu.darken.flowshell.core.process.FlowProcess import eu.darken.sdmse.common.debug.logging.Logging.Priority.* import eu.darken.sdmse.common.debug.logging.asLog import eu.darken.sdmse.common.debug.logging.log @@ -120,7 +121,7 @@ class RootIPC @AssistedInject constructor( require(timeout >= 0L) { "Timeout can't be negative: $timeout" } } - fun broadcastAndWait() { + suspend fun broadcastAndWait() { log(TAG) { "broadcast()" } broadcastIPC() @@ -168,7 +169,7 @@ class RootIPC @AssistedInject constructor( * Uses the reflected sendBroadcast method that doesn't require us to have a context * You may call this manually to re-broadcast the interface */ - private fun broadcastIPC() { + private suspend fun broadcastIPC() { val bundle = Bundle().apply { putBinder(RootConnectionReceiver.BROADCAST_BINDER, internalBinder) putString(RootConnectionReceiver.BROADCAST_CODE, initArgs.pairingCode) @@ -224,10 +225,10 @@ class RootIPC @AssistedInject constructor( connections.singleOrNull { it.deathRecipient === deathRecipient } } - private fun getUserIds(): List { - val result = Cmd.builder("pm list users").execute(RxCmdShell.builder().build()) - if (result.exitCode != Cmd.ExitCode.OK) { - log(TAG, ERROR) { "Failed to get user handles via shell: ${result.merge()}" } + private suspend fun getUserIds(): List { + val result = FlowCmd("pm list users").execute() + if (result.exitCode != FlowProcess.ExitCode.OK) { + log(TAG, ERROR) { "Failed to get user handles via shell: ${result.merged}" } return listOf(0) } diff --git a/app-common-shell/build.gradle.kts b/app-common-shell/build.gradle.kts index fc4a12777..ef4425266 100644 --- a/app-common-shell/build.gradle.kts +++ b/app-common-shell/build.gradle.kts @@ -43,9 +43,6 @@ dependencies { addDI() addCoroutines() - api("com.github.d4rken.rxshell:core:v3.0.0") - api("com.github.d4rken.rxshell:root:v3.0.0") - addTesting() testImplementation(project(":app-common-test")) testImplementation("org.robolectric:robolectric:4.9.1") diff --git a/app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShell.kt b/app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShell.kt new file mode 100644 index 000000000..4899e1cfa --- /dev/null +++ b/app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShell.kt @@ -0,0 +1,111 @@ +package eu.darken.flowshell.core + +import eu.darken.flowshell.core.FlowShellDebug.isDebug +import eu.darken.flowshell.core.process.FlowProcess +import eu.darken.flowshell.core.process.killViaPid +import eu.darken.sdmse.common.debug.logging.Logging.Priority.VERBOSE +import eu.darken.sdmse.common.debug.logging.Logging.Priority.WARN +import eu.darken.sdmse.common.debug.logging.asLog +import eu.darken.sdmse.common.debug.logging.log +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.stream.consumeAsFlow +import kotlinx.coroutines.withContext +import java.io.InputStream +import java.io.OutputStreamWriter +import java.nio.charset.StandardCharsets + +class FlowShell( + process: FlowProcess +) { + constructor( + shell: String = "sh" + ) : this( + process = FlowProcess( + launch = { ProcessBuilder(shell).start() }, + kill = { it.killViaPid(shell) } + ) + ) + + private val sessionProducer = process.session + .onStart { if (isDebug) log(TAG, VERBOSE) { "Starting session..." } } + .map { processSession -> + if (isDebug) log(TAG, VERBOSE) { "Wrapping to shell session..." } + Session(session = processSession) + } + .onEach { if (isDebug) log(TAG, VERBOSE) { "Emitting $it" } } + .onCompletion { + if (isDebug) { + if (it == null || it is CancellationException) { + log(TAG, VERBOSE) { "Flow is complete. (reason=$it)" } + } else { + log(TAG, WARN) { "Flow is completed unexpectedly: ${it.asLog()}" } + } + } + } + + val session: Flow = sessionProducer + + data class Session( + private val session: FlowProcess.Session, + ) { + + private val writer by lazy { + OutputStreamWriter(session.input, StandardCharsets.UTF_8) + } + + private fun InputStream.lineHarvester(tag: String) = flow { + if (isDebug) log(TAG, VERBOSE) { "Harverster($tag) is active" } + bufferedReader().use { reader -> + reader.lines().consumeAsFlow().collect { + if (isDebug) log(TAG, VERBOSE) { "Harverster($tag) -> $it" } + emit(it) + } + } + if (isDebug) log(TAG, VERBOSE) { "Harverster($tag) is finished" } + }.flowOn(Dispatchers.IO) + + val output: Flow = session.output!!.lineHarvester("output") + + val error: Flow = session.errors!!.lineHarvester("error") + + suspend fun write(line: String, flush: Boolean = true) = withContext(Dispatchers.IO) { + if (isDebug) log(TAG) { "write(line=$line, flush=$flush)" } + writer.write(line + System.lineSeparator()) + if (flush) writer.flush() + } + + val exitCode: Flow + get() = session.exitCode + + suspend fun isAlive() = session.isAlive() + + suspend fun waitFor(): FlowProcess.ExitCode = withContext(Dispatchers.IO) { + exitCode.filterNotNull().first() + } + + suspend fun cancel() = withContext(Dispatchers.IO) { + if (isDebug) log(TAG) { "kill()" } + session.cancel() + } + + suspend fun close() = withContext(Dispatchers.IO) { + if (isDebug) log(TAG) { "close()" } + write("exit") + waitFor() + } + } + + companion object { + private const val TAG = "FS:FlowShell" + } +} \ No newline at end of file diff --git a/app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShellDebug.kt b/app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShellDebug.kt new file mode 100644 index 000000000..46db19e8f --- /dev/null +++ b/app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShellDebug.kt @@ -0,0 +1,7 @@ +package eu.darken.flowshell.core + +import eu.darken.sdmse.common.debug.Bugs + +object FlowShellDebug { + var isDebug: Boolean = Bugs.isTrace +} \ No newline at end of file diff --git a/app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShellException.kt b/app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShellException.kt new file mode 100644 index 000000000..c2608d316 --- /dev/null +++ b/app-common-shell/src/main/java/eu/darken/flowshell/core/FlowShellException.kt @@ -0,0 +1,8 @@ +package eu.darken.flowshell.core + +import java.io.IOException + +open class FlowShellException( + message: String? = null, + cause: Throwable? = null, +) : IOException(message, cause) \ No newline at end of file diff --git a/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmd.kt b/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmd.kt new file mode 100644 index 000000000..1c6325bf0 --- /dev/null +++ b/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmd.kt @@ -0,0 +1,23 @@ +package eu.darken.flowshell.core.cmd + +import eu.darken.flowshell.core.process.FlowProcess + +data class FlowCmd( + val instructions: List, +) { + + constructor(vararg instrs: String) : this(instrs.toList()) + + data class Result( + val original: FlowCmd, + val exitCode: FlowProcess.ExitCode, + val output: List, + val errors: List, + ) { + val isSuccessful: Boolean + get() = exitCode == FlowProcess.ExitCode.OK + + val merged: List + get() = output + errors + } +} \ No newline at end of file diff --git a/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdExtensions.kt b/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdExtensions.kt new file mode 100644 index 000000000..c3cba37f8 --- /dev/null +++ b/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdExtensions.kt @@ -0,0 +1,34 @@ +package eu.darken.flowshell.core.cmd + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.WhileSubscribed +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.shareIn +import kotlin.time.Duration + +suspend fun FlowCmd.execute( + cmdShell: FlowCmdShell = FlowCmdShell() +): FlowCmd.Result = cmdShell.session + .map { it.execute(this) } + .first() + +suspend fun FlowCmd.execute(session: FlowCmdShell.Session) = session.execute(this) + +suspend fun FlowCmdShell.openSession(scope: CoroutineScope): Pair { + val sharedSession = session.shareIn( + scope = scope, + replay = 1, + started = SharingStarted.WhileSubscribed(replayExpiration = Duration.ZERO) + ) + val job = sharedSession.launchIn(scope) + return sharedSession.first() to job +} + +val FlowCmd.Result.exception: Throwable? + get() = if (isSuccessful) null else FlowCmdShellException( + message = "Command failed: exitCode=$exitCode:${errors.joinToString("\n")}", + ) \ No newline at end of file diff --git a/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdShell.kt b/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdShell.kt new file mode 100644 index 000000000..567e6e9eb --- /dev/null +++ b/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdShell.kt @@ -0,0 +1,166 @@ +package eu.darken.flowshell.core.cmd + +import eu.darken.flowshell.core.FlowShell +import eu.darken.flowshell.core.FlowShellDebug.isDebug +import eu.darken.flowshell.core.process.FlowProcess +import eu.darken.sdmse.common.debug.logging.Logging.Priority.VERBOSE +import eu.darken.sdmse.common.debug.logging.Logging.Priority.WARN +import eu.darken.sdmse.common.debug.logging.asLog +import eu.darken.sdmse.common.debug.logging.log +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.SharingStarted +import kotlinx.coroutines.flow.drop +import kotlinx.coroutines.flow.dropWhile +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.flow.onSubscription +import kotlinx.coroutines.flow.shareIn +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.plus +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlinx.coroutines.withContext +import java.util.UUID +import kotlin.coroutines.cancellation.CancellationException + +class FlowCmdShell( + flowShell: FlowShell +) { + + constructor(shell: String = "sh") : this(FlowShell(shell)) + + private val sessionProducer = flowShell.session + .onStart { if (isDebug) log(TAG, VERBOSE) { "Starting session..." } } + .map { shellSession -> + if (isDebug) log(TAG, VERBOSE) { "Wrapping to command shell session..." } + Session(session = shellSession) + } + .onEach { if (isDebug) log(TAG, VERBOSE) { "Emitting $it" } } + .onCompletion { + if (isDebug) { + if (it == null || it is CancellationException) { + log(TAG, VERBOSE) { "Flow is complete. (reason=$it)" } + } else { + log(TAG, WARN) { "Flow is completed unexpectedly: ${it.asLog()}" } + } + } + } + + val session: Flow = sessionProducer + + data class Session( + private val session: FlowShell.Session, + ) { + private val scope = CoroutineScope(Job() + Dispatchers.IO) + private val mutex = Mutex() + + private var cmdCount = 0 + val counter: Int + get() = cmdCount + + suspend fun isAlive() = session.isAlive() + + suspend fun waitFor() = session.waitFor() + + suspend fun cancel() = withContext(Dispatchers.IO) { + if (isDebug) log(TAG) { "kill()" } + session.cancel() + scope.cancel() + } + + suspend fun close() = withContext(Dispatchers.IO) { + if (isDebug) log(TAG) { "close()" } + session.close() + scope.cancel() + } + + private val sharedOutput = session.output.shareIn(scope, started = SharingStarted.Eagerly) + private val sharedErrors = session.error.shareIn(scope, started = SharingStarted.Eagerly) + + suspend fun execute(cmd: FlowCmd): FlowCmd.Result = withContext(Dispatchers.IO) { + mutex.withLock { + cmdCount++ + val id = UUID.randomUUID().toString() + val idStart = "$id-start" + val idEnd = "$id-end" + log(TAG, VERBOSE) { "submit($cmdCount): $cmd" } + + val output = mutableListOf() + val outputReady = CompletableDeferred() + val outputJob = sharedOutput + .onSubscription { + outputReady.complete(Unit) + if (isDebug) log(TAG, VERBOSE) { "Output monitor started ($id)" } + } + .dropWhile { it != idStart }.drop(1) + .onEach { + if (isDebug) log(TAG, VERBOSE) { "Adding (output-$id) $it" } + output.add(it) + } + .takeWhile { !it.startsWith(idEnd) } + .onCompletion { if (isDebug) log(TAG, VERBOSE) { "Output monitor finished ($id)" } } + .launchIn(this + Dispatchers.IO) + + val errors = mutableListOf() + val errorReady = CompletableDeferred() + val errorJob = sharedErrors + .onSubscription { + errorReady.complete(Unit) + if (isDebug) log(TAG, VERBOSE) { "Error monitor started ($id)" } + } + .dropWhile { it != idStart }.drop(1) + .takeWhile { it != idEnd } + .onEach { + if (isDebug) log(TAG, VERBOSE) { "Adding (errors-$id) $it" } + errors.add(it) + } + .onCompletion { if (isDebug) log(TAG, VERBOSE) { "Error monitor finished ($id)" } } + .launchIn(this + Dispatchers.IO) + + listOf(outputReady, errorReady).awaitAll() + + if (isDebug) log(TAG, VERBOSE) { "Harvesters are ready, writing commands... ($id)" } + + session.write("echo $idStart", false) + session.write("echo $idStart >&2", false) + cmd.instructions.forEach { session.write(it, flush = false) } + session.write("echo $idEnd $?", false) + session.write("echo $idEnd >&2", true) + + if (isDebug) log(TAG, VERBOSE) { "Commands are written, waiting... ($id)" } + + listOf(outputJob, errorJob).joinAll() + + if (isDebug) log(TAG, VERBOSE) { "Determining exitcode ($id)" } + val rawExitCodeRow = output.removeLast() + + val exitCode = rawExitCodeRow + .split(" ") + .let { it[1].toIntOrNull() } + ?.let { FlowProcess.ExitCode(it) } + ?: throw IllegalArgumentException("Failed to determine exitcode from $rawExitCodeRow") + + FlowCmd.Result( + original = cmd, + exitCode = exitCode, + output = output, + errors = errors + ).also { log(TAG) { "submit($cmdCount): $cmd -> $it" } } + } + } + } + + companion object { + private const val TAG = "FS:FlowCmdShell" + } +} \ No newline at end of file diff --git a/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdShellException.kt b/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdShellException.kt new file mode 100644 index 000000000..d2db0d4f8 --- /dev/null +++ b/app-common-shell/src/main/java/eu/darken/flowshell/core/cmd/FlowCmdShellException.kt @@ -0,0 +1,8 @@ +package eu.darken.flowshell.core.cmd + +import eu.darken.flowshell.core.FlowShellException + +open class FlowCmdShellException( + message: String? = null, + cause: Throwable? = null, +) : FlowShellException(message, cause) \ No newline at end of file diff --git a/app-common-shell/src/main/java/eu/darken/flowshell/core/process/FlowProcess.kt b/app-common-shell/src/main/java/eu/darken/flowshell/core/process/FlowProcess.kt new file mode 100644 index 000000000..ba39cd05e --- /dev/null +++ b/app-common-shell/src/main/java/eu/darken/flowshell/core/process/FlowProcess.kt @@ -0,0 +1,124 @@ +package eu.darken.flowshell.core.process + +import eu.darken.flowshell.core.FlowShellDebug.isDebug +import eu.darken.sdmse.common.debug.logging.Logging.Priority.ERROR +import eu.darken.sdmse.common.debug.logging.Logging.Priority.VERBOSE +import eu.darken.sdmse.common.debug.logging.Logging.Priority.WARN +import eu.darken.sdmse.common.debug.logging.asLog +import eu.darken.sdmse.common.debug.logging.log +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.NonCancellable +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.onCompletion +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onStart +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withContext +import kotlin.coroutines.cancellation.CancellationException + +class FlowProcess( + launch: suspend () -> Process, + kill: suspend (Process) -> Unit = { if (it.isAlive) it.destroyForcibly() }, +) { + + private val processCreator = callbackFlow { + if (isDebug) log(TAG, VERBOSE) { "Launching..." } + val process = launch() + if (isDebug) log(TAG, VERBOSE) { "Launched!" } + + val processExitCode = MutableStateFlow(null) + + val killRoutine: suspend () -> Unit = { + try { + kill(process) + } catch (e: Exception) { + log(TAG, ERROR) { "sessionKill threw up: ${e.asLog()}" } + throw e + } + } + + val session = Session( + process = process, + exitCode = processExitCode, + onKill = { + if (isDebug) log(TAG) { "Kill session due to kill()..." } + killRoutine() + } + ) + + // Send session first + if (isDebug) log(TAG) { "Emitting session: $session" } + send(session) + + // Otherwise we could already have closed, if the process is short + launch(Dispatchers.IO + NonCancellable) { + if (isDebug) log(TAG, VERBOSE) { "Exit-monitor: Waiting for process finish" } + val code = process.waitFor().let { ExitCode(it) } + if (isDebug) log(TAG) { "Exit-monitor: Process finished with $code" } + processExitCode.value = code + this@callbackFlow.close() + } + + if (isDebug) log(TAG, VERBOSE) { "Waiting for flow to close..." } + awaitClose { + if (isDebug) log(TAG, VERBOSE) { "Flow is closing..." } + runBlocking { + killRoutine() + + if (isDebug) log(TAG) { "Waiting for process to be terminate" } + process.waitFor() + if (isDebug) log(TAG) { "Process has terminated" } + } + if (isDebug) log(TAG, VERBOSE) { "Flow is closed!" } + } + } + + val session: Flow = processCreator + .onStart { if (isDebug) log(TAG, VERBOSE) { "Starting session..." } } + .onEach { if (isDebug) log(TAG, VERBOSE) { "Emitting $it" } } + .onCompletion { + if (isDebug) { + if (it == null || it is CancellationException) { + log(TAG, VERBOSE) { "Flow is complete. (reason=$it)" } + } else { + log(TAG, WARN) { "Flow is completed unexpectedly: ${it.asLog()}" } + } + } + } + + data class Session( + private val process: Process, + val exitCode: Flow, + private val onKill: suspend () -> Unit, + ) { + val input = process.outputStream + val output = process.inputStream + val errors = process.errorStream + + suspend fun waitFor() = withContext(Dispatchers.IO) { + exitCode.filterNotNull().first() + } + + suspend fun isAlive() = exitCode.first() == null + + suspend fun cancel() = withContext(Dispatchers.IO) { + onKill() + } + } + + data class ExitCode(val value: Int) { + companion object { + val OK = ExitCode(0) + } + } + + companion object { + private const val TAG = "FS:FlowProcess" + } +} diff --git a/app-common-shell/src/main/java/eu/darken/flowshell/core/process/FlowProcessExtensions.kt b/app-common-shell/src/main/java/eu/darken/flowshell/core/process/FlowProcessExtensions.kt new file mode 100644 index 000000000..a118ed4a4 --- /dev/null +++ b/app-common-shell/src/main/java/eu/darken/flowshell/core/process/FlowProcessExtensions.kt @@ -0,0 +1,142 @@ +package eu.darken.flowshell.core.process + +import eu.darken.flowshell.core.FlowShellDebug.isDebug +import eu.darken.sdmse.common.debug.logging.Logging.Priority.ERROR +import eu.darken.sdmse.common.debug.logging.Logging.Priority.VERBOSE +import eu.darken.sdmse.common.debug.logging.Logging.Priority.WARN +import eu.darken.sdmse.common.debug.logging.log +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.withContext +import java.io.IOException +import java.io.InputStream +import java.io.OutputStreamWriter +import java.util.regex.Pattern + +private const val TAG = "FS:FlowProcess:Extensions" +private val PID_PATTERN = Pattern.compile("^.+?pid=(\\d+).+?$") +private val SPACES_PATTERN = Pattern.compile("\\s+") + +// stupid method for getting the pid, but it actually works +internal val Process.processPid: Int? + get() = PID_PATTERN.matcher(this.toString()) + .takeIf { it.matches() } + ?.group(1)?.toInt() + +suspend fun Process.killViaPid(shell: String = "sh"): Boolean { + if (isDebug) log(TAG, VERBOSE) { "killViaPid($this,$shell)" } + if (!this.isAlive) { + if (isDebug) log(TAG, VERBOSE) { "Process is no longer alive, skipping kill." } + return true + } + + val pid = this.processPid + if (pid == null) { + if (isDebug) log(TAG, ERROR) { "Can't find PID for $this" } + return false + } + val pidFamily = this.pidFamily(shell) + if (isDebug) log(TAG, VERBOSE) { "Family pids: $pidFamily" } + + return pidFamily?.kill() ?: false +} + +// use 'ps' to get this pid and all pids that are related to it (e.g. spawned by it) +internal suspend fun Process.pidFamily(shell: String = "sh"): PidFamily? = withContext(Dispatchers.IO) { + val parentPid = processPid ?: return@withContext null + var process: Process? = null + val childPids = try { + process = ProcessBuilder(shell).start() + + val rawPidLines = coroutineScope { + val output = mutableListOf() + val error = mutableListOf() + process.errorStream.miniHarvester().onEach { output.add(it) }.launchIn(this) + process.inputStream.miniHarvester().onEach { error.add(it) }.launchIn(this) + + OutputStreamWriter(process.outputStream).apply { + write("ps -o pid,ppid${System.lineSeparator()}") + write("exit${System.lineSeparator()}") + flush() + close() + } + + val exitcode = process.waitFor() + if (exitcode == 0) output + error else null + } + + rawPidLines + ?.asSequence() + ?.drop(1) // Title line + ?.map { SPACES_PATTERN.split(it) } + ?.filter { it.size >= 3 } + ?.filter { parentPid == it[2].toInt() } + ?.mapNotNull { line -> + try { + line[1].toInt() + } catch (e: NumberFormatException) { + if (isDebug) log(TAG, WARN) { "pidFamily(parentPid) parse failure: $line" } + null + } + } + ?.toSet() + } catch (interrupt: InterruptedException) { + if (isDebug) log(TAG, WARN) { "Interrupted" } + return@withContext null + } catch (e: IOException) { + if (isDebug) log(TAG, WARN) { "IOException, pipe broke?" } + return@withContext null + } finally { + process?.destroy() + } + + return@withContext PidFamily(parentPid, childPids ?: emptySet()).also { + if (isDebug) log(TAG, VERBOSE) { "pidFamily($parentPid) is $it" } + } +} + +internal data class PidFamily( + val parent: Int, + val children: Set +) { + suspend fun kill(shell: String = "sh"): Boolean = withContext(Dispatchers.IO) { + val pids = listOf(parent) + children + var process: Process? = null + try { + process = ProcessBuilder(shell).start() + + coroutineScope { + process.errorStream.miniHarvester().launchIn(this) + process.inputStream.miniHarvester().launchIn(this) + + OutputStreamWriter(process.outputStream).apply { + pids.forEach { write("kill -9 $it${System.lineSeparator()}") } + write("exit${System.lineSeparator()}") + flush() + close() + } + } + + val exitcode = process.waitFor() + if (isDebug) log(TAG, VERBOSE) { "kill(pids=$pids) -> exitcode: $exitcode" } + exitcode == 0 + + } catch (interrupt: InterruptedException) { + log(TAG, WARN) { "kill(pids=$pids) Interrupted!" } + false + } catch (e: IOException) { + log(TAG, WARN) { "kill(pids=$pids) IOException, command failed? not found?" } + false + } finally { + process?.destroy() + } + } +} + +private fun InputStream.miniHarvester() = flow { + val reader = this@miniHarvester.reader().buffered() + reader.lineSequence().forEach { emit(it) } +} diff --git a/app-common-shell/src/main/java/eu/darken/sdmse/common/shell/SharedShell.kt b/app-common-shell/src/main/java/eu/darken/sdmse/common/shell/SharedShell.kt index 120a92e9a..60fed5841 100644 --- a/app-common-shell/src/main/java/eu/darken/sdmse/common/shell/SharedShell.kt +++ b/app-common-shell/src/main/java/eu/darken/sdmse/common/shell/SharedShell.kt @@ -1,47 +1,57 @@ package eu.darken.sdmse.common.shell -import eu.darken.rxshell.cmd.RxCmdShell +import eu.darken.flowshell.core.cmd.FlowCmdShell +import eu.darken.sdmse.common.debug.logging.Logging.Priority.WARN import eu.darken.sdmse.common.debug.logging.log +import eu.darken.sdmse.common.flow.replayingShare import eu.darken.sdmse.common.sharedresource.HasSharedResource import eu.darken.sdmse.common.sharedresource.SharedResource +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.callbackFlow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.plus +import kotlinx.coroutines.runBlocking -class SharedShell constructor( +class SharedShell( tag: String, scope: CoroutineScope, -) : HasSharedResource { +) : HasSharedResource { private val aTag = "$tag:SharedShell" - private val source = callbackFlow { + private val source = callbackFlow { log(aTag) { "Initiating connection to host." } val session = try { - RxCmdShell.builder().build().open().blockingGet() + val sharedSession = FlowCmdShell().session.replayingShare(this) + sharedSession.launchIn(this + Dispatchers.IO) + sharedSession.first() } catch (e: Exception) { throw e } - invokeOnClose { - log(aTag) { "Canceling!" } - session.close().subscribe() - - } - send(session) - val end = try { - session.waitFor().blockingGet() - } catch (sessionError: Exception) { - throw IllegalStateException("SharedShell finished unexpectedly", sessionError) - } - - if (end != 0) { - throw IllegalStateException("SharedShell finished with exitcode $end") + awaitClose { + log(aTag) { "Closing!" } + runBlocking { + try { + session.close() + val exitCode = session.waitFor() + log(aTag) { "FlowCmdShell finished with exitcode $exitCode" } + } catch (e: CancellationException) { + log(aTag) { "FlowCmdShell was cancelled: $e" } + } catch (e: Exception) { + log(aTag, WARN) { "Session.close() failed: $e" } + } + } } } val session = SharedResource(aTag, scope, source) - override val sharedResource: SharedResource = session + override val sharedResource: SharedResource = session } \ No newline at end of file diff --git a/app-common-shell/src/test/java/eu/darken/flowshell/core/FlowShellTest.kt b/app-common-shell/src/test/java/eu/darken/flowshell/core/FlowShellTest.kt new file mode 100644 index 000000000..d34d3a418 --- /dev/null +++ b/app-common-shell/src/test/java/eu/darken/flowshell/core/FlowShellTest.kt @@ -0,0 +1,154 @@ +package eu.darken.flowshell.core + + +import eu.darken.flowshell.core.process.FlowProcess +import eu.darken.sdmse.common.debug.logging.log +import eu.darken.sdmse.common.flow.replayingShare +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.takeWhile +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import kotlinx.coroutines.plus +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import testhelpers.BaseTest +import testhelpers.coroutine.runTest2 +import java.util.Base64 +import java.util.UUID + +class FlowShellTest : BaseTest() { + @BeforeEach + fun setup() { + FlowShellDebug.isDebug = true + } + + @AfterEach + fun teardown() { + FlowShellDebug.isDebug = false + } + + @Test fun `base operation`() = runTest { + val shell = FlowShell() + + val output = mutableListOf() + val errors = mutableListOf() + val rows = (1..10L) + + shell.session.flowOn(Dispatchers.IO).collect { session -> + session.output.onEach { output.add(it) }.launchIn(this) + session.error.onEach { errors.add(it) }.launchIn(this) + rows.forEach { + session.write("echo test $it") + session.write("echo error $it 1>&2") + delay(it) + } + session.close() + session.isAlive() shouldBe false + session.waitFor() shouldBe FlowProcess.ExitCode.OK + } + + output shouldBe rows.map { "test $it" } + errors shouldBe rows.map { "error $it" } + } + + @Test fun `exitcode behavior`() = runTest2(autoCancel = true) { + val sharedSession = FlowShell().session.replayingShare(this) + sharedSession.launchIn(this + Dispatchers.IO) + + sharedSession.first().apply { + isAlive() shouldBe true + exitCode.first() shouldBe null + close() + waitFor() shouldBe FlowProcess.ExitCode.OK + exitCode.first() shouldBe waitFor() + isAlive() shouldBe false + } + } + + @Test fun `session can be closed`() = runTest2(autoCancel = true) { + val sharedSession = FlowShell().session.replayingShare(this) + sharedSession.launchIn(this + Dispatchers.IO) + sharedSession.first().apply { + close() + waitFor() shouldBe FlowProcess.ExitCode.OK + } + } + + @Test fun `session can be killed`() = runTest2(autoCancel = true) { + val sharedSession = FlowShell().session.replayingShare(this) + sharedSession.launchIn(this + Dispatchers.IO) + sharedSession.first().apply { + cancel() + waitFor() shouldBe FlowProcess.ExitCode(137) + } + } + + @Test fun `slow consumer`() = runTest2(autoCancel = true) { + val sharedSession = FlowShell().session.replayingShare(this) + sharedSession.launchIn(this + Dispatchers.IO) + + val loop = 1000 + val expected = mutableListOf() + val output = mutableListOf() + + val session = sharedSession.first() + + (1..loop).forEach { + val data = "$it# ${UUID.randomUUID()}" + session.write("echo $data") + expected.add(data) + } + session.close() + + session.output.collect { output.add(it) } + + sharedSession.first().waitFor() shouldBe FlowProcess.ExitCode.OK + output shouldBe expected + output.size shouldBe loop + } + + @Test fun `blocking consumer`() = runTest2(autoCancel = true) { + val sharedSession = FlowShell().session.replayingShare(this) + sharedSession.launchIn(this + Dispatchers.IO) + + val session = sharedSession.first() + val expectedSize = 1048576 * 2 + val outputData = StringBuffer() + val errorData = StringBuffer() + + session.write("head -c $expectedSize < /dev/urandom | base64") + session.write("head -c $expectedSize < /dev/urandom | base64 1>&2") + session.write("echo done") + session.write("echo done 1>&2") + + val job1 = launch(Dispatchers.IO) { + session.output.takeWhile { it != "done" }.collect { line -> + outputData.append(line) + } + log { "Job1 finished" } + } + val job2 = launch(Dispatchers.IO) { + session.error.takeWhile { it != "done" }.collect { line -> + errorData.append(line) + } + log { "Job2 finished" } + } + + listOf(job1, job2).joinAll() + + Base64.getDecoder().apply { + decode(outputData.toString()).size shouldBe expectedSize + decode(errorData.toString()).size shouldBe expectedSize + outputData shouldNotBe errorData + } + } +} \ No newline at end of file diff --git a/app-common-shell/src/test/java/eu/darken/flowshell/core/cmd/FlowCmdShellTest.kt b/app-common-shell/src/test/java/eu/darken/flowshell/core/cmd/FlowCmdShellTest.kt new file mode 100644 index 000000000..9ad9822c6 --- /dev/null +++ b/app-common-shell/src/test/java/eu/darken/flowshell/core/cmd/FlowCmdShellTest.kt @@ -0,0 +1,188 @@ +package eu.darken.flowshell.core.cmd + + +import eu.darken.flowshell.core.FlowShellDebug +import eu.darken.flowshell.core.process.FlowProcess +import eu.darken.sdmse.common.flow.replayingShare +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.matchers.longs.shouldBeGreaterThanOrEqual +import io.kotest.matchers.longs.shouldBeLessThan +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.joinAll +import kotlinx.coroutines.launch +import kotlinx.coroutines.plus +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withTimeout +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import testhelpers.BaseTest +import testhelpers.coroutine.runTest2 + +class FlowCmdShellTest : BaseTest() { + @BeforeEach + fun setup() { + FlowShellDebug.isDebug = true + } + + @AfterEach + fun teardown() { + FlowShellDebug.isDebug = false + } + + @Test fun `base operation`() = runTest2(autoCancel = true) { + val sharedSession = FlowCmdShell().session.replayingShare(this) + sharedSession.launchIn(this + Dispatchers.IO) + val session = sharedSession.first() + session.isAlive() shouldBe true + + val cmd = FlowCmd( + "echo output test", + "echo error test >&2", + ) + session.execute(cmd).apply { + original shouldBe cmd + exitCode shouldBe FlowProcess.ExitCode.OK + output shouldBe listOf("output test") + errors shouldBe listOf("error test") + } + + session.close() + session.isAlive() shouldBe false + } + + @Test fun `quick execute`() = runTest { + FlowCmd("echo 123").execute().apply { + output shouldBe listOf("123") + exitCode shouldBe FlowProcess.ExitCode.OK + } + } + + @Test fun `closing session aborts command`() = runTest2(autoCancel = true) { + val sharedSession = FlowCmdShell().session.replayingShare(this) + sharedSession.launchIn(this + Dispatchers.IO) + val session = sharedSession.first() + + launch { + shouldThrow { + FlowCmd("sleep 3").execute(session) + } + } + + session.close() + } + + @Test fun `killing session aborts command`() = runTest2(autoCancel = true) { + val sharedSession = FlowCmdShell().session.replayingShare(this) + sharedSession.launchIn(this + Dispatchers.IO) + val session = sharedSession.first() + + launch { + shouldThrow { + FlowCmd("sleep 3").execute(session) + } + } + + session.cancel() + } + + @Test fun `queued commands`() = runTest2(autoCancel = true) { + FlowCmdShell().session.collect { session -> + session.counter shouldBe 0 + (1..1000).forEach { + FlowCmd( + "echo output$it", + "echo error$it >&2", + ).execute(session).apply { + exitCode shouldBe FlowProcess.ExitCode.OK + output shouldBe listOf("output$it") + errors shouldBe listOf("error$it") + } + session.counter shouldBe it + } + session.counter shouldBe 1000 + session.close() + } + } + + @Test fun `race command commands`() = runTest2(autoCancel = true) { + FlowCmdShell().session.collect { session -> + session.counter shouldBe 0 + (1..1000) + .map { + launch(Dispatchers.IO) { + delay(5) + FlowCmd( + "echo output$it", + "echo error$it >&2", + ).execute(session).apply { + exitCode shouldBe FlowProcess.ExitCode.OK + output shouldBe listOf("output$it") + errors shouldBe listOf("error$it") + } + } + } + .toList() + .joinAll() + session.counter shouldBe 1000 + session.close() + } + } + + @Test fun `commands can be timeoutted`(): Unit = runBlocking { + val start = System.currentTimeMillis() + + shouldThrow { + withTimeout(1000) { + FlowCmd("sleep 3", "echo done").execute().apply { + exitCode shouldBe FlowProcess.ExitCode.OK + output shouldBe listOf("done") + } + } + } + (System.currentTimeMillis() - start) shouldBeGreaterThanOrEqual 500 + (System.currentTimeMillis() - start) shouldBeLessThan 3000 + } + + @Test fun `open session extension`() = runTest2(autoCancel = true) { + val (session, job) = FlowCmdShell().openSession(this) + + FlowCmd("echo done").execute(session).apply { + exitCode shouldBe FlowProcess.ExitCode.OK + output shouldBe listOf("done") + } + } + + @Test fun `cancellation behavior`() = runTest2(autoCancel = true) { + val (session, job) = FlowCmdShell().openSession(this) + + shouldThrow { + withTimeout(500) { + FlowCmd("sleep 3", "echo nope").execute(session).apply { + exitCode shouldBe FlowProcess.ExitCode.OK + } + } + } + + FlowCmd("echo done").execute(session).apply { + exitCode shouldBe FlowProcess.ExitCode.OK + output shouldBe listOf("done") + } + } + + @Test fun `direct execution behavior`() = runTest { + val start = System.currentTimeMillis() + FlowCmd("sleep 1", "echo done").execute().apply { + exitCode shouldBe FlowProcess.ExitCode.OK + output shouldBe listOf("done") + } + (System.currentTimeMillis() - start) shouldBeGreaterThanOrEqual 1000 + } + +} \ No newline at end of file diff --git a/app-common-shell/src/test/java/eu/darken/flowshell/core/process/FlowProcessTest.kt b/app-common-shell/src/test/java/eu/darken/flowshell/core/process/FlowProcessTest.kt new file mode 100644 index 000000000..5628c8adb --- /dev/null +++ b/app-common-shell/src/test/java/eu/darken/flowshell/core/process/FlowProcessTest.kt @@ -0,0 +1,268 @@ +package eu.darken.flowshell.core.process + + +import eu.darken.flowshell.core.FlowShellDebug +import eu.darken.sdmse.common.debug.logging.log +import eu.darken.sdmse.common.flow.replayingShare +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.matchers.longs.shouldBeGreaterThan +import io.kotest.matchers.longs.shouldBeGreaterThanOrEqual +import io.kotest.matchers.longs.shouldBeLessThan +import io.kotest.matchers.shouldBe +import io.kotest.matchers.shouldNotBe +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.filterNotNull +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.plus +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import testhelpers.BaseTest +import testhelpers.coroutine.runTest2 +import java.io.IOException + +class FlowProcessTest : BaseTest() { + @BeforeEach + fun setup() { + FlowShellDebug.isDebug = true + } + + @AfterEach + fun teardown() { + FlowShellDebug.isDebug = false + } + + @Test fun `base opening`() = runTest { + var opened = false + var killed = false + val flow = FlowProcess( + launch = { + opened = true + ProcessBuilder("sh", "-c", "echo 'Error' 1>&2; echo 'Input'; sleep 1").start() + }, + kill = { + killed = true + it.destroyForcibly() + } + ) + + flow.session.first() + + opened shouldBe true + killed shouldBe true + } + + @Test fun `session waits`() = runTest { + var started = -1L + var stopped = -1L + val flow = FlowProcess( + launch = { + ProcessBuilder("sleep", "1").start().also { + started = System.currentTimeMillis() + } + }, + kill = { + stopped = System.currentTimeMillis() + log { "Killing process" } + it.destroyForcibly() + log { "Process killed" } + } + ) + + log { "Waiting for exit code" } + flow.session.collect { + it.waitFor() shouldBe FlowProcess.ExitCode.OK + } + + (stopped - started) shouldBeGreaterThanOrEqual 1000 + } + + @Test fun `session stays open`() = runTest { + val flow = FlowProcess( + launch = { + ProcessBuilder("sh").start() + }, + ) + var session: FlowProcess.Session? = null + flow.session.onEach { session = it }.launchIn(this) + delay(100) + session shouldNotBe null + + val writer = session!!.input.buffered().bufferedWriter() + + (1..100).forEach { + writer.write("echo hi$it\n") + writer.flush() + } + + session!!.isAlive() shouldBe true + + writer.write("exit\n") + writer.flush() + + log { "Waiting for exit code" } + session!!.waitFor() shouldBe FlowProcess.ExitCode.OK + } + + @Test fun `wait for blocks until exit`() = runTest2(autoCancel = true) { + val sharedSession = FlowProcess( + launch = { ProcessBuilder("sleep", "1").start() }, + ).session.replayingShare(this) + sharedSession.launchIn(this + Dispatchers.IO) + + sharedSession.first().apply { + val start = System.currentTimeMillis() + waitFor() shouldBe FlowProcess.ExitCode.OK + val stop = System.currentTimeMillis() + stop - start shouldBeGreaterThan 900L + } + } + + @Test fun `session can be killed`() = runTest { + var start = 0L + val flow = FlowProcess( + launch = { + ProcessBuilder("sleep", "3").start().also { + start = System.currentTimeMillis() + } + }, + ) + + flow.session.collect { + it.cancel() + it.waitFor() shouldBe FlowProcess.ExitCode(137) + } + System.currentTimeMillis() - start shouldBeLessThan 2000 + } + + @Test fun `session is killed on scope cancel`() = runTest { + var started = -1L + var stopped = -1L + + val flow = FlowProcess( + launch = { + log { "Launching process" } + ProcessBuilder("sleep", "3").start().also { + log { "Process launched" } + started = System.currentTimeMillis() + } + }, + kill = { + log { "Killing process" } + it.destroyForcibly() + log { "Process killed" } + stopped = System.currentTimeMillis() + } + ) + + log { "Waiting for exit code" } + flow.session.first().exitCode.filterNotNull().first() shouldBe FlowProcess.ExitCode(137) + + (stopped - started) shouldBeLessThan 3000 + } + + @Test fun `exception on close`() = runTest { + val flow = FlowProcess( + launch = { + log { "Launching process" } + ProcessBuilder("sleep", "1").start().also { + log { "Process launched" } + } + }, + kill = { + log { "Killing process" } + throw IOException("test") + } + ) + + log { "Waiting for throw" } + shouldThrow { + flow.session.first() + } + log { "We threw :)" } + } + + @Test fun `exception on open`() = runTest { + val flow = FlowProcess( + launch = { + throw IOException("test") + }, + ) + + log { "Waiting for throw" } + shouldThrow { + flow.session.first() + } + } + + @Test fun `session is restartable`() = runTest { + var startCount = 0 + val flow = FlowProcess( + launch = { + ProcessBuilder("echo", "<3").start().also { + startCount++ + } + }, + ) + + log { "Waiting for exit code (launch #1)" } + flow.session.collect { + it.waitFor() shouldBe FlowProcess.ExitCode.OK + } + startCount shouldBe 1 + + log { "Waiting for exit code (launch #2)" } + flow.session.collect { + it.waitFor() shouldBe FlowProcess.ExitCode.OK + } + startCount shouldBe 2 + } + + @Test fun `session is kill and restartable`() = runTest { + var startCount = 0 + val flow = FlowProcess( + launch = { + ProcessBuilder("sleep", "1").start().also { + startCount++ + } + }, + ) + + // Immediately ends the scope after the emission + log { "Starting and killing (launch #1)" } + flow.session.first().exitCode.first() shouldNotBe FlowProcess.ExitCode.OK + startCount shouldBe 1 + + log { "Waiting for exit code (launch #2)" } + flow.session.collect { + it.waitFor() shouldBe FlowProcess.ExitCode.OK + } + startCount shouldBe 2 + } + + @Test fun `session is killed via pid`() = runTest { + var opened = false + var killed = false + val flow = FlowProcess( + launch = { + opened = true + ProcessBuilder("sh").start() + }, + kill = { + it.killViaPid() + killed = true + } + ) + + flow.session.first().apply { + waitFor() shouldBe FlowProcess.ExitCode(137) + } + + opened shouldBe true + killed shouldBe true + } +} \ No newline at end of file diff --git a/app-common-shell/src/test/java/test/app/app_shell/ExampleUnitTest.kt b/app-common-shell/src/test/java/test/app/app_shell/ExampleUnitTest.kt deleted file mode 100644 index 2c2c257e3..000000000 --- a/app-common-shell/src/test/java/test/app/app_shell/ExampleUnitTest.kt +++ /dev/null @@ -1,16 +0,0 @@ -package test.app.app_shell - -import org.junit.Assert.assertEquals -import org.junit.Test - -/** - * Example local unit test, which will execute on the development machine (host). - * - * See [testing documentation](http://d.android.com/tools/testing). - */ -class ExampleUnitTest { - @Test - fun addition_isCorrect() { - assertEquals(4, 2 + 2) - } -} \ No newline at end of file diff --git a/app-common/build.gradle.kts b/app-common/build.gradle.kts index 2fc52c2e3..e3c43aefd 100644 --- a/app-common/build.gradle.kts +++ b/app-common/build.gradle.kts @@ -50,6 +50,4 @@ dependencies { addCoil() addLottie() - - implementation("com.github.d4rken.rxshell:core:v3.0.0") } \ No newline at end of file diff --git a/app-common/src/main/java/eu/darken/sdmse/common/JUnitLogger.kt b/app-common/src/main/java/eu/darken/sdmse/common/JUnitLogger.kt index 9a62410d8..0e768b1d9 100644 --- a/app-common/src/main/java/eu/darken/sdmse/common/JUnitLogger.kt +++ b/app-common/src/main/java/eu/darken/sdmse/common/JUnitLogger.kt @@ -1,13 +1,18 @@ package eu.darken.sdmse.common import eu.darken.sdmse.common.debug.logging.Logging +import java.time.Duration +import java.time.Instant class JUnitLogger(private val minLogLevel: Logging.Priority = Logging.Priority.VERBOSE) : Logging.Logger { + private val startTime = Instant.now() override fun isLoggable(priority: Logging.Priority): Boolean = priority.intValue >= minLogLevel.intValue override fun log(priority: Logging.Priority, tag: String, message: String, metaData: Map?) { - println("${System.currentTimeMillis()} ${priority.shortLabel}/$tag: $message") + val now = Instant.now() + val seconds = Duration.between(startTime, now).seconds + println("${now.toEpochMilli()} ($seconds) ${priority.shortLabel}/$tag: $message") } } diff --git a/app-common/src/main/java/eu/darken/sdmse/common/serialization/ValueBasedPolyJsonAdapterFactory.kt b/app-common/src/main/java/eu/darken/sdmse/common/serialization/ValueBasedPolyJsonAdapterFactory.kt index 255e65cb7..e99e2b934 100644 --- a/app-common/src/main/java/eu/darken/sdmse/common/serialization/ValueBasedPolyJsonAdapterFactory.kt +++ b/app-common/src/main/java/eu/darken/sdmse/common/serialization/ValueBasedPolyJsonAdapterFactory.kt @@ -3,10 +3,10 @@ package eu.darken.sdmse.common.serialization import com.squareup.moshi.* -import io.reactivex.rxjava3.annotations.CheckReturnValue import java.io.IOException import java.lang.reflect.Type import java.util.* +import javax.annotation.CheckReturnValue class ValueBasedPolyJsonAdapterFactory internal constructor( val baseType: Class, diff --git a/app/build.gradle.kts b/app/build.gradle.kts index 9a211a3e5..b7d9fc1f4 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -169,6 +169,7 @@ dependencies { implementation(project(":app-common-adb")) implementation(project(":app-common-io")) implementation(project(":app-common-pkgs")) + implementation(project(":app-common-shell")) addDI() addCoroutines()