-
Notifications
You must be signed in to change notification settings - Fork 11
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix race between flush and Emit. #8
base: master
Are you sure you want to change the base?
Conversation
@@ -46,42 +51,32 @@ func (output *ForwardOutput) encodeRecordSet(recordSet ik.FluentRecordSet) error | |||
} | |||
|
|||
func (output *ForwardOutput) flush() error { | |||
if output.conn == nil { | |||
buffer := output.buffer | |||
output.buffer = bytes.Buffer{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
flush() 中に emit が止まらないように、 flush をバックグラウンドの goroutine で行います。
送信中の buffer に書き込みしないように buffer を入れ替えます。
At the moment Output.Emit() may only be called from the single router that is not reentrant. Therefore, it doesn't need to be thread-safe. In addition, as PluginInstance.Run() is supposed to be periodically called by the supervisor so it can detect the healthiness of the spawnee (in other words, the implementor of the method should make it return ik.Continue at a certain interval if she wants to keep it running), the creation of the timer at the top of the function seems inappropriate. |
Having said that, it's always a good thing for an output plugin to not consume much time in its Emit(), and so is offloading the communication to another goroutine. |
After a short discussion with @methane, I had an impression that the router should be reentrant for the sake of performance, because message passing between the plugins and the router otherwise needs to be guarded by either unbuffered or buffered (when the multiple routers get spawned) channels. |
Emit() may be called from multiple goroutine.
flush() is also called from anther goroutine.
To protect buffer from race, single goroutine should handle all emit() and flush().
WIP: I have not run this code yet. Hopefully, I'll test in this week.