feat/docs: cleanup examples and improve logging
tinyzimmer committed Jul 31, 2023
1 parent 8add99a commit ad557e3
Showing 3 changed files with 57 additions and 53 deletions.
12 changes: 2 additions & 10 deletions examples/simple/docker-compose.yaml
@@ -1,4 +1,4 @@
-version: '3'
+version: "3"
 
 networks:
   simple:
@@ -7,11 +7,7 @@ networks:
     config:
       - subnet: 10.1.0.0/24
 
-volumes:
-  bootstrap-node:
-
 services:
-
   bootstrap-node:
     image: ${IMAGE:-ghcr.io/webmeshproj/node:latest}
     build:
@@ -26,16 +22,12 @@ services:
       - --global.no-ipv6
       - --global.detect-endpoints
       - --global.detect-private-endpoints
-      - --global.log-level=debug
       - --bootstrap.enabled
       - --bootstrap.default-network-policy=accept
-      - --raft.data-dir=/data
-      - --wireguard.key-file=/data/wireguard.key
+      - --raft.in-memory
     ports:
       - 8443:8443
       - 51820:51820/udp
-    volumes:
-      - bootstrap-node:/data
     cap_add: ["NET_ADMIN", "NET_RAW", "SYS_MODULE"]
 
   join-node:
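With the Raft data directory and persisted WireGuard key gone and --raft.in-memory in their place, the bootstrap node keeps its Raft state in memory, so the named bootstrap-node volume is no longer needed and restarting the stack (for example, docker compose up from examples/simple) starts the example from a clean slate.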
91 changes: 48 additions & 43 deletions pkg/mesh/mesh_observer.go
@@ -28,64 +28,69 @@ import (
 )
 
 func (s *meshStore) onObservation(ev raft.Observation) {
-	s.log.Debug("received observation event", slog.String("type", reflect.TypeOf(ev.Data).String()))
+	log := s.log.With("event", "observation")
+	log.Debug("received observation event", slog.String("type", reflect.TypeOf(ev.Data).String()))
 	ctx := context.Background()
 	switch data := ev.Data.(type) {
 	case raft.PeerObservation:
-		s.log.Debug("PeerObservation", slog.Any("data", data))
+		log.Debug("PeerObservation", slog.Any("data", data))
 		if s.testStore {
 			return
 		}
 		if string(data.Peer.ID) == s.nodeID {
 			return
 		}
 		if err := s.nw.RefreshPeers(ctx); err != nil {
-			s.log.Error("wireguard refresh peers", slog.String("error", err.Error()))
+			log.Warn("wireguard refresh peers", slog.String("error", err.Error()))
 		}
-		p := peers.New(s.Storage())
-		node, err := p.Get(ctx, string(data.Peer.ID))
-		if err != nil {
-			s.log.Error("failed to get peer", slog.String("error", err.Error()))
-			return
-		}
-		err = s.plugins.Emit(ctx, &v1.Event{
-			Type: func() v1.WatchEvent {
-				if data.Removed {
-					return v1.WatchEvent_WATCH_EVENT_NODE_LEAVE
-				}
-				return v1.WatchEvent_WATCH_EVENT_NODE_JOIN
-			}(),
-			Event: &v1.Event_Node{
-				Node: node.Proto(func() v1.ClusterStatus {
-					if data.Removed {
-						return v1.ClusterStatus_CLUSTER_STATUS_UNKNOWN
-					}
-					if data.Peer.Suffrage == raft.Nonvoter {
-						return v1.ClusterStatus_CLUSTER_NON_VOTER
-					}
-					return v1.ClusterStatus_CLUSTER_VOTER
-				}()),
-			},
-		})
-		if err != nil {
-			s.log.Error("emit node join/leave event", slog.String("error", err.Error()))
+		if s.plugins.HasWatchers() {
+			p := peers.New(s.Storage())
+			node, err := p.Get(ctx, string(data.Peer.ID))
+			if err != nil {
+				log.Warn("failed to lookup peer, can't emit event", slog.String("error", err.Error()))
+				return
+			}
+			err = s.plugins.Emit(ctx, &v1.Event{
+				Type: func() v1.WatchEvent {
+					if data.Removed {
+						return v1.WatchEvent_WATCH_EVENT_NODE_LEAVE
+					}
+					return v1.WatchEvent_WATCH_EVENT_NODE_JOIN
+				}(),
+				Event: &v1.Event_Node{
+					Node: node.Proto(func() v1.ClusterStatus {
+						if data.Removed {
+							return v1.ClusterStatus_CLUSTER_STATUS_UNKNOWN
						}
+						if data.Peer.Suffrage == raft.Nonvoter {
+							return v1.ClusterStatus_CLUSTER_NON_VOTER
+						}
+						return v1.ClusterStatus_CLUSTER_VOTER
+					}()),
+				},
+			})
+			if err != nil {
+				log.Warn("error sending node join/leave event", slog.String("error", err.Error()))
+			}
 		}
 	case raft.LeaderObservation:
-		s.log.Debug("LeaderObservation", slog.Any("data", data))
-		p := peers.New(s.Storage())
-		node, err := p.Get(ctx, string(data.LeaderID))
-		if err != nil {
-			s.log.Error("failed to get leader", slog.String("error", err.Error()))
-			return
-		}
-		err = s.plugins.Emit(ctx, &v1.Event{
-			Type: v1.WatchEvent_WATCH_EVENT_LEADER_CHANGE,
-			Event: &v1.Event_Node{
-				Node: node.Proto(v1.ClusterStatus_CLUSTER_LEADER),
-			},
-		})
-		if err != nil {
-			s.log.Error("emit leader change event", slog.String("error", err.Error()))
+		log.Debug("LeaderObservation", slog.Any("data", data))
+		if s.plugins.HasWatchers() {
+			p := peers.New(s.Storage())
+			node, err := p.Get(ctx, string(data.LeaderID))
+			if err != nil {
+				log.Warn("failed to get leader, may be fresh cluster, can't emit event", slog.String("error", err.Error()))
+				return
+			}
+			err = s.plugins.Emit(ctx, &v1.Event{
+				Type: v1.WatchEvent_WATCH_EVENT_LEADER_CHANGE,
+				Event: &v1.Event_Node{
+					Node: node.Proto(v1.ClusterStatus_CLUSTER_LEADER),
+				},
+			})
+			if err != nil {
+				log.Warn("error sending leader change event", slog.String("error", err.Error()))
+			}
 		}
 	}
 }
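The logging change above does two things: it derives a scoped logger with s.log.With("event", "observation"), so every record from the handler carries that attribute, and it demotes non-fatal emit failures from Error to Warn. A minimal self-contained sketch of the same log/slog pattern (the handler setup and messages here are illustrative, not taken from the repository):

package main

import (
	"log/slog"
	"os"
)

func main() {
	// A debug-level text handler, so Debug records are not filtered out.
	base := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug}))

	// With returns a child logger; every record it produces carries
	// event=observation, mirroring log := s.log.With("event", "observation").
	log := base.With("event", "observation")

	log.Debug("received observation event", slog.String("type", "raft.PeerObservation"))

	// Non-fatal failures are reported at Warn rather than Error.
	log.Warn("wireguard refresh peers", slog.String("error", "interface busy"))
}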
7 changes: 7 additions & 0 deletions pkg/plugins/manager.go
@@ -28,6 +28,8 @@ type Manager interface {
 	ServeStorage(db storage.Storage)
 	// HasAuth returns true if the manager has an auth plugin.
 	HasAuth() bool
+	// HasWatchers returns true if the manager has any watch plugins.
+	HasWatchers() bool
 	// AuthUnaryInterceptor returns a unary interceptor for the configured auth plugin.
 	// If no plugin is configured, the returned function is a pass-through.
 	AuthUnaryInterceptor() grpc.UnaryServerInterceptor
@@ -75,6 +77,11 @@ func (m *manager) HasAuth() bool {
 	return m.auth != nil
 }
 
+// HasWatchers returns true if the manager has any watch plugins.
+func (m *manager) HasWatchers() bool {
+	return len(m.emitters) > 0
+}
+
 // AuthUnaryInterceptor returns a unary interceptor for the configured auth plugin.
 // If no plugin is configured, the returned function is a no-op.
 func (m *manager) AuthUnaryInterceptor() grpc.UnaryServerInterceptor {
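HasWatchers lets callers such as the observer above skip peer lookups and event construction entirely when no watch plugins are registered. A toy sketch of the guard pattern (the manager type and Emit signature here are hypothetical stand-ins, not the webmesh API):

package main

import "fmt"

// event is a hypothetical stand-in for the generated v1.Event type.
type event struct{ typ string }

// manager mirrors the emitter bookkeeping: watchers exist only
// if at least one emitter has been registered.
type manager struct{ emitters []func(event) }

func (m *manager) HasWatchers() bool { return len(m.emitters) > 0 }

func (m *manager) Emit(ev event) {
	for _, emit := range m.emitters {
		emit(ev)
	}
}

func main() {
	m := &manager{}

	// No watchers registered: the guard avoids building the event at all.
	if m.HasWatchers() {
		m.Emit(event{typ: "NODE_JOIN"})
	}

	// Register a watcher and emit again; this time the event is delivered.
	m.emitters = append(m.emitters, func(ev event) { fmt.Println("got event:", ev.typ) })
	if m.HasWatchers() {
		m.Emit(event{typ: "LEADER_CHANGE"})
	}
}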
