Skip to content

Commit

Permalink
[SpecsUtils] commiting MultiConsumerTest class
Browse files Browse the repository at this point in the history
(this file was uncommited on my machine for a long time; i think it was
just a test for something related to the BTF)
  • Loading branch information
nmcp88 committed Sep 30, 2024
1 parent e4b830a commit c81222f
Showing 1 changed file with 102 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package pt.up.fe.specs.util.threadstream;

import java.util.ArrayList;
import java.util.Random;

import org.junit.Test;

import pt.up.fe.specs.util.collections.concurrentchannel.ChannelConsumer;

public class MultiConsumerTest {

/*
* Test producer
*/
private class StringProducer implements ObjectProducer<String> {

private int producecount;

public StringProducer(int producecount) {
this.producecount = producecount;
}

@Override
public String getPoison() {
return "kill";
}

public String getString() {

if (this.producecount-- == 0)
return null;

int leftLimit = 97; // letter 'a'
int rightLimit = 122; // letter 'z'
int targetStringLength = 10;
Random random = new Random();

return random.ints(leftLimit, rightLimit + 1)
.limit(targetStringLength)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}

@Override
public void close() throws Exception {
// TODO Auto-generated method stub
}
}

/*
* Test custom stream name
*/
private class StringStream extends GenericObjectStream<String> {

public StringStream(StringProducer producer, ChannelConsumer<String> consumer) {
super(consumer, producer.getPoison());
}
}

/*
* Test consumer
*/
private class StringConsumer {

public StringConsumer() {
// TODO Auto-generated constructor stub
}

public Integer consumeString(StringStream istream) {

Integer hashcode = 0;
String str = null;
while ((str = istream.next()) != null) {
hashcode += str.hashCode();
}
return hashcode;
}

}

@Test
public void test() {

// host for threads
var sb = new StringProducer(100000);
var streamengine = new ProducerEngine<String, StringProducer>(sb, op -> op.getString(),
cc -> new StringStream(sb, cc));

// new consumer thread via lambda
var threadlist = new ArrayList<ConsumerThread<String, ?>>();
for (int i = 0; i < 20; i++)
threadlist.add(streamengine.subscribe(os -> (new StringConsumer()).consumeString((StringStream) os)));

// launch all threads (blocking)
streamengine.launch();

// consume
for (int i = 0; i < 20; i++) {
System.out.println(threadlist.get(i).getConsumeResult());
}
}
}

0 comments on commit c81222f

Please sign in to comment.