diff --git a/README.md b/README.md
index 1712d8e..b30f6d5 100644
--- a/README.md
+++ b/README.md
@@ -1,22 +1,35 @@
 ![Tunny](tunny_logo.png "Tunny")
 
-Tunny is a Golang library for spawning and managing a goroutine pool.
+[![godoc for Jeffail/tunny][1]][2]
+[![goreportcard for Jeffail/tunny][3]][4]
 
-The API is synchronous and simple to use. Jobs are allocated to a worker when one becomes available.
+Tunny is a Golang library for spawning and managing a goroutine pool, allowing
+you to limit work coming from any number of goroutines with a synchronous API.
 
-https://godoc.org/github.com/Jeffail/tunny
+A fixed goroutine pool is helpful when you have work coming from an arbitrary
+number of asynchronous sources, but a limited capacity for parallel processing.
+For example, when processing jobs from HTTP requests that are CPU heavy you can
+create a pool with a size that matches your CPU count.
 
-## How to install:
+## Install
 
-```bash
-go get github.com/jeffail/tunny
+``` sh
+go get github.com/Jeffail/tunny
 ```
 
-## How to use:
+Or, using dep:
 
-The most obvious use for a goroutine pool would be limiting heavy jobs to the number of CPUs available. In the example below we limit the work from arbitrary numbers of HTTP request goroutines through our pool.
+``` sh
+dep ensure -add github.com/Jeffail/tunny
+```
+
+## Use
 
-```go
+In most cases your heavy work can be expressed as a simple `func()`, in which
+case you can use `NewFunc`. Let's see how this looks for the CPU-heavy HTTP
+request scenario above, with a pool sized to the CPU count:
+
+``` go
 package main
 
 import (
@@ -24,22 +37,19 @@ import (
 	"net/http"
 	"runtime"
 
-	"github.com/jeffail/tunny"
+	"github.com/Jeffail/tunny"
 )
 
 func main() {
 	numCPUs := runtime.NumCPU()
-	runtime.GOMAXPROCS(numCPUs+1) // numCPUs hot threads + one for async tasks.
-
-	pool, _ := tunny.CreatePool(numCPUs, func(object interface{}) interface{} {
-		input, _ := object.([]byte)
-		// Do something that takes a lot of work
-		output := input
+	pool := tunny.NewFunc(numCPUs, func(payload interface{}) interface{} {
+		var result []byte
 
-		return output
-	}).Open()
+		// TODO: Something CPU heavy with payload
 
+		return result
+	})
 	defer pool.Close()
 
 	http.HandleFunc("/work", func(w http.ResponseWriter, r *http.Request) {
@@ -47,9 +57,11 @@ func main() {
 		if err != nil {
 			http.Error(w, "Internal error", http.StatusInternalServerError)
+			return
 		}
+		defer r.Body.Close()
 
-		// Send work to our pool
-		result, _ := pool.SendWork(input)
+		// Funnel this work into our pool. This call is synchronous and will
+		// block until the job is completed.
+		result := pool.Process(input)
 
 		w.Write(result.([]byte))
 	})
@@ -58,172 +70,56 @@ func main() {
 }
 ```
 
-Tunny supports timeouts. You can replace the `SendWork` call above to the following:
-
-```go
-	// Or, alternatively, send it with a timeout (in this case 5 seconds).
-	result, err := pool.SendWorkTimed(5000, input)
-	if err != nil {
-		http.Error(w, "Request timed out", http.StatusRequestTimeout)
-	}
-```
-
-## Can I send a closure instead of data?
-
-Yes, the arguments passed to the worker are boxed as interface{}, so this can actually be a func, you can implement this yourself, or if you're not bothered about return values you can use:
-
-```go
-exampleChannel := make(chan int)
-
-pool, _ := tunny.CreatePoolGeneric(numCPUs).Open()
-
-err := pool.SendWork(func() {
	/* Do your hard work here, usual rules of closures apply here,
	 * so you can return values like so:
	 */
-	exampleChannel <- 10
-})
+Tunny also supports timeouts. 
You can replace the `Process` call above with the
+following:
 
-if err != nil {
-	// You done goofed
+``` go
+result, err := pool.ProcessTimed(input, time.Second*5)
+if err == tunny.ErrJobTimedOut {
+	http.Error(w, "Request timed out", http.StatusRequestTimeout)
 }
 ```
 
-## How do I give my workers state?
-
-Tunny workers implement the `TunnyWorkers` interface, simply implement this interface to have your own objects (and state) act as your workers.
+## Changing Pool Size
 
-```go
-/*
-TunnyWorker - The basic interface of a tunny worker.
-*/
-type TunnyWorker interface {
+The size of a Tunny pool can be changed at any time with `SetSize(int)`:
 
-	// Called for each job, expects the result to be returned synchronously
-	TunnyJob(interface{}) interface{}
-
-	// Called after each job, this indicates whether the worker is ready for the next job.
-	// The default implementation is to return true always. If false is returned then the
-	// method is called every five milliseconds until either true is returned or the pool
-	// is closed.
-	TunnyReady() bool
-}
+``` go
+pool.SetSize(10)  // 10 goroutines
+pool.SetSize(100) // 100 goroutines
 ```
 
-Here is a short example:
-
-```go
-type customWorker struct {
-	// TODO: Put some state here
-}
-
-// Use this call to block further jobs if necessary
-func (worker *customWorker) TunnyReady() bool {
-	return true
-}
-
-// This is where the work actually happens
-func (worker *customWorker) TunnyJob(data interface{}) interface{} {
	/* TODO: Use and modify state
	 * there's no need for thread safety paradigms here unless the
	 * data is being accessed from another goroutine outside of
	 * the pool.
	 */
-	if outputStr, ok := data.(string); ok {
-		return ("custom job done: " + outputStr)
-	}
-	return nil
-}
-
-func TestCustomWorkers (t *testing.T) {
-	outChan := make(chan int, 10)
+This is safe to perform from any goroutine even if others are still processing.
 
-	wg := new(sync.WaitGroup)
-	wg.Add(10)
+## Goroutines With State
 
-	workers := make([]tunny.TunnyWorker, 4)
-	for i, _ := range workers {
-		workers[i] = &(customWorker{})
-	}
-
-	pool, _ := tunny.CreateCustomPool(workers).Open()
-
-	defer pool.Close()
+Sometimes each goroutine within a Tunny pool will require its own managed state.
+In this case you should implement [`tunny.Worker`][tunny-worker], which includes
+calls for terminating, interrupting (in case a job times out and is no longer
+needed) and blocking the next job allocation until a condition is met.
 
-	for i := 0; i < 10; i++ {
-		go func() {
-			value, _ := pool.SendWork("hello world")
-			fmt.Println(value.(string))
+When creating a pool using `Worker` types you will need to provide a constructor
+function for spawning your custom implementation:
 
-			wg.Done()
-		}()
-	}
-
-	wg.Wait()
-}
-```
-
-The TunnyReady method allows you to use your state to determine whether or not a worker should take on another job. For example, your worker could hold a counter of how many jobs it has done, and perhaps after a certain amount it should perform another act before taking on more work, it's important to use TunnyReady for these occasions since blocking the TunnyJob call will hold up the waiting client.
-
-It is recommended that you do not block TunnyReady() whilst you wait for some condition to change, since this can prevent the pool from closing the worker goroutines. Currently, TunnyReady is called at 5 millisecond intervals until you answer true or the pool is closed.
-
-## I need more control
-
-You crazy fool, let's take this up to the next level. You can optionally implement `TunnyExtendedWorker` for more control.
-
-```go
-/*
-TunnyExtendedWorker - An optional interface that can be implemented if the worker needs
-more control over its state.
-*/
-type TunnyExtendedWorker interface {
-
-	// Called when the pool is opened, this will be called before any jobs are sent.
-	TunnyInitialize()
-
-	// Called when the pool is closed, this will be called after all jobs are completed.
-	TunnyTerminate()
-}
-```
-
-## Can a worker detect when a timeout occurs?
-
-Yes, you can also implement the `TunnyInterruptable` interface.
-
-```go
-/*
-TunnyInterruptable - An optional interface that can be implemented in order to allow the
-worker to drop jobs when they are abandoned.
-*/
-type TunnyInterruptable interface {
-
-	// Called when the current job has been abandoned by the client.
-	TunnyInterrupt()
-}
+``` go
+pool := tunny.New(poolSize, func() tunny.Worker {
+	// TODO: Any per-goroutine state allocation here.
+	return newCustomWorker()
+})
 ```
 
-This method will be called in the event that a timeout occurs whilst waiting for the result. `TunnyInterrupt` is called from a newly spawned goroutine, so you'll need to create your own mechanism for stopping your worker mid-way through a job.
-
-## Can SendWork be called asynchronously?
-
-There are the helper functions SendWorkAsync and SendWorkTimedAsync, that are the same as their respective sync calls with an optional second argument func(interface{}, error), this is the call made when a result is returned and can be nil if there is no need for the closure.
-
-However, if you find yourself in a situation where the sync return is not necessary then chances are you don't actually need Tunny at all. Golang is all about making concurrent programming simple by nature, and using Tunny for implementing simple async worker calls defeats the great work of the language spec and adds overhead that isn't necessary.
-
-## Behaviours and caveats:
-
-### - Workers request jobs on an ad-hoc basis
-
-When there is a backlog of jobs waiting to be serviced, and all workers are occupied, a job will not be assigned to a worker until it is already prepared for its next job. This means workers do not develop their own individual queues. Instead, the backlog is shared by the entire pool.
-
-This means an individual worker is able to halt, or spend exceptional lengths of time on a single request without hindering the flow of any other requests, provided there are other active workers in the pool.
-
-### - A job can be dropped before work is begun
-
-Tunny has support for specified timeouts at the work request level, if this timeout is triggered whilst waiting for a worker to become available then the request is dropped entirely and no effort is wasted on the abandoned request.
+This allows Tunny to create and destroy `Worker` types cleanly when the pool
+size is changed. A bare-bones example worker is sketched below.
 
-### - Backlogged jobs are FIFO, for now
+## Ordering
 
-When a job arrives and all workers are occupied the waiting thread will lock at a select block whilst waiting to be assigned a worker. In practice this seems to create a FIFO queue, implying that this is how the implementation of Golang has dealt with select blocks, channels and multiple reading goroutines.
+Backlogged jobs are not guaranteed to be processed in order. Due to the current
+implementation of channels and select blocks, a backlog of jobs will in practice
+be processed as a FIFO queue. However, this behaviour is not part of the spec
+and should not be relied upon.
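+
+## Worker Example
+
+For reference, here is a bare-bones sketch of what the `newCustomWorker`
+constructor used above might return. The `customWorker` type is illustrative
+only and is not part of the library:
+
+``` go
+type customWorker struct {
+	// TODO: Any state owned by this goroutine, e.g. a database connection.
+}
+
+// Process is called by the pool for each job allocated to this worker.
+func (w *customWorker) Process(payload interface{}) interface{} {
+	// TODO: Something CPU heavy with payload and our state.
+	return payload
+}
+
+// The remaining Worker methods can be no-ops when they aren't needed.
+func (w *customWorker) BlockUntilReady() {}
+func (w *customWorker) Interrupt()       {}
+func (w *customWorker) Terminate()       {}
+
+func newCustomWorker() tunny.Worker {
+	return &customWorker{}
+}
+```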
-However, I haven't found a guarantee of this behaviour in the Golang documentation, so I cannot guarantee that this will always be the case. +[1]: https://godoc.org/github.com/Jeffail/tunny?status.svg +[2]: http://godoc.org/github.com/Jeffail/tunny +[3]: https://goreportcard.com/badge/github.com/Jeffail/tunny +[4]: https://goreportcard.com/report/Jeffail/tunny +[tunny-worker]: https://godoc.org/github.com/Jeffail/tunny#Worker diff --git a/tunny.go b/tunny.go index 96b1032..c83ccfa 100644 --- a/tunny.go +++ b/tunny.go @@ -1,379 +1,253 @@ -/* -Copyright (c) 2014 Ashley Jeffs - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. -*/ - -// Package tunny implements a simple pool for maintaining independant worker goroutines. +// Copyright (c) 2014 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package tunny import ( "errors" - "expvar" - "reflect" - "strconv" "sync" - "sync/atomic" "time" ) +//------------------------------------------------------------------------------ + // Errors that are used throughout the Tunny API. 
var (
-	ErrPoolAlreadyRunning = errors.New("the pool is already running")
-	ErrPoolNotRunning     = errors.New("the pool is not running")
-	ErrJobNotFunc         = errors.New("generic worker not given a func()")
-	ErrWorkerClosed       = errors.New("worker was closed")
-	ErrJobTimedOut        = errors.New("job request timed out")
+	ErrPoolNotRunning = errors.New("the pool is not running")
+	ErrJobNotFunc     = errors.New("generic worker not given a func()")
+	ErrWorkerClosed   = errors.New("worker was closed")
+	ErrJobTimedOut    = errors.New("job request timed out")
 )
 
-/*
-TunnyWorker - The basic interface of a tunny worker.
-*/
-type TunnyWorker interface {
-
-	// Called for each job, expects the result to be returned synchronously
-	TunnyJob(interface{}) interface{}
+// Worker is an interface representing a Tunny working agent. It will be used to
+// block a calling goroutine until ready to process a job, process that job
+// synchronously, interrupt its own process call when jobs are abandoned, and
+// clean up its resources when being removed from the pool.
+//
+// Each of these duties is implemented as a single method and can be avoided
+// when not needed by simply implementing an empty func.
+type Worker interface {
+	// Process will synchronously perform a job and return the result.
+	Process(interface{}) interface{}
 
-	// Called after each job, this indicates whether the worker is ready for the next job.
-	// The default implementation is to return true always. If false is returned then the
-	// method is called every five milliseconds until either true is returned or the pool
-	// is closed. For efficiency you should have this call block until your worker is ready,
-	// otherwise you introduce a 5ms latency between jobs.
-	TunnyReady() bool
-}
-
-/*
-TunnyExtendedWorker - An optional interface that can be implemented if the worker needs
-more control over its state.
-*/
-type TunnyExtendedWorker interface {
+	// BlockUntilReady is called before each job is processed and must block the
+	// calling goroutine until the Worker is ready to process the next job.
+	BlockUntilReady()
 
-	// Called when the pool is opened, this will be called before any jobs are sent.
-	TunnyInitialize()
+	// Interrupt is called when a job is cancelled. The worker is responsible
+	// for unblocking the Process implementation.
+	Interrupt()
 
-	// Called when the pool is closed, this will be called after all jobs are completed.
-	TunnyTerminate()
+	// Terminate is called when a Worker is removed from the processing pool
+	// and is responsible for cleaning up any held resources.
+	Terminate()
 }
 
-/*
-TunnyInterruptable - An optional interface that can be implemented in order to allow the
-worker to drop jobs when they are abandoned.
-*/
-type TunnyInterruptable interface {
-
-	// Called when the current job has been abandoned by the client.
-	TunnyInterrupt()
-}
+//------------------------------------------------------------------------------
 
-/*
-Default and very basic implementation of a tunny worker. This worker holds a closure which
-is assigned at construction, and this closure is called on each job.
-*/
-type tunnyDefaultWorker struct {
-	job *func(interface{}) interface{}
+// closureWorker is a minimal Worker implementation that simply wraps a
+// func(interface{}) interface{}.
+type closureWorker struct {
+	processor func(interface{}) interface{}
 }
 
-func (worker *tunnyDefaultWorker) TunnyJob(data interface{}) interface{} {
-	return (*worker.job)(data)
+func (w *closureWorker) Process(payload interface{}) interface{} {
+	return w.processor(payload)
 }
 
-func (worker *tunnyDefaultWorker) TunnyReady() bool {
-	return true
-}
+func (w *closureWorker) BlockUntilReady() {}
+func (w *closureWorker) Interrupt()       {}
+func (w *closureWorker) Terminate()       {}
 
-/*
-WorkPool contains the structures and methods required to communicate with your pool, it must
-be opened before sending work and closed when all jobs are completed.
-
-You may open and close a pool as many times as you wish, calling close is a blocking call that
-guarantees all goroutines are stopped.
-*/
-type WorkPool struct {
-	workers          []*workerWrapper
-	selects          []reflect.SelectCase
-	statusMutex      sync.RWMutex
-	running          uint32
-	pendingAsyncJobs int32
-}
+//------------------------------------------------------------------------------
 
-func (pool *WorkPool) isRunning() bool {
-	return (atomic.LoadUint32(&pool.running) == 1)
-}
+// callbackWorker is a minimal Worker implementation that attempts to cast
+// each job into func() and either calls it if successful or returns
+// ErrJobNotFunc.
+type callbackWorker struct{}
 
-func (pool *WorkPool) setRunning(running bool) {
-	if running {
-		atomic.SwapUint32(&pool.running, 1)
-	} else {
-		atomic.SwapUint32(&pool.running, 0)
+func (w *callbackWorker) Process(payload interface{}) interface{} {
+	f, ok := payload.(func())
+	if !ok {
+		return ErrJobNotFunc
 	}
+	f()
+	return nil
 }
 
-/*
-Open all channels and launch the background goroutines managed by the pool.
-*/
-func (pool *WorkPool) Open() (*WorkPool, error) {
-	pool.statusMutex.Lock()
-	defer pool.statusMutex.Unlock()
+func (w *callbackWorker) BlockUntilReady() {}
+func (w *callbackWorker) Interrupt()       {}
+func (w *callbackWorker) Terminate()       {}
+
+//------------------------------------------------------------------------------
 
-	if !pool.isRunning() {
+// Pool is a struct that manages a collection of workers, each with their own
+// goroutine. The Pool can initialize, expand, compress and close the workers,
+// as well as process jobs with the workers synchronously.
+type Pool struct {
+	ctor    func() Worker
+	workers []*workerWrapper
+	reqChan chan workRequest
 
-		pool.selects = make([]reflect.SelectCase, len(pool.workers))
+	workerMut sync.Mutex
+}
 
-		for i, workerWrapper := range pool.workers {
-			workerWrapper.Open()
+// New creates a new Pool of workers that starts with n workers. You must
+// provide a constructor function that creates new Worker types; when you
+// change the size of the pool, the constructor will be called to create each
+// new Worker.
+func New(n int, ctor func() Worker) *Pool {
+	p := &Pool{
+		ctor:    ctor,
+		reqChan: make(chan workRequest),
+	}
+	p.SetSize(n)
 
-			pool.selects[i] = reflect.SelectCase{
-				Dir:  reflect.SelectRecv,
-				Chan: reflect.ValueOf(workerWrapper.readyChan),
-			}
-		}
+	return p
+}
 
-		pool.setRunning(true)
-		return pool, nil
+// NewFunc creates a new Pool of workers where each worker will process using
+// the provided func. 
+func NewFunc(n int, f func(interface{}) interface{}) *Pool {
+	return New(n, func() Worker {
+		return &closureWorker{
+			processor: f,
+		}
+	})
+}
 
-	}
-	return nil, ErrPoolAlreadyRunning
+// NewCallback creates a new Pool of workers where each worker casts the job
+// payload into a func() and runs it, or returns ErrJobNotFunc if the cast
+// fails.
+func NewCallback(n int) *Pool {
+	return New(n, func() Worker {
+		return &callbackWorker{}
+	})
 }
 
-/*
-Close all channels and goroutines managed by the pool.
-*/
-func (pool *WorkPool) Close() error {
-	pool.statusMutex.Lock()
-	defer pool.statusMutex.Unlock()
+//------------------------------------------------------------------------------
 
-	if pool.isRunning() {
-		for _, workerWrapper := range pool.workers {
-			workerWrapper.Close()
-		}
-		for _, workerWrapper := range pool.workers {
-			workerWrapper.Join()
-		}
-		pool.setRunning(false)
-		return nil
+// Process will use the Pool to process a payload and synchronously return the
+// result. Process can be called safely by any goroutine, but will panic if the
+// Pool has been stopped.
+func (p *Pool) Process(payload interface{}) interface{} {
+	request, open := <-p.reqChan
+	if !open {
+		panic(ErrPoolNotRunning)
 	}
-	return ErrPoolNotRunning
-}
 
-/*
-CreatePool - Creates a pool of workers, and takes a closure argument which is the action
-to perform for each job.
-*/
-func CreatePool(numWorkers int, job func(interface{}) interface{}) *WorkPool {
-	pool := WorkPool{running: 0}
-
-	pool.workers = make([]*workerWrapper, numWorkers)
-	for i := range pool.workers {
-		newWorker := workerWrapper{
-			worker: &(tunnyDefaultWorker{&job}),
-		}
-		pool.workers[i] = &newWorker
+	request.jobChan <- payload
+
+	payload, open = <-request.retChan
+	if !open {
+		panic(ErrWorkerClosed)
 	}
 
-	return &pool
+	return payload
 }
 
-/*
-CreatePoolGeneric - Creates a pool of generic workers. When sending work to a pool of
-generic workers you send a closure (func()) which is the job to perform.
-*/
-func CreatePoolGeneric(numWorkers int) *WorkPool {
-
-	return CreatePool(numWorkers, func(jobCall interface{}) interface{} {
-		if method, ok := jobCall.(func()); ok {
-			method()
-			return nil
-		}
-		return ErrJobNotFunc
-	})
+// ProcessTimed will use the Pool to process a payload and synchronously return
+// the result. If the timeout occurs before the job has finished the worker will
+// be interrupted and ErrJobTimedOut will be returned. ProcessTimed can be
+// called safely by any goroutine.
+func (p *Pool) ProcessTimed(
+	payload interface{},
+	timeout time.Duration,
+) (interface{}, error) {
+	tout := time.NewTimer(timeout)
 
-}
+	var request workRequest
+	var open bool
 
-/*
-CreateCustomPool - Creates a pool for an array of custom workers. The custom workers
-must implement TunnyWorker, and may also optionally implement TunnyExtendedWorker and
-TunnyInterruptable.
-*/
-func CreateCustomPool(customWorkers []TunnyWorker) *WorkPool {
-	pool := WorkPool{running: 0}
-
-	pool.workers = make([]*workerWrapper, len(customWorkers))
-	for i := range pool.workers {
-		newWorker := workerWrapper{
-			worker: customWorkers[i],
+	select {
+	case request, open = <-p.reqChan:
+		if !open {
+			return nil, ErrPoolNotRunning
 		}
-		pool.workers[i] = &newWorker
+	case <-tout.C:
+		return nil, ErrJobTimedOut
 	}
 
-	return &pool
-}
+	select {
+	case request.jobChan <- payload:
+	case <-tout.C:
+		request.interruptFunc()
+		return nil, ErrJobTimedOut
+	}
 
-/*
-SendWorkTimed - Send a job to a worker and return the result, this is a synchronous
-call with a timeout.
-*/ -func (pool *WorkPool) SendWorkTimed(milliTimeout time.Duration, jobData interface{}) (interface{}, error) { - pool.statusMutex.RLock() - defer pool.statusMutex.RUnlock() - - if pool.isRunning() { - before := time.Now() - - // Create a new time out timer - timeout := time.NewTimer(milliTimeout * time.Millisecond) - defer timeout.Stop() - - // Create new selectcase[] and add time out case - selectCases := append(pool.selects[:], reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(timeout.C), - }) - - // Wait for workers, or time out - if chosen, _, ok := reflect.Select(selectCases); ok { - - // Check if the selected index is a worker, otherwise we timed out - if chosen < (len(selectCases) - 1) { - pool.workers[chosen].jobChan <- jobData - - timeoutRemain := time.NewTimer((milliTimeout * time.Millisecond) - time.Since(before)) - defer timeoutRemain.Stop() - - // Wait for response, or time out - select { - case data, open := <-pool.workers[chosen].outputChan: - if !open { - return nil, ErrWorkerClosed - } - return data, nil - case <-timeoutRemain.C: - /* If we time out here we also need to ensure that the output is still - * collected and that the worker can move on. Therefore, we fork the - * waiting process into a new goroutine. - */ - go func() { - pool.workers[chosen].Interrupt() - <-pool.workers[chosen].outputChan - }() - return nil, ErrJobTimedOut - } - } else { - return nil, ErrJobTimedOut - } - } else { - // This means the chosen channel was closed + select { + case payload, open = <-request.retChan: + if !open { return nil, ErrWorkerClosed } - } else { - return nil, ErrPoolNotRunning + case <-tout.C: + request.interruptFunc() + return nil, ErrJobTimedOut } -} -/* -SendWorkTimedAsync - Send a timed job to a worker without blocking, and optionally -send the result to a receiving closure. You may set the closure to nil if no -further actions are required. -*/ -func (pool *WorkPool) SendWorkTimedAsync( - milliTimeout time.Duration, - jobData interface{}, - after func(interface{}, error), -) { - atomic.AddInt32(&pool.pendingAsyncJobs, 1) - go func() { - defer atomic.AddInt32(&pool.pendingAsyncJobs, -1) - result, err := pool.SendWorkTimed(milliTimeout, jobData) - if after != nil { - after(result, err) - } - }() + tout.Stop() + return payload, nil } -/* -SendWork - Send a job to a worker and return the result, this is a synchronous call. -*/ -func (pool *WorkPool) SendWork(jobData interface{}) (interface{}, error) { - pool.statusMutex.RLock() - defer pool.statusMutex.RUnlock() - - if pool.isRunning() { - if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 { - pool.workers[chosen].jobChan <- jobData - result, open := <-pool.workers[chosen].outputChan - - if !open { - return nil, ErrWorkerClosed - } - return result, nil - } - return nil, ErrWorkerClosed +// SetSize changes the total number of workers in the Pool. This can be called +// by any goroutine at any time unless the Pool has been stopped, in which case +// a panic will occur. +func (p *Pool) SetSize(n int) { + p.workerMut.Lock() + defer p.workerMut.Unlock() + + lWorkers := len(p.workers) + if lWorkers == n { + return } - return nil, ErrPoolNotRunning -} -/* -SendWorkAsync - Send a job to a worker without blocking, and optionally send the -result to a receiving closure. You may set the closure to nil if no further actions -are required. 
-*/ -func (pool *WorkPool) SendWorkAsync(jobData interface{}, after func(interface{}, error)) { - atomic.AddInt32(&pool.pendingAsyncJobs, 1) - go func() { - defer atomic.AddInt32(&pool.pendingAsyncJobs, -1) - result, err := pool.SendWork(jobData) - if after != nil { - after(result, err) - } - }() -} + // Add extra workers if N > len(workers) + for i := lWorkers; i < n; i++ { + p.workers = append(p.workers, newWorkerWrapper(p.reqChan, p.ctor())) + } -/* -NumPendingAsyncJobs - Get the current count of async jobs either in flight, or waiting for a worker -*/ -func (pool *WorkPool) NumPendingAsyncJobs() int32 { - return atomic.LoadInt32(&pool.pendingAsyncJobs) -} + // Asynchronously stop all workers > N + for i := n; i < lWorkers; i++ { + p.workers[i].stop() + } + + // Synchronously wait for all workers > N to stop + for i := n; i < lWorkers; i++ { + p.workers[i].join() + } -/* -NumWorkers - Number of workers in the pool -*/ -func (pool *WorkPool) NumWorkers() int { - return len(pool.workers) + // Remove stopped workers from slice + p.workers = p.workers[:n] } -type liveVarAccessor func() string +// GetSize returns the current size of the pool. +func (p *Pool) GetSize() int { + p.workerMut.Lock() + defer p.workerMut.Unlock() -func (a liveVarAccessor) String() string { - return a() + return len(p.workers) } -/* -PublishExpvarMetrics - Publishes the NumWorkers and NumPendingAsyncJobs to expvars -*/ -func (pool *WorkPool) PublishExpvarMetrics(poolName string) { - ret := expvar.NewMap(poolName) - asyncJobsFn := func() string { - return strconv.FormatInt(int64(pool.NumPendingAsyncJobs()), 10) - } - numWorkersFn := func() string { - return strconv.FormatInt(int64(pool.NumWorkers()), 10) - } - ret.Set("pendingAsyncJobs", liveVarAccessor(asyncJobsFn)) - ret.Set("numWorkers", liveVarAccessor(numWorkersFn)) +// Close will terminate all workers and close the job channel of this Pool. +func (p *Pool) Close() { + p.SetSize(0) + close(p.reqChan) } + +//------------------------------------------------------------------------------ diff --git a/tunny_logo.png b/tunny_logo.png index 36028de..16b7b73 100644 Binary files a/tunny_logo.png and b/tunny_logo.png differ diff --git a/tunny_test.go b/tunny_test.go index f241cbe..3e9aab2 100644 --- a/tunny_test.go +++ b/tunny_test.go @@ -1,77 +1,189 @@ -/* -Copyright (c) 2014 Ashley Jeffs - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -THE SOFTWARE. 
-*/ +// Copyright (c) 2014 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. package tunny import ( "sync" + "sync/atomic" "testing" "time" ) -/*-------------------------------------------------------------------------------------------------- - */ +//------------------------------------------------------------------------------ -func TestBasicJob(t *testing.T) { - pool, err := CreatePool(1, func(in interface{}) interface{} { +func TestPoolSizeAdjustment(t *testing.T) { + pool := NewFunc(10, func(interface{}) interface{} { return "foo" }) + if exp, act := 10, len(pool.workers); exp != act { + t.Errorf("Wrong size of pool: %v != %v", act, exp) + } + + pool.SetSize(10) + if exp, act := 10, pool.GetSize(); exp != act { + t.Errorf("Wrong size of pool: %v != %v", act, exp) + } + + pool.SetSize(9) + if exp, act := 9, pool.GetSize(); exp != act { + t.Errorf("Wrong size of pool: %v != %v", act, exp) + } + + pool.SetSize(10) + if exp, act := 10, pool.GetSize(); exp != act { + t.Errorf("Wrong size of pool: %v != %v", act, exp) + } + + pool.SetSize(0) + if exp, act := 0, pool.GetSize(); exp != act { + t.Errorf("Wrong size of pool: %v != %v", act, exp) + } + + pool.SetSize(10) + if exp, act := 10, pool.GetSize(); exp != act { + t.Errorf("Wrong size of pool: %v != %v", act, exp) + } + + // Finally, make sure we still have actual active workers. 
+ if exp, act := "foo", pool.Process(0).(string); exp != act { + t.Errorf("Wrong result: %v != %v", act, exp) + } + + pool.Close() + if exp, act := 0, pool.GetSize(); exp != act { + t.Errorf("Wrong size of pool: %v != %v", act, exp) + } +} + +//------------------------------------------------------------------------------ + +func TestFuncJob(t *testing.T) { + pool := NewFunc(10, func(in interface{}) interface{} { intVal := in.(int) return intVal * 2 - }).Open() - if err != nil { - t.Errorf("Failed to create pool: %v", err) - return + }) + defer pool.Close() + + for i := 0; i < 10; i++ { + ret := pool.Process(10) + if exp, act := 20, ret.(int); exp != act { + t.Errorf("Wrong result: %v != %v", act, exp) + } } +} + +func TestFuncJobTimed(t *testing.T) { + pool := NewFunc(10, func(in interface{}) interface{} { + intVal := in.(int) + return intVal * 2 + }) defer pool.Close() - for i := 0; i < 1; i++ { - ret, err := pool.SendWork(10) + for i := 0; i < 10; i++ { + ret, err := pool.ProcessTimed(10, time.Millisecond) if err != nil { - t.Errorf("Failed to send work: %v", err) - return + t.Fatalf("Failed to process: %v", err) } - retInt := ret.(int) - if ret != 20 { - t.Errorf("Wrong return value: %v != %v", 20, retInt) + if exp, act := 20, ret.(int); exp != act { + t.Errorf("Wrong result: %v != %v", act, exp) } } } +func TestCallbackJob(t *testing.T) { + pool := NewCallback(10) + defer pool.Close() + + var counter int32 + for i := 0; i < 10; i++ { + ret := pool.Process(func() { + atomic.AddInt32(&counter, 1) + }) + if ret != nil { + t.Errorf("Non-nil callback response: %v", ret) + } + } + + ret := pool.Process("foo") + if exp, act := ErrJobNotFunc, ret; exp != act { + t.Errorf("Wrong result from non-func: %v != %v", act, exp) + } + + if exp, act := int32(10), counter; exp != act { + t.Errorf("Wrong result: %v != %v", act, exp) + } +} + +func TestTimeout(t *testing.T) { + pool := NewFunc(1, func(in interface{}) interface{} { + intVal := in.(int) + <-time.After(time.Millisecond) + return intVal * 2 + }) + defer pool.Close() + + _, act := pool.ProcessTimed(10, time.Duration(1)) + if exp := ErrJobTimedOut; exp != act { + t.Errorf("Wrong error returned: %v != %v", act, exp) + } +} + +func TestTimedJobsAfterClose(t *testing.T) { + pool := NewFunc(1, func(in interface{}) interface{} { + return 1 + }) + pool.Close() + + _, act := pool.ProcessTimed(10, time.Duration(1)) + if exp := ErrPoolNotRunning; exp != act { + t.Errorf("Wrong error returned: %v != %v", act, exp) + } +} + +func TestJobsAfterClose(t *testing.T) { + pool := NewFunc(1, func(in interface{}) interface{} { + return 1 + }) + pool.Close() + + defer func() { + if r := recover(); r == nil { + t.Errorf("Process after Stop() did not panic") + } + }() + + pool.Process(10) +} + func TestParallelJobs(t *testing.T) { nWorkers := 10 jobGroup := sync.WaitGroup{} testGroup := sync.WaitGroup{} - pool, err := CreatePool(nWorkers, func(in interface{}) interface{} { + pool := NewFunc(nWorkers, func(in interface{}) interface{} { jobGroup.Done() jobGroup.Wait() intVal := in.(int) return intVal * 2 - }).Open() - if err != nil { - t.Errorf("Failed to create pool: %v", err) - return - } + }) defer pool.Close() for j := 0; j < 1; j++ { @@ -80,16 +192,10 @@ func TestParallelJobs(t *testing.T) { for i := 0; i < nWorkers; i++ { go func() { - ret, err := pool.SendWork(10) - if err != nil { - t.Errorf("Failed to send work: %v", err) - return + ret := pool.Process(10) + if exp, act := 20, ret.(int); exp != act { + t.Errorf("Wrong result: %v != %v", act, exp) } - retInt 
:= ret.(int) - if ret != 20 { - t.Errorf("Wrong return value: %v != %v", 20, retInt) - } - testGroup.Done() }() } @@ -98,189 +204,73 @@ func TestParallelJobs(t *testing.T) { } } -/*-------------------------------------------------------------------------------------------------- - */ +//------------------------------------------------------------------------------ -// Basic worker implementation -type dummyWorker struct { - ready bool - t *testing.T +type mockWorker struct { + blockProcChan chan struct{} + blockReadyChan chan struct{} + interruptChan chan struct{} + terminated bool } -func (d *dummyWorker) TunnyJob(in interface{}) interface{} { - if !d.ready { - d.t.Errorf("TunnyJob called without polling TunnyReady") +func (m *mockWorker) Process(in interface{}) interface{} { + select { + case <-m.blockProcChan: + case <-m.interruptChan: } - d.ready = false return in } -func (d *dummyWorker) TunnyReady() bool { - d.ready = true - return d.ready -} - -// Test the pool with a basic worker implementation -func TestDummyWorker(t *testing.T) { - pool, err := CreateCustomPool([]TunnyWorker{&dummyWorker{t: t}}).Open() - if err != nil { - t.Errorf("Failed to create pool: %v", err) - return - } - defer pool.Close() - - for i := 0; i < 100; i++ { - if result, err := pool.SendWork(12); err != nil { - t.Errorf("Failed to send work: %v", err) - } else if resInt, ok := result.(int); !ok || resInt != 12 { - t.Errorf("Unexpected result from job: %v != %v", 12, result) - } - } -} - -// Extended worker implementation -type dummyExtWorker struct { - dummyWorker - - initialized bool -} - -func (d *dummyExtWorker) TunnyJob(in interface{}) interface{} { - if !d.initialized { - d.t.Errorf("TunnyJob called without calling TunnyInitialize") - } - return d.dummyWorker.TunnyJob(in) +func (m *mockWorker) BlockUntilReady() { + <-m.blockReadyChan } -func (d *dummyExtWorker) TunnyInitialize() { - d.initialized = true +func (m *mockWorker) Interrupt() { + m.interruptChan <- struct{}{} } -func (d *dummyExtWorker) TunnyTerminate() { - if !d.initialized { - d.t.Errorf("TunnyTerminate called without calling TunnyInitialize") - } - d.initialized = false +func (m *mockWorker) Terminate() { + m.terminated = true } -// Test the pool with an extended worker implementation -func TestDummyExtWorker(t *testing.T) { - pool, err := CreateCustomPool( - []TunnyWorker{ - &dummyExtWorker{ - dummyWorker: dummyWorker{t: t}, - }, - }).Open() - if err != nil { - t.Errorf("Failed to create pool: %v", err) - return - } - defer pool.Close() - - for i := 0; i < 100; i++ { - if result, err := pool.SendWork(12); err != nil { - t.Errorf("Failed to send work: %v", err) - } else if resInt, ok := result.(int); !ok || resInt != 12 { - t.Errorf("Unexpected result from job: %v != %v", 12, result) +func TestCustomWorker(t *testing.T) { + pool := New(1, func() Worker { + return &mockWorker{ + blockProcChan: make(chan struct{}), + blockReadyChan: make(chan struct{}), + interruptChan: make(chan struct{}), } - } -} + }) -// Extended and interruptible worker implementation -type dummyExtIntWorker struct { - dummyExtWorker - - jobLock *sync.Mutex -} - -func (d *dummyExtIntWorker) TunnyJob(in interface{}) interface{} { - d.jobLock.Lock() - d.jobLock.Unlock() - - return d.dummyExtWorker.TunnyJob(in) -} - -func (d *dummyExtIntWorker) TunnyReady() bool { - d.jobLock.Lock() - - return d.dummyExtWorker.TunnyReady() -} - -func (d *dummyExtIntWorker) TunnyInterrupt() { - d.jobLock.Unlock() -} - -// Test the pool with an extended and interruptible worker implementation 
-func TestDummyExtIntWorker(t *testing.T) { - pool, err := CreateCustomPool( - []TunnyWorker{ - &dummyExtIntWorker{ - dummyExtWorker: dummyExtWorker{ - dummyWorker: dummyWorker{t: t}, - }, - jobLock: &sync.Mutex{}, - }, - }).Open() - if err != nil { - t.Errorf("Failed to create pool: %v", err) - return + worker1, ok := pool.workers[0].worker.(*mockWorker) + if !ok { + t.Fatal("Wrong type of worker in pool") } - defer pool.Close() - for i := 0; i < 100; i++ { - if _, err := pool.SendWorkTimed(1, nil); err == nil { - t.Errorf("Expected timeout from dummyExtIntWorker.") - } + if worker1.terminated { + t.Fatal("Worker started off terminated") } -} -func TestNumWorkers(t *testing.T) { - numWorkers := 10 - pool, err := CreatePoolGeneric(numWorkers).Open() - if err != nil { - t.Errorf("Failed to create pool: %v", err) - return - } - defer pool.Close() - actual := pool.NumWorkers() - if actual != numWorkers { - t.Errorf("Expected to get %d workers, but got %d", numWorkers, actual) + _, err := pool.ProcessTimed(10, time.Millisecond) + if exp, act := ErrJobTimedOut, err; exp != act { + t.Errorf("Wrong error: %v != %v", act, exp) } -} - -var waitHalfSecond = func() { - time.Sleep(500 * time.Millisecond) -} -func TestNumPendingReportsAllWorkersWithNoWork(t *testing.T) { - numWorkers := 10 - pool, err := CreatePoolGeneric(numWorkers).Open() - if err != nil { - t.Errorf("Failed to create pool: %v", err) - return - } - defer pool.Close() - actual := pool.NumPendingAsyncJobs() - if actual != 0 { - t.Errorf("Expected to get 0 pending jobs when pool is quiet, but got %d", actual) + close(worker1.blockReadyChan) + _, err = pool.ProcessTimed(10, time.Millisecond) + if exp, act := ErrJobTimedOut, err; exp != act { + t.Errorf("Wrong error: %v != %v", act, exp) } -} -func TestNumPendingReportsNotAllWorkersWhenSomeBusy(t *testing.T) { - numWorkers := 10 - pool, err := CreatePoolGeneric(numWorkers).Open() - if err != nil { - t.Errorf("Failed to create pool: %v", err) - return + close(worker1.blockProcChan) + if exp, act := 10, pool.Process(10).(int); exp != act { + t.Errorf("Wrong result: %v != %v", act, exp) } - defer pool.Close() - pool.SendWorkAsync(waitHalfSecond, nil) - actual := pool.NumPendingAsyncJobs() - expected := int32(1) - if actual != expected { - t.Errorf("Expected to get %d pending jobs when pool has work, but got %d", expected, actual) + + pool.Close() + if !worker1.terminated { + t.Fatal("Worker was not terminated") } } -/*-------------------------------------------------------------------------------------------------- - */ +//------------------------------------------------------------------------------ diff --git a/worker.go b/worker.go index 9f2cad8..5d9c522 100644 --- a/worker.go +++ b/worker.go @@ -1,110 +1,126 @@ -/* -Copyright (c) 2014 Ashley Jeffs - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in -all copies or substantial portions of the Software. 
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-THE SOFTWARE.
-*/
+// Copyright (c) 2014 Ashley Jeffs
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
 
 package tunny
 
-import (
-	"sync/atomic"
-	"time"
-)
+//------------------------------------------------------------------------------
 
-type workerWrapper struct {
-	readyChan  chan int
-	jobChan    chan interface{}
-	outputChan chan interface{}
-	poolOpen   uint32
-	worker     TunnyWorker
-}
+// workRequest is a struct containing context representing a worker's intention
+// to receive a work payload.
+type workRequest struct {
+	// jobChan is used to send the payload to this worker.
+	jobChan chan<- interface{}
 
-func (wrapper *workerWrapper) Loop() {
+	// retChan is used to read the result from this worker.
+	retChan <-chan interface{}
 
-	// TODO: Configure?
-	tout := time.Duration(5)
+	// interruptFunc can be called to cancel a running job. When called it is no
+	// longer necessary to read from retChan.
+	interruptFunc func()
+}
 
-	for !wrapper.worker.TunnyReady() {
-		// It's sad that we can't simply check if jobChan is closed here.
-		if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
-			break
-		}
-		time.Sleep(tout * time.Millisecond)
-	}
+//------------------------------------------------------------------------------
 
-	wrapper.readyChan <- 1
+// workerWrapper takes a Worker implementation and wraps it within a goroutine
+// and channel arrangement. The workerWrapper is responsible for managing the
+// lifetime of both the Worker and the goroutine.
+type workerWrapper struct {
+	worker        Worker
+	interruptChan chan struct{}
 
-	for data := range wrapper.jobChan {
-		wrapper.outputChan <- wrapper.worker.TunnyJob(data)
-		for !wrapper.worker.TunnyReady() {
-			if atomic.LoadUint32(&wrapper.poolOpen) == 0 {
-				break
-			}
-			time.Sleep(tout * time.Millisecond)
-		}
-		wrapper.readyChan <- 1
-	}
+	// reqChan is NOT owned by this type; it is used to send requests for work.
+	reqChan chan<- workRequest
 
-	close(wrapper.readyChan)
-	close(wrapper.outputChan)
+	// closeChan can be closed in order to cleanly shut down this worker.
+	closeChan chan struct{}
+
+	// closedChan is closed by the run() goroutine when it exits.
+	closedChan chan struct{}
+}
 
-func (wrapper *workerWrapper) Open() {
-	if extWorker, ok := wrapper.worker.(TunnyExtendedWorker); ok {
-		extWorker.TunnyInitialize()
+func newWorkerWrapper(
+	reqChan chan<- workRequest,
+	worker Worker,
+) *workerWrapper {
+	w := workerWrapper{
+		worker:        worker,
+		interruptChan: make(chan struct{}),
+		reqChan:       reqChan,
+		closeChan:     make(chan struct{}),
+		closedChan:    make(chan struct{}),
 	}
 
-	wrapper.readyChan = make(chan int)
-	wrapper.jobChan = make(chan interface{})
-	wrapper.outputChan = make(chan interface{})
+	go w.run()
 
-	atomic.SwapUint32(&wrapper.poolOpen, uint32(1))
-
-	go wrapper.Loop()
+	return &w
 }
 
-// Follow this with Join(), otherwise terminate isn't called on the worker
-func (wrapper *workerWrapper) Close() {
-	close(wrapper.jobChan)
+//------------------------------------------------------------------------------
 
-	// Breaks the worker out of a Ready() -> false loop
-	atomic.SwapUint32(&wrapper.poolOpen, uint32(0))
+func (w *workerWrapper) interrupt() {
+	close(w.interruptChan)
+	w.worker.Interrupt()
 }
 
-func (wrapper *workerWrapper) Join() {
-	// Ensure that both the ready and output channels are closed
+func (w *workerWrapper) run() {
+	jobChan, retChan := make(chan interface{}), make(chan interface{})
+	defer func() {
+		w.worker.Terminate()
+		close(retChan)
+		close(w.closedChan)
+	}()
+
 	for {
-		_, readyOpen := <-wrapper.readyChan
-		_, outputOpen := <-wrapper.outputChan
-		if !readyOpen && !outputOpen {
-			break
+		// NOTE: Blocking here will prevent the worker from closing down.
+		w.worker.BlockUntilReady()
+		select {
+		case w.reqChan <- workRequest{
+			jobChan:       jobChan,
+			retChan:       retChan,
+			interruptFunc: w.interrupt,
+		}:
+			select {
+			case payload := <-jobChan:
+				result := w.worker.Process(payload)
+				select {
+				case retChan <- result:
+				case <-w.interruptChan:
+					w.interruptChan = make(chan struct{})
+				}
+			case <-w.interruptChan:
+				w.interruptChan = make(chan struct{})
+			}
+		case <-w.closeChan:
+			return
 		}
 	}
+}
 
-	if extWorker, ok := wrapper.worker.(TunnyExtendedWorker); ok {
-		extWorker.TunnyTerminate()
-	}
+//------------------------------------------------------------------------------
+
+func (w *workerWrapper) stop() {
+	close(w.closeChan)
 }
 
-func (wrapper *workerWrapper) Interrupt() {
-	if extWorker, ok := wrapper.worker.(TunnyInterruptable); ok {
-		extWorker.TunnyInterrupt()
-	}
+func (w *workerWrapper) join() {
+	<-w.closedChan
 }
+
+//------------------------------------------------------------------------------
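
As a sketch of how the new `Worker` contract composes with `ProcessTimed`, the
example below implements a worker whose `Interrupt` unblocks `Process` when a
job is abandoned. The `interruptableWorker` type is hypothetical and not part
of this changeset; only the `tunny.New`, `tunny.Worker`, `ProcessTimed` and
`ErrJobTimedOut` names come from the code above:

``` go
package main

import (
	"fmt"
	"time"

	"github.com/Jeffail/tunny"
)

// interruptableWorker simulates slow jobs, but aborts early when the pool
// abandons the running job (e.g. after a ProcessTimed timeout).
type interruptableWorker struct {
	interrupt chan struct{}
}

// Process waits for the "work" to complete or for an interrupt signal.
func (w *interruptableWorker) Process(payload interface{}) interface{} {
	select {
	case <-time.After(time.Second): // stand-in for real work
		return payload
	case <-w.interrupt:
		return nil // the job was abandoned, so the result is discarded
	}
}

func (w *interruptableWorker) BlockUntilReady() {}

// Interrupt unblocks Process. The send is non-blocking in case Process has
// already returned by the time the interrupt arrives.
func (w *interruptableWorker) Interrupt() {
	select {
	case w.interrupt <- struct{}{}:
	default:
	}
}

func (w *interruptableWorker) Terminate() {}

func main() {
	pool := tunny.New(4, func() tunny.Worker {
		return &interruptableWorker{interrupt: make(chan struct{})}
	})
	defer pool.Close()

	_, err := pool.ProcessTimed("payload", 10*time.Millisecond)
	fmt.Println(err == tunny.ErrJobTimedOut) // true; the worker was interrupted
}
```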