Skip to content

Commit

Permalink
Test stream cancellation.
Browse files Browse the repository at this point in the history
  • Loading branch information
robinheghan committed Nov 30, 2024
1 parent a9191af commit bd8e2dc
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 2 deletions.
45 changes: 43 additions & 2 deletions streams/src/Main.gren
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ main =


type alias Model =
{}
{ stdout : Stream.Writable Bytes }


type Msg
= Exit (Result Stream.Error (Stream.Writable Bytes))
| ExitAfterPipe (Result InvolvedError {})
| TransformationStreamCreated (Result Stream.Error (Stream.Transformation Bytes Bytes))


type InvolvedError
Expand All @@ -37,7 +38,7 @@ init : Node.Environment -> Init.Task { model : Model, command : Cmd Msg }
init env =
Init.await FileSystem.initialize <| \fsPerm ->
Node.startProgram
{ model = {}
{ model = { stdout = env.stdout }
, command =
case Array.get 2 env.args of
Just "hello world" ->
Expand Down Expand Up @@ -73,6 +74,10 @@ init env =
)
)
|> Task.attempt Exit

Just "errorHandling" ->
Stream.identityTransformation
|> Task.attempt TransformationStreamCreated

Just "compression" ->
FileSystem.writeFileStream fsPerm (Path.fromPosixString "compressed.txt")
Expand Down Expand Up @@ -127,3 +132,39 @@ update msg model =
, command =
Node.exit |> Task.execute
}

TransformationStreamCreated (Err _) ->
{ model = model
, command =
Stream.writeLineAsBytes "Failed to created transformation stream" model.stdout
|> Task.attempt Exit
}

TransformationStreamCreated (Ok idStream) ->
{ model = model
, command =
Cmd.batch
[ Stream.read (Stream.readable idStream)
|> Task.andThen (\value -> Stream.write value model.stdout)
|> Task.onError (\err ->
case err of
Stream.Closed ->
Stream.writeLineAsBytes "CLOSED" model.stdout

Stream.Cancelled reason ->
Stream.writeLineAsBytes ("CANCELLED: " ++ reason) model.stdout

Stream.Locked ->
Stream.writeLineAsBytes "LOCKED" model.stdout
)
|> Task.attempt Exit
, Stream.cancelWritable "foo" (Stream.writable idStream)
|> Task.onError
(\_ ->
Stream.writeLineAsBytes "Failed to cancel stream" model.stdout
|> Task.map (\_ -> {})
)
|> Task.onError (\_ -> Task.succeed {})
|> Task.execute
]
}
7 changes: 7 additions & 0 deletions streams/test/commands.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ describe("Streams", () => {
.stdout("CLOSED");
});

it("errorHandling", async () => {
await runner()
.cwd(baseDir)
.fork("app", ["errorHandling"], {})
.stdout("CANCELLED: foo")
});

it("compression", async () => {
await runner()
.cwd(baseDir)
Expand Down

0 comments on commit bd8e2dc

Please sign in to comment.