diff --git a/prj/test/functional/topics/src/main/java/topics/AbstractTopicsStorageRecoveryTests.java b/prj/test/functional/topics/src/main/java/topics/AbstractTopicsStorageRecoveryTests.java index 0b5a16f6cb7e..1b3dbffeb522 100644 --- a/prj/test/functional/topics/src/main/java/topics/AbstractTopicsStorageRecoveryTests.java +++ b/prj/test/functional/topics/src/main/java/topics/AbstractTopicsStorageRecoveryTests.java @@ -38,11 +38,8 @@ import com.tangosol.io.pof.PofWriter; import com.tangosol.io.pof.PortableObject; import com.tangosol.net.CacheFactory; -import com.tangosol.net.Cluster; import com.tangosol.net.Coherence; -import com.tangosol.net.NamedCache; import com.tangosol.net.Session; -import com.tangosol.net.TopicService; import com.tangosol.net.topic.NamedTopic; import com.tangosol.net.topic.Publisher; import com.tangosol.net.topic.Subscriber; @@ -50,7 +47,6 @@ import org.junit.After; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -357,9 +353,11 @@ optComplete, withIdentifyingName(sName))) // shutdown the storage members restartCluster(); - IsServiceRunning isRunning = new IsServiceRunning(sServiceName); + IsServiceRunning isRunning = new IsServiceRunning(sServiceName); + IsCoherenceRunning isCohRunning = new IsCoherenceRunning(); for (CoherenceClusterMember m : s_storageCluster) { + Eventually.assertDeferred(() -> m.invoke(isCohRunning), is(true)); Eventually.assertDeferred(() -> m.invoke(isRunning), is(true)); } Logger.info(">>>> Restarted service " + sServiceName + " on all members"); @@ -388,7 +386,7 @@ optComplete, withIdentifyingName(sName))) System.err.println(mapPublished.get(message) + " " + mapReceived.get(message)); } } - assertThat(count, is(cPublished.get())); + assertThat(count, greaterThanOrEqualTo(cPublished.get())); } } @@ -398,8 +396,6 @@ public void shouldRecoverWaitingSubscriberAfterCleanStorageRestart() throws Exce { NamedTopic topic = ensureTopic("test-three"); String sGroup = "group-one"; - TopicService service = topic.getTopicService(); - Cluster cluster = service.getCluster(); String sServiceName = s_storageCluster.getAny().invoke(new GetTopicServiceName(topic.getName())); // create a subscriber group so that published messages are not lost before the subscriber subscribes @@ -433,13 +429,11 @@ public void shouldRecoverWaitingSubscriberAfterCleanStorageRestart() throws Exce // shutdown the storage members restartCluster(); - // we should eventually have three cluster members - Eventually.assertDeferred(() -> cluster.getMemberSet().size(), is(3)); - Logger.info(">>>> Restarted storage."); - - IsServiceRunning isRunning = new IsServiceRunning(sServiceName); + IsServiceRunning isRunning = new IsServiceRunning(sServiceName); + IsCoherenceRunning isCohRunning = new IsCoherenceRunning(); for (CoherenceClusterMember m : s_storageCluster) { + Eventually.assertDeferred(() -> m.invoke(isCohRunning), is(true)); Eventually.assertDeferred(() -> m.invoke(isRunning), is(true)); } Logger.info(">>>> Restarted service " + sServiceName + " on all members");