From 3724a244e5aee4018e1214c2fd10b83e3a082bed Mon Sep 17 00:00:00 2001 From: Wibowo Arindrarto Date: Fri, 26 Jan 2024 07:12:17 +0100 Subject: [PATCH] feat(reader): Stream entries to empty feeds when populating feeds pane at startup --- internal/reader/backend/backend.go | 13 +++++ internal/reader/backend/rpc.go | 75 ++++++++++++++++++++++++----- internal/reader/backend/rpc_test.go | 54 +++++++++++++++++++-- internal/reader/reader.go | 6 ++- 4 files changed, 130 insertions(+), 18 deletions(-) diff --git a/internal/reader/backend/backend.go b/internal/reader/backend/backend.go index 78206a7..be70aa5 100644 --- a/internal/reader/backend/backend.go +++ b/internal/reader/backend/backend.go @@ -16,3 +16,16 @@ type Backend interface { PullFeedsF(context.Context, []entity.ID, bool) func() (<-chan entity.PullResult, error) String() string } + +type result[T any] struct { + value T + err error +} + +func okResult[T any](value T) result[T] { + return result[T]{value: value, err: nil} +} + +func errResult[T any](err error) result[T] { + return result[T]{err: err} +} diff --git a/internal/reader/backend/rpc.go b/internal/reader/backend/rpc.go index 85dc333..20a6d73 100644 --- a/internal/reader/backend/rpc.go +++ b/internal/reader/backend/rpc.go @@ -10,6 +10,7 @@ import ( "io" "github.com/bow/neon/api" + "github.com/bow/neon/internal" "github.com/bow/neon/internal/entity" "google.golang.org/grpc" ) @@ -52,23 +53,12 @@ func (r *RPC) GetStatsF(ctx context.Context) func() (*entity.Stats, error) { } func (r *RPC) GetAllFeedsF(ctx context.Context) func() ([]*entity.Feed, error) { - // FIXME: Actually implement querying all feeds. return func() ([]*entity.Feed, error) { - nmax := uint32(0) - rsp, err := r.client.ListFeeds( - ctx, - &api.ListFeedsRequest{MaxEntriesPerFeed: &nmax}, - ) + feeds, err := r.listEmptyFeeds(ctx) if err != nil { return nil, err } - rfeeds := rsp.GetFeeds() - feeds := make([]*entity.Feed, len(rfeeds)) - for i, rfeed := range rfeeds { - feeds[i] = entity.FromFeedPb(rfeed) - } - - return feeds, nil + return r.fillEmptyFeeds(ctx, feeds) } } @@ -109,3 +99,62 @@ func (r *RPC) PullFeedsF( func (r *RPC) String() string { return fmt.Sprintf("grpc://%s", r.addr) } + +func (r *RPC) listEmptyFeeds(ctx context.Context) ([]*entity.Feed, error) { + nmax := uint32(0) + rsp, err := r.client.ListFeeds( + ctx, + &api.ListFeedsRequest{MaxEntriesPerFeed: &nmax}, + ) + if err != nil { + return nil, err + } + + return entity.FromFeedPbs(rsp.GetFeeds()), nil +} + +func (r *RPC) fillEmptyFeeds( + ctx context.Context, + feeds []*entity.Feed, +) ([]*entity.Feed, error) { + + chs := make([]<-chan result[*entity.Feed], len(feeds)) + for i, feed := range feeds { + feed := feed + ch := make(chan result[*entity.Feed]) + chs[i] = ch + go func() { + defer close(ch) + stream, err := r.client.StreamEntries( + ctx, + &api.StreamEntriesRequest{FeedId: feed.ID}, + ) + if err != nil { + ch <- errResult[*entity.Feed](err) + return + } + for { + srsp, serr := stream.Recv() + if serr != nil { + if serr != io.EOF { + ch <- errResult[*entity.Feed](serr) + } else { + ch <- okResult(feed) + } + return + } + feed.Entries = append(feed.Entries, entity.FromEntryPb(srsp.GetEntry())) + } + }() + } + + filled := make([]*entity.Feed, 0) + for res := range internal.Merge(chs) { + res := res + if err := res.err; err != nil { + return nil, err + } + filled = append(filled, res.value) + } + return filled, nil +} diff --git a/internal/reader/backend/rpc_test.go b/internal/reader/backend/rpc_test.go index 1826f39..423bf0e 100644 --- a/internal/reader/backend/rpc_test.go +++ b/internal/reader/backend/rpc_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "io" + "sort" "testing" "time" @@ -54,12 +55,14 @@ func TestGetStatsFErr(t *testing.T) { a.EqualError(err, "nope") } -func TestListFeedsFOk(t *testing.T) { +func TestGetAllFeedsFOk(t *testing.T) { t.Parallel() r := require.New(t) a := assert.New(t) rpc, client := newBackendRPCTest(t) + streamClient1 := NewMockNeon_StreamEntriesClient(gomock.NewController(t)) + streamClient2 := NewMockNeon_StreamEntriesClient(gomock.NewController(t)) client.EXPECT(). ListFeeds(gomock.Any(), gomock.Any(), gomock.Any()). @@ -85,18 +88,63 @@ func TestListFeedsFOk(t *testing.T) { nil, ) + client.EXPECT(). + StreamEntries( + gomock.Any(), + gomock.Cond( + func(v any) bool { + req, ok := v.(*api.StreamEntriesRequest) + return ok && req.GetFeedId() == uint32(5) + }, + ), + ). + Return(streamClient1, nil) + streamClient1.EXPECT(). + Recv(). + Return(&api.StreamEntriesResponse{Entry: &api.Entry{Title: "F1-A"}}, nil) + streamClient1.EXPECT(). + Recv(). + Return(&api.StreamEntriesResponse{Entry: &api.Entry{Title: "F1-B"}}, nil) + streamClient1.EXPECT(). + Recv(). + Return(nil, io.EOF) + + client.EXPECT(). + StreamEntries( + gomock.Any(), + gomock.Cond( + func(v any) bool { + req, ok := v.(*api.StreamEntriesRequest) + return ok && req.GetFeedId() == uint32(8) + }, + ), + ). + Return(streamClient2, nil) + streamClient2.EXPECT(). + Recv(). + Return(&api.StreamEntriesResponse{Entry: &api.Entry{Title: "F3-A"}}, nil) + streamClient2.EXPECT(). + Recv(). + Return(nil, io.EOF) + feeds, err := rpc.GetAllFeedsF(context.Background())() r.NoError(err) - a.Len(feeds, 2) + r.Len(feeds, 2) + + sort.SliceStable(feeds, func(i, j int) bool { return feeds[i].ID < feeds[j].ID }) + a.Equal(uint32(5), feeds[0].ID) a.Equal("F1", feeds[0].Title) a.Equal("https://f1.com/feed.xml", feeds[0].FeedURL) + a.Len(feeds[0].Entries, 2) + a.Equal(uint32(8), feeds[1].ID) a.Equal("F3", feeds[1].Title) a.Equal("https://f3.com/feed.xml", feeds[1].FeedURL) + a.Len(feeds[1].Entries, 1) } -func TestListFeedsFErr(t *testing.T) { +func TestGetAllFeedsFErrList(t *testing.T) { t.Parallel() r := require.New(t) diff --git a/internal/reader/reader.go b/internal/reader/reader.go index ef63b65..0b3aa7b 100644 --- a/internal/reader/reader.go +++ b/internal/reader/reader.go @@ -32,8 +32,10 @@ func (r *Reader) Start() error { r.opr.ShowIntroPopup(r.display) defer r.state.MarkIntroSeen() } - r.opr.PopulateFeedsPane(r.display, r.backend.GetAllFeedsF(r.ctx)) - r.opr.RefreshStats(r.display, r.backend.GetStatsF(r.ctx)) + go func() { + r.opr.PopulateFeedsPane(r.display, r.backend.GetAllFeedsF(r.ctx)) + r.opr.RefreshStats(r.display, r.backend.GetStatsF(r.ctx)) + }() return r.display.Start() }