diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..201ebc8 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,20 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Launch", + "type": "go", + "request": "launch", + "mode": "auto", + "port": 2345, + "host": "127.0.0.1", + "program": "${workspaceRoot}", + "env": {}, + "args": [], + "showLog": true + } + ] +} diff --git a/go.mod b/go.mod index 92b568d..fc3a0a6 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/argus-labs/go-jobqueue +module github.com/ezavada/go-jobqueue go 1.22 @@ -6,9 +6,11 @@ require ( github.com/dgraph-io/badger/v4 v4.2.0 github.com/goccy/go-json v0.10.3 github.com/google/uuid v1.6.0 + github.com/loov/hrtime v1.0.3 github.com/puzpuzpuz/xsync/v3 v3.2.0 github.com/rs/zerolog v1.33.0 github.com/stretchr/testify v1.9.0 + go.uber.org/atomic v1.11.0 ) require ( @@ -20,17 +22,26 @@ require ( github.com/golang/glog v1.0.0 // indirect github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/protobuf v1.5.2 // indirect - github.com/golang/snappy v0.0.3 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/flatbuffers v1.12.1 // indirect - github.com/klauspost/compress v1.12.3 // indirect + github.com/klauspost/compress v1.13.6 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/montanaflynn/stats v0.7.1 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect + go.mongodb.org/mongo-driver v1.16.1 // indirect go.opencensus.io v0.22.5 // indirect - golang.org/x/net v0.7.0 // indirect + golang.org/x/crypto v0.22.0 // indirect + golang.org/x/net v0.21.0 // indirect + golang.org/x/sync v0.7.0 // indirect golang.org/x/sys v0.21.0 // indirect + golang.org/x/text v0.14.0 // indirect google.golang.org/protobuf v1.33.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index e09ff15..6ca1766 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -46,16 +48,22 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= +github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/loov/hrtime v1.0.3 h1:LiWKU3B9skJwRPUf0Urs9+0+OE3TxdMuiRPOTwR0gcU= +github.com/loov/hrtime v1.0.3/go.mod h1:yDY3Pwv2izeY4sq7YcPX/dtLwzg5NU1AxWuWxKwd0p0= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8eaE= +github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -69,19 +77,36 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +go.mongodb.org/mongo-driver v1.16.1 h1:rIVLL3q0IHM39dvE+z2ulZLp9ENZKThVfuvN/IiN4l8= +go.mongodb.org/mongo-driver v1.16.1/go.mod h1:oB6AhJQvFQL4LEHyXi6aJzQJtBiTQHiAd83l0GdFaiw= go.opencensus.io v0.22.5 h1:dntmOdLpSpHlVqbW5Eay97DelsZHe+55D+xC6i0dDS0= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= +go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= +go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -90,8 +115,12 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -99,19 +128,32 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -119,6 +161,7 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3 golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/job.go b/job.go index f15714b..77e9484 100644 --- a/job.go +++ b/job.go @@ -1,12 +1,9 @@ package jobqueue import ( - "fmt" "time" ) -const jobDBKeyPrefix = "job-" - // JobContext provides context for a job which is injected into the job Process method. type JobContext interface { JobID() uint64 @@ -24,9 +21,9 @@ type job[T any] struct { CreatedAt time.Time `json:"created_at"` } -func newJob[T any](id uint64, payload T) *job[T] { +func newJob[T any](payload T) *job[T] { return &job[T]{ - ID: id, + ID: 0, // ID is set when the job is added to the queue Payload: payload, Status: JobStatusPending, CreatedAt: time.Now(), @@ -52,9 +49,3 @@ func (j *job[T]) Process(handler func(JobContext, T) error) error { return nil } - -// dbKey BadgerDB iterates over keys in lexicographical order, so we need to make sure that the job ID -// is strictly increasing to avoid queues being processed out of order. -func (j *job[T]) dbKey() []byte { - return []byte(fmt.Sprintf("%s%d", jobDBKeyPrefix, j.ID)) -} diff --git a/jobqueue.go b/jobqueue.go index f21c745..c98c2f0 100644 --- a/jobqueue.go +++ b/jobqueue.go @@ -2,17 +2,17 @@ package jobqueue import ( "context" - "encoding/binary" "errors" "fmt" "sync" "time" - "github.com/dgraph-io/badger/v4" - "github.com/goccy/go-json" + "github.com/loov/hrtime" "github.com/puzpuzpuz/xsync/v3" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + + "go.uber.org/atomic" ) type JobStatus string @@ -28,31 +28,51 @@ var errJobChannelFull = errors.New("job channel is closed") const defaultFetchInterval = 100 * time.Millisecond const defaultJobBufferSize = 1000 -const defaultJobIDSequenceSize = 100 +const defaultJobsPerFetch = 10 type JobQueue[T any] struct { - db *badger.DB - dbPath string - dbInMemory bool + db JobQueueDb[T] + dbPath string + dbInMemory bool + dbUseMongo bool + dbUseBadger bool wg sync.WaitGroup logger zerolog.Logger cancel context.CancelFunc handler func(JobContext, T) error - jobID *badger.Sequence isJobIDInQueue *xsync.MapOf[uint64, bool] jobs chan *job[T] // Options fetchInterval time.Duration + jobsPerFetch int + + // Stats + statsLock sync.Mutex // protects the stats below + + // job stats + jobRunTime TimeStat // stats on time that it takes to run a job (across all workers) + jobQueuedTime TimeStat // stats on how much time a job sits in the queue before being processed + + // queue stats + busyTime TimeStat // stats on time that the queue actively processing jobs + idleTime TimeStat // stats on how much time the queue is empty between jobs being processed + jobsProcessed int + jobsEnqueued int + jobsFailed int + jobsSucceeded int + + busyWorkerCount atomic.Int32 + busyStateChangeAt atomic.Time + queueIsIdle atomic.Bool } // New creates a new JobQueue with the specified database, name, and number // of worker goroutines. It initializes the job queue, starts the worker goroutines, // and returns the JobQueue instance and an error, if any. -func New[T any]( - dbPath string, name string, workers int, handler func(JobContext, T) error, opts ...Option[T], +func New[T any](name string, workers int, handler func(JobContext, T) error, opts ...Option[T], ) (*JobQueue[T], error) { if workers < 0 { return nil, errors.New("invalid number of workers") @@ -61,42 +81,65 @@ func New[T any]( } jq := &JobQueue[T]{ - db: nil, - dbPath: dbPath, - dbInMemory: false, + db: nil, + dbPath: "", + dbInMemory: false, + dbUseMongo: false, + dbUseBadger: false, wg: sync.WaitGroup{}, logger: log.With().Str("module", "JobQueue").Str("jobName", name).Logger(), cancel: nil, handler: handler, - jobID: nil, isJobIDInQueue: xsync.NewMapOf[uint64, bool](), jobs: make(chan *job[T], defaultJobBufferSize), fetchInterval: defaultFetchInterval, + jobsPerFetch: defaultJobsPerFetch, + + statsLock: sync.Mutex{}, + jobRunTime: TimeStat{}, + jobQueuedTime: TimeStat{}, + busyTime: TimeStat{}, // wall time, not CPU time + idleTime: TimeStat{}, // wall time, not CPU time + jobsProcessed: 0, + jobsEnqueued: 0, + jobsFailed: 0, + jobsSucceeded: 0, } + + jq.busyWorkerCount.Store(0) + jq.busyStateChangeAt.Store(time.Now()) + jq.queueIsIdle.Store(true) + for _, opt := range opts { opt(jq) } - - db, err := jq.openDB() - if err != nil { - return nil, err + // make sure we have a valid db option, default to in memory if none provided + if !jq.dbUseBadger && !jq.dbUseMongo && !jq.dbInMemory { + jq.dbInMemory = true } - jq.db = db - - jq.logger.Info().Msg("Starting job queue") ctx, cancel := context.WithCancel(context.Background()) jq.cancel = cancel - jq.jobID, err = jq.db.GetSequence([]byte("nextJobID"), defaultJobIDSequenceSize) + // Open JobQueue DB + var db JobQueueDb[T] + if jq.dbUseMongo { + db = NewJobQueueDbMongo[T](ctx) + } else { + db = NewJobQueueDbBadger[T](jq.dbInMemory) + } + err := db.Open(jq.dbPath, name) if err != nil { - return nil, fmt.Errorf("failed to start job id sequence: %w", err) + return nil, err } + jq.db = db - // Load jobs from BadgerDB + jq.logger.Info().Msg("Starting job queue") + + // Load jobs from JobQueue DB go jq.pollJobs(ctx) // Start workers @@ -109,31 +152,19 @@ func New[T any]( } func (jq *JobQueue[T]) Enqueue(payload T) (uint64, error) { - id, err := jq.jobID.Next() - if err != nil { - return 0, fmt.Errorf("failed to get next job id: %w", err) - } - - // Create a new job and store it in BadgerDB - job := newJob(id, payload) - jobBytes, err := json.Marshal(job) - if err != nil { - return 0, fmt.Errorf("failed to marshal job: %w", err) - } - - err = jq.db.Update(func(txn *badger.Txn) error { - if err := txn.Set(job.dbKey(), jobBytes); err != nil { - return fmt.Errorf("failed to store job: %w", err) - } - return nil - }) + // Create a new job and store it in queue's database + job := newJob(payload) + id, err := jq.db.AddJob(job) if err != nil { - jq.logger.Error().Err(err).Uint64("jobID", job.ID).Msg("Failed to enqueue job") + jq.logger.Error().Err(err).Uint64("jobID", id).Msg("Failed to enqueue job") return 0, err } + jq.statsLock.Lock() + jq.jobsEnqueued++ + jq.statsLock.Unlock() - jq.logger.Info().Uint64("jobID", job.ID).Msg("job enqueued successfully") - return job.ID, nil + jq.logger.Info().Uint64("jobID", id).Msg("job enqueued successfully") + return id, nil } // worker processes jobs received from the job queue and logs any errors encountered. @@ -145,7 +176,28 @@ func (jq *JobQueue[T]) worker(id int) { // Worker stops running when the job channel is closed for job := range jq.jobs { - err := jq.processJob(job) + + wasIdle := jq.queueIsIdle.Swap(false) + jq.busyWorkerCount.Inc() + if wasIdle { + timeSpentInState := time.Since(jq.busyStateChangeAt.Load()) + jq.busyStateChangeAt.Store(time.Now()) + jq.idleTime.RecordTime(timeSpentInState) + logger.Debug().Dur("timeIdle(ms)", timeSpentInState).Msg("*** Queue now busy *** ") + } + + err := jq.processJob(job, id) + + if jq.busyWorkerCount.Dec() == 0 { + wasIdle := jq.queueIsIdle.Swap(true) + if !wasIdle { + timeSpentInState := time.Since(jq.busyStateChangeAt.Load()) + jq.busyStateChangeAt.Store(time.Now()) + jq.busyTime.RecordTime(timeSpentInState) + logger.Debug().Dur("timeBusy(ms)", timeSpentInState).Msg("*** Queue now idle *** ") + } + } + if err != nil { logger.Error().Err(err).Uint64("jobID", job.ID).Msg("Error processing job") } @@ -155,35 +207,41 @@ func (jq *JobQueue[T]) worker(id int) { } // processJob processes a job and updates its status in the database. -func (jq *JobQueue[T]) processJob(job *job[T]) error { +func (jq *JobQueue[T]) processJob(job *job[T], worker int) error { logger := jq.logger.With().Uint64("jobID", job.ID).Logger() if logger.GetLevel() == zerolog.DebugLevel { - logger.Debug().Interface("jobPayload", job.Payload).Msg("Processing job") + logger.Debug().Interface("jobPayload", job.Payload).Int("worker", worker).Msg("Processing job") } else { - logger.Info().Msg("Processing job") + logger.Info().Int("worker", worker).Msg("Processing job") } - if err := job.Process(jq.handler); err != nil { + queuedTime := time.Since(job.CreatedAt) + startTime := hrtime.Now() + err := job.Process(jq.handler) + runTime := hrtime.Since(startTime) + jq.statsLock.Lock() + jq.jobsProcessed++ + jq.jobRunTime.RecordTime(runTime) + jq.jobQueuedTime.RecordTime(queuedTime) + if err != nil { + jq.jobsFailed++ + jq.statsLock.Unlock() return fmt.Errorf("failed to process job: %w", err) } - + jq.jobsSucceeded++ + jq.statsLock.Unlock() logger.Info().Msg("Job processed successfully") - // Now that we've successfully processed the job, we can remove it from BadgerDB - jq.logger.Debug().Uint64("jobID", job.ID).Msg("Removing job from BadgerDB") - err := jq.db.Update(func(txn *badger.Txn) error { - if err := txn.Delete(job.dbKey()); err != nil { - return err - } - return nil - }) + // Now that we've successfully processed the job, we can remove it from JobQueue DB + jq.logger.Debug().Uint64("jobID", job.ID).Int("worker", worker).Msg("Removing job from JobQueue DB") + err = jq.db.DeleteJob(job.ID) if err != nil { logger.Error().Err(err).Msg("Failed to remove completed job from db") return err } // Remove the job from the in-memory index - jq.logger.Debug().Uint64("jobID", job.ID).Msg("Removing job from in-memory index") + jq.logger.Debug().Uint64("jobID", job.ID).Int("worker", worker).Msg("Removing job from in-memory index") jq.isJobIDInQueue.Delete(job.ID) return nil @@ -192,8 +250,8 @@ func (jq *JobQueue[T]) processJob(job *job[T]) error { func (jq *JobQueue[T]) Stop() error { jq.logger.Info().Msg("Stopping job queue") - // Stop jobs fetch from BadgerDB - jq.logger.Debug().Msg("Stopping jobs fetch from BadgerDB") + // Stop jobs fetch from JobQueue DB + jq.logger.Debug().Msg("Stopping jobs fetch from JobQueue DB") jq.cancel() // Close the channel to signal the workers to stop @@ -203,22 +261,32 @@ func (jq *JobQueue[T]) Stop() error { jq.logger.Debug().Msg("Waiting for workers to finish") jq.wg.Wait() - // Close Badger DB connection - jq.logger.Debug().Msg("Closing Badger DB connection") - if err := jq.jobID.Release(); err != nil { - jq.logger.Error().Err(err).Msg("Failed to release next job id sequence") - } + // Close JobQueue DB connection + jq.logger.Debug().Msg("Closing JobQueue DB connection") if err := jq.db.Close(); err != nil { - jq.logger.Error().Err(err).Msg("Failed to close Badger DB connection") + jq.logger.Error().Err(err).Msg("Failed to close JobQueue DB connection") return err } + if jq.jobsEnqueued+jq.jobsProcessed > 0 { + jq.logger.Info(). + Int("jobsProcessed", jq.jobsProcessed). + Int("jobsEnqueued", jq.jobsEnqueued). + Int("jobsFailed", jq.jobsFailed). + Int("jobsSucceeded", jq.jobsSucceeded). + Str("jobRunTime", jq.jobRunTime.String()). + Str("jobQueuedTime", jq.jobQueuedTime.String()). + Str("busyTime", jq.busyTime.String()). + Str("idleTime", jq.idleTime.String()). + Msg("Job queue stats") + } + jq.logger.Info().Msg("Job queue stopped successfully") return nil } -// pollJobs is a long-running goroutine that fetches jobs from BadgerDB and sends them to the worker channels. +// pollJobs is a long-running goroutine that fetches jobs from the JobQueue DB and sends them to the worker channels. func (jq *JobQueue[T]) pollJobs(ctx context.Context) { ticker := time.NewTicker(jq.fetchInterval) @@ -237,73 +305,31 @@ func (jq *JobQueue[T]) pollJobs(ctx context.Context) { } func (jq *JobQueue[T]) fetchJobs(ctx context.Context) error { //nolint:gocognit - err := jq.db.View(func(txn *badger.Txn) error { - opts := badger.DefaultIteratorOptions - opts.PrefetchSize = 10 - it := txn.NewIterator(opts) - defer it.Close() - - for it.Seek([]byte(jobDBKeyPrefix)); it.ValidForPrefix([]byte(jobDBKeyPrefix)); it.Next() { - item := it.Item() - err := item.Value(func(v []byte) error { - var job job[T] - if err := json.Unmarshal(v, &job); err != nil { - jq.logger.Error().Err(err).Uint64("jobID", - binary.BigEndian.Uint64(item.Key())).Msg("Failed to unmarshal job") - return err - } - - if job.Status == JobStatusPending { - // If the job is already fetched, skip it - _, ok := jq.isJobIDInQueue.Load(job.ID) - if ok { - return nil - } - - select { - case <-ctx.Done(): - jq.logger.Debug().Msg("Context cancelled, stopping iteration") - break - - case jq.jobs <- &job: - jq.isJobIDInQueue.Store(job.ID, true) - jq.logger.Debug().Uint64("jobID", job.ID).Msg("New pending job found and sent to worker") - - default: - jq.logger.Warn().Uint64("JobID", - job.ID).Msg("Found pending jobs, but job channel is full") - return errJobChannelFull - } - } - - return nil - }) - if err != nil { - return err - } - } - return nil - }) + jobs, err := jq.db.FetchJobs(jq.jobsPerFetch) if err != nil { return fmt.Errorf("failed to fetch jobs: %w", err) } + for _, job := range jobs { + if job.Status == JobStatusPending { + // If the job is already fetched, skip it + _, ok := jq.isJobIDInQueue.Load(job.ID) + if ok { + continue + } + } + select { + case <-ctx.Done(): + jq.logger.Debug().Msg("Context cancelled, stopping iteration") + return nil // stop the fetch loop, but don't return an error - return nil -} - -func (jq *JobQueue[T]) openDB() (*badger.DB, error) { - var opts badger.Options - if jq.dbInMemory { - opts = badger.DefaultOptions("").WithInMemory(true) - } else { - opts = badger.DefaultOptions(jq.dbPath) - } - opts.Logger = nil + case jq.jobs <- job: + jq.isJobIDInQueue.Store(job.ID, true) + jq.logger.Debug().Uint64("jobID", job.ID).Msg("New job found and sent to worker") - db, err := badger.Open(opts) - if err != nil { - return nil, fmt.Errorf("failed to open BadgerDB: %w", err) + default: + jq.logger.Warn().Uint64("JobID", job.ID).Msg("Found jobs, but job channel is full") + return errJobChannelFull + } } - - return db, nil + return nil } diff --git a/jobqueue_db.go b/jobqueue_db.go new file mode 100644 index 0000000..87733e5 --- /dev/null +++ b/jobqueue_db.go @@ -0,0 +1,18 @@ +package jobqueue + +import ( + "errors" +) + +type JobQueueDb[T any] interface { + Open(path string, queueName string) error + Close() error + GetNextJobId() (uint64, error) + FetchJobs(count int) ([]*job[T], error) + ReadJob(jobID uint64) (*job[T], error) + AddJob(job *job[T]) (uint64, error) // returns the job ID + DeleteJob(jobID uint64) error +} + +// returned by ReadJob or UpdateJob if the job is not found +var ErrJobNotFound = errors.New("job not found") diff --git a/jobqueue_db_badger.go b/jobqueue_db_badger.go new file mode 100644 index 0000000..7e3bf95 --- /dev/null +++ b/jobqueue_db_badger.go @@ -0,0 +1,182 @@ +package jobqueue + +import ( + "encoding/binary" + "errors" + "fmt" + + "github.com/dgraph-io/badger/v4" + "github.com/goccy/go-json" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +const defaultJobIDSequenceSize = 100 +const jobDBKeyPrefix = "job-" +const jobPrefetchSize = 10 + +type JobQueueDbBadger[T any] struct { + db *badger.DB + dbPath string + dbInMemory bool + jobID *badger.Sequence + logger zerolog.Logger +} + +func NewJobQueueDbBadger[T any](inMemory bool) JobQueueDb[T] { + return &JobQueueDbBadger[T]{ + db: nil, + dbPath: "", + dbInMemory: inMemory, + jobID: nil, + logger: log.With().Str("module", "JobQueue").Str("dbType", "Badger").Logger(), + } +} + +func (jqdb *JobQueueDbBadger[T]) Open(path string, queueName string) error { + jqdb.dbPath = path + + var opts badger.Options + if jqdb.dbInMemory { + opts = badger.DefaultOptions("").WithInMemory(true) + } else { + opts = badger.DefaultOptions(jqdb.dbPath) + } + opts.Logger = nil + + // open the BadgerDB + db, err := badger.Open(opts) + if err != nil { + return fmt.Errorf("failed to open BadgerDB: %w", err) + } + jqdb.db = db + + // setup the job ID sequence + jobID, err := jqdb.db.GetSequence([]byte("nextJobID"), defaultJobIDSequenceSize) + if err != nil { + return fmt.Errorf("failed to get next job ID sequence: %w", err) + } else { + jqdb.jobID = jobID + } + return err +} + +func (jqdb *JobQueueDbBadger[T]) Close() error { + //jqdb.logger.Debug().Msg("Closing Badger DB connection") + if err := jqdb.jobID.Release(); err != nil { + jqdb.logger.Error().Err(err).Msg("Failed to release next job id sequence") + } + if err := jqdb.db.Close(); err != nil { + jqdb.logger.Error().Err(err).Msg("Failed to close Badger DB connection") + return err + } + return nil +} + +func (jqdb *JobQueueDbBadger[T]) GetNextJobId() (uint64, error) { + id, err := jqdb.jobID.Next() + return id, err +} + +func (jqdb *JobQueueDbBadger[T]) FetchJobs(count int) ([]*job[T], error) { + // create a new array of jobs + jobs := make([]*job[T], 0, count) + + err := jqdb.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.PrefetchSize = jobPrefetchSize + it := txn.NewIterator(opts) + defer it.Close() + + for it.Seek([]byte(jobDBKeyPrefix)); it.ValidForPrefix([]byte(jobDBKeyPrefix)); it.Next() { + item := it.Item() + err := item.Value(func(v []byte) error { + var job job[T] + if err := json.Unmarshal(v, &job); err != nil { + jqdb.logger.Error().Err(err).Uint64("jobID", + binary.BigEndian.Uint64(item.Key())).Msg("Failed to unmarshal job") + return err + } + jobs = append(jobs, &job) + return nil + }) + if err != nil { + jqdb.logger.Error().Err(err).Uint64("jobID", + binary.BigEndian.Uint64(item.Key())).Msg("Failed fetch job") + return err + } + } + return nil + }) + if err != nil && len(jobs) == 0 { + // only return an error if we didn't fetch any jobs at all. If we fetched some jobs, we can still process them. + return nil, fmt.Errorf("failed to fetch any jobs: %w", err) + } + + return jobs, nil +} + +func (jqdb *JobQueueDbBadger[T]) ReadJob(jobID uint64) (*job[T], error) { + var val []byte + var theItem *badger.Item + err := jqdb.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte(dbKeyForJob(jobID))) + if err != nil { + return err + } + theItem = item + val, err = item.ValueCopy(nil) + return err + }) + if err != nil { + if errors.Is(err, badger.ErrKeyNotFound) { + return nil, ErrJobNotFound + } + return nil, fmt.Errorf("failed to read job: %w", err) + } + var theJob job[T] + if err := json.Unmarshal(val, &theJob); err != nil { + jqdb.logger.Error().Err(err).Uint64("jobID", + binary.BigEndian.Uint64(theItem.Key())).Msg("Failed to unmarshal job") + return nil, err + } + + return &theJob, nil +} + +func (jqdb *JobQueueDbBadger[T]) AddJob(job *job[T]) (uint64, error) { + id, err := jqdb.GetNextJobId() + if err != nil { + return 0, fmt.Errorf("failed to get next job id: %w", err) + } + job.ID = id + jobBytes, err := json.Marshal(job) + if err != nil { + return 0, fmt.Errorf("failed to marshal job: %w", err) + } + + err = jqdb.db.Update(func(txn *badger.Txn) error { + if err := txn.Set(dbKeyForJob(job.ID), jobBytes); err != nil { + return fmt.Errorf("failed to store job: %w", err) + } + return nil + }) + return job.ID, err +} + +func (jqdb *JobQueueDbBadger[T]) DeleteJob(jobID uint64) error { + err := jqdb.db.Update(func(txn *badger.Txn) error { + if err := txn.Delete(dbKeyForJob(jobID)); err != nil { + return err + } + return nil + }) + return err +} + +// dbKey BadgerDB iterates over keys in lexicographical order, so we need to make sure that the job ID +// is strictly increasing to avoid queues being processed out of order. +// FIXME: not sure this is relevant anymore, since we can execute jobs in parallel +func dbKeyForJob(jobId uint64) []byte { + return []byte(fmt.Sprintf("%s%d", jobDBKeyPrefix, jobId)) +} diff --git a/jobqueue_db_mongo.go b/jobqueue_db_mongo.go new file mode 100644 index 0000000..8c4a09c --- /dev/null +++ b/jobqueue_db_mongo.go @@ -0,0 +1,161 @@ +package jobqueue + +import ( + "context" + "errors" + "fmt" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +// JobQueueDbMongo is the MongoDB implementation of the JobQueueDb interface +type JobQueueDbMongo[T any] struct { + client *mongo.Client + ctx context.Context + db *mongo.Database + coll *mongo.Collection + idColl *mongo.Collection + jobQueueName string +} + +// NewJobQueueDbMongo creates a new JobQueueDbMongo instance +func NewJobQueueDbMongo[T any](ctx context.Context) JobQueueDb[T] { + return &JobQueueDbMongo[T]{ + client: nil, + ctx: ctx, + db: nil, + coll: nil, + jobQueueName: "", + } +} + +// Open the MongoDB database +func (jqdb *JobQueueDbMongo[T]) Open(path string, queueName string) error { + client, err := mongo.Connect(jqdb.ctx, options.Client().ApplyURI(path)) + if err != nil { + return fmt.Errorf("failed to connect to MongoDB at %s: %w", path, err) + } + jqdb.client = client + // TODO: handle mongo db options + jqdb.db = client.Database("job_queues") + if jqdb.db == nil { + return fmt.Errorf("failed to open mongo database job_queues") + } + // holds the jobs for the queue + jqdb.jobQueueName = dbCollectionNameForQueue(queueName) + jqdb.coll = jqdb.db.Collection(jqdb.jobQueueName) + if jqdb.coll == nil { + return fmt.Errorf("failed to open collection job_queues.%s", jqdb.jobQueueName) + } + // holds the job IDs for all queues + jqdb.idColl = jqdb.db.Collection("job_ids") + if jqdb.idColl == nil { + return fmt.Errorf("failed to open collection job_queues.job_ids") + } + return nil +} + +// Close the MongoDB database +func (jqdb *JobQueueDbMongo[T]) Close() error { + err := jqdb.client.Disconnect(jqdb.ctx) + return err +} + +// GetNextJobId() (uint64, error) +func (jqdb *JobQueueDbMongo[T]) GetNextJobId() (uint64, error) { + + var nextJobId uint64 + result := jqdb.idColl.FindOneAndUpdate(jqdb.ctx, + bson.D{{Key: "queue", Value: jqdb.jobQueueName}}, // selector + bson.D{{Key: "$inc", Value: bson.D{{Key: "next_job_id", Value: 1}}}}) // update, increment next_job_id by 1, return old record + if result.Err() != nil { + if errors.Is(result.Err(), mongo.ErrNoDocuments) { + // insert the queue if it doesn't exist. We start with ID 2 because we are returning 1 + _, err := jqdb.idColl.InsertOne(jqdb.ctx, bson.D{{Key: "queue", Value: jqdb.jobQueueName}, {Key: "next_job_id", Value: 2}}) + if err != nil { + return 0, fmt.Errorf("failed to create initial mongo record for next job id: %w", err) + } + nextJobId = 1 + } else { + return 0, fmt.Errorf("failed to get next job id: %w", result.Err()) + } + } else { + raw, err := result.Raw() + if err != nil { + return 0, fmt.Errorf("failed to get raw result from mongo: %w", err) + } + val := raw.Lookup("next_job_id") + nextJobId = uint64(val.AsInt64()) + } + return nextJobId, nil +} + +// FetchJobs(count int) ([]*job[T], error) +func (jqdb *JobQueueDbMongo[T]) FetchJobs(count int) ([]*job[T], error) { + // create a new array of jobs + jobs := make([]*job[T], 0, count) + + opts := options.Find().SetLimit(int64(count)).SetAllowPartialResults(true) + cursor, err := jqdb.coll.Find(jqdb.ctx, bson.D{}, opts) + if err != nil { + return nil, fmt.Errorf("failed to fetch jobs from mongo collection: %w", err) + } + defer cursor.Close(jqdb.ctx) + for cursor.Next(jqdb.ctx) { + var j job[T] + err := cursor.Decode(&j) + if err != nil { + continue // skip this job + } else { + jobs = append(jobs, &j) + } + } + return jobs, nil +} + +// ReadJob(jobID uint64) (*job[T], error) +func (jqdb *JobQueueDbMongo[T]) ReadJob(jobID uint64) (*job[T], error) { + result := jqdb.coll.FindOne(jqdb.ctx, bson.D{{Key: "id", Value: jobID}}) + if result.Err() != nil { + if errors.Is(result.Err(), mongo.ErrNoDocuments) { + return nil, ErrJobNotFound + } + return nil, fmt.Errorf("failed to read job from mongo collection: %w", result.Err()) + } + var j job[T] + err := result.Decode(&j) + if err != nil { + return nil, fmt.Errorf("failed to decode job from mongo collection: %w", err) + } + return &j, nil +} + +// AddJob(job *job[T]) (uint64, error) // returns the job ID +func (jqdb *JobQueueDbMongo[T]) AddJob(job *job[T]) (uint64, error) { + id, err := jqdb.GetNextJobId() + if err != nil { + return 0, fmt.Errorf("failed to get next job id: %w", err) + } + job.ID = id + _, err = jqdb.coll.InsertOne(jqdb.ctx, job) + if err != nil { + return 0, fmt.Errorf("failed to insert job into mongo collection: %w", err) + } + return job.ID, nil +} + +// DeleteJob(jobID uint64) error +func (jqdb *JobQueueDbMongo[T]) DeleteJob(jobID uint64) error { + _, err := jqdb.coll.DeleteOne(jqdb.ctx, bson.D{{Key: "id", Value: jobID}}) + if err != nil { + return fmt.Errorf("failed to delete job from mongo collection: %w", err) + } + return nil +} + +func dbCollectionNameForQueue(queueName string) string { + // TODO: normalize queueName + return queueName + "_jobs" +} diff --git a/jobqueue_test.go b/jobqueue_test.go index 512f59d..13790ef 100644 --- a/jobqueue_test.go +++ b/jobqueue_test.go @@ -1,20 +1,23 @@ package jobqueue import ( + "context" "fmt" + "math/rand" "os" + "strconv" "testing" "time" "github.com/dgraph-io/badger/v4" - "github.com/goccy/go-json" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" -) -const BadgerDBPath = "/tmp/badger" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) func init() { //nolint:gochecknoinits // for testing zerolog.SetGlobalLevel(zerolog.DebugLevel) @@ -27,8 +30,20 @@ type testJob struct { func testJobHandler() func(JobContext, testJob) error { return func(ctx JobContext, job testJob) error { - fmt.Println("Test job processed:", job.Msg, ctx.JobID(), //nolint:forbidigo // for testing + fmt.Println("Job Performed:", job.Msg, ctx.JobID(), //nolint:forbidigo // for testing + ctx.JobCreatedAt().Unix()) + return nil + } +} + +func complexJobHandler() func(JobContext, testJob) error { + return func(ctx JobContext, job testJob) error { + fmt.Println("Starting job...", job.Msg, ctx.JobID(), //nolint:forbidigo // for testing ctx.JobCreatedAt().Unix()) + numMicroseconds := rand.Int63n(200) + 1 + time.Sleep(time.Duration(numMicroseconds) * time.Microsecond) + fmt.Println("Job completed:", job.Msg, ctx.JobID(), //nolint:forbidigo // for testing + ctx.JobCreatedAt().Unix(), "after "+strconv.FormatInt(numMicroseconds, 10)+"µs") return nil } } @@ -39,7 +54,6 @@ func TestNewJobQueue(t *testing.T) { // Test cases testCases := []struct { name string - dbPath string queueName string workers int options []Option[testJob] @@ -48,26 +62,23 @@ func TestNewJobQueue(t *testing.T) { }{ { name: "Valid configuration", - dbPath: "/tmp/test_jobqueue_1", queueName: "test-queue-1", workers: 2, - options: []Option[testJob]{WithInmemDB[testJob]()}, + options: []Option[testJob]{WithInMemDB[testJob]()}, expectedError: false, }, { name: "Invalid workers count", - dbPath: "/tmp/test_jobqueue_2", queueName: "test-queue-2", workers: -1, - options: []Option[testJob]{WithInmemDB[testJob]()}, + options: []Option[testJob]{WithInMemDB[testJob]()}, expectedError: true, }, { name: "Zero workers", - dbPath: "/tmp/test_jobqueue_3", queueName: "test-queue-3", workers: 0, - options: []Option[testJob]{WithInmemDB[testJob]()}, + options: []Option[testJob]{WithInMemDB[testJob]()}, expectedError: false, }, } @@ -78,7 +89,7 @@ func TestNewJobQueue(t *testing.T) { t.Parallel() // Act - jq, err := New[testJob](tc.dbPath, tc.queueName, tc.workers, testJobHandler(), tc.options...) + jq, err := New[testJob](tc.queueName, tc.workers, testJobHandler(), tc.options...) // Assert if tc.expectedError { @@ -89,7 +100,6 @@ func TestNewJobQueue(t *testing.T) { require.NotNil(t, jq) assert.NotNil(t, jq.db) - assert.NotNil(t, jq.jobID) assert.NotNil(t, jq.isJobIDInQueue) assert.NotNil(t, jq.jobs) @@ -104,7 +114,7 @@ func TestNewJobQueue(t *testing.T) { func TestJobQueue_Enqueue(t *testing.T) { cleanupBadgerDB(t) - jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler(), WithInmemDB[testJob]()) + jq, err := New[testJob]("test-job", 0, testJobHandler(), WithInMemDB[testJob]()) assert.NoError(t, err) t.Cleanup(func() { @@ -118,11 +128,7 @@ func TestJobQueue_Enqueue(t *testing.T) { assert.NoError(t, err) // Verify that the job was stored in badger DB - value, err := readJob(jq.db, id) - assert.NoError(t, err) - - var dbJob job[testJob] - err = json.Unmarshal(value, &dbJob) + dbJob, err := jq.db.ReadJob(id) assert.NoError(t, err) // Verify that the job is what we're expecting @@ -136,7 +142,7 @@ func TestJobQueue_Enqueue(t *testing.T) { func TestJobQueue_ProcessJob(t *testing.T) { cleanupBadgerDB(t) - jq, err := New[testJob](BadgerDBPath, "test-job", 0, testJobHandler(), WithInmemDB[testJob]()) + jq, err := New[testJob]("test-job", 0, testJobHandler(), WithInMemDB[testJob]()) assert.NoError(t, err) t.Cleanup(func() { @@ -165,16 +171,16 @@ func TestJobQueue_ProcessJob(t *testing.T) { assert.WithinDuration(t, time.Now(), j.CreatedAt, time.Second) // Process the job - assert.NoError(t, jq.processJob(j)) + assert.NoError(t, jq.processJob(j, 0)) // Check that the job is removed from the in-memory index _, ok := jq.isJobIDInQueue.Load(ids[i]) assert.False(t, ok) // Check that the job is no longer in the badger DB - value, err := readJob(jq.db, ids[i]) + dbJob, err := jq.db.ReadJob(ids[i]) assert.Error(t, err, badger.ErrKeyNotFound) - assert.Nil(t, value) + assert.Nil(t, dbJob) } } @@ -182,7 +188,7 @@ func TestJobQueue_Recovery(t *testing.T) { cleanupBadgerDB(t) // Create initial job queue - jq, err := New[testJob]("/tmp/badger", "test-job", 0, testJobHandler()) + jq, err := New[testJob]("test-job", 0, testJobHandler(), WithBadgerDB[testJob]("/tmp/badger")) assert.NoError(t, err) t.Cleanup(func() { @@ -197,7 +203,7 @@ func TestJobQueue_Recovery(t *testing.T) { assert.NoError(t, jq.Stop()) // Create recovered job queue - recoveredJq, err := New[testJob]("/tmp/badger", "test-job", 0, testJobHandler()) + recoveredJq, err := New[testJob]("test-job", 0, testJobHandler(), WithBadgerDB[testJob]("/tmp/badger")) assert.NoError(t, err) j := <-recoveredJq.jobs @@ -207,35 +213,85 @@ func TestJobQueue_Recovery(t *testing.T) { assert.Equal(t, j.Payload, testJob{Msg: "hello"}) // Process the job in recovered job queue - assert.NoError(t, recoveredJq.processJob(j)) + assert.NoError(t, recoveredJq.processJob(j, 0)) // Stop recovered job queue assert.NoError(t, recoveredJq.Stop()) } -func readJob(db *badger.DB, id uint64) ([]byte, error) { - return readKey(db, fmt.Sprintf("%s%d", jobDBKeyPrefix, id)) +func TestBadgerJobConcurrency(t *testing.T) { + cleanupBadgerDB(t) + jq, err := New[testJob]("test-job", 5, complexJobHandler(), WithBadgerDB[testJob]("/tmp/badger")) + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, jq.Stop()) + cleanupBadgerDB(t) + }) + DoJobConcurrencyTest(jq, t) } -func readKey(db *badger.DB, key string) ([]byte, error) { - var valCopy []byte - err := db.View(func(txn *badger.Txn) error { - item, err := txn.Get([]byte(key)) - if err != nil { - return err - } - - valCopy, err = item.ValueCopy(nil) - return err +func TestMongoJobConcurrency(t *testing.T) { + cleanupMongoDB(t) + jq, err := New[testJob]("test-job", 5, complexJobHandler(), WithMongoDB[testJob]("mongodb://localhost:27017")) + assert.NoError(t, err) + t.Cleanup(func() { + assert.NoError(t, jq.Stop()) + cleanupMongoDB(t) }) + DoJobConcurrencyTest(jq, t) +} + +func DoJobConcurrencyTest(jq *JobQueue[testJob], t *testing.T) { + // Queue a bunch of jobs, which should be processed concurrently + ids := make([]uint64, 0) + for i := 0; i < 20; i++ { + j := testJob{Msg: fmt.Sprintf("hello %d", i)} + + id, err := jq.Enqueue(j) + assert.NoError(t, err) - if err != nil { - return nil, err + ids = append(ids, id) + } + + // Give time for all jobs to be processed + time.Sleep(time.Second) + + // add a few more new jobs + for i := 20; i < 22; i++ { + j := testJob{Msg: fmt.Sprintf("hello %d", i)} + + id, err := jq.Enqueue(j) + assert.NoError(t, err) + + ids = append(ids, id) + } + + // more time for new jobs to be processed + time.Sleep(time.Second) + + // Check that all jobs were processed + for id := range ids { + // Check that the job is removed from the in-memory index + _, ok := jq.isJobIDInQueue.Load(uint64(id)) + assert.False(t, ok) + + // Check that the job is no longer in the JobQueue DB + job, err := jq.db.ReadJob(uint64(id)) + assert.Error(t, err, ErrJobNotFound) + assert.Nil(t, job) } - return valCopy, nil } func cleanupBadgerDB(t *testing.T) { - assert.NoError(t, os.RemoveAll(BadgerDBPath)) + assert.NoError(t, os.RemoveAll("/tmp/badger")) +} + +func cleanupMongoDB(t *testing.T) { + path := "mongodb://localhost:27017" + client, err := mongo.Connect(context.TODO(), options.Client().ApplyURI(path)) + assert.NoError(t, err) + db := client.Database("job_queues") + assert.NoError(t, db.Drop(context.TODO())) + assert.NoError(t, client.Disconnect(context.Background())) } diff --git a/options.go b/options.go index 7139138..0136281 100644 --- a/options.go +++ b/options.go @@ -23,10 +23,58 @@ func WithJobBufferSize[T any](size int) Option[T] { } } +// how many jobs at once are retrieved from the DB in a single fetch operation +func WithJobsPerFetch[T any](count int) Option[T] { + return func(jq *JobQueue[T]) { + jq.logger.Debug().Msg(fmt.Sprintf("Jobs per fetch set to %d", count)) + jq.jobsPerFetch = count + } +} + // WithInmemDB uses an in-memory BadgerDB instead of a persistent one. // Useful for testing, but provides no durability guarantees. -func WithInmemDB[T any]() Option[T] { +// if we previously called UseMongoDB, we will warn and ignore this option. +func WithInMemDB[T any]() Option[T] { + return func(jq *JobQueue[T]) { + if jq.dbUseMongo { + jq.logger.Warn().Msg("Ignoring WithInMemDB option, not compatible with UseMongoDB option") + } else if jq.dbUseBadger { + jq.logger.Warn().Msg("Ignoring WithInMemDB option, not compatible with UseBadgerDB option") + } else { + jq.logger.Debug().Msg("Using Badger In-Memory DB for Job Queue DB") + jq.dbInMemory = true + } + } +} + +// WithMongoDB sets the JobQueue to use MongoDB instead of BadgerDB. +// if WithInMemDB was previously called, we will warn and ignore this option. +func WithMongoDB[T any](uri string) Option[T] { + return func(jq *JobQueue[T]) { + if jq.dbInMemory { + jq.logger.Warn().Msg("Ignoring WithMongoDB option, not compatible with WithInMemDB option") + } else if jq.dbUseBadger { + jq.logger.Warn().Msg("Ignoring WitMongoDB option, not compatible with WithBadgerDB option") + } else { + jq.logger.Debug().Msg(fmt.Sprintf("Using Mongo DB at %s for Job Queue DB", uri)) + jq.dbPath = uri + jq.dbUseMongo = true + } + } +} + +// WithBadgerDB sets the JobQueue to use BadgerDB instead of MongoDB. +// if WithInMemDB or WithBadgerDB was previously called, we will warn and ignore this option. +func WithBadgerDB[T any](path string) Option[T] { return func(jq *JobQueue[T]) { - jq.dbInMemory = true + if jq.dbInMemory { + jq.logger.Warn().Msg("Ignoring WithBadgerDB option, not compatible with WithInMemDB option") + } else if jq.dbUseMongo { + jq.logger.Warn().Msg("Ignoring WithBadgerDB option, not compatible with WithMongoDB option") + } else { + jq.logger.Debug().Msg(fmt.Sprintf("Using Badger DB at %s for Job Queue DB", path)) + jq.dbPath = path + jq.dbUseBadger = true + } } } diff --git a/timestat.go b/timestat.go new file mode 100644 index 0000000..83fa8a3 --- /dev/null +++ b/timestat.go @@ -0,0 +1,46 @@ +package jobqueue + +import ( + "strconv" + "time" +) + +type TimeStat struct { + TotalTime time.Duration + MinTime time.Duration + MaxTime time.Duration + Count int64 +} + +func (ts *TimeStat) AvgTime() time.Duration { + if ts.Count == 0 { + return 0 + } + return ts.TotalTime / time.Duration(ts.Count) +} + +func (ts *TimeStat) Reset() { + ts.TotalTime = 0 + ts.MinTime = 0 + ts.MaxTime = 0 + ts.Count = 0 +} + +func (ts *TimeStat) RecordTime(duration time.Duration) { + ts.TotalTime += duration + ts.Count++ + if ts.MinTime == 0 || duration < ts.MinTime { + ts.MinTime = duration + } + if duration > ts.MaxTime { + ts.MaxTime = duration + } +} + +func (ts *TimeStat) String() string { + return "tot " + ts.TotalTime.String() + + " cnt " + strconv.FormatInt(ts.Count, 10) + + " avg " + ts.AvgTime().String() + + " min " + ts.MinTime.String() + + " max " + ts.MaxTime.String() +}