Skip to content

Commit

Permalink
move que creation to common library
Browse files Browse the repository at this point in the history
  • Loading branch information
havardelnan committed Feb 10, 2025
1 parent 575c5b7 commit 6b2145d
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ func GetControlPlanesMetadata() gin.HandlerFunc {
func GetKubeconfig() gin.HandlerFunc {
return func(c *gin.Context) {
ctx, cancel := gincontext.GetRorContextFromGinContext(c)
defer cancel()
clusterid := c.Param("clusterid")
if clusterid == "" {
rerr := rorerror.NewRorError(http.StatusBadRequest, "clusterid must be provided")
Expand Down Expand Up @@ -546,7 +547,6 @@ func GetKubeconfig() gin.HandlerFunc {
}

var clusterKubeConfigPayload apicontracts.KubeconfigCredentials
defer cancel()

//validate the request body
if err := c.BindJSON(&clusterKubeConfigPayload); err != nil {
Expand Down
34 changes: 34 additions & 0 deletions pkg/handlers/ssehandler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func HandleSSE() gin.HandlerFunc {
}()
// Send closed connection to event server
sseserver.Server.ClosedClients <- client.Id
cancel()
return
default:
time.Sleep(time.Second * 1)
Expand Down Expand Up @@ -120,3 +121,36 @@ func Send() gin.HandlerFunc {
c.JSON(http.StatusOK, nil)
}
}

func Subscribe() gin.HandlerFunc {
return func(c *gin.Context) {
ctx, cancel := gincontext.GetRorContextFromGinContext(c)
defer cancel()
// // // Access check
// // // Scope: ror
// // // Subject: global
// // // Access: create
// // // TODO: check if this is the right way to do it
// accessQuery := aclmodels.NewAclV2QueryAccessScopeSubject(aclmodels.Acl2ScopeRor, aclmodels.Acl2RorSubjectGlobal)
// accessObject := aclservice.CheckAccessByContextAclQuery(ctx, accessQuery)
// if !accessObject.Create {
// c.JSON(http.StatusForbidden, "403: No access")
// return
// }

var input sseserver.SSESubscribe
err := c.BindJSON(&input)
if err != nil {
rerr := rorerror.NewRorError(http.StatusBadRequest, "Object is not valid", err)
rerr.GinLogErrorAbort(c)
return
}

err = apiconnections.RabbitMQConnection.SendMessage(ctx, input, sseserver.SSERouteBroadcast, nil)
if err != nil {
rerr := rorerror.NewRorError(http.StatusInternalServerError, "could not send sse broadcast event", err)
rerr.GinLogErrorAbort(c)
}
c.JSON(http.StatusOK, nil)
}
}
34 changes: 31 additions & 3 deletions pkg/servers/sseserver/eventclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,22 @@ type SseEvent struct {
Data string `json:"data" validate:"required"`
}

type SSESubscribe struct {
ClientId EventClientId `json:"clientId" validate:"required"`
Topic Subscription `json:"topic" validate:"required"`
}

type Subscription string

type EventClientId string

type EventClientChan chan SseEvent

type EventClient struct {
Id EventClientId
Connection EventClientChan
Identity identitymodels.Identity
Id EventClientId
Connection EventClientChan
Identity identitymodels.Identity
Subscriptions []Subscription
}

type EventClients struct {
Expand Down Expand Up @@ -65,6 +73,13 @@ func (e *EventClients) Add(client *EventClient) {
e.lock.Lock()
defer e.lock.Unlock()
e.clients = append(e.clients, client)
Server.Message <- EventMessage{
Clients: []EventClientId{client.Id},
SseEvent: SseEvent{
Event: "connection.id",
Data: string(client.Id),
},
}
}

func (e *EventClients) GetBroadcast() []EventClientId {
Expand All @@ -76,3 +91,16 @@ func (e *EventClients) GetBroadcast() []EventClientId {
}
return clients
}

func (e *EventClient) Subscribe(topic Subscription) {
e.Subscriptions = append(e.Subscriptions, topic)
}

func (e *EventClient) Unsubscribe(topic Subscription) {
for i, t := range e.Subscriptions {
if t == topic {
e.Subscriptions = append(e.Subscriptions[:i], e.Subscriptions[i+1:]...)
break
}
}
}
62 changes: 32 additions & 30 deletions pkg/servers/sseserver/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,41 +46,43 @@ func StartListeningRabbitMQ() {
panic(err)
}

//Create the queue
// //Create the queue
SSEventsQueueName := fmt.Sprintf("%s-%s", SSEventsQueueNamePrefix, uuid.New().String())
apiEventsqueue, err := apiconnections.RabbitMQConnection.GetChannel().QueueDeclare(
SSEventsQueueName, // name
false, // durable
true, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments, non quorum queue
)
if err != nil {
rlog.Fatal("Could not declare queue", err)
}
// apiEventsqueue, err := apiconnections.RabbitMQConnection.GetChannel().QueueDeclare(
// SSEventsQueueName, // name
// false, // durable
// true, // delete when unused
// false, // exclusive
// false, // no-wait
// nil, // arguments, non quorum queue
// )
// if err != nil {
// rlog.Fatal("Could not declare queue", err)
// }

err = apiconnections.RabbitMQConnection.GetChannel().QueueBind(
apiEventsqueue.Name, // queue name
"", // routing key
SSEventsExchange, // exchange
false,
nil,
)
if err != nil {
rlog.Fatal("Could not bind queue to excahnge", err)
}
// err = apiconnections.RabbitMQConnection.GetChannel().QueueBind(
// apiEventsqueue.Name, // queue name
// "", // routing key
// SSEventsExchange, // exchange
// false,
// nil,
// )
// if err != nil {
// rlog.Fatal("Could not bind queue to excahnge", err)
// }

go func() {
config := rabbitmqhandler.RabbitMQListnerConfig{
Client: apiconnections.RabbitMQConnection,
QueueName: apiEventsqueue.Name,
Consumer: "",
AutoAck: false,
Exclusive: false,
NoLocal: false,
NoWait: false,
Args: nil,
Client: apiconnections.RabbitMQConnection,
QueueName: SSEventsQueueName,
Consumer: "",
AutoAck: false,
QueueAutoDelete: true,

Check failure on line 80 in pkg/servers/sseserver/rabbitmq.go

View workflow job for this annotation

GitHub Actions / build-app

unknown field QueueAutoDelete in struct literal of type rabbitmqhandler.RabbitMQListnerConfig
Exclusive: false,
NoLocal: false,
NoWait: false,
Args: nil,
Exchange: SSEventsExchange,

Check failure on line 85 in pkg/servers/sseserver/rabbitmq.go

View workflow job for this annotation

GitHub Actions / build-app

unknown field Exchange in struct literal of type rabbitmqhandler.RabbitMQListnerConfig
}
rabbithandler := rabbitmqhandler.New(config, ssemessagehandler{})
_ = apiconnections.RabbitMQConnection.RegisterHandler(rabbithandler)
Expand Down
3 changes: 1 addition & 2 deletions pkg/servers/sseserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type EventMessage struct {
func StartEventServer() {
StartListeningRabbitMQ()
Server = &EventServer{
Message: make(chan EventMessage),
Message: make(chan EventMessage, 10),
NewClients: make(chan *EventClient),
ClosedClients: make(chan EventClientId),
Clients: NewEventClients(),
Expand All @@ -48,7 +48,6 @@ func (es *EventServer) listen() {
case client := <-es.NewClients:
es.Clients.Add(client)
rlog.Infof("Added sse client. %d registered clients", es.Clients.Len())

// Remove closed client
case client := <-es.ClosedClients:

Expand Down

0 comments on commit 6b2145d

Please sign in to comment.