Skip to content

Commit 7bc30df

Browse files
committed
create subclass Readable to FragmentedSubIO to allow future implementations of Writable and ReadWrite
1 parent 1bc6718 commit 7bc30df

File tree

5 files changed

+129
-73
lines changed

5 files changed

+129
-73
lines changed

README.md

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11

22
# lecousin.net - Java core framework
33

4-
The core library provides:
4+
The core library provides mainly:
55
* A Multi-Threading framework, allowing asynchronous programming
6-
* A new IO (Input/Output) model, flexible, and providing asynchronous operations
6+
* A new IO (Input/Output) model, much more flexible, and supporting asynchronous operations
77

88
It does not have any dependency, however the library [net.lecousin.framework.system](https://github.com/lecousin/java-framework-system "java-framework-system")
99
is recommended for better performances on disk operations (detection of physical drives).
@@ -20,33 +20,47 @@ Branch 0.9: ![build status](https://travis-ci.org/lecousin/java-framework-core.s
2020

2121
## Multi-threading
2222

23-
The multi-threading system of this library is based on _physical_ resources for better performance:
23+
The multi-threading system is based on _physical_ resources for better performance:
2424
* One thread by available processor (CPU)
2525
* One thread by physical drive
2626

2727
Each unit of work is a _Task_, that may succeed with a result, fail with an exception, or be cancelled.
2828
A task must use only one physical resource, so a process implying both CPU work and some operations on a drive
2929
must be split into several tasks.
3030

31+
Because the multi-threading system allocates a single thread by CPU, if long running tasks are running, other
32+
tasks may wait for those tasks to finish if all CPUs are used. While this may be acceptable on a single application
33+
environment, it is recommended to split such long running tasks into smaller tasks.
34+
3135
A task should not, but is allowed to block. In this case the blocked thread is interrupted and a new thread
3236
is automatically launched to process other tasks for the same physical resource. Once the task is unblocked,
33-
the thread is resumed as soon as another thread is available and can be stopped.
34-
35-
By default, the order tasks are executed is based on tasks' priority,
36-
then for the same priority in a first-in-first-out order.
37-
This may be changed by providing a new implementation of TaskPriorityManager.
37+
the thread is resumed as soon as another thread is available and can be stopped. For this, synchronized
38+
sections should be avoided as much as possible (or be very short), instead a _synchronization point_ should
39+
be used.
3840

3941
Different kinds of _synchronization point_ are available in the package net.lecousin.framework.concurrent.synch,
4042
such as JoinPoint, SynchronizationPoint, AsyncWork... They allow to wait for one or more asynchronous operations
4143
to finish (successfully or not), by listening to them.
4244

45+
By default, the order tasks are executed is based on tasks' priority,
46+
then for the same priority in a first-in-first-out order.
47+
This may be changed by providing a new implementation of TaskPriorityManager.
48+
4349
## IO Model
4450

4551
The model provided by Java is very basic and mainly based on streams (reading or writing forward).
4652

47-
Our model add two main additions:
48-
* Flexibility by using interfaces that define the capabilities of an Input/Output implementation such as Readable, Writable, Seekable, Resizable, Buffered...
49-
* Asynchronous operations allowing multi-threading
53+
Our model adds much more flexibility, by using interfaces that define the capabilities of an Input/Output
54+
implementation such as Readable, Writable, Seekable, Resizable, Buffered...
55+
By using those interfaces we can know which operations can be performed on an IO, but allow also a method
56+
to specify what are the minimum expected capabilities.
57+
58+
For example a method that needs an IO on which it can write data, it can seek (move forward and backward),
59+
and it can resize the IO can be defined as follow:
60+
61+
public <T extends IO.Writable.Seekable & IO.Resizable> myMethod(T io) { ... }
62+
63+
In addition, the model add asynchronous operations (non-blocking).
5064

5165
## Startup
5266

@@ -60,13 +74,19 @@ as multiple applications may share the same environment, including the same mult
6074

6175
## Logging
6276

63-
A logging system is also provided, in a similar way as other logging frameworks.
64-
TODO
77+
A logging system is also provided, in a similar way as other logging frameworks (using loggers and appenders).
6578

66-
## Memory management
79+
The reason to provide again another logging system is to have a logging system capable to use our
80+
multi-threading system and asynchronous IO operations.
81+
82+
Each time something is logged, this is done by using asynchronous operations and tasks such as the code
83+
logging information is not blocked to avoid reducing performance because of logging.
6784

68-
TODO
85+
## Memory management
6986

70-
## Locale
87+
It often happens that data is kept in memory to improve performance, typically a cache. Such implementations
88+
can declare themselves to the MemoryManager. The MemoryManager is monitoring memory usage, so when available
89+
memory becomes low, it will ask the implementations to free some memory.
7190

72-
TODO
91+
In addition, when an application is idle (doing almost nothing) since several minutes, the MemoryManager may
92+
decide to ask to free some memory to reduce the memory footprint of the application when it is idle.

net.lecousin.core/pom.xml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,9 @@
135135
<goals>
136136
<goal>jar</goal>
137137
</goals>
138+
<configuration>
139+
<additionalparam>-Xdoclint:none</additionalparam>
140+
</configuration>
138141
</execution>
139142
</executions>
140143
</plugin>

net.lecousin.core/src/main/java/net/lecousin/framework/io/FragmentedSubIO.java

Lines changed: 87 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@
1515

1616
/**
1717
* A fragmented sub-IO allows to specify a list of fragments inside a seekable IO, and does like those fragments are a contiguous IO.
18-
* TODO split it with Readable, Writable and ReadWrite
1918
* TODO improve perf by storing the fragment containing the current position ?
2019
*/
21-
public class FragmentedSubIO extends IO.AbstractIO implements IO.Readable.Seekable, IO.KnownSize {
20+
public abstract class FragmentedSubIO extends IO.AbstractIO implements IO.KnownSize, IO.Seekable {
2221

2322
/** Constructor. */
24-
public FragmentedSubIO(IO.Readable.Seekable io, List<RangeLong> fragments, boolean closeParentIOOnClose, String description) {
23+
public FragmentedSubIO(IO.Seekable io, List<RangeLong> fragments, boolean closeParentIOOnClose, String description) {
2524
this.io = io;
2625
this.fragments = fragments;
2726
this.closeParentIOOnClose = closeParentIOOnClose;
@@ -30,22 +29,85 @@ public FragmentedSubIO(IO.Readable.Seekable io, List<RangeLong> fragments, boole
3029
for (RangeLong r : fragments) size += r.max - r.min + 1;
3130
}
3231

33-
private IO.Readable.Seekable io;
34-
private List<RangeLong> fragments;
35-
private long pos = 0;
36-
private long size;
37-
private boolean closeParentIOOnClose;
38-
private String description;
32+
protected IO.Seekable io;
33+
protected List<RangeLong> fragments;
34+
protected long pos = 0;
35+
protected long size;
36+
protected boolean closeParentIOOnClose;
37+
protected String description;
3938

4039
@Override
4140
protected ISynchronizationPoint<IOException> closeIO() {
4241
if (closeParentIOOnClose) return io.closeAsync();
4342
return new SynchronizationPoint<>(true);
4443
}
45-
46-
@Override
47-
public ISynchronizationPoint<IOException> canStartReading() {
48-
return io.canStartReading();
44+
45+
/** Readable fragmented IO. */
46+
public static class Readable extends FragmentedSubIO implements IO.Readable.Seekable {
47+
48+
/** Constructor. */
49+
public Readable(IO.Readable.Seekable io, List<RangeLong> fragments, boolean closeParentIOOnClose, String description) {
50+
super(io, fragments, closeParentIOOnClose, description);
51+
}
52+
53+
@Override
54+
public ISynchronizationPoint<IOException> canStartReading() {
55+
return ((IO.Readable.Seekable)io).canStartReading();
56+
}
57+
58+
@Override
59+
public AsyncWork<Integer,IOException> readAsync(ByteBuffer buffer, RunnableWithParameter<Pair<Integer,IOException>> ondone) {
60+
return super.readAsync(pos, buffer, ondone);
61+
}
62+
63+
@Override
64+
public AsyncWork<Integer, IOException> readAsync(
65+
long pos, ByteBuffer buffer, RunnableWithParameter<Pair<Integer, IOException>> ondone
66+
) {
67+
return super.readAsync(pos, buffer, ondone);
68+
}
69+
70+
@Override
71+
public int readSync(ByteBuffer buffer) throws IOException {
72+
return super.readSync(pos, buffer);
73+
}
74+
75+
@Override
76+
public int readSync(long pos, ByteBuffer buffer) throws IOException {
77+
return super.readSync(pos, buffer);
78+
}
79+
80+
@Override
81+
public int readFullySync(long pos, ByteBuffer buffer) throws IOException {
82+
return super.readFullySync(pos, buffer);
83+
}
84+
85+
@Override
86+
public int readFullySync(ByteBuffer buffer) throws IOException {
87+
return super.readFullySync(pos, buffer);
88+
}
89+
90+
@Override
91+
public AsyncWork<Integer,IOException> readFullyAsync(ByteBuffer buffer, RunnableWithParameter<Pair<Integer,IOException>> ondone) {
92+
return readFullyAsync(pos, buffer, ondone);
93+
}
94+
95+
@Override
96+
public AsyncWork<Integer,IOException> readFullyAsync(
97+
long pos, ByteBuffer buffer, RunnableWithParameter<Pair<Integer,IOException>> ondone
98+
) {
99+
return IOUtil.readFullyAsynch(this, pos, buffer, ondone);
100+
}
101+
102+
@Override
103+
public long skipSync(long n) {
104+
return super.skipSync(n);
105+
}
106+
107+
@Override
108+
public AsyncWork<Long, IOException> skipAsync(long n, RunnableWithParameter<Pair<Long, IOException>> ondone) {
109+
return super.skipAsync(n, ondone);
110+
}
49111
}
50112

51113
@Override
@@ -90,13 +152,7 @@ public AsyncWork<Long, IOException> getSizeAsync() {
90152
return sp;
91153
}
92154

93-
@Override
94-
public AsyncWork<Integer,IOException> readAsync(ByteBuffer buffer, RunnableWithParameter<Pair<Integer,IOException>> ondone) {
95-
return readAsync(pos, buffer, ondone);
96-
}
97-
98-
@Override
99-
public AsyncWork<Integer,IOException> readAsync(long pos, ByteBuffer buffer, RunnableWithParameter<Pair<Integer,IOException>> ondone) {
155+
protected AsyncWork<Integer,IOException> readAsync(long pos, ByteBuffer buffer, RunnableWithParameter<Pair<Integer,IOException>> ondone) {
100156
Iterator<RangeLong> it = fragments.iterator();
101157
long p = 0;
102158
while (it.hasNext()) {
@@ -111,7 +167,8 @@ public AsyncWork<Integer,IOException> readAsync(long pos, ByteBuffer buffer, Run
111167
if (start + len > s) {
112168
int prevLimit = buffer.limit();
113169
buffer.limit((int)(prevLimit - ((start + len) - s)));
114-
return io.readAsync(r.min + start, buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
170+
return ((IO.Readable.Seekable)io).readAsync(r.min + start, buffer,
171+
new RunnableWithParameter<Pair<Integer,IOException>>() {
115172
@Override
116173
public void run(Pair<Integer, IOException> param) {
117174
buffer.limit(prevLimit);
@@ -121,7 +178,7 @@ public void run(Pair<Integer, IOException> param) {
121178
}
122179
});
123180
}
124-
return io.readAsync(r.min + start, buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
181+
return ((IO.Readable.Seekable)io).readAsync(r.min + start, buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
125182
@Override
126183
public void run(Pair<Integer, IOException> param) {
127184
if (param.getValue1() != null)
@@ -136,13 +193,7 @@ public void run(Pair<Integer, IOException> param) {
136193
return sp;
137194
}
138195

139-
@Override
140-
public int readSync(ByteBuffer buffer) throws IOException {
141-
return readSync(pos, buffer);
142-
}
143-
144-
@Override
145-
public int readSync(long pos, ByteBuffer buffer) throws IOException {
196+
protected int readSync(long pos, ByteBuffer buffer) throws IOException {
146197
Iterator<RangeLong> it = fragments.iterator();
147198
long p = 0;
148199
while (it.hasNext()) {
@@ -157,24 +208,18 @@ public int readSync(long pos, ByteBuffer buffer) throws IOException {
157208
if (start + len > s) {
158209
int prevLimit = buffer.limit();
159210
buffer.limit((int)(prevLimit - ((start + len) - s)));
160-
len = io.readSync(r.min + start, buffer);
211+
len = ((IO.Readable.Seekable)io).readSync(r.min + start, buffer);
161212
buffer.limit(prevLimit);
162213
} else {
163-
len = io.readSync(r.min + start, buffer);
214+
len = ((IO.Readable.Seekable)io).readSync(r.min + start, buffer);
164215
}
165216
this.pos = pos + len;
166217
return len;
167218
}
168219
return 0;
169220
}
170221

171-
@Override
172-
public int readFullySync(ByteBuffer buffer) throws IOException {
173-
return readFullySync(pos, buffer);
174-
}
175-
176-
@Override
177-
public int readFullySync(long pos, ByteBuffer buffer) throws IOException {
222+
protected int readFullySync(long pos, ByteBuffer buffer) throws IOException {
178223
Iterator<RangeLong> it = fragments.iterator();
179224
long p = 0;
180225
int done = 0;
@@ -190,10 +235,10 @@ public int readFullySync(long pos, ByteBuffer buffer) throws IOException {
190235
if (start + len > s) {
191236
int prevLimit = buffer.limit();
192237
buffer.limit((int)(prevLimit - ((start + len) - s)));
193-
len = io.readFullySync(r.min + start, buffer);
238+
len = ((IO.Readable.Seekable)io).readFullySync(r.min + start, buffer);
194239
buffer.limit(prevLimit);
195240
} else {
196-
len = io.readFullySync(r.min + start, buffer);
241+
len = ((IO.Readable.Seekable)io).readFullySync(r.min + start, buffer);
197242
}
198243
this.pos = pos + len;
199244
done += len;
@@ -205,16 +250,6 @@ public int readFullySync(long pos, ByteBuffer buffer) throws IOException {
205250
return done;
206251
}
207252

208-
@Override
209-
public AsyncWork<Integer,IOException> readFullyAsync(ByteBuffer buffer, RunnableWithParameter<Pair<Integer,IOException>> ondone) {
210-
return readFullyAsync(pos, buffer, ondone);
211-
}
212-
213-
@Override
214-
public AsyncWork<Integer,IOException> readFullyAsync(long pos, ByteBuffer buffer, RunnableWithParameter<Pair<Integer,IOException>> ondone) {
215-
return IOUtil.readFullyAsynch(this, pos, buffer, ondone);
216-
}
217-
218253
@Override
219254
public long seekSync(SeekType type, long move) {
220255
switch (type) {
@@ -234,8 +269,7 @@ public long seekSync(SeekType type, long move) {
234269
return pos;
235270
}
236271

237-
@Override
238-
public long skipSync(long n) {
272+
protected long skipSync(long n) {
239273
long size = getSizeSync();
240274
// skip checkstyle: VariableDeclarationUsageDistance
241275
long prevPos = pos;
@@ -254,8 +288,7 @@ public AsyncWork<Long,IOException> seekAsync(SeekType type, long move, RunnableW
254288
return sp;
255289
}
256290

257-
@Override
258-
public AsyncWork<Long, IOException> skipAsync(long n, RunnableWithParameter<Pair<Long, IOException>> ondone) {
291+
protected AsyncWork<Long, IOException> skipAsync(long n, RunnableWithParameter<Pair<Long, IOException>> ondone) {
259292
AsyncWork<Long,IOException> sp = new AsyncWork<>();
260293
long skipped = skipSync(n);
261294
if (ondone != null) ondone.run(new Pair<>(Long.valueOf(skipped), null));

net.lecousin.core/src/test/java/net/lecousin/framework/core/tests/io/impl/TestFragmentedSubIOReadable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public TestFragmentedSubIOReadable(FragmentedFile f) {
3232

3333
@Override
3434
protected IO.Readable createReadableFromFile(FileIO.ReadOnly file, long fileSize) {
35-
return new FragmentedSubIO(file, f.fragments, true, "fragmented IO");
35+
return new FragmentedSubIO.Readable(file, f.fragments, true, "fragmented IO");
3636
}
3737

3838
}

net.lecousin.core/src/test/java/net/lecousin/framework/core/tests/io/impl/TestFragmentedSubIOReadableSeekable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public TestFragmentedSubIOReadableSeekable(FragmentedFile f) {
3131

3232
@Override
3333
protected IO.Readable.Seekable createReadableSeekableFromFile(ReadOnly file, long fileSize) throws Exception {
34-
return new FragmentedSubIO(file, f.fragments, true, "fragmented IO");
34+
return new FragmentedSubIO.Readable(file, f.fragments, true, "fragmented IO");
3535
}
3636

3737
}

0 commit comments

Comments
 (0)