Skip to content
This repository has been archived by the owner on Oct 26, 2022. It is now read-only.

Commit

Permalink
Trying to fix tricky empty SM subscription problem
Browse files Browse the repository at this point in the history
  • Loading branch information
Ilya Baldin committed Jan 15, 2016
1 parent f8d5b75 commit d975749
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 48 deletions.
19 changes: 19 additions & 0 deletions src/main/java/org/renci/pubsub_daemon/Globals.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
Expand Down Expand Up @@ -308,4 +309,22 @@ public void createInternalConverter() {
public IMultiFormatConverter getInternalConverter() {
return internalConverter;
}


// list nodes of interest
public static List<String> getSMNodeList(Set<String> smsOfInterest) {
List<String> ret = new ArrayList<String>();

List<String> pubNodes = Globals.getInstance().getXMPP().listAllNodes();

for (String n: pubNodes) {
//logger.info("Found node " + n);
for (String sm: smsOfInterest) {
if (n.matches(ManifestSubscriber.ORCA_SM_PREFIX + sm + ManifestSubscriber.ORCA_SM_SLICE_LIST_SUFFIX))
ret.add(n);
}
}

return ret;
}
}
64 changes: 26 additions & 38 deletions src/main/java/org/renci/pubsub_daemon/ManifestSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
*/
public class ManifestSubscriber implements IPubSubReconnectCallback {
public static final String buildVersion = "Blowhole " + ManifestSubscriber.class.getPackage().getImplementationVersion();
private static final String ORCA_SM_SLICE_LIST_SUFFIX = ".+sliceList";
private static final String ORCA_SM_PREFIX = "/orca/sm/";
public static final String ORCA_SM_SLICE_LIST_SUFFIX = ".+sliceList";
public static final String ORCA_SM_PREFIX = "/orca/sm/";

private static final String PUBSUB_SUBSCRIBER_RESOURCE = "GMOC-Subscriber";
public static final String PREF_FILE = ".xmpp.properties";
Expand All @@ -62,6 +62,8 @@ public class ManifestSubscriber implements IPubSubReconnectCallback {
private static final String PUBSUB_KEYSTORE_PASS = PUBSUB_PROP_PREFIX + ".keystorepass";
// Note truststore password would be read from the GMOC.pubsub.password property when using certificates

Set<String> smsOfInterest = null;

static XPath xp;

protected Properties prefProperties = null;
Expand Down Expand Up @@ -143,10 +145,28 @@ private ManifestSubscriber() {

Globals.getInstance().setXMPP(xmpp);

// get the list of nodes that list manifests
List<String> smNodes = getSMNodeList();
// determine which SMs we will be subscribing to
String smPropVal = prefProperties.getProperty(PUBSUB_SMS_PROP);

if ((smPropVal == null) || (smPropVal.length() == 0)){
// include everything
logger.error(PUBSUB_SMS_PROP + " not specifiedor empty, unable to subscribe");
System.exit(1);
}

smsOfInterest = new HashSet<String>();

for(String s: smPropVal.split(",")) {
smsOfInterest.add(s.trim());
}

// get the list of nodes that list manifests now (possible that SM
// has listed nothing so far)
List<String> smNodes = Globals.getSMNodeList(smsOfInterest);

// collect successfully subscribed ones
List<String> missingNodes = new ArrayList<String>();

Globals.info("Subscribing to known SM manifest lists");
for (String smListNode: smNodes) {
logger.info(" " + smListNode);
Expand All @@ -161,7 +181,7 @@ private ManifestSubscriber() {
}
rst = new ResubscribeThread(Globals.getInstance().getSliceListener(), Globals.getInstance().getManifestListener());

rst.updateSliceList(missingNodes);
rst.updateSliceList(smsOfInterest, missingNodes);
addShutDownHandler();
sem.release();

Expand Down Expand Up @@ -327,38 +347,6 @@ protected XMPPPubSub prepareXMPPForAcctCreation() {

return xps;
}

// list nodes of interest
private List<String> getSMNodeList() {
List<String> ret = new ArrayList<String>();

String smPropVal = prefProperties.getProperty(PUBSUB_SMS_PROP);

if (smPropVal == null) {
// include everything
Globals.warn(PUBSUB_SMS_PROP + " not specified, will subscribe to ALL SMs found on this server.");
smPropVal = "";
} else if (smPropVal.length() == 0)
Globals.warn(PUBSUB_SMS_PROP + " is empty, will subscribe to ALL SMs found on this server.");

Set<String> smsOfInterest = new HashSet<String>();

for(String s: smPropVal.split(",")) {
smsOfInterest.add(s.trim());
}

List<String> pubNodes = Globals.getInstance().getXMPP().listAllNodes();

for (String n: pubNodes) {
//logger.info("Found node " + n);
for (String sm: smsOfInterest) {
if (n.matches(ORCA_SM_PREFIX + sm + ORCA_SM_SLICE_LIST_SUFFIX))
ret.add(n);
}
}

return ret;
}

private void unsubscribeAll(List<String> save) {
Globals.info("Unsubscribing from all slice lists");
Expand Down Expand Up @@ -413,7 +401,7 @@ public void onReconnect() {
Globals.getInstance().getSliceListener().unsubscribeAll(manifestNodes);

// tell resubscription thread
rst.updateSliceList(listNodes);
rst.updateSliceList(smsOfInterest, listNodes);
}

public static void main(String[] args) {
Expand Down
30 changes: 20 additions & 10 deletions src/main/java/org/renci/pubsub_daemon/ResubscribeThread.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
public class ResubscribeThread extends TimerTask {
private SliceListEventListener sll;
private ManifestEventListener ml;
private Set<String> smsOfInterest = new HashSet<String>();
private Set<String> remainingSliceLists = new HashSet<String>();
private Set<String> remainingManifests = new HashSet<String>();

Expand All @@ -28,9 +29,20 @@ public ResubscribeThread(SliceListEventListener _sll, ManifestEventListener _ml)
public void run() {
// go through the sets and try to resubscribe

// for now only slice lists
Globals.info("Getting updated list of available SM slice lists");
List<String> newSmNodes = Globals.getSMNodeList(smsOfInterest);
remainingSliceLists.addAll(newSmNodes);

// filter out known subscriptions
Set<SubscriptionPair> subs = Globals.getInstance().getSubscriptions();
for(SubscriptionPair sub: subs) {
remainingSliceLists.remove(sub.node);
}

// try to subscribe
List<String> success = new ArrayList<String>();
Globals.info("Trying to (re)subscribe to slice lists: ");

for(String smListNode:remainingSliceLists) {
Globals.info(" " + smListNode);
SubscriptionPair sp = new SubscriptionPair(smListNode,
Expand All @@ -42,20 +54,18 @@ public void run() {
} else
Globals.info(" UNABLE, will try again later!");
}
for (String s: success) {
remainingSliceLists.remove(s);
}
remainingSliceLists.removeAll(success);
}

public synchronized void updateSliceList(final List<String> lst) {
if (lst == null) {
public synchronized void updateSliceList(final Set<String> sms, final List<String> missing) {
Globals.info("Adding SMs of interest: " + sms);
smsOfInterest.addAll(sms);
if (missing == null) {
remainingSliceLists.clear();
return;
}
for(String n: lst) {
Globals.info("Adding slice list " + n + " for later attempts");
remainingSliceLists.add(n);
}
Globals.info("Adding slice list for later attempts: " + missing);
remainingSliceLists.addAll(missing);
}

public synchronized void updateManifestList(final List<String> lst) {
Expand Down

0 comments on commit d975749

Please sign in to comment.