Skip to content
This repository has been archived by the owner on Oct 20, 2024. It is now read-only.

Commit

Permalink
feature/queue-with-rpc - pre final draft #6
Browse files Browse the repository at this point in the history
  • Loading branch information
MindHunter86 committed Dec 11, 2021
1 parent 8b127b1 commit 61cb18d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 20 deletions.
39 changes: 28 additions & 11 deletions cloner/cloner.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ import (
type Cloner struct {
srcNexus, dstNexus *nexus
mainDispatcher *dispatcher
uplDispatcher *dispatcher
}

var (
gLog *zerolog.Logger
gCli *cli.Context
gApi *nexusApi
gRpc *rpcClient
gQueue *dispatcher
gIsDebug bool
gLog *zerolog.Logger
gCli *cli.Context
gApi *nexusApi
gRpc *rpcClient
gQueue *dispatcher
gUplQueue *dispatcher
gIsDebug bool
)

var (
Expand Down Expand Up @@ -63,7 +65,7 @@ func (m *Cloner) Bootstrap(ctx *cli.Context) error {
signal.Notify(kernSignal, syscall.SIGINT, syscall.SIGTERM, syscall.SIGKILL, syscall.SIGQUIT)

var wg sync.WaitGroup
var ep = make(chan error, 1)
var ep = make(chan error, 10) // !!

m.mainDispatcher = newDispatcher(gCli.Int("queue-buffer"), gCli.Int("queue-workers-capacity"), gCli.Int("queue-workers"))
gQueue = m.mainDispatcher
Expand All @@ -73,7 +75,7 @@ func (m *Cloner) Bootstrap(ctx *cli.Context) error {
defer wg.Done()

gLog.Info().Msg("Main queue spawning ...")
ep <- m.mainDispatcher.bootstrap()
ep <- m.mainDispatcher.bootstrap(false)
}()

// FEATURE RPC
Expand All @@ -92,13 +94,17 @@ func (m *Cloner) Bootstrap(ctx *cli.Context) error {
}
}()

var fuckyou bool
LOOP:
for {
select {
case <-kernSignal:
gLog.Info().Msg("Syscall.SIG* has been detected! Closing application...")
break LOOP
case e = <-ep:
if fuckyou {
continue
}
if e != nil {
gLog.Error().Err(e).Msg("Fatal Runtime Error!!! Abnormal application closing ...")
break LOOP
Expand All @@ -110,11 +116,13 @@ LOOP:
break LOOP
}

break LOOP
gLog.Info().Msg("GOOD, CLOSE APPLICATION")
fuckyou = true
}
}

m.mainDispatcher.destroy()
m.uplDispatcher.destroy()
wg.Wait()
fmt.Println("OKOK")

Expand All @@ -137,12 +145,21 @@ func (m *Cloner) syncRPC(ep chan error) (e error) {
m.mainDispatcher = newDispatcher(gCli.Int("queue-buffer"), gCli.Int("queue-workers-capacity"), gCli.Int("queue-workers"))
gQueue = m.mainDispatcher

wg.Add(1)
m.uplDispatcher = newDispatcher(gCli.Int("queue-buffer"), gCli.Int("queue-workers-capacity"), gCli.Int("queue-workers"))
gUplQueue = m.uplDispatcher

wg.Add(1) // !!
go func() {
defer wg.Done()

gLog.Info().Msg("Main queue spawning ...")
ep <- m.mainDispatcher.bootstrap()
ep <- m.mainDispatcher.bootstrap(false)
}()
go func() {
// defer wg.Done()

gLog.Info().Msg("Upload queue spawning ...")
ep <- m.uplDispatcher.bootstrap(true)
}()

if e = m.srcNexus.createTemporaryDirectory(); e != nil {
Expand Down
24 changes: 16 additions & 8 deletions cloner/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func newDispatcher(queueBuffer, workerCapacity, workers int) *dispatcher {
}
}

func (m *dispatcher) bootstrap() (e error) {
func (m *dispatcher) bootstrap(noTimer bool) (e error) {
gLog.Debug().Msg("dispatcher initialization...")

var wg sync.WaitGroup
Expand All @@ -109,22 +109,27 @@ func (m *dispatcher) bootstrap() (e error) {
}

gLog.Debug().Msg("workers spawned successfully")
m.dispatch()
m.dispatch(noTimer)

fmt.Println("WAIT")
wg.Wait()
fmt.Println("OK")
return
}

func (m *dispatcher) dispatch() {
func (m *dispatcher) dispatch(noTimer bool) {
gLog.Debug().Msg("dispatcher start dispatching...")
gLog.Debug().Msg("dispatcher - entering in event loop")

var mu sync.RWMutex
var waitingWorkers int

// !!
var timer = time.NewTimer(5 * time.Second)
timer.Stop()
if !noTimer {
timer = time.NewTimer(5 * time.Second)
}

for {
select {
Expand Down Expand Up @@ -157,7 +162,7 @@ func (m *dispatcher) dispatch() {
// }
// }(Reset changes the timer to expire after duration d. It returns true if the timer had been active, false if the timer had expired or been stopped.)
case status := <-m.statusPipe:
if timer.Stop() {
if timer.Stop() && !noTimer {
gLog.Info().Msg("Timer stopped")
}

Expand All @@ -172,10 +177,13 @@ func (m *dispatcher) dispatch() {
mu.Unlock()
}

gLog.Debug().Msgf("STATUS active - %d;", waitingWorkers)
gLog.Debug().Bool("notimer", noTimer).Msgf("STATUS active - %d;", waitingWorkers)
if waitingWorkers == m.workers {
gLog.Info().Msg("Reset timer")
timer.Reset(5 * time.Second)

if !noTimer {
timer.Reset(5 * time.Second)
}
}
case <-timer.C:
gLog.Info().Msg("There is no jobs, closing dispatcher")
Expand Down Expand Up @@ -285,7 +293,7 @@ func (m *worker) doJob(j *job) {
return
}

gQueue.newJob(&job{
gUplQueue.newJob(&job{
action: jobActUploadAsset,
payload: []interface{}{nxsTo, asset},
})
Expand All @@ -308,7 +316,7 @@ func (m *worker) doJob(j *job) {
nxs.incUploadedAssets()
gLog.Info().Msgf("Asset %s has been upload successfully.", asset.getHumanReadbleName())

gQueue.newJob(&job{
gUplQueue.newJob(&job{
action: jobActDeleteAsset,
payload: []interface{}{nxs, asset},
})
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func main() {
},
cli.IntFlag{
Name: "queue-workers",
Value: 24,
Value: 25,
},
cli.IntFlag{
Name: "queue-workers-capacity",
Expand Down

0 comments on commit 61cb18d

Please sign in to comment.