From c99be444392d9d4ad87eaa67ace117ed5119cac0 Mon Sep 17 00:00:00 2001 From: Ashley Jeffs Date: Thu, 20 Mar 2014 15:45:13 +0000 Subject: [PATCH] First commit --- LICENSE | 19 +++ README.md | 212 +++++++++++++++++++++++++ tunny.go | 304 +++++++++++++++++++++++++++++++++++ tunny_test.go | 426 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 961 insertions(+) create mode 100644 LICENSE create mode 100644 README.md create mode 100644 tunny.go create mode 100644 tunny_test.go diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..99a62c6 --- /dev/null +++ b/LICENSE @@ -0,0 +1,19 @@ +Copyright (c) 2014 Ashley Jeffs + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..a677a2f --- /dev/null +++ b/README.md @@ -0,0 +1,212 @@ +![Tunny](http://www.creepybit.co.uk/images/tunny_logo_small.png?v=1 "Tunny") + +Tunny is a golang library for creating and managing a thread pool, aiming to be simple, intuitive, ground breaking, revolutionary, world dominating and also trashy. + +Use cases for tunny are any situation where a large flood of jobs are imminent, potentially from different threads, and you need to bottleneck those jobs through a fixed number of dedicated worker threads. The most obvious example is as an easy wrapper for limiting the hard work done in your software to the number of CPU's available, preventing the threads from foolishly competing with each other for CPU time. + +##How to install: + +```bash +go get github.com/jeffail/tunny +``` + +##How to use: + +Here's a simple example of tunny being used to distribute a batch of calculations to a pool of workers that matches the number of CPU's: + +```go +... + +import "github.com/jeffail/tunny" + +... + +func CalcRoots (inputs []float64) []float64 { + numCPUs := runtime.NumCPU() + numJobs := len(inputs) + doneChan := make( chan int, numJobs ) + outputs := make( []float64, numJobs ) + + runtime.GOMAXPROCS(numCPUs) + + /* Create the pool, and specify the job each worker should perform, + * if each worker needs to carry its own state then this can also + * be accomplished, read on. + */ + pool, err := tunny.CreatePool(numCPUs, func( object interface{} ) ( interface{} ) { + if value, ok := object.(float64); ok { + // Hard work here + return math.Sqrt(value) + } + return nil + }).Open() + + if err != nil { + fmt.Fprintln(os.Stderr, "Error starting pool: ", err) + return nil + } + + defer pool.Close() + + /* Creates a go routine for all jobs, these will be blocked until + * a worker is available and has finished the request. + */ + for i := 0; i < numJobs; i++ { + go func(index int) { + // SendWork is thread safe. Go ahead and call it from any go routine + if value, err2 := pool.SendWork(inputs[index]); err2 == nil { + if result, ok := value.(float64); ok { + outputs[index] = result + } + } + doneChan <- 1 + }(i) + } + + // Wait for all jobs to be completed before closing the pool + for i := 0; i < numJobs; i++ { + <-doneChan + } + + return outputs +} + +... + +``` + +This particular example, since it all resides in the one func, could actually be done with less code by simply spawning numCPU's go routines that gobble up a shared channel of float64's. This would probably also be quicker since you waste cycles here boxing and unboxing the job values, but at least you don't have to write it all yourself you lazy scum. + +##Can I specify the job for each work call? + +Yes, the arguments passed to the worker are boxed as interface{}, so this can actually be a func, you can implement this yourself, or if you're not bothered about return values you can use: + +```go +... + +exampleChannel := make(chan int) + +pool, _ := tunny.CreatePoolGeneric(numCPUs).Open() + +err := pool.SendWork(func() { + /* Do your hard work here, usual rules of enclosures apply here, + * so you can return values like so: + */ + exampleChannel <- 10 +}) + +if err != nil { + // You done goofed +} + +... +``` + +##Specify a time out period + +To make pool calls adhere to a timeout period of your choice simply swap the call to SendWork with SendWorkTimed, like so: + +```go +... + +// SendWorkTimed takes an argument for a timeout in milliseconds. +// If this timeout triggers the call will return with an error +if value, err := pool.SendWorkTimed(500, inputs[index]); err == nil { + if result, ok := value.(float64); ok { + outputs[index] = result + } +} else { +/* A timeout most likely occured, I haven't checked this specifically because + * I am a lazy garbage mongler. + */ +} + +... +``` + +This snippet will send the job, and wait for up to 500 milliseconds for an answer. You could optionally implement a timeout yourself by starting a new go routine that returns the output through a channel, and having that channel compete with time.After(). + +You'd be an idiot for doing that though because you would be forcing the pool to send work to a worker even if the timeout occured whilst waiting for a worker to become available, you muppet! + +##How do I give my workers state? + +The call to tunny.CreatePool will generate a pool of TunnyWorkers for you, and then assign each worker the closure argument to run for each job. You can, however, create these workers yourself, thereby allowing you to also give them their own state and methods. + +Here is a short example: + +```go +... + +type customWorker struct { + // TODO: Put some state here +} + +// Use this call to block further jobs if necessary +func (worker *customWorker) Ready() bool { + return true +} + +// This is where the work actually happens +func (worker *customWorker) Job(data interface{}) interface{} { + /* TODO: Use and modify state + * there's no need for thread safety paradigms here unless the + * data is being accessed from another go routine outside of + * the pool. + */ + if outputStr, ok := data.(string); ok { + return ("custom job done: " + outputStr ) + } + return nil +} + +func TestCustomWorkers (t *testing.T) { + outChan := make(chan int, 10) + + workers := make([]tunny.TunnyWorker, 4) + for i, _ := range workers { + workers[i] = &(customWorker{}) + } + + pool, errPool := tunny.CreateCustomPool(workers).Open() + + if errPool != nil { + t.Errorf("Error starting pool: ", errPool) + return + } + + defer pool.Close() + + for i := 0; i < 10; i++ { + go func() { + if value, err := pool.SendWork("hello world"); err == nil { + if str, ok := value.(string); ok { + if str != "custom job done: hello world" { + t.Errorf("Unexpected output from custom worker") + } + } else { + t.Errorf("Not a string!") + } + } else { + t.Errorf("Error returned: ", err) + } + outChan <- 1 + }() + } + + for i := 0; i < 10; i++ { + <-outChan + } +} + +... +``` + +You'll notice that as well as the important Job(data interface{}) interface{} call to implement there is also the call Ready() bool. Ready is potentially an important part of the TunnyWorker that allows you to use your state to determine whether or not this worker should take on another job yet, and answer true or false accordingly. + +For example, your worker could hold a counter of how many jobs it has done, and perhaps after a certain amount it should perform another act before taking on more work, it's important to use Ready for these occasions since blocking the Job call will hold up the waiting client. + +You can block Ready whilst you wait for some condition to change, or alternatively you can return a true/false straight away, in this case the call will be repeated at 50 millisecond intervals until you answer true. + +##So where do I actually benefit from using tunny? + +You don't, I'm not a god damn charity. diff --git a/tunny.go b/tunny.go new file mode 100644 index 0000000..5def9cb --- /dev/null +++ b/tunny.go @@ -0,0 +1,304 @@ +/* +Copyright (c) 2014 Ashley Jeffs + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +/* +Package tunny implements a simple pool for maintaining independant worker threads. +Here's a simple example of tunny in action, creating a four threaded worker pool: + +pool := tunny.CreatePool(4, func( object interface{} ) ( interface{} ) { + if w, ok := object.(int); ok { + return w * 2 + } + return "Not an int!" +}).Open() + +defer pool.Close() + +// pool.SendWork is thread safe, so it can be called from another pool of go routines. +// This call blocks until a worker is ready and has completed the job +out, err := pool.SendWork(50) + +// This call blocks until either a result is obtained or the specified timeout period +// (5000 milliseconds) occurs. +out2, err2 := pool.SendWorkTimed(5000, 50) +*/ +package tunny + +import ( + "reflect" + "errors" + "time" + "sync" +) + +type TunnyWorker interface { + Job(interface{}) (interface{}) + Ready() bool +} + +type TunnyExtendedWorker interface { + Job(interface{}) (interface{}) + Ready() bool + Initialize() + Terminate() +} + +type workerWrapper struct { + readyChan chan int + jobChan chan interface{} + outputChan chan interface{} + worker TunnyWorker +} + +/* TODO: As long as Ready is able to lock this loop entirely we cannot + * guarantee that all go routines stop at pool.Close(), which totally + * stinks. + */ +func (wrapper *workerWrapper) Loop () { + for !wrapper.worker.Ready() { + time.Sleep(50 * time.Millisecond) + } + wrapper.readyChan <- 1 + for data := range wrapper.jobChan { + wrapper.outputChan <- wrapper.worker.Job( data ) + for !wrapper.worker.Ready() { + time.Sleep(50 * time.Millisecond) + } + wrapper.readyChan <- 1 + } + close(wrapper.readyChan) + close(wrapper.outputChan) +} + +func (wrapper *workerWrapper) Close () { + close(wrapper.jobChan) +} + +type tunnyDefaultWorker struct { + job *func(interface{}) (interface{}) +} + +func (worker *tunnyDefaultWorker) Job(data interface{}) interface{} { + return (*worker.job)(data) +} + +func (worker *tunnyDefaultWorker) Ready() bool { + return true +} + +/* +WorkPool allows you to contain and send work to your worker pool. +You must first indicate that the pool should run by calling Open(), then send work to the workers +through SendWork. +*/ +type WorkPool struct { + workers []*workerWrapper + selects []reflect.SelectCase + mutex sync.RWMutex + running bool +} + +/* +SendWorkTimed - Send a job to a worker and return the result, this is a blocking call with a timeout. +SendWorkTimed - Args: milliTimeout time.Duration, jobData interface{} +SendWorkTimed - Summary: the timeout period in milliseconds, the input data for the worker to process +*/ +func (pool *WorkPool) SendWorkTimed (milliTimeout time.Duration, jobData interface{}) (interface{}, error) { + pool.mutex.RLock() + defer pool.mutex.RUnlock() + + if pool.running { + before := time.Now() + + // Create new selectcase[] and add time out case + selectCases := append(pool.selects[:], reflect.SelectCase { + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(time.After(milliTimeout * time.Millisecond)), + }) + + // Wait for workers, or time out + chosen, _, ok := reflect.Select(selectCases) + if ( ok ) { + if ( chosen < ( len(selectCases) - 1 ) ) { + (*pool.workers[chosen]).jobChan <- jobData + + // Wait for response, or time out + select { + case data := <-(*pool.workers[chosen]).outputChan: + return data, nil + case <- time.After((milliTimeout * time.Millisecond) - time.Since(before)): + /* If we time out here we also need to ensure that the output is still + * collected and that the worker can move on. Therefore, we fork the + * waiting process into a new thread. + */ + go func() { + <-(*pool.workers[chosen]).outputChan + }() + return nil, errors.New("Request timed out whilst waiting for job to complete") + } + } else { + return nil, errors.New("Request timed out whilst waiting for a worker") + } + } else { + return nil, errors.New("Failed to find a worker") + } + } else { + return nil, errors.New("Pool is not running! Call Open() before sending work") + } +} + +/* +SendWork - Send a job to a worker and return the result, this is a blocking call. +SendWork - Args: jobData interface{} +SendWork - Summary: the input data for the worker to process +*/ +func (pool *WorkPool) SendWork (jobData interface{}) (interface{}, error) { + pool.mutex.RLock() + defer pool.mutex.RUnlock() + + if pool.running { + + if chosen, _, ok := reflect.Select(pool.selects); ok && chosen >= 0 { + (*pool.workers[chosen]).jobChan <- jobData + return <- (*pool.workers[chosen]).outputChan, nil + } + + return nil, errors.New("Failed to find or wait for a worker") + + } else { + return nil, errors.New("Pool is not running! Call Open() before sending work") + } +} + +/* +Open - Open all channels and launch the background goroutines managed by the pool. +*/ +func (pool *WorkPool) Open () (*WorkPool, error) { + pool.mutex.Lock() + defer pool.mutex.Unlock() + + if !pool.running { + + pool.selects = make( []reflect.SelectCase, len(pool.workers) ) + + for i, workerWrapper := range pool.workers { + (*workerWrapper).readyChan = make (chan int) + (*workerWrapper).jobChan = make (chan interface{}) + (*workerWrapper).outputChan = make (chan interface{}) + + pool.selects[i] = reflect.SelectCase { + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf((*workerWrapper).readyChan), + } + + if extWorker, ok := (*workerWrapper).worker.(TunnyExtendedWorker); ok { + extWorker.Initialize() + } + + go (*workerWrapper).Loop() + } + + pool.running = true + return pool, nil + + } else { + return nil, errors.New("Pool is already running!") + } +} + +/* +Close - Close all channels and goroutines managed by the pool. +*/ +func (pool *WorkPool) Close () error { + pool.mutex.Lock() + defer pool.mutex.Unlock() + + if pool.running { + + for _, workerWrapper := range pool.workers { + (*workerWrapper).Close() + if extWorker, ok := (*workerWrapper).worker.(TunnyExtendedWorker); ok { + extWorker.Terminate() + } + } + pool.running = false + return nil + + } else { + return errors.New("Cannot close when the pool is not running!") + } +} + +/* +CreatePool - Creates a pool of workers. +CreatePool - Args: numWorkers int, job func(interface{}) (interface{}) +CreatePool - Summary: number of threads, the closure to run for each job +*/ +func CreatePool (numWorkers int, job func(interface{}) interface{}) *WorkPool { + pool := WorkPool { running: false } + + pool.workers = make ([]*workerWrapper, numWorkers) + for i, _ := range pool.workers { + newWorker := workerWrapper { + worker: &(tunnyDefaultWorker { &job }), + } + pool.workers[i] = &newWorker + } + + return &pool +} + +/* +CreatePoolGeneric - Creates a pool of generic workers, they take a func as their only argument and execute it. +CreatePoolGeneric - Args: numWorkers int +CreatePoolGeneric - Summary: number of threads +*/ +func CreatePoolGeneric (numWorkers int) *WorkPool { + + return CreatePool(numWorkers, func (jobCall interface{}) interface{} { + if method, ok := jobCall.(func()); ok { + method() + return nil + } + return errors.New("Generic worker not given a func()") + }) + +} + +/* +CreateCustomPool - Creates a pool for an array of custom workers. +CreateCustomPool - Args: customWorkers []TunnyWorker +CreateCustomPool - Summary: An array of workers to use in the pool, each worker gets its own thread +*/ +func CreateCustomPool (customWorkers []TunnyWorker) *WorkPool { + pool := WorkPool { running: false } + + pool.workers = make ([]*workerWrapper, len(customWorkers)) + for i, _ := range pool.workers { + newWorker := workerWrapper { + worker: customWorkers[i], + } + pool.workers[i] = &newWorker + } + + return &pool +} diff --git a/tunny_test.go b/tunny_test.go new file mode 100644 index 0000000..917413b --- /dev/null +++ b/tunny_test.go @@ -0,0 +1,426 @@ +/* +Copyright (c) 2014 Ashley Jeffs + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +package tunny + +import ( + "testing" + "time" + "runtime" +) + +func TestTimeout (t *testing.T) { + outChan := make(chan int, 3) + + pool, errPool := CreatePool(1, func(object interface{}) interface{} { + time.Sleep(500 * time.Millisecond) + return nil + }).Open() + + if errPool != nil { + t.Errorf("Error starting pool: ", errPool) + return + } + + defer pool.Close() + + before := time.Now() + + go func() { + if _, err := pool.SendWorkTimed(200, nil); err == nil { + t.Errorf("No timeout triggered thread one") + } else { + taken := ( time.Since(before) / time.Millisecond ) + if taken > 210 { + t.Errorf("Time taken at thread one: ", taken, ", with error: ", err) + } + } + outChan <- 1 + + go func() { + if _, err := pool.SendWork(nil); err == nil { + } else { + t.Errorf("Error at thread three: ", err) + } + outChan <- 1 + }() + }() + + go func() { + if _, err := pool.SendWorkTimed(200, nil); err == nil { + t.Errorf("No timeout triggered thread two") + } else { + taken := ( time.Since(before) / time.Millisecond ) + if taken > 210 { + t.Errorf("Time taken at thread two: ", taken, ", with error: ", err) + } + } + outChan <- 1 + }() + + for i := 0; i < 3; i++ { + <-outChan + } +} + +func TestTimeoutRequests (t *testing.T) { + n_polls := 200 + outChan := make(chan int, n_polls) + + pool, errPool := CreatePool(1, func(object interface{}) interface{} { + time.Sleep(time.Millisecond) + return nil + }).Open() + + if errPool != nil { + t.Errorf("Error starting pool: ", errPool) + return + } + + defer pool.Close() + + for i := 0; i < n_polls; i++ { + if _, err := pool.SendWorkTimed(50, nil); err == nil { + } else { + t.Errorf("thread %v error: ", i, err) + } + outChan <- 1 + } + + for i := 0; i < n_polls; i++ { + <-outChan + } +} + +func validateReturnInt (t *testing.T, expecting int, object interface{}) { + if w, ok := object.(int); ok { + if w != expecting { + t.Errorf("Wrong, expected %v, got %v", expecting, w) + } + } else { + t.Errorf("Wrong, expected int") + } +} + +func TestBasic (t *testing.T) { + sizePool, repeats, sleepFor, margin := 16, 2, 250, 100 + outChan := make(chan int, sizePool) + + runtime.GOMAXPROCS(runtime.NumCPU()) + + pool, errPool := CreatePool(sizePool, func(object interface{}) interface{} { + time.Sleep(time.Duration(sleepFor) * time.Millisecond) + if w, ok := object.(int); ok { + return w * 2 + } + return "Not an int!" + }).Open() + + if errPool != nil { + t.Errorf("Error starting pool: ", errPool) + return + } + + defer pool.Close() + + for i := 0; i < sizePool * repeats; i++ { + go func() { + if out, err := pool.SendWork(50); err == nil { + validateReturnInt (t, 100, out) + } else { + t.Errorf("Error returned: ", err) + } + outChan <- 1 + }() + } + + before := time.Now() + + for i := 0; i < sizePool * repeats; i++ { + <-outChan + } + + taken := float64( time.Since(before) ) / float64(time.Millisecond) + expected := float64( sleepFor + margin ) * float64(repeats) + + if taken > expected { + t.Errorf("Wrong, should have taken less than %v seconds, actually took %v", expected, taken) + } +} + +func TestGeneric (t *testing.T) { + runtime.GOMAXPROCS(runtime.NumCPU()) + + if pool, err := CreatePoolGeneric(10).Open(); err == nil { + defer pool.Close() + + outChan := make(chan int, 10) + + for i := 0; i < 10; i++ { + go func(id int) { + one, err := pool.SendWork(func() { + outChan <- id + }) + + if err != nil { + t.Errorf("Generic call timed out!") + } + + if one != nil { + if funcerr, ok := one.(error); ok { + t.Errorf("Generic worker call: ", funcerr) + } else { + t.Errorf("Unexpected result from generic worker") + } + } + }(i) + } + + results := make([]int, 10) + + for i := 0; i < 10; i++ { + value := <-outChan + if results[value] != 0 || value > 9 || value < 0 { + t.Errorf("duplicate or incorrect key: %v", value) + } + results[value] = 1 + } + + } else { + t.Errorf("Error starting pool: ", err) + return + } + +} + +func TestExampleCase (t *testing.T) { + outChan := make(chan int, 10) + runtime.GOMAXPROCS(runtime.NumCPU()) + + pool, errPool := CreatePool(4, func(object interface{}) interface{} { + if str, ok := object.(string); ok { + return "job done: " + str + } + return nil + }).Open() + + if errPool != nil { + t.Errorf("Error starting pool: ", errPool) + return + } + + defer pool.Close() + + for i := 0; i < 10; i++ { + go func() { + if value, err := pool.SendWork("hello world"); err == nil { + if _, ok := value.(string); ok { + } else { + t.Errorf("Not a string!") + } + } else { + t.Errorf("Error returned: ", err) + } + outChan <- 1 + }() + } + + for i := 0; i < 10; i++ { + <-outChan + } +} + +type customWorker struct { + jobsCompleted int +} + +func (worker *customWorker) Ready() bool { + return true +} + +func (worker *customWorker) Job(data interface{}) interface{} { + /* There's no need for thread safety paradigms here unless the data is being accessed from + * another goroutine outside of the pool. + */ + if outputStr, ok := data.(string); ok { + (*worker).jobsCompleted++; + return ("custom job done: " + outputStr) + } + return nil +} + +func TestCustomWorkers (t *testing.T) { + outChan := make(chan int, 10) + runtime.GOMAXPROCS(runtime.NumCPU()) + + workers := make([]TunnyWorker, 4) + for i, _ := range workers { + workers[i] = &(customWorker{ jobsCompleted: 0 }) + } + + pool, errPool := CreateCustomPool(workers).Open() + + if errPool != nil { + t.Errorf("Error starting pool: ", errPool) + return + } + + for i := 0; i < 10; i++ { + /* Calling SendWork is thread safe, go ahead and call it from any goroutine. + * The call will block until a worker is ready and has completed the job. + */ + go func() { + if value, err := pool.SendWork("hello world"); err == nil { + if str, ok := value.(string); ok { + if str != "custom job done: hello world" { + t.Errorf("Unexpected output from custom worker") + } + } else { + t.Errorf("Not a string!") + } + } else { + t.Errorf("Error returned: ", err) + } + outChan <- 1 + }() + } + + for i := 0; i < 10; i++ { + <-outChan + } + + /* After this call we should be able to guarantee that no other go routine is + * accessing the workers. + */ + pool.Close() + + totalJobs := 0 + for i := 0; i < len(workers); i++ { + if custom, ok := workers[i].(*customWorker); ok { + totalJobs += (*custom).jobsCompleted + } else { + t.Errorf("could not cast to customWorker") + } + } + + if totalJobs != 10 { + t.Errorf("Total jobs expected: %v, actual: %v", 10, totalJobs) + } +} + +type customExtendedWorker struct { + jobsCompleted int + asleep bool +} + +func (worker *customExtendedWorker) Job(data interface{}) interface{} { + if outputStr, ok := data.(string); ok { + (*worker).jobsCompleted++; + return ("custom job done: " + outputStr) + } + return nil +} + +// Do 10 jobs and then stop. +func (worker *customExtendedWorker) Ready() bool { + return !(*worker).asleep && ((*worker).jobsCompleted < 10) +} + +func (worker *customExtendedWorker) Initialize() { + (*worker).asleep = false +} + +func (worker *customExtendedWorker) Terminate() { + (*worker).asleep = true +} + +func TestCustomExtendedWorkers (t *testing.T) { + outChan := make(chan int, 10) + runtime.GOMAXPROCS(runtime.NumCPU()) + + extWorkers := make([]*customExtendedWorker, 4) + tunnyWorkers := make([]TunnyWorker, 4) + for i, _ := range tunnyWorkers { + extWorkers [i] = &(customExtendedWorker{ jobsCompleted: 0, asleep: true }) + tunnyWorkers[i] = extWorkers[i] + } + + pool := CreateCustomPool(tunnyWorkers); + + for j := 0; j < 1; j++ { + + _, errPool := pool.Open() + + for i, _ := range extWorkers { + if (*extWorkers[i]).asleep { + t.Errorf("Worker is still asleep!") + } + } + + if errPool != nil { + t.Errorf("Error starting pool: ", errPool) + return + } + + for i := 0; i < 40; i++ { + /* Calling SendWork is thread safe, go ahead and call it from any goroutine. + * The call will block until a worker is ready and has completed the job. + */ + go func() { + if value, err := pool.SendWork("hello world"); err == nil { + if str, ok := value.(string); ok { + if str != "custom job done: hello world" { + t.Errorf("Unexpected output from custom worker") + } + } else { + t.Errorf("Not a string!") + } + } else { + t.Errorf("Error returned: ", err) + } + outChan <- 1 + }() + } + + for i := 0; i < 40; i++ { + <-outChan + } + + /* After this call we should be able to guarantee that no other go routine is + * accessing the workers. + */ + pool.Close() + + expectedJobs := ((j + 1) * 10) + for i, _ := range extWorkers { + if (*extWorkers[i]).jobsCompleted != expectedJobs { + t.Errorf( "Expected %v jobs completed, actually: %v", + expectedJobs, + (*extWorkers[i]).jobsCompleted, + ) + } + if !(*extWorkers[i]).asleep { + t.Errorf("Worker is still awake!") + } + } + } +}