Skip to content

Commit

Permalink
Merge pull request #9 from hpidcock/time-dilation
Browse files Browse the repository at this point in the history
Added Advance to dilated wall clock.
  • Loading branch information
hpidcock authored Aug 8, 2022
2 parents a2b96c8 + 3137a9d commit 68a2376
Show file tree
Hide file tree
Showing 9 changed files with 342 additions and 45 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
.vscode/
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/juju/clock

go 1.17
go 1.18

require (
github.com/juju/errors v0.0.0-20220203013757-bd733f3c86b9
Expand Down
3 changes: 0 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,15 @@ github.com/juju/retry v0.0.0-20180821225755-9058e192b216/go.mod h1:OohPQGsr4pnxw
github.com/juju/testing v0.0.0-20180402130637-44801989f0f7/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA=
github.com/juju/testing v0.0.0-20190723135506-ce30eb24acd2/go.mod h1:63prj8cnj0tU0S9OHjGJn+b1h0ZghCndfnbQolrYTwA=
github.com/juju/testing v0.0.0-20210302031854-2c7ee8570c07/go.mod h1:7lxZW0B50+xdGFkvhAb8bwAGt6IU87JB1H9w4t8MNVM=
github.com/juju/testing v0.0.0-20220202055744-1ad0816210a6/go.mod h1:QgWc2UdIPJ8t3rnvv95tFNOsQDfpXYEZDbP281o3b2c=
github.com/juju/testing v0.0.0-20220203020004-a0ff61f03494 h1:XEDzpuZb8Ma7vLja3+5hzUqVTvAqm5Y+ygvnDs5iTMM=
github.com/juju/testing v0.0.0-20220203020004-a0ff61f03494/go.mod h1:rUquetT0ALL48LHZhyRGvjjBH8xZaZ8dFClulKK5wK4=
github.com/juju/utils v0.0.0-20180424094159-2000ea4ff043/go.mod h1:6/KLg8Wz/y2KVGWEpkK9vMNGkOnu4k/cqs8Z1fKjTOk=
github.com/juju/utils v0.0.0-20200116185830-d40c2fe10647 h1:wQpkHVbIIpz1PCcLYku9KFWsJ7aEMQXHBBmLy3tRBTk=
github.com/juju/utils v0.0.0-20200116185830-d40c2fe10647/go.mod h1:6/KLg8Wz/y2KVGWEpkK9vMNGkOnu4k/cqs8Z1fKjTOk=
github.com/juju/utils/v2 v2.0.0-20200923005554-4646bfea2ef1/go.mod h1:fdlDtQlzundleLLz/ggoYinEt/LmnrpNKcNTABQATNI=
github.com/juju/utils/v3 v3.0.0-20220130232349-cd7ecef0e94a h1:5ZWDCeCF0RaITrZGemzmDFIhjR/MVSvBUqgSyaeTMbE=
github.com/juju/utils/v3 v3.0.0-20220130232349-cd7ecef0e94a/go.mod h1:LzwbbEN7buYjySp4nqnti6c6olSqRXUk6RkbSUUP1n8=
github.com/juju/version v0.0.0-20161031051906-1f41e27e54f2/go.mod h1:kE8gK5X0CImdr7qpSKl3xB2PmpySSmfj7zVbkZFs81U=
github.com/juju/version v0.0.0-20180108022336-b64dbd566305/go.mod h1:kE8gK5X0CImdr7qpSKl3xB2PmpySSmfj7zVbkZFs81U=
github.com/juju/version v0.0.0-20191219164919-81c1be00b9a6 h1:nrqc9b4YKpKV4lPI3GPPFbo5FUuxkWxgZE2Z8O4lgaw=
github.com/juju/version v0.0.0-20191219164919-81c1be00b9a6/go.mod h1:kE8gK5X0CImdr7qpSKl3xB2PmpySSmfj7zVbkZFs81U=
github.com/juju/version/v2 v2.0.0-20211007103408-2e8da085dc23 h1:wtEPbidt1VyHlb8RSztU6ySQj29FLsOQiI9XiJhXDM4=
github.com/juju/version/v2 v2.0.0-20211007103408-2e8da085dc23/go.mod h1:Ljlbryh9sYaUSGXucslAEDf0A2XUSGvDbHJgW8ps6nc=
Expand Down
2 changes: 2 additions & 0 deletions testclock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Clock struct {
notifyAlarms chan struct{}
}

var _ AdvanceableClock = (*Clock)(nil)

// NewClock returns a new clock set to the supplied time. If your SUT needs to
// call After, AfterFunc, NewTimer or Timer.Reset more than 10000 times: (1)
// you have probably written a bad test; and (2) you'll need to read from the
Expand Down
10 changes: 0 additions & 10 deletions testclock/clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package testclock_test

import (
"sync"
gotesting "testing"
"time"

"github.com/juju/loggo"
Expand All @@ -20,10 +19,6 @@ type clockSuite struct {
testing.LoggingSuite
}

func TestAll(t *gotesting.T) {
gc.TestingT(t)
}

var _ = gc.Suite(&clockSuite{})

func (*clockSuite) TestNow(c *gc.C) {
Expand All @@ -32,11 +27,6 @@ func (*clockSuite) TestNow(c *gc.C) {
c.Assert(cl.Now(), gc.Equals, t0)
}

var (
shortWait = 50 * time.Millisecond
longWait = time.Second
)

func (*clockSuite) TestAdvanceLogs(c *gc.C) {
loggo.GetLogger("juju.clock").SetLogLevel(loggo.DEBUG)
t0 := time.Now()
Expand Down
201 changes: 176 additions & 25 deletions testclock/dilated.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,67 +4,218 @@
package testclock

import (
"sync"
"sync/atomic"
"time"

"github.com/juju/clock"
)

// NewDilatedWallClock returns a clock that can be sped up or slowed down.
// realSecondDuration is the real duration of a second.
func NewDilatedWallClock(realSecondDuration time.Duration) clock.Clock {
return &dilationClock{
func NewDilatedWallClock(realSecondDuration time.Duration) AdvanceableClock {
dc := &dilationClock{
epoch: time.Now(),
realSecondDuration: realSecondDuration,
offsetChanged: make(chan any),
}
dc.offsetChangedCond = sync.NewCond(dc.offsetChangedMutex.RLocker())
return dc
}

type dilationClock struct {
epoch time.Time
realSecondDuration time.Duration

// offsetAtomic is the current dilated offset to allow for time jumps/advances.
offsetAtomic int64
// offsetChanged is a channel that is closed when timers need to be signaled
// that there is a offset change coming.
offsetChanged chan any
// offsetChangedMutex is a mutex protecting the offsetChanged and is used by
// the offsetChangedCond.
offsetChangedMutex sync.RWMutex
// offsetChangedCond is used to signal timers that they may try to pull the new
// offset.
offsetChangedCond *sync.Cond
}

// Now is part of the Clock interface.
func (dc *dilationClock) Now() time.Time {
now := time.Now()
return dc.epoch.Add(time.Duration(float64(now.Sub(dc.epoch)) / dc.realSecondDuration.Seconds()))
dt, _ := dc.nowWithOffset()
return dt
}

func (dc *dilationClock) nowWithOffset() (time.Time, time.Duration) {
offset := time.Duration(atomic.LoadInt64(&dc.offsetAtomic))
realNow := time.Now()
dt := dilateTime(dc.epoch, realNow, dc.realSecondDuration, offset)
return dt, offset
}

// After implements Clock.After.
// After implements Clock.After
func (dc *dilationClock) After(d time.Duration) <-chan time.Time {
return time.After(time.Duration(float64(d) * dc.realSecondDuration.Seconds()))
t := newDilatedWallTimer(dc, d, nil)
return t.c
}

// AfterFunc implements Clock.AfterFunc.
// AfterFunc implements Clock.AfterFunc
func (dc *dilationClock) AfterFunc(d time.Duration, f func()) clock.Timer {
return dilatedWallTimer{
timer: time.AfterFunc(time.Duration(float64(d)*dc.realSecondDuration.Seconds()), f),
realSecondDuration: dc.realSecondDuration,
}
return newDilatedWallTimer(dc, d, f)
}

// NewTimer implements Clock.NewTimer.
// NewTimer implements Clock.NewTimer
func (dc *dilationClock) NewTimer(d time.Duration) clock.Timer {
return dilatedWallTimer{
timer: time.NewTimer(time.Duration(float64(d) * dc.realSecondDuration.Seconds())),
realSecondDuration: dc.realSecondDuration,
}
return newDilatedWallTimer(dc, d, nil)
}

// Advance implements AdvanceableClock.Advance
func (dc *dilationClock) Advance(d time.Duration) {
close(dc.offsetChanged)
dc.offsetChangedMutex.Lock()
dc.offsetChanged = make(chan any)
atomic.AddInt64(&dc.offsetAtomic, int64(d))
dc.offsetChangedCond.Broadcast()
dc.offsetChangedMutex.Unlock()
}

// dilatedWallTimer implements the Timer interface.
type dilatedWallTimer struct {
timer *time.Timer
realSecondDuration time.Duration
timer *time.Timer
dc *dilationClock
c chan time.Time
target time.Time
offset time.Duration
after func()
done chan any
resetChan chan resetReq
resetMutex sync.Mutex
stopChan chan chan bool
}

type resetReq struct {
d time.Duration
r chan bool
}

func newDilatedWallTimer(dc *dilationClock, d time.Duration, after func()) *dilatedWallTimer {
t := &dilatedWallTimer{
dc: dc,
c: make(chan time.Time),
resetChan: make(chan resetReq),
stopChan: make(chan chan bool),
}
t.start(d, after)
return t
}

func (t *dilatedWallTimer) start(d time.Duration, after func()) {
t.dc.offsetChangedMutex.RLock()
dialatedNow, offset := t.dc.nowWithOffset()
realDuration := time.Duration(float64(d) * t.dc.realSecondDuration.Seconds())
t.target = dialatedNow.Add(d)
t.timer = time.NewTimer(realDuration)
t.offset = offset
t.after = after
t.done = make(chan any)
go t.run()
}

func (t *dilatedWallTimer) run() {
defer t.dc.offsetChangedMutex.RUnlock()
defer close(t.done)
var sendChan chan time.Time
var sendTime time.Time
for {
select {
case reset := <-t.resetChan:
realNow := time.Now()
dialatedNow := dilateTime(t.dc.epoch, realNow, t.dc.realSecondDuration, t.offset)
realDuration := time.Duration(float64(reset.d) * t.dc.realSecondDuration.Seconds())
t.target = dialatedNow.Add(reset.d)
sendChan = nil
sendTime = time.Time{}
reset.r <- t.timer.Reset(realDuration)
case stop := <-t.stopChan:
stop <- t.timer.Stop()
return
case tt := <-t.timer.C:
if t.after != nil {
t.after()
return
}
if sendChan != nil {
panic("reset should have been called")
}
sendChan = t.c
sendTime = tt
case sendChan <- sendTime:
sendChan = nil
sendTime = time.Time{}
return
case <-t.dc.offsetChanged:
t.dc.offsetChangedCond.Wait()
newOffset := time.Duration(atomic.LoadInt64(&t.dc.offsetAtomic))
if newOffset == t.offset {
continue
}
t.offset = newOffset
stopped := t.timer.Stop()
if !stopped {
panic("stopped timer but still running")
}
realNow := time.Now()
dialatedNow := dilateTime(t.dc.epoch, realNow, t.dc.realSecondDuration, t.offset)
dialatedDuration := t.target.Sub(dialatedNow)
if dialatedDuration <= 0 {
sendChan = t.c
sendTime = dialatedNow
continue
}
realDuration := time.Duration(float64(dialatedDuration) * t.dc.realSecondDuration.Seconds())
t.timer.Reset(realDuration)
}
}
}

// Chan implements Timer.Chan.
func (t dilatedWallTimer) Chan() <-chan time.Time {
return t.timer.C
// Chan implements Timer.Chan
func (t *dilatedWallTimer) Chan() <-chan time.Time {
return t.c
}

func (t dilatedWallTimer) Reset(d time.Duration) bool {
return t.timer.Reset(time.Duration(float64(d) * t.realSecondDuration.Seconds()))
// Chan implements Timer.Reset
func (t *dilatedWallTimer) Reset(d time.Duration) bool {
t.resetMutex.Lock()
defer t.resetMutex.Unlock()
reset := resetReq{
d: d,
r: make(chan bool),
}
select {
case <-t.done:
t.start(d, nil)
return true
case t.resetChan <- reset:
return <-reset.r
}
}

// Chan implements Timer.Stop
func (t *dilatedWallTimer) Stop() bool {
stop := make(chan bool)
select {
case <-t.done:
return false
case t.stopChan <- stop:
return <-stop
}
}

func (t dilatedWallTimer) Stop() bool {
return t.timer.Stop()
func dilateTime(epoch, realNow time.Time,
realSecondDuration, dilatedOffset time.Duration) time.Time {
delta := realNow.Sub(epoch)
if delta < 0 {
delta = time.Duration(0)
}
return epoch.Add(dilatedOffset).Add(time.Duration(float64(delta) / realSecondDuration.Seconds()))
}
Loading

0 comments on commit 68a2376

Please sign in to comment.