Skip to content

Commit

Permalink
Merge pull request #13 from kalradev/kalradev/broker/channel
Browse files Browse the repository at this point in the history
NATS: channel support
  • Loading branch information
kumarabd authored Dec 22, 2020
2 parents 20b4921 + 988bf34 commit f133108
Show file tree
Hide file tree
Showing 35 changed files with 112 additions and 60 deletions.
17 changes: 13 additions & 4 deletions internal/cluster/informers/depoyment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
broker "github.com/layer5io/meshsync/pkg/broker"
"github.com/layer5io/meshsync/pkg/model"
v1 "k8s.io/api/apps/v1"
"k8s.io/client-go/tools/cache"
)
Expand All @@ -19,35 +19,44 @@ func (c *Cluster) DeploymentInformer() cache.SharedIndexInformer {
AddFunc: func(obj interface{}) {
deployment := obj.(*v1.Deployment)
log.Printf("Deployment Named: %s - added", deployment.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
deployment.TypeMeta,
deployment.ObjectMeta,
deployment.Spec,
deployment.Status,
)})
if err != nil {
log.Println("Error publishing Deployment")
}
},
UpdateFunc: func(new interface{}, old interface{}) {
deployment := new.(*v1.Deployment)
log.Printf("Deployment Named: %s - updated", deployment.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
deployment.TypeMeta,
deployment.ObjectMeta,
deployment.Spec,
deployment.Status,
)})
if err != nil {
log.Println("Error publishing Deployment")
}
},
DeleteFunc: func(obj interface{}) {
deployment := obj.(*v1.Deployment)
log.Printf("Deployment Named: %s - deleted", deployment.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
deployment.TypeMeta,
deployment.ObjectMeta,
deployment.Spec,
deployment.Status,
)})
if err != nil {
log.Println("Error publishing Deployment")
}
},
},
)
Expand Down
17 changes: 13 additions & 4 deletions internal/cluster/informers/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
broker "github.com/layer5io/meshsync/pkg/broker"
"github.com/layer5io/meshsync/pkg/model"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
Expand All @@ -19,35 +19,44 @@ func (c *Cluster) NamespaceInformer() cache.SharedIndexInformer {
AddFunc: func(obj interface{}) {
Namespace := obj.(*v1.Namespace)
log.Printf("Namespace Named: %s - added", Namespace.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
Namespace.TypeMeta,
Namespace.ObjectMeta,
Namespace.Spec,
Namespace.Status,
)})
if err != nil {
log.Println("Error publishing Namespace")
}
},
UpdateFunc: func(new interface{}, old interface{}) {
Namespace := new.(*v1.Namespace)
log.Printf("Namespace Named: %s - updated", Namespace.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
Namespace.TypeMeta,
Namespace.ObjectMeta,
Namespace.Spec,
Namespace.Status,
)})
if err != nil {
log.Println("Error publishing Namespace")
}
},
DeleteFunc: func(obj interface{}) {
Namespace := obj.(*v1.Namespace)
log.Printf("Namespace Named: %s - deleted", Namespace.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
Namespace.TypeMeta,
Namespace.ObjectMeta,
Namespace.Spec,
Namespace.Status,
)})
if err != nil {
log.Println("Error publishing Namespace")
}
},
},
)
Expand Down
17 changes: 13 additions & 4 deletions internal/cluster/informers/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
broker "github.com/layer5io/meshsync/pkg/broker"
"github.com/layer5io/meshsync/pkg/model"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
Expand All @@ -19,35 +19,44 @@ func (c *Cluster) NodeInformer() cache.SharedIndexInformer {
AddFunc: func(obj interface{}) {
Node := obj.(*v1.Node)
log.Printf("Node Named: %s - added", Node.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
Node.TypeMeta,
Node.ObjectMeta,
Node.Spec,
Node.Status,
)})
if err != nil {
log.Println("Error publishing Node")
}
},
UpdateFunc: func(new interface{}, old interface{}) {
Node := new.(*v1.Node)
log.Printf("Node Named: %s - updated", Node.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
Node.TypeMeta,
Node.ObjectMeta,
Node.Spec,
Node.Status,
)})
if err != nil {
log.Println("Error publishing Node")
}
},
DeleteFunc: func(obj interface{}) {
Node := obj.(*v1.Node)
log.Printf("Node Named: %s - deleted", Node.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
Node.TypeMeta,
Node.ObjectMeta,
Node.Spec,
Node.Status,
)})
if err != nil {
log.Println("Error publishing Node")
}
},
},
)
Expand Down
17 changes: 13 additions & 4 deletions internal/cluster/informers/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
broker "github.com/layer5io/meshsync/pkg/broker"
"github.com/layer5io/meshsync/pkg/model"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
Expand All @@ -19,35 +19,44 @@ func (c *Cluster) PodInformer() cache.SharedIndexInformer {
AddFunc: func(obj interface{}) {
Pod := obj.(*v1.Pod)
log.Printf("Pod Named: %s - added", Pod.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
Pod.TypeMeta,
Pod.ObjectMeta,
Pod.Spec,
Pod.Status,
)})
if err != nil {
log.Println("Error publishing Pod")
}
},
UpdateFunc: func(new interface{}, old interface{}) {
Pod := new.(*v1.Pod)
log.Printf("Pod Named: %s - updated", Pod.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
Pod.TypeMeta,
Pod.ObjectMeta,
Pod.Spec,
Pod.Status,
)})
if err != nil {
log.Println("Error publishing Pod")
}
},
DeleteFunc: func(obj interface{}) {
Pod := obj.(*v1.Pod)
log.Printf("Pod Named: %s - deleted", Pod.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
Pod.TypeMeta,
Pod.ObjectMeta,
Pod.Spec,
Pod.Status,
)})
if err != nil {
log.Println("Error publishing Pod")
}
},
},
)
Expand Down
17 changes: 13 additions & 4 deletions internal/cluster/informers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
broker "github.com/layer5io/meshsync/pkg/broker"
"github.com/layer5io/meshsync/pkg/model"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)
Expand All @@ -19,35 +19,44 @@ func (c *Cluster) ServiceInformer() cache.SharedIndexInformer {
AddFunc: func(obj interface{}) {
service := obj.(*v1.Service)
log.Printf("Service Named: %s - added", service.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
service.TypeMeta,
service.ObjectMeta,
service.Spec,
service.Status,
)})
if err != nil {
log.Println("Error publishing Service")
}
},
UpdateFunc: func(new interface{}, old interface{}) {
service := new.(*v1.Service)
log.Printf("Service Named: %s - updated", service.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
service.TypeMeta,
service.ObjectMeta,
service.Spec,
service.Status,
)})
if err != nil {
log.Println("Error publishing Service")
}
},
DeleteFunc: func(obj interface{}) {
service := obj.(*v1.Service)
log.Printf("Service Named: %s - deleted", service.Name)
c.broker.Publish(Subject, &broker.Message{
err := c.broker.Publish(Subject, &broker.Message{
Object: model.ConvObject(
service.TypeMeta,
service.ObjectMeta,
service.Spec,
service.Status,
)})
if err != nil {
log.Println("Error publishing Service")
}
},
},
)
Expand Down
2 changes: 1 addition & 1 deletion internal/cluster/pipeline/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"log"

"github.com/layer5io/meshsync/internal/cache"
"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
discovery "github.com/layer5io/meshsync/pkg/discovery"
"github.com/myntra/pipeline"
Expand Down
2 changes: 1 addition & 1 deletion internal/cluster/pipeline/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package pipeline
import (
"log"

"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
discovery "github.com/layer5io/meshsync/pkg/discovery"
"github.com/myntra/pipeline"
Expand Down
2 changes: 1 addition & 1 deletion internal/cluster/pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package pipeline
import (
"log"

"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
discovery "github.com/layer5io/meshsync/pkg/discovery"
"github.com/myntra/pipeline"
Expand Down
2 changes: 1 addition & 1 deletion internal/cluster/pipeline/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"log"

"github.com/layer5io/meshsync/internal/cache"
"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
discovery "github.com/layer5io/meshsync/pkg/discovery"
"github.com/myntra/pipeline"
Expand Down
2 changes: 1 addition & 1 deletion internal/cluster/pipeline/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"log"

"github.com/layer5io/meshsync/internal/cache"
"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
discovery "github.com/layer5io/meshsync/pkg/discovery"
"github.com/myntra/pipeline"
Expand Down
2 changes: 1 addition & 1 deletion internal/meshes/istio/informers/authorization_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
broker "github.com/layer5io/meshsync/pkg/broker"
"github.com/layer5io/meshsync/pkg/model"
v1beta1 "istio.io/client-go/pkg/apis/security/v1beta1"
"k8s.io/client-go/tools/cache"
)
Expand Down
2 changes: 1 addition & 1 deletion internal/meshes/istio/informers/destination_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
v1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
"k8s.io/client-go/tools/cache"
Expand Down
2 changes: 1 addition & 1 deletion internal/meshes/istio/informers/envoy_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
v1alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
"k8s.io/client-go/tools/cache"
Expand Down
2 changes: 1 addition & 1 deletion internal/meshes/istio/informers/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
v1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
"k8s.io/client-go/tools/cache"
Expand Down
2 changes: 1 addition & 1 deletion internal/meshes/istio/informers/peer_authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
v1beta1 "istio.io/client-go/pkg/apis/security/v1beta1"
"k8s.io/client-go/tools/cache"
Expand Down
2 changes: 1 addition & 1 deletion internal/meshes/istio/informers/request_authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
v1beta1 "istio.io/client-go/pkg/apis/security/v1beta1"
"k8s.io/client-go/tools/cache"
Expand Down
2 changes: 1 addition & 1 deletion internal/meshes/istio/informers/service_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package informers
import (
"log"

"github.com/layer5io/meshsync/internal/model"
"github.com/layer5io/meshsync/pkg/model"
broker "github.com/layer5io/meshsync/pkg/broker"
v1beta1 "istio.io/client-go/pkg/apis/networking/v1beta1"
"k8s.io/client-go/tools/cache"
Expand Down
Loading

0 comments on commit f133108

Please sign in to comment.