________________ ______
__ __ \_ ____/____________ ___________ /_
_ / / / / __ __ ___/ __ `/__ __ \_ __ \
/ /_/ // /_/ / _ / / /_/ /__ /_/ / / / /
\____/ \____/ /_/ \__,_/ _ .___//_/ /_/
/_/
OGraph is a graph execution framework implemented in Go.
By building a Pipeline, you can schedule dependent elements to run sequentially and independent elements to run concurrently.
In addition, OGraph also provides a range of features out-of-the-box, including retry limits, timeout settings, and execution tracking.
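The scheduling idea described above can be illustrated with the standard library alone. The following is a minimal sketch, not OGraph's actual scheduler: the `task` type and the `runGraph` and `demoOrder` functions are hypothetical names introduced for illustration. Each task waits for its dependencies, so dependent tasks run in order while independent tasks run concurrently.

```go
package main

import "sync"

// task is a hypothetical unit of work; deps lists the names it must wait for.
type task struct {
	name string
	deps []string
	run  func()
}

// runGraph starts one goroutine per task. Each goroutine blocks until every
// dependency has closed its "done" channel, then runs and closes its own.
// It assumes the graph is acyclic and that every dependency name refers to
// a task in the slice; otherwise it would block forever.
func runGraph(tasks []task) {
	done := make(map[string]chan struct{}, len(tasks))
	for _, t := range tasks {
		done[t.name] = make(chan struct{})
	}

	var wg sync.WaitGroup
	for _, t := range tasks {
		wg.Add(1)
		go func(t task) {
			defer wg.Done()
			for _, d := range t.deps {
				<-done[d] // wait for the dependency to finish
			}
			t.run()
			close(done[t.name]) // signal dependents
		}(t)
	}
	wg.Wait()
}

// demoOrder runs three tasks, where C depends on A and B, and returns the
// order in which they actually ran.
func demoOrder() []string {
	var mu sync.Mutex
	var order []string
	mark := func(name string) func() {
		return func() {
			mu.Lock()
			order = append(order, name)
			mu.Unlock()
		}
	}
	runGraph([]task{
		{name: "C", deps: []string{"A", "B"}, run: mark("C")},
		{name: "A", run: mark("A")},
		{name: "B", run: mark("B")},
	})
	return order
}
```

Calling `demoOrder()` from a `main` function always yields C last, while A and B may appear in either order because nothing constrains them relative to each other.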
OGraph was inspired by CGraph, a C++ project. However, OGraph is not a Go port of CGraph.
Like CGraph, OGraph also provides basic graph construction and scheduling execution capabilities. However, there are several key differences:
- Implemented in Go, using goroutines instead of threads for scheduling, making it lighter and more flexible.
- Supports customizing loop, condition, error handling, and other logic through Wrapper, which can be combined freely.
- Supports exporting the graph structure and importing it for execution (within certain constraints).
- Flexible virtual node settings to simplify dependencies.
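To make the idea of freely combinable wrappers concrete, here is a minimal sketch using plain function decorators. It does not use OGraph's Wrapper API: `runFunc`, `withRetry`, `withTimeout`, and `flakyDemo` are hypothetical names, and the retry and timeout behavior shown is only one plausible interpretation.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// runFunc is a hypothetical stand-in for a node's Run method.
type runFunc func(ctx context.Context) error

// withRetry calls fn until it succeeds or the attempts are exhausted.
func withRetry(attempts int, fn runFunc) runFunc {
	return func(ctx context.Context) error {
		var err error
		for i := 0; i < attempts; i++ {
			if err = fn(ctx); err == nil {
				return nil
			}
		}
		return fmt.Errorf("failed after %d attempts: %w", attempts, err)
	}
}

// withTimeout runs fn with a deadline and returns the context error if fn
// has not finished in time.
func withTimeout(d time.Duration, fn runFunc) runFunc {
	return func(ctx context.Context) error {
		ctx, cancel := context.WithTimeout(ctx, d)
		defer cancel()
		// Buffered so the goroutine can still send and exit after a timeout.
		errCh := make(chan error, 1)
		go func() { errCh <- fn(ctx) }()
		select {
		case err := <-errCh:
			return err
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// flakyDemo wraps a function that fails twice before succeeding and reports
// how many calls were needed.
func flakyDemo() (calls int, err error) {
	flaky := func(_ context.Context) error {
		calls++
		if calls < 3 {
			return errors.New("transient failure")
		}
		return nil
	}
	// Wrappers nest in any order; here every attempt gets its own timeout.
	wrapped := withRetry(5, withTimeout(time.Second, flaky))
	err = wrapped(context.Background())
	return calls, err
}
```

Because each decorator takes and returns the same function type, swapping the nesting order (one timeout around all retries instead of one per attempt) requires no other change.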
Benchmarks show that OGraph and CGraph perform at the same level, while OGraph has an advantage in I/O-intensive scenarios.
CGraph Performance test reference
OGraph Performance test reference
Test setup: limited to 8 cores; three scenarios (32 concurrent nodes, 32 sequential nodes, and a complex scenario simulating 6 nodes), each executed 1 million times.
cd test
go test -bench='(Concurrent_32|Serial_32|Complex_6)$' -benchtime=1000000x -benchmem -cpu=8
Output:
goos: linux
goarch: amd64
pkg: github.com/symphony09/ograph/test
cpu: AMD Ryzen 5 5600G with Radeon Graphics
BenchmarkConcurrent_32-8 1000000 9669 ns/op 2212 B/op 64 allocs/op
BenchmarkSerial_32-8 1000000 1761 ns/op 712 B/op 15 allocs/op
BenchmarkComplex_6-8 1000000 3118 ns/op 1152 B/op 26 allocs/op
PASS
ok github.com/symphony09/ograph/test 14.553s
type Person struct {
	ograph.BaseNode
}

func (person *Person) Run(ctx context.Context, state ogcore.State) error {
	fmt.Printf("Hello, i am %s.\n", person.Name())
	return nil
}
In the code above, the Person struct embeds BaseNode and overrides Run, a method of the Node interface.
func TestHello(t *testing.T) {
	pipeline := ograph.NewPipeline()

	zhangSan := ograph.NewElement("ZhangSan").UseNode(&Person{})
	liSi := ograph.NewElement("LiSi").UseNode(&Person{})

	pipeline.Register(zhangSan).
		Register(liSi, ograph.Rely(zhangSan))

	if err := pipeline.Run(context.TODO(), nil); err != nil {
		t.Error(err)
	}
}
The code above registers two Person nodes (zhangSan and liSi) in the pipeline and declares that liSi depends on zhangSan.
Output:
Hello, i am ZhangSan.
Hello, i am LiSi.
More examples can be found in the code under the "example" directory.
File | Description |
---|---|
e01_hello_test.go | Demonstrate the basic flow. |
e02_state_test.go | Demonstrate how to share state data between nodes. |
e03_factory_test.go | Demonstrate how to create nodes using the factory pattern. |
e04_param_test.go | Demonstrate how to set node parameters. |
e05_wrapper_test.go | Demonstrate how to use the wrapper to enhance node functionality. |
e06_cluster_test.go | Demonstrate how to use the cluster to flexibly schedule multiple nodes. |
e07_global_test.go | Demonstrate how to register factory functions globally. |
e08_virtual_test.go | Demonstrate how to use virtual nodes to simplify dependency relationships. |
e09_interrupter_test.go | Demonstrate how to add interruptions during the execution of a pipeline. |
e10_compose_test.go | Demonstrate how to combine nested pipelines. |
e11_advance_test.go | Demonstrate some advanced usage, including graph verification, exporting, and preheating of pipelines. |
OGraph provides some common node implementations:
Name | Type | Function | Documentation |
---|---|---|---|
CMD | General Node | Command execution | Documentation Link |
HttpReq | General Node | HTTP request | Documentation Link |
Choose | Cluster | Select one node to execute from multiple nodes | Work in progress |
Parallel | Cluster | Concurrent execution of multiple nodes | Documentation Link |
Queue | Cluster | Sequential execution of multiple nodes in a queue | Documentation Link |
Race | Cluster | Concurrent nodes competing to execute | Work in progress |
Async | Wrapper | Asynchronous execution of the wrapped node | Documentation Link |
Condition | Wrapper | Conditionally determine whether to execute the wrapped node | Work in progress |
Debug | Wrapper | Debugging the wrapped node | Documentation Link |
Delay | Wrapper | Delay the execution of the wrapped node | Documentation Link |
Loop | Wrapper | Loop the execution of the wrapped node | Documentation Link |
Retry | Wrapper | Retry failed nodes | Documentation Link |
Silent | Wrapper | Suppress errors and failures of the node | Documentation Link |
Timeout | Wrapper | Node timeout control | Documentation Link |
Trace | Wrapper | Trace the execution process of the wrapped node | Documentation Link |
What are the limitations of exporting and importing graphs?
All nodes must be created with a factory method, and the pipeline that imports the graph must have registered the factories that the imported nodes refer to.
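The reason for this constraint can be seen in a small sketch: a serialized graph can only carry names, so the importing side needs a registry that turns each factory name back into a live node. The code below is an illustration under assumptions, not OGraph's export format or API; `elementSpec`, `factories`, `importGraph`, and the JSON layout are all hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// node is a hypothetical minimal node interface for this sketch.
type node interface{ Run() string }

type greeter struct{ name string }

func (g *greeter) Run() string { return "Hello, I am " + g.name }

// elementSpec is the serializable description of one graph element. It can
// hold a factory name, but not a Go value such as a node instance.
type elementSpec struct {
	Name    string   `json:"name"`
	Factory string   `json:"factory"`
	Deps    []string `json:"deps,omitempty"`
}

// factories maps a factory name to a constructor.
type factories map[string]func(name string) node

// importGraph decodes the exported elements and builds each node through the
// registry. It fails if an element names a factory that was never registered.
// Wiring up Deps is left out to keep the sketch short.
func importGraph(data []byte, fs factories) (map[string]node, error) {
	var specs []elementSpec
	if err := json.Unmarshal(data, &specs); err != nil {
		return nil, err
	}
	nodes := make(map[string]node, len(specs))
	for _, s := range specs {
		newNode, ok := fs[s.Factory]
		if !ok {
			return nil, fmt.Errorf("element %q: factory %q is not registered", s.Name, s.Factory)
		}
		nodes[s.Name] = newNode(s.Name)
	}
	return nodes, nil
}
```

Importing a graph whose elements reference the factory name "Person" succeeds only when the registry passed to `importGraph` contains a "Person" entry; with an empty registry the call returns an error instead of a partially built graph.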
Why are there multiple ways to create a node (UseNode, UseFactory, UseFn)?
For simple scenarios, it is convenient to register a singleton node or a plain function directly. However, running a pipeline concurrently or importing and exporting a graph requires the factory method.
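The concurrency concern can be sketched without OGraph at all. In the hypothetical example below, `counter` is a node with mutable per-run state and `runWithFactory` simulates several concurrent runs. Because each run obtains a fresh instance from the factory, no state is shared; a single shared instance would instead be mutated by all runs at once.

```go
package main

import "sync"

// counter is a hypothetical node that keeps mutable state while it runs.
type counter struct{ n int }

// Run increments the node's private counter and returns the new value.
func (c *counter) Run() int {
	c.n++
	return c.n
}

// runWithFactory simulates `runs` concurrent pipeline runs. Every run calls
// newNode to get its own instance, so the runs never touch the same counter.
func runWithFactory(runs int, newNode func() *counter) []int {
	results := make([]int, runs)
	var wg sync.WaitGroup
	for i := 0; i < runs; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			results[i] = newNode().Run() // each goroutine writes its own slot
		}(i)
	}
	wg.Wait()
	return results
}
```

With `runWithFactory(8, func() *counter { return &counter{} })` every result is 1. Passing a factory that always returns the same pointer would reproduce the singleton case, where the increments race with each other.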
Is the access to State safe for concurrent use?
By default, access to State is safe for concurrent use, but with a custom implementation, concurrency safety is not guaranteed.
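For anyone writing a custom state container, the following sketch shows one common way to keep it safe for concurrent use, namely a map guarded by a `sync.RWMutex`. The `safeState` type and its `Get`, `Set`, and `Update` methods are hypothetical and are not claimed to match the `ogcore.State` interface.

```go
package main

import "sync"

// safeState is a hypothetical key-value store guarded by a read-write mutex.
type safeState struct {
	mu   sync.RWMutex
	data map[string]any
}

func newSafeState() *safeState {
	return &safeState{data: make(map[string]any)}
}

// Get returns the value for key and whether it was present.
func (s *safeState) Get(key string) (any, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	return v, ok
}

// Set stores a value under key.
func (s *safeState) Set(key string, v any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = v
}

// Update applies fn to the current value while holding the write lock, so a
// read-modify-write cannot interleave with other writers.
func (s *safeState) Update(key string, fn func(old any) any) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = fn(s.data[key])
}

// countConcurrently increments one counter from n goroutines and returns the
// final value.
func countConcurrently(n int) int {
	s := newSafeState()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Update("count", func(old any) any {
				if old == nil {
					return 1
				}
				return old.(int) + 1
			})
		}()
	}
	wg.Wait()
	v, _ := s.Get("count")
	c, _ := v.(int) // zero if the key was never set
	return c
}
```

Note that a separate `Get` followed by `Set` would not be atomic even with the mutex; the `Update` method exists so that increments like the one in `countConcurrently` are never lost.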
How to achieve optimal performance? Are there any best practices?
Since goroutines are lightweight and flexible, OGraph usually needs no tuning or optimization. If node initialization is slow, consider preheating the worker pool.
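One way to read the preheating advice is that expensive node instances are built before the first run instead of on demand. The sketch below illustrates that general idea only; it is not how OGraph implements its worker pool, and `slowNode`, `pool`, `Preheat`, `Get`, `Put`, and `preheatDemo` are hypothetical names.

```go
package main

// slowNode is a hypothetical node whose construction is assumed to be costly.
type slowNode struct{ id int }

// pool hands out node instances, building new ones only when none are idle.
type pool struct {
	idle    chan *slowNode
	newNode func() *slowNode
}

func newPool(size int, newNode func() *slowNode) *pool {
	return &pool{idle: make(chan *slowNode, size), newNode: newNode}
}

// Preheat builds instances ahead of time until the pool is full.
func (p *pool) Preheat() {
	for i := 0; i < cap(p.idle); i++ {
		select {
		case p.idle <- p.newNode():
		default:
			return // pool is already full
		}
	}
}

// Get returns an idle instance if one exists, otherwise builds one on demand.
func (p *pool) Get() *slowNode {
	select {
	case n := <-p.idle:
		return n
	default:
		return p.newNode()
	}
}

// Put returns an instance to the pool and drops it if the pool is full.
func (p *pool) Put(n *slowNode) {
	select {
	case p.idle <- n:
	default:
	}
}

// preheatDemo reports how many nodes had been built right after preheating
// and after four subsequent Get calls.
func preheatDemo() (builtBefore, builtAfter int) {
	built := 0
	p := newPool(4, func() *slowNode {
		built++
		return &slowNode{id: built}
	})
	p.Preheat()
	builtBefore = built
	for i := 0; i < 4; i++ {
		_ = p.Get()
	}
	return builtBefore, built
}
```

In `preheatDemo` both counts are 4: all construction cost is paid during `Preheat`, and the four `Get` calls that follow reuse the prepared instances.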