From 52f7c4253c99dac9f4fca5c80402e1d3122c7ddf Mon Sep 17 00:00:00 2001 From: ThibaultBee <37510686+ThibaultBee@users.noreply.github.com> Date: Sat, 25 Nov 2023 12:07:33 +0100 Subject: [PATCH] feat(core): refactor ChunkedFileOutputStream so it cuts the frame at the exact byte --- .../utils/ChunkedFileOutputStream.kt | 191 ++++++++++++++++ .../streampack/utils/MultiFileOutputStream.kt | 168 -------------- .../utils/ChunkedFileOutputStreamTest.kt | 205 ++++++++++++++++++ .../utils/MultiFileOutputStreamTest.kt | 107 --------- .../streampack/app/utils/StreamerManager.kt | 2 +- 5 files changed, 397 insertions(+), 276 deletions(-) create mode 100644 core/src/main/java/io/github/thibaultbee/streampack/utils/ChunkedFileOutputStream.kt delete mode 100644 core/src/main/java/io/github/thibaultbee/streampack/utils/MultiFileOutputStream.kt create mode 100644 core/src/test/java/io/github/thibaultbee/streampack/utils/ChunkedFileOutputStreamTest.kt delete mode 100644 core/src/test/java/io/github/thibaultbee/streampack/utils/MultiFileOutputStreamTest.kt diff --git a/core/src/main/java/io/github/thibaultbee/streampack/utils/ChunkedFileOutputStream.kt b/core/src/main/java/io/github/thibaultbee/streampack/utils/ChunkedFileOutputStream.kt new file mode 100644 index 000000000..5d6ea5766 --- /dev/null +++ b/core/src/main/java/io/github/thibaultbee/streampack/utils/ChunkedFileOutputStream.kt @@ -0,0 +1,191 @@ +/* + * Copyright (C) 2022 Thibault B. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.github.thibaultbee.streampack.utils + +import io.github.thibaultbee.streampack.streamers.interfaces.IFileStreamer +import java.io.File +import java.io.FileOutputStream +import java.io.OutputStream + +/** + * A class that allows to write into multiple files. + * It is useful when you want to upload a file to a server but you don't want to wait for the record + * to finish before. + * Use this class as an [OutputStream] of a [IFileStreamer]. + * + * @param filesDir the directory where the files will be written + * @param chunkSize the size of each file in bytes + * @param chunkNameGenerator generate the name of each file from its index + * @param listener the listener that will be called when a file is created + */ +class ChunkedFileOutputStream( + val filesDir: File, + private val chunkSize: Int, + private val chunkNameGenerator: (Int) -> String = { id -> "chunk_$id" }, + private val listener: Listener = object : Listener {} +) : OutputStream() { + private var currentFileBytesWritten = 0 + private var totalBytesWritten = 0 + + private var _isClosed = false + + private var _numOfFiles: Int = 0 + + /** + * Get the number of files written. + */ + val numOfFiles: Int + get() = _numOfFiles + + private val fileId: Int + get() = numOfFiles - 1 + + + private var outputStream: FileOutputStream? = null + + /** + * Get if the stream is closed. + */ + val isClosed: Boolean + get() = _isClosed + + init { + require(chunkSize > 0) { "Part size must be greater than 0" } + require(filesDir.isDirectory) { "Files directory must be a directory" } + require(filesDir.canWrite()) { "Files directory must be writable" } + } + + private fun getFile(): File { + return File(filesDir, chunkNameGenerator(fileId)) + } + + private fun closeOutputStream(outputStream: FileOutputStream, isLast: Boolean) { + outputStream.close() + listener.onFileClosed( + fileId, + isLast, + getFile() + ) + } + + private fun getOutputStream(): FileOutputStream { + if ((currentFileBytesWritten >= chunkSize) || (outputStream == null)) { + // Close current stream + outputStream?.let { + closeOutputStream(it, false) + } + + // Prepare a new stream + currentFileBytesWritten = 0 + _numOfFiles++ + + outputStream = FileOutputStream(getFile()) + } + return outputStream!! + } + + /** + * Write [i] to the stream. + * + * @param i the byte to write + */ + override fun write(i: Int) { + if (_isClosed) { + throw IllegalStateException("Stream is closed") + } + + synchronized(this) { + val outputStream = getOutputStream() + outputStream.write(i) + } + + currentFileBytesWritten++ + totalBytesWritten++ + } + + /** + * Write [b] to the stream. + * + * @param b the byte to write + */ + override fun write(b: ByteArray) { + write(b, 0, b.size) + } + + /** + * Write [len] bytes from [b] starting at [offset]. + * + * @param b the bytes to write + * @param offset the offset in the output stream + * @param len the number of bytes to write + */ + override fun write(b: ByteArray, offset: Int, len: Int) { + if (_isClosed) { + throw IllegalStateException("Stream is closed") + } + + var remainingBytes = len + var numOfBytesWritten = 0 + synchronized(this) { + while (remainingBytes > 0) { + val outputStream = getOutputStream() + val bytesToWrite = minOf(remainingBytes, chunkSize - currentFileBytesWritten) + + outputStream.write(b, offset + numOfBytesWritten, bytesToWrite) + + currentFileBytesWritten += bytesToWrite + totalBytesWritten += bytesToWrite + numOfBytesWritten += bytesToWrite + remainingBytes -= bytesToWrite + } + } + } + + /** + * Close the stream. + * This will close the current file and call [Listener.onFileClosed] with the last file. + */ + override fun close() { + if (_isClosed) { + return + } + _isClosed = true + + outputStream?.let { + closeOutputStream(it, true) + } + } + + override fun flush() { + outputStream?.flush() + } + + /** + * Listener for [ChunkedFileOutputStream] + */ + interface Listener { + /** + * Called when a file has been closed. + * It means that the file is ready to be read and won't be used anymore for the stream. + * You can use the file as you please like uploading it to a server. + * + * @param index the index of the file + * @param isLast true if this is the last file + * @param file the file + */ + fun onFileClosed(index: Int, isLast: Boolean, file: File) {} + } +} \ No newline at end of file diff --git a/core/src/main/java/io/github/thibaultbee/streampack/utils/MultiFileOutputStream.kt b/core/src/main/java/io/github/thibaultbee/streampack/utils/MultiFileOutputStream.kt deleted file mode 100644 index a42fe966c..000000000 --- a/core/src/main/java/io/github/thibaultbee/streampack/utils/MultiFileOutputStream.kt +++ /dev/null @@ -1,168 +0,0 @@ -/* - * Copyright (C) 2022 Thibault B. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.github.thibaultbee.streampack.utils - -import io.github.thibaultbee.streampack.streamers.interfaces.IFileStreamer -import java.io.File -import java.io.FileOutputStream -import java.io.OutputStream - -/** - * A class that allows to write to multiple files. - * If you are looking to write the video or audio in multiple files, use this class - * as an [OutputStream] for a [IFileStreamer]. - * - * @param filesDir the directory where the files will be written - * @param partSize the size of each file - * @param namePrefix the prefix of each file - * @param listener the listener that will be called when a file is created - */ -class MultiFileOutputStream( - val filesDir: File, - private val partSize: Long, - private val namePrefix: String = "part_", - private val listener: Listener -) : OutputStream() { - private var currentFileBytesWritten = 0L - private var bytesWritten = 0 - - private var _isClosed = false - - /** - * Get if the stream is closed. - */ - val isClosed: Boolean - get() = _isClosed - - private var _outputStream: FileOutputStream? = null - private val outputStream: FileOutputStream - get() { - if (_isClosed) { - throw IllegalStateException("Stream is closed") - } - synchronized(this) { - if ((_outputStream == null) || (currentFileBytesWritten >= partSize)) { - _outputStream?.let { - it.close() - listener.onFileCreated( - numOfFileWritten, - false, - getFile(numOfFileWritten) - ) - } - - currentFileBytesWritten = 0 - _numOfFileWritten++ - - _outputStream = FileOutputStream(getFile(numOfFileWritten)) - } - return _outputStream!! - } - } - - private var _numOfFileWritten: Int = 0 - /** - * Get the number of files written. - */ - val numOfFileWritten: Int - get() = _numOfFileWritten - - init { - require(partSize > 0) { "Part size must be greater than 0" } - require(filesDir.isDirectory) { "Files directory must be a directory" } - require(filesDir.canWrite()) { "Files directory must be writable" } - } - - private fun getFile(fileIndex: Int): File { - return File(filesDir, "$namePrefix$fileIndex") - } - - /** - * Write [b] to the stream. - * - * @param b the byte to write - */ - override fun write(b: Int) { - outputStream.write(b) - currentFileBytesWritten++ - bytesWritten++ - } - - /** - * Write [b] to the stream. - * - * @param b the byte to write - */ - override fun write(b: ByteArray) { - outputStream.write(b) - currentFileBytesWritten += b.size - bytesWritten += b.size - } - - /** - * Write [len] bytes from [b] starting at [off]. - * - * @param b the bytes to write - * @param off the offset in [b] to start writing - * @param len the number of bytes to write - */ - override fun write(b: ByteArray, off: Int, len: Int) { - outputStream.write(b, off, len) - currentFileBytesWritten += len - bytesWritten += len - } - - /** - * Close the stream. - * This will close the current file and call [Listener.onFileCreated] with the last file. - */ - override fun close() { - if (_isClosed) { - return - } - _isClosed = true - _outputStream?.let { - it.close() - listener.onFileCreated(numOfFileWritten, true, getFile(numOfFileWritten)) - } - _outputStream = null - } - - override fun flush() { - _outputStream?.flush() - } - - /** - * Delete all files - */ - fun delete() { - filesDir.deleteRecursively() - } - - /** - * Listener for [MultiFileOutputStream] - */ - interface Listener { - /** - * Called when a file is created. - * - * @param index the index of the file - * @param isLast true if this is the last file - * @param file the file - */ - fun onFileCreated(index: Int, isLast: Boolean, file: File) {} - } -} \ No newline at end of file diff --git a/core/src/test/java/io/github/thibaultbee/streampack/utils/ChunkedFileOutputStreamTest.kt b/core/src/test/java/io/github/thibaultbee/streampack/utils/ChunkedFileOutputStreamTest.kt new file mode 100644 index 000000000..674f577da --- /dev/null +++ b/core/src/test/java/io/github/thibaultbee/streampack/utils/ChunkedFileOutputStreamTest.kt @@ -0,0 +1,205 @@ +package io.github.thibaultbee.streampack.utils + +import org.junit.After +import org.junit.Assert +import org.junit.Rule +import org.junit.Test +import org.junit.rules.TemporaryFolder +import java.io.File +import java.util.concurrent.CountDownLatch + + +class ChunkedFileOutputStreamTest { + @get:Rule + val rootFolder: TemporaryFolder = TemporaryFolder() + private lateinit var chunkFileOutputStream: ChunkedFileOutputStream + + @After + fun tearDown() { + chunkFileOutputStream.close() + } + + @Test + fun `write data larger than chunk`() { + val chunkReadyCountDownLatch = CountDownLatch(3) + val isLastChunkCountDownLatch = CountDownLatch(1) + var nextChunkId = 0 + val listener = object : ChunkedFileOutputStream.Listener { + override fun onFileClosed(index: Int, isLast: Boolean, file: File) { + Assert.assertEquals(nextChunkId, index) + chunkReadyCountDownLatch.countDown() + if (isLast) { + isLastChunkCountDownLatch.countDown() + } + nextChunkId = index + 1 + } + } + + val folder = rootFolder.newFolder() + chunkFileOutputStream = + ChunkedFileOutputStream(folder, 2, listener = listener) + + chunkFileOutputStream.write(byteArrayOf(1, 2, 3)) + chunkFileOutputStream.write(byteArrayOf(4, 5, 6, 7)) + + chunkFileOutputStream.close() + + // Check listener + Assert.assertEquals(0, chunkReadyCountDownLatch.count) + Assert.assertEquals(0, isLastChunkCountDownLatch.count) + Assert.assertEquals(4, nextChunkId) + + // Check files + Assert.assertEquals(4, folder.listFiles()?.size) + Assert.assertArrayEquals(byteArrayOf(1, 2), File(folder, "chunk_0").readBytes()) + Assert.assertArrayEquals(byteArrayOf(3, 4), File(folder, "chunk_1").readBytes()) + Assert.assertArrayEquals(byteArrayOf(5, 6), File(folder, "chunk_2").readBytes()) + Assert.assertArrayEquals(byteArrayOf(7), File(folder, "chunk_3").readBytes()) + } + + @Test + fun `write data == chunk`() { + val countDownLatch = CountDownLatch(4) + val listener = object : ChunkedFileOutputStream.Listener { + override fun onFileClosed(index: Int, isLast: Boolean, file: File) { + countDownLatch.countDown() + } + } + + val folder = rootFolder.newFolder() + chunkFileOutputStream = + ChunkedFileOutputStream(folder, 2, listener = listener) + + chunkFileOutputStream.write(byteArrayOf(1, 2)) + chunkFileOutputStream.write(byteArrayOf(3, 4)) + chunkFileOutputStream.write(byteArrayOf(5, 6)) + + chunkFileOutputStream.close() // Must not create an empty chunk + + // Check listener + Assert.assertEquals(1, countDownLatch.count) + + // Check files + Assert.assertEquals(3, folder.listFiles()?.size) + Assert.assertArrayEquals(byteArrayOf(1, 2), File(folder, "chunk_0").readBytes()) + Assert.assertArrayEquals(byteArrayOf(3, 4), File(folder, "chunk_1").readBytes()) + Assert.assertArrayEquals(byteArrayOf(5, 6), File(folder, "chunk_2").readBytes()) + } + + @Test + fun `write data smaller than chunk`() { + val chunkReadyCountDownLatch = CountDownLatch(3) + val isLastChunkCountDownLatch = CountDownLatch(1) + var nextChunkId = 0 + val listener = object : ChunkedFileOutputStream.Listener { + override fun onFileClosed(index: Int, isLast: Boolean, file: File) { + Assert.assertEquals(nextChunkId, index) + chunkReadyCountDownLatch.countDown() + if (isLast) { + isLastChunkCountDownLatch.countDown() + } + nextChunkId = index + 1 + } + } + + val folder = rootFolder.newFolder() + chunkFileOutputStream = + ChunkedFileOutputStream(folder, 2, listener = listener) + + chunkFileOutputStream.write(byteArrayOf(1)) + chunkFileOutputStream.write(byteArrayOf(2)) + chunkFileOutputStream.write(byteArrayOf(3)) + chunkFileOutputStream.write(byteArrayOf(4)) + chunkFileOutputStream.write(byteArrayOf(5)) + chunkFileOutputStream.write(byteArrayOf(6)) + chunkFileOutputStream.write(byteArrayOf(7)) + + chunkFileOutputStream.close() + + // Check listener + Assert.assertEquals(0, chunkReadyCountDownLatch.count) + Assert.assertEquals(0, isLastChunkCountDownLatch.count) + Assert.assertEquals(4, nextChunkId) + + // Check files + Assert.assertEquals(4, folder.listFiles()?.size) + Assert.assertArrayEquals(byteArrayOf(1, 2), File(folder, "chunk_0").readBytes()) + Assert.assertArrayEquals(byteArrayOf(3, 4), File(folder, "chunk_1").readBytes()) + Assert.assertArrayEquals(byteArrayOf(5, 6), File(folder, "chunk_2").readBytes()) + Assert.assertArrayEquals(byteArrayOf(7), File(folder, "chunk_3").readBytes()) + } + + @Test + fun `write single int`() { + val countDownLatch = CountDownLatch(4) + val listener = object : ChunkedFileOutputStream.Listener { + override fun onFileClosed(index: Int, isLast: Boolean, file: File) { + countDownLatch.countDown() + } + } + + val folder = rootFolder.newFolder() + chunkFileOutputStream = + ChunkedFileOutputStream(folder, 2, listener = listener) + + chunkFileOutputStream.write(1) + chunkFileOutputStream.write(2) + chunkFileOutputStream.write(3) + chunkFileOutputStream.write(4) + chunkFileOutputStream.write(5) + + chunkFileOutputStream.close() // Must not create an empty chunk + + // Check listener + Assert.assertEquals(1, countDownLatch.count) + + // Check files + Assert.assertEquals(3, folder.listFiles()?.size) + Assert.assertArrayEquals(byteArrayOf(1, 2), File(folder, "chunk_0").readBytes()) + Assert.assertArrayEquals(byteArrayOf(3, 4), File(folder, "chunk_1").readBytes()) + Assert.assertArrayEquals(byteArrayOf(5), File(folder, "chunk_2").readBytes()) + } + + @Test + fun `multiple close test`() { + val countDownLatch = CountDownLatch(3) + val listener = object : ChunkedFileOutputStream.Listener { + override fun onFileClosed(index: Int, isLast: Boolean, file: File) { + countDownLatch.countDown() + } + } + + val folder = rootFolder.newFolder() + chunkFileOutputStream = + ChunkedFileOutputStream(folder, 16, listener = listener) + + chunkFileOutputStream.write(Utils.generateRandomArray(8)) + chunkFileOutputStream.write(Utils.generateRandomArray(16)) + chunkFileOutputStream.close() + chunkFileOutputStream.close() + chunkFileOutputStream.close() + chunkFileOutputStream.close() + + // Check listener + Assert.assertEquals(1, countDownLatch.count) + + // Check files + Assert.assertEquals(2, folder.listFiles()?.size) + } + + @Test + fun `close without writing data`() { + val countDownLatch = CountDownLatch(1) + val listener = object : ChunkedFileOutputStream.Listener { + override fun onFileClosed(index: Int, isLast: Boolean, file: File) { + countDownLatch.countDown() + } + } + chunkFileOutputStream = + ChunkedFileOutputStream(rootFolder.newFolder(), 16, listener = listener) + + chunkFileOutputStream.close() + + Assert.assertEquals(1, countDownLatch.count) + } +} \ No newline at end of file diff --git a/core/src/test/java/io/github/thibaultbee/streampack/utils/MultiFileOutputStreamTest.kt b/core/src/test/java/io/github/thibaultbee/streampack/utils/MultiFileOutputStreamTest.kt deleted file mode 100644 index bec31eadb..000000000 --- a/core/src/test/java/io/github/thibaultbee/streampack/utils/MultiFileOutputStreamTest.kt +++ /dev/null @@ -1,107 +0,0 @@ -package io.github.thibaultbee.streampack.utils - -import org.junit.After -import org.junit.Assert -import org.junit.Rule -import org.junit.Test -import org.junit.rules.TemporaryFolder -import java.io.File -import java.util.concurrent.CountDownLatch - - -class MultiFileOutputStreamTest { - @get:Rule - val rootFolder: TemporaryFolder = TemporaryFolder() - private lateinit var multiFileOutputStream: MultiFileOutputStream - - @After - fun tearDown() { - multiFileOutputStream.close() - } - - @Test - fun `write data`() { - val chunkReadyCountDownLatch = CountDownLatch(3) - val isLastChunkCountDownLatch = CountDownLatch(1) - var lastChunkId = 0 - val listener = object : MultiFileOutputStream.Listener { - override fun onFileCreated(index: Int, isLast: Boolean, file: File) { - Assert.assertEquals(lastChunkId + 1, index) - chunkReadyCountDownLatch.countDown() - if (isLast) { - isLastChunkCountDownLatch.countDown() - } - lastChunkId = index - } - } - multiFileOutputStream = - MultiFileOutputStream(rootFolder.newFolder(), DEFAULT_CHUNK_SIZE, "", listener) - - multiFileOutputStream.write(Utils.generateRandomArray(2048)) - multiFileOutputStream.write(Utils.generateRandomArray(2048)) - multiFileOutputStream.write(Utils.generateRandomArray(600)) - multiFileOutputStream.close() - - Assert.assertEquals(0, chunkReadyCountDownLatch.count) - Assert.assertEquals(0, isLastChunkCountDownLatch.count) - Assert.assertEquals(3, lastChunkId) - } - - @Test - fun `write data size == chunk size`() { - val countDownLatch = CountDownLatch(4) - val listener = object : MultiFileOutputStream.Listener { - override fun onFileCreated(index: Int, isLast: Boolean, file: File) { - countDownLatch.countDown() - } - } - multiFileOutputStream = - MultiFileOutputStream(rootFolder.newFolder(), DEFAULT_CHUNK_SIZE, "", listener) - - multiFileOutputStream.write(Utils.generateRandomArray(DEFAULT_CHUNK_SIZE)) - multiFileOutputStream.write(Utils.generateRandomArray(DEFAULT_CHUNK_SIZE)) - multiFileOutputStream.write(Utils.generateRandomArray(DEFAULT_CHUNK_SIZE)) - multiFileOutputStream.close() // Must not create an empty chunk - - Assert.assertEquals(1, countDownLatch.count) - } - - @Test - fun `multiple close test`() { - val countDownLatch = CountDownLatch(3) - val listener = object : MultiFileOutputStream.Listener { - override fun onFileCreated(index: Int, isLast: Boolean, file: File) { - countDownLatch.countDown() - } - } - multiFileOutputStream = - MultiFileOutputStream(rootFolder.newFolder(), DEFAULT_CHUNK_SIZE, "", listener) - multiFileOutputStream.write(Utils.generateRandomArray(2048)) - multiFileOutputStream.write(Utils.generateRandomArray(600)) - multiFileOutputStream.close() - multiFileOutputStream.close() - multiFileOutputStream.close() - multiFileOutputStream.close() - - Assert.assertEquals(1, countDownLatch.count) - } - - @Test - fun `close without writing data`() { - val countDownLatch = CountDownLatch(1) - val listener = object : MultiFileOutputStream.Listener { - override fun onFileCreated(index: Int, isLast: Boolean, file: File) { - countDownLatch.countDown() - } - } - multiFileOutputStream = - MultiFileOutputStream(rootFolder.newFolder(), DEFAULT_CHUNK_SIZE, "", listener) - multiFileOutputStream.close() - - Assert.assertEquals(1, countDownLatch.count) - } - - companion object { - const val DEFAULT_CHUNK_SIZE = 1024L - } -} \ No newline at end of file diff --git a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/utils/StreamerManager.kt b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/utils/StreamerManager.kt index 69354fe9a..590c9ff30 100644 --- a/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/utils/StreamerManager.kt +++ b/demos/camera/src/main/java/io/github/thibaultbee/streampack/app/utils/StreamerManager.kt @@ -103,7 +103,7 @@ class StreamerManager( /** * Use OutputStream. * FYI, outputStream is closed by stopStream. - * To cut the video into multiple parts/chunks, use [MultiFileOutputStream]. + * To cut the video into multiple parts/chunks, use [ChunkedFileOutputStream]. */ it.outputStream = context.createVideoMediaOutputStream(configuration.endpoint.file.filename)