Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
hmottestad committed Feb 12, 2025
1 parent 1f61f15 commit f8e6813
Show file tree
Hide file tree
Showing 3 changed files with 336 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/**
Expand Down Expand Up @@ -112,30 +110,14 @@ public T get() {
}
}

private final ArrayBlockingQueue<QueueObject<T>>[] queue = new ArrayBlockingQueue[] {
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024),
new ArrayBlockingQueue<>(16 * 1024), new ArrayBlockingQueue<>(16 * 1024), };
private final ArrayBlockingQueue<QueueObject<T>> queue = new ArrayBlockingQueue<>(16);

private T next;
private boolean end;
private PipedIteratorException exception;

private Thread thread;

AtomicInteger indexHasNext = new AtomicInteger(0);

volatile ArrayBlockingQueue<QueueObject<T>> focusQueue;

@Override
public boolean hasNext() {
if (end) {
Expand All @@ -147,13 +129,7 @@ public boolean hasNext() {

QueueObject<T> obj;
try {
obj = useFocusQueue();

if (obj == null) {

obj = useThreadBasedQueue();
}

obj = queue.take();
} catch (InterruptedException e) {
throw new PipedIteratorException("Can't read pipe", e);
}
Expand All @@ -169,51 +145,6 @@ public boolean hasNext() {
return true;
}

private QueueObject<T> useThreadBasedQueue() throws InterruptedException {
QueueObject<T> obj;
int i = Thread.currentThread().hashCode();
obj = queue[i % queue.length].poll();
if (obj == null) {
obj = iterateThroughAllQueues(obj);
} else if (focusQueue == null) {
focusQueue = queue[i % queue.length];
}
return obj;
}

private QueueObject<T> iterateThroughAllQueues(QueueObject<T> obj) throws InterruptedException {
while (obj == null) {
for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
obj = queueObjects.poll();
if (obj != null) {
if (focusQueue == null) {
focusQueue = queueObjects;
}
return obj;
}
}
Thread.sleep(10);
}
return obj;
}

private QueueObject<T> useFocusQueue() throws InterruptedException {
QueueObject<T> obj;
var focusQueue = this.focusQueue;
if (focusQueue != null) {
QueueObject<T> poll = focusQueue.poll();
if (poll != null) {
obj = poll;
} else {
obj = null;
this.focusQueue = null;
}
} else {
obj = null;
}
return obj;
}

@Override
public T next() {
if (!hasNext()) {
Expand All @@ -231,19 +162,15 @@ public void closePipe() {
public void closePipe(Throwable e) {
if (e != null) {
// clear the queue to force the exception
for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
queueObjects.clear();
}
queue.clear();
if (e instanceof PipedIteratorException) {
this.exception = (PipedIteratorException) e;
} else {
this.exception = new PipedIteratorException("closing exception", e);
}
}
try {
for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
queueObjects.put(new EndQueueObject());
}
queue.put(new EndQueueObject());
} catch (InterruptedException ee) {
throw new PipedIteratorException("Can't close pipe", ee);
}
Expand Down Expand Up @@ -271,25 +198,9 @@ public <E> Iterator<E> mapWithId(MapIterator.MapWithIdFunction<T, E> mappingFunc
return new MapIterator<>(this, mappingFunction);
}

AtomicInteger index = new AtomicInteger(0);

public void addElement(T node) {
int i = Thread.currentThread().hashCode();
int l = i % queue.length;
try {
boolean success = queue[l].offer(new ElementQueueObject(node));
if (!success) {
focusQueue = queue[l];
while (!success) {
for (ArrayBlockingQueue<QueueObject<T>> queueObjects : queue) {
success = queueObjects.offer(new ElementQueueObject(node), 1, TimeUnit.MILLISECONDS);
if (success) {
break;
}
}
}
}

queue.put(new ElementQueueObject(node));
} catch (InterruptedException ee) {
throw new PipedIteratorException("Can't add element to pipe", ee);
}
Expand Down
Loading

0 comments on commit f8e6813

Please sign in to comment.