Skip to content

Commit

Permalink
Merge pull request #1 from soundvibe/v0.0.2
Browse files Browse the repository at this point in the history
Simplified management of mmaped buffers.
  • Loading branch information
soundvibe authored Aug 27, 2020
2 parents b220b6e + 3d3ac99 commit 88d87b7
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 226 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ Lasher is available on Maven Central, hence just add the following dependency:
<dependency>
<groupId>net.soundvibe</groupId>
<artifactId>lasher</artifactId>
<version>0.0.1</version>
<version>0.0.2</version>
</dependency>
```

Scala SBT
```scala
libraryDependencies += "net.soundvibe" % "lasher" % "0.0.1"
libraryDependencies += "net.soundvibe" % "lasher" % "0.0.2"
```

Contributions
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>net.soundvibe</groupId>
<artifactId>lasher</artifactId>
<version>0.0.1</version>
<version>0.0.2</version>
<packaging>jar</packaging>
<name>lasher</name>
<description>Embeddable persistent key-value store</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,6 @@ protected void rehash() {
int stripeToRehash;
while (true) {
stripeToRehash = rehashIndex.getAndIncrement();
if (stripeToRehash == 0) {
index.doubleGrow();
}
//If it's in the valid table range, we conceptually acquired a valid ticket
if (stripeToRehash < STRIPES) break;
//Otherwise we're in the middle of a reset - spin until it has completed.
Expand All @@ -135,6 +132,9 @@ protected void rehash() {
}
//We now have a valid ticket - we rehash all the indexes in the given stripe
synchronized (locks[stripeToRehash]) {
if (stripeToRehash == 0) {
index.doubleGrow();
}
for (long idx = stripeToRehash; idx < tableLength; idx += STRIPES) {
rehashIdx(idx);
}
Expand Down Expand Up @@ -219,7 +219,6 @@ public void close() {
writeHeader();
index.close();
data.close();
System.gc();
}

public void delete() {
Expand Down
58 changes: 6 additions & 52 deletions src/main/java/net/soundvibe/lasher/mmap/MemoryMapped.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@

public abstract class MemoryMapped implements Closeable {

private static final int CHUNK_SIZE = 256 * 1024 * 1024; //256 MB
private static final int CHUNK_SIZE = 128 * 1024 * 1024; //128 MB
private final FileType fileType;
protected MappedByteBuffer[] buffers;
protected long size;
private final Path baseDir;
private final long defaultLength;

private static final Class<?> UNSAFE_CLASS = resolveUnsafeClass();
private static final Unsafe UNSAFE = resolveUnsafe();
Expand All @@ -29,10 +28,10 @@ protected MemoryMapped(final Path baseDir, FileType fileType, long defaultLength
Objects.requireNonNull(baseDir, "baseDir is null");
this.fileType = fileType;
this.baseDir = baseDir;
this.defaultLength = roundTo4096(defaultLength);
final long length = Math.max(CHUNK_SIZE, roundTo4096(defaultLength));

var fileStats = readFileStats(baseDir, fileType, this.defaultLength);
this.size = Math.max(this.defaultLength, fileStats.totalSize);
var fileStats = readFileStats(baseDir, fileType, length);
this.size = Math.max(length, fileStats.totalSize);
this.buffers = fileStats.buffers;
if (fileStats.buffers.length == 0) {
mapAndResize(this.size);
Expand Down Expand Up @@ -69,9 +68,7 @@ private FileStats readFileStats(final Path baseDir, FileType fileType, long defa
var totalBuffers = new MappedByteBuffer[totalBuffersSize];
int index = 0;
for (long i = 0; i < fileSize; i+=CHUNK_SIZE) {
var remaining = fileSize - i;
var bufferSize = remaining < CHUNK_SIZE ? remaining : CHUNK_SIZE;
totalBuffers[index] = fc.map(FileChannel.MapMode.READ_WRITE, i, bufferSize);
totalBuffers[index] = fc.map(FileChannel.MapMode.READ_WRITE, i, CHUNK_SIZE);
totalBuffers[index].order(BYTE_ORDER);
index++;
}
Expand Down Expand Up @@ -167,10 +164,6 @@ private static long roundTo4096(long i) {
private void mapAndResize(long newSize) {
var bufferIndex = findBufferIndex(newSize);
expandBuffers(bufferIndex, newSize);
resizeOlderBuffersIfNeeded(bufferIndex);

long sizeToSet = bufferIndex == 0 ? newSize : resolveSize(newSize, bufferIndex);
buffers[bufferIndex] = mapBuffer(bufferIndex, (int)sizeToSet, newSize);
}

private MappedByteBuffer mapBuffer(int bufferIndex, int bufferSize, long fileSize) {
Expand Down Expand Up @@ -201,45 +194,6 @@ private long resolveBufferPos(int bufferIndex) {
return result;
}

private void resizeOlderBuffersIfNeeded(int bufferIndex) {
if (bufferIndex > 0) {
for (int i = 0; i < bufferIndex; i++) {
if (buffers[i] == null || buffers[i].capacity() < CHUNK_SIZE) {
remapTo(i, CHUNK_SIZE);
}
}
}
}

private void remapTo(int bufferIndex, long newSize) {
unmap(buffers[bufferIndex]);
try {
try (var f = new RandomAccessFile(baseDir.resolve(fileType.filename).toFile(), "rw");
var fc = f.getChannel()) {
var pos = resolveBufferPos(bufferIndex);
buffers[bufferIndex] = fc.map(FileChannel.MapMode.READ_WRITE, pos, newSize);
buffers[bufferIndex].order(BYTE_ORDER);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private long resolveSize(long size, int bufferIndex) {
long newSize = size;
for (int i = 0; i < bufferIndex - 1; i++) {
if (buffers[i] != null) {
newSize -= buffers[i].capacity();
}
}

if (newSize < defaultLength) {
return defaultLength;
}

return Math.min(CHUNK_SIZE, newSize);
}

protected int convertPos(long absolutePos, int bufferIndex) {
long startPos = (long) CHUNK_SIZE * (long) bufferIndex;
int bufferPos = (int) (absolutePos - startPos);
Expand All @@ -265,7 +219,7 @@ protected void expandBuffers(int newPartition, long newSize) {
if (newPartition + 1 > buffers.length) {
int oldLength = buffers.length;
buffers = Arrays.copyOf(buffers, newPartition + 1);
for (int i = oldLength; i < buffers.length - 1; i++) {
for (int i = oldLength; i < buffers.length; i++) {
unmap(buffers[i]);
buffers[i] = mapBuffer(i, CHUNK_SIZE, newSize);
}
Expand Down
140 changes: 0 additions & 140 deletions src/main/java/net/soundvibe/lasher/util/Murmur3.java

This file was deleted.

31 changes: 4 additions & 27 deletions src/test/java/net/soundvibe/lasher/map/core/LasherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,34 +64,11 @@ void should_do_basic_operations(@TempDir Path tmpPath) {

@Test
void should_rehash(@TempDir Path tmpPath) {
long fileSize = (long) Math.pow(2, 10L);
long lastIdx = 0L;
try (var sut = new Lasher(tmpPath, fileSize, fileSize)) {
assertEquals(0L, sut.rehashIndex.get());

for (long i = 0; i < 800; i++) {
lastIdx = i;
sut.put(BytesSupport.longToBytes(i), BytesSupport.longToBytes(i + 1));
}

assertNotEquals(0L, sut.rehashIndex.get());

for (long i = 0; i < 800; i++) {
assertArrayEquals(BytesSupport.longToBytes(i + 1),sut.get(BytesSupport.longToBytes(i)));
}
} catch (Exception e) {
System.out.println("Last index: " + lastIdx);
throw e;
}
}

@Test
void should_rehash_overlapping(@TempDir Path tmpPath) {
long fileSize = (long) Math.pow(2, 8L);
try (var sut = new Lasher(tmpPath, fileSize, fileSize)) {
assertEquals(0L, sut.rehashIndex.get());
long count = 100_000;
var bytes = new byte[19011];
long count = 15_000_000;
var bytes = new byte[32];
Arrays.fill(bytes, (byte)1);

for (long i = 0; i < count; i++) {
Expand All @@ -110,8 +87,8 @@ void should_rehash_overlapping(@TempDir Path tmpPath) {
@Test
void should_read_from_store(@TempDir Path tmpPath) {
long fileSize = (long) Math.pow(2, 8L);
long count = 100_000;
var bytes = new byte[19011];
long count = 15_000_000;
var bytes = new byte[24];
Arrays.fill(bytes, (byte)1);
try (var sut = new Lasher(tmpPath, fileSize, fileSize)) {
assertEquals(0L, sut.rehashIndex.get());
Expand Down

0 comments on commit 88d87b7

Please sign in to comment.