38
38
import com .tangosol .io .pof .PofWriter ;
39
39
import com .tangosol .io .pof .PortableObject ;
40
40
import com .tangosol .net .CacheFactory ;
41
- import com .tangosol .net .Cluster ;
42
41
import com .tangosol .net .Coherence ;
43
- import com .tangosol .net .NamedCache ;
44
42
import com .tangosol .net .Session ;
45
- import com .tangosol .net .TopicService ;
46
43
import com .tangosol .net .topic .NamedTopic ;
47
44
import com .tangosol .net .topic .Publisher ;
48
45
import com .tangosol .net .topic .Subscriber ;
49
46
import com .tangosol .util .ExternalizableHelper ;
50
47
import org .junit .After ;
51
48
import org .junit .Before ;
52
49
import org .junit .ClassRule ;
53
- import org .junit .Ignore ;
54
50
import org .junit .Rule ;
55
51
import org .junit .Test ;
56
52
import org .junit .rules .TestName ;
@@ -357,9 +353,11 @@ optComplete, withIdentifyingName(sName)))
357
353
// shutdown the storage members
358
354
restartCluster ();
359
355
360
- IsServiceRunning isRunning = new IsServiceRunning (sServiceName );
356
+ IsServiceRunning isRunning = new IsServiceRunning (sServiceName );
357
+ IsCoherenceRunning isCohRunning = new IsCoherenceRunning ();
361
358
for (CoherenceClusterMember m : s_storageCluster )
362
359
{
360
+ Eventually .assertDeferred (() -> m .invoke (isCohRunning ), is (true ));
363
361
Eventually .assertDeferred (() -> m .invoke (isRunning ), is (true ));
364
362
}
365
363
Logger .info (">>>> Restarted service " + sServiceName + " on all members" );
@@ -388,7 +386,7 @@ optComplete, withIdentifyingName(sName)))
388
386
System .err .println (mapPublished .get (message ) + " " + mapReceived .get (message ));
389
387
}
390
388
}
391
- assertThat (count , is (cPublished .get ()));
389
+ assertThat (count , greaterThanOrEqualTo (cPublished .get ()));
392
390
}
393
391
}
394
392
@@ -398,8 +396,6 @@ public void shouldRecoverWaitingSubscriberAfterCleanStorageRestart() throws Exce
398
396
{
399
397
NamedTopic <Message > topic = ensureTopic ("test-three" );
400
398
String sGroup = "group-one" ;
401
- TopicService service = topic .getTopicService ();
402
- Cluster cluster = service .getCluster ();
403
399
String sServiceName = s_storageCluster .getAny ().invoke (new GetTopicServiceName (topic .getName ()));
404
400
405
401
// create a subscriber group so that published messages are not lost before the subscriber subscribes
@@ -433,13 +429,11 @@ public void shouldRecoverWaitingSubscriberAfterCleanStorageRestart() throws Exce
433
429
// shutdown the storage members
434
430
restartCluster ();
435
431
436
- // we should eventually have three cluster members
437
- Eventually .assertDeferred (() -> cluster .getMemberSet ().size (), is (3 ));
438
- Logger .info (">>>> Restarted storage." );
439
-
440
- IsServiceRunning isRunning = new IsServiceRunning (sServiceName );
432
+ IsServiceRunning isRunning = new IsServiceRunning (sServiceName );
433
+ IsCoherenceRunning isCohRunning = new IsCoherenceRunning ();
441
434
for (CoherenceClusterMember m : s_storageCluster )
442
435
{
436
+ Eventually .assertDeferred (() -> m .invoke (isCohRunning ), is (true ));
443
437
Eventually .assertDeferred (() -> m .invoke (isRunning ), is (true ));
444
438
}
445
439
Logger .info (">>>> Restarted service " + sServiceName + " on all members" );
0 commit comments