Skip to content

Commit

Permalink
refactor(services/systat): 添加缓存数据的功能
Browse files Browse the repository at this point in the history
  • Loading branch information
caixw committed Dec 20, 2024
1 parent f08b463 commit beb06bc
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
14 changes: 13 additions & 1 deletion services/systat/systat.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package systat

import (
"container/ring"
"context"
"time"

Expand All @@ -16,6 +17,7 @@ import (
// 系统状态监视服务
type service struct {
events *events.Event[*Stats]
ring *ring.Ring
}

// Init 初始化监视系统状态的服务
Expand All @@ -24,9 +26,11 @@ type service struct {
//
// dur 为监视数据的频率;
// interval 为每次监视数据的时间;
func Init(s web.Server, dur, interval time.Duration) events.Subscriber[*Stats] {
// size 缓存监控数据的数量;
func Init(s web.Server, dur, interval time.Duration, size int) events.Subscriber[*Stats] {
srv := &service{
events: events.New[*Stats](),
ring: ring.New(size),
}

job := func(now time.Time) error {
Expand All @@ -36,6 +40,8 @@ func Init(s web.Server, dur, interval time.Duration) events.Subscriber[*Stats] {

stat, err := calcState(interval, now)
if err == nil {
srv.ring.Value = stat
srv.ring = srv.ring.Next()
srv.events.Publish(true, stat)
}
return err
Expand All @@ -48,5 +54,11 @@ func Init(s web.Server, dur, interval time.Duration) events.Subscriber[*Stats] {

// Subscribe 订阅状态变化的通知
func (s *service) Subscribe(f events.SubscribeFunc[*Stats]) context.CancelFunc {
s.ring.Next().Do(func(v any) { // 一次性发送所有缓存的数据
if v != nil {
f(v.(*Stats))
}
})

return s.events.Subscribe(f)
}
13 changes: 9 additions & 4 deletions services/systat/systat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,24 @@ func TestService(t *testing.T) {
defer servertest.Run(a, s)()
defer s.Close(0)

sub := Init(s, time.Second, time.Second)
sub := Init(s, time.Second, time.Second, 10)
o1 := &bytes.Buffer{}
o2 := &bytes.Buffer{}

sub.Subscribe(func(data *Stats) {
o1.Write([]byte(data.Created.String()))
})
time.Sleep(2 * time.Second)
a.NotZero(o1.Len()).
Zero(o2.Len())

// 后订阅,但是内容应该和之前 o1 是一样的

sub.Subscribe(func(data *Stats) {
o2.Write([]byte(data.Created.String()))
})

time.Sleep(2 * time.Second)

a.NotZero(o1.Len()).
NotZero(o2.Len())
NotZero(o2.Len()).
Equal(o1.String(), o2.String())
}

0 comments on commit beb06bc

Please sign in to comment.