diff --git a/pkg/blockbuilder/scheduler/priority_queue.go b/pkg/blockbuilder/scheduler/priority_queue.go index 330846d0da087..9ea2cc725ff33 100644 --- a/pkg/blockbuilder/scheduler/priority_queue.go +++ b/pkg/blockbuilder/scheduler/priority_queue.go @@ -210,3 +210,23 @@ func (b *CircularBuffer[V]) Lookup(f func(V) bool) (V, bool) { var zero V return zero, false } + +// Range iterates over the elements in the buffer from oldest to newest +// and calls the given function for each element. +// If the function returns false, iteration stops. +func (b *CircularBuffer[V]) Range(f func(V) bool) { + if b.size == 0 { + return + } + + // Start from head (oldest) and iterate to tail (newest) + idx := b.head + remaining := b.size + for remaining > 0 { + if !f(b.buffer[idx]) { + return + } + idx = (idx + 1) % len(b.buffer) + remaining-- + } +} diff --git a/pkg/blockbuilder/scheduler/priority_queue_test.go b/pkg/blockbuilder/scheduler/priority_queue_test.go new file mode 100644 index 0000000000000..2b98b5980bb95 --- /dev/null +++ b/pkg/blockbuilder/scheduler/priority_queue_test.go @@ -0,0 +1,81 @@ +package scheduler + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCircularBuffer_Range(t *testing.T) { + tests := []struct { + name string + capacity int + input []int + want []int + }{ + { + name: "empty buffer", + capacity: 3, + input: []int{}, + want: []int{}, + }, + { + name: "partially filled buffer", + capacity: 3, + input: []int{1, 2}, + want: []int{1, 2}, + }, + { + name: "full buffer", + capacity: 3, + input: []int{1, 2, 3}, + want: []int{1, 2, 3}, + }, + { + name: "buffer with eviction", + capacity: 3, + input: []int{1, 2, 3, 4, 5}, + want: []int{3, 4, 5}, // oldest elements (1,2) were evicted + }, + { + name: "buffer with multiple evictions", + capacity: 2, + input: []int{1, 2, 3, 4, 5}, + want: []int{4, 5}, // only newest elements remain + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create and fill buffer + buf := NewCircularBuffer[int](tt.capacity) + for _, v := range tt.input { + buf.Push(v) + } + + // Use Range to collect elements + got := make([]int, 0) + buf.Range(func(v int) bool { + got = append(got, v) + return true + }) + + require.Equal(t, tt.want, got, "Range should iterate in order from oldest to newest") + }) + } +} + +func TestCircularBuffer_Range_EarlyStop(t *testing.T) { + buf := NewCircularBuffer[int](5) + for i := 1; i <= 5; i++ { + buf.Push(i) + } + + var got []int + buf.Range(func(v int) bool { + got = append(got, v) + return v != 3 // stop after seeing 3 + }) + + require.Equal(t, []int{1, 2, 3}, got, "Range should stop when function returns false") +} diff --git a/pkg/blockbuilder/scheduler/queue.go b/pkg/blockbuilder/scheduler/queue.go index 5169d0ffc1400..019ca61a0c487 100644 --- a/pkg/blockbuilder/scheduler/queue.go +++ b/pkg/blockbuilder/scheduler/queue.go @@ -382,3 +382,15 @@ func (q *JobQueue) ListInProgressJobs() []JobWithMetadata { } return jobs } + +func (q *JobQueue) ListCompletedJobs() []JobWithMetadata { + q.mu.RLock() + defer q.mu.RUnlock() + + jobs := make([]JobWithMetadata, 0, q.completed.Len()) + q.completed.Range(func(job *JobWithMetadata) bool { + jobs = append(jobs, *job) + return true + }) + return jobs +} diff --git a/pkg/blockbuilder/scheduler/status.go b/pkg/blockbuilder/scheduler/status.go index 402c08031cfe6..ef08598fef8f5 100644 --- a/pkg/blockbuilder/scheduler/status.go +++ b/pkg/blockbuilder/scheduler/status.go @@ -20,6 +20,7 @@ var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.F type jobQueue interface { ListPendingJobs() []JobWithMetadata ListInProgressJobs() []JobWithMetadata + ListCompletedJobs() []JobWithMetadata } type offsetReader interface { @@ -63,12 +64,14 @@ func (h *statusPageHandler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { data := struct { PendingJobs []JobWithMetadata InProgressJobs []JobWithMetadata + CompletedJobs []JobWithMetadata Now time.Time PartitionInfo []partitionInfo }{ Now: time.Now(), PendingJobs: pendingJobs, InProgressJobs: inProgressJobs, + CompletedJobs: h.jobQueue.ListCompletedJobs(), } for _, partitionOffset := range offsets { diff --git a/pkg/blockbuilder/scheduler/status.gohtml b/pkg/blockbuilder/scheduler/status.gohtml index 98e196ecfb824..35eb961d2cd34 100644 --- a/pkg/blockbuilder/scheduler/status.gohtml +++ b/pkg/blockbuilder/scheduler/status.gohtml @@ -22,6 +22,7 @@ {{ range $i, $job := .PendingJobs }} + {{ .ID }} {{ .Priority }} {{ .Partition }} @@ -47,6 +48,7 @@ {{ range $i, $job := .InProgressJobs }} + {{ .ID }} {{ .Priority }} {{ .Partition }} @@ -58,6 +60,35 @@ {{ end }} +

Completed Jobs

+ + + + + + + + + + + + + + + {{ range $i, $job := .CompletedJobs }} + + + + + + + + + + + {{ end }} + +
IDPriorityPartitionStart OffsetEnd OffsetStatusStart TimestampCompletion Timestamp
{{ .ID }}{{ .Priority }}{{ .Partition }}{{ .Offsets.Min }}{{ .Offsets.Max }}{{ .Status }}{{ .StartTime | durationSince }} ago ({{ .StartTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }}){{ .UpdateTime | durationSince }} ago ({{ .UpdateTime.Format "Mon, 02 Jan 2006 15:04:05 -0700" }})

Partition Lag

@@ -70,6 +101,7 @@ {{ range .PartitionInfo }} + diff --git a/pkg/blockbuilder/scheduler/status_preview_test.go b/pkg/blockbuilder/scheduler/status_preview_test.go new file mode 100644 index 0000000000000..4def1ad8684a1 --- /dev/null +++ b/pkg/blockbuilder/scheduler/status_preview_test.go @@ -0,0 +1,72 @@ +//go:build preview + +package scheduler + +import ( + "fmt" + "net/http/httptest" + "testing" + "time" + + "github.com/twmb/franz-go/pkg/kadm" + + "github.com/grafana/loki/v3/pkg/blockbuilder/types" +) + +// TestPreview is a utility test that runs a local server with the status page. +// Run it with: go test -tags=preview -v -run TestPreview +func TestPreview(t *testing.T) { + // Setup mock data with varied timestamps + now := time.Now() + mockLister := &mockQueueLister{ + pendingJobs: []JobWithMetadata{ + {Job: types.NewJob(11, types.Offsets{Min: 11, Max: 20}), UpdateTime: now.Add(-2 * time.Hour), Priority: 23}, + {Job: types.NewJob(22, types.Offsets{Min: 21, Max: 30}), UpdateTime: now.Add(-1 * time.Hour), Priority: 42}, + {Job: types.NewJob(33, types.Offsets{Min: 22, Max: 40}), UpdateTime: now.Add(-30 * time.Minute), Priority: 11}, + }, + inProgressJobs: []JobWithMetadata{ + {Job: types.NewJob(44, types.Offsets{Min: 1, Max: 10}), StartTime: now.Add(-4 * time.Hour), UpdateTime: now.Add(-3 * time.Hour)}, + {Job: types.NewJob(55, types.Offsets{Min: 11, Max: 110}), StartTime: now.Add(-5 * time.Hour), UpdateTime: now.Add(-4 * time.Hour)}, + }, + completedJobs: []JobWithMetadata{ + {Job: types.NewJob(66, types.Offsets{Min: 1, Max: 50}), StartTime: now.Add(-8 * time.Hour), UpdateTime: now.Add(-7 * time.Hour), Status: types.JobStatusComplete}, + {Job: types.NewJob(77, types.Offsets{Min: 51, Max: 100}), StartTime: now.Add(-6 * time.Hour), UpdateTime: now.Add(-5 * time.Hour), Status: types.JobStatusComplete}, + {Job: types.NewJob(88, types.Offsets{Min: 101, Max: 150}), StartTime: now.Add(-4 * time.Hour), UpdateTime: now.Add(-3 * time.Hour), Status: types.JobStatusFailed}, + {Job: types.NewJob(99, types.Offsets{Min: 151, Max: 200}), StartTime: now.Add(-2 * time.Hour), UpdateTime: now.Add(-1 * time.Hour), Status: types.JobStatusComplete}, + }, + } + + mockReader := &mockOffsetReader{ + groupLag: map[int32]kadm.GroupMemberLag{ + 0: { + Lag: 10, + Partition: 3, + End: kadm.ListedOffset{Offset: 100}, + Commit: kadm.Offset{At: 90}, + }, + 1: { + Lag: 0, + Partition: 1, + End: kadm.ListedOffset{Offset: 100}, + Commit: kadm.Offset{At: 100}, + }, + 2: { + Lag: 233, + Partition: 2, + End: kadm.ListedOffset{Offset: 333}, + Commit: kadm.Offset{At: 100}, + }, + }, + } + + handler := newStatusPageHandler(mockLister, mockReader, time.Hour) + + // Start local server + server := httptest.NewServer(handler) + defer server.Close() + + fmt.Printf("\n\n=== Preview server running ===\nOpen this URL in your browser:\n%s\nPress Ctrl+C to stop the server\n\n", server.URL) + + // Keep server running until interrupted + select {} +} diff --git a/pkg/blockbuilder/scheduler/status_test.go b/pkg/blockbuilder/scheduler/status_test.go index f056ffd4ca42e..ac471b344b2cc 100644 --- a/pkg/blockbuilder/scheduler/status_test.go +++ b/pkg/blockbuilder/scheduler/status_test.go @@ -15,6 +15,7 @@ import ( type mockQueueLister struct { pendingJobs []JobWithMetadata inProgressJobs []JobWithMetadata + completedJobs []JobWithMetadata } func (m *mockQueueLister) ListPendingJobs() []JobWithMetadata { @@ -25,6 +26,10 @@ func (m *mockQueueLister) ListInProgressJobs() []JobWithMetadata { return m.inProgressJobs } +func (m *mockQueueLister) ListCompletedJobs() []JobWithMetadata { + return m.completedJobs +} + func TestStatusPageHandler_ServeHTTP(t *testing.T) { t.Skip("skipping. only added to inspect the generated status page.")
{{ .Partition }} {{ .Lag }} {{ .EndOffset }}