Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4964 add information about lacp connection status for lag ports #5040

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
34 changes: 34 additions & 0 deletions docker/db-migration/migrations/025-add-lacp-partner-class.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
databaseChangeLog:
- changeSet:
id: tag
author: dkakollu
changes:
- tagDatabase:
tag: 025-add-lacp-partner-class

- changeSet:
id: add-lacp-partner-class
author: dkakollu
changes:
- sql: "CREATE CLASS lacp_partner IF NOT EXISTS EXTENDS V"
- sql: "CREATE PROPERTY lacp_partner.switch_id IF NOT EXISTS STRING"
- sql: "CREATE PROPERTY lacp_partner.logical_port_number IF NOT EXISTS INTEGER"
- sql: "CREATE PROPERTY lacp_partner.system_priority IF NOT EXISTS INTEGER"
- sql: "CREATE PROPERTY lacp_partner.system_id IF NOT EXISTS STRING"
- sql: "CREATE PROPERTY lacp_partner.key IF NOT EXISTS INTEGER"
- sql: "CREATE PROPERTY lacp_partner.port_priority IF NOT EXISTS INTEGER"
- sql: "CREATE PROPERTY lacp_partner.port_number IF NOT EXISTS INTEGER"
- sql: "CREATE PROPERTY lacp_partner.state_active IF NOT EXISTS BOOLEAN"
- sql: "CREATE PROPERTY lacp_partner.state_short_timeout IF NOT EXISTS BOOLEAN"
- sql: "CREATE PROPERTY lacp_partner.state_aggregatable IF NOT EXISTS BOOLEAN"
- sql: "CREATE PROPERTY lacp_partner.state_synchronised IF NOT EXISTS BOOLEAN"
- sql: "CREATE PROPERTY lacp_partner.state_collecting IF NOT EXISTS BOOLEAN"
- sql: "CREATE PROPERTY lacp_partner.state_distributing IF NOT EXISTS BOOLEAN"
- sql: "CREATE PROPERTY lacp_partner.state_defaulted IF NOT EXISTS BOOLEAN"
- sql: "CREATE PROPERTY lacp_partner.state_expired IF NOT EXISTS BOOLEAN"
- sql: "CREATE INDEX lacp_partner.switch_id IF NOT EXISTS NOTUNIQUE_HASH_INDEX"
- sql: "CREATE INDEX lacp_partner.logical_port_number IF NOT EXISTS NOTUNIQUE_HASH_INDEX"

rollback:
- sql: "DELETE VERTEX lacp_partner"
- sql: "DROP CLASS lacp_partner IF EXISTS"
3 changes: 3 additions & 0 deletions docker/db-migration/migrations/root.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,6 @@ databaseChangeLog:
- include:
relativeToChangelogFile: true
file: 023-add-lacp-reply-into-lag-class.yaml
- include:
relativeToChangelogFile: true
file: 025-add-lacp-partner-class.yaml
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure when migration 024 will be merged so please change your migration name to 024

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated to 024

Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* Copyright 2017 Telstra Open Source
dmitrii-beliakov marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2023

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openkilda.messaging.info.event;

import org.openkilda.messaging.info.InfoData;
import org.openkilda.model.SwitchId;

import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;



@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
public class LacpInfoData extends InfoData {
SwitchId switchId;
int logicalPortNumber;
LacpPartner actor;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/* Copyright 2021 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openkilda.messaging.info.event;

import org.openkilda.model.MacAddress;

import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
public class LacpPartner implements Serializable {
int systemPriority;
MacAddress systemId;
int key;
int portPriority;
int portNumber;
LacpState state;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/* Copyright 2021 Telstra Open Source
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openkilda.messaging.info.event;

import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
public class LacpState implements Serializable {
boolean active;
boolean shortTimeout;
boolean aggregatable;
boolean synchronised;
boolean collecting;
boolean distributing;
boolean defaulted;
boolean expired;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,38 @@

public class ConnectedDevicesTopology extends AbstractTopology<ConnectedDevicesTopologyConfig> {
public static final String CONNECTED_DEVICES_SPOUT_ID = "connected-devices-spout";
public static final String LACP_SPOUT_ID = "lacp-spout";
public static final String ROUTER_BOLT_ID = "router-bolt";
public static final String PACKET_BOLT_ID = "packet-bolt";

public ConnectedDevicesTopology(LaunchEnvironment env) {
super(env, "connecteddevices-topology", ConnectedDevicesTopologyConfig.class);
}

/**
* Main method to run topology.
*/
public static void main(String[] args) {
try {
LaunchEnvironment env = new LaunchEnvironment(args);
(new ConnectedDevicesTopology(env)).setup();
} catch (Exception e) {
System.exit(handleLaunchException(e));
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you move main method from the end of the file?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its formatted by IntelliJ, Updated.

/**
* Creating topology.
*/
public StormTopology createTopology() {
TopologyBuilder builder = new TopologyBuilder();
PersistenceManager persistenceManager = new PersistenceManager(configurationProvider);

createZkSpout(builder);

createSpout(builder);
createLacpSpout(builder);

PersistenceManager persistenceManager = new PersistenceManager(configurationProvider);
createRouterBolt(builder, persistenceManager);
createPacketBolt(builder, persistenceManager);

Expand Down Expand Up @@ -80,6 +95,10 @@ private void createSpout(TopologyBuilder builder) {
declareKafkaSpout(builder, topologyConfig.getKafkaTopoConnectedDevicesTopic(), CONNECTED_DEVICES_SPOUT_ID);
}

private void createLacpSpout(TopologyBuilder builder) {
declareKafkaSpout(builder, topologyConfig.getKafkaLacpTopic(), LACP_SPOUT_ID);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You forgot to add LACP_SPOUT_ID into input of RouterBolt. It should be

    private void createRouterBolt(TopologyBuilder builder, PersistenceManager persistenceManager) {
        RouterBolt routerBolt = new RouterBolt(persistenceManager, ZooKeeperSpout.SPOUT_ID);
        declareBolt(builder, routerBolt, ROUTER_BOLT_ID)
                .shuffleGrouping(CONNECTED_DEVICES_SPOUT_ID)
                .shuffleGrouping(LACP_SPOUT_ID)
                .allGrouping(ZooKeeperSpout.SPOUT_ID);
    }

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated.

}

private void createZkBolt(TopologyBuilder builder) {
ZooKeeperBolt zooKeeperBolt = new ZooKeeperBolt(getConfig().getBlueGreenMode(), getZkTopoName(),
getZookeeperConfig(), getBoltInstancesCount(ROUTER_BOLT_ID));
Expand All @@ -91,16 +110,4 @@ private void createZkBolt(TopologyBuilder builder) {
protected String getZkTopoName() {
return "connecteddevices";
}

/**
* Main method to run topology.
*/
public static void main(String[] args) {
try {
LaunchEnvironment env = new LaunchEnvironment(args);
(new ConnectedDevicesTopology(env)).setup();
} catch (Exception e) {
System.exit(handleLaunchException(e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,9 @@ public interface ConnectedDevicesTopologyConfig extends AbstractTopologyConfig {
default String getKafkaTopoConnectedDevicesTopic() {
return getKafkaTopics().getTopoConnectedDevicesTopic();
}

default String getKafkaLacpTopic() {
return getKafkaTopics().getLacpTopic();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

import static org.openkilda.wfm.topology.utils.KafkaRecordTranslator.FIELD_ID_PAYLOAD;

import org.openkilda.messaging.info.InfoData;
import org.openkilda.messaging.info.event.ArpInfoData;
import org.openkilda.messaging.info.event.ConnectedDevicePacketBase;
import org.openkilda.messaging.info.event.LacpInfoData;
import org.openkilda.messaging.info.event.LldpInfoData;
import org.openkilda.persistence.PersistenceManager;
import org.openkilda.wfm.AbstractBolt;
Expand All @@ -43,12 +44,14 @@ protected void init() {

@Override
protected void handleInput(Tuple input) throws PipelineException {
ConnectedDevicePacketBase data = pullValue(input, FIELD_ID_PAYLOAD, ConnectedDevicePacketBase.class);
InfoData data = pullValue(input, FIELD_ID_PAYLOAD, InfoData.class);

if (data instanceof LldpInfoData) {
packetService.handleLldpData((LldpInfoData) data);
} else if (data instanceof ArpInfoData) {
packetService.handleArpData((ArpInfoData) data);
} else if (data instanceof LacpInfoData) {
packetService.handleLacpData((LacpInfoData) data);
} else {
unhandledInput(input);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.openkilda.messaging.info.InfoData;
import org.openkilda.messaging.info.InfoMessage;
import org.openkilda.messaging.info.event.ArpInfoData;
import org.openkilda.messaging.info.event.LacpInfoData;
import org.openkilda.messaging.info.event.LldpInfoData;
import org.openkilda.persistence.PersistenceManager;
import org.openkilda.wfm.AbstractBolt;
Expand Down Expand Up @@ -57,8 +58,8 @@ protected void handleInput(Tuple input) throws PipelineException {
emitWithContext(PACKET_STREAM_ID, input, new Values(createMessageKey((LldpInfoData) data), data));
} else if (data instanceof ArpInfoData) {
emitWithContext(PACKET_STREAM_ID, input, new Values(createMessageKey((ArpInfoData) data), data));
} else {
unhandledInput(input);
} else if (data instanceof LacpInfoData) {
emitWithContext(PACKET_STREAM_ID, input, new Values(createMessageKey((LacpInfoData) data), data));
}
} else {
unhandledInput(input);
Expand Down
Loading