Skip to content

Commit 10a60c7

Browse files
committed
fix(leaks): fix leaks
1 parent 9455d9e commit 10a60c7

File tree

9 files changed

+82
-29
lines changed

9 files changed

+82
-29
lines changed

application/abstract.go

-10
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,7 @@ import (
55
"fmt"
66
"io/ioutil"
77
"net/http"
8-
"os"
9-
"os/signal"
108
"sync"
11-
"syscall"
129
"time"
1310

1411
"github.com/Halfi/postmanq/common"
@@ -310,13 +307,6 @@ func (i InitFireAction) Fire(app common.Application, event *common.ApplicationEv
310307
}
311308

312309
func (i InitFireAction) PreFire(app common.Application, event *common.ApplicationEvent) {
313-
sig := make(chan os.Signal, 1)
314-
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
315-
go func() {
316-
<-sig
317-
app.SendEvents(common.NewApplicationEvent(common.FinishApplicationEventKind))
318-
}()
319-
320310
bytes, warn, err := app.GetConfigData()
321311
if warn != nil {
322312
logger.All().WarnWithErr(err, "application configuration read warning")

cmd/postmanq/main.go

+10
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"flag"
55
"fmt"
66
"os"
7+
"os/signal"
8+
"syscall"
79

810
"github.com/Halfi/postmanq/application"
911
"github.com/Halfi/postmanq/common"
@@ -28,6 +30,14 @@ func main() {
2830
}
2931

3032
app := application.NewPost()
33+
34+
sig := make(chan os.Signal, 1)
35+
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
36+
go func() {
37+
<-sig
38+
app.SendEvents(common.NewApplicationEvent(common.FinishApplicationEventKind))
39+
}()
40+
3141
if app.IsValidConfigFilename(file) {
3242
app.SetConfigMeta(file, configURL, configUpdateDuration)
3343
app.Run()

cmd/tools/pmq-grep/main.go

+11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package main
33
import (
44
"flag"
55
"fmt"
6+
"os"
7+
"os/signal"
8+
"syscall"
69

710
"github.com/Halfi/postmanq/application"
811
"github.com/Halfi/postmanq/common"
@@ -18,6 +21,14 @@ func main() {
1821
flag.Parse()
1922

2023
app := application.NewGrep()
24+
25+
sig := make(chan os.Signal, 1)
26+
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
27+
go func() {
28+
<-sig
29+
app.SendEvents(common.NewApplicationEvent(common.FinishApplicationEventKind))
30+
}()
31+
2132
if app.IsValidConfigFilename(file) && recipient != common.InvalidInputString {
2233
app.SetConfigMeta(file, configURL, "")
2334
app.RunWithArgs(envelope, recipient, numberLines)

cmd/tools/pmq-publish/main.go

+11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package main
33
import (
44
"flag"
55
"fmt"
6+
"os"
7+
"os/signal"
8+
"syscall"
69

710
"github.com/Halfi/postmanq/application"
811
"github.com/Halfi/postmanq/common"
@@ -22,6 +25,14 @@ func main() {
2225
flag.Parse()
2326

2427
app := application.NewPublish()
28+
29+
sig := make(chan os.Signal, 1)
30+
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
31+
go func() {
32+
<-sig
33+
app.SendEvents(common.NewApplicationEvent(common.FinishApplicationEventKind))
34+
}()
35+
2536
if app.IsValidConfigFilename(file) &&
2637
srcQueue != common.InvalidInputString &&
2738
destQueue != common.InvalidInputString {

cmd/tools/pmq-report/main.go

+11
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ package main
33
import (
44
"flag"
55
"fmt"
6+
"os"
7+
"os/signal"
8+
"syscall"
69

710
"github.com/Halfi/postmanq/application"
811
"github.com/Halfi/postmanq/common"
@@ -15,6 +18,14 @@ func main() {
1518
flag.Parse()
1619

1720
app := application.NewReport()
21+
22+
sig := make(chan os.Signal, 1)
23+
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
24+
go func() {
25+
<-sig
26+
app.SendEvents(common.NewApplicationEvent(common.FinishApplicationEventKind))
27+
}()
28+
1829
if app.IsValidConfigFilename(file) {
1930
app.SetConfigMeta(file, configURL, "")
2031
app.Run()

connector/connector.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,10 @@ receiveConnect:
8080
targetClient.Wakeup()
8181
event.Client = targetClient
8282
// передаем событие отправителю
83-
event.Iterator.Next().(common.SendingService).Event(event.SendEvent)
83+
next := event.Iterator.Next()
84+
if next != nil {
85+
next.(common.SendingService).Event(event.SendEvent)
86+
}
8487
}
8588
return
8689

heap.out

5.67 KB
Binary file not shown.

limiter/cleaner.go

+28-13
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,49 @@
11
package limiter
22

33
import (
4+
"context"
45
"sync/atomic"
56
"time"
67
)
78

89
// Cleaner чистильщик, проверяет значения ограничений и обнуляет значения ограничений
910
type Cleaner struct {
1011
service *Service
12+
ticker *time.Ticker
13+
ctx context.Context
14+
done context.CancelFunc
1115
}
1216

1317
// создает нового чистильщика
14-
func newCleaner(service *Service) {
15-
(&Cleaner{service: service}).clean()
18+
func newCleaner(service *Service) *Cleaner {
19+
ctx, done := context.WithCancel(context.Background())
20+
return &Cleaner{service: service, ticker: time.NewTicker(time.Second), ctx: ctx, done: done}
1621
}
1722

1823
// проверяет значения ограничений и обнуляет значения ограничений
19-
func (c *Cleaner) clean() {
20-
for now := range c.service.ticker.C {
21-
// смотрим все ограничения
22-
for _, conf := range c.service.Configs {
23-
for _, limit := range conf.Limits {
24-
// проверяем дату последнего изменения ограничения
25-
if !limit.isValidDuration(now) {
26-
// если дата последнего изменения выходит за промежуток времени для проверки
27-
// обнулям текущее количество отправленных писем
28-
atomic.StoreInt32(&limit.currentValue, 0)
29-
limit.modifyDate = time.Now()
24+
func (c *Cleaner) run() {
25+
for {
26+
select {
27+
case <-c.ctx.Done():
28+
return
29+
case now := <-c.ticker.C:
30+
// смотрим все ограничения
31+
for _, conf := range c.service.Configs {
32+
for _, limit := range conf.Limits {
33+
// проверяем дату последнего изменения ограничения
34+
if !limit.isValidDuration(now) {
35+
// если дата последнего изменения выходит за промежуток времени для проверки
36+
// обнулям текущее количество отправленных писем
37+
atomic.StoreInt32(&limit.currentValue, 0)
38+
limit.modifyDate = time.Now()
39+
}
3040
}
3141
}
3242
}
3343
}
3444
}
45+
46+
func (c *Cleaner) stop() {
47+
c.ticker.Stop()
48+
c.done()
49+
}

limiter/service.go

+7-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package limiter
22

33
import (
4-
"time"
5-
64
"gopkg.in/yaml.v3"
75

86
"github.com/Halfi/postmanq/common"
@@ -16,14 +14,14 @@ type Service struct {
1614

1715
Configs map[string]*Config `yaml:"postmans"`
1816

19-
ticker *time.Ticker
17+
cleaner *Cleaner
2018
events chan *common.SendEvent
2119
eventsClosed bool
2220
}
2321

2422
// Inst создает сервис ограничений
2523
func Inst() common.SendingService {
26-
return &Service{ticker: time.NewTicker(time.Second)}
24+
return &Service{}
2725
}
2826

2927
// OnInit инициализирует сервис
@@ -43,6 +41,8 @@ func (s *Service) OnInit(event *common.ApplicationEvent) {
4341
s.events = make(chan *common.SendEvent)
4442
s.eventsClosed = false
4543

44+
s.cleaner = newCleaner(s)
45+
4646
for name, config := range s.Configs {
4747
s.init(config, name)
4848
}
@@ -67,7 +67,7 @@ func (s *Service) init(conf *Config, hostname string) {
6767
// OnRun запускает проверку ограничений и очистку значений лимитов
6868
func (s *Service) OnRun() {
6969
// сразу запускаем проверку значений ограничений
70-
go newCleaner(s)
70+
go s.cleaner.run()
7171
for i := 0; i < s.LimitersCount; i++ {
7272
go newLimiter(i+1, s)
7373
}
@@ -88,6 +88,8 @@ func (s *Service) OnFinish() {
8888
if !s.eventsClosed {
8989
s.eventsClosed = true
9090
close(s.events)
91+
s.cleaner.stop()
92+
s.cleaner = nil
9193
}
9294
}
9395

0 commit comments

Comments
 (0)