Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add status change event #3903

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion cmd/crc/cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/crc-org/crc/v2/pkg/crc/constants"
"github.com/crc-org/crc/v2/pkg/crc/daemonclient"
crcErrors "github.com/crc-org/crc/v2/pkg/crc/errors"
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
"github.com/crc-org/crc/v2/pkg/crc/preset"
"github.com/docker/go-units"
Expand Down Expand Up @@ -88,7 +89,7 @@ func runWatchStatus(writer io.Writer, client *daemonclient.Client, cacheDir stri
}
}()

err = client.SSEClient.Status(func(loadResult *types.ClusterLoadResult) {
err = client.SSEClient.ClusterLoad(func(loadResult *types.ClusterLoadResult) {
if !isPoolInit {
ramBar, cpuBars = createBars(loadResult.CPUUse, writer)
barPool = pb.NewPool(append([]*pb.ProgressBar{ramBar}, cpuBars...)...)
Expand Down Expand Up @@ -152,6 +153,9 @@ func getStatus(client *daemonclient.Client, cacheDir string) *status {
}
return &status{Success: false, Error: crcErrors.ToSerializableError(err)}
}
if clusterStatus.CrcStatus == string(state.NoVM) {
return &status{Success: false, Error: crcErrors.ToSerializableError(crcErrors.VMNotExist)}
}
var size int64
err = filepath.Walk(cacheDir, func(_ string, info os.FileInfo, err error) error {
if !info.IsDir() {
Expand Down
4 changes: 2 additions & 2 deletions pkg/crc/api/client/sse_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func NewSSEClient(transport *http.Transport) *SSEClient {
}
}

func (c *SSEClient) Status(statusCallback func(*types.ClusterLoadResult)) error {
err := c.client.Subscribe("status", func(msg *sse.Event) {
func (c *SSEClient) ClusterLoad(statusCallback func(*types.ClusterLoadResult)) error {
err := c.client.Subscribe("cluster_load", func(msg *sse.Event) {
wmState := &types.ClusterLoadResult{}
err := json.Unmarshal(msg.Data, wmState)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ type TickListener struct {
tickPeriod time.Duration
}

func newStatusStream(server *EventServer) EventStream {
return newStream(NewStatusListener(server.machine), newEventPublisher(STATUS, server.sseServer))
func newClusterLoadStream(server *EventServer) EventStream {
return newStream(newClusterLoadListener(server.machine), newEventPublisher(ClusterLoad, server.sseServer))
}

func NewStatusListener(machine crcMachine.Client) EventProducer {
func newClusterLoadListener(machine crcMachine.Client) EventProducer {
getStatus := func() (interface{}, error) {
return machine.GetClusterLoad()
}
return NewTickListener(getStatus)
return newTickListener(getStatus)
}

func NewTickListener(generator genData) EventProducer {
func newTickListener(generator genData) EventProducer {
return &TickListener{
done: make(chan bool),
generator: generator,
Expand Down
13 changes: 8 additions & 5 deletions pkg/crc/api/events/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ func NewEventServer(machine machine.Client) *EventServer {
stream.RemoveSubscriber(sub)
}

sseServer.CreateStream(LOGS)
sseServer.CreateStream(STATUS)
sseServer.CreateStream(Logs)
sseServer.CreateStream(ClusterLoad)
cfergeau marked this conversation as resolved.
Show resolved Hide resolved
sseServer.CreateStream(StatusChange)
return eventServer
}

Expand All @@ -65,10 +66,12 @@ func (es *EventServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {

// createEventStream builds the EventStream that backs the given stream ID.
// It returns nil when streamID does not match any of the handled IDs.
func createEventStream(server *EventServer, streamID string) EventStream {
	switch streamID {
	case LOGS:
	case Logs:
		return newLogsStream(server)
	case STATUS:
		return newStatusStream(server)
	case ClusterLoad:
		return newClusterLoadStream(server)
	case StatusChange:
		return newStatusChangeStream(server)
	}
	// no stream is defined for this ID
	return nil
}
5 changes: 3 additions & 2 deletions pkg/crc/api/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package events
import "github.com/r3labs/sse/v2"

// Identifiers of the server-sent event streams.
const (
	LOGS         = "logs"          // Logs event channel, contains daemon logs
	STATUS       = "status"        // status event channel, contains VM load info
	Logs         = "logs"          // Logs event channel, contains daemon logs
	ClusterLoad  = "cluster_load"  // cluster load event channel, contains VM load info
	StatusChange = "status_change" // status change channel, fires on 'starting', 'stopping', etc
)

type EventPublisher interface {
Expand Down
7 changes: 6 additions & 1 deletion pkg/crc/api/events/log_stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package events

import (
"bytes"

"github.com/crc-org/crc/v2/pkg/crc/logging"
"github.com/r3labs/sse/v2"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -56,7 +58,10 @@ func (s *streamHook) Fire(entry *logrus.Entry) error {
return err
}

s.server.Publish(LOGS, &sse.Event{Event: []byte(LOGS), Data: line})
// remove the trailing "Line Feed" ("\n") character which was added by json.Encoder
line = bytes.TrimRight(line, "\n")

s.server.Publish(Logs, &sse.Event{Event: []byte(Logs), Data: line})
return nil
}

Expand Down
69 changes: 69 additions & 0 deletions pkg/crc/api/events/status_change_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package events

import (
"encoding/json"
cfergeau marked this conversation as resolved.
Show resolved Hide resolved

"github.com/crc-org/crc/v2/pkg/crc/logging"
"github.com/crc-org/crc/v2/pkg/crc/machine"
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
"github.com/crc-org/crc/v2/pkg/events"
"github.com/r3labs/sse/v2"
)

// serializableEvent is the JSON payload published on the status change stream.
type serializableEvent struct {
	// Status is the cluster status sent to subscribers.
	Status *types.ClusterStatusResult `json:"status"`
	// Error is an optional error description; it is omitted from the JSON when empty.
	Error string `json:"error,omitempty"`
}

// statusChangeListener receives status changed events and republishes them
// through its publisher.
type statusChangeListener struct {
	// machineClient is queried for the current cluster status on every notification.
	machineClient machine.Client
	// publisher receives the serialized events; it is assigned in Start.
	publisher EventPublisher
}

// newStatusChangeStream wires a status change listener to a publisher bound
// to the StatusChange stream of the given server.
func newStatusChangeStream(server *EventServer) EventStream {
	producer := newStatusChangeListener(server.machine)
	publisher := newEventPublisher(StatusChange, server.sseServer)
	return newStream(producer, publisher)
}

// newStatusChangeListener returns an EventProducer that uses client to look
// up the cluster status. Its publisher stays unset until Start is called.
func newStatusChangeListener(client machine.Client) EventProducer {
	listener := new(statusChangeListener)
	listener.machineClient = client
	return listener
}

func (st *statusChangeListener) Notify(changedEvent events.StatusChangedEvent) {
logging.Debugf("State Changed Event %s", changedEvent)
var event serializableEvent
status, err := st.machineClient.Status()
// if we cannot receive actual state, send error state with error description
if err != nil {
event = serializableEvent{Status: &types.ClusterStatusResult{
CrcStatus: state.Error,
}, Error: err.Error()}
} else {
// the event can be fired before the code which actually changes the state is called,
// so status could contain the 'old' state; replace it with the state received in the event
status.CrcStatus = changedEvent.State // override with actual reported state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the part I find questionable in this PR. Instead of having a single source of truth for the status, we now have 2, some of the status comes from st.machineClient.Status(), but some of the status is hardcoded in machine/sync.go. If the 2 get out of sync, or if we want to improve status reporting in general, we'll have multiple places to change.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Spending more time on it, I understand more the problems this is trying to solve.
One example is when the VM is stopped. crc status will succeed and report that crc is stopped, however crc stop will return an error and say the machine is already stopped.

Without status.CrcStatus = changedEvent.State, when invoking api/stop, we would not be reporting an error in the stream, but instead we'd only say the machine is stopped. This would not match what was reported to the api/stop call. There are most likely other state transitions where this is needed.

The comment however seems to indicate sometimes Status could report a stale status, I haven't seen that in the little testing I did, but I'd be interested in understanding more when this happens? If we don't record this knowledge now when it's more or less fresh in our minds, it will be very difficult to find this back in 6 months from now if we need it :-/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, there's more details in an older discussion:

User calls start, we fire the state change event before actually calling Start, and if the listener is fast enough, it could get the old state (basically we could lose the starting state in the transition from the stopped to the running state)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my opinion, this is an indication that these events are not fired in the right place, and this is papered over with even more complexity :-/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but instead we'd only say the machine is stopped. This would not match what was reported to the api/stop

The API matches the usecase. Stop should in my opinion not return an error when already stopped; as you succeed in doing what is requested. What more information do we actually give with saying this failed as it was already stopped?!

If you believe this should be fixed or recorded, make these states clear for the convergence.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The part you quoted is a description of the behaviour in this PR. These are some notes I took during review/testing when I was trying to understand the design choices which led to the code being reviewed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to replace this code by firing the event at a more "proper" place, but faced the problem that the starting state is "artificial", i.e. it exists only in the sync.go file, and the VM doesn't have/report a starting state. The stopping state is the same.
So I have no other ideas than to accept the current PR and think about better state handling, like @cfergeau proposes there #3903 (comment)

Or change state handling first, and then rebase this PR on top of it.

@gbraad @cfergeau WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can go forward with the current code, but we really need more details in the commit log, so that it records the design choices you made when writing the code, so that we know the current limitations, why this overwrites the state, ...

event = serializableEvent{Status: status}
if changedEvent.Error != nil {
event.Error = changedEvent.Error.Error()
}

}
data, err := json.Marshal(event)
if err != nil {
logging.Errorf("Could not serealize status changed event in to JSON: %s", err)
return
}
st.publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data})
}

// Start remembers the publisher to send events to and registers this
// listener on events.StatusChanged.
func (st *statusChangeListener) Start(publisher EventPublisher) {
	st.publisher = publisher
	events.StatusChanged.AddListener(st)
}

// Stop unregisters this listener from events.StatusChanged.
func (st *statusChangeListener) Stop() {
	events.StatusChanged.RemoveListener(st)
}
8 changes: 0 additions & 8 deletions pkg/crc/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,6 @@ func NewHandler(config *crcConfig.Config, machine machine.Client, logger Logger,
}

func (h *Handler) Status(c *context) error {
exists, err := h.Client.Exists()
if err != nil {
return err
}
if !exists {
return c.String(http.StatusInternalServerError, string(errors.VMNotExist))
}

res, err := h.Client.Status()
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/crc/machine/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
Stopped State = "Stopped"
Stopping State = "Stopping"
Starting State = "Starting"
NoVM State = "NoVM"
Error State = "Error"
)

Expand Down
3 changes: 1 addition & 2 deletions pkg/crc/machine/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ func (client *client) Status() (*types.ClusterStatusResult, error) {
if err != nil {
if errors.Is(err, errMissingHost(client.name)) {
return &types.ClusterStatusResult{
CrcStatus: state.Stopped,
OpenshiftStatus: types.OpenshiftStopped,
CrcStatus: state.NoVM,
}, nil
}
return nil, errors.Wrap(err, fmt.Sprintf("Cannot load '%s' virtual machine", client.name))
Expand Down
28 changes: 27 additions & 1 deletion pkg/crc/machine/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset"
"github.com/crc-org/crc/v2/pkg/events"
)

const startCancelTimeout = 15 * time.Second
Expand Down Expand Up @@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error {

err := s.underlying.Delete()
s.syncOperationDone <- Deleting

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.NoVM})
}
return err
}

Expand All @@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error {
}
s.startCancel = startCancel
s.currentState = Starting
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting})

return nil
}
Expand All @@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig)

startResult, err := s.underlying.Start(ctx, startConfig)
s.syncOperationDone <- Starting

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(commenting here, but that applies to most of the file).
Duplicating the state logic in this file ("after calling Start(), the VM is in the Start state, unless there was an error, in which case it's in the Error state") will likely cause additional maintenance burden, as we have to make sure to keep in sync the state logic in this file, and the state logic in other parts of crc - the status events may be reporting a different state from Status().

Can we emit a StatusChangedEvent to tell other code the status changed, but not specify the current status in the event payload? This way the receiver of the event can check Status() if they need the status, and we don't have 2 source for the status to keep in sync?

Regarding errors, it seems if Start() (or other commands) return an error, you consider that the cluster is in an error state? This makes sense for some commands, but I don't think this matches what is reported by Status() at the moment? Would be nice to fix the reported Status() when there are start failures for examples, but doing it here would be papering over the real issue in my opinion.
And in some other cases, errors are less bad, for example in the power off case, maybe the command failed early, and the cluster is still nicely running?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Duplicating the state logic in this file ("after calling Start(), the VM is in the Start state, unless there was an error, in which case it's in the Error state") will likely cause additional maintenance burden, as we have to make sure to keep in sync the state logic in this file, and the state logic in other parts of crc - the status events may be reporting a different state from Status().

Not sure I understand how to resolve your concern; the problem is that we don't have a way to track state changes (at least I wasn't able to find one; it would be nice if you could point me to that place if it exists)

Can we emit a StatusChangedEvent to tell other code the status changed, but not specify the current status in the event payload? This way the receiver of the event can check Status() if they need the status, and we don't have 2 source for the status to keep in sync?

I first did that, but I faced a race condition when updating the state, for example:
User calls start, we fire the state change event before actually calling Start, and if the listener is fast enough, it could get the old state (basically we could lose the starting state in the transition from the stopped to the running state)

A possible solution to that may be moving the state change event firing from the Synchronized client into the client implementation of the machine.Client interface, and firing events only after the state has actually changed.

Regarding errors, it seems if Start() (or other commands) return an error, you consider that the cluster is in an error state? This makes sense for some commands, but I don't think this matches what is reported by Status() at the moment? Would be nice to fix the reported Status() when there are start failures for examples, but doing it here would be papering over the real issue in my opinion.
And in some other cases, errors are less bad, for example in the power off case, maybe the command failed early, and the cluster is still nicely running?

It does match, as we send with SSE the status reported by Status(); in addition we also send the error message, which comes from the status change event.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern is #3903 (comment) , and I'm under the impression that if you have a terminal showing the stream of events, and another running crc status every second, then the state between the 2 will not always match, in particular sometimes the event stream will contain State: state.Error while crc status will not report an error?
Guess I need to run both side by side and compare myself to understand better how this works.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(I understand the current code base is missing a centralized "setState" facility, and that this PR has to find a way of doing this, just trying to understand the limitations in the PR)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My main concern is #3903 (comment) , and I'm under the impression that if you have a terminal showing the stream of events, and another running crc status every second, then the state between the 2 will not always match, in particular sometimes the event stream will contain State: state.Error while crc status will not report an error?
Guess I need to run both side by side and compare myself to understand better how this works.

No, the output should be exactly the same; to get that I added
https://github.com/crc-org/crc/pull/3903/files#diff-e5710ade03da511d4845f380cd63319bf7329e960c1afbbba7fbed4060fcf8edR37
and
https://github.com/crc-org/crc/pull/3903/files#diff-e5710ade03da511d4845f380cd63319bf7329e960c1afbbba7fbed4060fcf8edR44-R50

Otherwise we need to move event triggering deeper into the crc code, and fire the event after execution of the piece of code which changes the crc status output.

And it is a question for all: should we integrate events deeper into the CRC core (which leads to spreading events over our codebase) or keep it as is (in the relatively high-level Client interface implementation)?

@gbraad @cfergeau @praveenkumar WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it is a question for all, should we integrate events in more depths of CRC core(which leads to spreading events over our codebase) or keep as is( in relative hi level Client interface implementation )?

I'd say we can go with this for now, with more details about the design choices, the implementation limitations, ...

One thing we can explore over time is some centralized tracking of the cluster state: a StateTracker type, where we'd add some calls to StateTracker.SetState(...) in the appropriate places; the StateTracker would fire events on status changes, and it could have a CanChangeState(state) helper if needed to know whether calling Stop will result in an error or not, ...
As long as all crc commands do not go through the daemon, it could do some polling to reconcile the internal state with changes through external crc calls, ...


return startResult, err
}

Expand Down Expand Up @@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) {
if err := s.prepareStopDelete(Stopping); err != nil {
return state.Error, err
}
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping})

st, err := s.underlying.Stop()
s.syncOperationDone <- Stopping

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: st})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}
return st, err
}

Expand All @@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) {
}

// PowerOff powers off the VM through the underlying client and publishes the
// outcome as a status changed event: state.Stopped when the call succeeded,
// state.Error (carrying the error) when it failed. The error returned by the
// underlying client is passed back to the caller unchanged.
func (s *Synchronized) PowerOff() error {
	err := s.underlying.PowerOff()
	if err != nil {
		// Report the failure together with its cause.
		events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
		return err
	}
	events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped})
	return nil
}

func (s *Synchronized) Status() (*types.ClusterStatusResult, error) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/crc/machine/virtualmachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ func (err *MissingHostError) Error() string {
return fmt.Sprintf("no such libmachine vm: %s", err.name)
}

// Is reports whether target is (or wraps) a *MissingHostError carrying the
// same name as err, so that errors.Is can match two distinct instances.
func (err *MissingHostError) Is(target error) bool {
	var other *MissingHostError
	return errors.As(target, &other) && other.name == err.name
}

var errInvalidBundleMetadata = errors.New("Error loading bundle metadata")

func loadVirtualMachine(name string, useVSock bool) (*virtualMachine, error) {
Expand Down
48 changes: 48 additions & 0 deletions pkg/events/emitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package events

import (
"sync"
)

type Event[T any] interface {
cfergeau marked this conversation as resolved.
Show resolved Hide resolved
AddListener(listener Notifiable[T])
RemoveListener(listener Notifiable[T])
Fire(data T)
}

// Notifiable is implemented by anything that wants to receive events of type T.
type Notifiable[T any] interface {
	// Notify is called with the fired event value.
	Notify(event T)
}

// event is the Event implementation returned by NewEvent.
type event[T any] struct {
	// listeners holds the registered listeners; each listener is stored under itself as key.
	listeners map[Notifiable[T]]Notifiable[T]
	// eventMutex guards listeners.
	eventMutex sync.Mutex
}

// NewEvent creates an Event for values of type T with no listeners registered.
func NewEvent[T any]() Event[T] {
	registry := make(map[Notifiable[T]]Notifiable[T])
	return &event[T]{listeners: registry}
}

func (e *event[T]) AddListener(listener Notifiable[T]) {
e.eventMutex.Lock()
evidolob marked this conversation as resolved.
Show resolved Hide resolved
defer e.eventMutex.Unlock()
e.listeners[listener] = listener
}

// RemoveListener unregisters listener under the event mutex. Removing a
// listener that was never registered is a no-op.
func (e *event[T]) RemoveListener(listener Notifiable[T]) {
	e.eventMutex.Lock()
	defer e.eventMutex.Unlock()

	delete(e.listeners, listener)
}

func (e *event[T]) Fire(event T) {
e.eventMutex.Lock()
defer e.eventMutex.Unlock()
evidolob marked this conversation as resolved.
Show resolved Hide resolved
for _, listener := range e.listeners {
cfergeau marked this conversation as resolved.
Show resolved Hide resolved
// shadowing for loop variable, need to remove after golang 1.22 migration
listener := listener
go listener.Notify(event)
evidolob marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading