From 171568fbaa8ea61f8f3942b5be4a06c77b5fe1bb Mon Sep 17 00:00:00 2001 From: Alan Braithwaite Date: Wed, 12 Jul 2023 22:21:29 -0700 Subject: [PATCH] fix hang --- x/multi/multisrc.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/x/multi/multisrc.go b/x/multi/multisrc.go index c296429..510f392 100644 --- a/x/multi/multisrc.go +++ b/x/multi/multisrc.go @@ -37,7 +37,7 @@ func NewMultiSource[T any](sources []flow.Source[T]) MultiSource[T] { func (ms MultiSource[T]) Run(ctx context.Context) error { var wg sync.WaitGroup - errc := make(chan error) + errc := make(chan error, len(ms.wrapped)) for _, src := range ms.wrapped { wg.Add(1) @@ -46,8 +46,11 @@ func (ms MultiSource[T]) Run(ctx context.Context) error { for { msg, ack, err := src.Recv(ctx) if err != nil { - errc <- err - return + select { + case errc <- err: + case <-ctx.Done(): + return + } } select { case ms.msgAckC <- msgAck[T]{msg: msg, ack: ack}: