Skip to content

Commit

Permalink
add graceful shutdown for "docker stop"
Browse files Browse the repository at this point in the history
  • Loading branch information
lezhnev74 committed Oct 26, 2024
1 parent db787c0 commit 607d685
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 65 deletions.
6 changes: 6 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ func PrintMem(db *sql.DB) (rss uint64) {
return m.Sys
}

var EnableLogging bool

func Log(message string) {
fmt.Println(message)
}

//func CleanMem() {
// runtime.GC()
// debug.FreeOSMemory()
Expand Down
2 changes: 1 addition & 1 deletion db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestClearUp(t *testing.T) {
_db, storageRoot := test_util.PrepareTestDb(t)
defer os.RemoveAll(storageRoot)

ii, err := inverted_index_2.NewInvertedIndex(storageRoot)
ii, err := inverted_index_2.NewInvertedIndex(storageRoot, true)
require.NoError(t, err)

_, err = _db.Exec(`INSERT INTO files VALUES (1, 'path1'), (2, 'path2')`)
Expand Down
10 changes: 9 additions & 1 deletion db/results.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package db

import (
"context"
"database/sql"
"errors"
go_iterators "github.com/lezhnev74/go-iterators"
Expand Down Expand Up @@ -96,7 +97,7 @@ func (q *QueryDB) Flush() {
}

// CheckinQuery returns queryId instantly while ingesting the Messages.
func (q *QueryDB) CheckinQuery(text string, min, max *time.Time, messages go_iterators.Iterator[Message]) (query Query, err error) {
func (q *QueryDB) CheckinQuery(ctx context.Context, text string, min, max *time.Time, messages go_iterators.Iterator[Message]) (query Query, err error) {
queryId, err := q.ReserveQueryId()
if err != nil {
err = xerrors.Errorf("query checkin: %w", err)
Expand Down Expand Up @@ -132,6 +133,13 @@ func (q *QueryDB) CheckinQuery(text string, min, max *time.Time, messages go_ite
n int
)
for {
// Cancellation test:
select {
case <-ctx.Done():
return
default:
}

m, err = messages.Next()
if errors.Is(err, go_iterators.EmptyIterator) {
break
Expand Down
3 changes: 2 additions & 1 deletion db/results_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package db_test

import (
"context"
go_iterators "github.com/lezhnev74/go-iterators"
"github.com/stretchr/testify/require"
"heaplog_2024/common"
Expand Down Expand Up @@ -63,7 +64,7 @@ func TestResultsRead(t *testing.T) {
return
}, func() error { return nil })

r, err := dbContainer.QueryDB.CheckinQuery("sample", &t0, &t2, it)
r, err := dbContainer.QueryDB.CheckinQuery(context.Background(), "sample", &t0, &t2, it)
require.NoError(t, err)

require.False(t, r.Finished)
Expand Down
22 changes: 11 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,27 @@ require (
github.com/gofiber/fiber/v2 v2.52.5
github.com/gofiber/template/html/v2 v2.1.2
github.com/lezhnev74/go-iterators v0.0.0-20240902070734-4c1f359dc381
github.com/lezhnev74/inverted_index_2 v0.0.0-20240915090532-e9822aa2e9a4
github.com/marcboeker/go-duckdb v1.8.0
github.com/lezhnev74/inverted_index_2 v0.0.0-20241025145959-abaf487ff656
github.com/marcboeker/go-duckdb v1.8.2
github.com/pkg/errors v0.9.1
github.com/prometheus/procfs v0.15.1
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
github.com/urfave/cli/v2 v2.27.4
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0
github.com/urfave/cli/v2 v2.27.5
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
golang.org/x/sync v0.8.0
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/RoaringBitmap/roaring v1.9.4 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/apache/arrow/go/v17 v17.0.0 // indirect
github.com/bits-and-blooms/bitset v1.14.3 // indirect
github.com/blevesearch/mmap-go v1.0.4 // indirect
github.com/blevesearch/vellum v1.0.10 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
Expand All @@ -40,7 +40,7 @@ require (
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.8 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/magiconair/properties v1.8.7 // indirect
Expand All @@ -63,15 +63,15 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.55.0 // indirect
github.com/valyala/fasthttp v1.56.0 // indirect
github.com/valyala/tcplisten v1.0.0 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/text v0.18.0 // indirect
golang.org/x/tools v0.25.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/tools v0.26.0 // indirect
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)
52 changes: 24 additions & 28 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M=
github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY=
github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA=
github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA=
github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ=
github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw=
github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN5+F54=
Expand All @@ -15,8 +15,8 @@ github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCD
github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs=
github.com/blevesearch/vellum v1.0.10 h1:HGPJDT2bTva12hrHepVT3rOyIKFFF4t7Gf6yMxyMIPI=
github.com/blevesearch/vellum v1.0.10/go.mod h1:ul1oT0FhSMDIExNjIxHqJoGpVrBpKCdgDQNxfqgJt7k=
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc=
github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
Expand Down Expand Up @@ -49,8 +49,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM=
github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
Expand All @@ -61,18 +61,12 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ=
github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI=
github.com/lezhnev74/go-iterators v0.0.0-20240902070734-4c1f359dc381 h1:PfCSf5r9I459jEGrMGpQPel8Vy+kqSC9S8FXNBviaRY=
github.com/lezhnev74/go-iterators v0.0.0-20240902070734-4c1f359dc381/go.mod h1:lK0tbJcxEOV1dc0p+s3gaYYZI00mwvMcw4FBe01x6XY=
github.com/lezhnev74/inverted_index_2 v0.0.0-20240905073239-3207e86de884 h1:FCNbD2r3+4FCfE2wNQNwCNJ/GxCmiH+f8n/4qc3FdWI=
github.com/lezhnev74/inverted_index_2 v0.0.0-20240905073239-3207e86de884/go.mod h1:klfQynw1TsDbFokToqzh9hDSqIlzPbxX365VKzw16Pk=
github.com/lezhnev74/inverted_index_2 v0.0.0-20240914171448-2f2d187fde6b h1:mx3Y1T+bCYE1XRTyybCnsFGepUJv7PKurQvEvKjB0Do=
github.com/lezhnev74/inverted_index_2 v0.0.0-20240914171448-2f2d187fde6b/go.mod h1:klfQynw1TsDbFokToqzh9hDSqIlzPbxX365VKzw16Pk=
github.com/lezhnev74/inverted_index_2 v0.0.0-20240915090532-e9822aa2e9a4 h1:WU9annvsg1LQB71ECXgztI3I50WS36q3Csf9WrH8v0I=
github.com/lezhnev74/inverted_index_2 v0.0.0-20240915090532-e9822aa2e9a4/go.mod h1:klfQynw1TsDbFokToqzh9hDSqIlzPbxX365VKzw16Pk=
github.com/lezhnev74/inverted_index_2 v0.0.0-20241025145959-abaf487ff656 h1:H2HX3AkVirDqFW9wEXfE4oXYHEaeNaEFyq3Mlwm80Lk=
github.com/lezhnev74/inverted_index_2 v0.0.0-20241025145959-abaf487ff656/go.mod h1:klfQynw1TsDbFokToqzh9hDSqIlzPbxX365VKzw16Pk=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
github.com/marcboeker/go-duckdb v1.7.1 h1:m9/nKfP7cG9AptcQ95R1vfacRuhtrZE5pZF8BPUb/Iw=
github.com/marcboeker/go-duckdb v1.7.1/go.mod h1:2oV8BZv88S16TKGKM+Lwd0g7DX84x0jMxjTInThC8Is=
github.com/marcboeker/go-duckdb v1.8.0 h1:iOWv1wTL0JIMqpyns6hCf5XJJI4fY6lmJNk+itx5RRo=
github.com/marcboeker/go-duckdb v1.8.0/go.mod h1:2oV8BZv88S16TKGKM+Lwd0g7DX84x0jMxjTInThC8Is=
github.com/marcboeker/go-duckdb v1.8.2 h1:gHcFjt+HcPSpDVjPSzwof+He12RS+KZPwxcfoVP8Yx4=
github.com/marcboeker/go-duckdb v1.8.2/go.mod h1:2oV8BZv88S16TKGKM+Lwd0g7DX84x0jMxjTInThC8Is=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down Expand Up @@ -127,24 +121,26 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
github.com/urfave/cli/v2 v2.27.4 h1:o1owoI+02Eb+K107p27wEX9Bb8eqIoZCfLXloLUSWJ8=
github.com/urfave/cli/v2 v2.27.4/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ=
github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w=
github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8=
github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM=
github.com/valyala/fasthttp v1.56.0 h1:bEZdJev/6LCBlpdORfrLu/WOZXXxvrUQSiyniuaoW8U=
github.com/valyala/fasthttp v1.56.0/go.mod h1:sReBt3XZVnudxuLOx4J/fMrJVorWRiWY2koQKgABiVI=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4=
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM=
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk=
golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY=
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8=
golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0=
golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
Expand All @@ -153,12 +149,12 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34=
golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224=
golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE=
golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ=
golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0=
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY=
golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90=
gonum.org/v1/gonum v0.15.0 h1:2lYxjRbTYyxkJxlhC+LvJIx3SsANPdRybu1tGj9/OrQ=
Expand Down
14 changes: 12 additions & 2 deletions ingest/ingest.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ingest

import (
"context"
"database/sql"
"errors"
"github.com/lezhnev74/inverted_index_2"
Expand All @@ -17,8 +18,6 @@ import (
"unsafe"
)

var EmptySegment = errors.New("no messages begins in the segment")

type Ingest struct {
// findMessages extracts message layouts (boundaries) from the file
findMessages func(file string, locations []common.Location) ([]scanner.MessageLayout, error)
Expand All @@ -28,6 +27,7 @@ type Ingest struct {
ii *inverted_index_2.InvertedIndex
segmentSize uint64
concurrency int // the level of concurrency in ingestion
ctx context.Context
}

type ScannedTokenizedMessage struct {
Expand All @@ -38,6 +38,7 @@ type ScannedTokenizedMessage struct {
}

func NewIngest(
ctx context.Context,
scan func(file string, locations []common.Location) ([]scanner.MessageLayout, error),
parseTime func([]byte) (time.Time, error),
tokenize func([]byte) [][]byte,
Expand All @@ -54,6 +55,7 @@ func NewIngest(
ii: ii,
segmentSize: segmentSize,
concurrency: concurrency,
ctx: ctx,
}
return &ing
}
Expand Down Expand Up @@ -317,7 +319,15 @@ func (ing *Ingest) readMessagesInStream(name string, stream io.ReaderAt, message

buf := make([]byte, 0)
var err error
loop:
for _, layout := range messageLayouts {

select {
case <-ing.ctx.Done():
break loop // stop
default:
}

messageLen := layout.To - layout.From
if messageLen > maxIndexableSize {
log.Printf("big message %dMiB at %s:%d", messageLen/1024/1024, name, layout.From)
Expand Down
17 changes: 15 additions & 2 deletions search/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package search

import (
"bytes"
"context"
"errors"
"fmt"
go_iterators "github.com/lezhnev74/go-iterators"
Expand All @@ -21,15 +22,17 @@ type Search struct {
// dateFormat is GO time format for message's dates,
// extract message date upon scanning heap files
dateFormat string
ctx context.Context
}

type SearchMatcher func(m db.Message, body []byte) bool

func NewSearch(db *db.DbContainer, ii *inverted_index_2.InvertedIndex, dateFormat string) *Search {
func NewSearch(ctx context.Context, db *db.DbContainer, ii *inverted_index_2.InvertedIndex, dateFormat string) *Search {
return &Search{
db: db,
ii: ii,
dateFormat: dateFormat,
ctx: ctx,
}
}

Expand Down Expand Up @@ -113,8 +116,18 @@ func (s *Search) Search(
panic(err)
}

rangeSegments:
for i, segment := range segments {
freeList <- true // get the slot

select {
case <-s.ctx.Done():
// Cancellation test: after another segment is checked we test the context,
// if cancelled, stop processing.
break rangeSegments
case freeList <- true:
// get the slot and process the segment
}

go func() {
defer func() {
<-freeList // release the slot
Expand Down
5 changes: 3 additions & 2 deletions search/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package search_test

import (
"bytes"
"context"
"errors"
"fmt"
go_iterators "github.com/lezhnev74/go-iterators"
Expand Down Expand Up @@ -38,7 +39,7 @@ func TestSearchResults(t *testing.T) {
tokenize := func(s []byte) [][]byte {
return tokenizer.Tokenize(s, 4, 8)
}
s := search.NewSearch(_db, ii, dateFormat)
s := search.NewSearch(context.Background(), _db, ii, dateFormat)

type test struct {
query string
Expand Down Expand Up @@ -225,7 +226,7 @@ multile

// Ingest data before testing search
ing, ii := test_util.PrepareTestIngest(t, 50, storageRoot, _db)
s := search.NewSearch(_db, ii, "2006-01-02T15:04:05.000000-07:00")
s := search.NewSearch(context.Background(), _db, ii, "2006-01-02T15:04:05.000000-07:00")

_, _, err := _db.CheckInFiles([]string{file1, file2})
require.NoError(t, err)
Expand Down
6 changes: 3 additions & 3 deletions test_util/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func buildDependencies(t *testing.T, segmentSize uint64, storageRoot string) (
tok := func(in []byte) [][]byte {
return tokenizer.Tokenize(in, 4, 8)
}
ii, err := inverted_index_2.NewInvertedIndex(storageRoot)
ii, err := inverted_index_2.NewInvertedIndex(storageRoot, true)
require.NoError(t, err)

s := func(file string, locations []common.Location) ([]scanner.MessageLayout, error) {
Expand All @@ -196,9 +196,9 @@ func buildDependencies(t *testing.T, segmentSize uint64, storageRoot string) (
return time.Parse(timeFormat, string(b))
}

ing := ingest.NewIngest(s, pd, tok, dbContainer, ii, segmentSize, 1)
ing := ingest.NewIngest(context.Background(), s, pd, tok, dbContainer, ii, segmentSize, 1)

_search := search.NewSearch(dbContainer, ii, timeFormat)
_search := search.NewSearch(context.Background(), dbContainer, ii, timeFormat)

return _search, ing, ii, dbContainer, tok
}
Expand Down
Loading

0 comments on commit 607d685

Please sign in to comment.