From 49805a9e130a28945c8054f489b6eaf21fd89742 Mon Sep 17 00:00:00 2001 From: johnnason Date: Tue, 12 Dec 2017 09:43:38 -0500 Subject: [PATCH] Log sink pipe errors from write (#427) Also add back a commented-out write error test for pipeline fixes #426 --- pipeline/node.go | 3 +++ pipeline/pipeline_test.go | 43 ++++++++++++++++++++------------------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/pipeline/node.go b/pipeline/node.go index 3d5320518..f1ce622de 100644 --- a/pipeline/node.go +++ b/pipeline/node.go @@ -554,6 +554,9 @@ func (n *Node) write(msg message.Msg, off offset.Offset) (message.Msg, error) { c := make(chan writeResult) go func() { m, err := client.Write(n.c, n.writer, msg) + if err != nil { + n.l.Errorf("write error, %s", err) + } c <- writeResult{m, err} }() select { diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index 1a8b7b1b1..a9b78930f 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -125,27 +125,28 @@ var ( }, client.ErrMockConnect, }, - // uncomment this once the error handling mess is sorted out - // { - // func() *Node { - // a := &adaptor.Mock{} - // n, _ := NewNodeWithOptions( - // "starter", "stopWriter", defaultNsString, - // WithClient(a), - // WithReader(a), - // WithCommitLog("testdata/restart_from_end", 1024), - // ) - // NewNodeWithOptions( - // "stopperWriteErr", "stopWriter", defaultNsString, - // WithClient(a), - // WithWriter(&adaptor.MockWriterErr{}), - // WithParent(n), - // WithOffsetManager(&offset.MockManager{MemoryMap: map[string]uint64{}}), - // ) - // return n - // }, - // client.ErrMockWrite, - // }, + { + func() *Node { + a := &adaptor.Mock{} + n, _ := NewNodeWithOptions( + "starter", "stopWriter", defaultNsString, + WithClient(a), + WithReader(a), + WithCommitLog([]commitlog.OptionFunc{ + commitlog.WithPath("testdata/pipeline_run"), + }...), + ) + NewNodeWithOptions( + "stopperWriteErr", "stopWriter", defaultNsString, + WithClient(a), + WithWriter(&adaptor.MockWriterErr{}), + WithParent(n), + WithOffsetManager(&offset.MockManager{MemoryMap: map[string]uint64{}}), + ) + return n + }, + client.ErrMockWrite, + }, } )