Skip to content

Commit

Permalink
feat(core): make startStream and stopStream suspendable in case t…
Browse files Browse the repository at this point in the history
…here is a network issue.
  • Loading branch information
ThibaultBee committed Dec 3, 2023
1 parent 795698e commit 4aaa1ef
Show file tree
Hide file tree
Showing 25 changed files with 138 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ abstract class AudioOnlyStreamerTestCase :
)
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
streamer.release()
} catch (e: Exception) {
Log.e(TAG, "defaultUsageTest: exception: ", e)
Expand All @@ -58,8 +58,8 @@ abstract class AudioOnlyStreamerTestCase :
)
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
streamer.release()
} catch (e: Exception) {
Log.e(TAG, "defaultUsageTest2: exception: ", e)
Expand Down Expand Up @@ -149,7 +149,9 @@ abstract class AudioOnlyStreamerTestCase :
streamer.configure(
AndroidUtils.fakeValidAudioConfig()
)
streamer.stopStream()
runBlocking {
streamer.stopStream()
}
} catch (e: Exception) {
Log.e(TAG, "configureStopStreamTest: exception: ", e)
fail("Must be possible to configure/stopStream but catches exception: $e")
Expand Down Expand Up @@ -180,8 +182,8 @@ abstract class AudioOnlyStreamerTestCase :
)
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
} catch (e: Exception) {
Log.e(TAG, "startStreamStopStreamTest: exception: ", e)
fail("Must be possible to startStream/stopStream but catches exception: $e")
Expand All @@ -197,8 +199,8 @@ abstract class AudioOnlyStreamerTestCase :
(0..10).forEach { _ ->
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
}
} catch (e: Exception) {
Log.e(TAG, "multipleStartStreamStopStreamTest: exception: ", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ abstract class CameraStreamerTestCase :
streamer.startPreview(surface)
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
streamer.stopPreview()
streamer.release()
} catch (e: Exception) {
Expand All @@ -80,8 +80,8 @@ abstract class CameraStreamerTestCase :
streamer.startPreview(surface)
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
streamer.release()
} catch (e: Exception) {
Log.e(TAG, "defaultUsageTest2: exception: ", e)
Expand Down Expand Up @@ -161,7 +161,9 @@ abstract class CameraStreamerTestCase :
AndroidUtils.fakeValidVideoConfig()
)
streamer.startPreview(surface)
streamer.stopStream()
runBlocking {
streamer.stopStream()
}
} catch (e: Exception) {
Log.e(TAG, "startPreviewStopStreamTest: exception: ", e)
fail("Must be possible to startPreview/stopStream but catches exception: $e")
Expand Down Expand Up @@ -216,8 +218,8 @@ abstract class CameraStreamerTestCase :
streamer.startPreview(surface)
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
} catch (e: Exception) {
Log.e(TAG, "startStreamStopStreamTest: exception: ", e)
fail("Must be possible to startStream/stopStream but catches exception: $e")
Expand Down Expand Up @@ -252,8 +254,8 @@ abstract class CameraStreamerTestCase :
(0..10).forEach { _ ->
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
}
} catch (e: Exception) {
Log.e(TAG, "multipleStartStreamStopStreamTest: exception: ", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ abstract class StreamerTestCase {
)
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
streamer.release()
} catch (e: Exception) {
Log.e(TAG, "defaultUsageTest: exception: ", e)
Expand All @@ -64,8 +64,8 @@ abstract class StreamerTestCase {
)
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
streamer.release()
} catch (e: Exception) {
Log.e(TAG, "defaultUsageTest2: exception: ", e)
Expand Down Expand Up @@ -137,7 +137,9 @@ abstract class StreamerTestCase {
@Test
fun stopStreamTest() {
try {
streamer.stopStream()
runBlocking {
streamer.stopStream()
}
} catch (e: Exception) {
Log.e(TAG, "stopStreamTest: exception: ", e)
fail("Must be possible to only stopStream without exception: $e")
Expand Down Expand Up @@ -191,7 +193,9 @@ abstract class StreamerTestCase {
AndroidUtils.fakeValidAudioConfig(),
AndroidUtils.fakeValidVideoConfig()
)
streamer.stopStream()
runBlocking {
streamer.stopStream()
}
} catch (e: Exception) {
Log.e(TAG, "configureStopStreamTest: exception: ", e)
fail("Must be possible to configure/stopStream but catches exception: $e")
Expand Down Expand Up @@ -224,8 +228,8 @@ abstract class StreamerTestCase {
)
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
} catch (e: Exception) {
Log.e(TAG, "startStreamStopStreamTest: exception: ", e)
fail("Must be possible to startStream/stopStream but catches exception: $e")
Expand Down Expand Up @@ -258,8 +262,8 @@ abstract class StreamerTestCase {
(0..10).forEach { _ ->
runBlocking {
streamer.startStream()
streamer.stopStream()
}
streamer.stopStream()
}
} catch (e: Exception) {
Log.e(TAG, "multipleStartStreamStopStreamTest: exception: ", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
*/
package io.github.thibaultbee.streampack.internal.encoders

import io.github.thibaultbee.streampack.internal.interfaces.Configurable
import io.github.thibaultbee.streampack.internal.interfaces.Releaseable
import io.github.thibaultbee.streampack.internal.interfaces.Streamable

interface IEncoder<T> : Streamable<T> {
interface IEncoder<T> : Streamable, Configurable<T>, Releaseable {
/**
* Input and output of an async encoder
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ import io.github.thibaultbee.streampack.logger.Logger
* A fake endpoint for test purpose.
*/
class FakeEndpoint : IEndpoint {
override fun startStream() {
Logger.d(TAG, "startStream called")
}

override fun configure(config: Int) {
Logger.d(TAG, "configure called with bitrate = $config")
}
Expand All @@ -34,7 +30,11 @@ class FakeEndpoint : IEndpoint {
Logger.d(TAG, "write called (packet size = ${packet.buffer.remaining()})")
}

override fun stopStream() {
override suspend fun startStream() {
Logger.d(TAG, "startStream called")
}

override suspend fun stopStream() {
Logger.d(TAG, "stopStream called")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class FileWriter : IEndpoint {

var outputStream: OutputStream? = null

override fun startStream() {
override suspend fun startStream() {
if (outputStream == null) {
throw UnsupportedOperationException("Set a file before trying to write it")
}
Expand All @@ -49,7 +49,7 @@ class FileWriter : IEndpoint {
?: throw UnsupportedOperationException("Set a file before trying to write it")
}

override fun stopStream() {
override suspend fun stopStream() {
outputStream?.close()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,11 @@
package io.github.thibaultbee.streampack.internal.endpoints

import io.github.thibaultbee.streampack.internal.data.Packet
import io.github.thibaultbee.streampack.internal.interfaces.Streamable
import io.github.thibaultbee.streampack.internal.interfaces.Configurable
import io.github.thibaultbee.streampack.internal.interfaces.Releaseable
import io.github.thibaultbee.streampack.internal.interfaces.SuspendStreamable

interface IEndpoint : Streamable<Int> {

/**
* Configure endpoint bitrate, mainly for network endpoint.
* @param config bitrate at the beginning of the communication
*/
override fun configure(config: Int)
interface IEndpoint : SuspendStreamable, Configurable<Int>, Releaseable {

/**
* Writes a buffer to endpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,45 @@
*/
package io.github.thibaultbee.streampack.internal.interfaces

interface Streamable<T> {
interface Streamable {
/**
* Configure the [Streamable] implementation.
*
* @param config [Streamable] implementation configuration
* Starts frames or data stream generation
* Throws an exception if not ready for live stream
*/
fun configure(config: T)
fun startStream()

/**
* Stops frames or data stream generation
*/
fun stopStream()
}

/**
* Same as [Streamable] but with suspend functions.
*/
interface SuspendStreamable {
/**
* Starts frames or data stream generation
* Throws an exception if not ready for live stream
*/
fun startStream()
suspend fun startStream()

/**
* Stops frames or data stream generation
*/
fun stopStream()
suspend fun stopStream()
}

interface Configurable<T> {
/**
* Configure the [Configurable] implementation.
*
* @param config [Configurable] implementation configuration
*/
fun configure(config: T)
}

interface Releaseable {
/**
* Closes and releases resources
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package io.github.thibaultbee.streampack.internal.muxers
import io.github.thibaultbee.streampack.data.Config
import io.github.thibaultbee.streampack.internal.data.Frame
import io.github.thibaultbee.streampack.internal.interfaces.ISourceOrientationProvider
import io.github.thibaultbee.streampack.internal.interfaces.Releaseable
import io.github.thibaultbee.streampack.internal.interfaces.Streamable

interface IMuxer : Streamable<Unit> {
interface IMuxer : Streamable, Releaseable {
val helper: IMuxerHelper

var sourceOrientationProvider: ISourceOrientationProvider?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,6 @@ class FlvMuxer(
return streamMap
}

override fun configure(config: Unit) {
// Nothing to configure
}

override fun startStream() {
// Header
if (writeToFile) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ class MP4Muxer(
return streamMap
}

override fun configure(config: Unit) {
}

override fun startStream() {
writeBuffer(FileTypeBox().toByteBuffer())
currentSegment = createNewSegment(MovieBoxFactory(timescale))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,6 @@ class TSMuxer(
service.streams.clear()
}

override fun configure(config: Unit) {
// Nothing to configure
}

override fun startStream() {
// Nothing to start
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package io.github.thibaultbee.streampack.internal.sources

import io.github.thibaultbee.streampack.internal.data.Frame
import io.github.thibaultbee.streampack.internal.interfaces.Configurable
import io.github.thibaultbee.streampack.internal.interfaces.Releaseable
import io.github.thibaultbee.streampack.internal.interfaces.Streamable
import java.nio.ByteBuffer

interface IFrameSource<T> : Streamable<T> {
interface IFrameSource<T> : Streamable, Configurable<T>, Releaseable {

/**
* Generate a frame from capture device
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ package io.github.thibaultbee.streampack.internal.sources

import android.view.Surface
import io.github.thibaultbee.streampack.data.VideoConfig
import io.github.thibaultbee.streampack.internal.interfaces.Configurable
import io.github.thibaultbee.streampack.internal.interfaces.Releaseable
import io.github.thibaultbee.streampack.internal.interfaces.Streamable

interface ISurfaceSource : Streamable<VideoConfig> {
interface ISurfaceSource : Streamable, Configurable<VideoConfig>, Releaseable {
/**
* The offset between source capture time and MONOTONIC clock. It is used to synchronize video
* with audio. It is only useful for camera source.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import androidx.lifecycle.LifecycleOwner
import io.github.thibaultbee.streampack.streamers.interfaces.IStreamer
import io.github.thibaultbee.streampack.utils.getCameraStreamer
import io.github.thibaultbee.streampack.utils.getLiveStreamer
import kotlinx.coroutines.runBlocking

/**
* Add [DefaultLifecycleObserver] to a streamer.
Expand All @@ -35,7 +36,9 @@ import io.github.thibaultbee.streampack.utils.getLiveStreamer
open class StreamerLifeCycleObserver(var streamer: IStreamer) : DefaultLifecycleObserver {
override fun onPause(owner: LifecycleOwner) {
streamer.getCameraStreamer()?.stopPreview()
streamer.stopStream()
runBlocking {
streamer.stopStream()
}
streamer.getLiveStreamer()?.let {
if (it.isConnected) {
it.disconnect()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ open class BaseCameraStreamer(
* @see [startPreview]
*/
override fun stopPreview() {
stopStream()
runBlocking {
stopStream()
}
cameraSource.stopPreview()
}

Expand Down
Loading

0 comments on commit 4aaa1ef

Please sign in to comment.