A data pipelining framework that auto-magically(reflection) parallelizes your workflo
w based upon configuration. All
code is runtime validated before processing begins.(Optionally compile time via exposed testing method)
NOTE, this code trades hiding complexity with a whole lot of reflection, thus it is not that fastest library in the
world. I made it more to prove to myself I could(and to learn the reflect
package better). Would this code been a
whole lot better if I had generics/contracts to work with? Yep. Once they are released officially I plan to come back to
this project and try to keep a similar api and compare performance. I imagine they will both still heavily use
reflection up front for validation, but with generics I would expect I should not have to pay the price at data
processing time.(maybe) Either way, I think the project is still cool and hey maybe you don't need blazing fast code
and would prefer working with a slick api...
Package flo is designed to be an abstraction to the common worker pool pattern used in Go. How does it work? By being decently opinionated and making heavy use of reflection.
The first thing you need to know about designing a flo is what kind of functions/methods it can work with. Flo's can consist of one of three function signatures:
1. func (context.Context) (R, error)
2. func (context.Context, T) (R, error)
3. func (context.Context, T) error
One can only be used as the first step of a flo. It is meant to act as a step the produces data without an input from anywhere. Two can be used at any position in the flo, although if it is used as the first or last step in the flo some extra configuration is expected. Three can only be used as the last step of a flo. It is mean to act as a step that consumes data and does not send it along to anywhere else.
Now lets break down the common parts of the step signatures. They all take in a context as their first parameter. This the same context that is passed into the flo when BuildAndExecute is called. It is propagated throughout to support proper context cancellation. The second things all of the signatures have in common is they all return at least an error.
Important: If an error is returned from a step in the flo no result will not be propagated to the next step, ending any data processing for that data stream.
Lastly, you see that steps can optionally take in a T and optionally return an R. Theses are meant to be generic placeholders for concrete types.
Imagine we have have the following steps:
func step1(ctx context.Context) (int, error) {...}
func step2(ctx context.Context, i int) (string, error) {...}
func step3(ctx context.Context, s string) error {...}
We could construct and run a flo with these steps by doing:
err := flo.NewBuilder().
Add(step1).
Add(step2).
Add(step3).
BuildAndExecute(context.Background())
This would be an example of a valid flo. Notice how the output type of the previous step must match the input of the proceeding step(the T and R values mentioned above). If the types did not align properly the err that is returned would not be nil. The flo validates all types before it begins to process any data. A Validate method is also exposed from the flo.Builder should you want to validate your flo at test time.
For more detailed examples including how to configure a flo's parallelism and how to bridge a flo to other parts of your codebase I recommend checkout out the examples folder in this repo.
func main() {
// Create a cancelable context
ctx, cancel := context.WithCancel(context.Background())
// Giving the app a little time to process some data, then gracefully shutdown.
go func() {
time.Sleep(500 * time.Microsecond)
cancel()
}()
err := flo.NewBuilder(). // Create a flo builder
Add(helloWorld). // Add a Step
Add(exclaim). // Add another
BuildAndExecute(ctx) // BuildAndExecute processing(this blocks if there is no error)
// Checking for validation errors
if err != nil {
log.Fatal(err)
}
}
For more examples see the examples folder
- Running a basic flo
- Registering an input channel to a flo
- Configuring flo's parallelism
- Validating a flo
- Handling errors in a flo
- Registering an output channel for the flo
Hey, guess what? Reflection is not super fast. Especially when you are using it as much as this library does. I added
a benchmark in flo_test.go
which shows how fast flo is compared to hand-writing your own worker pools. This
example also shows how flo can reduce your LOC quite a bit. The flo example is 21 LOC vs 68 LOC for hand writing worker
pools. That is a more than 3x reduction in LOC! So what is the performance cost for this you may ask? 5x.
:sob: :panda_face: Benchmarks were ran on my Razer Blade(2017) Laptop.
$ go test -bench=. -benchmem
goos: linux
goarch: amd64
pkg: github.com/codyoss/flo
BenchmarkFlo-8 200000 6577 ns/op 520 B/op 12 allocs/op
BenchmarkNonFlo-8 1000000 1057 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/codyoss/flo 2.462s
I wrote a blog post about this repo. Check it out here!