The `pipelines` module is a Go library designed to facilitate the creation and management of:
- Data processing pipelines
- Reactive streaming applications leveraging Go's concurrency primitives
It provides a set of tools for flow control, error handling, and pipeline composition. Under the hood, it uses Go's channels and goroutines to enable concurrency at each stage of the pipeline.
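To illustrate the underlying pattern (not this module's API), here is a minimal, hand-rolled sketch of a single stage built from a channel and a goroutine; the `double` stage and its logic are illustrative only:

```go
// double is a hand-rolled pipeline stage: it reads from `in`, applies a
// transformation, and writes to a new outbound channel. Each DataStream
// method in this module wires up stages like this one for you.
func double(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            select {
            case out <- v * 2:
            case <-ctx.Done(): // stop early if the context is canceled
                return
            }
        }
    }()
    return out
}
```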
To get started with the `pipelines` module, follow these steps:

- Install the `pipelines` module:

  ```sh
  go get github.com/elastiflow/pipelines
  ```

- (Optional) To view local documentation via `godoc`:

  ```sh
  go install -v golang.org/x/tools/cmd/godoc@latest
  make docs
  ```

- Once running, visit GoDocs to view the latest documentation locally.
Common use cases include:
- ETL (Extract, Transform, Load) style scenarios where data arrives in a stream and you want to apply transformations or filtering concurrently.
- Complex concurrency flows: easily fan out, fan in, or broadcast data streams.
- Reactive streaming applications: serves as a lightweight framework for Go-native reactive streaming.
- Sources (e.g. `FromArray`, `FromChannel`) produce data into a channel.
- DataStream transformations (e.g. `Run`, `Filter`, `Map`) read from inbound channels and write results to outbound channels.
- Sinks (e.g. `ToChannel`) consume data from the final output channels.
- Each method typically spins up one or more goroutines that connect these channels together, allowing parallel processing.
A pipeline is a series of data processing stages connected by channels. Each stage (`datastreams.DataStream`) is a function that performs a specific task and passes its output to the next stage. The `pipelines` module provides a flexible way to define and manage these stages.

The `datastreams.DataStream` struct is the core of the `pipelines` module. It manages the flow of data through the pipeline stages and handles errors according to the provided parameters.
- `ProcessFunc`: A user-defined function type used in a given `DataStream` stage via the `DataStream.Run()` method. For instance:

  ```go
  ds = ds.Run(func(v int) (int, error) {
      if v < 0 {
          return 0, fmt.Errorf("negative number: %d", v)
      }
      return v + 1, nil
  })
  ```
- `TransformFunc`: A user-defined function type `func(T) (U, error)` used with the `Map()` method to convert from type `T` to a different type `U`. For instance:

  ```go
  ds = ds.Map(func(i int) (string, error) {
      return fmt.Sprintf("Number: %d", i), nil
  })
  ```
- `FilterFunc`: A user-defined function type `func(T) (bool, error)` used with the `Filter()` method to decide whether an item should pass through (`true`) or be dropped (`false`). For instance:

  ```go
  ds = ds.Filter(func(i int) (bool, error) {
      return i%2 == 0, nil
  })
  ```
- `KeyByFunc`: A user-defined function type used to partition the data stream into segments based on a key. This is useful for grouping data before applying transformations or aggregations. For instance:

  ```go
  kds := datastreams.KeyBy[int, int](
      datastreams.New[int](ctx, input, errCh).WithWaitGroup(&sync.WaitGroup{}),
      func(i int) (int, error) {
          return i % 2, nil // key each item by parity
      },
      datastreams.Params{
          BufferSize: 50,
          Num:        1, // only 1 output channel per key
      },
  )
  ```
- `WindowFunc`: A user-defined function to process batched data in a window. This is useful for aggregating data over time- or count-based windows. For instance:

  ```go
  wds := datastreams.Window[*SensorReading, string, *testInference](
      datastreams.KeyBy[*SensorReading, string](p, keyFunc),
      TumblingWindowFunc, // user-defined WindowFunc
      partitionFactory,   // window partition factory
      datastreams.Params{
          BufferSize: 50,
      },
  )
  ```
- `FromArray([]T)`: Convert a Go slice/array into a `Sourcer`
- `FromChannel(<-chan T)`: Convert an existing channel into a `Sourcer`
- `FromDataStream(DataStream[T])`: Convert an existing `DataStream` into a `Sourcer`
- `ToChannel(chan<- T)`: Write `DataStream` output into a channel
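As a quick sketch of wiring a source, the program below feeds an existing channel into a pipeline and reads the results back from `Out()`. It assumes `FromChannel` lives in the same `sources` package as `FromArray` from the full example further down:

```go
package main

import (
    "context"
    "fmt"

    "github.com/elastiflow/pipelines"
    "github.com/elastiflow/pipelines/datastreams"
    "github.com/elastiflow/pipelines/datastreams/sources"
)

func main() {
    in := make(chan int)
    errChan := make(chan error, 1)

    // Adapt an existing channel into a Sourcer and add 1 to each item.
    pl := pipelines.New[int, int](
        context.Background(),
        sources.FromChannel(in),
        errChan,
    ).Start(func(ds datastreams.DataStream[int]) datastreams.DataStream[int] {
        return ds.Run(func(v int) (int, error) { return v + 1, nil })
    })

    go func() { // Produce a few values, then close the source channel.
        defer close(in)
        for i := 0; i < 3; i++ {
            in <- i
        }
    }()

    for v := range pl.Out() { // Prints 1, 2, 3.
        fmt.Println(v)
    }
}
```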
Window performs time- or count-based aggregation on a partitioned stream.
- `NewTumblingFactory[T]`: Creates fixed-size windows that do not overlap.
- `NewSlidingFactory[T]`: Creates overlapping windows.
- `NewIntervalFactory[T]`: Creates windows based on a time interval.
- `Run(ProcessFunc[T]) DataStream[T]`: Process each item with a user function
- `Filter(FilterFunc[T]) DataStream[T]`: Filter items by a user-defined condition
- `Map(TransformFunc[T,U]) DataStream[U]`: Transform each item from `T` to `U`
- `KeyBy(KeyByFunc[T]) DataStream[T]`: Partition the stream by a key
- `Window(WindowFunc[T]) DataStream[T]`: Apply a window function to the stream
- `Expand(ExpandFunc[T]) DataStream[T]`: Explode each item into multiple items
- `FanOut() DataStream[T]`: Create multiple parallel output channels
- `FanIn() DataStream[T]`: Merge multiple channels into one
- `Broadcast() DataStream[T]`: Duplicate each item to multiple outputs
- `Tee() (DataStream[T], DataStream[T])`: Split into two DataStreams
- `Take(Params{Num: N}) DataStream[T]`: Take only N items
- `OrDone() DataStream[T]`: Terminate when the upstream channel is closed
- `Out() <-chan T`: Access the underlying output channel
- `Sink(Sinker[T]) DataStream[T]`: Push items to a sink
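Because each of these methods returns a new `DataStream`, they chain naturally. A minimal sketch combining methods shown elsewhere in this document (the filtering and doubling logic is illustrative, not part of the library):

```go
ds = ds.OrDone().
    FanOut(datastreams.Params{Num: 4}). // four parallel workers
    Filter(func(v int) (bool, error) {
        return v > 0, nil // keep positive values only
    }).
    Run(func(v int) (int, error) {
        return v * 2, nil // double each value
    })
```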
- `Params`: Used to pass arguments into `DataStream` methods. Options:
  - `SkipError` (bool): If true, any error in a `ProcessFunc` / `TransformFunc` / `FilterFunc` causes that item to be skipped rather than stopping the pipeline.
  - `Num` (int): Used by methods like `FanOut`, `Broadcast`, and `Take` to specify how many parallel channels or how many items to consume.
  - `BufferSize` (int): Controls the size of the buffered channels created for that stage. Larger buffers can reduce blocking but use more memory.
  - `SegmentName` (string): Tags a pipeline stage with a name, useful for logging or debugging errors (e.g. "segment: broadcast-2").
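For instance, a stage might be configured like the sketch below, using the `squareOdds` helper from the full example further down. Passing `Params` to `Run` is shown by analogy with `FanOut` in that example, so treat the exact signature as an assumption:

```go
ds = ds.Run(
    squareOdds,
    datastreams.Params{
        SkipError:   true,          // drop failing items instead of halting
        BufferSize:  64,            // buffer the stage's output channel
        SegmentName: "square-odds", // label this stage in error logs
    },
)
```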
Below is an example of how to use the `pipelines` module to create simple pipelines. Additional examples can be found in the godocs.
This example demonstrates how to set up a pipeline that takes a stream of integers, squares each odd integer, and outputs the results.
```go
package main

import (
    "context"
    "fmt"
    "log/slog"

    "github.com/elastiflow/pipelines"
    "github.com/elastiflow/pipelines/datastreams"
    "github.com/elastiflow/pipelines/datastreams/sources"
)

func createIntArr(num int) []int {
    var arr []int
    for i := 0; i < num; i++ {
        arr = append(arr, i)
    }
    return arr
}

func squareOdds(v int) (int, error) {
    if v%2 == 0 {
        return v, fmt.Errorf("even number error: %v", v)
    }
    return v * v, nil
}

func exProcess(p datastreams.DataStream[int]) datastreams.DataStream[int] {
    return p.OrDone().FanOut(
        datastreams.Params{Num: 2},
    ).Run(
        squareOdds,
    )
}

func main() {
    errChan := make(chan error, 10)
    defer close(errChan)

    pl := pipelines.New[int, int]( // Create a new Pipeline
        context.Background(),
        sources.FromArray(createIntArr(10)), // Create a source to start the pipeline
        errChan,
    ).Start(exProcess)

    go func(errReceiver <-chan error) { // Handle Pipeline errors
        defer pl.Close()
        for err := range errReceiver {
            if err != nil {
                slog.Error("demo error: " + err.Error())
                // return here if you want to close the pipeline on error
            }
        }
    }(pl.Errors())

    for out := range pl.Out() { // Read Pipeline output
        slog.Info("received simple pipeline output", slog.Int("out", out))
    }
}
```
We welcome your contributions! Please see our Contributing Guide for details on how to open issues, submit pull requests, and propose new features.
This project is licensed under the Apache 2.0 License - see the LICENSE file for details.