diff --git a/.gitignore.save b/.gitignore.save deleted file mode 100644 index 81cf7cb..0000000 --- a/.gitignore.save +++ /dev/null @@ -1,55 +0,0 @@ -target/ - -###################### -## Intellj ignore -###################### - - - -# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm -# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 - -# User-specific stuff: -.idea/workspace.xml -.idea/tasks.xml -.idea/dictionaries -.idea/vcs.xml -.idea/jsLibraryMappings.xml - -# Sensitive or high-churn files: -.idea/dataSources.ids -.idea/dataSources.xml -.idea/dataSources.local.xml -.idea/sqlDataSources.xml -.idea/dynamic.xml -.idea/uiDesigner.xml - -# Gradle: -.idea/gradle.xml -.idea/libraries - -# Mongo Explorer plugin: -.idea/mongoSettings.xml - -## File-based project format: -*.iws - -## Plugin-specific files: - -# IntelliJ -/out/ - -# mpeltonen/sbt-idea plugin -.idea_modules/ - -# JIRA plugin -atlassian-ide-plugin.xml - -# Crashlytics plugin (for Android Studio and IntelliJ) -com_crashlytics_export_strings.xml -crashlytics.properties -crashlytics-build.properties -fabric.properties - - - diff --git a/.idea/.name b/.idea/.name deleted file mode 100644 index 292531e..0000000 --- a/.idea/.name +++ /dev/null @@ -1 +0,0 @@ -PAD-FileSystem \ No newline at end of file diff --git a/PAD-FileSystem.iml b/.idea/PAD-FileSystem.iml similarity index 83% rename from PAD-FileSystem.iml rename to .idea/PAD-FileSystem.iml index a73a35e..dce1d50 100644 --- a/PAD-FileSystem.iml +++ b/.idea/PAD-FileSystem.iml @@ -1,22 +1,20 @@ - + - - - - + - + + diff --git a/.idea/compiler.xml b/.idea/compiler.xml index ea0ffad..b0d808c 100644 --- a/.idea/compiler.xml +++ b/.idea/compiler.xml @@ -22,15 +22,14 @@ + + - + - - \ No newline at end of file diff --git a/.idea/encodings.xml b/.idea/encodings.xml deleted file mode 100644 index 97626ba..0000000 --- a/.idea/encodings.xml +++ /dev/null @@ -1,6 +0,0 @@ - - - - - - \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml index a0b0171..d45d835 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -20,7 +20,5 @@ - - - + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml index 7f11f02..84f0345 100644 --- a/.idea/modules.xml +++ b/.idea/modules.xml @@ -2,7 +2,8 @@ - + + diff --git a/.idea/runConfigurations/GossipRunner0.xml b/.idea/runConfigurations/GossipRunner0.xml deleted file mode 100644 index 0c00d51..0000000 --- a/.idea/runConfigurations/GossipRunner0.xml +++ /dev/null @@ -1,17 +0,0 @@ - - - - - \ No newline at end of file diff --git a/app/app.iml b/app/app.iml new file mode 100644 index 0000000..1ec4769 --- /dev/null +++ b/app/app.iml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/app/pom.xml b/app/pom.xml new file mode 100644 index 0000000..72f2b59 --- /dev/null +++ b/app/pom.xml @@ -0,0 +1,15 @@ + + + + pad-fs + com.dido.code.pad + 1.0-SNAPSHOT + + 4.0.0 + + app + + + \ No newline at end of file diff --git a/app/src/main/java/AppRunner.java b/app/src/main/java/AppRunner.java new file mode 100644 index 0000000..bcaf48b --- /dev/null +++ b/app/src/main/java/AppRunner.java @@ -0,0 +1,9 @@ +/** + * Created by dido-ubuntu on 22/03/16. + */ +public class AppRunner { + + public static void main(String[] args){ + System.out.println("_--------------App runner-----------------"); + } +} diff --git a/core/core.iml b/core/core.iml index ecb0a18..1ec4769 100644 --- a/core/core.iml +++ b/core/core.iml @@ -11,12 +11,13 @@ - + - + + diff --git a/core/src/main/java/com/dido/pad/Node.java b/core/src/main/java/com/dido/pad/Node.java index 72ee019..364e3b8 100644 --- a/core/src/main/java/com/dido/pad/Node.java +++ b/core/src/main/java/com/dido/pad/Node.java @@ -1,25 +1,20 @@ package com.dido.pad; import com.dido.pad.datamessages.AppMsg; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; import com.google.code.gossip.*; -import com.google.code.gossip.event.GossipListener; import com.google.code.gossip.event.GossipState; import com.google.code.gossip.manager.GossipManager; import org.apache.log4j.Logger; -import java.io.IOException; import java.net.*; -import java.nio.ByteBuffer; import java.util.List; /** * Created by dido-ubuntu on 05/03/16. */ -public class Node { +public class Node { public static final Logger LOGGER = Logger.getLogger(Node.class); @@ -34,7 +29,7 @@ public class Node { private int numReplicas; // Empty constructor for jackson parser to JSON - public Node(){ + public Node() { } public int getNumReplicas() { @@ -45,17 +40,17 @@ public void setNumReplicas(int numReplicas) { this.numReplicas = numReplicas; } - public Node(String ipAddress, String id, int portStorage, int portGossip, List gossipMembers){ - this(ipAddress, id, portStorage,portGossip, LogLevel.CONFIG_INFO, gossipMembers, new GossipSettings()); + public Node(String ipAddress, String id, int portStorage, int portGossip, List gossipMembers) { + this(ipAddress, id, portStorage, portGossip, gossipMembers, new GossipSettings()); } - public Node(String ipAddress, String id, int portStorage, int portGossip, String level, List gossipMembers, GossipSettings settings) { + public Node(String ipAddress, String id, int portStorage, int portGossip, List gossipMembers, GossipSettings settings) { this.ipAddress = ipAddress; this.id = id; this.portStorage = portStorage; this.portGossip = portGossip; try { - _gossipService = new GossipService(ipAddress,portGossip,id, LogLevel.fromString(level),gossipMembers, settings,this::gossipEvent); + _gossipService = new GossipService(ipAddress, this.portGossip, id, gossipMembers, settings, this::gossipEvent); _storageService = new StorageService(this, gossipMembers); _storageService.addServer(this); @@ -64,42 +59,41 @@ public Node(String ipAddress, String id, int portStorage, int portGossip, String } } - public void start(){ + public void start() { _gossipService.start(); _storageService.start(); } // Node from a GossipMember. // Used when a GossipMemeber goes UP - public Node(GossipMember member){ + public Node(GossipMember member) { this.ipAddress = member.getHost(); - this.id = member.getId(); + this.id = member.getId(); this.portStorage = Helper.STORAGE_PORT; this.portGossip = member.getPort(); - } +// +// private void startGossipService(int logLevel, List gossipMembers, GossipSettings settings, GossipListener listener) +// throws UnknownHostException, InterruptedException { +// _gossipService.start(); +// +// } - private void startGossipService(int logLevel, List gossipMembers, GossipSettings settings, GossipListener listener) - throws UnknownHostException, InterruptedException { - _gossipService.start(); - - } - - public GossipManager getGossipmanager(){ + public GossipManager getGossipmanager() { return _gossipService.get_gossipManager(); } - - // only for test the storage service - private void startStorageService(){ - this._storageService.start(); - } - - //only for test the storage system - public StorageService getStorageService(){ - return this._storageService; - } +// +// // only for test the storage service +// private void startStorageService() { +// this._storageService.start(); +// } +// +// //only for test the storage system +// public StorageService getStorageService() { +// return this._storageService; +// } public String getIpAddress() { return ipAddress; @@ -117,25 +111,27 @@ public void setId(String id) { this.id = id; } - public void shutdown(){ - if(_gossipService != null) + public void shutdown() { + if (_gossipService != null) _gossipService.shutdown(); - if(_storageService != null) + if (_storageService != null) _storageService.shutdown(); } - + public void sendToStorage(AppMsg msg){ + _storageService.sendToMe(msg); + } /* callback of gossiping procedure if a node goes UP or DOWN */ - public void gossipEvent(GossipMember member, GossipState state) { + private void gossipEvent(GossipMember member, GossipState state) { switch (state) { case UP: _storageService.addServer(new Node(member)); - Node.LOGGER.info(this.getIpAddress() + "- UP event, node "+member.getHost()+" added to consistent hasher"); + Node.LOGGER.info(this.getIpAddress() + "- UP event, node " + member.getHost() + " added to consistent hasher"); break; case DOWN: System.out.println("REMOVED DOWN "); _storageService.removeServer(new Node(member)); - Node.LOGGER.info(this.getIpAddress()+"- DOWN event, node "+member.getHost()+" removed from consistent hasher"); + Node.LOGGER.info(this.getIpAddress() + "- DOWN event, node " + member.getHost() + " removed from consistent hasher"); break; } } @@ -148,8 +144,7 @@ public boolean equals(Object o) { Node node = (Node) o; - if (!ipAddress.equals(node.ipAddress)) return false; - return id.equals(node.id); + return ipAddress.equals(node.ipAddress) && id.equals(node.id); } @@ -168,64 +163,16 @@ public String toString() { '}'; } - public int getPortStorage(){ + public int getPortStorage() { return this.portStorage; } - public void setPortStorage( int portStorage){ + public void setPortStorage(int portStorage) { this.portStorage = portStorage; } - public void sendToStorageNode(AppMsg msg){ - /* send message to the same storage node */ - //System.out.println("sendToStorage from same node "+this.ipAddress); - this.send(this.ipAddress, getPortStorage(),msg); - } - - public void send(String destIp, int destPort, AppMsg msg){ - try { - - InetAddress address = InetAddress.getByName(destIp); - - if(msg.getIpSender() == null) - msg.setIpSender(this.ipAddress); - - ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module()); - byte[] jsonByte = mapper.writeValueAsBytes(msg); - - int packet_length = jsonByte.length; - - // Convert the packet length to the byte representation of the int. - byte[] length_bytes = new byte[4]; - length_bytes[0] = (byte) (packet_length >> 24); - length_bytes[1] = (byte) ((packet_length << 8) >> 24); - length_bytes[2] = (byte) ((packet_length << 16) >> 24); - length_bytes[3] = (byte) ((packet_length << 24) >> 24); - - ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonByte.length); - byteBuffer.put(length_bytes); - byteBuffer.put(jsonByte); - byte[] buf = byteBuffer.array(); - - // / Initialize a datagram packet with data and address - DatagramSocket dsocket = new DatagramSocket(); - DatagramPacket packet = new DatagramPacket(buf, buf.length, address, destPort); - dsocket.send(packet); - dsocket.close(); - - } catch (UnknownHostException e) { - e.printStackTrace(); - } catch (SocketException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - - - } - public StorageService get_storageService() { return _storageService; } diff --git a/core/src/main/java/com/dido/pad/PadFsNode.java b/core/src/main/java/com/dido/pad/PadFsNode.java index 3518714..b3fd045 100644 --- a/core/src/main/java/com/dido/pad/PadFsNode.java +++ b/core/src/main/java/com/dido/pad/PadFsNode.java @@ -2,7 +2,6 @@ import com.beust.jcommander.JCommander; import com.google.code.gossip.GossipSettings; -import com.google.code.gossip.LogLevel; /** @@ -16,7 +15,7 @@ public static void main(String[] args){ ParseArgs jct = new ParseArgs(); new JCommander(jct, args); - Node node = new Node(jct.getIp(), jct.getId(), jct.getStoragePort(), jct.getGossipPort(),LogLevel.CONFIG_INFO, jct.getGossipMember(), new GossipSettings()); + Node node = new Node(jct.getIp(), jct.getId(), jct.getStoragePort(), jct.getGossipPort(), jct.getGossipMember(), new GossipSettings()); node.start(); } diff --git a/core/src/main/java/com/dido/pad/PersistentStorage.java b/core/src/main/java/com/dido/pad/PersistentStorage.java new file mode 100644 index 0000000..5379f23 --- /dev/null +++ b/core/src/main/java/com/dido/pad/PersistentStorage.java @@ -0,0 +1,33 @@ +package com.dido.pad; + +import com.dido.pad.data.Versioned; + +import java.util.HashMap; + +/** + * Created by dido-ubuntu on 10/03/16. + */ +public class PersistentStorage { + + private HashMap database; + + public PersistentStorage() { + this.database = new HashMap<>(); + } + + public HashMap getStorage(){ + return database; + } + + public void put(Versioned v){ + database.put(v.getData().getKey(), v); + } + + public boolean containsKey(String key){ return database.containsKey(key);} + + public Versioned get(String key){ return database.get(key); } + + public void update(Versioned v){ + database.put(v.getData().getKey(),v); + } +} diff --git a/core/src/main/java/com/dido/pad/StorageService.java b/core/src/main/java/com/dido/pad/StorageService.java index 8069409..6d43a40 100644 --- a/core/src/main/java/com/dido/pad/StorageService.java +++ b/core/src/main/java/com/dido/pad/StorageService.java @@ -1,8 +1,9 @@ package com.dido.pad; -import com.dido.pad.VectorClocks.Versioned; +import com.dido.pad.consistenthashing.DefaultFunctions; +import com.dido.pad.data.StorageData; +import com.dido.pad.data.Versioned; import com.dido.pad.consistenthashing.Hasher; -import com.dido.pad.consistenthashing.iHasher; import com.dido.pad.datamessages.*; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.ObjectMapper; @@ -22,19 +23,17 @@ /** * Created by dido-ubuntu on 07/03/16. */ - - -public class StorageService extends Thread{ +public class StorageService extends Thread { public static final Logger LOGGER = Logger.getLogger(StorageService.class); public int N_REPLICAS = 2; - public int WRITE_NODES = 1 ; + public int WRITE_NODES = 1; public int READ_NODES = 2; private Hasher cHasher; - private PersisentStorage storage; + private PersistentStorage storage; private DatagramSocket udpServer; private AtomicBoolean keepRunning; @@ -42,38 +41,38 @@ public class StorageService extends Thread{ private Node myNode; - public StorageService(Node node, List seedNodes){ + public StorageService(Node node, List seedNodes) { - this.cHasher = new Hasher<>(Helper.NUM_NODES_VIRTUALS,iHasher.SHA1,iHasher.getNodeToBytesConverter()); + this.cHasher = new Hasher<>(Helper.NUM_NODES_VIRTUALS, DefaultFunctions::SHA1, DefaultFunctions::BytesConverter); this.myNode = node; - storage = new PersisentStorage(); + storage = new PersistentStorage(); // ADD seed nodes to the node storage service for (GossipMember member : seedNodes) { Node n = new Node(member); - if(!cHasher.containsNode(n)) + if (!cHasher.containsNode(n)) cHasher.addServer(n); } keepRunning = new AtomicBoolean(true); try { - SocketAddress sAddress= new InetSocketAddress(node.getIpAddress(), Helper.STORAGE_PORT); - StorageService.LOGGER.info(this.myNode.getIpAddress()+ "- initialized on portStorage "+ Helper.STORAGE_PORT); + SocketAddress sAddress = new InetSocketAddress(node.getIpAddress(), Helper.STORAGE_PORT); + StorageService.LOGGER.info(this.myNode.getIpAddress() + "- initialized on portStorage " + Helper.STORAGE_PORT); StorageService.LOGGER.debug("I'm " + node.toString()); udpServer = new DatagramSocket(sAddress); } catch (SocketException e) { - StorageService.LOGGER.error(this.myNode.getIpAddress()+ " - "+ e); + StorageService.LOGGER.error(this.myNode.getIpAddress() + " - " + e); keepRunning.set(false); udpServer = null; } } - public List getReplicasNodes(Node server, int replicas){ - return cHasher.getPreferenceList(server, replicas); + public List getReplicasNodes(Node server, int replicas) { + return cHasher.getPreferenceList(server, replicas); } - public PersisentStorage getStorage() { + public PersistentStorage getStorage() { return storage; } @@ -81,7 +80,7 @@ public void setPortStorage(int portStorage) { this.myNode.setPortStorage(portStorage); } - public int getPortStorage(){ + public int getPortStorage() { return this.myNode.getPortStorage(); } @@ -90,26 +89,66 @@ public Hasher getcHasher() { return cHasher; } - public void startStorageService(){ - this.start(); + public void addServer(Node n) { + cHasher.addServer(n); } + public void removeServer(Node n) { + cHasher.removeServer(n); + } - public void addServer(Node n){ - this.cHasher.addServer(n); + public void sendToMe(AppMsg msg) { + /* send message to the same storage node */ + //System.out.println("sendToStorage from same node "+this.ipAddress); + this.send(myNode.getIpAddress(), getPortStorage(), msg); } - public void removeServer(Node n){ - this.cHasher.removeServer(n); + private void send(String destIp, int destPort, AppMsg msg) { + try { + + InetAddress address = InetAddress.getByName(destIp); + + if (msg.getIpSender() == null) + msg.setIpSender(myNode.getIpAddress()); + + ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module()); + byte[] jsonByte = mapper.writeValueAsBytes(msg); + + int packet_length = jsonByte.length; + + // Convert the packet length to the byte representation of the int. + byte[] length_bytes = new byte[4]; + length_bytes[0] = (byte) (packet_length >> 24); + length_bytes[1] = (byte) ((packet_length << 8) >> 24); + length_bytes[2] = (byte) ((packet_length << 16) >> 24); + length_bytes[3] = (byte) ((packet_length << 24) >> 24); + + ByteBuffer byteBuffer = ByteBuffer.allocate(4 + jsonByte.length); + byteBuffer.put(length_bytes); + byteBuffer.put(jsonByte); + byte[] buf = byteBuffer.array(); + + // / Initialize a datagram packet with data and address + DatagramSocket dsocket = new DatagramSocket(); + DatagramPacket packet = new DatagramPacket(buf, buf.length, address, destPort); + dsocket.send(packet); + dsocket.close(); + + } catch (IOException e) { + e.printStackTrace(); + } + + } + @Override - public void run(){ + public void run() { - while(keepRunning.get()){ + while (keepRunning.get()) { - while(N_REPLICAS > cHasher.getAllNodes().size() ){ - StorageService.LOGGER.info( this.myNode.getIpAddress()+" - Required "+N_REPLICAS+" backup node, found " +cHasher.getAllNodes().size()); + while (N_REPLICAS > cHasher.getAllNodes().size()) { + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - Required " + N_REPLICAS + " backup node, found " + cHasher.getAllNodes().size()); try { Thread.sleep(1000); } catch (InterruptedException e) { @@ -120,9 +159,9 @@ public void run(){ preferenceNodes = cHasher.getPreferenceList(this.myNode, N_REPLICAS); try { - byte [] buff = new byte[udpServer.getReceiveBufferSize()]; + byte[] buff = new byte[udpServer.getReceiveBufferSize()]; DatagramPacket p = new DatagramPacket(buff, buff.length); - StorageService.LOGGER.info( this.myNode.getIpAddress()+" - waiting messages..."); + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - waiting messages..."); udpServer.receive(p); int packet_length = 0; @@ -146,22 +185,22 @@ public void run(){ if (destNode.equals(this.myNode)) { /*store in my database, I'm the master*/ manageAppRequest(requestMsg); } else { /*forward message to another node*/ - destNode.sendToStorageNode(requestMsg); - StorageService.LOGGER.info( this.myNode.getIpAddress()+" -forwards msg to "+destNode.getIpAddress()); + destNode.get_storageService().sendToMe(requestMsg); + StorageService.LOGGER.info(this.myNode.getIpAddress() + " -forwards msg to " + destNode.getIpAddress()); } } /* Request System message received*/ - else if(msg instanceof RequestSystemMsg) { - manageSystemRequest((RequestSystemMsg)msg); + else if (msg instanceof RequestSystemMsg) { + manageSystemRequest((RequestSystemMsg) msg); } /* Reply Application message received*/ - else if (msg instanceof ReplyAppMsg){ - ReplyAppMsg replyMsg = (ReplyAppMsg) msg; - manageAppReply(replyMsg); + else if (msg instanceof ReplyAppMsg) { + ReplyAppMsg replyMsg = (ReplyAppMsg) msg; + manageAppReply(replyMsg); } /* Reply System message received*/ - else if (msg instanceof ReplySystemMsg){ + else if (msg instanceof ReplySystemMsg) { ReplySystemMsg replyMsg = (ReplySystemMsg) msg; manageSystemReply(replyMsg); } @@ -177,7 +216,7 @@ else if (msg instanceof ReplySystemMsg){ } private void manageSystemReply(ReplySystemMsg replyMsg) { - StorageService.LOGGER.info( this.myNode.getIpAddress()+" - REPLY SYSTEM received from "+ replyMsg.getIpSender()); + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - REPLY SYSTEM received from " + replyMsg.getIpSender()); } @@ -187,30 +226,33 @@ private void manageSystemRequest(RequestSystemMsg msg) { case PUT: Versioned vData = msg.getVersionedData(); String key = vData.getData().getKey(); // key of the data - StorageService.LOGGER.info( this.myNode.getIpAddress()+" - PUT SystemMsg received key:" +key); + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - PUT SystemMsg received key:" + key); //TODO set the Primary master node into the vData ??? storage.put(vData); - myNode.send(msg.getIpSender(),Helper.QUORUM_PORT, new ReplySystemMsg(AppMsg.OPERATION.OK,myNode.getIpAddress(),Helper.STORAGE_PORT+1,"PUT Succesfully QUORUM")); + send(msg.getIpSender(), Helper.QUORUM_PORT, new ReplySystemMsg(AppMsg.OPERATION.OK, myNode.getIpAddress(), Helper.STORAGE_PORT + 1, "PUT Succesfully QUORUM")); /* else{ Versioned myVData = storage.get(key); //my version of the data +<<<<<<< HEAD:src/main/java/com/dido/pad/StorageService.java myVData.getVersion() +======= + myVData.getVersion() +>>>>>>> e3a879c91af528482e9b717aa9af5d0780951f7f:core/src/main/java/com/dido/pad/StorageService.java }*/ break; case GET: - StorageService.LOGGER.info( this.myNode.getIpAddress()+" - GET SystemMsg received "); - if(storage.containsKey(msg.getKey())){ + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - GET SystemMsg received "); + if (storage.containsKey(msg.getKey())) { Versioned myData = storage.get(msg.getKey()); - ReplySystemMsg reply = new ReplySystemMsg(AppMsg.OPERATION.OK, myNode.getIpAddress(), Helper.STORAGE_PORT,myData); + ReplySystemMsg reply = new ReplySystemMsg(AppMsg.OPERATION.OK, myNode.getIpAddress(), Helper.STORAGE_PORT, myData); //TODO change from Storage-Port to Quorum Port (Read quorum) - myNode.send(msg.getIpSender(), Helper.QUORUM_PORT,reply); - } - else{ - String info = myNode.getIpAddress()+" - data is not present into my storage"; - RequestSystemMsg replyErr = new RequestSystemMsg(AppMsg.OPERATION.ERR, msg.getIpSender(),Helper.STORAGE_PORT,info); - myNode.send(msg.getIpSender(),Helper.QUORUM_PORT, replyErr); + send(msg.getIpSender(), Helper.QUORUM_PORT, reply); + } else { + String info = myNode.getIpAddress() + " - data is not present into my storage"; + RequestSystemMsg replyErr = new RequestSystemMsg(AppMsg.OPERATION.ERR, msg.getIpSender(), Helper.STORAGE_PORT, info); + send(msg.getIpSender(), Helper.QUORUM_PORT, replyErr); } break; case LIST: @@ -218,13 +260,13 @@ private void manageSystemRequest(RequestSystemMsg msg) { } } - private void manageAppReply(ReplyAppMsg msg){ + private void manageAppReply(ReplyAppMsg msg) { switch (msg.getOperation()) { case OK: - StorageService.LOGGER.info( this.myNode.getIpAddress()+" - REPLY OK "+msg.getMsg()+" from "+msg.getIpSender()); + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - REPLY OK " + msg.getMsg() + " from " + msg.getIpSender()); break; case ERR: - StorageService.LOGGER.info( this.myNode.getIpAddress()+" - REPLY ERR "+msg.getMsg()); + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - REPLY ERR " + msg.getMsg()); break; } } @@ -232,36 +274,57 @@ private void manageAppReply(ReplyAppMsg msg){ /** * The coordinator of the request manages the application message * sent by a client. + * * @param msg */ - private void manageAppRequest(RequestAppMsg msg){ + private void manageAppRequest(RequestAppMsg msg) { switch (msg.getOperation()) { - case PUT: - StorageService.LOGGER.info( this.myNode.getIpAddress()+" - RECEIVED MSG "+msg.getOperation() +" <" + msg.getKey()+":"+msg.getValue()+"> from "+msg.getIpSender()); - - if(storage.containsKey(msg.getKey())){// UPDATE data Version +//<<<<<<< HEAD:src/main/java/com/dido/pad/StorageService.java + case PUT: + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - RECEIVED MSG " + msg.getOperation() + " <" + msg.getKey() + ":" + msg.getValue() + "> from " + msg.getIpSender()); + if (storage.containsKey(msg.getKey())) {// UPDATE data Version Versioned d = storage.get(msg.getKey()); - d.getData().setValue(msg.getValue()); - d.getVersion().incremenetVersion(myNode.getIpAddress()); - myNode.sendToStorageNode(new ReplyAppMsg(AppMsg.OPERATION.OK, " Updated succesfully ")); - } - else{ // PUT new object - Versioned vData = new Versioned<>(new StorageData(msg.getKey(), msg.getValue())); - vData.getVersion().incremenetVersion(myNode.getIpAddress()); - this.storage.put(vData); - StorageService.LOGGER.info(this.myNode.getIpAddress() + " - Inserted <" + msg.getKey() + ":" + msg.getValue() + "> into local database"); - - askQuorum(vData, Helper.QUORUM_PORT, AppMsg.OPERATION.PUT); - - myNode.send(msg.getIpSender(), Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.OK, " PUT <" + msg.getKey() + ":" + msg.getValue() + ">")); + d.setData(new StorageData<>(msg.getKey(), msg.getValue())); + d.getVersion().increment(myNode.getIpAddress()); + myNode.get_storageService().sendToMe(new ReplyAppMsg(AppMsg.OPERATION.OK, " Updated succesfully key:" + msg.getKey())); + } else { // PUT new object + Versioned vData = new Versioned(new StorageData<>(msg.getKey(), msg.getValue())); + vData.getVersion().increment(myNode.getIpAddress()); + this.storage.put(vData); + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - Inserted <" + msg.getKey() + ":" + msg.getValue() + "> into local database"); + + //TODO send ok before Quorum Request : writable first policy + askQuorum(vData, Helper.QUORUM_PORT, AppMsg.OPERATION.PUT); + + send(msg.getIpSender(), Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.OK, " PUT <" + msg.getKey() + ":" + msg.getValue() + ">")); +//======= +// case PUT: +// StorageService.LOGGER.info( this.myNode.getIpAddress()+" - RECEIVED MSG "+msg.getOperation() +" <" + msg.getKey()+":"+msg.getValue()+"> from "+msg.getIpSender()); +// +// if(storage.containsKey(msg.getKey())){// UPDATE data Version +// Versioned d = storage.get(msg.getKey()); +// d.getData().setValue(msg.getValue()); +// d.getVersion().incremenetVersion(myNode.getIpAddress()); +// myNode.sendToStorageNode(new ReplyAppMsg(AppMsg.OPERATION.OK, " Updated succesfully ")); +// } +// else{ // PUT new object +// Versioned vData = new Versioned<>(new StorageData(msg.getKey(), msg.getValue())); +// vData.getVersion().incremenetVersion(myNode.getIpAddress()); +// this.storage.put(vData); +// StorageService.LOGGER.info(this.myNode.getIpAddress() + " - Inserted <" + msg.getKey() + ":" + msg.getValue() + "> into local database"); +// +// askQuorum(vData, Helper.QUORUM_PORT, AppMsg.OPERATION.PUT); +// +// myNode.send(msg.getIpSender(), Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.OK, " PUT <" + msg.getKey() + ":" + msg.getValue() + ">")); +//>>>>>>> e3a879c91af528482e9b717aa9af5d0780951f7f:core/src/main/java/com/dido/pad/StorageService.java } break; case GET: String key = msg.getKey(); - StorageService.LOGGER.debug( this.myNode.getIpAddress()+" - RECEIVED MSG "+msg.getOperation()+"<"+ key+">"); - if(storage.containsKey(key)) { - Versioned vdata = storage.get(key); + StorageService.LOGGER.debug(this.myNode.getIpAddress() + " - RECEIVED MSG " + msg.getOperation() + "<" + key + ">"); + if (storage.containsKey(key)) { + Versioned vdata = storage.get(key); List replies = askQuorum(vdata,Helper.QUORUM_PORT, AppMsg.OPERATION.GET); for (ReplySystemMsg r : @@ -269,22 +332,19 @@ private void manageAppRequest(RequestAppMsg msg){ System.out.print(r.getData().getVersion()); } - //4) merge version //5) sent reconcilied version - // myNode.send(msg.getIpSender(), Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.OK, " GET "+ vdata.getData().toString())); - } - else { - myNode.send(msg.getIpSender(), Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.ERR, " GET key not found")); + // myNode.send(msg.getIpSender(), Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.OK, " GET "+ vdata.getData().toString())); + } else { + send(msg.getIpSender(), Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.ERR, " GET key not found")); } break; case LIST: - HashMap> db = this.storage.getStorage(); - if(!db.isEmpty()) { - myNode.send(msg.getIpSender(),Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.OK, " LIST "+ db.toString())); - } - else{ - myNode.send(msg.getIpSender(),Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.ERR, " LIST empty data database")); + HashMap db = this.storage.getStorage(); + if (!db.isEmpty()) { + send(msg.getIpSender(), Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.OK, " LIST " + db.toString())); + } else { + send(msg.getIpSender(), Helper.STORAGE_PORT, new ReplyAppMsg(AppMsg.OPERATION.ERR, " LIST empty data database")); } break; @@ -292,13 +352,13 @@ private void manageAppRequest(RequestAppMsg msg){ } - public List askQuorum(Versioned vData, int listenPort, AppMsg.OPERATION op) { + public List askQuorum(Versioned vData, int listenPort, AppMsg.OPERATION op) { RequestSystemMsg reqQuorum; - if(op.equals(AppMsg.OPERATION.PUT)) - reqQuorum = new RequestSystemMsg(op, myNode.getIpAddress(),0,vData ); - else{ - reqQuorum = new RequestSystemMsg(op, myNode.getIpAddress(),0, vData.getData().getKey() ); + if (op.equals(AppMsg.OPERATION.PUT)) + reqQuorum = new RequestSystemMsg(op, myNode.getIpAddress(), 0, vData); + else { + reqQuorum = new RequestSystemMsg(op, myNode.getIpAddress(), 0, vData.getData().getKey()); } try { @@ -337,55 +397,54 @@ public List askQuorum(Versioned vData, int listenPort, AppMsg.O return _waitQuorum(reqQuorum.getOperation(), listenPort); } - private List _waitQuorum(AppMsg.OPERATION op, int listenPort){ + private List _waitQuorum(AppMsg.OPERATION op, int listenPort) { DatagramSocket udpQuorum; //server listen Quorum response - int numResponses = (op == AppMsg.OPERATION.PUT) ? WRITE_NODES: READ_NODES; + int numResponses = (op == AppMsg.OPERATION.PUT) ? WRITE_NODES : READ_NODES; ArrayList replies = new ArrayList<>(); try { - udpQuorum= new DatagramSocket(listenPort); + udpQuorum = new DatagramSocket(listenPort); //TODO insert timeout into configuration file udpQuorum.setSoTimeout(5000); - //wait the response - for(int j =0; j < numResponses; j++){ - byte [] buff = new byte[udpQuorum.getReceiveBufferSize()]; - DatagramPacket p = new DatagramPacket(buff, buff.length); - StorageService.LOGGER.info( this.myNode.getIpAddress()+" - waiting "+numResponses + " QUORUM msg response..."); + //wait the response + for (int j = 0; j < numResponses; j++) { + byte[] buff = new byte[udpQuorum.getReceiveBufferSize()]; + DatagramPacket p = new DatagramPacket(buff, buff.length); + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - waiting " + numResponses + " QUORUM msg response..."); - udpQuorum.receive(p); + udpQuorum.receive(p); - int packet_length = 0; - for (int i = 0; i < 4; i++) { - int shift = (4 - 1 - i) * 8; - packet_length += (buff[i] & 0x000000FF) << shift; - } + int packet_length = 0; + for (int i = 0; i < 4; i++) { + int shift = (4 - 1 - i) * 8; + packet_length += (buff[i] & 0x000000FF) << shift; + } - byte[] json_bytes = new byte[packet_length]; - System.arraycopy(buff, 4, json_bytes, 0, packet_length); - String receivedMessage = new String(json_bytes); + byte[] json_bytes = new byte[packet_length]; + System.arraycopy(buff, 4, json_bytes, 0, packet_length); + String receivedMessage = new String(json_bytes); - ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module()); - ReplySystemMsg msgQuorum = mapper.readValue(receivedMessage, ReplySystemMsg.class); - StorageService.LOGGER.info( this.myNode.getIpAddress()+" - RECEIVED QUORUM MSG from "+msgQuorum.getIpSender()); - replies.add(msgQuorum); + ObjectMapper mapper = new ObjectMapper().registerModule(new Jdk8Module()); + ReplySystemMsg msgQuorum = mapper.readValue(receivedMessage, ReplySystemMsg.class); + StorageService.LOGGER.info(this.myNode.getIpAddress() + " - RECEIVED QUORUM MSG from " + msgQuorum.getIpSender()); + replies.add(msgQuorum); - } + } udpQuorum.close(); } catch (SocketException e) { - StorageService.LOGGER.error(this.myNode.getIpAddress()+ " - "+ e); - } - catch (IOException e) { + StorageService.LOGGER.error(this.myNode.getIpAddress() + " - " + e); + } catch (IOException e) { e.printStackTrace(); } return replies; } - public void shutdown(){ - LOGGER.info(this.myNode.getIpAddress()+"Storage service has been shutdown..."); + public void shutdown() { + LOGGER.info(this.myNode.getIpAddress() + "Storage service has been shutdown..."); this.udpServer.close(); } diff --git a/core/src/main/java/com/dido/pad/VectorClocks/VectorClock.java b/core/src/main/java/com/dido/pad/VectorClocks/VectorClock.java index 94611db..e24c650 100644 --- a/core/src/main/java/com/dido/pad/VectorClocks/VectorClock.java +++ b/core/src/main/java/com/dido/pad/VectorClocks/VectorClock.java @@ -1,40 +1,33 @@ package com.dido.pad.VectorClocks; -import com.dido.pad.Node; -import com.google.common.base.Preconditions; -import com.google.common.collect.Maps; - import java.util.HashMap; -import java.util.List; import java.util.Map; /** * Created by dido-ubuntu on 15/03/16. */ public class VectorClock implements Version { - - - public enum APPEN {EQUAL,BEFORE,AFTER,CONCURRENT}; + public enum OCCUR {EQUAL,BEFORE,AFTER,CONCURRENT}; private Map vectorClock; private int delta; public VectorClock() { - this.vectorClock = new HashMap(); + this.vectorClock = new HashMap<>(); this.delta = 1; } - public VectorClock(int delta) { - this(); - this.delta = delta; + public VectorClock(VectorClock v){ + this.vectorClock = v.vectorClock; + this.delta = v.delta; } - public VectorClock( Map m){ + public VectorClock(int delta) { this(); - this.vectorClock = m; + this.delta = delta; } - public long getClockFor(String ip){ + public Long getClockFor(String ip){ return vectorClock.get(ip); } @@ -42,72 +35,36 @@ public Map getVector(){ return vectorClock; } - public VectorClock incremenetVersion( String ip){ + public VectorClock increment( String ip){ vectorClock.put(ip, vectorClock.getOrDefault(ip, (long) 0) + delta); return this; } @Override - public VectorClock clone() { - return new VectorClock(Maps.newHashMap(vectorClock)); - } - - public void update (VectorClock v2){ - - } - - - - - - /** - * Compare two VectorClocks, the outcomes will be one of the following:
- * -- Clock 1 is BEFORE clock 2, if there exists an nodeId such that - * c1(nodeId) <= c2(nodeId) and there does not exist another nodeId such - * that c1(nodeId) > c2(nodeId).
- * -- Clock 1 is CONCURRENT to clock 2 if there exists an nodeId, nodeId2 - * such that c1(nodeId) < c2(nodeId) and c1(nodeId2) > c2(nodeId2)
- * -- Clock 1 is AFTER clock 2 otherwise - * - * @param version The second VectorClock - */ - @Override - public VectorClock.APPEN compare(Version version) { - Preconditions.checkNotNull(version,"Second Vector clock name can not be null"); - - if (!(version instanceof VectorClock)) - return APPEN.CONCURRENT; + public OCCUR compare(Version v) { + if (!(v instanceof VectorClock)) + return OCCUR.CONCURRENT; VectorClock first = this; - VectorClock second = (VectorClock) version; + VectorClock second = (VectorClock) v; +// for(String ip: first.getVector().keySet()){ +// if(!second.getVector().containsKey(ip)) +// throw new Exception("The vector clocks must contains the same nodes"); +// } + int n = first.getVector().size(); int numEq=0, numLt=0, numGt = 0; for (String ip: first.getVector().keySet()) { - if(!second.getVector().keySet().contains(ip)){ //if second does not contains the ip the version is 0 so the first is greater - numGt++; - } - else if(first.getClockFor(ip) == second.getClockFor(ip)){ - numEq++; } - else if(first.getClockFor(ip) < second.getClockFor(ip)){ - numLt++; } + if(first.getClockFor(ip) == second.getClockFor(ip)){ numEq++; } + else if(first.getClockFor(ip) < second.getClockFor(ip)){ numLt++; } else{numGt ++;} } - - //for each string that are present in the second if is not present in the firsts - for(String ip: second.getVector().keySet()){ - if(!first.getVector().keySet().contains(ip)){ - numLt ++; - } - } - //int n = first.getVector().keySet().size(); - if(numGt ==0 && n ==(numEq + numLt)) return VectorClock.APPEN.BEFORE; - else if(numLt > 0 && numGt > 0) return VectorClock.APPEN.CONCURRENT; - else {return VectorClock.APPEN.AFTER; } - + if(numGt ==0 && n ==(numEq + numLt)) return OCCUR.BEFORE; + else if(numLt > 0 && numGt > 0) return OCCUR.CONCURRENT; + else {return OCCUR.AFTER; } } - @Override public String toString() { return "VectorClock{" + diff --git a/core/src/main/java/com/dido/pad/VectorClocks/Version.java b/core/src/main/java/com/dido/pad/VectorClocks/Version.java index 6502c12..beb1c15 100644 --- a/core/src/main/java/com/dido/pad/VectorClocks/Version.java +++ b/core/src/main/java/com/dido/pad/VectorClocks/Version.java @@ -11,9 +11,7 @@ public interface Version { * * @param v The other version */ - VectorClock.APPEN compare(Version v) throws Exception; - - VectorClock incremenetVersion( String ip); - + VectorClock.OCCUR compare(Version v); + VectorClock increment( String ip); } diff --git a/core/src/main/java/com/dido/pad/VectorClocks/Versioned.java b/core/src/main/java/com/dido/pad/VectorClocks/Versioned.java index 7f1d532..afd3800 100644 --- a/core/src/main/java/com/dido/pad/VectorClocks/Versioned.java +++ b/core/src/main/java/com/dido/pad/VectorClocks/Versioned.java @@ -43,14 +43,5 @@ public T getData(){ public void setData(T data) { this.data = data; } - - - /** - * Create a clone of this StorageData object such that the object pointed to - * is the same, but the VectorClock and StorageData wrapper is a shallow copy. - */ - public Versioned cloneVersioned() { - return new Versioned<>(this.getData(), this.vectorclock.clone()); - } } diff --git a/core/src/main/java/com/dido/pad/consistenthashing/DefaultFunctions.java b/core/src/main/java/com/dido/pad/consistenthashing/DefaultFunctions.java new file mode 100644 index 0000000..9f0a1d2 --- /dev/null +++ b/core/src/main/java/com/dido/pad/consistenthashing/DefaultFunctions.java @@ -0,0 +1,20 @@ +package com.dido.pad.consistenthashing; + +import com.dido.pad.Node; +import com.google.common.base.Preconditions; +import com.google.common.hash.Hashing; + +/** + * Created by luca on 22/03/16. + */ +public class DefaultFunctions { + + public static byte[] SHA1(byte[] input) { + Preconditions.checkNotNull(input); + return Hashing.sha1().hashBytes(input).asBytes(); + } + + public static byte[] BytesConverter(Node node) { + return (node.getIpAddress() + node.getId()).getBytes(); + } +} diff --git a/core/src/main/java/com/dido/pad/consistenthashing/Hasher.java b/core/src/main/java/com/dido/pad/consistenthashing/Hasher.java index 17b2783..6b53647 100644 --- a/core/src/main/java/com/dido/pad/consistenthashing/Hasher.java +++ b/core/src/main/java/com/dido/pad/consistenthashing/Hasher.java @@ -1,38 +1,30 @@ package com.dido.pad.consistenthashing; -import com.dido.pad.Node; import com.google.common.base.Preconditions; -import com.google.common.util.concurrent.ListenableFuture; -import org.apache.log4j.Logger; import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * Created by dido-ubuntu on 08/03/16. */ -public class Hasher implements iHasher{ +public class Hasher implements IHasher { - public static final Logger LOGGER = Logger.getLogger(Hasher.class); +// public static final Logger LOGGER = Logger.getLogger(Hasher.class); private final int startVirtualNodeId, stopVirtualNodeId; - private final HashFunction hashFunction; - private final BytesConverter nodeToByteConverter; + private final IHashFunction hashFunction; + private final IByteConverter nodeToByteConverter; private final NavigableMap serversMap; - private HashMap> virtualForServer; // > :for each Node list its virtual nodes + private HashMap> virtualForServer; // > :for each Node list its virtual nodes - public Hasher(final int virtulaNodes,final HashFunction hash, final BytesConverter nodetoByteConverter) { + public Hasher(final int virtulaNodes, final IHashFunction hash, final IByteConverter nodetoByteConverter) { - Preconditions.checkNotNull(hash,"HashFunction can not be null."); + Preconditions.checkNotNull(hash, "HashFunction can not be null."); this.hashFunction = hash; this.nodeToByteConverter = nodetoByteConverter; @@ -54,14 +46,14 @@ synchronized public void addServer(T server) { serversMap.put(virtBucket, server); virtBuckets.add(virtBucket); } - virtualForServer.put(server,virtBuckets); + virtualForServer.put(server, virtBuckets); } @Override synchronized public void removeServer(T server) { Preconditions.checkNotNull(server, "Server name can not be null"); - for(int virtID= startVirtualNodeId; virtID <= stopVirtualNodeId; virtID++){ - ByteBuffer bbServerVirtuals = convertAndApplyHash(virtID,server); + for (int virtID = startVirtualNodeId; virtID <= stopVirtualNodeId; virtID++) { + ByteBuffer bbServerVirtuals = convertAndApplyHash(virtID, server); serversMap.remove(bbServerVirtuals); } virtualForServer.remove(server); @@ -69,9 +61,9 @@ synchronized public void removeServer(T server) { } - private ByteBuffer convertAndApplyHash(int nodeID, T server){ + private ByteBuffer convertAndApplyHash(int nodeID, T server) { - byte[] bucketNameInBytes = hashFunction.hash(nodeToByteConverter.convert(server)); + byte[] bucketNameInBytes = hashFunction.hash(nodeToByteConverter.convert(server)); byte[] bucketNameAndCode = new byte[(Integer.BYTES / Byte.BYTES) + bucketNameInBytes.length]; ByteBuffer bb = ByteBuffer.wrap(bucketNameAndCode); bb.put(bucketNameInBytes); @@ -80,45 +72,43 @@ private ByteBuffer convertAndApplyHash(int nodeID, T server){ } - - synchronized public T getServerForData(String key){ + synchronized public T getServerForData(String key) { byte[] bHashData = hashFunction.hash(key.getBytes()); ByteBuffer bbData = ByteBuffer.wrap(bHashData); ByteBuffer nearServer = serversMap.ceilingKey(bbData); - if(nearServer==null) { - T server = serversMap.firstEntry().getValue(); - return server; - } - else { + if (nearServer == null) { + T server = serversMap.firstEntry().getValue(); + return server; + } else { T server = serversMap.get(nearServer); return server; } } - public List getAllNodes(){ + public List getAllNodes() { List nodes = new ArrayList<>(); nodes.addAll(virtualForServer.keySet()); return nodes; } - public boolean containsNode(T node){ - return this.serversMap.containsKey( nodeToByteConverter.convert(node)); + public boolean containsNode(T node) { + return this.serversMap.containsKey(nodeToByteConverter.convert(node)); } - public void printkeyValueHash(){ + public void printkeyValueHash() { SortedMap sorted = serversMap.tailMap(serversMap.firstKey()); - Iterator iter = sorted.keySet().iterator(); - while(iter.hasNext()){ + Iterator iter = sorted.keySet().iterator(); + while (iter.hasNext()) { ByteBuffer bb = iter.next(); T server = serversMap.get(bb); - System.out.println("Hash: "+ byteBufferToLong(bb)+" Value: "+ server); // byteBufferToLong(bb) + System.out.println("Hash: " + byteBufferToLong(bb) + " Value: " + server); // byteBufferToLong(bb) } System.out.println(" "); } - private long byteBufferToLong(ByteBuffer bb){ - BigInteger bigint = new BigInteger(1,bb.array()); + private long byteBufferToLong(ByteBuffer bb) { + BigInteger bigint = new BigInteger(1, bb.array()); return bigint.longValue(); } @@ -127,7 +117,7 @@ synchronized public NavigableMap getServersMap() { return serversMap; } - public ArrayList getPreferenceList(T server, int number){ + public ArrayList getPreferenceList(T server, int number) { Preconditions.checkArgument(number <= virtualForServer.keySet().size(), "The number of node present is less than the preference list size required"); ArrayList virtuals = virtualForServer.get(server); @@ -135,12 +125,12 @@ public ArrayList getPreferenceList(T server, int number){ ByteBuffer bbNext = virtuals.get(0); // first entry is the Bytebuffer of the first physical server. ArrayList nexts = new ArrayList<>(); - while(number > 0) { + while (number > 0) { bbNext = serversMap.higherKey(bbNext); if (bbNext == null) { // there is no they greater than the server key. bbNext = serversMap.firstKey(); } - if(!virtuals.contains(bbNext) && !nexts.contains(serversMap.get(bbNext))){ + if (!virtuals.contains(bbNext) && !nexts.contains(serversMap.get(bbNext))) { nexts.add(serversMap.get(bbNext)); number--; } diff --git a/core/src/main/java/com/dido/pad/consistenthashing/IByteConverter.java b/core/src/main/java/com/dido/pad/consistenthashing/IByteConverter.java new file mode 100644 index 0000000..e40651c --- /dev/null +++ b/core/src/main/java/com/dido/pad/consistenthashing/IByteConverter.java @@ -0,0 +1,8 @@ +package com.dido.pad.consistenthashing; + +/** + * Created by luca on 22/03/16. + */ +public interface IByteConverter{ + byte[] convert( T data); +} diff --git a/core/src/main/java/com/dido/pad/consistenthashing/IHashFunction.java b/core/src/main/java/com/dido/pad/consistenthashing/IHashFunction.java new file mode 100644 index 0000000..f3d5fe2 --- /dev/null +++ b/core/src/main/java/com/dido/pad/consistenthashing/IHashFunction.java @@ -0,0 +1,8 @@ +package com.dido.pad.consistenthashing; + +/** + * Created by luca on 22/03/16. + */ +public interface IHashFunction{ + byte[] hash( byte[] input); +} \ No newline at end of file diff --git a/core/src/main/java/com/dido/pad/consistenthashing/IHasher.java b/core/src/main/java/com/dido/pad/consistenthashing/IHasher.java new file mode 100644 index 0000000..d9df9a4 --- /dev/null +++ b/core/src/main/java/com/dido/pad/consistenthashing/IHasher.java @@ -0,0 +1,14 @@ +package com.dido.pad.consistenthashing; + +/** + * Created by dido-ubuntu on 08/03/16. + */ +public interface IHasher { + + void addServer( S s ); + + void removeServer(S s); + +} + + diff --git a/core/src/main/java/com/dido/pad/data/StorageData.java b/core/src/main/java/com/dido/pad/data/StorageData.java new file mode 100644 index 0000000..d55d627 --- /dev/null +++ b/core/src/main/java/com/dido/pad/data/StorageData.java @@ -0,0 +1,61 @@ +package com.dido.pad.data; + +/** + * Created by dido-ubuntu on 11/03/16. + */ +public class StorageData { + + private String key; + private T value; + + public StorageData() { + } + + public StorageData(String key, T value) { + this.key = key; + this.value = value; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public T getValue() { + return value; + } + + public void setValue(T value) { + this.value = value; + } + + @Override + public String toString() { + return "StorageData {" + + "key='" + key + '\'' + + ", value=" + value + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + StorageData that = (StorageData) o; + + if (key != null ? !key.equals(that.key) : that.key != null) return false; + return value != null ? value.equals(that.value) : that.value == null; + + } + + @Override + public int hashCode() { + int result = key != null ? key.hashCode() : 0; + result = 31 * result + (value != null ? value.hashCode() : 0); + return result; + } +} diff --git a/core/src/main/java/com/dido/pad/data/Versioned.java b/core/src/main/java/com/dido/pad/data/Versioned.java new file mode 100644 index 0000000..66f9b15 --- /dev/null +++ b/core/src/main/java/com/dido/pad/data/Versioned.java @@ -0,0 +1,56 @@ +package com.dido.pad.data; + + +import com.dido.pad.VectorClocks.VectorClock; +import com.dido.pad.VectorClocks.Version; + +/** + * A wrapper for an StorageData object that adds a Version. + */ + +//extends StorageData +public class Versioned { + + private StorageData data; + private Version vectorClock; + // private String masterNode; //master node of the data. + + public Versioned() { + //for jackson JSOn parser + } + + public Versioned(StorageData data) { + this.data = data; + this.vectorClock = new VectorClock(); + } + + public Versioned(StorageData data, Version version) { + this.vectorClock = version == null ? new VectorClock() : (VectorClock) version; + this.data = data; + } + + public void setVectorClock(VectorClock version) { + this.vectorClock = version; + } + + public Version getVersion() { + return vectorClock; + } + + public StorageData getData(){ + return data; + } + + public void setData(StorageData data) { + this.data = data; + } + + /** + * Create a clone of this StorageData object such that the object pointed to + * is the same, but the VectorClock and StorageData wrapper is a shallow copy. + */ +// public Versioned cloneVersioned() { +// return new Versioned(this.getData(), this.vectorClock.clone()); +// } +} + diff --git a/core/src/main/java/com/dido/pad/datamessages/ReplySystemMsg.java b/core/src/main/java/com/dido/pad/datamessages/ReplySystemMsg.java index e39eec9..42a50a3 100644 --- a/core/src/main/java/com/dido/pad/datamessages/ReplySystemMsg.java +++ b/core/src/main/java/com/dido/pad/datamessages/ReplySystemMsg.java @@ -1,6 +1,6 @@ package com.dido.pad.datamessages; -import com.dido.pad.VectorClocks.Versioned; +import com.dido.pad.data.Versioned; /** * Created by dido-ubuntu on 16/03/16. diff --git a/core/src/main/java/com/dido/pad/datamessages/RequestSystemMsg.java b/core/src/main/java/com/dido/pad/datamessages/RequestSystemMsg.java index 52b8239..06aba69 100644 --- a/core/src/main/java/com/dido/pad/datamessages/RequestSystemMsg.java +++ b/core/src/main/java/com/dido/pad/datamessages/RequestSystemMsg.java @@ -1,6 +1,6 @@ package com.dido.pad.datamessages; -import com.dido.pad.VectorClocks.Versioned; +import com.dido.pad.data.Versioned; /** * Created by dido-ubuntu on 15/03/16. diff --git a/core/src/test/java/TestGossipStorageService.java b/core/src/test/java/TestGossipStorageService.java new file mode 100644 index 0000000..8c15157 --- /dev/null +++ b/core/src/test/java/TestGossipStorageService.java @@ -0,0 +1,136 @@ +import com.dido.pad.Helper; +import com.dido.pad.Node; +import com.dido.pad.datamessages.AppMsg; +import com.dido.pad.datamessages.RequestAppMsg; +import com.google.code.gossip.*; +import org.junit.Assert; +import org.junit.Test; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; + +/** + * Created by dido-ubuntu on 09/03/16. + */ +public class TestGossipStorageService { + + private static final int NUM_NODES = 3; + + /* + @Test + public void testDataInsert(){ + + //startup gossip member + GossipSettings settings = new GossipSettings(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes+1; ++i) { + startupMembers.add(new RemoteGossipMember("127.0.0." + i, Helper.GOSSIP_PORT, "node"+i)); + } + + //create three local client + List clients = new ArrayList<>(); + int clusterMembers = 3; + for (int i = 1; i < clusterMembers+1; ++i) { + Node node = new Node("127.0.0." + i, "node" + i, Helper.STORAGE_PORT, Helper.GOSSIP_PORT); + clients.add(node); + } + + for(int i=0; i < clients.size(); i++){ + Node node = clients.get(i); + try { + node.startGossipService(LogLevel.DEBUG, startupMembers,settings,node::gossipEvent ); + node.startStorageService(); + } catch (UnknownHostException e) { + e.printStackTrace(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + try { + Thread.sleep(10000); + + //Check if the nodes discovered each other + for (int i = 0; i < clusterMembers; ++i) { + Assert.assertEquals(NUM_NODES-1, clients.get(i).getGossipmanager().getMemberList().size()); + Assert.assertEquals(NUM_NODES, clients.get(i).get_storageService().getcHasher().getServersMap().values().size()); + + } + + String key="Davide"; + AppMsg req = new RequestAppMsg(AppMsg.OPERATION.PUT, key, "Neri"); + clients.get(0).send("127.0.0.3", Helper.STORAGE_PORT, req); + + Thread.sleep(3000); + //check if 127.0.0.3 has received the key + Assert.assertTrue(clients.get(2).get_storageService().getStorage().containsKey(key)); + + + AppMsg req1 = new RequestAppMsg(AppMsg.OPERATION.GET, key, ""); + clients.get(0).send("127.0.0.3",Helper.STORAGE_PORT, req1); + + Thread.sleep(15000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + */ +/// test with restructurin the node creation + @Test + public void testNodeQuorum() { + + //startup gossip member + GossipSettings settings = new GossipSettings(); + int seedNodes = 1; + List startupMembers = new ArrayList<>(); + for (int i = 1; i < seedNodes + 1; ++i) { + startupMembers.add(new RemoteGossipMember("127.0.0." + i, Helper.GOSSIP_PORT, "node" + i)); + } + + //create three local client + List clients = new ArrayList<>(); + int clusterMembers = 3; + for (int i = 1; i < clusterMembers + 1; ++i) { + Node node = new Node("127.0.0." + i, "node" + i, Helper.STORAGE_PORT, Helper.GOSSIP_PORT, startupMembers, settings); + clients.add(node); + } + + for (Node n : clients) { + n.start(); + } + + try { + Thread.sleep(15000); + + //Check if the nodes discovered each other + for (int i = 0; i < clusterMembers; ++i) { + Assert.assertEquals((NUM_NODES * (Helper.NUM_NODES_VIRTUALS)) - 1, clients.get(i).getGossipmanager().getMemberList().size()); + } + + String key = "Davide"; + AppMsg req = new RequestAppMsg<>(AppMsg.OPERATION.PUT, key, "Neri"); + clients.get(2).sendToStorage(req); + + Thread.sleep(10000); + + //check if 127.0.0.3 has received the key + Assert.assertTrue(clients.get(2).get_storageService().getStorage().containsKey(key)); + + AppMsg reqUpdate = new RequestAppMsg<>(AppMsg.OPERATION.PUT, key, "giangrande"); + clients.get(2).sendToStorage(reqUpdate); + + Thread.sleep(10000); + + AppMsg req1 = new RequestAppMsg<>(AppMsg.OPERATION.GET, key, ""); + clients.get(2).sendToStorage(req1); + + + Thread.sleep(10000); + + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/core/src/test/java/testHasher.java b/core/src/test/java/TestHasher.java similarity index 82% rename from core/src/test/java/testHasher.java rename to core/src/test/java/TestHasher.java index bffebec..17d56b7 100644 --- a/core/src/test/java/testHasher.java +++ b/core/src/test/java/TestHasher.java @@ -1,8 +1,8 @@ import com.dido.pad.StorageData; import com.dido.pad.Helper; import com.dido.pad.Node; +import com.dido.pad.consistenthashing.DefaultFunctions; import com.dido.pad.consistenthashing.Hasher; -import com.dido.pad.consistenthashing.iHasher; import com.google.code.gossip.GossipMember; import org.junit.Assert; import org.junit.Test; @@ -13,21 +13,22 @@ /** * Created by dido-ubuntu on 08/03/16. */ -public class testHasher { +public class TestHasher { @Test public void testOneServer(){ - Hasher hasher = new Hasher<>(1,iHasher.SHA1,iHasher.getNodeToBytesConverter()); - List l = new ArrayList<>(); - - Node n1 = new Node("127.0.0.1","id1", Helper.STORAGE_PORT, Helper.GOSSIP_PORT,l); - hasher.addServer(n1); - - StorageData d = new StorageData("AAAA","first data"); - StorageData d2 = new StorageData("ZZZZ","second data"); - - Assert.assertEquals(hasher.getServerForData(d.getKey()),n1); - n1.shutdown(); +// Hasher hasher = new Hasher<>(1, DefaultFunctions::SHA1, DefaultFunctions::BytesConverter); +// List l = new ArrayList<>(); +// +// Node n1 = new Node("127.0.0.1","id1", Helper.STORAGE_PORT, Helper.GOSSIP_PORT,l); +// hasher.addServer(n1); +// n1.start(); +// +// StorageData d = new StorageData<>("AAAA","first data"); +// StorageData d2 = new StorageData<>("ZZZZ","second data"); +// +// Assert.assertEquals(hasher.getServerForData(d.getKey()),n1); +// n1.shutdown(); } /* diff --git a/core/src/test/java/testVectorClocks.java b/core/src/test/java/TestVectorClocks.java similarity index 96% rename from core/src/test/java/testVectorClocks.java rename to core/src/test/java/TestVectorClocks.java index 8c44864..71f1e78 100644 --- a/core/src/test/java/testVectorClocks.java +++ b/core/src/test/java/TestVectorClocks.java @@ -6,7 +6,7 @@ /** * Created by dido-ubuntu on 15/03/16. */ -public class testVectorClocks { +public class TestVectorClocks { @Test public void testVectors() throws InterruptedException { diff --git a/core/src/test/java/testGossipStorageService.java b/core/src/test/java/testGossipStorageService.java deleted file mode 100644 index 6095ef5..0000000 --- a/core/src/test/java/testGossipStorageService.java +++ /dev/null @@ -1,135 +0,0 @@ -import com.dido.pad.Helper; -import com.dido.pad.Node; -import com.dido.pad.datamessages.AppMsg; -import com.dido.pad.datamessages.RequestAppMsg; -import com.google.code.gossip.*; -import org.junit.Assert; -import org.junit.Test; - -import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.List; - -/** - * Created by dido-ubuntu on 09/03/16. - */ -public class testGossipStorageService { - - private static final int NUM_NODES = 3; -/* - @Test - public void testDataInsert(){ - - //startup gossip member - GossipSettings settings = new GossipSettings(); - int seedNodes = 1; - List startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes+1; ++i) { - startupMembers.add(new RemoteGossipMember("127.0.0." + i, Helper.GOSSIP_PORT, "node"+i)); - } - - //create three local client - List clients = new ArrayList<>(); - int clusterMembers = 3; - for (int i = 1; i < clusterMembers+1; ++i) { - Node node = new Node("127.0.0." + i, "node" + i, Helper.STORAGE_PORT, Helper.GOSSIP_PORT); - clients.add(node); - } - - for(int i=0; i < clients.size(); i++){ - Node node = clients.get(i); - try { - node.startGossipService(LogLevel.DEBUG, startupMembers,settings,node::gossipEvent ); - node.startStorageService(); - } catch (UnknownHostException e) { - e.printStackTrace(); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - - try { - Thread.sleep(10000); - - //Check if the nodes discovered each other - for (int i = 0; i < clusterMembers; ++i) { - Assert.assertEquals(NUM_NODES-1, clients.get(i).getGossipmanager().getMemberList().size()); - Assert.assertEquals(NUM_NODES, clients.get(i).get_storageService().getcHasher().getServersMap().values().size()); - - } - - String key="Davide"; - AppMsg req = new RequestAppMsg(AppMsg.OPERATION.PUT, key, "Neri"); - clients.get(0).send("127.0.0.3", Helper.STORAGE_PORT, req); - - Thread.sleep(3000); - //check if 127.0.0.3 has received the key - Assert.assertTrue(clients.get(2).get_storageService().getStorage().containsKey(key)); - - - AppMsg req1 = new RequestAppMsg(AppMsg.OPERATION.GET, key, ""); - clients.get(0).send("127.0.0.3",Helper.STORAGE_PORT, req1); - - Thread.sleep(15000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } - */ -/// test with restructurin the node creation -@Test -public void testNodeQuorum(){ - - //startup gossip member - GossipSettings settings = new GossipSettings(); - int seedNodes = 1; - List startupMembers = new ArrayList<>(); - for (int i = 1; i < seedNodes+1; ++i) { - startupMembers.add(new RemoteGossipMember("127.0.0." + i, Helper.GOSSIP_PORT, "node"+i)); - } - - //create three local client - List clients = new ArrayList<>(); - int clusterMembers = 3; - for (int i = 1; i < clusterMembers+1; ++i) { - Node node = new Node("127.0.0." + i, "node" + i, Helper.STORAGE_PORT, Helper.GOSSIP_PORT, LogLevel.CONFIG_INFO,startupMembers, settings); - clients.add(node); - } - - for (Node n : clients) { - n.start(); - } - - try { - Thread.sleep(15000); - - //Check if the nodes discovered each other - for (int i = 0; i < clusterMembers; ++i) { - Assert.assertEquals((NUM_NODES * (Helper.NUM_NODES_VIRTUALS))-1, clients.get(i).getGossipmanager().getMemberList().size()); - } - - String key="Davide"; - AppMsg req = new RequestAppMsg<>(AppMsg.OPERATION.PUT, key, "Neri"); - clients.get(0).send("127.0.0.3", Helper.STORAGE_PORT, req); - - Thread.sleep(10000); - - //check if 127.0.0.3 has received the key - Assert.assertTrue(clients.get(2).get_storageService().getStorage().containsKey(key)); - - AppMsg reqUpdate = new RequestAppMsg<>(AppMsg.OPERATION.PUT,key,"giangrande"); - clients.get(0).send("127.0.0.3", Helper.STORAGE_PORT, reqUpdate); - - Thread.sleep(10000); - - AppMsg req1 = new RequestAppMsg<>(AppMsg.OPERATION.GET, key, ""); - clients.get(0).send("127.0.0.3",Helper.STORAGE_PORT, req1); - - - Thread.sleep(10000); - - } catch (InterruptedException e) { - e.printStackTrace(); - } - } -} diff --git a/pad-fs.iml b/pad-fs.iml new file mode 100644 index 0000000..678b84b --- /dev/null +++ b/pad-fs.iml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index 19d03ec..f87db27 100644 --- a/pom.xml +++ b/pom.xml @@ -9,11 +9,12 @@ pom 1.0-SNAPSHOT PAD File system project a.a 2015-16 + core + app - central @@ -27,42 +28,59 @@ - - - org.apache.maven.plugins - maven-compiler-plugin - 3.1 - - 1.8 - 1.8 - - - + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.8 + 1.8 + + + + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + + + java + + + + + com.dido.pad.PadFsNode + + + + - - it.cnr.isti.hpclab - gossip - 0.0.3 - + + + + + - + junit junit - 4.8.2 + 4.11 test - log4j log4j @@ -96,13 +114,13 @@ jdom 2.0.2 + com.google.collections google-collections 1.0 - com.fasterxml.jackson.core @@ -128,7 +146,6 @@ 2.7.2 - com.fasterxml.jackson.datatype jackson-datatype-jdk8 @@ -145,8 +162,4 @@ - - - -