From 607d685fa27ae32adfe7d50098f7978e0b7d2751 Mon Sep 17 00:00:00 2001 From: lezhnev Date: Sat, 26 Oct 2024 16:01:16 +0500 Subject: [PATCH] add graceful shutdown for "docker stop" --- common/util.go | 6 +++++ db/db_test.go | 2 +- db/results.go | 10 +++++++- db/results_test.go | 3 ++- go.mod | 22 +++++++++--------- go.sum | 52 +++++++++++++++++++----------------------- ingest/ingest.go | 14 ++++++++++-- search/search.go | 17 ++++++++++++-- search/search_test.go | 5 ++-- test_util/e2e_test.go | 6 ++--- test_util/factories.go | 4 ++-- ui/app.go | 14 ++++++++---- ui/config.go | 6 ++--- ui/console.go | 39 +++++++++++++++++++++++++++---- 14 files changed, 135 insertions(+), 65 deletions(-) diff --git a/common/util.go b/common/util.go index def2b1b..e10dcaa 100644 --- a/common/util.go +++ b/common/util.go @@ -122,6 +122,12 @@ func PrintMem(db *sql.DB) (rss uint64) { return m.Sys } +var EnableLogging bool + +func Log(message string) { + fmt.Println(message) +} + //func CleanMem() { // runtime.GC() // debug.FreeOSMemory() diff --git a/db/db_test.go b/db/db_test.go index 256b5a3..4e0fd5b 100644 --- a/db/db_test.go +++ b/db/db_test.go @@ -15,7 +15,7 @@ func TestClearUp(t *testing.T) { _db, storageRoot := test_util.PrepareTestDb(t) defer os.RemoveAll(storageRoot) - ii, err := inverted_index_2.NewInvertedIndex(storageRoot) + ii, err := inverted_index_2.NewInvertedIndex(storageRoot, true) require.NoError(t, err) _, err = _db.Exec(`INSERT INTO files VALUES (1, 'path1'), (2, 'path2')`) diff --git a/db/results.go b/db/results.go index 35c8ea0..b272007 100644 --- a/db/results.go +++ b/db/results.go @@ -1,6 +1,7 @@ package db import ( + "context" "database/sql" "errors" go_iterators "github.com/lezhnev74/go-iterators" @@ -96,7 +97,7 @@ func (q *QueryDB) Flush() { } // CheckinQuery returns queryId instantly while ingesting the Messages. -func (q *QueryDB) CheckinQuery(text string, min, max *time.Time, messages go_iterators.Iterator[Message]) (query Query, err error) { +func (q *QueryDB) CheckinQuery(ctx context.Context, text string, min, max *time.Time, messages go_iterators.Iterator[Message]) (query Query, err error) { queryId, err := q.ReserveQueryId() if err != nil { err = xerrors.Errorf("query checkin: %w", err) @@ -132,6 +133,13 @@ func (q *QueryDB) CheckinQuery(text string, min, max *time.Time, messages go_ite n int ) for { + // Cancellation test: + select { + case <-ctx.Done(): + return + default: + } + m, err = messages.Next() if errors.Is(err, go_iterators.EmptyIterator) { break diff --git a/db/results_test.go b/db/results_test.go index b1ca68b..388c9ba 100644 --- a/db/results_test.go +++ b/db/results_test.go @@ -1,6 +1,7 @@ package db_test import ( + "context" go_iterators "github.com/lezhnev74/go-iterators" "github.com/stretchr/testify/require" "heaplog_2024/common" @@ -63,7 +64,7 @@ func TestResultsRead(t *testing.T) { return }, func() error { return nil }) - r, err := dbContainer.QueryDB.CheckinQuery("sample", &t0, &t2, it) + r, err := dbContainer.QueryDB.CheckinQuery(context.Background(), "sample", &t0, &t2, it) require.NoError(t, err) require.False(t, r.Finished) diff --git a/go.mod b/go.mod index 3995649..59c8360 100644 --- a/go.mod +++ b/go.mod @@ -9,14 +9,14 @@ require ( github.com/gofiber/fiber/v2 v2.52.5 github.com/gofiber/template/html/v2 v2.1.2 github.com/lezhnev74/go-iterators v0.0.0-20240902070734-4c1f359dc381 - github.com/lezhnev74/inverted_index_2 v0.0.0-20240915090532-e9822aa2e9a4 - github.com/marcboeker/go-duckdb v1.8.0 + github.com/lezhnev74/inverted_index_2 v0.0.0-20241025145959-abaf487ff656 + github.com/marcboeker/go-duckdb v1.8.2 github.com/pkg/errors v0.9.1 github.com/prometheus/procfs v0.15.1 github.com/spf13/viper v1.19.0 github.com/stretchr/testify v1.9.0 - github.com/urfave/cli/v2 v2.27.4 - golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 + github.com/urfave/cli/v2 v2.27.5 + golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c golang.org/x/sync v0.8.0 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da gopkg.in/yaml.v3 v3.0.1 @@ -24,12 +24,12 @@ require ( require ( github.com/RoaringBitmap/roaring v1.9.4 // indirect - github.com/andybalholm/brotli v1.1.0 // indirect + github.com/andybalholm/brotli v1.1.1 // indirect github.com/apache/arrow/go/v17 v17.0.0 // indirect github.com/bits-and-blooms/bitset v1.14.3 // indirect github.com/blevesearch/mmap-go v1.0.4 // indirect github.com/blevesearch/vellum v1.0.10 // indirect - github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect + github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect github.com/go-playground/locales v0.14.1 // indirect @@ -40,7 +40,7 @@ require ( github.com/google/flatbuffers v24.3.25+incompatible // indirect github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/hcl v1.0.0 // indirect - github.com/klauspost/compress v1.17.9 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.8 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/magiconair/properties v1.8.7 // indirect @@ -63,15 +63,15 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.55.0 // indirect + github.com/valyala/fasthttp v1.56.0 // indirect github.com/valyala/tcplisten v1.0.0 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/sys v0.25.0 // indirect - golang.org/x/text v0.18.0 // indirect - golang.org/x/tools v0.25.0 // indirect + golang.org/x/sys v0.26.0 // indirect + golang.org/x/text v0.19.0 // indirect + golang.org/x/tools v0.26.0 // indirect gopkg.in/go-playground/assert.v1 v1.2.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) diff --git a/go.sum b/go.sum index 7c7b2ac..376a443 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ= github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= -github.com/andybalholm/brotli v1.1.0 h1:eLKJA0d02Lf0mVpIDgYnqXcUn0GqVmEFny3VuID1U3M= -github.com/andybalholm/brotli v1.1.0/go.mod h1:sms7XGricyQI9K10gOSf56VKKWS4oLer58Q+mhRPtnY= +github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= +github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ= github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw= github.com/apache/arrow/go/v17 v17.0.0 h1:RRR2bdqKcdbss9Gxy2NS/hK8i4LDMh23L6BbkN5+F54= @@ -15,8 +15,8 @@ github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCD github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs= github.com/blevesearch/vellum v1.0.10 h1:HGPJDT2bTva12hrHepVT3rOyIKFFF4t7Gf6yMxyMIPI= github.com/blevesearch/vellum v1.0.10/go.mod h1:ul1oT0FhSMDIExNjIxHqJoGpVrBpKCdgDQNxfqgJt7k= -github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4= -github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= +github.com/cpuguy83/go-md2man/v2 v2.0.5 h1:ZtcqGrnekaHpVLArFSe4HK5DoKx1T0rq2DwVB0alcyc= +github.com/cpuguy83/go-md2man/v2 v2.0.5/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= @@ -49,8 +49,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= -github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/cpuid/v2 v2.2.8 h1:+StwCXwm9PdpiEkPyzBXIy+M9KUb4ODm0Zarf1kS5BM= github.com/klauspost/cpuid/v2 v2.2.8/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= @@ -61,18 +61,12 @@ github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= github.com/lezhnev74/go-iterators v0.0.0-20240902070734-4c1f359dc381 h1:PfCSf5r9I459jEGrMGpQPel8Vy+kqSC9S8FXNBviaRY= github.com/lezhnev74/go-iterators v0.0.0-20240902070734-4c1f359dc381/go.mod h1:lK0tbJcxEOV1dc0p+s3gaYYZI00mwvMcw4FBe01x6XY= -github.com/lezhnev74/inverted_index_2 v0.0.0-20240905073239-3207e86de884 h1:FCNbD2r3+4FCfE2wNQNwCNJ/GxCmiH+f8n/4qc3FdWI= -github.com/lezhnev74/inverted_index_2 v0.0.0-20240905073239-3207e86de884/go.mod h1:klfQynw1TsDbFokToqzh9hDSqIlzPbxX365VKzw16Pk= -github.com/lezhnev74/inverted_index_2 v0.0.0-20240914171448-2f2d187fde6b h1:mx3Y1T+bCYE1XRTyybCnsFGepUJv7PKurQvEvKjB0Do= -github.com/lezhnev74/inverted_index_2 v0.0.0-20240914171448-2f2d187fde6b/go.mod h1:klfQynw1TsDbFokToqzh9hDSqIlzPbxX365VKzw16Pk= -github.com/lezhnev74/inverted_index_2 v0.0.0-20240915090532-e9822aa2e9a4 h1:WU9annvsg1LQB71ECXgztI3I50WS36q3Csf9WrH8v0I= -github.com/lezhnev74/inverted_index_2 v0.0.0-20240915090532-e9822aa2e9a4/go.mod h1:klfQynw1TsDbFokToqzh9hDSqIlzPbxX365VKzw16Pk= +github.com/lezhnev74/inverted_index_2 v0.0.0-20241025145959-abaf487ff656 h1:H2HX3AkVirDqFW9wEXfE4oXYHEaeNaEFyq3Mlwm80Lk= +github.com/lezhnev74/inverted_index_2 v0.0.0-20241025145959-abaf487ff656/go.mod h1:klfQynw1TsDbFokToqzh9hDSqIlzPbxX365VKzw16Pk= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0= -github.com/marcboeker/go-duckdb v1.7.1 h1:m9/nKfP7cG9AptcQ95R1vfacRuhtrZE5pZF8BPUb/Iw= -github.com/marcboeker/go-duckdb v1.7.1/go.mod h1:2oV8BZv88S16TKGKM+Lwd0g7DX84x0jMxjTInThC8Is= -github.com/marcboeker/go-duckdb v1.8.0 h1:iOWv1wTL0JIMqpyns6hCf5XJJI4fY6lmJNk+itx5RRo= -github.com/marcboeker/go-duckdb v1.8.0/go.mod h1:2oV8BZv88S16TKGKM+Lwd0g7DX84x0jMxjTInThC8Is= +github.com/marcboeker/go-duckdb v1.8.2 h1:gHcFjt+HcPSpDVjPSzwof+He12RS+KZPwxcfoVP8Yx4= +github.com/marcboeker/go-duckdb v1.8.2/go.mod h1:2oV8BZv88S16TKGKM+Lwd0g7DX84x0jMxjTInThC8Is= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= @@ -127,24 +121,26 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= -github.com/urfave/cli/v2 v2.27.4 h1:o1owoI+02Eb+K107p27wEX9Bb8eqIoZCfLXloLUSWJ8= -github.com/urfave/cli/v2 v2.27.4/go.mod h1:m4QzxcD2qpra4z7WhzEGn74WZLViBnMpb1ToCAKdGRQ= +github.com/urfave/cli/v2 v2.27.5 h1:WoHEJLdsXr6dDWoJgMq/CboDmyY/8HMMH1fTECbih+w= +github.com/urfave/cli/v2 v2.27.5/go.mod h1:3Sevf16NykTbInEnD0yKkjDAeZDS0A6bzhBH5hrMvTQ= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.55.0 h1:Zkefzgt6a7+bVKHnu/YaYSOPfNYNisSVBo/unVCf8k8= -github.com/valyala/fasthttp v1.55.0/go.mod h1:NkY9JtkrpPKmgwV3HTaS2HWaJss9RSIsRVfcxxoHiOM= +github.com/valyala/fasthttp v1.56.0 h1:bEZdJev/6LCBlpdORfrLu/WOZXXxvrUQSiyniuaoW8U= +github.com/valyala/fasthttp v1.56.0/go.mod h1:sReBt3XZVnudxuLOx4J/fMrJVorWRiWY2koQKgABiVI= github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 h1:gEOO8jv9F4OT7lGCjxCBTO/36wtF6j2nSip77qHd4x4= github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1/go.mod h1:Ohn+xnUBiLI6FVj/9LpzZWtj1/D6lUovWYBkxHVV3aM= +github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU= +github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= -golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= +golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c h1:7dEasQXItcW1xKJ2+gg5VOiBnqWrJc+rq0DPKyvvdbY= +golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c/go.mod h1:NQtJDoLvd6faHhE7m4T/1IY708gDefGGjR/iUW8yQQ8= golang.org/x/mod v0.21.0 h1:vvrHzRwRfVKSiLrG+d4FMl/Qi4ukBCE6kZlTUkDYRT0= golang.org/x/mod v0.21.0/go.mod h1:6SkKJ3Xj0I0BrPOZoBy3bdMptDDU9oJrpohJ3eWZ1fY= golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= @@ -153,12 +149,12 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.25.0 h1:r+8e+loiHxRqhXVl6ML1nO3l1+oFoWbnlu2Ehimmi34= -golang.org/x/sys v0.25.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/tools v0.25.0 h1:oFU9pkj/iJgs+0DT+VMHrx+oBKs/LJMV+Uvg78sl+fE= -golang.org/x/tools v0.25.0/go.mod h1:/vtpO8WL1N9cQC3FN5zPqb//fRXskFHbLKk4OW1Q7rg= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ= +golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= gonum.org/v1/gonum v0.15.0 h1:2lYxjRbTYyxkJxlhC+LvJIx3SsANPdRybu1tGj9/OrQ= diff --git a/ingest/ingest.go b/ingest/ingest.go index 63011c8..ba2d450 100644 --- a/ingest/ingest.go +++ b/ingest/ingest.go @@ -1,6 +1,7 @@ package ingest import ( + "context" "database/sql" "errors" "github.com/lezhnev74/inverted_index_2" @@ -17,8 +18,6 @@ import ( "unsafe" ) -var EmptySegment = errors.New("no messages begins in the segment") - type Ingest struct { // findMessages extracts message layouts (boundaries) from the file findMessages func(file string, locations []common.Location) ([]scanner.MessageLayout, error) @@ -28,6 +27,7 @@ type Ingest struct { ii *inverted_index_2.InvertedIndex segmentSize uint64 concurrency int // the level of concurrency in ingestion + ctx context.Context } type ScannedTokenizedMessage struct { @@ -38,6 +38,7 @@ type ScannedTokenizedMessage struct { } func NewIngest( + ctx context.Context, scan func(file string, locations []common.Location) ([]scanner.MessageLayout, error), parseTime func([]byte) (time.Time, error), tokenize func([]byte) [][]byte, @@ -54,6 +55,7 @@ func NewIngest( ii: ii, segmentSize: segmentSize, concurrency: concurrency, + ctx: ctx, } return &ing } @@ -317,7 +319,15 @@ func (ing *Ingest) readMessagesInStream(name string, stream io.ReaderAt, message buf := make([]byte, 0) var err error + loop: for _, layout := range messageLayouts { + + select { + case <-ing.ctx.Done(): + break loop // stop + default: + } + messageLen := layout.To - layout.From if messageLen > maxIndexableSize { log.Printf("big message %dMiB at %s:%d", messageLen/1024/1024, name, layout.From) diff --git a/search/search.go b/search/search.go index 1836434..adc3079 100644 --- a/search/search.go +++ b/search/search.go @@ -2,6 +2,7 @@ package search import ( "bytes" + "context" "errors" "fmt" go_iterators "github.com/lezhnev74/go-iterators" @@ -21,15 +22,17 @@ type Search struct { // dateFormat is GO time format for message's dates, // extract message date upon scanning heap files dateFormat string + ctx context.Context } type SearchMatcher func(m db.Message, body []byte) bool -func NewSearch(db *db.DbContainer, ii *inverted_index_2.InvertedIndex, dateFormat string) *Search { +func NewSearch(ctx context.Context, db *db.DbContainer, ii *inverted_index_2.InvertedIndex, dateFormat string) *Search { return &Search{ db: db, ii: ii, dateFormat: dateFormat, + ctx: ctx, } } @@ -113,8 +116,18 @@ func (s *Search) Search( panic(err) } + rangeSegments: for i, segment := range segments { - freeList <- true // get the slot + + select { + case <-s.ctx.Done(): + // Cancellation test: after another segment is checked we test the context, + // if cancelled, stop processing. + break rangeSegments + case freeList <- true: + // get the slot and process the segment + } + go func() { defer func() { <-freeList // release the slot diff --git a/search/search_test.go b/search/search_test.go index d5331fc..b495ab6 100644 --- a/search/search_test.go +++ b/search/search_test.go @@ -2,6 +2,7 @@ package search_test import ( "bytes" + "context" "errors" "fmt" go_iterators "github.com/lezhnev74/go-iterators" @@ -38,7 +39,7 @@ func TestSearchResults(t *testing.T) { tokenize := func(s []byte) [][]byte { return tokenizer.Tokenize(s, 4, 8) } - s := search.NewSearch(_db, ii, dateFormat) + s := search.NewSearch(context.Background(), _db, ii, dateFormat) type test struct { query string @@ -225,7 +226,7 @@ multile // Ingest data before testing search ing, ii := test_util.PrepareTestIngest(t, 50, storageRoot, _db) - s := search.NewSearch(_db, ii, "2006-01-02T15:04:05.000000-07:00") + s := search.NewSearch(context.Background(), _db, ii, "2006-01-02T15:04:05.000000-07:00") _, _, err := _db.CheckInFiles([]string{file1, file2}) require.NoError(t, err) diff --git a/test_util/e2e_test.go b/test_util/e2e_test.go index d79cc5f..fda7d35 100644 --- a/test_util/e2e_test.go +++ b/test_util/e2e_test.go @@ -184,7 +184,7 @@ func buildDependencies(t *testing.T, segmentSize uint64, storageRoot string) ( tok := func(in []byte) [][]byte { return tokenizer.Tokenize(in, 4, 8) } - ii, err := inverted_index_2.NewInvertedIndex(storageRoot) + ii, err := inverted_index_2.NewInvertedIndex(storageRoot, true) require.NoError(t, err) s := func(file string, locations []common.Location) ([]scanner.MessageLayout, error) { @@ -196,9 +196,9 @@ func buildDependencies(t *testing.T, segmentSize uint64, storageRoot string) ( return time.Parse(timeFormat, string(b)) } - ing := ingest.NewIngest(s, pd, tok, dbContainer, ii, segmentSize, 1) + ing := ingest.NewIngest(context.Background(), s, pd, tok, dbContainer, ii, segmentSize, 1) - _search := search.NewSearch(dbContainer, ii, timeFormat) + _search := search.NewSearch(context.Background(), dbContainer, ii, timeFormat) return _search, ing, ii, dbContainer, tok } diff --git a/test_util/factories.go b/test_util/factories.go index 6bd102a..f663085 100644 --- a/test_util/factories.go +++ b/test_util/factories.go @@ -60,7 +60,7 @@ func PrepareTestIngest(t *testing.T, segmentSize uint64, storageRoot string, db return tokenizer.Tokenize(in, 4, 10) } - ii, err := inverted_index_2.NewInvertedIndex(storageRoot) + ii, err := inverted_index_2.NewInvertedIndex(storageRoot, true) require.NoError(t, err) s := func(file string, locations []common.Location) ([]scanner.MessageLayout, error) { @@ -72,7 +72,7 @@ func PrepareTestIngest(t *testing.T, segmentSize uint64, storageRoot string, db return time.Parse(TimeFormat, string(b)) } - ing := ingest.NewIngest(s, pd, tok, db, ii, segmentSize, 1) + ing := ingest.NewIngest(context.Background(), s, pd, tok, db, ii, segmentSize, 1) return ing, ii } diff --git a/ui/app.go b/ui/app.go index afcc019..40cdb15 100644 --- a/ui/app.go +++ b/ui/app.go @@ -32,6 +32,7 @@ type HeaplogApp struct { db *db.DbContainer search *search.Search cfg Config + ctx context.Context } // DeleteQuery removes the query and its results @@ -131,7 +132,7 @@ func (happ *HeaplogApp) NewQuery(text string, min *time.Time, max *time.Time) (n }, int(happ.cfg.Concurrency), ) - newQuery, err = happ.db.CheckinQuery(text, min, max, messagesIt) + newQuery, err = happ.db.CheckinQuery(happ.ctx, text, min, max, messagesIt) return } @@ -207,7 +208,9 @@ func (happ *HeaplogApp) Query(queryId int, from, to *time.Time) (query db.Query, return } -func NewHeaplog(cfg Config, startBackground bool) (*HeaplogApp, error) { +func NewHeaplog(ctx context.Context, cfg Config, startBackground bool) (*HeaplogApp, error) { + + common.EnableLogging = cfg.EnableLogging // 1. Init the database connector, err := db.PrepareDuckDB(cfg.StoragePath, int(cfg.DuckdbMaxMemMb)) @@ -245,7 +248,7 @@ func NewHeaplog(cfg Config, startBackground bool) (*HeaplogApp, error) { tok := func(in []byte) [][]byte { return tokenizer.Tokenize(in, int(cfg.MinTermLen), int(cfg.MaxTermLen)) } - ii, err := inverted_index_2.NewInvertedIndex(cfg.StoragePath) + ii, err := inverted_index_2.NewInvertedIndex(cfg.StoragePath, cfg.EnableLogging) if err != nil { return nil, err } @@ -256,8 +259,8 @@ func NewHeaplog(cfg Config, startBackground bool) (*HeaplogApp, error) { return time.Parse(cfg.DateFormat, string(b)) } segmentSize := uint64(5_000_000) - ingestor := ingest.NewIngest(layoutFile, pd, tok, dbContainer, ii, segmentSize, int(cfg.Concurrency)) - _search := search.NewSearch(dbContainer, ii, cfg.DateFormat) + ingestor := ingest.NewIngest(ctx, layoutFile, pd, tok, dbContainer, ii, segmentSize, int(cfg.Concurrency)) + _search := search.NewSearch(ctx, dbContainer, ii, cfg.DateFormat) _discover := ingest.NewDiscover([]string{cfg.FilesGlobPattern}, dbContainer.FilesDb) @@ -366,6 +369,7 @@ func NewHeaplog(cfg Config, startBackground bool) (*HeaplogApp, error) { db: dbContainer, search: _search, cfg: cfg, + ctx: ctx, }, nil } diff --git a/ui/config.go b/ui/config.go index cce212f..dfefef2 100644 --- a/ui/config.go +++ b/ui/config.go @@ -33,8 +33,8 @@ type Config struct { // Max memory the duckdb instance is allowed to allocate. // Increase if you see related errors on big data sets. (default: 500) DuckdbMaxMemMb uint - // ReportLevel controls how much output will be shown - ReportLevel int + // EnableLogging controls how much output will be shown + EnableLogging bool } // Validate is the final check after all overrides are done (file load, command arguments substituted) @@ -92,7 +92,7 @@ var DefaultCfg = Config{ MaxTermLen: 8, DuckdbMaxMemMb: 500, Concurrency: uint(runtime.NumCPU()), - ReportLevel: 1, + EnableLogging: true, } func LoadConfig(loadFile bool) (cfg Config, err error) { diff --git a/ui/console.go b/ui/console.go index 497a6af..a246ddc 100644 --- a/ui/console.go +++ b/ui/console.go @@ -1,10 +1,15 @@ package ui import ( + "context" "fmt" "github.com/urfave/cli/v2" "gopkg.in/yaml.v3" "log" + "os" + "os/signal" + "syscall" + "time" ) import _ "net/http/pprof" @@ -86,6 +91,10 @@ func PrepareConsoleApp() (app *cli.App) { Aliases: []string{"duckdb"}, Usage: "Max memory the duckdb instance is allowed to allocate (Mb)", }, + &cli.BoolFlag{ + Aliases: []string{"v"}, + Usage: "Show extra details about what the service does", + }, } app = &cli.App{ @@ -96,21 +105,43 @@ func PrepareConsoleApp() (app *cli.App) { Description: "Runs the service (indexes files and exposes search UI over HTTP)", Flags: flags, Action: func(ctx *cli.Context) error { + + log.Printf("Pid: %d", os.Getpid()) + cfg, err := prepareCfg(ctx) if err != nil { return err } - happ, err := NewHeaplog(cfg, true) + + _ctx, cancel := context.WithCancel(context.Background()) + happ, err := NewHeaplog(_ctx, cfg, true) if err != nil { + cancel() return err } + httpApp := makeHttpApp(happ, "") + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGTERM) + go func() { + <-sigs + cancel() // stop the program + log.Printf("Stopping heaplog...") + + t := time.Second * 10 // the same as the docker's timeout for "docker stop" + time.Sleep(t) + + err := httpApp.Shutdown() + if err != nil { + log.Printf("%s", err) + } + }() //go func() { // log.Printf("Listening pprof on port 6060") // log.Println(http.ListenAndServe(":6060", nil)) //}() - httpApp := makeHttpApp(happ, "") log.Printf("Listening on port 8393") log.Fatal(httpApp.Listen(":8393")) return nil @@ -125,7 +156,7 @@ func PrepareConsoleApp() (app *cli.App) { if err != nil { return err } - happ, err := NewHeaplog(cfg, false) + happ, err := NewHeaplog(context.Background(), cfg, false) if err != nil { return err } @@ -145,7 +176,7 @@ func PrepareConsoleApp() (app *cli.App) { if err != nil { return err } - happ, err := NewHeaplog(cfg, false) + happ, err := NewHeaplog(context.Background(), cfg, false) if err != nil { return err }