-
Notifications
You must be signed in to change notification settings - Fork 169
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added SeqBuffer<T> class as a replacement for some of Seq.toList() calls #305
Conversation
I added a new implementation of |
That looks very interesting, thank you very much for your suggestion. I'll review this later (hopefully in the next 2 days). We'll definitely need a reusable |
} | ||
|
||
private final Spliterator<T> source; | ||
private final List<T> buffer = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, this buffer is not thread safe. It may well be that one consumer of a buffered result consumes the buffer while another consumer produces new elements at the same time. E.g., this could happen in your splitAt()
implementation suggestion here: b345bbb
v2.map(t -> t.v1) | ||
)); | ||
SeqBuffer<T> buffer = SeqBuffer.of(stream); | ||
return tuple(buffer.seq().limit(position), buffer.seq().skip(position)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Much nicer, indeed. Only caveat: Thread safety in the current SeqBuffer
implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As a side note: do you realize that the current implementation of Seq.splitAt
(as well as Seq.partition
) is not thread safe? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, splitAt()
doesn't look wrong to me, but partition may well be.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was under the impression that Seq
wasn't thread safe to begin with or was it just that it will not be a parallel stream?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's currently no guarantee in the API, as there are too many flaws, still. But in principle, the results from methods like splitAt()
(which produce several Seq) should be thread safe. Or at least, we should have an option for them to be thread safe.
Perhaps this is a concept that should be reviewed more globally.
Note that thread safety and parallelism aren't the same thing in this context. Stream's parallelism allows for parallel processing of operations like map()
or filter()
. By keeping operations independent of one another, parallelism can drastically speed up the processing of a stream. Thread safety in our context just means that consuming two things (e.g. the splitAt()
results) on different threads, we don't want to get wrong results.
But again, perhaps we should implement this more thoroughly, with a specific thread safety flag...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, Seq.splitAt
was directly based on Seq.partition
:) And Seq.partition
definitely isn't thread-safe because both Seq
s write to buffer1
and buffer2
without any synchronization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@billoneil If I understand it right I think @lukaseder distinguishes between two things:
- single
Seq
is sequential so as a consequence it's not thread safe and two threads cannot operate on the sameSeq
- there seems to be no restriction to consuming one
Seq
on one thread and anotherSeq
on the other thread even if they both "derive" from the sameSeq
(hence need for thread safety in such cases)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah I wasn't thinking, makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lukaseder, here's my shot at thread-safety of SeqBuffer. Let me know what you think of it.
if (estimateSize > MAX_ARRAY_SIZE) | ||
throw new IllegalArgumentException("Stream is too long to be buffered: " + estimateSize); | ||
|
||
return new ArrayList<>((int) estimateSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, no I don't agree with this. If we create a large capacitied array list, we're back to the original solution that you wanted to avoid: Up-front large memory consumption for intermediate buffers. Imagine:
Seq.seq(Collections.nCopies(1000000, "value")).splitAt(999999).v1.limit(1).forEach(System.out::println);
The splitAt()
operation would create a large array list only to consume its first element...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ha, you're right! I'll revert it.
PS. We wouldn't be entirely back to square one because I mainly wanted to avoid up-front processor consumption (i.e. consuming entire Seq
) but you're absolutely right that it's best to also avoid up-front memory consumption.
…stimateSize()" because it introduced unnecessary up-front memory consumption Reverted from commit f54482e
Thanks for the fix. Now, I wonder again if it would be possible to squash all commits into one for this PR (GitHub has such a feature). That would make the final review much simpler for me. |
Done. PS. I couldn't find such feature (available to the PR author) anywhere on GitHub so I squashed them manually. |
Oh, interesting. That was a wrong assumption then, sorry. I can indeed squash and merge, but not squash, review again, then merge... Will send a feature request to GitHub. Thanks very much! |
|
||
return tuple(seq(new Duplicate()), seq(new Duplicate())); | ||
SeqBuffer<T> buffer = SeqBuffer.of(stream); | ||
return tuple(buffer.seq(), buffer.seq()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, I see. Much simpler, sure, but the flip side of this implementation is that if both duplicates are consumed at the same speed, we're wasting a lot of memory for a buffer that might no longer be needed. What are your thoughts on this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're absolutely right. On the other hand, current implementation is thread safe thanks to SeqBuffer
, and the previous one wasn't.
All in all, I'd be more inclined to this simple implementation but I guess I'd leave an appropriate comment in this method in case someone reports any memory-related issues in the future.
v2.map(t -> t.v1) | ||
)); | ||
SeqBuffer<T> buffer = SeqBuffer.of(stream); | ||
return tuple(buffer.seq().limit(position), buffer.seq().skip(position)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, splitAt()
doesn't look wrong to me, but partition may well be.
Thanks again for this change. I'll review this later in detail, and will let you know. |
I propose a solution to the problems described in #195 for the following methods:
crossSelfJoin()
(crossJoin()
needs to be adapted by @lukaseder injOOQ-tools
)inner(Self)Join()
leftOuter(Self)Join()
The solution for nearly all the remaining methods will consist in applying
Seq.lazy()
to them for which I'll create a separate PR in order to demonstrate its usefulness, as discussed in #302.PS.
SeqBuffer
could be used to create a simpler implementation ofSeq.duplicate
.PPS.
SeqBuffer
could also be used to implement methodSeq.duplicate(int count)
. Such method would return aList<Seq<T>>
containingcount
Seq
s. However, the only application for such method that I can think of is implementing #49 which (to be honest) I don't find very useful either.