diff --git a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java index acb8030d3ad..353a18db095 100644 --- a/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java +++ b/network/src/main/java/io/bitsquare/p2p/peers/getdata/RequestDataHandler.java @@ -157,9 +157,9 @@ public void onMessage(Message message, Connection connection) { final NodeAddress sender = connection.getPeersNodeAddressOptional().get(); ((GetDataResponse) message).dataSet.stream().forEach(protectedStorageEntry -> { - dataStorage.add(protectedStorageEntry, sender, null, false); + // We dont broadcast here as we are only connected to the seed node and would be pointless + dataStorage.add(protectedStorageEntry, sender, null, false, false); }); - cleanup(); listener.onComplete(); } else { diff --git a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java index 0d999731e50..03cadccbc7f 100644 --- a/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java +++ b/network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java @@ -201,7 +201,13 @@ public void onError(Throwable throwable) { public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, @Nullable BroadcastHandler.Listener listener, boolean isDataOwner) { - Log.traceCall(); + Log.traceCall("with allowBroadcast=true"); + return add(protectedStorageEntry, sender, listener, isDataOwner, true); + } + + public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, + @Nullable BroadcastHandler.Listener listener, boolean isDataOwner, boolean allowBroadcast) { + Log.traceCall("with allowBroadcast=" + allowBroadcast); ByteArray hashOfPayload = getHashAsByteArray(protectedStorageEntry.getStoragePayload()); boolean sequenceNrValid = isSequenceNrValid(protectedStorageEntry.sequenceNumber, hashOfPayload); @@ -227,9 +233,11 @@ && checkSignature(protectedStorageEntry) if (hasSequenceNrIncreased) { sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.sequenceNumber, System.currentTimeMillis())); - storage.queueUpForSave(sequenceNumberMap, 100); + // We set the delay higher as we might receive a batch of items + storage.queueUpForSave(sequenceNumberMap, 2000); - broadcast(new AddDataMessage(protectedStorageEntry), sender, listener, isDataOwner); + if (allowBroadcast) + broadcast(new AddDataMessage(protectedStorageEntry), sender, listener, isDataOwner); } else { log.trace("We got that version of the data already, so we don't broadcast it."); } @@ -270,7 +278,7 @@ public boolean refreshTTL(RefreshTTLMessage refreshTTLMessage, @Nullable NodeAdd storedData.updateSignature(signature); printData("after refreshTTL"); sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis())); - storage.queueUpForSave(sequenceNumberMap, 100); + storage.queueUpForSave(sequenceNumberMap, 1000); broadcast(refreshTTLMessage, sender, null, isDataOwner); } @@ -299,7 +307,7 @@ && checkSignature(protectedStorageEntry) doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload); printData("after remove"); sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.sequenceNumber, System.currentTimeMillis())); - storage.queueUpForSave(sequenceNumberMap, 100); + storage.queueUpForSave(sequenceNumberMap, 300); broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null, isDataOwner); } else { @@ -326,7 +334,7 @@ && checkSignature(protectedMailboxStorageEntry) doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfData); printData("after removeMailboxData"); sequenceNumberMap.put(hashOfData, new MapValue(protectedMailboxStorageEntry.sequenceNumber, System.currentTimeMillis())); - storage.queueUpForSave(sequenceNumberMap, 100); + storage.queueUpForSave(sequenceNumberMap, 300); broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null, isDataOwner); } else { @@ -551,36 +559,38 @@ private HashMap getPurgedSequenceNumberMap(HashMap> tempList = map.values().stream() - .map(e -> new Tuple2<>(org.bitcoinj.core.Utils.HEX.encode(getHashAsByteArray(e.getStoragePayload()).bytes), e)) - .collect(Collectors.toList()); - tempList.sort((o1, o2) -> o1.first.compareTo(o2.first)); - tempList.stream().forEach(e -> { - final ProtectedStorageEntry storageEntry = e.second; - final StoragePayload storagePayload = storageEntry.getStoragePayload(); - final MapValue mapValue = sequenceNumberMap.get(getHashAsByteArray(storagePayload)); - sb.append("\n") - .append("Hash=") - .append(e.first) - .append("; Class=") - .append(storagePayload.getClass().getSimpleName()) - .append("; SequenceNumbers (Object/Stored)=") - .append(storageEntry.sequenceNumber) - .append(" / ") - .append(mapValue != null ? mapValue.sequenceNr : "null") - .append("; TimeStamp (Object/Stored)=") - .append(storageEntry.creationTimeStamp) - .append(" / ") - .append(mapValue != null ? mapValue.timeStamp : "null") - .append("; Payload=") - .append(Utilities.toTruncatedString(storagePayload)); - }); - sb.append("\n------------------------------------------------------------\n"); - log.debug(sb.toString()); - log.info("Data set " + info + " operation: size=" + map.values().size()); + if (LoggerFactory.getLogger(Log.class).isInfoEnabled() || LoggerFactory.getLogger(Log.class).isDebugEnabled()) { + StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n"); + sb.append("Data set ").append(info).append(" operation"); + // We print the items sorted by hash with the payload class name and id + List> tempList = map.values().stream() + .map(e -> new Tuple2<>(org.bitcoinj.core.Utils.HEX.encode(getHashAsByteArray(e.getStoragePayload()).bytes), e)) + .collect(Collectors.toList()); + tempList.sort((o1, o2) -> o1.first.compareTo(o2.first)); + tempList.stream().forEach(e -> { + final ProtectedStorageEntry storageEntry = e.second; + final StoragePayload storagePayload = storageEntry.getStoragePayload(); + final MapValue mapValue = sequenceNumberMap.get(getHashAsByteArray(storagePayload)); + sb.append("\n") + .append("Hash=") + .append(e.first) + .append("; Class=") + .append(storagePayload.getClass().getSimpleName()) + .append("; SequenceNumbers (Object/Stored)=") + .append(storageEntry.sequenceNumber) + .append(" / ") + .append(mapValue != null ? mapValue.sequenceNr : "null") + .append("; TimeStamp (Object/Stored)=") + .append(storageEntry.creationTimeStamp) + .append(" / ") + .append(mapValue != null ? mapValue.timeStamp : "null") + .append("; Payload=") + .append(Utilities.toTruncatedString(storagePayload)); + }); + sb.append("\n------------------------------------------------------------\n"); + log.debug(sb.toString()); + log.info("Data set " + info + " operation: size=" + map.values().size()); + } }