Skip to content

Commit

Permalink
Add event stream
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Sep 6, 2023
1 parent 4eaa0cb commit 8438d1a
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 44 deletions.
57 changes: 55 additions & 2 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1735,6 +1735,19 @@ func (db *DB) CommitWAL(ctx context.Context) (err error) {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: enc.Header().PageSize,
Commit: enc.Header().Commit,
Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(),
},
})

// Perform full checksum verification, if set. For testing only.
if db.store.StrictVerify {
if chksum, err := db.onDiskChecksum(dbFile, walFile); err != nil {
Expand Down Expand Up @@ -2109,6 +2122,19 @@ func (db *DB) CommitJournal(ctx context.Context, mode JournalMode) (err error) {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: enc.Header().PageSize,
Commit: enc.Header().Commit,
Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(),
},
})

// Calculate checksum for entire database.
if db.store.StrictVerify {
if chksum, err := db.onDiskChecksum(dbFile, nil); err != nil {
Expand Down Expand Up @@ -2237,6 +2263,19 @@ func (db *DB) Drop(ctx context.Context) (err error) {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: enc.Header().PageSize,
Commit: enc.Header().Commit,
Timestamp: time.UnixMilli(enc.Header().Timestamp).UTC(),
},
})

return nil
}

Expand Down Expand Up @@ -2521,10 +2560,11 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error {
}

// Update transaction for database.
if err := db.setPos(ltx.Pos{
pos := ltx.Pos{
TXID: dec.Header().MaxTXID,
PostApplyChecksum: dec.Trailer().PostApplyChecksum,
}, dec.Header().Timestamp); err != nil {
}
if err := db.setPos(pos, dec.Header().Timestamp); err != nil {
return fmt.Errorf("set pos: %w", err)
}

Expand All @@ -2543,6 +2583,19 @@ func (db *DB) ApplyLTXNoLock(ctx context.Context, path string) error {
// Notify store of database change.
db.store.MarkDirty(db.name)

// Notify event stream subscribers of new transaction.
db.store.NotifyEvent(Event{
Type: EventTypeTx,
DB: db.name,
Data: TxEventData{
TXID: pos.TXID,
PostApplyChecksum: pos.PostApplyChecksum,
PageSize: dec.Header().PageSize,
Commit: dec.Header().Commit,
Timestamp: time.UnixMilli(dec.Header().Timestamp).UTC(),
},
})

// Calculate latency since LTX file was written.
latency := float64(time.Now().UnixMilli()-dec.Header().Timestamp) / 1000
dbLatencySecondsMetricVec.WithLabelValues(db.name).Set(latency)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ require (
// replace github.com/superfly/litefs-go => ../litefs-go
// replace github.com/mattn/go-sqlite3 => ../../mattn/go-sqlite3
// replace github.com/pierrec/lz4/v4 => ../../pierrec/lz4
// replace github.com/superfly/ltx => ../ltx
replace github.com/superfly/ltx => ../ltx
29 changes: 28 additions & 1 deletion http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,14 @@ func (s *Server) serveHTTP(w http.ResponseWriter, r *http.Request) {
Error(w, r, fmt.Errorf("method not allowed"), http.StatusMethodNotAllowed)
}

case "/events":
switch r.Method {
case http.MethodGet:
s.handleGetEvents(w, r)
default:
Error(w, r, fmt.Errorf("method not allowed"), http.StatusMethodNotAllowed)
}

default:
http.NotFound(w, r)
}
Expand Down Expand Up @@ -506,7 +514,7 @@ func (s *Server) handlePostStream(w http.ResponseWriter, r *http.Request) {
defer serverStreamCountMetric.Dec()

// Subscribe to store changes
subscription := s.store.Subscribe(id)
subscription := s.store.SubscribeChangeSet(id)
defer func() { _ = subscription.Close() }()

// Read in pos map.
Expand Down Expand Up @@ -745,6 +753,25 @@ func (s *Server) streamLTXSnapshot(ctx context.Context, w http.ResponseWriter, d
return ltx.Pos{TXID: header.MaxTXID, PostApplyChecksum: trailer.PostApplyChecksum}, nil
}

func (s *Server) handleGetEvents(w http.ResponseWriter, r *http.Request) {
subscription := s.store.SubscribeEvents()
defer func() { subscription.Stop() }()

enc := json.NewEncoder(w)
for {
select {
case <-r.Context().Done():
return
case event := <-subscription.C():
if err := enc.Encode(event); err != nil {
log.Printf("http: %s %s: event stream error: %s", r.Method, r.URL.Path, err)
return
}
w.(http.Flusher).Flush()
}
}
}

func Error(w http.ResponseWriter, r *http.Request, err error, code int) {
log.Printf("http: %s %s: error: %s", r.Method, r.URL.Path, err)
http.Error(w, err.Error(), code)
Expand Down
Loading

0 comments on commit 8438d1a

Please sign in to comment.