Skip to content

Commit

Permalink
[OCC] perform validateAll after execution (#437)
Browse files Browse the repository at this point in the history
## Describe your changes and provide context
- This performs validateAll after execution 

## Testing performed to validate your change
- Unit Tests
  • Loading branch information
stevenlanders authored and udpatil committed Feb 27, 2024
1 parent 308262f commit d0d7902
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 44 deletions.
74 changes: 31 additions & 43 deletions tasks/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type deliverTxTask struct {
Ctx sdk.Context
AbortCh chan occ.Abort

mx sync.RWMutex
Status status
Dependencies []int
Abort *occ.Abort
Expand All @@ -52,8 +53,20 @@ type deliverTxTask struct {
ValidateCh chan status
}

func (dt *deliverTxTask) IsStatus(s status) bool {
dt.mx.RLock()
defer dt.mx.RUnlock()
return dt.Status == s
}

func (dt *deliverTxTask) SetStatus(s status) {
dt.mx.Lock()
defer dt.mx.Unlock()
dt.Status = s
}

func (dt *deliverTxTask) Reset() {
dt.Status = statusPending
dt.SetStatus(statusPending)
dt.Response = nil
dt.Abort = nil
dt.AbortCh = nil
Expand Down Expand Up @@ -182,7 +195,7 @@ func (s *scheduler) tryInitMultiVersionStore(ctx sdk.Context) {

func indexesValidated(tasks []*deliverTxTask, idx []int) bool {
for _, i := range idx {
if tasks[i].Status != statusValidated {
if !tasks[i].IsStatus(statusValidated) {
return false
}
}
Expand All @@ -191,7 +204,7 @@ func indexesValidated(tasks []*deliverTxTask, idx []int) bool {

func allValidated(tasks []*deliverTxTask) bool {
for _, t := range tasks {
if t.Status != statusValidated {
if !t.IsStatus(statusValidated) {
return false
}
}
Expand Down Expand Up @@ -220,7 +233,7 @@ type schedulerMetrics struct {

func (s *scheduler) emitMetrics() {
telemetry.IncrCounter(float32(s.metrics.retries), "scheduler", "retries")
telemetry.SetGauge(float32(s.metrics.maxIncarnation), "scheduler", "max_incarnation")
telemetry.IncrCounter(float32(s.metrics.maxIncarnation), "scheduler", "incarnations")
}

func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]types.ResponseDeliverTx, error) {
Expand Down Expand Up @@ -296,12 +309,12 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
} else {
// otherwise, wait for completion
task.Dependencies = conflicts
task.Status = statusWaiting
task.SetStatus(statusWaiting)
return false
}
} else if len(conflicts) == 0 {
// mark as validated, which will avoid re-validating unless a lower-index re-validates
task.Status = statusValidated
task.SetStatus(statusValidated)
return false
}
// conflicts and valid, so it'll validate next time
Expand Down Expand Up @@ -346,18 +359,18 @@ func (s *scheduler) validateAll(ctx sdk.Context, tasks []*deliverTxTask) ([]*del
return nil, nil
}

wg := sync.WaitGroup{}
wg := &sync.WaitGroup{}
for i := startIdx; i < len(tasks); i++ {
t := tasks[i]
wg.Add(1)
t := tasks[i]
s.DoValidate(func() {
defer wg.Done()
if !s.validateTask(ctx, t) {
mx.Lock()
defer mx.Unlock()
t.Reset()
t.Increment()
mx.Lock()
res = append(res, t)
mx.Unlock()
}
})
}
Expand All @@ -373,53 +386,28 @@ func (s *scheduler) executeAll(ctx sdk.Context, tasks []*deliverTxTask) error {

// validationWg waits for all validations to complete
// validations happen in separate goroutines in order to wait on previous index
validationWg := &sync.WaitGroup{}
validationWg.Add(len(tasks))
wg := &sync.WaitGroup{}
wg.Add(len(tasks))

for _, task := range tasks {
t := task
s.DoExecute(func() {
s.prepareAndRunTask(validationWg, ctx, t)
s.prepareAndRunTask(wg, ctx, t)
})
}

validationWg.Wait()
wg.Wait()

return nil
}

func (s *scheduler) waitOnPreviousAndValidate(wg *sync.WaitGroup, task *deliverTxTask) {
defer wg.Done()
defer close(task.ValidateCh)
// wait on previous task to finish validation
// if a previous task fails validation, then subsequent should fail too (cascade)
if task.Index > 0 {
res, ok := <-s.allTasks[task.Index-1].ValidateCh
if ok && res != statusValidated {
task.Reset()
task.ValidateCh <- task.Status
return
}
}
// if not validated, reset the task
if !s.validateTask(task.Ctx, task) {
task.Reset()
}

// notify next task of this one's status
task.ValidateCh <- task.Status
}

func (s *scheduler) prepareAndRunTask(wg *sync.WaitGroup, ctx sdk.Context, task *deliverTxTask) {
eCtx, eSpan := s.traceSpan(ctx, "SchedulerExecute", task)
defer eSpan.End()
task.Ctx = eCtx

task.Ctx = eCtx
s.executeTask(task)

s.DoValidate(func() {
s.waitOnPreviousAndValidate(wg, task)
})
wg.Done()
}

func (s *scheduler) traceSpan(ctx sdk.Context, name string, task *deliverTxTask) (sdk.Context, trace.Span) {
Expand Down Expand Up @@ -509,12 +497,12 @@ func (s *scheduler) executeTask(task *deliverTxTask) {

// If abort has occurred, return, else set the response and status
if abortOccurred {
task.Status = statusAborted
task.SetStatus(statusAborted)
task.Abort = abort
return
}

task.Status = statusExecuted
task.SetStatus(statusExecuted)
task.Response = &resp

// write from version store to multiversion stores
Expand Down
29 changes: 28 additions & 1 deletion tasks/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestProcessAll(t *testing.T) {
name: "Test tx writing to a store that another tx is iterating",
workers: 50,
runs: 1,
requests: requestList(500),
requests: requestList(100),
addStores: true,
before: func(ctx sdk.Context) {
kv := ctx.MultiStore().GetKVStore(testStoreKey)
Expand Down Expand Up @@ -208,6 +208,33 @@ func TestProcessAll(t *testing.T) {
},
expectedErr: nil,
},
{
name: "Test some tx accesses same key",
workers: 50,
runs: 1,
addStores: true,
requests: requestList(2000),
deliverTxFunc: func(ctx sdk.Context, req types.RequestDeliverTx) types.ResponseDeliverTx {
if ctx.TxIndex()%10 != 0 {
return types.ResponseDeliverTx{
Info: "none",
}
}
// all txs read and write to the same key to maximize conflicts
kv := ctx.MultiStore().GetKVStore(testStoreKey)
val := string(kv.Get(itemKey))

// write to the store with this tx's index
kv.Set(itemKey, req.Tx)

// return what was read from the store (final attempt should be index-1)
return types.ResponseDeliverTx{
Info: val,
}
},
assertions: func(t *testing.T, ctx sdk.Context, res []types.ResponseDeliverTx) {},
expectedErr: nil,
},
{
name: "Test no stores on context should not panic",
workers: 50,
Expand Down

0 comments on commit d0d7902

Please sign in to comment.