When running the code https://scastie.scala-lang.org/btTjBsN3QWGSP8S0AMPz6A, the Stream.bracket resource closes unexpectedly before the inner process and emit streams complete their operations.
It seems that somehow the Stream.bracket sticks to the incoming stream argument in the processData function.
In the actual code, line [2] is used inside a third-party library.
Output:
> 3
=== START PROCESS === with 3 elems
> 1
> 2
>>> Got Chunk(1, 2)
> 3
=== END PROCESS ===
>>> Got Chunk(3)
< Emitting element #0
< Emitting element #1
< Emitting element #2
Vector(element #0, element #1, element #2)
Expected output:
> 3
=== START PROCESS === with 3 elems
> 1
> 2
>>> Got Chunk(1, 2)
> 3
>>> Got Chunk(3)
< Emitting element #0
< Emitting element #1
< Emitting element #2
=== END PROCESS ===
Vector(element #0, element #1, element #2)
This issue is caused by the re-threading of the tail stream here:
input.pull.uncons1
  .flatMap {
    case None => Pull.done
    case Some(head -> tail) => Pull.extendScopeTo(tail)
      .evalMap(t => IO(Data(head, t))) // <--- tail is returned here
      .flatMap(Pull.output1)
  }
  .stream
  .flatMap { data =>
    processData(data.size, data.stream) // <-- and then later evaluated here
  }
The tail of the result from uncons1 is returned outside of the normal flow of the pull, resulting in finalizers running at an unexpected time. This seems like the same type of scope rethreading issue that requires StepLeg to be a thing when pulling from different streams. Is this re-threading essential to the use case in question?
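For comparison, here is a minimal sketch of what keeping the tail inside the normal flow of the pull could look like: the tail is handed to the processing function and consumed within the same Pull, rather than being emitted as a value and evaluated later. The names HeadTailInPull, result, and this simplified processData are hypothetical stand-ins, not the code from the Scastie link, and this shape may not be usable when the re-threading happens inside a third-party library.

```scala
import cats.effect.{IO, IOApp}
import fs2.{Pull, Stream}

object HeadTailInPull extends IOApp.Simple {

  // Hypothetical stand-in for the issue's processData:
  // brackets a resource around the consumption of the data stream.
  def processData(size: Int, data: Stream[IO, Int]): Stream[IO, String] =
    Stream
      .bracket(IO.println(s"=== START PROCESS === with $size elems"))(_ =>
        IO.println("=== END PROCESS ===")
      )
      .flatMap(_ => data.map(i => s"element #$i"))

  // The head carries the parameter; the rest of the stream is the data.
  val input: Stream[IO, Int] = Stream(3, 1, 2, 3).covary[IO]

  // The tail never leaves the Pull: it is passed straight to processData,
  // and the resulting stream is echoed from inside the same pull.
  val result: IO[Vector[String]] =
    input.pull.uncons1.flatMap {
      case None               => Pull.done
      case Some((head, tail)) => processData(head, tail).pull.echo
    }.stream.compile.toVector

  def run: IO[Unit] = result.flatMap(v => IO.println(v))
}
```

Because the tail is not returned as a value here, there is no point at which it is evaluated outside the scope it was pulled in, so the bracket's finalizer would be expected to run only once the data has been consumed.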
Yes, this re-threading is essential because the head of the stream always contains the input parameters needed for the processData function and the rest of the stream is a data stream for this function.
I have a question: after removing the chunkN(1) and unchunks calls, the program runs as expected and the output is correct, but I do not understand why. I would appreciate it if you could point out what I am missing.
Version:
co.fs2::fs2-core::3.10.2
The lines .chunkN(1).unchunks are needed only to simplify the code, which is more like this: https://scastie.scala-lang.org/pFqksNB6TIeVl9emDlInjA