diff --git a/src/main/java/org/renci/pubsub_daemon/Globals.java b/src/main/java/org/renci/pubsub_daemon/Globals.java index 95281c2..64bed78 100644 --- a/src/main/java/org/renci/pubsub_daemon/Globals.java +++ b/src/main/java/org/renci/pubsub_daemon/Globals.java @@ -8,6 +8,7 @@ import java.io.FileWriter; import java.io.IOException; import java.io.Reader; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.HashSet; @@ -308,4 +309,22 @@ public void createInternalConverter() { public IMultiFormatConverter getInternalConverter() { return internalConverter; } + + + // list nodes of interest + public static List getSMNodeList(Set smsOfInterest) { + List ret = new ArrayList(); + + List pubNodes = Globals.getInstance().getXMPP().listAllNodes(); + + for (String n: pubNodes) { + //logger.info("Found node " + n); + for (String sm: smsOfInterest) { + if (n.matches(ManifestSubscriber.ORCA_SM_PREFIX + sm + ManifestSubscriber.ORCA_SM_SLICE_LIST_SUFFIX)) + ret.add(n); + } + } + + return ret; + } } diff --git a/src/main/java/org/renci/pubsub_daemon/ManifestSubscriber.java b/src/main/java/org/renci/pubsub_daemon/ManifestSubscriber.java index 37c76ab..6295e88 100644 --- a/src/main/java/org/renci/pubsub_daemon/ManifestSubscriber.java +++ b/src/main/java/org/renci/pubsub_daemon/ManifestSubscriber.java @@ -38,8 +38,8 @@ */ public class ManifestSubscriber implements IPubSubReconnectCallback { public static final String buildVersion = "Blowhole " + ManifestSubscriber.class.getPackage().getImplementationVersion(); - private static final String ORCA_SM_SLICE_LIST_SUFFIX = ".+sliceList"; - private static final String ORCA_SM_PREFIX = "/orca/sm/"; + public static final String ORCA_SM_SLICE_LIST_SUFFIX = ".+sliceList"; + public static final String ORCA_SM_PREFIX = "/orca/sm/"; private static final String PUBSUB_SUBSCRIBER_RESOURCE = "GMOC-Subscriber"; public static final String PREF_FILE = ".xmpp.properties"; @@ -62,6 +62,8 @@ public class ManifestSubscriber implements IPubSubReconnectCallback { private static final String PUBSUB_KEYSTORE_PASS = PUBSUB_PROP_PREFIX + ".keystorepass"; // Note truststore password would be read from the GMOC.pubsub.password property when using certificates + Set smsOfInterest = null; + static XPath xp; protected Properties prefProperties = null; @@ -143,10 +145,28 @@ private ManifestSubscriber() { Globals.getInstance().setXMPP(xmpp); - // get the list of nodes that list manifests - List smNodes = getSMNodeList(); + // determine which SMs we will be subscribing to + String smPropVal = prefProperties.getProperty(PUBSUB_SMS_PROP); + + if ((smPropVal == null) || (smPropVal.length() == 0)){ + // include everything + logger.error(PUBSUB_SMS_PROP + " not specifiedor empty, unable to subscribe"); + System.exit(1); + } + + smsOfInterest = new HashSet(); + + for(String s: smPropVal.split(",")) { + smsOfInterest.add(s.trim()); + } + + // get the list of nodes that list manifests now (possible that SM + // has listed nothing so far) + List smNodes = Globals.getSMNodeList(smsOfInterest); + // collect successfully subscribed ones List missingNodes = new ArrayList(); + Globals.info("Subscribing to known SM manifest lists"); for (String smListNode: smNodes) { logger.info(" " + smListNode); @@ -161,7 +181,7 @@ private ManifestSubscriber() { } rst = new ResubscribeThread(Globals.getInstance().getSliceListener(), Globals.getInstance().getManifestListener()); - rst.updateSliceList(missingNodes); + rst.updateSliceList(smsOfInterest, missingNodes); addShutDownHandler(); sem.release(); @@ -327,38 +347,6 @@ protected XMPPPubSub prepareXMPPForAcctCreation() { return xps; } - - // list nodes of interest - private List getSMNodeList() { - List ret = new ArrayList(); - - String smPropVal = prefProperties.getProperty(PUBSUB_SMS_PROP); - - if (smPropVal == null) { - // include everything - Globals.warn(PUBSUB_SMS_PROP + " not specified, will subscribe to ALL SMs found on this server."); - smPropVal = ""; - } else if (smPropVal.length() == 0) - Globals.warn(PUBSUB_SMS_PROP + " is empty, will subscribe to ALL SMs found on this server."); - - Set smsOfInterest = new HashSet(); - - for(String s: smPropVal.split(",")) { - smsOfInterest.add(s.trim()); - } - - List pubNodes = Globals.getInstance().getXMPP().listAllNodes(); - - for (String n: pubNodes) { - //logger.info("Found node " + n); - for (String sm: smsOfInterest) { - if (n.matches(ORCA_SM_PREFIX + sm + ORCA_SM_SLICE_LIST_SUFFIX)) - ret.add(n); - } - } - - return ret; - } private void unsubscribeAll(List save) { Globals.info("Unsubscribing from all slice lists"); @@ -413,7 +401,7 @@ public void onReconnect() { Globals.getInstance().getSliceListener().unsubscribeAll(manifestNodes); // tell resubscription thread - rst.updateSliceList(listNodes); + rst.updateSliceList(smsOfInterest, listNodes); } public static void main(String[] args) { diff --git a/src/main/java/org/renci/pubsub_daemon/ResubscribeThread.java b/src/main/java/org/renci/pubsub_daemon/ResubscribeThread.java index 16e9881..9b285c2 100644 --- a/src/main/java/org/renci/pubsub_daemon/ResubscribeThread.java +++ b/src/main/java/org/renci/pubsub_daemon/ResubscribeThread.java @@ -16,6 +16,7 @@ public class ResubscribeThread extends TimerTask { private SliceListEventListener sll; private ManifestEventListener ml; + private Set smsOfInterest = new HashSet(); private Set remainingSliceLists = new HashSet(); private Set remainingManifests = new HashSet(); @@ -28,9 +29,20 @@ public ResubscribeThread(SliceListEventListener _sll, ManifestEventListener _ml) public void run() { // go through the sets and try to resubscribe - // for now only slice lists + Globals.info("Getting updated list of available SM slice lists"); + List newSmNodes = Globals.getSMNodeList(smsOfInterest); + remainingSliceLists.addAll(newSmNodes); + + // filter out known subscriptions + Set subs = Globals.getInstance().getSubscriptions(); + for(SubscriptionPair sub: subs) { + remainingSliceLists.remove(sub.node); + } + + // try to subscribe List success = new ArrayList(); Globals.info("Trying to (re)subscribe to slice lists: "); + for(String smListNode:remainingSliceLists) { Globals.info(" " + smListNode); SubscriptionPair sp = new SubscriptionPair(smListNode, @@ -42,20 +54,18 @@ public void run() { } else Globals.info(" UNABLE, will try again later!"); } - for (String s: success) { - remainingSliceLists.remove(s); - } + remainingSliceLists.removeAll(success); } - public synchronized void updateSliceList(final List lst) { - if (lst == null) { + public synchronized void updateSliceList(final Set sms, final List missing) { + Globals.info("Adding SMs of interest: " + sms); + smsOfInterest.addAll(sms); + if (missing == null) { remainingSliceLists.clear(); return; } - for(String n: lst) { - Globals.info("Adding slice list " + n + " for later attempts"); - remainingSliceLists.add(n); - } + Globals.info("Adding slice list for later attempts: " + missing); + remainingSliceLists.addAll(missing); } public synchronized void updateManifestList(final List lst) {