Skip to content

Commit

Permalink
Don't call broadcast at initial data received, increase storage delay
Browse files Browse the repository at this point in the history
  • Loading branch information
ManfredKarrer committed Jul 27, 2016
1 parent 241835a commit 6dd9eab
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ public void onMessage(Message message, Connection connection) {

final NodeAddress sender = connection.getPeersNodeAddressOptional().get();
((GetDataResponse) message).dataSet.stream().forEach(protectedStorageEntry -> {
dataStorage.add(protectedStorageEntry, sender, null, false);
// We dont broadcast here as we are only connected to the seed node and would be pointless
dataStorage.add(protectedStorageEntry, sender, null, false, false);
});

cleanup();
listener.onComplete();
} else {
Expand Down
82 changes: 46 additions & 36 deletions network/src/main/java/io/bitsquare/p2p/storage/P2PDataStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,13 @@ public void onError(Throwable throwable) {

public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener, boolean isDataOwner) {
Log.traceCall();
Log.traceCall("with allowBroadcast=true");
return add(protectedStorageEntry, sender, listener, isDataOwner, true);
}

public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener, boolean isDataOwner, boolean allowBroadcast) {
Log.traceCall("with allowBroadcast=" + allowBroadcast);

ByteArray hashOfPayload = getHashAsByteArray(protectedStorageEntry.getStoragePayload());
boolean sequenceNrValid = isSequenceNrValid(protectedStorageEntry.sequenceNumber, hashOfPayload);
Expand All @@ -227,9 +233,11 @@ && checkSignature(protectedStorageEntry)

if (hasSequenceNrIncreased) {
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
// We set the delay higher as we might receive a batch of items
storage.queueUpForSave(sequenceNumberMap, 2000);

broadcast(new AddDataMessage(protectedStorageEntry), sender, listener, isDataOwner);
if (allowBroadcast)
broadcast(new AddDataMessage(protectedStorageEntry), sender, listener, isDataOwner);
} else {
log.trace("We got that version of the data already, so we don't broadcast it.");
}
Expand Down Expand Up @@ -270,7 +278,7 @@ public boolean refreshTTL(RefreshTTLMessage refreshTTLMessage, @Nullable NodeAdd
storedData.updateSignature(signature);
printData("after refreshTTL");
sequenceNumberMap.put(hashOfPayload, new MapValue(sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
storage.queueUpForSave(sequenceNumberMap, 1000);

broadcast(refreshTTLMessage, sender, null, isDataOwner);
}
Expand Down Expand Up @@ -299,7 +307,7 @@ && checkSignature(protectedStorageEntry)
doRemoveProtectedExpirableData(protectedStorageEntry, hashOfPayload);
printData("after remove");
sequenceNumberMap.put(hashOfPayload, new MapValue(protectedStorageEntry.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
storage.queueUpForSave(sequenceNumberMap, 300);

broadcast(new RemoveDataMessage(protectedStorageEntry), sender, null, isDataOwner);
} else {
Expand All @@ -326,7 +334,7 @@ && checkSignature(protectedMailboxStorageEntry)
doRemoveProtectedExpirableData(protectedMailboxStorageEntry, hashOfData);
printData("after removeMailboxData");
sequenceNumberMap.put(hashOfData, new MapValue(protectedMailboxStorageEntry.sequenceNumber, System.currentTimeMillis()));
storage.queueUpForSave(sequenceNumberMap, 100);
storage.queueUpForSave(sequenceNumberMap, 300);

broadcast(new RemoveMailboxDataMessage(protectedMailboxStorageEntry), sender, null, isDataOwner);
} else {
Expand Down Expand Up @@ -551,36 +559,38 @@ private HashMap<ByteArray, MapValue> getPurgedSequenceNumberMap(HashMap<ByteArra
}

private void printData(String info) {
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set " + info + " operation");
// We print the items sorted by hash with the payload class name and id
List<Tuple2<String, ProtectedStorageEntry>> tempList = map.values().stream()
.map(e -> new Tuple2<>(org.bitcoinj.core.Utils.HEX.encode(getHashAsByteArray(e.getStoragePayload()).bytes), e))
.collect(Collectors.toList());
tempList.sort((o1, o2) -> o1.first.compareTo(o2.first));
tempList.stream().forEach(e -> {
final ProtectedStorageEntry storageEntry = e.second;
final StoragePayload storagePayload = storageEntry.getStoragePayload();
final MapValue mapValue = sequenceNumberMap.get(getHashAsByteArray(storagePayload));
sb.append("\n")
.append("Hash=")
.append(e.first)
.append("; Class=")
.append(storagePayload.getClass().getSimpleName())
.append("; SequenceNumbers (Object/Stored)=")
.append(storageEntry.sequenceNumber)
.append(" / ")
.append(mapValue != null ? mapValue.sequenceNr : "null")
.append("; TimeStamp (Object/Stored)=")
.append(storageEntry.creationTimeStamp)
.append(" / ")
.append(mapValue != null ? mapValue.timeStamp : "null")
.append("; Payload=")
.append(Utilities.toTruncatedString(storagePayload));
});
sb.append("\n------------------------------------------------------------\n");
log.debug(sb.toString());
log.info("Data set " + info + " operation: size=" + map.values().size());
if (LoggerFactory.getLogger(Log.class).isInfoEnabled() || LoggerFactory.getLogger(Log.class).isDebugEnabled()) {
StringBuilder sb = new StringBuilder("\n\n------------------------------------------------------------\n");
sb.append("Data set ").append(info).append(" operation");
// We print the items sorted by hash with the payload class name and id
List<Tuple2<String, ProtectedStorageEntry>> tempList = map.values().stream()
.map(e -> new Tuple2<>(org.bitcoinj.core.Utils.HEX.encode(getHashAsByteArray(e.getStoragePayload()).bytes), e))
.collect(Collectors.toList());
tempList.sort((o1, o2) -> o1.first.compareTo(o2.first));
tempList.stream().forEach(e -> {
final ProtectedStorageEntry storageEntry = e.second;
final StoragePayload storagePayload = storageEntry.getStoragePayload();
final MapValue mapValue = sequenceNumberMap.get(getHashAsByteArray(storagePayload));
sb.append("\n")
.append("Hash=")
.append(e.first)
.append("; Class=")
.append(storagePayload.getClass().getSimpleName())
.append("; SequenceNumbers (Object/Stored)=")
.append(storageEntry.sequenceNumber)
.append(" / ")
.append(mapValue != null ? mapValue.sequenceNr : "null")
.append("; TimeStamp (Object/Stored)=")
.append(storageEntry.creationTimeStamp)
.append(" / ")
.append(mapValue != null ? mapValue.timeStamp : "null")
.append("; Payload=")
.append(Utilities.toTruncatedString(storagePayload));
});
sb.append("\n------------------------------------------------------------\n");
log.debug(sb.toString());
log.info("Data set " + info + " operation: size=" + map.values().size());
}
}


Expand Down

0 comments on commit 6dd9eab

Please sign in to comment.