Skip to content

Commit 9439080

Browse files
committed
1.add inbound and outbound message count capability 2.enable bash script to pass variable to select algorithm
1 parent 6c56a3b commit 9439080

7 files changed

+107
-14
lines changed

here_we_go.sh

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@ tell application "iTerm2"
2626
# create clients
2727
delay 0.5
2828
write text "cd ./src/main/java"
29-
write text "java -Dlog4j.configurationFile=../resources/log4j2.xml -cp ../../../lib/*:. MainApp client 0"
29+
write text "java -Dlog4j.configurationFile=../resources/log4j2.xml -cp ../../../lib/*:. MainApp client 0 " & "$1"
3030
repeat with i from 0 to 3
3131
tell application "System Events" to keystroke "d" using {command down}
3232
delay 0.5
3333
write text "cd ./src/main/java"
34-
write text "java -Dlog4j.configurationFile=../resources/log4j2.xml -cp ../../../lib/*:. MainApp client " & (i+1)
34+
write text "java -Dlog4j.configurationFile=../resources/log4j2.xml -cp ../../../lib/*:. MainApp client " & (i+1) & " " & "$1"
3535
end repeat
3636
# init all connections by executing "connect" and moving left
3737
repeat with i from 0 to 3

src/main/java/MainApp.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import constant.Constant;
22
import network.server.MutualExclusionClient;
33
import network.server.MutualExclusionServer;
4+
import org.apache.logging.log4j.LogManager;
5+
import org.apache.logging.log4j.Logger;
46
import util.FileUtil;
57

68
import java.io.File;
@@ -12,6 +14,7 @@
1214
* Usage: Main entrance to run different distributed algorithms
1315
*/
1416
public class MainApp {
17+
private static Logger logger;
1518
public static void main(String[] args) {
1619
String serverType = args[0];
1720
switch (serverType) {
@@ -47,6 +50,8 @@ public static void main(String[] args) {
4750
}
4851
case Constant.CLIENT: {
4952
int clientId = Integer.parseInt(args[1]);
53+
logger = LogManager.getLogger("client" + clientId + "_logger");
54+
String algoType = args[2];
5055
int clientNum = 5;
5156
int serverNum = 3;
5257
int fileNum = 2;
@@ -67,20 +72,27 @@ public static void main(String[] args) {
6772
}
6873

6974
clientSideServer.initConnection();
70-
clientSideServer.initMEList();
75+
clientSideServer.setMEList(algoType);
76+
// clientSideServer.initMEList();
7177

78+
long startTimestamp = Long.MIN_VALUE;
7279
while (true) {
7380
command = scanner.nextLine();
7481
if (command.equals(Constant.COMMAND_RECONNECT)) {
7582
clientSideServer.closeConnection();
76-
clientSideServer.clearMEList();
83+
// clientSideServer.clearMEList();
7784

7885
clientSideServer.initConnection();
79-
clientSideServer.initMEList();
86+
// clientSideServer.initMEList();
8087
} else if (command.equals(Constant.COMMAND_CLOSE)) {
88+
logger.trace("total inbound msg count: " + clientSideServer.allMsgCount(Constant.COUNT_INBOUND_MSG));
89+
logger.trace("total outbound msg count: " + clientSideServer.allMsgCount(Constant.COUNT_OUTBOUND_MSG));
8190
clientSideServer.closeConnection();
8291
break;
8392
} else {
93+
startTimestamp = System.currentTimeMillis();
94+
System.out.println("start running algorithm: " + algoType);
95+
clientSideServer.setStartTimestamp(startTimestamp);
8496
clientSideServer.executeOperation();
8597
}
8698
}

src/main/java/algorithm/LamportMutualExclusion.java

+1
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ private void checkIncomingMsgSeqNum(int otherClientId, int otherSeqNum) {
187187
private void putIntoOutboundBlockingQueue(int target, String msg) {
188188
try {
189189
outboundBlockingQueueMap.get(target).put(msg);
190+
outboundMsgCount++;
190191
logger.trace("insert outbound msg and prepare to send to client " + target + " msg: " + msg);
191192
} catch (InterruptedException ex) {
192193
ex.printStackTrace();

src/main/java/algorithm/MutexBase.java

+19
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ public abstract class MutexBase {
2323
private LinkedBlockingQueue<String> inboundMsgBlockingQueue;
2424
protected Map<Integer, LinkedBlockingQueue<String>> outboundBlockingQueueMap;
2525
private boolean closeHandler;
26+
protected int inboundMsgCount;
27+
protected int outboundMsgCount;
2628

2729
public MutexBase(int me, int fileId,
2830
Map<Integer, Connection> clientConnMap,
@@ -45,6 +47,7 @@ public void run() {
4547
while (!closeHandler) {
4648
try {
4749
String message = inboundMsgBlockingQueue.take();
50+
inboundMsgCount++;
4851
logger.trace("handle inbound msg: " + message);
4952
handleMsg(message);
5053
} catch (InterruptedException ex) {
@@ -63,6 +66,22 @@ public void tearDown() {
6366
closeHandler = true;
6467
}
6568

69+
/**
70+
* get inbound message count
71+
* @return message count
72+
*/
73+
public int getInboundMsgCount() {
74+
return inboundMsgCount;
75+
}
76+
77+
/**
78+
* get outbound message count
79+
* @return message count
80+
*/
81+
public int getOutboundMsgCount() {
82+
return outboundMsgCount;
83+
}
84+
6685
/**
6786
* handle inbound message in various mutex algorithm
6887
* @param msg message

src/main/java/algorithm/RaAlgoWithCrOptimization.java

+1
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ private void releaseResource() {
209209
private void putIntoOutboundBlockingQueue(int target, String msg) {
210210
try {
211211
outboundBlockingQueueMap.get(target).put(msg);
212+
outboundMsgCount++;
212213
logger.trace("insert outbound msg and prepare to send to client " + target + " msg: " + msg);
213214
} catch (InterruptedException ex) {
214215
ex.printStackTrace();

src/main/java/constant/Constant.java

+6
Original file line numberDiff line numberDiff line change
@@ -38,4 +38,10 @@ public class Constant {
3838
public static final String COMMAND_RECONNECT = "reconnect";
3939
public static final String COMMAND_INVALID = "invalid command";
4040
public static final String COMMAND_CLOSE = "close";
41+
42+
public static final String ALGO_RA_WITH_OPTIMIZATION = "ra_with_optimization";
43+
public static final String ALGO_LAMPORT = "lamport";
44+
45+
public static final String COUNT_INBOUND_MSG = "count_inbound_msg";
46+
public static final String COUNT_OUTBOUND_MSG = "count_outbound_msg";
4147
}

src/main/java/network/server/MutualExclusionClient.java

+63-9
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class MutualExclusionClient extends BaseServer {
4646
private Map<Integer, LinkedBlockingQueue<String>> outboundBlockingQueueMap = new HashMap<>();
4747
private Semaphore writeCountMutex = new Semaphore(1);
4848
private volatile int writeResponseCount;
49+
private long startTimestamp;
4950

5051
public MutualExclusionClient(int clientId, int clientNum, int serverNum, int fileNum, int opCount) {
5152
super(Constant.BASE_CLIENT_PORT + clientId, Constant.CLIENT);
@@ -152,17 +153,40 @@ public void closeConnection() {
152153

153154
}
154155

156+
public void setStartTimestamp(long timestamp) {
157+
startTimestamp = timestamp;
158+
}
159+
155160
/**
156-
* init mutual exclusion algorithm list
157-
* each file has its own mutual exclusion algorithm object
161+
* set mutual exclusion algorithm object list
162+
* @param type mutual exclusion algorithm type
158163
*/
159-
public void initMEList() {
160-
for (int i = 0; i < fileNum; i++) {
161-
LinkedBlockingQueue<String> inboundMsgBlockingQueue = new LinkedBlockingQueue<>();
162-
blockingQueueList.add(inboundMsgBlockingQueue);
163-
MutexBase mutexBase =
164-
new LamportMutualExclusion(clientId, i, clientConnMap, this, inboundMsgBlockingQueue, outboundBlockingQueueMap);
165-
meAlgoList.add(mutexBase);
164+
public void setMEList(String type) {
165+
switch (type) {
166+
case Constant.ALGO_LAMPORT: {
167+
clearMEList();
168+
for (int i = 0; i < fileNum; i++) {
169+
LinkedBlockingQueue<String> inboundMsgBlockingQueue = new LinkedBlockingQueue<>();
170+
blockingQueueList.add(inboundMsgBlockingQueue);
171+
MutexBase mutexBase =
172+
new LamportMutualExclusion(clientId, i, clientConnMap, this, inboundMsgBlockingQueue, outboundBlockingQueueMap);
173+
meAlgoList.add(mutexBase);
174+
}
175+
break;
176+
}
177+
case Constant.ALGO_RA_WITH_OPTIMIZATION: {
178+
clearMEList();
179+
for (int i = 0; i < fileNum; i++) {
180+
LinkedBlockingQueue<String> inboundMsgBlockingQueue = new LinkedBlockingQueue<>();
181+
blockingQueueList.add(inboundMsgBlockingQueue);
182+
MutexBase mutexBase =
183+
new RaAlgoWithCrOptimization(clientId, clientNum, i, clientConnMap, this, inboundMsgBlockingQueue, outboundBlockingQueueMap);
184+
meAlgoList.add(mutexBase);
185+
}
186+
break;
187+
}
188+
default:
189+
break;
166190
}
167191
}
168192

@@ -180,6 +204,8 @@ public void executeOperation() {
180204
if (curOpCount >= opCount) {
181205
curOpCount = 0;
182206
System.out.println("finish all " + opCount + " operations");
207+
long endTimestamp = System.currentTimeMillis();
208+
logger.trace("finish running algorithm, total time: " + (endTimestamp - startTimestamp));
183209
return;
184210
}
185211
int opId = Util.genRandom(3);
@@ -239,6 +265,34 @@ public void checkWriteFinish() {
239265
}
240266
}
241267

268+
269+
/**
270+
* calculate inbound or outbound message total count
271+
* @param type type
272+
* @return count
273+
*/
274+
public int allMsgCount(String type) {
275+
int totalCount = 0;
276+
switch (type) {
277+
case Constant.COUNT_INBOUND_MSG: {
278+
for (MutexBase mutexBase : meAlgoList) {
279+
totalCount += mutexBase.getInboundMsgCount();
280+
}
281+
break;
282+
}
283+
case Constant.COUNT_OUTBOUND_MSG: {
284+
for (MutexBase mutexBase : meAlgoList) {
285+
totalCount += mutexBase.getOutboundMsgCount();
286+
}
287+
break;
288+
}
289+
default:
290+
break;
291+
}
292+
293+
return totalCount;
294+
}
295+
242296
/**
243297
* send different operation request
244298
* @param type operation type

0 commit comments

Comments
 (0)