Skip to content

Commit

Permalink
Improve observability; add sqlite retries to deal with sqlite_busy er…
Browse files Browse the repository at this point in the history
…rors (#352)

* Use the pragma `busy_timeout` to add automatic retries on sqlite busy
errors
* I was seeing high rates of sqlite busy errors when deployed on K8s on
Azure and this was causing learning to happen much less frequently than
it should

* Clean up the prometheus metrics to support dashboards for monitoring
impact. The key metrics we want are
   * Number of accepted suggestions
   * Number of executed cells
   * Number of learned examples

* Clean up prometheus metrics so we can better monitor why learning
isn't happening; this was used to track down sqlite busy errors

* Support monitoring with Datadog
* With Datadog structured logging we need to use the field "level" for
severity or else it won't automatically parse that field
* Update the configuration to allow the user to control what fields are
used in the encoder
  • Loading branch information
jlewi authored Dec 4, 2024
1 parent 48cc643 commit a3a7611
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 89 deletions.
31 changes: 17 additions & 14 deletions app/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ require (
gonum.org/v1/gonum v0.15.0
google.golang.org/api v0.189.0
google.golang.org/grpc v1.64.1
google.golang.org/protobuf v1.34.2
google.golang.org/protobuf v1.35.2
gopkg.in/yaml.v3 v3.0.1
k8s.io/apimachinery v0.27.3
k8s.io/client-go v1.5.2
Expand All @@ -81,8 +81,9 @@ require (
github.com/ProtonMail/go-crypto v1.0.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmatcuk/doublestar/v4 v4.6.1 // indirect
github.com/bufbuild/connect-go v1.10.0 // indirect
github.com/bytedance/sonic v1.11.3 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chenzhuoyu/base64x v0.0.0-20230717121745-296ad89f973d // indirect
github.com/chenzhuoyu/iasm v0.9.1 // indirect
github.com/cli/go-gh v1.2.1 // indirect
Expand All @@ -100,6 +101,7 @@ require (
github.com/docker/docker v26.1.2+incompatible // indirect
github.com/docker/docker-credential-helpers v0.7.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/easyCZ/connect-go-prometheus v0.0.1 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
Expand Down Expand Up @@ -136,7 +138,7 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/kevinburke/ssh_config v1.2.0 // indirect
github.com/klauspost/compress v1.17.4 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand All @@ -147,23 +149,24 @@ require (
github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/ncruces/go-strftime v0.1.9 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
github.com/prometheus/client_golang v1.12.0 // indirect
github.com/prometheus/client_model v0.2.1-0.20210607210712-147c58e9608a // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/client_golang v1.20.5 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.60.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/replicate/replicate-go v0.21.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
Expand Down Expand Up @@ -207,12 +210,12 @@ require (
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.27.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/time v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20240722135656-d784300faade // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240722135656-d784300faade // indirect
Expand Down
37 changes: 37 additions & 0 deletions app/go.sum

Large diffs are not rendered by default.

24 changes: 24 additions & 0 deletions app/pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"strings"
"sync"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/jlewi/foyle/app/pkg/runme/ulid"

"github.com/jlewi/foyle/protos/go/foyle/v1alpha1/v1alpha1connect"
Expand Down Expand Up @@ -49,6 +52,19 @@ const (
temperature = 0.9
)

// Prometheus counters tracking how users interact with cells. promauto
// registers them with the default Prometheus registry when they are created.
var (
	// executedCounter counts EXECUTE log events; the "status" label carries
	// the string form of the event's execute status.
	executedCounter = promauto.NewCounterVec(prometheus.CounterOpts{
		Name: "cells_executed_total",
		Help: "Total number of executed cells broken down by status"},
		[]string{"status"},
	)

	// acceptedCounter counts ACCEPTED log events. It is a plain counter with
	// no labels, so the help text must not promise a per-type breakdown.
	acceptedCounter = promauto.NewCounter(prometheus.CounterOpts{
		Name: "cells_accepted_total",
		Help: "Total number of suggested cells accepted"},
	)
)

// Agent is the agent.
type Agent struct {
v1alpha1.UnimplementedGenerateServiceServer
Expand Down Expand Up @@ -625,6 +641,14 @@ func (a *Agent) LogEvents(ctx context.Context, req *connect.Request[v1alpha1.Log
func() {
_, span := tp.Start(ctx, "LogEvent", trace.WithAttributes(attribute.String("eventType", event.Type.String()), attribute.String("contextId", event.ContextId), attribute.String("selectedCellId", event.SelectedId)))
defer span.End()

switch event.GetType() {
case v1alpha1.LogEventType_ACCEPTED:

acceptedCounter.Inc()
case v1alpha1.LogEventType_EXECUTE:
executedCounter.WithLabelValues(event.GetExecuteStatus().String()).Inc()
}
// N.B we can't use zap.Object to log the event because it contains runme protos which don't have the zap marshaler bindings.
log.Info("LogEvent", "eventId", event.GetEventId(), "eventType", event.Type, "contextId", event.ContextId, "selectedCellId", event.SelectedId, logs.ZapProto("event", event))
}()
Expand Down
2 changes: 1 addition & 1 deletion app/pkg/analyze/fsql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,4 @@ CREATE TABLE IF NOT EXISTS results (

-- The JSON serialization of the proto.
proto_json TEXT NOT NULL
);
);
17 changes: 12 additions & 5 deletions app/pkg/analyze/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"os"
"path/filepath"

"github.com/go-logr/zapr"
"go.uber.org/zap"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

Expand Down Expand Up @@ -69,6 +72,15 @@ func NewSessionsManager(db *sql.DB) (*SessionsManager, error) {
return nil, err
}

// Set busy_timeout using PRAGMA. This is to deal with frequent sqlite busy errors when deployed on
// Azure.
// This is in milliseconds
if _, err := db.Exec("PRAGMA busy_timeout = 10000;"); err != nil {
return nil, errors.Wrapf(err, "Failed to set busy timeout for the database")
}
log := zapr.NewLogger(zap.L())
log.Info("sqlite busy_timeout set", "timeout", 5000)

// Create the dbtx from the actual database
queries := fsql.New(db)

Expand Down Expand Up @@ -115,7 +127,6 @@ func (db *SessionsManager) Update(ctx context.Context, contextID string, updateF

tx, err := db.db.BeginTx(ctx, &sql.TxOptions{})
if err != nil {
// DO NOT COMMIT
sessCounter.WithLabelValues("failedstart").Inc()
return errors.Wrapf(err, "Failed to start transaction")
}
Expand All @@ -132,7 +143,6 @@ func (db *SessionsManager) Update(ctx context.Context, contextID string, updateF
if err != nil {
logDBErrors(ctx, err)
if err != sql.ErrNoRows {
// DO NOT COMMIT
sessCounter.WithLabelValues("failedget").Inc()
return errors.Wrapf(err, "Failed to get session with id %v", contextID)
}
Expand All @@ -144,7 +154,6 @@ func (db *SessionsManager) Update(ctx context.Context, contextID string, updateF
}
}

// DO NOT COMMIT
sessCounter.WithLabelValues("callupdatefunc").Inc()

if err := updateFunc(session); err != nil {
Expand Down Expand Up @@ -172,7 +181,6 @@ func (db *SessionsManager) Update(ctx context.Context, contextID string, updateF
NumGenerateTraces: newRow.NumGenerateTraces,
}

// DO NOT COMMIT
sessCounter.WithLabelValues("callupdatesession").Inc()
if err := queries.UpdateSession(ctx, update); err != nil {
logDBErrors(ctx, err)
Expand All @@ -199,7 +207,6 @@ func (db *SessionsManager) Update(ctx context.Context, contextID string, updateF
return err
}

// DO NOT COMMIT
sessCounter.WithLabelValues("done").Inc()
return nil
}
Expand Down
52 changes: 46 additions & 6 deletions app/pkg/application/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,27 @@ func (a *App) createCoreForConsole(paths []string) (zapcore.Core, error) {

// Use the keys used by cloud logging
// https://cloud.google.com/logging/docs/structured-logging
c.LevelKey = "severity"
c.TimeKey = "time"
c.MessageKey = "message"
logFields := a.Config.Logging.LogFields
if logFields == nil {
logFields = &config.LogFields{}
}
if logFields.Level != "" {
c.LevelKey = logFields.Level
} else {
c.LevelKey = "severity"
}

if logFields.Time != "" {
c.TimeKey = logFields.Time
} else {
c.TimeKey = "time"
}

if logFields.Message != "" {
c.MessageKey = logFields.Message
} else {
c.MessageKey = "message"
}

lvl := a.Config.GetLogLevel()
zapLvl := zap.NewAtomicLevel()
Expand Down Expand Up @@ -333,10 +351,32 @@ func (a *App) createJSONCoreLogger(paths []string) (zapcore.Core, error) {
c := zap.NewProductionEncoderConfig()
// Use the keys used by cloud logging
// https://cloud.google.com/logging/docs/structured-logging
c.LevelKey = "severity"
c.TimeKey = "time"
c.MessageKey = "message"
logFields := a.Config.Logging.LogFields
if logFields == nil {
logFields = &config.LogFields{}
}
if logFields.Level != "" {
c.LevelKey = logFields.Level
} else {
c.LevelKey = "severity"
}

if logFields.Time != "" {
c.TimeKey = logFields.Time
} else {
c.TimeKey = "time"
}

if logFields.Message != "" {
c.MessageKey = logFields.Message
} else {
c.MessageKey = "message"
}

// We attach the function key to the logs because that is useful for identifying the function that generated the log.
// N.B. our log processing depends on this field being present in the logs. This is one reason
// why we don't allow it to be customized to match the field expected by a logging backend like Datadog
// or Cloud Logging.
c.FunctionKey = "function"

jsonEncoder := zapcore.NewJSONEncoder(c)
Expand Down
9 changes: 9 additions & 0 deletions app/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,15 @@ type Logging struct {

// MaxDelaySeconds is the maximum delay in seconds to wait before processing the logs.
MaxDelaySeconds int `json:"maxDelaySeconds,omitempty" yaml:"maxDelaySeconds,omitempty"`

LogFields *LogFields `json:"logFields,omitempty" yaml:"logFields,omitempty"`
}

// LogFields names the JSON keys the structured-log encoder uses for the
// standard fields of each log entry. It lets the output match the field names
// expected by a particular logging backend. Any field left empty falls back
// to the logger's default key.
type LogFields struct {
// Level is the key for the log level; the logger defaults to "severity" when empty.
Level string `json:"level,omitempty" yaml:"level,omitempty"`
// Time is the key for the timestamp; the logger defaults to "time" when empty.
Time string `json:"time,omitempty" yaml:"time,omitempty"`
// Message is the key for the log message; the logger defaults to "message" when empty.
Message string `json:"message,omitempty" yaml:"message,omitempty"`
}

type LogSink struct {
Expand Down
29 changes: 24 additions & 5 deletions app/pkg/learn/learner.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,32 @@ const (
var (
enqueuedCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "learner_enqueued_total",
Help: "Total number of enqueued blocks",
Help: "Total number of enqueued sessions for learning",
})

sessFiltered = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "learner_not_learnable",
Help: "Number of sessions that aren't learnable",
},
[]string{"status"},
)

sessProcessed = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "learner_sessions_processed",
Help: "Number of sessions processed by the learner",
},
[]string{"status"},
)

learnedCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "learned_examples_total",
Help: "Number of examples learned",
},
[]string{"status"},
)
)

// Learner handles the learn loop to learn from past mistakes.
Expand Down Expand Up @@ -298,7 +314,10 @@ func (l *Learner) Reconcile(ctx context.Context, id string) error {

if len(writeErrors.Causes) > 0 {
writeErrors.Final = errors.New("Not all examples could be successfully reconciled")
learnedCounter.WithLabelValues("error").Inc()
return writeErrors
} else {
learnedCounter.WithLabelValues("success").Inc()
}
return nil
}
Expand Down Expand Up @@ -381,26 +400,26 @@ func isLearnable(session *logspb.Session) bool {

if execEvent == nil {
// Since the cell wasn't successfully executed we don't learn from it
sessProcessed.WithLabelValues("noexec").Inc()
sessFiltered.WithLabelValues("noexec").Inc()
return false
}

log := zapr.NewLogger(zap.L())
if session.GetFullContext() == nil {
sessProcessed.WithLabelValues("nocontext").Inc()
sessFiltered.WithLabelValues("nocontext").Inc()
log.Error(errors.New("Session missing fullcontext"), "contextId", session.GetContextId())
return false
}

if session.GetFullContext().GetNotebook() == nil {
sessProcessed.WithLabelValues("nonotebook").Inc()
sessFiltered.WithLabelValues("nonotebook").Inc()
log.Error(errors.New("Session missing notebook"), "contextId", session.GetContextId())
return false
}

if session.GetFullContext().GetSelected() == 0 {
// If its the first cell we can't learn from it because what would we use as context to predict it?
sessProcessed.WithLabelValues("firstcell").Inc()
sessFiltered.WithLabelValues("firstcell").Inc()
return false
}
return true
Expand Down
Loading

0 comments on commit a3a7611

Please sign in to comment.