Skip to content

Commit

Permalink
[Server] add process info to ingester module
Browse files Browse the repository at this point in the history
  • Loading branch information
roryye committed Jul 12, 2023
1 parent 4de2643 commit 8c14619
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 23 deletions.
4 changes: 4 additions & 0 deletions cli/ctl/trisolaris_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,10 @@ func platformData(response *trident.SyncResponse) {
for index, entry := range platform.Cidrs {
JsonFormat(index+1, entry)
}
fmt.Println("gprocess infos:")
for index, entry := range platform.GprocessInfos {
JsonFormat(index+1, entry)
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.18

require (
github.com/bitly/go-simplejson v0.5.0
github.com/deepflowio/deepflow/message v0.0.0-20230530102604-2aaf27c6681a
github.com/deepflowio/deepflow/message v0.0.0-20230711072045-ba7f8a7d5b88
github.com/deepflowio/deepflow/server v0.0.0-20230627003827-a0736467fe05
github.com/golang/protobuf v1.5.2
github.com/mattn/go-runewidth v0.0.14
Expand Down
4 changes: 2 additions & 2 deletions cli/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ3
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deepflowio/deepflow/message v0.0.0-20230530102604-2aaf27c6681a h1:HPPeWjfRdoUOllDZq2z8QzQsgkb13/TsNJdt1s2oT14=
github.com/deepflowio/deepflow/message v0.0.0-20230530102604-2aaf27c6681a/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0=
github.com/deepflowio/deepflow/message v0.0.0-20230711072045-ba7f8a7d5b88 h1:uK/Ie+NO6cdqnDAb/veG/ZLnjinq0o6S5/ArXouKFH4=
github.com/deepflowio/deepflow/message v0.0.0-20230711072045-ba7f8a7d5b88/go.mod h1:e+1lUMMlycCvFRKvlwt/y/0vxJnF8wVss3GyR1ARXY0=
github.com/deepflowio/deepflow/server v0.0.0-20230627003827-a0736467fe05 h1:aNxt1SOsk909k8/wZ3doIZ5VOsTpINFRftC49DT5ZFs=
github.com/deepflowio/deepflow/server v0.0.0-20230627003827-a0736467fe05/go.mod h1:cA/aI0aTksmlb2xrvSSCCvKvwdhgFrr8lpADNAevAFU=
github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw=
Expand Down
12 changes: 12 additions & 0 deletions server/controller/trisolaris/metadata/db_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type DBDataCache struct {
npbPolicies []*models.NpbPolicy
pcapPolicies []*models.PcapPolicy
cens []*models.CEN
processes []*models.Process
}

func newDBDataCache() *DBDataCache {
Expand Down Expand Up @@ -247,6 +248,10 @@ func (d *DBDataCache) GetCENs() []*models.CEN {
return d.cens
}

func (d *DBDataCache) GetProcesses() []*models.Process {
return d.processes
}

func GetTapTypesFromDB(db *gorm.DB) []*models.TapType {
tapTypes, err := dbmgr.DBMgr[models.TapType](db).Gets()
if err != nil {
Expand Down Expand Up @@ -553,4 +558,11 @@ func (d *DBDataCache) GetDataCacheFromDB(db *gorm.DB) {
} else {
log.Error(err)
}

processes, err := dbmgr.DBMgr[models.Process](db).Gets()
if err == nil {
d.processes = processes
} else {
log.Error(err)
}
}
30 changes: 30 additions & 0 deletions server/controller/trisolaris/metadata/domain_gprocess_info.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2023 Yunshan Networks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package metadata

import "github.com/deepflowio/deepflow/message/trident"

// GProcessInfoProto is only used to send to the ingester module.
type GProcessInfoProto struct {
gprocessInfo []*trident.GProcessInfo
}

func newGProcessInfoProto(length int) *GProcessInfoProto {
return &GProcessInfoProto{
gprocessInfo: make([]*trident.GProcessInfo, 0, length),
}
}
17 changes: 12 additions & 5 deletions server/controller/trisolaris/metadata/platform_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type PlatformData struct {
interfaceProtos []*trident.Interface
peerConnProtos []*trident.PeerConnection
cidrProtos []*trident.Cidr
gprocessInfoProtos []*trident.GProcessInfo
version uint64
mergeDomains []string
dataType uint32
Expand All @@ -52,14 +53,16 @@ func NewPlatformData(domain string, lcuuid string, version uint64, dataType uint
interfaceProtos: []*trident.Interface{},
peerConnProtos: []*trident.PeerConnection{},
cidrProtos: []*trident.Cidr{},
gprocessInfoProtos: []*trident.GProcessInfo{},
version: version,
mergeDomains: []string{},
dataType: dataType,
}
}

func (f *PlatformData) setPlatformData(ifs []*trident.Interface, pcs []*trident.PeerConnection, cidrs []*trident.Cidr) {
f.initPlatformData(ifs, pcs, cidrs)
func (f *PlatformData) setPlatformData(ifs []*trident.Interface, pcs []*trident.PeerConnection, cidrs []*trident.Cidr,
gpis []*trident.GProcessInfo) {
f.initPlatformData(ifs, pcs, cidrs, gpis)
f.GeneratePlatformDataResult()
}

Expand Down Expand Up @@ -89,17 +92,20 @@ func (f *PlatformData) initVersion() {
offsetVersion += offsetInterval
}

func (f *PlatformData) initPlatformData(ifs []*trident.Interface, pcs []*trident.PeerConnection, cidrs []*trident.Cidr) {
func (f *PlatformData) initPlatformData(ifs []*trident.Interface, pcs []*trident.PeerConnection, cidrs []*trident.Cidr,
gpi []*trident.GProcessInfo) {
f.interfaceProtos = ifs
f.peerConnProtos = pcs
f.cidrProtos = cidrs
f.gprocessInfoProtos = gpi
}

func (f *PlatformData) GeneratePlatformDataResult() {
f.platformDataProtos = &trident.PlatformData{
Interfaces: f.interfaceProtos,
PeerConnections: f.peerConnProtos,
Cidrs: f.cidrProtos,
GprocessInfos: f.gprocessInfoProtos,
}
var err error
f.platformDataStr, err = f.platformDataProtos.Marshal()
Expand All @@ -118,6 +124,7 @@ func (f *PlatformData) Merge(other *PlatformData) {
f.interfaceProtos = append(f.interfaceProtos, other.interfaceProtos...)
f.peerConnProtos = append(f.peerConnProtos, other.peerConnProtos...)
f.cidrProtos = append(f.cidrProtos, other.cidrProtos...)
f.gprocessInfoProtos = append(f.gprocessInfoProtos, other.gprocessInfoProtos...)
f.version += other.version
if len(other.domain) != 0 {
f.mergeDomains = append(f.mergeDomains, other.domain)
Expand Down Expand Up @@ -159,6 +166,6 @@ func (f *PlatformData) equal(other *PlatformData) bool {
}

func (f *PlatformData) String() string {
return fmt.Sprintf("name: %s, lcuuid: %s, data_type: %d, version: %d, platform_data_hash: %d, interfaces: %d, peer_connections: %d, cidrs: %d, merge_domains: %s",
f.domain, f.lcuuid, f.dataType, f.version, f.platformDataHash, len(f.interfaceProtos), len(f.peerConnProtos), len(f.cidrProtos), f.mergeDomains)
return fmt.Sprintf("name: %s, lcuuid: %s, data_type: %d, version: %d, platform_data_hash: %d, interfaces: %d, peer_connections: %d, cidrs: %d, gprocess_info: %d, merge_domains: %s",
f.domain, f.lcuuid, f.dataType, f.version, f.platformDataHash, len(f.interfaceProtos), len(f.peerConnProtos), len(f.cidrProtos), len(f.gprocessInfoProtos), f.mergeDomains)
}
57 changes: 46 additions & 11 deletions server/controller/trisolaris/metadata/platform_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type PlatformDataOP struct {
domainInterfaceProto *atomic.Value // *DomainInterfaceProto
domainPeerConnProto *atomic.Value // *DomainPeerConnProto
domainCIDRProto *atomic.Value // *DomainCIDRProto

GProcessInfoProto *atomic.Value // *GProcessInfoProto

// ingester used platform data
allPlatformDataForIngester *atomic.Value //*PlatformData

Expand Down Expand Up @@ -69,6 +72,9 @@ func newPlatformDataOP(db *gorm.DB, metaData *MetaData) *PlatformDataOP {
domainCIDRProto := &atomic.Value{}
domainCIDRProto.Store(newDomainCIDRProto(0))

gprocessInfoProto := &atomic.Value{}
gprocessInfoProto.Store(newGProcessInfoProto(0))

allPlatformDataForIngester := &atomic.Value{}
allPlatformDataForIngester.Store(NewPlatformData("", "", 0, INGESTER_ALL_PLATFORM_DATA))

Expand All @@ -77,6 +83,7 @@ func newPlatformDataOP(db *gorm.DB, metaData *MetaData) *PlatformDataOP {
domainInterfaceProto: domainInterfaceProto,
domainPeerConnProto: domainPeerConnProto,
domainCIDRProto: domainCIDRProto,
GProcessInfoProto: gprocessInfoProto,
allPlatformDataForIngester: allPlatformDataForIngester,
DomainToPlatformData: newDomainToPlatformData(),
db: db,
Expand Down Expand Up @@ -328,15 +335,31 @@ func (p *PlatformDataOP) generateCIDRs() {
p.updateDomainCIDRProto(dcProto)
}

func (p *PlatformDataOP) generateGProcessInfo() {
dbDataCache := p.metaData.GetDBDataCache()
processes := dbDataCache.GetProcesses()
gprocessData := newGProcessInfoProto(len(processes))
for _, process := range processes {
p := &trident.GProcessInfo{
GprocessId: proto.Uint32(uint32(process.ID)),
NetnsId: proto.Uint32(process.NetnsID),
VtapId: proto.Uint32(uint32(process.VTapID)),
}
gprocessData.gprocessInfo = append(gprocessData.gprocessInfo, p)
}
p.updateGProcessInfoProto(gprocessData)
}

func (p *PlatformDataOP) generateIngesterPlatformData() {
domainInterfaceProto := p.getDomainInterfaceProto()
domainPeerConnProto := p.getDomainPeerConnProto()
domainCIDRProto := p.getDomainCIDRProto()
gprocessInfo := p.getGProcessInfoProto().gprocessInfo

// AllPlatformDataForIngester
newIngesterPlatformData := NewPlatformData("", "", 0, INGESTER_ALL_PLATFORM_DATA)
newIngesterPlatformData.setPlatformData(domainInterfaceProto.allCompleteInterfaces,
domainPeerConnProto.peerConns, domainCIDRProto.cidrs)
domainPeerConnProto.peerConns, domainCIDRProto.cidrs, gprocessInfo)
oldIngesterPlatformData := p.GetAllPlatformDataForIngester()
if oldIngesterPlatformData.GetVersion() == 0 {
newIngesterPlatformData.setVersion(uint64(time.Now().Unix()))
Expand All @@ -351,7 +374,8 @@ func (p *PlatformDataOP) generateIngesterPlatformData() {
newACPData.setPlatformData(
domainInterfaceProto.allCompleteInterfacesExceptPod,
domainPeerConnProto.peerConns,
domainCIDRProto.simplecidrs)
domainCIDRProto.simplecidrs,
gprocessInfo)
oldACPData := p.GetAllCompletePlatformDataExceptPod()
if oldACPData == nil {
newACPData.initVersion()
Expand All @@ -366,7 +390,7 @@ func (p *PlatformDataOP) generateIngesterPlatformData() {
for _, region := range regions {
interfaces := domainInterfaceProto.regionToInterfacesOnlyPod[region.Lcuuid]
regionData := NewPlatformData(region.Name, region.Lcuuid, 0, REGION_TO_PLATFORM_DATA_ONLY_POD)
regionData.setPlatformData(interfaces, nil, nil)
regionData.setPlatformData(interfaces, nil, nil, nil)
regionToData[region.Lcuuid] = regionData
}
if !p.GetRegionToPlatformDataOnlyPod().checkVersion(regionToData) {
Expand All @@ -378,7 +402,7 @@ func (p *PlatformDataOP) generateIngesterPlatformData() {
for _, az := range azs {
interfaces := domainInterfaceProto.azToInterfacesOnlyPod[az.Lcuuid]
azData := NewPlatformData(az.Name, az.Lcuuid, 0, AZ_TO_PLATFORM_DATA_ONLY_POD)
azData.setPlatformData(interfaces, nil, nil)
azData.setPlatformData(interfaces, nil, nil, nil)
azToData[az.Lcuuid] = azData
}
if !p.GetAZToPlatformDataOnlyPod().checkVersion(azToData) {
Expand All @@ -400,7 +424,8 @@ func (p *PlatformDataOP) generateAllSimplePlatformData() {
aSPData.setPlatformData(
domainInterfaceProto.allSimpleInterfaces,
domainPeerConnProto.peerConns,
domainCIDRProto.simplecidrs)
domainCIDRProto.simplecidrs,
nil)
pASPData := p.GetAllSimplePlatformData()
if pASPData == nil {
aSPData.initVersion()
Expand All @@ -416,7 +441,8 @@ func (p *PlatformDataOP) generateAllSimplePlatformData() {
aSPDExceptPod.setPlatformData(
domainInterfaceProto.allSimpleInterfacesExceptPod,
domainPeerConnProto.peerConns,
domainCIDRProto.simplecidrs)
domainCIDRProto.simplecidrs,
nil)
pASPDExceptPod := p.GetAllSimplePlatformDataExceptPod()
if pASPDExceptPod == nil {
aSPDExceptPod.initVersion()
Expand Down Expand Up @@ -447,20 +473,20 @@ func (p *PlatformDataOP) generateDomainPlatformData() {
peerConnections := domainPeerConnProto.domainToPeerConns[domain.Lcuuid]
cidrs := domainCIDRProto.domainToCIDRs[domain.Lcuuid]
domainDate := NewPlatformData(domain.Name, domain.Lcuuid, 0, DOMAIN_TO_ALL_SIMPLE_PLATFORM_DATA)
domainDate.setPlatformData(interfaces, peerConnections, cidrs)
domainDate.setPlatformData(interfaces, peerConnections, cidrs, nil)
dToAPData[domain.Lcuuid] = domainDate

// vinterface包含集群内非pod信息
interfacesExceptPod := domainInterfaceProto.domainToInterfacesExceptPod[domain.Lcuuid]
domainCIDRs := domainCIDRProto.domainOrSubdomainToCIDRs[domain.Lcuuid]
domainDataExceptPod := NewPlatformData(domain.Name, domain.Lcuuid, 0, DOMAIN_TO_PLATFORM_DATA_EXCEPT_POD)
domainDataExceptPod.setPlatformData(interfacesExceptPod, peerConnections, domainCIDRs)
domainDataExceptPod.setPlatformData(interfacesExceptPod, peerConnections, domainCIDRs, nil)
dToPDExceptPod[domain.Lcuuid] = domainDataExceptPod

// domain仅包含pod信息
interfacesOnlyPod := domainInterfaceProto.domainOrSubdomainToInterfacesOnlyPod[domain.Lcuuid]
domainDataOnlyPod := NewPlatformData(domain.Name, domain.Lcuuid, 0, DOMAIN_TO_PLATFORM_DATA_ONLY_POD)
domainDataOnlyPod.setPlatformData(interfacesOnlyPod, peerConnections, domainCIDRs)
domainDataOnlyPod.setPlatformData(interfacesOnlyPod, peerConnections, domainCIDRs, nil)
dToPDOnlyPod[domain.Lcuuid] = domainDataOnlyPod
}

Expand All @@ -471,12 +497,12 @@ func (p *PlatformDataOP) generateDomainPlatformData() {
peerConnections := domainPeerConnProto.domainToPeerConns[subDomain.Lcuuid]
cidrs := domainCIDRProto.domainOrSubdomainToCIDRs[subDomain.Lcuuid]
domainDataOnlyPod := NewPlatformData(subDomain.Name, subDomain.Lcuuid, 0, DOMAIN_TO_PLATFORM_DATA_ONLY_POD)
domainDataOnlyPod.setPlatformData(interfaces, peerConnections, cidrs)
domainDataOnlyPod.setPlatformData(interfaces, peerConnections, cidrs, nil)
dToPDOnlyPod[subDomain.Lcuuid] = domainDataOnlyPod
}

noDomainData := NewPlatformData("no domain", "", 0, NO_DOMAIN_TO_PLATFORM)
noDomainData.setPlatformData(nil, domainPeerConnProto.getNoDomainPeerConns(), nil)
noDomainData.setPlatformData(nil, domainPeerConnProto.getNoDomainPeerConns(), nil, nil)
oldNoDOmainDat := p.GetNoDomainPlatformData()
if oldNoDOmainDat == nil {
noDomainData.initVersion()
Expand Down Expand Up @@ -570,6 +596,14 @@ func (p *PlatformDataOP) updateDomainCIDRProto(c *DomainCIDRProto) {
p.domainCIDRProto.Store(c)
}

func (p *PlatformDataOP) getGProcessInfoProto() *GProcessInfoProto {
return p.GProcessInfoProto.Load().(*GProcessInfoProto)
}

func (p *PlatformDataOP) updateGProcessInfoProto(c *GProcessInfoProto) {
p.GProcessInfoProto.Store(c)
}

func (p *PlatformDataOP) GetAllPlatformDataForIngester() *PlatformData {
return p.allPlatformDataForIngester.Load().(*PlatformData)
}
Expand All @@ -588,6 +622,7 @@ func (p *PlatformDataOP) generateBasePlatformData() {
p.generateVInterfaces()
p.generatePeerConnections()
p.generateCIDRs()
p.generateGProcessInfo()
p.generateIngesterPlatformData()
p.generateAllSimplePlatformData()
p.generateDomainPlatformData()
Expand Down
21 changes: 20 additions & 1 deletion server/controller/trisolaris/metadata/raw_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type PlatformRawData struct {
lbIDs mapset.Set
natIDs mapset.Set
podServicePortIDs mapset.Set
processIDs mapset.Set
subnetPrefix []string
subnetMask []string
serverToVmIDs map[string]mapset.Set
Expand Down Expand Up @@ -168,6 +169,7 @@ func NewPlatformRawData() *PlatformRawData {
lbIDs: mapset.NewSet(),
natIDs: mapset.NewSet(),
podServicePortIDs: mapset.NewSet(),
processIDs: mapset.NewSet(),
serverToVmIDs: make(map[string]mapset.Set),
floatingIPs: make(map[int]*IPData),
podServiceIDToPodGroupPortIDs: make(map[int]mapset.Set),
Expand Down Expand Up @@ -1021,6 +1023,16 @@ func (r *PlatformRawData) ConvertSkipVTapVIfIDs(dbDataCache *DBDataCache) {
log.Debug(r.launchServerToSkipInterface)
}

func (r *PlatformRawData) ConvertDBProcesses(dbDataCache *DBDataCache) {
processes := dbDataCache.GetProcesses()
if processes == nil {
return
}
for _, process := range processes {
r.processIDs.Add(process.ID)
}
}

// 有依赖 需要按顺序convert
func (r *PlatformRawData) ConvertDBCache(dbDataCache *DBDataCache) {
r.ConvertHost(dbDataCache)
Expand All @@ -1047,6 +1059,7 @@ func (r *PlatformRawData) ConvertDBCache(dbDataCache *DBDataCache) {
r.ConvertDBVmPodNodeConn(dbDataCache)
r.ConvertDBVipDomain(dbDataCache)
r.ConvertSkipVTapVIfIDs(dbDataCache)
r.ConvertDBProcesses(dbDataCache)
}

func (r *PlatformRawData) checkIsVip(ip string, vif *models.VInterface, platformVips []string) bool {
Expand Down Expand Up @@ -1114,7 +1127,8 @@ func (r *PlatformRawData) vInterfaceToProto(
PodNodeId: proto.Uint32(uint32(device.PodNodeID)),
PodId: proto.Uint32(uint32(device.PodID)),
IsVipInterface: proto.Bool(ipResourceData.isVipInterface),
NetnsId: proto.Uint32(uint32(vif.NetnsID)),
NetnsId: proto.Uint32(vif.NetnsID),
VtapId: proto.Uint32(vif.VtapID),
}
sInterface := &trident.Interface{
Id: proto.Uint32(uint32(vif.ID)),
Expand Down Expand Up @@ -1434,6 +1448,11 @@ func (r *PlatformRawData) equal(o *PlatformRawData) bool {
return false
}

if !r.processIDs.Equal(o.processIDs) {
log.Info("platform processes changed")
return false
}

if len(r.podServiceIDToPodGroupPortIDs) != len(o.podServiceIDToPodGroupPortIDs) {
log.Info("platform pod service pod group ports changed")
return false
Expand Down
Loading

0 comments on commit 8c14619

Please sign in to comment.