Skip to content

Commit

Permalink
ui work
Browse files Browse the repository at this point in the history
  • Loading branch information
wardviaene committed Sep 21, 2024
1 parent fdee5a4 commit f5a8bfc
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 11 deletions.
25 changes: 24 additions & 1 deletion pkg/observability/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"bytes"
"fmt"
"io"
"path"
"strconv"
"strings"
"time"

"github.com/in4it/wireguard-server/pkg/logging"
"github.com/in4it/wireguard-server/pkg/storage"
)

func (o *Observability) WriteBufferToStorage(n int64) error {
Expand All @@ -21,14 +24,20 @@ func (o *Observability) WriteBufferToStorage(n int64) error {
return fmt.Errorf("write error from buffer to temporary buffer: %s", err)
}
now := time.Now()
file, err := o.Storage.OpenFileForWriting(now.Format("2006/01/02") + "/data-" + now.Format("150405") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10))
filename := now.Format("2006/01/02") + "/data-" + now.Format("150405") + "-" + strconv.FormatUint(o.FlushOverflowSequence.Add(1), 10)
err = ensurePath(o.Storage, filename)
if err != nil {
return fmt.Errorf("ensure path error: %s", err)
}
file, err := o.Storage.OpenFileForWriting(filename)
if err != nil {
return fmt.Errorf("open file for writing error: %s", err)
}
_, err = io.Copy(file, tempBuf)
if err != nil {
return fmt.Errorf("file write error: %s", err)
}
logging.DebugLog(fmt.Errorf("wrote file: %s", filename))
return file.Close()
}

Expand Down Expand Up @@ -91,3 +100,17 @@ func (c *ConcurrentRWBuffer) Len() int {
func (c *ConcurrentRWBuffer) Cap() int {
return c.buffer.Cap()
}

func ensurePath(storage storage.Iface, filename string) error {
base := path.Dir(filename)
baseSplit := strings.Split(base, "/")
fullPath := ""
for _, v := range baseSplit {
fullPath = path.Join(fullPath, v)
err := storage.EnsurePath(fullPath)
if err != nil {
return err
}
}
return nil
}
16 changes: 11 additions & 5 deletions pkg/observability/buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,7 @@ func TestIngestionMoreMessages(t *testing.T) {
t.Skip() // we can skip this for general unit testing
totalMessagesToGenerate := 10000000 // 10,000,000
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(MAX_BUFFER_SIZE)
o.Storage = storage
o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE)
payload := IncomingData{
{
"date": 1720613813.197045,
Expand Down Expand Up @@ -134,8 +133,7 @@ func TestIngestionMoreMessages(t *testing.T) {
func BenchmarkIngest10000000(b *testing.B) {
totalMessagesToGenerate := 10000000 // 10,000,000
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(MAX_BUFFER_SIZE)
o.Storage = storage
o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE)
payload := IncomingData{
{
"date": 1720613813.197045,
Expand Down Expand Up @@ -170,7 +168,7 @@ func BenchmarkIngest10000000(b *testing.B) {
func BenchmarkIngest100000000(b *testing.B) {
totalMessagesToGenerate := 10000000 // 10,000,000
storage := &memorystorage.MockMemoryStorage{}
o := NewWithoutMonitor(MAX_BUFFER_SIZE)
o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE)
o.Storage = storage
payload := IncomingData{
{
Expand Down Expand Up @@ -202,3 +200,11 @@ func BenchmarkIngest100000000(b *testing.B) {
}
}
}

func TestEnsurePath(t *testing.T) {
storage := &memorystorage.MockMemoryStorage{}
err := ensurePath(storage, "a/b/c/filename.txt")
if err != nil {
t.Fatalf("error: %s", err)
}
}
2 changes: 1 addition & 1 deletion pkg/observability/constants.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package observability

const MAX_BUFFER_SIZE = 1024 * 1024 // 1 MB
const FLUSH_TIME_MAX_MINUTES = 5
const FLUSH_TIME_MAX_MINUTES = 1 // should have 5 as default at release

const TIMESTAMP_FORMAT = "2006-01-02T15:04:05"
9 changes: 8 additions & 1 deletion pkg/observability/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,17 @@ import (
)

func (o *Observability) getLogs(fromDate, endDate time.Time, pos int64, offset, maxLogLines int) (LogEntryResponse, error) {
logEntryResponse := LogEntryResponse{}
logEntryResponse := LogEntryResponse{
Enabled: true,
Environments: []string{"dev", "qa", "prod"},
}

logFiles := []string{}

if maxLogLines == 0 {
maxLogLines = 100
}

for d := fromDate; d.Before(endDate) || d.Equal(endDate); d = d.AddDate(0, 0, 1) {
fileList, err := o.Storage.ReadDir(d.Format("2006/01/02"))
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/observability/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"github.com/in4it/wireguard-server/pkg/storage"
)

func New(storage storage.Iface) *Observability {
o := NewWithoutMonitor(storage, MAX_BUFFER_SIZE)
func New(defaultStorage storage.Iface) *Observability {
o := NewWithoutMonitor(defaultStorage, MAX_BUFFER_SIZE)
go o.monitorBuffer()
return o
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/observability/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ type ConcurrentRWBuffer struct {
}

type LogEntryResponse struct {
LogEntries []LogEntry `json:"logEntries"`
Enabled bool `json:"enabled"`
LogEntries []LogEntry `json:"logEntries"`
Environments []string `json:"environments"`
}

type LogEntry struct {
Expand Down

0 comments on commit f5a8bfc

Please sign in to comment.