Skip to content

Commit

Permalink
replaced SupervisorJob with try catch
Browse files Browse the repository at this point in the history
  • Loading branch information
LeFrosch committed Oct 7, 2024
1 parent 001a6a6 commit 74f63c4
Showing 1 changed file with 42 additions and 32 deletions.
74 changes: 42 additions & 32 deletions base/src/com/google/idea/blaze/base/buildview/BazelService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.intellij.openapi.util.Key
import com.intellij.util.io.LimitedInputStream
import com.intellij.util.ui.EDT
import kotlinx.coroutines.*
import java.io.BufferedInputStream
import java.io.FileInputStream
import kotlin.io.path.pathString

Expand Down Expand Up @@ -94,47 +95,56 @@ class BazelService(private val project: Project) : Disposable {
return exitCode
}

private fun CoroutineScope.parseEvents(ctx: BlazeContext, helper: BuildResultHelper): Job {
val handler = CoroutineExceptionHandler { _, e -> LOG.error("error in event parser", e) }

return launch(handler + CoroutineName("EventParser") + SupervisorJob()) {
// wait for bazel to create the output file
while (!helper.outputFile.exists()) delay(10)

FileInputStream(helper.outputFile).buffered().use { stream ->
// keep reading events while the coroutine is active, i.e. bazel is still running,
// or while the stream has data available (to ensure that all events are processed)
while (isActive || stream.available() > 0) {
private suspend fun parseEvent(ctx: BlazeContext, stream: BufferedInputStream) {
// make sure that there are at least four bytes already available
while (stream.available() < 4) delay(10)

// make sure that there are at least four bytes already available
while (stream.available() < 4) delay(10)
// protobuf messages are delimited by size (encoded as varint32),
// read size manually to ensure the entire message is already available
val size = CodedInputStream.readRawVarint32(stream.read(), stream)
while (stream.available() < size) delay(10)

// protobuf messages are delimited by size (encoded as varint32),
// read size manually to ensure the entire message is already available
val size = CodedInputStream.readRawVarint32(stream.read(), stream)
while (stream.available() < size) delay(10)
val eventStream = LimitedInputStream(stream, size)
val event = try {
BuildEvent.parseFrom(eventStream)
} catch (e: Exception) {
LOG.error("could not parse event", e)

val eventStream = LimitedInputStream(stream, size)
val event = try {
BuildEvent.parseFrom(eventStream)
} catch (e: Exception) {
LOG.error("could not parse event", e)
// if the message could not be parsed, make sure to skip it
if (eventStream.bytesRead < size) {
stream.skip(size.toLong() - eventStream.bytesRead)
}

// if the message could not be parsed, make sure to skip it
if (eventStream.bytesRead < size) {
stream.skip(size.toLong() - eventStream.bytesRead)
}
return
}

continue
}
if (event == null) {
delay(10)
} else {
BuildEventParser.parse(event)?.let(ctx::output)
}
}

if (event == null) {
delay(10)
} else {
BuildEventParser.parse(event)?.let(ctx::output)
private fun CoroutineScope.parseEvents(ctx: BlazeContext, helper: BuildResultHelper): Job {
return launch(CoroutineName("EventParser")) {
try {
// wait for bazel to create the output file
while (!helper.outputFile.exists()) delay(10)

FileInputStream(helper.outputFile).buffered().use { stream ->
// keep reading events while the coroutine is active, i.e. bazel is still running,
// or while the stream has data available (to ensure that all events are processed)
while (isActive || stream.available() > 0) {
parseEvent(ctx, stream)
}
}
}
catch (e: CancellationException) {
throw e
}
catch (e: Exception) {
LOG.error("error in event parser", e)
}
}
}

Expand Down

0 comments on commit 74f63c4

Please sign in to comment.