Fix: bulk push freeze
ubombi committed Jun 21, 2022
1 parent 0daea50 commit 762925d
Showing 2 changed files with 21 additions and 13 deletions.
7 changes: 6 additions & 1 deletion postgres/pgcopy/parser.go
@@ -1,5 +1,10 @@
 // package pgcopy - parser for PostgreSQL `COPY TO ... WITH BINARY` command result
-// FileFormat: https://www.postgresql.org/docs/current/sql-copy.html#id-1.9.3.55.9.4.5
+//
+// FileFormat Docs: https://www.postgresql.org/docs/14/sql-copy.html#id-1.9.3.55.9.4.5
+//
+// If docs are confusing, check sources instead:
+// metadata and headers: https://github.com/postgres/postgres/blob/26b3455/src/backend/commands/copyto.c#L824-L834
+// row/tuple implementation: https://github.com/postgres/postgres/blob/26b3455/src/backend/commands/copyto.c#L931-L1000
 package pgcopy
 
 import (
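For orientation on the format the updated comment links to: a COPY ... WITH BINARY stream starts with an 11-byte signature, a 32-bit flags field and a header-extension length, followed by tuples (a 16-bit field count, then a signed 32-bit length and the raw bytes for each field, with -1 meaning NULL) and a 16-bit -1 trailer; all integers are in network byte order. The sketch below only walks that layout; the names and error handling are made up here and are not the repository's parser.go.

// Minimal sketch of reading a COPY ... WITH BINARY stream, following the
// layout described in the linked PostgreSQL docs. Illustrative only.
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

var signature = []byte("PGCOPY\n\xff\r\n\x00") // fixed 11-byte signature

// readHeader consumes the signature, the flags field and the header extension.
func readHeader(r io.Reader) error {
	sig := make([]byte, len(signature))
	if _, err := io.ReadFull(r, sig); err != nil {
		return err
	}
	if !bytes.Equal(sig, signature) {
		return errors.New("not a COPY BINARY stream")
	}
	var flags, extLen uint32
	if err := binary.Read(r, binary.BigEndian, &flags); err != nil {
		return err
	}
	if err := binary.Read(r, binary.BigEndian, &extLen); err != nil {
		return err
	}
	// Skip the header extension area; nothing is defined for it today.
	_, err := io.CopyN(io.Discard, r, int64(extLen))
	return err
}

// readTuple returns one row as raw field values; a nil slice element is SQL NULL.
// A field count of -1 marks the file trailer, reported here as io.EOF.
func readTuple(r io.Reader) ([][]byte, error) {
	var fieldCount int16
	if err := binary.Read(r, binary.BigEndian, &fieldCount); err != nil {
		return nil, err
	}
	if fieldCount == -1 {
		return nil, io.EOF
	}
	row := make([][]byte, fieldCount)
	for i := range row {
		var size int32
		if err := binary.Read(r, binary.BigEndian, &size); err != nil {
			return nil, err
		}
		if size == -1 {
			continue // NULL field
		}
		row[i] = make([]byte, size)
		if _, err := io.ReadFull(r, row[i]); err != nil {
			return nil, err
		}
	}
	return row, nil
}

func main() {
	// In practice r would be the output of `COPY tbl TO STDOUT WITH BINARY`.
	var r io.Reader = bytes.NewReader(nil)
	if err := readHeader(r); err != nil {
		fmt.Println("header:", err)
		return
	}
	for {
		row, err := readTuple(r)
		if err != nil {
			break
		}
		fmt.Println(len(row), "fields")
	}
}
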
27 changes: 15 additions & 12 deletions search/bulk.go
@@ -176,8 +176,8 @@ func (e *BulkElastic) Start(wg *sync.WaitGroup, ctx context.Context) {
 	}
 
 	e.lastReqAt = time.Now()
-	e.idleTimer.Reset(e.idle)
 	e.throttleTimer.Reset(e.throttle)
+	e.idleTimer.Reset(e.idle)
 	e.full = false
 
 	if e.shutdown {
@@ -226,8 +226,13 @@ func (e *BulkElastic) Add(pos pglogrepl.LSN, buffers ...[]byte) error {
 	defer e.cond.L.Unlock()
 	// TODO: update LSN to latest server position,
 	// even without operations on published tables
 
+	// Fast update LSN position
 	if len(buffers) == 0 {
+		e.inqueue = pos // Will update LSN after flushing current buffer
+		if e.buf.Len() == 0 {
+			e.stream.CommitPosition(pos)
+		}
 		return nil
 	}
 
@@ -239,7 +244,7 @@ func (e *BulkElastic) Add(pos pglogrepl.LSN, buffers ...[]byte) error {
 	// buffer is full, wait for changes
 	for e.buf.Cap()-e.buf.Len() < size {
 		e.full = true
-		e.cond.Broadcast()
+		e.cond.Broadcast() // unlock push and wait for it to finish
 		e.cond.Wait()
 	}
 	e.full = false
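
The loop above is the producer half of a condition-variable handshake: when the next batch does not fit, Add marks the buffer full, wakes the push goroutine and waits until it has been drained. A self-contained sketch of that handshake, with invented names rather than the actual BulkElastic fields, might look like this:

// Sketch of the full-buffer handshake: the producer broadcasts and waits while
// the buffer is full; the consumer drains it and broadcasts back. Names are
// illustrative, not from this repository.
package main

import (
	"bytes"
	"fmt"
	"sync"
	"time"
)

type bulk struct {
	mu   sync.Mutex
	cond *sync.Cond
	buf  bytes.Buffer
	cap  int
	full bool
	done bool
}

func newBulk(capacity int) *bulk {
	b := &bulk{cap: capacity}
	b.cond = sync.NewCond(&b.mu)
	return b
}

// add blocks while the buffer cannot fit p, mirroring the loop in Add.
func (b *bulk) add(p []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for b.cap-b.buf.Len() < len(p) {
		b.full = true
		b.cond.Broadcast() // wake the push loop so it can drain
		b.cond.Wait()      // releases the lock until the drain is done
	}
	b.full = false
	b.buf.Write(p)
	b.cond.Broadcast() // tell the push loop there is new data
}

// pushLoop drains the buffer whenever it is marked full or on shutdown.
func (b *bulk) pushLoop() {
	b.mu.Lock()
	defer b.mu.Unlock()
	for {
		for !b.full && !b.done {
			b.cond.Wait()
		}
		if b.buf.Len() > 0 {
			fmt.Printf("pushed %d bytes\n", b.buf.Len())
			b.buf.Reset()
		}
		b.full = false
		b.cond.Broadcast() // unblock writers waiting in add
		if b.done {
			return
		}
	}
}

func main() {
	b := newBulk(64)
	go b.pushLoop()
	for i := 0; i < 10; i++ {
		b.add(bytes.Repeat([]byte{'x'}, 20))
	}
	b.mu.Lock()
	b.done = true
	b.cond.Broadcast()
	b.mu.Unlock()
	time.Sleep(50 * time.Millisecond) // let the loop exit in this toy example
}

The Broadcast after the write mirrors the one this commit adds at the end of Add: without some wake-up from the producer, a pusher that is already waiting has nothing left to rouse it.
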
@@ -249,33 +254,31 @@ func (e *BulkElastic) Add(pos pglogrepl.LSN, buffers ...[]byte) error {
 		e.buf.WriteByte('\n')
 	}
 	e.inqueue = pos
+	e.cond.Broadcast() // try to unlock push, in case if timers already expired
 
 	return nil
 }
 
 // exec should be called with mutex locked.
 func (e *BulkElastic) exec() error {
-	// Bulk request without data will return 400 error. So, in case if there was no writes and LSN cursor hasn't changed, we skip this.
-	// However, during cold start, wee need to push a lot of data, without LSN positions.
-	// TODO: Improve bulk handling and buffers
-	committed := e.stream.Position()
-	if e.buf.Len() == 0 || (e.inqueue != pglogrepl.LSN(0) && committed == e.inqueue) {
-		e.stream.CommitPosition(e.inqueue)
-		return nil // nothing to push
+	size := e.buf.Len()
+	if size == 0 {
+		return nil // nothing to push; possible during shutdown
 	}
 
 	// Wrapped into separate reader to make retry possible.
 	// Since buffer would read from last position, even in case of error
 	body := bytes.NewReader(e.buf.Bytes())
 
-	size := e.buf.Len()
 	if err := e.client.Bulk(body); err != nil {
 		// e.logger.Fatal("Failed ES Request", zap.Any("body", json.RawMessage(e.buf.Bytes())))
 		return errors.Wrap(err, "commit bulk request")
 	}
 
-	e.stream.CommitPosition(e.inqueue)
-	metricMessageSize.Add(float64(e.buf.Len()))
+	if e.inqueue != pglogrepl.LSN(0) { // do not commit zero positions during reindexing
+		e.stream.CommitPosition(e.inqueue)
+	}
+	metricMessageSize.Add(float64(size))
 	e.buf.Reset()
 	e.logger.Info("pushed bulk request", zap.Int("size", size), zap.String("LSN", e.inqueue.String()))
 	return nil
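
Why the extra Broadcast in Add plausibly fixes the freeze: the surrounding Start loop is mostly outside this diff, so the following is a guessed, heavily simplified shape rather than the project's actual code. The push side waits until a throttle or idle timer has fired and there is buffered data; if the timers fire while the buffer is still empty, the only remaining wake-up has to come from Add itself.

// Guessed miniature of the missed-wakeup scenario. Names and structure are
// assumptions for illustration, not the repository's Start/push code.
package main

import (
	"bytes"
	"fmt"
	"sync"
	"time"
)

func main() {
	var (
		mu       sync.Mutex
		cond     = sync.NewCond(&mu)
		buf      bytes.Buffer
		throttle = false
	)

	// Timer callback: mark the throttle window as elapsed and wake the pusher.
	time.AfterFunc(10*time.Millisecond, func() {
		mu.Lock()
		throttle = true
		cond.Broadcast()
		mu.Unlock()
	})

	// Push loop: needs both buffered data and an expired throttle to push.
	done := make(chan struct{})
	go func() {
		mu.Lock()
		for buf.Len() == 0 || !throttle {
			cond.Wait()
		}
		fmt.Printf("pushed %d bytes\n", buf.Len())
		buf.Reset()
		mu.Unlock()
		close(done)
	}()

	// The writer appends well after the timer has fired. Without the
	// Broadcast below nothing would ever wake the push loop again.
	time.Sleep(50 * time.Millisecond)
	mu.Lock()
	buf.WriteString(`{"index":{}}` + "\n")
	cond.Broadcast() // the fix: wake the pusher even though the timers already fired
	mu.Unlock()

	<-done
}

Run as written it prints the pushed byte count; remove the final Broadcast and the program never gets past the wait (the runtime will typically report that all goroutines are asleep), which is the freeze in miniature.
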
