Skip to content

Commit

Permalink
feat(block-scheduler): status page shows completed jobs (#15580)
Browse files Browse the repository at this point in the history
Signed-off-by: Owen Diehl <[email protected]>
Co-authored-by: Ashwanth <[email protected]>
  • Loading branch information
owen-d and ashwanthgoli authored Jan 3, 2025
1 parent 780173a commit 10194f7
Show file tree
Hide file tree
Showing 7 changed files with 225 additions and 0 deletions.
20 changes: 20 additions & 0 deletions pkg/blockbuilder/scheduler/priority_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,23 @@ func (b *CircularBuffer[V]) Lookup(f func(V) bool) (V, bool) {
var zero V
return zero, false
}

// Range iterates over the elements in the buffer from oldest to newest
// and calls the given function for each element.
// If the function returns false, iteration stops.
func (b *CircularBuffer[V]) Range(f func(V) bool) {
if b.size == 0 {
return
}

// Start from head (oldest) and iterate to tail (newest)
idx := b.head
remaining := b.size
for remaining > 0 {
if !f(b.buffer[idx]) {
return
}
idx = (idx + 1) % len(b.buffer)
remaining--
}
}
81 changes: 81 additions & 0 deletions pkg/blockbuilder/scheduler/priority_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package scheduler

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestCircularBuffer_Range(t *testing.T) {
tests := []struct {
name string
capacity int
input []int
want []int
}{
{
name: "empty buffer",
capacity: 3,
input: []int{},
want: []int{},
},
{
name: "partially filled buffer",
capacity: 3,
input: []int{1, 2},
want: []int{1, 2},
},
{
name: "full buffer",
capacity: 3,
input: []int{1, 2, 3},
want: []int{1, 2, 3},
},
{
name: "buffer with eviction",
capacity: 3,
input: []int{1, 2, 3, 4, 5},
want: []int{3, 4, 5}, // oldest elements (1,2) were evicted
},
{
name: "buffer with multiple evictions",
capacity: 2,
input: []int{1, 2, 3, 4, 5},
want: []int{4, 5}, // only newest elements remain
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create and fill buffer
buf := NewCircularBuffer[int](tt.capacity)
for _, v := range tt.input {
buf.Push(v)
}

// Use Range to collect elements
got := make([]int, 0)
buf.Range(func(v int) bool {
got = append(got, v)
return true
})

require.Equal(t, tt.want, got, "Range should iterate in order from oldest to newest")
})
}
}

func TestCircularBuffer_Range_EarlyStop(t *testing.T) {
buf := NewCircularBuffer[int](5)
for i := 1; i <= 5; i++ {
buf.Push(i)
}

var got []int
buf.Range(func(v int) bool {
got = append(got, v)
return v != 3 // stop after seeing 3
})

require.Equal(t, []int{1, 2, 3}, got, "Range should stop when function returns false")
}
12 changes: 12 additions & 0 deletions pkg/blockbuilder/scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,15 @@ func (q *JobQueue) ListInProgressJobs() []JobWithMetadata {
}
return jobs
}

func (q *JobQueue) ListCompletedJobs() []JobWithMetadata {
q.mu.RLock()
defer q.mu.RUnlock()

jobs := make([]JobWithMetadata, 0, q.completed.Len())
q.completed.Range(func(job *JobWithMetadata) bool {
jobs = append(jobs, *job)
return true
})
return jobs
}
3 changes: 3 additions & 0 deletions pkg/blockbuilder/scheduler/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.F
type jobQueue interface {
ListPendingJobs() []JobWithMetadata
ListInProgressJobs() []JobWithMetadata
ListCompletedJobs() []JobWithMetadata
}

type offsetReader interface {
Expand Down Expand Up @@ -63,12 +64,14 @@ func (h *statusPageHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
data := struct {
PendingJobs []JobWithMetadata
InProgressJobs []JobWithMetadata
CompletedJobs []JobWithMetadata
Now time.Time
PartitionInfo []partitionInfo
}{
Now: time.Now(),
PendingJobs: pendingJobs,
InProgressJobs: inProgressJobs,
CompletedJobs: h.jobQueue.ListCompletedJobs(),
}

for _, partitionOffset := range offsets {
Expand Down
32 changes: 32 additions & 0 deletions pkg/blockbuilder/scheduler/status.gohtml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
</thead>
<tbody>
{{ range $i, $job := .PendingJobs }}
<tr>
<td>{{ .ID }}</td>
<td>{{ .Priority }}</td>
<td>{{ .Partition }}</td>
Expand All @@ -47,6 +48,7 @@
</thead>
<tbody>
{{ range $i, $job := .InProgressJobs }}
<tr>
<td>{{ .ID }}</td>
<td>{{ .Priority }}</td>
<td>{{ .Partition }}</td>
Expand All @@ -58,6 +60,35 @@
{{ end }}
</tbody>
</table>
<h2>Completed Jobs</h2>
<table width="100%" border="1">
<thead>
<tr>
<th>ID</th>
<th>Priority</th>
<th>Partition</th>
<th>Start Offset</th>
<th>End Offset</th>
<th>Status</th>
<th>Start Timestamp</th>
<th>Completion Timestamp</th>
</tr>
</thead>
<tbody>
{{ range $i, $job := .CompletedJobs }}
<tr>
<td>{{ .ID }}</td>
<td>{{ .Priority }}</td>
<td>{{ .Partition }}</td>
<td>{{ .Offsets.Min }}</td>
<td>{{ .Offsets.Max }}</td>
<td>{{ .Status }}</td>
<td>{{ .StartTime | durationSince }} ago ({{ .StartTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
<td>{{ .UpdateTime | durationSince }} ago ({{ .UpdateTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})</td>
</tr>
{{ end }}
</tbody>
</table>
<h3>Partition Lag</h2>
<table width="100%" border="1">
<thead>
Expand All @@ -70,6 +101,7 @@
</thead>
<tbody>
{{ range .PartitionInfo }}
<tr>
<td>{{ .Partition }}</td>
<td>{{ .Lag }}</td>
<td>{{ .EndOffset }}</td>
Expand Down
72 changes: 72 additions & 0 deletions pkg/blockbuilder/scheduler/status_preview_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
//go:build preview

package scheduler

import (
"fmt"
"net/http/httptest"
"testing"
"time"

"github.com/twmb/franz-go/pkg/kadm"

"github.com/grafana/loki/v3/pkg/blockbuilder/types"
)

// TestPreview is a utility test that runs a local server with the status page.
// Run it with: go test -tags=preview -v -run TestPreview
func TestPreview(t *testing.T) {
// Setup mock data with varied timestamps
now := time.Now()
mockLister := &mockQueueLister{
pendingJobs: []JobWithMetadata{
{Job: types.NewJob(11, types.Offsets{Min: 11, Max: 20}), UpdateTime: now.Add(-2 * time.Hour), Priority: 23},
{Job: types.NewJob(22, types.Offsets{Min: 21, Max: 30}), UpdateTime: now.Add(-1 * time.Hour), Priority: 42},
{Job: types.NewJob(33, types.Offsets{Min: 22, Max: 40}), UpdateTime: now.Add(-30 * time.Minute), Priority: 11},
},
inProgressJobs: []JobWithMetadata{
{Job: types.NewJob(44, types.Offsets{Min: 1, Max: 10}), StartTime: now.Add(-4 * time.Hour), UpdateTime: now.Add(-3 * time.Hour)},
{Job: types.NewJob(55, types.Offsets{Min: 11, Max: 110}), StartTime: now.Add(-5 * time.Hour), UpdateTime: now.Add(-4 * time.Hour)},
},
completedJobs: []JobWithMetadata{
{Job: types.NewJob(66, types.Offsets{Min: 1, Max: 50}), StartTime: now.Add(-8 * time.Hour), UpdateTime: now.Add(-7 * time.Hour), Status: types.JobStatusComplete},
{Job: types.NewJob(77, types.Offsets{Min: 51, Max: 100}), StartTime: now.Add(-6 * time.Hour), UpdateTime: now.Add(-5 * time.Hour), Status: types.JobStatusComplete},
{Job: types.NewJob(88, types.Offsets{Min: 101, Max: 150}), StartTime: now.Add(-4 * time.Hour), UpdateTime: now.Add(-3 * time.Hour), Status: types.JobStatusFailed},
{Job: types.NewJob(99, types.Offsets{Min: 151, Max: 200}), StartTime: now.Add(-2 * time.Hour), UpdateTime: now.Add(-1 * time.Hour), Status: types.JobStatusComplete},
},
}

mockReader := &mockOffsetReader{
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Lag: 10,
Partition: 3,
End: kadm.ListedOffset{Offset: 100},
Commit: kadm.Offset{At: 90},
},
1: {
Lag: 0,
Partition: 1,
End: kadm.ListedOffset{Offset: 100},
Commit: kadm.Offset{At: 100},
},
2: {
Lag: 233,
Partition: 2,
End: kadm.ListedOffset{Offset: 333},
Commit: kadm.Offset{At: 100},
},
},
}

handler := newStatusPageHandler(mockLister, mockReader, time.Hour)

// Start local server
server := httptest.NewServer(handler)
defer server.Close()

fmt.Printf("\n\n=== Preview server running ===\nOpen this URL in your browser:\n%s\nPress Ctrl+C to stop the server\n\n", server.URL)

// Keep server running until interrupted
select {}
}
5 changes: 5 additions & 0 deletions pkg/blockbuilder/scheduler/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type mockQueueLister struct {
pendingJobs []JobWithMetadata
inProgressJobs []JobWithMetadata
completedJobs []JobWithMetadata
}

func (m *mockQueueLister) ListPendingJobs() []JobWithMetadata {
Expand All @@ -25,6 +26,10 @@ func (m *mockQueueLister) ListInProgressJobs() []JobWithMetadata {
return m.inProgressJobs
}

func (m *mockQueueLister) ListCompletedJobs() []JobWithMetadata {
return m.completedJobs
}

func TestStatusPageHandler_ServeHTTP(t *testing.T) {
t.Skip("skipping. only added to inspect the generated status page.")

Expand Down

0 comments on commit 10194f7

Please sign in to comment.