Skip to content

Commit

Permalink
reused streaming response on unzip
Browse files Browse the repository at this point in the history
  • Loading branch information
nulls committed Oct 18, 2023
1 parent c16598b commit e3b89b0
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ package com.saveourtool.save.entities.cosv
import kotlinx.serialization.Serializable

/**
* @property processedSize
* @property fullSize
* @property progress
* @property progressMessage
* @property result
* @property updateCounters
*/
@Serializable
data class RawCosvFileStreamingResponse(
val processedSize: Long,
val fullSize: Long,
val progress: Int,
val progressMessage: String,
val result: List<RawCosvFileDto>? = null,
val updateCounters: Boolean = false,
)
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import kotlinx.coroutines.delay

typealias StringList = List<String>

private const val BYTES_COEFFICIENT = 1024

Check failure

Code scanning / ktlint

[TOP_LEVEL_ORDER] the declaration part of a top level elements should be in the proper order: private const val BYTES_COEFFICIENT = 1024 Error

[TOP_LEVEL_ORDER] the declaration part of a top level elements should be in the proper order: private const val BYTES_COEFFICIENT = 1024

/**
* @return true if [this] is not null
*/
Expand Down Expand Up @@ -64,3 +66,14 @@ suspend fun <T : Any> retrySilently(
delayMillis: Long = 10_000L,
action: suspend (Int) -> T?,
): T? = retry(times, delayMillis, action).first


/**

Check failure

Code scanning / ktlint

[TOP_LEVEL_ORDER] the declaration part of a top level elements should be in the proper order: /**... Error

[TOP_LEVEL_ORDER] the declaration part of a top level elements should be in the proper order: /**...

Check failure

Code scanning / ktlint

[WRONG_NEWLINES_AROUND_KDOC] there should be a blank line above the kDoc and there should not be no blank lines after kDoc: /**... Error

[WRONG_NEWLINES_AROUND_KDOC] there should be a blank line above the kDoc and there should not be no blank lines after kDoc: /**...
* @return converts bytes to kilobytes
*/
fun Long.toKilobytes(): Long = div(BYTES_COEFFICIENT)

/**

Check failure

Code scanning / ktlint

[TOP_LEVEL_ORDER] the declaration part of a top level elements should be in the proper order: /**... Error

[TOP_LEVEL_ORDER] the declaration part of a top level elements should be in the proper order: /**...
* @return converts bytes to megabytes
*/
fun Double.toMegabytes(): Double = div(BYTES_COEFFICIENT * BYTES_COEFFICIENT)

Check failure

Code scanning / ktlint

[WRONG_INDENTATION] only spaces are allowed for indentation and each indentation should equal to 4 spaces (tabs are not allowed): no newline at the end of file KotlinUtils.kt Error

[WRONG_INDENTATION] only spaces are allowed for indentation and each indentation should equal to 4 spaces (tabs are not allowed): no newline at the end of file KotlinUtils.kt

Check warning

Code scanning / detekt

Checks whether files end with a line separator. Warning

The file /home/runner/work/save-cloud/save-cloud/save-cloud-common/src/commonMain/kotlin/com/saveourtool/save/utils/KotlinUtils.kt is not ending with a new line.
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,15 @@ import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.kotlin.core.publisher.toFlux
import reactor.kotlin.core.publisher.toMono
import java.nio.ByteBuffer
import java.nio.file.Files
import java.nio.file.Path
import kotlin.io.path.*

typealias RawCosvFileDtoList = List<RawCosvFileDto>
typealias RawCosvFileDtoFlux = Flux<RawCosvFileDto>
typealias RawCosvFileStreamingResponseFlux = Flux<RawCosvFileStreamingResponse>
typealias PathAndSize = Pair<Path, Long>
typealias PathAndSizeAndAccumitiveSize = Triple<Path, Long, Long>

/**
* Rest controller for raw COSV files
Expand Down Expand Up @@ -143,8 +144,8 @@ class RawCosvFileController(
.flatMap { rawCosvFileStorage.findById(id) }
.flatMapMany { rawCosvFile ->
Flux.concat(
Mono.just(firstFakeResponse),
doUploadArchiveEntries(
Mono.just(RawCosvFileStreamingResponse(PROGRESS_FOR_ARCHIVE, "Unzipping")),
doUnzip(
rawCosvFile,
organizationName,
authentication.name
Expand All @@ -157,7 +158,7 @@ class RawCosvFileController(
.body(it)
}

private fun doUploadArchiveEntries(
private fun doUnzip(
archiveFile: RawCosvFileDto,
organizationName: String,
userName: String,
Expand All @@ -176,34 +177,22 @@ class RawCosvFileController(
.blockingMap {
tmpArchiveFile.extractZipTo(contentDir)
val entries = contentDir.listDirectoryEntries()
entries.map { it to it.fileSize() } to tmpArchiveFile.fileSize()
entries.map { it to it.fileSize() }
}
.flatMapMany { (entryWithSizeList, archiveSize) ->
val fullSize = archiveSize * 2 + entryWithSizeList.sumOf { it.second }
.flatMapMany { entryWithSizeList ->
Flux.concat(
Mono.just(RawCosvFileStreamingResponse(archiveSize, fullSize, updateCounters = true)),
Flux.fromIterable(entryWithSizeList.map { it.first })
.flatMap { file ->
log.debug {
"Processing ${file.absolutePathString()}"
}
val contentLength = file.fileSize()
rawCosvFileStorage.uploadAndWrapDuplicateKeyException(
key = RawCosvFileDto(
concatS3Key(archiveFile.fileName, file.relativeTo(contentDir).toString()),
organizationName = organizationName,
userName = userName,
contentLength = contentLength,
),
content = file.toByteBufferFlux(),
)
.map { RawCosvFileStreamingResponse(contentLength, fullSize, result = listOf(it)) }
},
doUploadArchiveEntries(
contentDir,
entryWithSizeList,
archiveFile.fileName,
organizationName,
userName
),
blockingToMono {
tmpDir.deleteRecursivelySafely(log)
}
.then(rawCosvFileStorage.delete(archiveFile))
.thenReturn(RawCosvFileStreamingResponse(archiveSize, fullSize, updateCounters = false)),
.thenReturn(RawCosvFileStreamingResponse(100, "Unzipped ${entryWithSizeList.sumOf { it.second }.toKilobytes()} Kb")),

Check warning

Code scanning / detekt

Report magic numbers. Magic number is a numeric literal that is not defined as a constant and hence it's unclear what the purpose of this number is. It's better to declare such numbers as constants and give them a proper name. By default, -1, 0, 1, and 2 are not considered to be magic numbers. Warning

This expression contains a magic number. Consider defining it to a well named constant.

Check warning

Code scanning / detekt

Report magic numbers. Magic number is a numeric literal that is not defined as a constant and hence it's unclear what the purpose of this number is. It's better to declare such numbers as constants and give them a proper name. By default, -1, 0, 1, and 2 are not considered to be magic numbers. Warning

This expression contains a magic number. Consider defining it to a well named constant.
)
}
.onErrorResume { error ->
Expand All @@ -214,6 +203,46 @@ class RawCosvFileController(
}
}


private fun doUploadArchiveEntries(
contentDir: Path,
entriesWithSize: List<PathAndSize>,
entriesPrefix: String,
organizationName: String,
userName: String,
): RawCosvFileStreamingResponseFlux {
val fullSize = entriesWithSize.sumOf { it.second }
return entriesWithSize.runningFold(null as Pair<PathAndSize, Long>?) { current, next ->
current?.let {
next to next.second + it.second
} ?: (next to next.second)
}
.filterNotNull()
.toFlux()
.flatMap { (fileAndSize, sizeSumOfPrevious) ->
val (file, size) = fileAndSize
log.debug {
"Processing ${file.absolutePathString()}"
}
rawCosvFileStorage.uploadAndWrapDuplicateKeyException(
key = RawCosvFileDto(
concatS3Key(entriesPrefix, file.relativeTo(contentDir).toString()),
organizationName = organizationName,
userName = userName,
contentLength = size,
),
content = file.toByteBufferFlux(),
)
.map {
RawCosvFileStreamingResponse(
progress = ((sizeSumOfPrevious.toDouble() / fullSize) * (100 - PROGRESS_FOR_ARCHIVE)).toInt() + PROGRESS_FOR_ARCHIVE,
progressMessage = "${sizeSumOfPrevious.toKilobytes()} / ${fullSize.toKilobytes()} KB",
result = listOf(it),
)
}
}
}

/**
* @param organizationName
* @param ids
Expand All @@ -240,7 +269,7 @@ class RawCosvFileController(
authentication: Authentication,
): Mono<StringResponse> = rawCosvFileStorage.listByOrganizationAndUser(organizationName, authentication.name)
.map { files ->
files.filter { it.userName == authentication.name }.map { it.requiredId() }
files.map { it.requiredId() }
}
.flatMap { ids ->
doSubmitToProcess(organizationName, ids, authentication)
Expand Down Expand Up @@ -357,67 +386,6 @@ class RawCosvFileController(
}
}

/**
* @param organizationName
* @param authentication
* @return list of deleted keys
*/
@RequiresAuthorizationSourceHeader
@DeleteMapping(
"/delete-processed",
produces = [MediaType.APPLICATION_NDJSON_VALUE],
)
fun deleteProcessed(
@PathVariable organizationName: String,
authentication: Authentication,
): ResponseEntity<RawCosvFileStreamingResponseFlux> = hasPermission(authentication, organizationName, Permission.DELETE, "delete")
.flatMapMany {
Flux.concat(
firstFakeResponse.toMono(),
doDeleteProcessed(organizationName, authentication.name),
)
}
.let {
ResponseEntity.ok()
.cacheControlForNdjson()
.body(it)
}

private fun doDeleteProcessed(
organizationName: String,
userName: String,
) = rawCosvFileStorage.listByOrganizationAndUser(organizationName, userName)
.map { files ->
files.filter { it.status == RawCosvFileStatus.PROCESSED }
.run {
sumOf { it.requiredContentLength() } to windowed(WINDOW_SIZE_ON_DELETE)
}
}
.flatMapMany { (sizeOfFiles, parts) ->
val fullSize = sizeOfFiles + 1
Flux.concat(
RawCosvFileStreamingResponse(
1,
fullSize,
updateCounters = true,
).toMono(),
parts.toFlux().flatMap { keys ->
rawCosvFileStorage.deleteAll(keys)
.filter { it }
.switchIfEmptyToResponseException(HttpStatus.INTERNAL_SERVER_ERROR) {
"Failed to delete process raw cosv files: $keys"
}
.thenReturn(
RawCosvFileStreamingResponse(
keys.sumOf { it.requiredContentLength() },
fullSize,
result = keys,
)
)
},
)
}

private fun hasPermission(
authentication: Authentication,
organizationName: String,
Expand Down Expand Up @@ -454,7 +422,7 @@ class RawCosvFileController(
private const val WINDOW_SIZE_ON_DELETE = 10

Check failure

Code scanning / ktlint

[WRONG_DECLARATIONS_ORDER] declarations of constants and enum members should be sorted alphabetically: constant properties inside companion object order is incorrect Error

[WRONG_DECLARATIONS_ORDER] declarations of constants and enum members should be sorted alphabetically: constant properties inside companion object order is incorrect

Check warning

Code scanning / detekt

Property is unused and should be removed. Warning

Private property WINDOW\_SIZE\_ON\_DELETE is unused.

Check warning

Code scanning / detekt

Property is unused and should be removed. Warning

Private property WINDOW\_SIZE\_ON\_DELETE is unused.

// to show progress bar
private val firstFakeResponse = RawCosvFileStreamingResponse(5, 100, updateCounters = true)
private const val PROGRESS_FOR_ARCHIVE = 5

private fun RawCosvFileStorage.uploadAndWrapDuplicateKeyException(
key: RawCosvFileDto,
Expand Down
Loading

0 comments on commit e3b89b0

Please sign in to comment.