Skip to content

Commit

Permalink
[CONTROLLER/RECORDER] refactors listener module
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengya authored and SongZhen0704 committed Jul 6, 2023
1 parent c659f9c commit 71c7018
Show file tree
Hide file tree
Showing 94 changed files with 1,713 additions and 1,231 deletions.
6 changes: 3 additions & 3 deletions server/controller/recorder/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1514,13 +1514,13 @@ func (c *Cache) refreshProcesses() {
c.AddProcesses(processes)
}

func (c *Cache) AddPrometheusTarget(items []*mysql.PrometheusTarget) {
func (c *Cache) AddPrometheusTargets(items []*mysql.PrometheusTarget) {
for _, item := range items {
c.DiffBaseDataSet.addPrometheusTarget(item, c.Sequence)
}
}

func (c *Cache) DeletePrometheusTarget(lcuuids []string) {
func (c *Cache) DeletePrometheusTargets(lcuuids []string) {
for _, lcuuid := range lcuuids {
c.DiffBaseDataSet.deletePrometheusTarget(lcuuid)
}
Expand All @@ -1534,5 +1534,5 @@ func (c *Cache) refreshPrometheusTarget() {
return
}

c.AddPrometheusTarget(prometheusTargets)
c.AddPrometheusTargets(prometheusTargets)
}
46 changes: 46 additions & 0 deletions server/controller/recorder/listener/az.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2023 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package listener

import (
cloudmodel "github.com/deepflowio/deepflow/server/controller/cloud/model"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/recorder/cache"
)

type AZ struct {
cache *cache.Cache
}

func NewAZ(c *cache.Cache) *AZ {
listener := &AZ{
cache: c,
}
return listener
}

func (a *AZ) OnUpdaterAdded(addedDBItems []*mysql.AZ) {
a.cache.AddAZs(addedDBItems)
}

func (a *AZ) OnUpdaterUpdated(cloudItem *cloudmodel.AZ, diffBase *cache.AZ) {
diffBase.Update(cloudItem)
}

func (a *AZ) OnUpdaterDeleted(lcuuids []string) {
a.cache.DeleteAZs(lcuuids)
}
46 changes: 46 additions & 0 deletions server/controller/recorder/listener/cen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2023 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package listener

import (
cloudmodel "github.com/deepflowio/deepflow/server/controller/cloud/model"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/recorder/cache"
)

type CEN struct {
cache *cache.Cache
}

func NewCEN(c *cache.Cache) *CEN {
listener := &CEN{
cache: c,
}
return listener
}

func (c *CEN) OnUpdaterAdded(addedDBItems []*mysql.CEN) {
c.cache.AddCENs(addedDBItems)
}

func (c *CEN) OnUpdaterUpdated(cloudItem *cloudmodel.CEN, diffBase *cache.CEN) {
diffBase.Update(cloudItem)
}

func (c *CEN) OnUpdaterDeleted(lcuuids []string) {
c.cache.DeleteCENs(lcuuids)
}
12 changes: 6 additions & 6 deletions server/controller/recorder/listener/dhcp_port.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ type DHCPPort struct {
}

func NewDHCPPort(c *cache.Cache, eq *queue.OverwriteQueue) *DHCPPort {
lisener := &DHCPPort{
listener := &DHCPPort{
cache: c,
eventProducer: event.NewDHCPPort(&c.ToolDataSet, eq),
}
return lisener
return listener
}

func (p *DHCPPort) OnUpdaterAdded(addedDBItems []*mysql.DHCPPort) {
// p.cache.AddDHCPPorts(addedDBItems)
p.eventProducer.ProduceByAdd(addedDBItems)
p.cache.AddDHCPPorts(addedDBItems)
}

func (p *DHCPPort) OnUpdaterUpdated(cloudItem *cloudmodel.DHCPPort, diffBase *cache.DHCPPort) {
p.eventProducer.ProduceByUpdate(cloudItem, diffBase)
// diffBase.Update(cloudItem)
// p.cache.UpdateDHCPPort(cloudItem)
diffBase.Update(cloudItem)
p.cache.UpdateDHCPPort(cloudItem)
}

func (p *DHCPPort) OnUpdaterDeleted(lcuuids []string) {
p.eventProducer.ProduceByDelete(lcuuids)
// p.cache.DeleteDHCPPorts(lcuuids)
p.cache.DeleteDHCPPorts(lcuuids)
}
10 changes: 5 additions & 5 deletions server/controller/recorder/listener/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,19 @@ import (
"github.com/deepflowio/deepflow/server/libs/queue"
)

type Domain struct {
type WholeDomain struct {
cache *cache.Cache
eventProducer *event.Domain
}

func NewDomain(domainLcuuid string, c *cache.Cache, eq *queue.OverwriteQueue) *Domain {
lisener := &Domain{
func NewWholeDomain(domainLcuuid string, c *cache.Cache, eq *queue.OverwriteQueue) *WholeDomain {
listener := &WholeDomain{
cache: c,
eventProducer: event.NewDomain(domainLcuuid, &c.ToolDataSet, eq),
}
return lisener
return listener
}

func (p *Domain) OnUpdatersCompeleted() {
func (p *WholeDomain) OnUpdatersCompleted() {
p.eventProducer.ProduceFromMySQL()
}
46 changes: 46 additions & 0 deletions server/controller/recorder/listener/floating_ip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2023 Yunshan Networks
*
* LiFloatingIPsed under the Apache LiFloatingIPse, Version 2.0 (the "LiFloatingIPse");
* you may not use this file except in compliance with the LiFloatingIPse.
* You may obtain a copy of the LiFloatingIPse at
*
* http://www.apache.org/liFloatingIPses/LIFloatingIPSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the LiFloatingIPse is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the LiFloatingIPse for the specific language governing permissions and
* limitations under the LiFloatingIPse.
*/

package listener

import (
cloudmodel "github.com/deepflowio/deepflow/server/controller/cloud/model"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/recorder/cache"
)

type FloatingIP struct {
cache *cache.Cache
}

func NewFloatingIP(c *cache.Cache) *FloatingIP {
listener := &FloatingIP{
cache: c,
}
return listener
}

func (f *FloatingIP) OnUpdaterAdded(addedDBItems []*mysql.FloatingIP) {
f.cache.AddFloatingIPs(addedDBItems)
}

func (f *FloatingIP) OnUpdaterUpdated(cloudItem *cloudmodel.FloatingIP, diffBase *cache.FloatingIP) {
diffBase.Update(cloudItem)
}

func (f *FloatingIP) OnUpdaterDeleted(lcuuids []string) {
f.cache.DeleteFloatingIPs(lcuuids)
}
24 changes: 12 additions & 12 deletions server/controller/recorder/listener/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ type Host struct {
}

func NewHost(c *cache.Cache, eq *queue.OverwriteQueue) *Host {
lisener := &Host{
listener := &Host{
cache: c,
eventProducer: event.NewHost(&c.ToolDataSet, eq),
}
return lisener
return listener
}

func (p *Host) OnUpdaterAdded(addedDBItems []*mysql.Host) {
// p.cache.AddHosts(addedDBItems)
p.eventProducer.ProduceByAdd(addedDBItems)
func (h *Host) OnUpdaterAdded(addedDBItems []*mysql.Host) {
h.eventProducer.ProduceByAdd(addedDBItems)
h.cache.AddHosts(addedDBItems)
}

func (p *Host) OnUpdaterUpdated(cloudItem *cloudmodel.Host, diffBase *cache.Host) {
p.eventProducer.ProduceByUpdate(cloudItem, diffBase)
// diffBase.Update(cloudItem)
// p.cache.UpdateHost(cloudItem)
func (h *Host) OnUpdaterUpdated(cloudItem *cloudmodel.Host, diffBase *cache.Host) {
h.eventProducer.ProduceByUpdate(cloudItem, diffBase)
diffBase.Update(cloudItem)
h.cache.UpdateHost(cloudItem)
}

func (p *Host) OnUpdaterDeleted(lcuuids []string) {
p.eventProducer.ProduceByDelete(lcuuids)
// p.cache.DeleteHosts(lcuuids)
func (h *Host) OnUpdaterDeleted(lcuuids []string) {
h.eventProducer.ProduceByDelete(lcuuids)
h.cache.DeleteHosts(lcuuids)
}
23 changes: 11 additions & 12 deletions server/controller/recorder/listener/lan_ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,24 @@ type LANIP struct {
}

func NewLANIP(c *cache.Cache, eq *queue.OverwriteQueue) *LANIP {
lisener := &LANIP{
listener := &LANIP{
cache: c,
eventProducer: event.NewLANIP(&c.ToolDataSet, eq),
}
return lisener
return listener
}

func (p *LANIP) OnUpdaterAdded(addedDBItems []*mysql.LANIP) {
// p.cache.AddLANIPs(addedDBItems)
p.eventProducer.ProduceByAdd(addedDBItems)
func (i *LANIP) OnUpdaterAdded(addedDBItems []*mysql.LANIP) {
i.eventProducer.ProduceByAdd(addedDBItems)
i.cache.AddLANIPs(addedDBItems)
}

func (p *LANIP) OnUpdaterUpdated(cloudItem *cloudmodel.IP, diffBase *cache.LANIP) {
p.eventProducer.ProduceByUpdate(cloudItem, diffBase)
// diffBase.Update(cloudItem)
// p.cache.UpdateLANIP(cloudItem)
func (i *LANIP) OnUpdaterUpdated(cloudItem *cloudmodel.IP, diffBase *cache.LANIP) {
i.eventProducer.ProduceByUpdate(cloudItem, diffBase)
diffBase.Update(cloudItem)
}

func (p *LANIP) OnUpdaterDeleted(lcuuids []string) {
p.eventProducer.ProduceByDelete(lcuuids)
// p.cache.DeleteLANIPs(lcuuids)
func (i *LANIP) OnUpdaterDeleted(lcuuids []string) {
i.eventProducer.ProduceByDelete(lcuuids)
i.cache.DeleteLANIPs(lcuuids)
}
24 changes: 12 additions & 12 deletions server/controller/recorder/listener/lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,25 @@ type LB struct {
}

func NewLB(c *cache.Cache, eq *queue.OverwriteQueue) *LB {
lisener := &LB{
listener := &LB{
cache: c,
eventProducer: event.NewLB(&c.ToolDataSet, eq),
}
return lisener
return listener
}

func (p *LB) OnUpdaterAdded(addedDBItems []*mysql.LB) {
// p.cache.AddLBs(addedDBItems)
p.eventProducer.ProduceByAdd(addedDBItems)
func (lb *LB) OnUpdaterAdded(addedDBItems []*mysql.LB) {
lb.eventProducer.ProduceByAdd(addedDBItems)
lb.cache.AddLBs(addedDBItems)
}

func (p *LB) OnUpdaterUpdated(cloudItem *cloudmodel.LB, diffBase *cache.LB) {
p.eventProducer.ProduceByUpdate(cloudItem, diffBase)
// diffBase.Update(cloudItem)
// p.cache.UpdateLB(cloudItem)
func (lb *LB) OnUpdaterUpdated(cloudItem *cloudmodel.LB, diffBase *cache.LB) {
lb.eventProducer.ProduceByUpdate(cloudItem, diffBase)
diffBase.Update(cloudItem)
lb.cache.UpdateLB(cloudItem)
}

func (p *LB) OnUpdaterDeleted(lcuuids []string) {
p.eventProducer.ProduceByDelete(lcuuids)
// p.cache.DeleteLBs(lcuuids)
func (lb *LB) OnUpdaterDeleted(lcuuids []string) {
lb.eventProducer.ProduceByDelete(lcuuids)
lb.cache.DeleteLBs(lcuuids)
}
46 changes: 46 additions & 0 deletions server/controller/recorder/listener/lb_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2023 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package listener

import (
cloudmodel "github.com/deepflowio/deepflow/server/controller/cloud/model"
"github.com/deepflowio/deepflow/server/controller/db/mysql"
"github.com/deepflowio/deepflow/server/controller/recorder/cache"
)

type LBListener struct {
cache *cache.Cache
}

func NewLBListener(c *cache.Cache) *LBListener {
listener := &LBListener{
cache: c,
}
return listener
}

func (l *LBListener) OnUpdaterAdded(addedDBItems []*mysql.LBListener) {
l.cache.AddLBListeners(addedDBItems)
}

func (l *LBListener) OnUpdaterUpdated(cloudItem *cloudmodel.LBListener, diffBase *cache.LBListener) {
diffBase.Update(cloudItem)
}

func (l *LBListener) OnUpdaterDeleted(lcuuids []string) {
l.cache.DeleteLBListeners(lcuuids)
}
Loading

0 comments on commit 71c7018

Please sign in to comment.