Watch provides methods to watch a key interval and to cancel a watcher. If a watcher is disconnected because of an error, it is resumed automatically.
- Watch request: the watch client creates a watch request for the caller from a key and options (revision, delete only, put only, and end key) and registers a callback for the request.
- Notification: when the watch client receives an event from the etcd server, it should call the registered callback.
- Resume: when the watch client is disconnected from the etcd server, the etcd client should automatically resume all watch requests with the option revision = last received revision + 1.
- Cancel watch request: the etcd client should process the watch cancellation and filter out all notifications that arrive after the cancellation request.
- The watch client should be able to make a progress notify request that propagates the latest revision number to all watches.

The etcd client processes watch requests with the watch function, notifications with the processEvents function, resumption with the resume function, cancellation with the cancelWatch function, and progress requests with the requestProgress function.

The watch function watches a key interval.
- Send a create request to requestStream.
- If the watch is created successfully, onCreate is called and the ListenableFuture task is completed.
- If the watch is slow or the required revision has been compacted, the watch request may be canceled by the server, in which case onCreateFailed is called.

processEvents processes subscribed watch events.
- If the watch ID is not in the watchers map, look it up in the cancelWatchers map.
- If it exists in cancelWatchers, ignore the event; otherwise cancel the watch.
- If the watcher exists in the watchers map, call onWatch and record the last revision for resume.
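
The dispatch rules above can be sketched in plain Java. This is a minimal illustration, not the client's actual code: `SketchWatcher`, `EventDispatchSketch`, and the `Outcome` enum are hypothetical names, and the gRPC response is reduced to a watch ID and a revision.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for a registered watcher; not the client's real class. */
final class SketchWatcher {
    long lastRevision;    // last revision seen, kept so the watch can be resumed
    int eventsDelivered;  // counts onWatch invocations in this sketch

    void onWatch(long revision) {
        eventsDelivered++;
        lastRevision = revision;
    }
}

/** Sketch of the dispatch rules described for processEvents. */
final class EventDispatchSketch {
    enum Outcome { DELIVERED, IGNORED, CANCEL_REQUESTED }

    final Map<Long, SketchWatcher> watchers = new ConcurrentHashMap<>();
    final Map<Long, SketchWatcher> cancelWatchers = new ConcurrentHashMap<>();

    Outcome processEvents(long watchId, long revision) {
        SketchWatcher watcher = watchers.get(watchId);
        if (watcher != null) {
            // Known working watcher: deliver the event and remember the revision.
            watcher.onWatch(revision);
            return Outcome.DELIVERED;
        }
        if (cancelWatchers.containsKey(watchId)) {
            // Cancellation already in flight: drop the late event.
            return Outcome.IGNORED;
        }
        // Unknown watch ID: the caller would send a cancel request for it.
        return Outcome.CANCEL_REQUESTED;
    }
}
```

Returning an outcome instead of performing the cancel directly keeps the sketch free of any stream dependency; a real implementation would act on the stream at that point.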

The resume function restarts the stream after a disconnect.
- Set requestStream to null so that getRequestStream creates a new requestStream.
- Call resumeWatchers to resume all working watchers.

cancelWatch cancels the watch task for the given watcher; onCanceled is called after the watch has been canceled successfully.
- The watcher is removed from the watchers map.
- If the watchers map contained the watcher, it is moved to cancelWatchers and a cancel request is sent to requestStream.
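
As a rough sketch of the cancellation bookkeeping described above, the two maps can be modeled as follows. `CancelSketch` and `sentCancelRequests` are hypothetical; the list stands in for requestStream, and the callback invocation is only indicated in a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of cancel bookkeeping; values are watched keys instead of full watchers. */
final class CancelSketch {
    final Map<Long, String> watchers = new ConcurrentHashMap<>();
    final Map<Long, String> cancelWatchers = new ConcurrentHashMap<>();
    // Stand-in for requestStream: records the watch IDs a cancel request was sent for.
    final List<Long> sentCancelRequests = new ArrayList<>();

    /** Moves a working watcher to cancelWatchers and "sends" a cancel request. */
    boolean cancelWatch(long watchId) {
        String key = watchers.remove(watchId);
        if (key == null) {
            return false; // not a working watcher, nothing to cancel
        }
        cancelWatchers.put(watchId, key);
        sentCancelRequests.add(watchId);
        return true;
    }

    /** Handles the server's cancel response; returns true if a watcher was pending cancel. */
    boolean processCanceled(long watchId) {
        // A real client would invoke the watcher's cancel callback here.
        return cancelWatchers.remove(watchId) != null;
    }
}
```

Keeping the watcher in cancelWatchers until the server confirms is what allows late events for that watch ID to be filtered out.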

requestProgress sends the latest processed revision to all active watchers.
- Send a progress request to requestStream.
- Working watchers receive a WatchResponse containing the latest revision number. All future revision numbers are guaranteed to be greater than or equal to the received revision number.

requestStream is a StreamObserver instance.
- It is created by the gRPC call watch.
- It is a single instance and is created automatically by getRequestStream if it is null.
- requestStream is used to send requests to the etcd server for watch creation and cancellation.
- If an error occurs, this stream is canceled by the server; it is resumed by setting it to null, after which getRequestStream creates a new one.

The Watcher:
- Holds the callbacks for watcher creation, cancellation, resume, and events.
- Holds the WatcherOption and key for resume.
- Holds the last revision for resume.

watchers is a ConcurrentHashMap collection of working watchers.
- It is used for WatchResponse distribution.
- It is used for resumes.

pendingWatchers holds the watchers that are still being created.

cancelWatchers is a ConcurrentHashMap collection of watchers that are being canceled.
- It is used to filter out event responses for canceled watches.
- It holds the watchers that are being canceled.
- A watcher is deleted from cancelWatchers after it has been canceled successfully.

- getRequestStream is a single-instance method that returns requestStream.
- It creates requestStream through the gRPC call watch, passing a responseStream.
- The responseStream dispatches create, cancel, and normal responses to processCreate, processCanceled, and processEvents.
- If an error occurs, the requestStream is closed by the server, so resume is called to resume all ongoing watchers.
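
The lazy, single-instance behavior described above might look like the following sketch. `LazyStreamSketch`, `opener`, and `resetStream` are hypothetical names; the `Supplier` stands in for the gRPC watch call, and `resetStream` covers only the "set to null" half of resume, not the re-registration of watchers.

```java
import java.util.function.Supplier;

/** Sketch of a lazily created, replaceable request stream. */
final class LazyStreamSketch<S> {
    private final Supplier<S> opener; // stands in for opening the gRPC watch stream
    private S requestStream;          // null until first use or after a reset

    LazyStreamSketch(Supplier<S> opener) {
        this.opener = opener;
    }

    /** Returns the current stream, opening one only if none exists. */
    synchronized S getRequestStream() {
        if (requestStream == null) {
            requestStream = opener.get();
        }
        return requestStream;
    }

    /** Drops the current stream so the next getRequestStream call opens a new one. */
    synchronized void resetStream() {
        requestStream = null;
    }
}
```

Synchronizing both methods is one simple way to guarantee that concurrent callers never open two streams; other locking strategies would work as well.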

processCreate processes a create response from the etcd server.
- If there is no pendingWatcher, ignore the response.
- If the cancel flag is true or CompactRevision != 0 (meaning the start revision has been compacted out of the store), call onCreateFailed.
- If watchId = -1, creation failed; call onCreateFailed.
- If everything is OK, create the watcher, complete the ListenableFuture task, and put the new watcher into the watchers map.
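
The checks above amount to a small decision function. The sketch below is illustrative only: `CreateResponseSketch`, `Result`, and `classify` are hypothetical names, and the response fields are passed as plain parameters rather than read from a real WatchResponse.

```java
/** Sketch of the create-response checks, evaluated in the order listed above. */
final class CreateResponseSketch {
    enum Result { IGNORED, CREATE_FAILED, CREATED }

    static Result classify(boolean hasPendingWatcher, boolean canceled,
                           long compactRevision, long watchId) {
        if (!hasPendingWatcher) {
            return Result.IGNORED;        // nobody is waiting for this response
        }
        if (canceled || compactRevision != 0) {
            return Result.CREATE_FAILED;  // canceled, or start revision was compacted
        }
        if (watchId == -1) {
            return Result.CREATE_FAILED;  // server did not assign a watch ID
        }
        return Result.CREATED;            // caller registers the watcher and completes the future
    }
}
```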

processCanceled processes a cancel response from the etcd server.
- Remove the corresponding watcher from cancelWatchers.
- Call the onCancel callback.

resumeWatchers resumes all the watchers on the new requestStream.
- Build a new watch creation request for each old watcher with last revision + 1.
- Call the watch function with the new watch creation request.
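
A minimal sketch of the resume step, assuming each watcher's state can be reduced to its key and last received revision. `ResumeSketch`, `WatcherState`, and `CreateRequest` are hypothetical types introduced for illustration; they do not mirror the client's real request classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of building resume requests that start just after the last seen revision. */
final class ResumeSketch {
    /** Hypothetical snapshot of a working watcher. */
    static final class WatcherState {
        final String key;
        final long lastRevision;

        WatcherState(String key, long lastRevision) {
            this.key = key;
            this.lastRevision = lastRevision;
        }
    }

    /** Hypothetical watch creation request. */
    static final class CreateRequest {
        final String key;
        final long startRevision;

        CreateRequest(String key, long startRevision) {
            this.key = key;
            this.startRevision = startRevision;
        }
    }

    static List<CreateRequest> buildResumeRequests(List<WatcherState> working) {
        List<CreateRequest> requests = new ArrayList<>();
        for (WatcherState w : working) {
            // Start at lastRevision + 1 so already delivered events are not replayed.
            requests.add(new CreateRequest(w.key, w.lastRevision + 1));
        }
        return requests;
    }
}
```

Each resulting request would then be passed to the watch function on the new stream.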