Skip to content

Commit

Permalink
feat(reader): Stream entries to empty feeds when populating feeds pan…
Browse files Browse the repository at this point in the history
…e at startup
  • Loading branch information
bow committed Jan 26, 2024
1 parent c40715d commit 3724a24
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 18 deletions.
13 changes: 13 additions & 0 deletions internal/reader/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,16 @@ type Backend interface {
PullFeedsF(context.Context, []entity.ID, bool) func() (<-chan entity.PullResult, error)
String() string
}

type result[T any] struct {
value T
err error
}

func okResult[T any](value T) result[T] {
return result[T]{value: value, err: nil}
}

func errResult[T any](err error) result[T] {
return result[T]{err: err}
}
75 changes: 62 additions & 13 deletions internal/reader/backend/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"

"github.com/bow/neon/api"
"github.com/bow/neon/internal"
"github.com/bow/neon/internal/entity"
"google.golang.org/grpc"
)
Expand Down Expand Up @@ -52,23 +53,12 @@ func (r *RPC) GetStatsF(ctx context.Context) func() (*entity.Stats, error) {
}

func (r *RPC) GetAllFeedsF(ctx context.Context) func() ([]*entity.Feed, error) {
// FIXME: Actually implement querying all feeds.
return func() ([]*entity.Feed, error) {
nmax := uint32(0)
rsp, err := r.client.ListFeeds(
ctx,
&api.ListFeedsRequest{MaxEntriesPerFeed: &nmax},
)
feeds, err := r.listEmptyFeeds(ctx)
if err != nil {
return nil, err
}
rfeeds := rsp.GetFeeds()
feeds := make([]*entity.Feed, len(rfeeds))
for i, rfeed := range rfeeds {
feeds[i] = entity.FromFeedPb(rfeed)
}

return feeds, nil
return r.fillEmptyFeeds(ctx, feeds)
}
}

Expand Down Expand Up @@ -109,3 +99,62 @@ func (r *RPC) PullFeedsF(
func (r *RPC) String() string {
return fmt.Sprintf("grpc://%s", r.addr)
}

func (r *RPC) listEmptyFeeds(ctx context.Context) ([]*entity.Feed, error) {
nmax := uint32(0)
rsp, err := r.client.ListFeeds(
ctx,
&api.ListFeedsRequest{MaxEntriesPerFeed: &nmax},
)
if err != nil {
return nil, err
}

return entity.FromFeedPbs(rsp.GetFeeds()), nil
}

func (r *RPC) fillEmptyFeeds(
ctx context.Context,
feeds []*entity.Feed,
) ([]*entity.Feed, error) {

chs := make([]<-chan result[*entity.Feed], len(feeds))
for i, feed := range feeds {
feed := feed
ch := make(chan result[*entity.Feed])
chs[i] = ch
go func() {
defer close(ch)
stream, err := r.client.StreamEntries(
ctx,
&api.StreamEntriesRequest{FeedId: feed.ID},
)
if err != nil {
ch <- errResult[*entity.Feed](err)
return
}
for {
srsp, serr := stream.Recv()
if serr != nil {
if serr != io.EOF {
ch <- errResult[*entity.Feed](serr)
} else {
ch <- okResult(feed)
}
return
}
feed.Entries = append(feed.Entries, entity.FromEntryPb(srsp.GetEntry()))
}
}()
}

filled := make([]*entity.Feed, 0)
for res := range internal.Merge(chs) {
res := res
if err := res.err; err != nil {
return nil, err
}
filled = append(filled, res.value)
}
return filled, nil
}
54 changes: 51 additions & 3 deletions internal/reader/backend/rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -54,12 +55,14 @@ func TestGetStatsFErr(t *testing.T) {
a.EqualError(err, "nope")
}

func TestListFeedsFOk(t *testing.T) {
func TestGetAllFeedsFOk(t *testing.T) {
t.Parallel()

r := require.New(t)
a := assert.New(t)
rpc, client := newBackendRPCTest(t)
streamClient1 := NewMockNeon_StreamEntriesClient(gomock.NewController(t))
streamClient2 := NewMockNeon_StreamEntriesClient(gomock.NewController(t))

client.EXPECT().
ListFeeds(gomock.Any(), gomock.Any(), gomock.Any()).
Expand All @@ -85,18 +88,63 @@ func TestListFeedsFOk(t *testing.T) {
nil,
)

client.EXPECT().
StreamEntries(
gomock.Any(),
gomock.Cond(
func(v any) bool {
req, ok := v.(*api.StreamEntriesRequest)
return ok && req.GetFeedId() == uint32(5)
},
),
).
Return(streamClient1, nil)
streamClient1.EXPECT().
Recv().
Return(&api.StreamEntriesResponse{Entry: &api.Entry{Title: "F1-A"}}, nil)
streamClient1.EXPECT().
Recv().
Return(&api.StreamEntriesResponse{Entry: &api.Entry{Title: "F1-B"}}, nil)
streamClient1.EXPECT().
Recv().
Return(nil, io.EOF)

client.EXPECT().
StreamEntries(
gomock.Any(),
gomock.Cond(
func(v any) bool {
req, ok := v.(*api.StreamEntriesRequest)
return ok && req.GetFeedId() == uint32(8)
},
),
).
Return(streamClient2, nil)
streamClient2.EXPECT().
Recv().
Return(&api.StreamEntriesResponse{Entry: &api.Entry{Title: "F3-A"}}, nil)
streamClient2.EXPECT().
Recv().
Return(nil, io.EOF)

feeds, err := rpc.GetAllFeedsF(context.Background())()
r.NoError(err)
a.Len(feeds, 2)
r.Len(feeds, 2)

sort.SliceStable(feeds, func(i, j int) bool { return feeds[i].ID < feeds[j].ID })

a.Equal(uint32(5), feeds[0].ID)
a.Equal("F1", feeds[0].Title)
a.Equal("https://f1.com/feed.xml", feeds[0].FeedURL)
a.Len(feeds[0].Entries, 2)

a.Equal(uint32(8), feeds[1].ID)
a.Equal("F3", feeds[1].Title)
a.Equal("https://f3.com/feed.xml", feeds[1].FeedURL)
a.Len(feeds[1].Entries, 1)
}

func TestListFeedsFErr(t *testing.T) {
func TestGetAllFeedsFErrList(t *testing.T) {
t.Parallel()

r := require.New(t)
Expand Down
6 changes: 4 additions & 2 deletions internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ func (r *Reader) Start() error {
r.opr.ShowIntroPopup(r.display)
defer r.state.MarkIntroSeen()
}
r.opr.PopulateFeedsPane(r.display, r.backend.GetAllFeedsF(r.ctx))
r.opr.RefreshStats(r.display, r.backend.GetStatsF(r.ctx))
go func() {
r.opr.PopulateFeedsPane(r.display, r.backend.GetAllFeedsF(r.ctx))
r.opr.RefreshStats(r.display, r.backend.GetStatsF(r.ctx))
}()
return r.display.Start()
}

Expand Down

0 comments on commit 3724a24

Please sign in to comment.