Skip to content

Commit

Permalink
Implement state change event
Browse files Browse the repository at this point in the history
Signed-off-by: Yevhen Vydolob <[email protected]>
  • Loading branch information
evidolob committed Dec 7, 2023
1 parent f152448 commit a6a8fb4
Show file tree
Hide file tree
Showing 11 changed files with 128 additions and 17 deletions.
2 changes: 1 addition & 1 deletion cmd/crc/cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func getStatus(client *daemonclient.Client, cacheDir string) *status {
}
return &status{Success: false, Error: crcErrors.ToSerializableError(err)}
}
if clusterStatus.CrcStatus == string(state.Novm) {
if clusterStatus.CrcStatus == string(state.NoVM) {
return &status{Success: false, Error: crcErrors.ToSerializableError(crcErrors.VMNotExist)}
}
var size int64
Expand Down
2 changes: 1 addition & 1 deletion pkg/crc/api/events/cluster_load_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type TickListener struct {
}

func newClusterLoadStream(server *EventServer) EventStream {
return newStream(newStatusListener(server.machine), newEventPublisher(CLUSTER_LOAD, server.sseServer))
return newStream(newStatusListener(server.machine), newEventPublisher(ClusterLoad, server.sseServer))
}

func newStatusListener(machine crcMachine.Client) EventProducer {
Expand Down
11 changes: 7 additions & 4 deletions pkg/crc/api/events/event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,9 @@ func NewEventServer(machine machine.Client) *EventServer {
stream.RemoveSubscriber(sub)
}

sseServer.CreateStream(LOGS)
sseServer.CreateStream(CLUSTER_LOAD)
sseServer.CreateStream(Logs)
sseServer.CreateStream(ClusterLoad)
sseServer.CreateStream(StatusChange)
return eventServer
}

Expand All @@ -65,10 +66,12 @@ func (es *EventServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {

func createEventStream(server *EventServer, streamID string) EventStream {
switch streamID {
case LOGS:
case Logs:
return newLogsStream(server)
case CLUSTER_LOAD:
case ClusterLoad:
return newClusterLoadStream(server)
case StatusChange:
return newStatusChangeStream(server)
}
return nil
}
5 changes: 3 additions & 2 deletions pkg/crc/api/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package events
import "github.com/r3labs/sse/v2"

const (
LOGS = "logs" // Logs event channel, contains daemon logs
CLUSTER_LOAD = "cluster_load" // status event channel, contains VM load info
Logs = "logs" // Logs event channel, contains daemon logs
ClusterLoad = "cluster_load" // status event channel, contains VM load info
StatusChange = "status_change" // status change channel, fires on 'starting', 'stopping', etc
)

type EventPublisher interface {
Expand Down
9 changes: 7 additions & 2 deletions pkg/crc/api/events/log_stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package events

import (
"bytes"

"github.com/crc-org/crc/v2/pkg/crc/logging"
"github.com/r3labs/sse/v2"
"github.com/sirupsen/logrus"
Expand All @@ -23,7 +25,7 @@ func newSSEStreamHook(server *sse.Server) *streamHook {
&logrus.JSONFormatter{
TimestampFormat: "",
DisableTimestamp: false,
DisableHTMLEscape: false,
DisableHTMLEscape: true,
DataKey: "",
FieldMap: nil,
CallerPrettyfier: nil,
Expand Down Expand Up @@ -56,7 +58,10 @@ func (s *streamHook) Fire(entry *logrus.Entry) error {
return err
}

s.server.Publish(LOGS, &sse.Event{Event: []byte(LOGS), Data: line})
// remove "Line Feed"("\n") character which add was added by json.Encoder
line = bytes.TrimRight(line, "\n")

s.server.Publish(Logs, &sse.Event{Event: []byte(Logs), Data: line})
return nil
}

Expand Down
62 changes: 62 additions & 0 deletions pkg/crc/api/events/status_change_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package events

import (
"encoding/json"

"github.com/crc-org/crc/v2/pkg/crc/logging"
"github.com/crc-org/crc/v2/pkg/crc/machine"
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
"github.com/crc-org/crc/v2/pkg/events"
"github.com/r3labs/sse/v2"
)

type statusChangeEvent struct {
Status *types.ClusterStatusResult `json:"status"`
Error string `json:"error,omitempty"`
}

type statusChange struct {
listenerDisposable events.Disposable
machineClient machine.Client
}

func newStatusChangeStream(server *EventServer) EventStream {
return newStream(newStatusChangeListener(server.machine), newEventPublisher(StatusChange, server.sseServer))
}

func newStatusChangeListener(client machine.Client) EventProducer {
return &statusChange{
machineClient: client,
}
}

func (st *statusChange) Start(publisher EventPublisher) {
st.listenerDisposable = events.StatusChanged.AddListener(func(changedEvent events.StatusChangedEvent) {
logging.Debugf("State Changed Event %s", changedEvent)
var event statusChangeEvent
status, err := st.machineClient.Status()
// if we cannot receive actual state, send error state with error description
if err != nil {
event = statusChangeEvent{Status: &types.ClusterStatusResult{
CrcStatus: state.Error,
}, Error: err.Error()}
} else {
status.CrcStatus = changedEvent.State // override with actual reported state
event = statusChangeEvent{Status: status}
if changedEvent.Error != nil {
event.Error = changedEvent.Error.Error()
}

}
data, _ := json.Marshal(event)
publisher.Publish(&sse.Event{Event: []byte(StatusChange), Data: data})
})
}

func (st *statusChange) Stop() {
if st.listenerDisposable != nil {
st.listenerDisposable()
st.listenerDisposable = nil
}
}
2 changes: 1 addition & 1 deletion pkg/crc/machine/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const (
Stopped State = "Stopped"
Stopping State = "Stopping"
Starting State = "Starting"
Novm State = "NoVM"
NoVM State = "NoVM"
Error State = "Error"
)

Expand Down
2 changes: 1 addition & 1 deletion pkg/crc/machine/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (client *client) Status() (*types.ClusterStatusResult, error) {
}
if !exists {
return &types.ClusterStatusResult{
CrcStatus: state.Novm,
CrcStatus: state.NoVM,
}, nil
}

Expand Down
28 changes: 27 additions & 1 deletion pkg/crc/machine/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
"github.com/crc-org/crc/v2/pkg/crc/machine/types"
crcPreset "github.com/crc-org/crc/v2/pkg/crc/preset"
"github.com/crc-org/crc/v2/pkg/events"
)

const startCancelTimeout = 15 * time.Second
Expand Down Expand Up @@ -69,6 +70,10 @@ func (s *Synchronized) Delete() error {

err := s.underlying.Delete()
s.syncOperationDone <- Deleting

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.NoVM})
}
return err
}

Expand All @@ -80,6 +85,7 @@ func (s *Synchronized) prepareStart(startCancel context.CancelFunc) error {
}
s.startCancel = startCancel
s.currentState = Starting
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Starting})

return nil
}
Expand All @@ -92,6 +98,13 @@ func (s *Synchronized) Start(ctx context.Context, startConfig types.StartConfig)

startResult, err := s.underlying.Start(ctx, startConfig)
s.syncOperationDone <- Starting

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: startResult.Status})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}

return startResult, err
}

Expand Down Expand Up @@ -136,10 +149,16 @@ func (s *Synchronized) Stop() (state.State, error) {
if err := s.prepareStopDelete(Stopping); err != nil {
return state.Error, err
}
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopping})

st, err := s.underlying.Stop()
s.syncOperationDone <- Stopping

if err == nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: st})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}
return st, err
}

Expand All @@ -160,7 +179,14 @@ func (s *Synchronized) ConnectionDetails() (*types.ConnectionDetails, error) {
}

func (s *Synchronized) PowerOff() error {
return s.underlying.PowerOff()
err := s.underlying.PowerOff()
if err != nil {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Stopped})
} else {
events.StatusChanged.Fire(events.StatusChangedEvent{State: state.Error, Error: err})
}

return err
}

func (s *Synchronized) Status() (*types.ClusterStatusResult, error) {
Expand Down
8 changes: 4 additions & 4 deletions pkg/events/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@ func NewEvent[T any]() Event[T] {

func (e *event[T]) AddListener(listener Listener[T]) Disposable {
e.eventMutex.Lock()
defer e.eventMutex.Unlock()

e.keyCounter++
key := e.keyCounter
e.listeners[key] = &listenerWrapper[T]{
Listener: listener,
}
e.eventMutex.Unlock()

return func() {
e.eventMutex.Lock()
defer e.eventMutex.Unlock()
Expand All @@ -46,9 +48,7 @@ func (e *event[T]) AddListener(listener Listener[T]) Disposable {
}

func (e *event[T]) Fire(event T) {
e.eventMutex.Lock()
defer e.eventMutex.Unlock()
for _, l := range e.listeners {
l.Listener(event)
go l.Listener(event)
}
}
14 changes: 14 additions & 0 deletions pkg/events/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package events

import (
"github.com/crc-org/crc/v2/pkg/crc/machine/state"
)

type StatusChangedEvent struct {
State state.State
Error error
}

var (
StatusChanged = NewEvent[StatusChangedEvent]()
)

0 comments on commit a6a8fb4

Please sign in to comment.