Skip to content

Commit

Permalink
fix hang
Browse files Browse the repository at this point in the history
  • Loading branch information
abraithwaite committed Jul 13, 2023
1 parent 07e7aae commit 171568f
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions x/multi/multisrc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewMultiSource[T any](sources []flow.Source[T]) MultiSource[T] {
func (ms MultiSource[T]) Run(ctx context.Context) error {
var wg sync.WaitGroup

errc := make(chan error)
errc := make(chan error, len(ms.wrapped))

for _, src := range ms.wrapped {
wg.Add(1)
Expand All @@ -46,8 +46,11 @@ func (ms MultiSource[T]) Run(ctx context.Context) error {
for {
msg, ack, err := src.Recv(ctx)
if err != nil {
errc <- err
return
select {
case errc <- err:
case <-ctx.Done():
return
}
}
select {
case ms.msgAckC <- msgAck[T]{msg: msg, ack: ack}:
Expand Down

0 comments on commit 171568f

Please sign in to comment.