From 7eaea8ab536c5d8e672a1d1caf8e4abaf4684e57 Mon Sep 17 00:00:00 2001 From: Ganesha Upadhyaya Date: Fri, 18 Oct 2024 22:27:06 +0400 Subject: [PATCH] feat: upgrade dependencies, add persistence and other fixes (#22) * update go-sequencing and go-da latest, use rollup id for da namespace * accommodate MaxBytes * add persistence to sequencer * minor fixes * minor refactor * fix test * fix compare and set * deep equal * hold the lock --- .gitignore | 1 + da/da.go | 3 +- go.mod | 14 +- go.sum | 35 ++- main.go | 4 +- sequencing/sequencer.go | 470 ++++++++++++++++++++++++++++++----- sequencing/sequencer_test.go | 122 +++++++-- 7 files changed, 536 insertions(+), 113 deletions(-) diff --git a/.gitignore b/.gitignore index 567609b..6a7c5a3 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ build/ +sequencing/test_db/ diff --git a/da/da.go b/da/da.go index d30a13c..abf5ed1 100644 --- a/da/da.go +++ b/da/da.go @@ -199,7 +199,7 @@ func (dac *DAClient) SubmitBatch(ctx context.Context, data []*sequencing.Batch, // RetrieveBatch retrieves block data from DA. func (dac *DAClient) RetrieveBatch(ctx context.Context, dataLayerHeight uint64) ResultRetrieveBatch { - ids, err := dac.DA.GetIDs(ctx, dataLayerHeight, dac.Namespace) + idsResult, err := dac.DA.GetIDs(ctx, dataLayerHeight, dac.Namespace) if err != nil { return ResultRetrieveBatch{ BaseResult: BaseResult{ @@ -209,6 +209,7 @@ func (dac *DAClient) RetrieveBatch(ctx context.Context, dataLayerHeight uint64) }, } } + ids := idsResult.IDs // If no block data are found, return a non-blocking error. if len(ids) == 0 { diff --git a/go.mod b/go.mod index 8f61b6d..2fee1e9 100644 --- a/go.mod +++ b/go.mod @@ -5,11 +5,12 @@ go 1.22 toolchain go1.22.3 require ( + github.com/dgraph-io/badger/v3 v3.2103.5 github.com/gogo/protobuf v1.3.2 github.com/ipfs/go-log/v2 v2.5.1 - github.com/rollkit/go-da v0.5.0 - github.com/rollkit/go-sequencing v0.0.0-20240906080441-430ed2125493 - github.com/rollkit/rollkit v0.13.6 + github.com/rollkit/go-da v0.8.0 + github.com/rollkit/go-sequencing v0.2.0 + github.com/rollkit/rollkit v0.13.7 github.com/stretchr/testify v1.9.0 ) @@ -28,15 +29,16 @@ require ( github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect github.com/dustin/go-humanize v1.0.1 // indirect - github.com/filecoin-project/go-jsonrpc v0.3.1 // indirect + github.com/filecoin-project/go-jsonrpc v0.6.0 // indirect github.com/go-kit/kit v0.13.0 // indirect github.com/go-kit/log v0.2.1 // indirect github.com/go-logfmt/logfmt v0.6.0 // indirect - github.com/golang/glog v1.2.1 // indirect + github.com/golang/glog v1.2.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect + github.com/google/flatbuffers v1.12.1 // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gorilla/websocket v1.5.3 // indirect @@ -84,7 +86,7 @@ require ( golang.org/x/text v0.17.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect - google.golang.org/grpc v1.66.0 // indirect + google.golang.org/grpc v1.67.1 // indirect google.golang.org/protobuf v1.34.2 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect lukechampine.com/blake3 v1.2.1 // indirect diff --git a/go.sum b/go.sum index 45076a6..7296316 100644 --- a/go.sum +++ b/go.sum @@ -52,6 +52,8 @@ github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0 h1:rpfIENRNNilwHwZeG5+P150SMrnN github.com/decred/dcrd/dcrec/secp256k1/v4 v4.3.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o= github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk= +github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= +github.com/dgraph-io/badger/v3 v3.2103.5/go.mod h1:4MPiseMeDQ3FNCYwRbbcBOGJLf5jsE0PPFzRiKjtcdw= github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= @@ -69,8 +71,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= -github.com/filecoin-project/go-jsonrpc v0.3.1 h1:qwvAUc5VwAkooquKJmfz9R2+F8znhiqcNHYjEp/NM10= -github.com/filecoin-project/go-jsonrpc v0.3.1/go.mod h1:jBSvPTl8V1N7gSTuCR4bis8wnQnIjHbRPpROol6iQKM= +github.com/filecoin-project/go-jsonrpc v0.6.0 h1:/fFJIAN/k6EgY90m7qbyfY28woMwyseZmh2gVs5sYjY= +github.com/filecoin-project/go-jsonrpc v0.6.0/go.mod h1:/n/niXcS4ZQua6i37LcVbY1TmlJR0UIK9mDFQq2ICek= github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg= github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= @@ -94,8 +96,9 @@ github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= -github.com/golang/glog v1.2.1 h1:OptwRhECazUx5ix5TTWC3EZhsZEHWcYWY4FQHTIubm4= -github.com/golang/glog v1.2.1/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= +github.com/golang/glog v1.2.2 h1:1+mZ9upx1Dh6FmUTFR1naJ77miKiXgALjWOZ3NVFPmY= +github.com/golang/glog v1.2.2/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= +github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -118,12 +121,15 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4= +github.com/google/flatbuffers v1.12.1 h1:MVlul7pQNoDzWRLTw5imwYsl+usrS1TXG2H4jg6ImGw= +github.com/google/flatbuffers v1.12.1/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= @@ -314,12 +320,12 @@ github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtB github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= -github.com/rollkit/go-da v0.5.0 h1:sQpZricNS+2TLx3HMjNWhtRfqtvVC/U4pWHpfUz3eN4= -github.com/rollkit/go-da v0.5.0/go.mod h1:VsUeAoPvKl4Y8wWguu/VibscYiFFePkkrvZWyTjZHww= -github.com/rollkit/go-sequencing v0.0.0-20240906080441-430ed2125493 h1:QrAQqM0RxTmv0JxTOxpUeV3As0onCK8uZ6N49xiJJds= -github.com/rollkit/go-sequencing v0.0.0-20240906080441-430ed2125493/go.mod h1:0KE5/iH/uPLLT/CY3Vg5xgwlcHfqHHYaDwH4Oj6PLTs= -github.com/rollkit/rollkit v0.13.6 h1:ZdIBG5D5RuQvnnJSY8s3m46dR3A3F6jHN+01zX+Avt0= -github.com/rollkit/rollkit v0.13.6/go.mod h1:clM4aPsWDJk/IN/SqCBsA+ab0/8gdh+5O4hRkLWKB7s= +github.com/rollkit/go-da v0.8.0 h1:oJKojC421eRC4mNqbujf40GzLFNp7HapgeB7Z/r0tyc= +github.com/rollkit/go-da v0.8.0/go.mod h1:3eHWK5gkv8lhwq6bjOZOi82WwHyS2B9rQOlUrE1GGws= +github.com/rollkit/go-sequencing v0.2.0 h1:IrEA29p07zPDbqY29AzI+q2zUqm7QACAoR+/PBpexvw= +github.com/rollkit/go-sequencing v0.2.0/go.mod h1:P/cQXTw3rWpPqhqnCwKzlkS39XM8ugmyf2u63twBgG8= +github.com/rollkit/rollkit v0.13.7 h1:GNWX0tPs02DsLeTc0z8G7rO5PDR0yavT08Umr7O02Ks= +github.com/rollkit/rollkit v0.13.7/go.mod h1:clM4aPsWDJk/IN/SqCBsA+ab0/8gdh+5O4hRkLWKB7s= github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= github.com/sasha-s/go-deadlock v0.3.1 h1:sqv7fDNShgjcaxkO0JNcOAlr8B9+cV5Ey/OB71efZx0= github.com/sasha-s/go-deadlock v0.3.1/go.mod h1:F73l+cr82YSh10GxyRI6qZiCgK64VaZjwesgfQ1/iLM= @@ -353,6 +359,7 @@ github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9dec github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ= go.etcd.io/bbolt v1.3.7/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= +go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= @@ -408,6 +415,7 @@ golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -419,6 +427,7 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190502145724-3ef323f4f1fd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -462,17 +471,19 @@ golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNq google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg= google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= -google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c= -google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y= +google.golang.org/grpc v1.67.1 h1:zWnc1Vrcno+lHZCOofnIMvycFcc0QRGIzm9dhnDX68E= +google.golang.org/grpc v1.67.1/go.mod h1:1gLDyUQU7CTLJI90u3nXZ9ekeghjeM7pTDZlqFNg2AA= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= diff --git a/main.go b/main.go index fa110a5..852c730 100644 --- a/main.go +++ b/main.go @@ -31,6 +31,7 @@ func main() { da_address string da_namespace string da_auth_token string + db_path string ) flag.StringVar(&host, "host", defaultHost, "centralized sequencer host") flag.StringVar(&port, "port", defaultPort, "centralized sequencer port") @@ -39,6 +40,7 @@ func main() { flag.StringVar(&da_address, "da_address", defaultDA, "DA address") flag.StringVar(&da_namespace, "da_namespace", "", "DA namespace where the sequencer submits transactions") flag.StringVar(&da_auth_token, "da_auth_token", "", "auth token for the DA") + flag.StringVar(&db_path, "db_path", "", "path to the database") flag.Parse() @@ -58,7 +60,7 @@ func main() { log.Fatalf("Error decoding namespace: %v", err) } - centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime) + centralizedSeq, err := sequencing.NewSequencer(da_address, da_auth_token, namespace, batchTime, db_path) if err != nil { log.Fatalf("Failed to create centralized sequencer: %v", err) } diff --git a/sequencing/sequencer.go b/sequencing/sequencer.go index 53c178d..8325a0d 100644 --- a/sequencing/sequencer.go +++ b/sequencing/sequencer.go @@ -4,11 +4,15 @@ import ( "bytes" "context" "crypto/sha256" + "encoding/hex" "errors" "fmt" + "reflect" "sync" + "sync/atomic" "time" + "github.com/dgraph-io/badger/v3" logging "github.com/ipfs/go-log/v2" "github.com/rollkit/centralized-sequencer/da" @@ -17,6 +21,9 @@ import ( "github.com/rollkit/go-sequencing" ) +// ErrInvalidRollupId is returned when the rollup id is invalid +var ErrInvalidRollupId = errors.New("invalid rollup id") + var _ sequencing.Sequencer = &Sequencer{} var log = logging.Logger("centralized-sequencer") @@ -25,9 +32,6 @@ const defaultMempoolTTL = 25 var initialBackoff = 100 * time.Millisecond -// ErrorRollupIdMismatch is returned when the rollup id does not match -var ErrorRollupIdMismatch = errors.New("rollup id mismatch") - // BatchQueue ... type BatchQueue struct { queue []sequencing.Batch @@ -42,20 +46,92 @@ func NewBatchQueue() *BatchQueue { } // AddBatch adds a new transaction to the queue -func (bq *BatchQueue) AddBatch(batch sequencing.Batch) { +func (bq *BatchQueue) AddBatch(batch sequencing.Batch, db *badger.DB) error { bq.mu.Lock() - defer bq.mu.Unlock() bq.queue = append(bq.queue, batch) + bq.mu.Unlock() + + // Get the hash and bytes of the batch + h, err := batch.Hash() + if err != nil { + return err + } + + // Marshal the batch + batchBytes, err := batch.Marshal() + if err != nil { + return err + } + + // Store the batch in BadgerDB + err = db.Update(func(txn *badger.Txn) error { + return txn.Set(h, batchBytes) + }) + return err } -// Next ... -func (bq *BatchQueue) Next() *sequencing.Batch { +// Next extracts a batch of transactions from the queue +func (bq *BatchQueue) Next(db *badger.DB) (*sequencing.Batch, error) { + bq.mu.Lock() + defer bq.mu.Unlock() if len(bq.queue) == 0 { - return &sequencing.Batch{Transactions: nil} + return &sequencing.Batch{Transactions: nil}, nil } batch := bq.queue[0] bq.queue = bq.queue[1:] - return &batch + + h, err := batch.Hash() + if err != nil { + return &sequencing.Batch{Transactions: nil}, err + } + + // Remove the batch from BadgerDB after processing + err = db.Update(func(txn *badger.Txn) error { + // Get the batch to ensure it exists in the DB before deleting + _, err := txn.Get(h) + if err != nil { + return err + } + // Delete the batch from BadgerDB + return txn.Delete(h) + }) + if err != nil { + return &sequencing.Batch{Transactions: nil}, err + } + + return &batch, nil +} + +// LoadFromDB reloads all batches from BadgerDB into the in-memory queue after a crash or restart. +func (bq *BatchQueue) LoadFromDB(db *badger.DB) error { + bq.mu.Lock() + defer bq.mu.Unlock() + + err := db.View(func(txn *badger.Txn) error { + // Create an iterator to go through all batches stored in BadgerDB + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + err := item.Value(func(val []byte) error { + var batch sequencing.Batch + // Unmarshal the batch bytes and add them to the in-memory queue + err := batch.Unmarshal(val) + if err != nil { + return err + } + bq.queue = append(bq.queue, batch) + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return err } // TransactionQueue is a queue of transactions @@ -71,15 +147,27 @@ func NewTransactionQueue() *TransactionQueue { } } +// GetTransactionHash to get hash from transaction bytes using SHA-256 +func GetTransactionHash(txBytes []byte) string { + hashBytes := sha256.Sum256(txBytes) + return hex.EncodeToString(hashBytes[:]) +} + // AddTransaction adds a new transaction to the queue -func (tq *TransactionQueue) AddTransaction(tx sequencing.Tx) { +func (tq *TransactionQueue) AddTransaction(tx sequencing.Tx, db *badger.DB) error { tq.mu.Lock() - defer tq.mu.Unlock() tq.queue = append(tq.queue, tx) + tq.mu.Unlock() + + // Store transaction in BadgerDB + err := db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(GetTransactionHash(tx)), tx) + }) + return err } // GetNextBatch extracts a batch of transactions from the queue -func (tq *TransactionQueue) GetNextBatch(max uint64) sequencing.Batch { +func (tq *TransactionQueue) GetNextBatch(max uint64, db *badger.DB) sequencing.Batch { tq.mu.Lock() defer tq.mu.Unlock() @@ -91,16 +179,81 @@ func (tq *TransactionQueue) GetNextBatch(max uint64) sequencing.Batch { for { batch = tq.queue[:batchSize] blobSize := totalBytes(batch) - if uint64(blobSize) < max { + if uint64(blobSize) <= max { break } batchSize = batchSize - 1 } + // Retrieve transactions from BadgerDB and remove processed ones + for _, tx := range batch { + txHash := GetTransactionHash(tx) + err := db.Update(func(txn *badger.Txn) error { + // Get and then delete the transaction from BadgerDB + _, err := txn.Get([]byte(txHash)) + if err != nil { + return err + } + return txn.Delete([]byte(txHash)) // Remove processed transaction + }) + if err != nil { + return sequencing.Batch{Transactions: nil} // Return empty batch if any transaction retrieval fails + } + } tq.queue = tq.queue[batchSize:] return sequencing.Batch{Transactions: batch} } +// LoadFromDB reloads all transactions from BadgerDB into the in-memory queue after a crash. +func (tq *TransactionQueue) LoadFromDB(db *badger.DB) error { + tq.mu.Lock() + defer tq.mu.Unlock() + + // Start a read-only transaction + err := db.View(func(txn *badger.Txn) error { + // Create an iterator to go through all transactions stored in BadgerDB + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() // Ensure that the iterator is properly closed + + // Iterate through all items in the database + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + err := item.Value(func(val []byte) error { + // Load each transaction from DB and add to the in-memory queue + tq.queue = append(tq.queue, val) + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return err +} + +// AddBatchBackToQueue re-adds the batch to the transaction queue (and BadgerDB) after a failure. +func (tq *TransactionQueue) AddBatchBackToQueue(batch sequencing.Batch, db *badger.DB) error { + tq.mu.Lock() + defer tq.mu.Unlock() + + // Add the batch back to the in-memory transaction queue + tq.queue = append(tq.queue, batch.Transactions...) + + // Optionally, persist the batch back to BadgerDB + for _, tx := range batch.Transactions { + err := db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte(GetTransactionHash(tx)), tx) // Store transaction back in DB + }) + if err != nil { + return fmt.Errorf("failed to revert transaction to DB: %w", err) + } + } + + return nil +} + func totalBytes(data [][]byte) int { total := 0 for _, sub := range data { @@ -111,22 +264,27 @@ func totalBytes(data [][]byte) int { // Sequencer implements go-sequencing interface using celestia backend type Sequencer struct { - dalc *da.DAClient - batchTime time.Duration - ctx context.Context - maxDABlobSize uint64 + dalc *da.DAClient + batchTime time.Duration + ctx context.Context + maxSize uint64 rollupId sequencing.RollupId - tq *TransactionQueue - lastBatchHash []byte + tq *TransactionQueue + lastBatchHash []byte + lastBatchHashMutex sync.RWMutex + + seenBatches map[string]struct{} + seenBatchesMutex sync.Mutex + bq *BatchQueue - seenBatches map[string]struct{} - bq *BatchQueue + db *badger.DB // BadgerDB instance for persistence + dbMux sync.Mutex // Mutex for safe concurrent DB access } // NewSequencer ... -func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration) (*Sequencer, error) { +func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime time.Duration, dbPath string) (*Sequencer, error) { ctx := context.Background() dac, err := proxyda.NewClient(daAddress, daAuthToken) if err != nil { @@ -137,19 +295,152 @@ func NewSequencer(daAddress, daAuthToken string, daNamespace []byte, batchTime t if err != nil { return nil, err } + + // Initialize BadgerDB + var opts badger.Options + if dbPath == "" { + opts = badger.DefaultOptions("").WithInMemory(true) + } else { + opts = badger.DefaultOptions(dbPath) + } + opts = opts.WithLogger(nil) + db, err := badger.Open(opts) + if err != nil { + return nil, fmt.Errorf("failed to open BadgerDB: %w", err) + } s := &Sequencer{ - dalc: dalc, - batchTime: batchTime, - ctx: ctx, - maxDABlobSize: maxBlobSize, - tq: NewTransactionQueue(), - bq: NewBatchQueue(), - seenBatches: make(map[string]struct{}), + dalc: dalc, + batchTime: batchTime, + ctx: ctx, + maxSize: maxBlobSize, + rollupId: daNamespace, + tq: NewTransactionQueue(), + bq: NewBatchQueue(), + seenBatches: make(map[string]struct{}), + db: db, + } + + // Load last batch hash from DB to recover from crash + err = s.LoadLastBatchHashFromDB() + if err != nil { + return nil, fmt.Errorf("failed to load last batch hash from DB: %w", err) + } + + // Load seen batches from DB to recover from crash + err = s.LoadSeenBatchesFromDB() + if err != nil { + return nil, fmt.Errorf("failed to load seen batches from DB: %w", err) } + + // Load TransactionQueue and BatchQueue from DB to recover from crash + err = s.tq.LoadFromDB(s.db) // Load transactions + if err != nil { + return nil, fmt.Errorf("failed to load transaction queue from DB: %w", err) + } + err = s.bq.LoadFromDB(s.db) // Load batches + if err != nil { + return nil, fmt.Errorf("failed to load batch queue from DB: %w", err) + } + go s.batchSubmissionLoop(s.ctx) return s, nil } +// Close safely closes the BadgerDB instance if it is open +func (c *Sequencer) Close() error { + if c.db != nil { + err := c.db.Close() + if err != nil { + return fmt.Errorf("failed to close BadgerDB: %w", err) + } + } + return nil +} + +// CompareAndSetMaxSize compares the passed size with the current max size and sets the max size to the smaller of the two +// Initially the max size is set to the max blob size returned by the DA layer +// This can be overwritten by the execution client if it can only handle smaller size +func (c *Sequencer) CompareAndSetMaxSize(size uint64) { + for { + current := atomic.LoadUint64(&c.maxSize) + if size >= current { + return + } + if atomic.CompareAndSwapUint64(&c.maxSize, current, size) { + return + } + } +} + +// LoadLastBatchHashFromDB loads the last batch hash from BadgerDB into memory after a crash or restart. +func (c *Sequencer) LoadLastBatchHashFromDB() error { + // Lock to ensure concurrency safety + c.dbMux.Lock() + defer c.dbMux.Unlock() + + var hash []byte + // Load the last batch hash from BadgerDB if it exists + err := c.db.View(func(txn *badger.Txn) error { + item, err := txn.Get([]byte("lastBatchHash")) + if errors.Is(err, badger.ErrKeyNotFound) { + // If no last batch hash exists, it's the first time or nothing was processed + c.lastBatchHash = nil + return nil + } + if err != nil { + return err + } + // Set lastBatchHash in memory from BadgerDB + return item.Value(func(val []byte) error { + hash = val + return nil + }) + }) + // Set the in-memory lastBatchHash after successfully loading it from DB + c.lastBatchHash = hash + return err +} + +// LoadSeenBatchesFromDB loads the seen batches from BadgerDB into memory after a crash or restart. +func (c *Sequencer) LoadSeenBatchesFromDB() error { + c.dbMux.Lock() + defer c.dbMux.Unlock() + + err := c.db.View(func(txn *badger.Txn) error { + // Create an iterator to go through all entries in BadgerDB + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + key := item.Key() + // Add the batch hash to the seenBatches map (for fast in-memory lookups) + c.seenBatches[string(key)] = struct{}{} + } + return nil + }) + + return err +} + +func (c *Sequencer) setLastBatchHash(hash []byte) error { + c.dbMux.Lock() + defer c.dbMux.Unlock() + + return c.db.Update(func(txn *badger.Txn) error { + return txn.Set([]byte("lastBatchHash"), hash) + }) +} + +func (c *Sequencer) addSeenBatch(hash []byte) error { + c.dbMux.Lock() + defer c.dbMux.Unlock() + + return c.db.Update(func(txn *badger.Txn) error { + return txn.Set(hash, []byte{1}) // Just to mark the batch as seen + }) +} + func (c *Sequencer) batchSubmissionLoop(ctx context.Context) { batchTimer := time.NewTimer(0) defer batchTimer.Stop() @@ -170,15 +461,23 @@ func (c *Sequencer) batchSubmissionLoop(ctx context.Context) { } func (c *Sequencer) publishBatch() error { - batch := c.tq.GetNextBatch(c.maxDABlobSize) + batch := c.tq.GetNextBatch(c.maxSize, c.db) if batch.Transactions == nil { return nil } err := c.submitBatchToDA(batch) + if err != nil { + // On failure, re-add the batch to the transaction queue for future retry + revertErr := c.tq.AddBatchBackToQueue(batch, c.db) + if revertErr != nil { + return fmt.Errorf("failed to revert batch to queue: %w", revertErr) + } + return fmt.Errorf("failed to submit batch to DA: %w", err) + } + err = c.bq.AddBatch(batch, c.db) if err != nil { return err } - c.bq.AddBatch(batch) return nil } @@ -189,7 +488,7 @@ func (c *Sequencer) submitBatchToDA(batch sequencing.Batch) error { numSubmittedBatches := 0 attempt := 0 - maxBlobSize := c.maxDABlobSize + maxBlobSize := c.maxSize initialMaxBlobSize := maxBlobSize initialGasPrice := c.dalc.GasPrice gasPrice := c.dalc.GasPrice @@ -277,57 +576,94 @@ func getRemainingSleep(start time.Time, blockTime time.Duration, sleep time.Dura return remaining + sleep } -func hashSHA256(data []byte) []byte { - hash := sha256.Sum256(data) - return hash[:] -} - // SubmitRollupTransaction implements sequencing.Sequencer. -func (c *Sequencer) SubmitRollupTransaction(ctx context.Context, rollupId []byte, tx []byte) error { - if c.rollupId == nil { - c.rollupId = rollupId - } else { - if !bytes.Equal(c.rollupId, rollupId) { - return ErrorRollupIdMismatch - } +func (c *Sequencer) SubmitRollupTransaction(ctx context.Context, req sequencing.SubmitRollupTransactionRequest) (*sequencing.SubmitRollupTransactionResponse, error) { + if !c.isValid(req.RollupId) { + return nil, ErrInvalidRollupId } - c.tq.AddTransaction(tx) - return nil + err := c.tq.AddTransaction(req.Tx, c.db) + if err != nil { + return nil, fmt.Errorf("failed to add transaction: %w", err) + } + return &sequencing.SubmitRollupTransactionResponse{}, nil } // GetNextBatch implements sequencing.Sequencer. -func (c *Sequencer) GetNextBatch(ctx context.Context, lastBatchHash sequencing.Hash) (*sequencing.Batch, time.Time, error) { +func (c *Sequencer) GetNextBatch(ctx context.Context, req sequencing.GetNextBatchRequest) (*sequencing.GetNextBatchResponse, error) { + if !c.isValid(req.RollupId) { + return nil, ErrInvalidRollupId + } now := time.Now() - if c.lastBatchHash == nil { - if lastBatchHash != nil { - return nil, now, errors.New("lastBatch is supposed to be nil") - } - } else if lastBatchHash == nil { - return nil, now, errors.New("lastBatch is not supposed to be nil") - } else { - if !bytes.Equal(c.lastBatchHash, lastBatchHash) { - return nil, now, errors.New("supplied lastBatch does not match with sequencer last batch") - } + c.lastBatchHashMutex.Lock() + defer c.lastBatchHashMutex.Unlock() + lastBatchHash := c.lastBatchHash + + if !reflect.DeepEqual(lastBatchHash, req.LastBatchHash) { + return nil, fmt.Errorf("batch hash mismatch: lastBatchHash = %x, req.LastBatchHash = %x", lastBatchHash, req.LastBatchHash) + } + + // Set the max size if it is provided + if req.MaxBytes > 0 { + c.CompareAndSetMaxSize(req.MaxBytes) } - batch := c.bq.Next() + batch, err := c.bq.Next(c.db) + if err != nil { + return nil, err + } + + batchRes := &sequencing.GetNextBatchResponse{Batch: batch, Timestamp: now} if batch.Transactions == nil { - return batch, now, nil + return batchRes, nil } - batchBytes, err := batch.Marshal() + h, err := batch.Hash() if err != nil { - return nil, now, err + return c.recover(*batch, err) } - c.lastBatchHash = hashSHA256(batchBytes) - c.seenBatches[string(c.lastBatchHash)] = struct{}{} - return batch, now, nil + c.lastBatchHash = h + err = c.setLastBatchHash(h) + if err != nil { + return c.recover(*batch, err) + } + + hexHash := hex.EncodeToString(h) + c.seenBatchesMutex.Lock() + c.seenBatches[hexHash] = struct{}{} + c.seenBatchesMutex.Unlock() + err = c.addSeenBatch(h) + if err != nil { + return c.recover(*batch, err) + } + + return batchRes, nil +} + +func (c *Sequencer) recover(batch sequencing.Batch, err error) (*sequencing.GetNextBatchResponse, error) { + // Revert the batch if Hash() errors out by adding it back to the BatchQueue + revertErr := c.bq.AddBatch(batch, c.db) + if revertErr != nil { + return nil, fmt.Errorf("failed to revert batch: %w", revertErr) + } + return nil, fmt.Errorf("failed to generate hash for batch: %w", err) } // VerifyBatch implements sequencing.Sequencer. -func (c *Sequencer) VerifyBatch(ctx context.Context, batchHash sequencing.Hash) (bool, error) { +func (c *Sequencer) VerifyBatch(ctx context.Context, req sequencing.VerifyBatchRequest) (*sequencing.VerifyBatchResponse, error) { //TODO: need to add DA verification - _, ok := c.seenBatches[string(batchHash)] - return ok, nil + if !c.isValid(req.RollupId) { + return nil, ErrInvalidRollupId + } + c.seenBatchesMutex.Lock() + defer c.seenBatchesMutex.Unlock() + key := hex.EncodeToString(req.BatchHash) + if _, exists := c.seenBatches[key]; exists { + return &sequencing.VerifyBatchResponse{Status: true}, nil + } + return &sequencing.VerifyBatchResponse{Status: false}, nil +} + +func (c *Sequencer) isValid(rollupId []byte) bool { + return bytes.Equal(c.rollupId, rollupId) } diff --git a/sequencing/sequencer_test.go b/sequencing/sequencer_test.go index a770e85..4581c58 100644 --- a/sequencing/sequencer_test.go +++ b/sequencing/sequencer_test.go @@ -2,11 +2,14 @@ package sequencing import ( "context" + "encoding/hex" + "fmt" "net/url" "os" "testing" "time" + "github.com/dgraph-io/badger/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -47,12 +50,13 @@ func startMockDAServJSONRPC(ctx context.Context, da_address string) (*proxy.Serv } func TestNewSequencer(t *testing.T) { - // Mock DA client - // mockDAClient := new(da.DAClient) - // Create a new sequencer with mock DA client - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 10*time.Second) + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 1*time.Second, "") require.NoError(t, err) + defer func() { + err := seq.Close() + require.NoError(t, err) + }() // Check if the sequencer was created with the correct values assert.NotNil(t, seq) @@ -63,82 +67,140 @@ func TestNewSequencer(t *testing.T) { func TestSequencer_SubmitRollupTransaction(t *testing.T) { // Initialize a new sequencer - seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("namespace"), 10*time.Second) + seq, err := NewSequencer(MockDAAddressHTTP, "authToken", []byte("rollup1"), 1*time.Second, "") require.NoError(t, err) - + defer func() { + err := seq.Close() + require.NoError(t, err) + }() // Test with initial rollup ID rollupId := []byte("rollup1") tx := []byte("transaction1") - err = seq.SubmitRollupTransaction(context.Background(), rollupId, tx) + res, err := seq.SubmitRollupTransaction(context.Background(), sequencing.SubmitRollupTransactionRequest{RollupId: rollupId, Tx: tx}) require.NoError(t, err) + require.NotNil(t, res) + + // Wait for the transaction to be processed + time.Sleep(2 * time.Second) // Verify the transaction was added - assert.Equal(t, 1, len(seq.tq.GetNextBatch(1000).Transactions)) + nextBatchresp, err := seq.GetNextBatch(context.Background(), sequencing.GetNextBatchRequest{RollupId: rollupId, LastBatchHash: nil}) + require.NoError(t, err) + assert.Equal(t, 1, len(nextBatchresp.Batch.Transactions)) // Test with a different rollup ID (expecting an error due to mismatch) - err = seq.SubmitRollupTransaction(context.Background(), []byte("rollup2"), tx) - assert.EqualError(t, err, ErrorRollupIdMismatch.Error()) + res, err = seq.SubmitRollupTransaction(context.Background(), sequencing.SubmitRollupTransactionRequest{RollupId: []byte("rollup2"), Tx: tx}) + assert.EqualError(t, err, ErrInvalidRollupId.Error()) + assert.Nil(t, res) } func TestSequencer_GetNextBatch_NoLastBatch(t *testing.T) { // Initialize a new sequencer + db, err := getDB() + require.NoError(t, err) + seq := &Sequencer{ bq: NewBatchQueue(), seenBatches: make(map[string]struct{}), + rollupId: []byte("rollup"), + db: db, } + defer func() { + err := seq.Close() + require.NoError(t, err) + }() // Test case where lastBatchHash and seq.lastBatchHash are both nil - batch, now, err := seq.GetNextBatch(context.Background(), nil) + res, err := seq.GetNextBatch(context.Background(), sequencing.GetNextBatchRequest{RollupId: seq.rollupId, LastBatchHash: nil}) require.NoError(t, err) - assert.Equal(t, time.Now().Day(), now.Day()) // Ensure the time is approximately the same - assert.Equal(t, 0, len(batch.Transactions)) // Should return an empty batch + assert.Equal(t, time.Now().Day(), res.Timestamp.Day()) // Ensure the time is approximately the same + assert.Equal(t, 0, len(res.Batch.Transactions)) // Should return an empty batch } func TestSequencer_GetNextBatch_LastBatchMismatch(t *testing.T) { + db, err := getDB() + require.NoError(t, err) // Initialize a new sequencer with a mock batch seq := &Sequencer{ lastBatchHash: []byte("existingHash"), bq: NewBatchQueue(), seenBatches: make(map[string]struct{}), + rollupId: []byte("rollup"), + db: db, } + defer func() { + err := seq.Close() + require.NoError(t, err) + }() // Test case where lastBatchHash does not match seq.lastBatchHash - _, _, err := seq.GetNextBatch(context.Background(), []byte("differentHash")) - assert.EqualError(t, err, "supplied lastBatch does not match with sequencer last batch") + res, err := seq.GetNextBatch(context.Background(), sequencing.GetNextBatchRequest{RollupId: seq.rollupId, LastBatchHash: []byte("differentHash")}) + assert.ErrorContains(t, err, "batch hash mismatch") + assert.Nil(t, res) } func TestSequencer_GetNextBatch_LastBatchNilMismatch(t *testing.T) { + db, err := getDB() + require.NoError(t, err) + // Initialize a new sequencer seq := &Sequencer{ lastBatchHash: []byte("existingHash"), bq: NewBatchQueue(), seenBatches: make(map[string]struct{}), + rollupId: []byte("rollup"), + db: db, } + defer func() { + err := seq.Close() + require.NoError(t, err) + }() // Test case where lastBatchHash is nil but seq.lastBatchHash is not - _, _, err := seq.GetNextBatch(context.Background(), nil) - assert.EqualError(t, err, "lastBatch is not supposed to be nil") + res, err := seq.GetNextBatch(context.Background(), sequencing.GetNextBatchRequest{RollupId: seq.rollupId, LastBatchHash: nil}) + assert.ErrorContains(t, err, "batch hash mismatch") + assert.Nil(t, res) +} + +func getDB() (*badger.DB, error) { + opts := badger.DefaultOptions("").WithInMemory(true) + opts = opts.WithLogger(nil) + db, err := badger.Open(opts) + if err != nil { + return nil, fmt.Errorf("failed to open BadgerDB: %w", err) + } + return db, nil } func TestSequencer_GetNextBatch_Success(t *testing.T) { // Initialize a new sequencer with a mock batch mockBatch := &sequencing.Batch{Transactions: [][]byte{[]byte("tx1"), []byte("tx2")}} + db, err := getDB() + require.NoError(t, err) + seq := &Sequencer{ bq: NewBatchQueue(), seenBatches: make(map[string]struct{}), lastBatchHash: nil, + rollupId: []byte("rollup"), + db: db, } + defer func() { + err := seq.Close() + require.NoError(t, err) + }() // Add mock batch to the BatchQueue - seq.bq.AddBatch(*mockBatch) + err = seq.bq.AddBatch(*mockBatch, seq.db) + require.NoError(t, err) // Test success case with no previous lastBatchHash - batch, now, err := seq.GetNextBatch(context.Background(), nil) + res, err := seq.GetNextBatch(context.Background(), sequencing.GetNextBatchRequest{RollupId: seq.rollupId, LastBatchHash: nil}) require.NoError(t, err) - assert.Equal(t, time.Now().Day(), now.Day()) // Ensure the time is approximately the same - assert.Equal(t, 2, len(batch.Transactions)) // Ensure that the transactions are present + assert.Equal(t, time.Now().Day(), res.Timestamp.Day()) // Ensure the time is approximately the same + assert.Equal(t, 2, len(res.Batch.Transactions)) // Ensure that the transactions are present // Ensure lastBatchHash is updated after the batch assert.NotNil(t, seq.lastBatchHash) @@ -146,22 +208,30 @@ func TestSequencer_GetNextBatch_Success(t *testing.T) { } func TestSequencer_VerifyBatch(t *testing.T) { + db, err := getDB() + require.NoError(t, err) // Initialize a new sequencer with a seen batch seq := &Sequencer{ seenBatches: make(map[string]struct{}), + rollupId: []byte("rollup"), + db: db, } + defer func() { + err := seq.Close() + require.NoError(t, err) + }() // Simulate adding a batch hash batchHash := []byte("validHash") - seq.seenBatches[string(batchHash)] = struct{}{} + seq.seenBatches[hex.EncodeToString(batchHash)] = struct{}{} // Test that VerifyBatch returns true for an existing batch - exists, err := seq.VerifyBatch(context.Background(), batchHash) + res, err := seq.VerifyBatch(context.Background(), sequencing.VerifyBatchRequest{RollupId: seq.rollupId, BatchHash: batchHash}) require.NoError(t, err) - assert.True(t, exists) + assert.True(t, res.Status) // Test that VerifyBatch returns false for a non-existing batch - exists, err = seq.VerifyBatch(context.Background(), []byte("invalidHash")) + res, err = seq.VerifyBatch(context.Background(), sequencing.VerifyBatchRequest{RollupId: seq.rollupId, BatchHash: []byte("invalidHash")}) require.NoError(t, err) - assert.False(t, exists) + assert.False(t, res.Status) }