diff --git a/postgres/pgcopy/parser.go b/postgres/pgcopy/parser.go
index c78bc93..98e549d 100644
--- a/postgres/pgcopy/parser.go
+++ b/postgres/pgcopy/parser.go
@@ -1,5 +1,10 @@
 // package pgcopy - parser for PostgreSQL `COPY TO ... WITH BINARY` command result
-// FileFormat: https://www.postgresql.org/docs/current/sql-copy.html#id-1.9.3.55.9.4.5
+//
+// FileFormat Docs: https://www.postgresql.org/docs/14/sql-copy.html#id-1.9.3.55.9.4.5
+//
+// If the docs are confusing, check the sources instead:
+// metadata and headers: https://github.com/postgres/postgres/blob/26b3455/src/backend/commands/copyto.c#L824-L834
+// row/tuple implementation: https://github.com/postgres/postgres/blob/26b3455/src/backend/commands/copyto.c#L931-L1000
 package pgcopy
 
 import (
diff --git a/search/bulk.go b/search/bulk.go
index 009110b..21c2dc2 100644
--- a/search/bulk.go
+++ b/search/bulk.go
@@ -176,8 +176,8 @@ func (e *BulkElastic) Start(wg *sync.WaitGroup, ctx context.Context) {
 			}
 
 			e.lastReqAt = time.Now()
-			e.idleTimer.Reset(e.idle)
 			e.throttleTimer.Reset(e.throttle)
+			e.idleTimer.Reset(e.idle)
 			e.full = false
 
 			if e.shutdown {
@@ -226,8 +226,13 @@ func (e *BulkElastic) Add(pos pglogrepl.LSN, buffers ...[]byte) error {
 	defer e.cond.L.Unlock()
 
 	// TODO: update LSN to latest server position,
 	// even without operations on published tables
+
+	// Fast path: advance the LSN position when there is no data to buffer
 	if len(buffers) == 0 {
 		e.inqueue = pos // Will update LSN after flushing current buffer
+		if e.buf.Len() == 0 {
+			e.stream.CommitPosition(pos)
+		}
 		return nil
 	}
@@ -239,7 +244,7 @@ func (e *BulkElastic) Add(pos pglogrepl.LSN, buffers ...[]byte) error {
 	// buffer is full, wait for changes
 	for e.buf.Cap()-e.buf.Len() < size {
 		e.full = true
-		e.cond.Broadcast()
+		e.cond.Broadcast() // unlock push and wait for it to finish
 		e.cond.Wait()
 	}
 	e.full = false
@@ -249,33 +254,31 @@ func (e *BulkElastic) Add(pos pglogrepl.LSN, buffers ...[]byte) error {
 		e.buf.WriteByte('\n')
 	}
 	e.inqueue = pos
+	e.cond.Broadcast() // try to unlock push in case the timers have already expired
 	return nil
 }
 
 // exec should be called with mutex locked.
 func (e *BulkElastic) exec() error {
-	// Bulk request without data will return 400 error. So, in case if there was no writes and LSN cursor hasn't changed, we skip this.
-	// However, during cold start, wee need to push a lot of data, without LSN positions.
-	// TODO: Improve bulk handling and buffers
-	committed := e.stream.Position()
-	if e.buf.Len() == 0 || (e.inqueue != pglogrepl.LSN(0) && committed == e.inqueue) {
-		e.stream.CommitPosition(e.inqueue)
-		return nil // nothing to push
+	size := e.buf.Len()
+	if size == 0 {
+		return nil // nothing to push; possible during shutdown
 	}
 
 	// Wrapped into separate reader to make retry possible.
 	// Since buffer would read from last position, even in case of error
 	body := bytes.NewReader(e.buf.Bytes())
-	size := e.buf.Len()
 	if err := e.client.Bulk(body); err != nil {
 		// e.logger.Fatal("Failed ES Request", zap.Any("body", json.RawMessage(e.buf.Bytes())))
 		return errors.Wrap(err, "commit bulk request")
 	}
 
-	e.stream.CommitPosition(e.inqueue)
-	metricMessageSize.Add(float64(e.buf.Len()))
+	if e.inqueue != pglogrepl.LSN(0) { // do not commit zero positions during reindexing
+		e.stream.CommitPosition(e.inqueue)
+	}
+	metricMessageSize.Add(float64(size))
 	e.buf.Reset()
 
 	e.logger.Info("pushed bulk request", zap.Int("size", size), zap.String("LSN", e.inqueue.String()))
 	return nil
 }
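
Note on the Broadcast/Wait changes in Add: they form a producer/flusher handshake on a single sync.Cond. Add wakes the flush side when the buffer is full and then waits for it to drain, and the extra Broadcast after a successful append covers the case where the idle or throttle timer has already expired and the flush side is parked on Wait. Below is a minimal, self-contained sketch of that handshake; the names (bulk, add, flushLoop, maxBuf) are illustrative and not part of this repository.

package main

import (
	"bytes"
	"fmt"
	"sync"
)

const maxBuf = 64 // illustrative capacity limit; the real code checks buf.Cap()

type bulk struct {
	mu   sync.Mutex
	cond *sync.Cond
	buf  bytes.Buffer
	full bool
	done bool
}

// add appends p, blocking while the buffer has no room for it.
func (b *bulk) add(p []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for maxBuf-b.buf.Len() < len(p) {
		b.full = true
		b.cond.Broadcast() // wake the flusher and wait for it to drain the buffer
		b.cond.Wait()
	}
	b.full = false
	b.buf.Write(p)
	b.cond.Broadcast() // also wake the flusher in case its timer already expired
}

// flushLoop drains the buffer whenever it is signalled that the buffer is
// full or that we are shutting down; in the real code timers also trigger it.
func (b *bulk) flushLoop(wg *sync.WaitGroup) {
	defer wg.Done()
	b.mu.Lock()
	defer b.mu.Unlock()
	for {
		for !b.full && !b.done {
			b.cond.Wait()
		}
		if b.buf.Len() > 0 {
			fmt.Printf("flushed %d bytes\n", b.buf.Len()) // stand-in for the bulk request
			b.buf.Reset()
		}
		b.full = false
		b.cond.Broadcast() // let a blocked add continue
		if b.done {
			return
		}
	}
}

func main() {
	b := &bulk{}
	b.cond = sync.NewCond(&b.mu)

	var wg sync.WaitGroup
	wg.Add(1)
	go b.flushLoop(&wg)

	for i := 0; i < 10; i++ {
		b.add(bytes.Repeat([]byte{'x'}, 20))
	}

	b.mu.Lock()
	b.done = true
	b.cond.Broadcast()
	b.mu.Unlock()
	wg.Wait()
}

The important detail is that the flusher checks its condition before calling Wait, so a Broadcast sent while it is not yet waiting cannot be lost.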
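
Note on the retry comment kept in exec: reading the request body straight out of the bytes.Buffer would consume it, so a failed bulk request could not be re-sent with the same payload; wrapping buf.Bytes() in a bytes.Reader leaves the buffer intact until it is Reset after a successful push, and a retry only needs to rewind or recreate the reader. A minimal sketch of that pattern, with a stand-in send function instead of the real Elasticsearch client:

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// send stands in for the Elasticsearch bulk call; it fails on the first
// attempt so that the retry path is exercised.
func send(body io.ReadSeeker, attempt int) error {
	payload, err := io.ReadAll(body)
	if err != nil {
		return err
	}
	if attempt == 0 {
		return errors.New("transient error")
	}
	fmt.Printf("sent %d bytes\n", len(payload))
	return nil
}

func main() {
	var buf bytes.Buffer
	buf.WriteString(`{"index":{}}` + "\n" + `{"field":"value"}` + "\n")

	body := bytes.NewReader(buf.Bytes()) // does not drain buf
	for attempt := 0; attempt < 2; attempt++ {
		if _, err := body.Seek(0, io.SeekStart); err != nil { // rewind before each attempt
			fmt.Println("seek failed:", err)
			return
		}
		if err := send(body, attempt); err != nil {
			fmt.Println("retrying after:", err)
			continue
		}
		break
	}

	// The buffer still holds the payload until the caller decides to Reset it.
	fmt.Println("buffer still holds", buf.Len(), "bytes")
	buf.Reset()
}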