From b37ad8931f8c785151794081a50851c27a807048 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Tue, 25 Aug 2020 16:32:49 +0300 Subject: [PATCH] Initial commit --- .gitignore | 19 + .travis.yml | 11 + LICENSE | 202 +++++++++ README.md | 66 +++ pom.xml | 202 +++++++++ .../net/soundvibe/lasher/map/LasherMap.java | 159 +++++++ .../net/soundvibe/lasher/map/TimeLash.java | 158 +++++++ .../lasher/map/core/BaseLinearHashMap.java | 246 +++++++++++ .../net/soundvibe/lasher/map/core/Lasher.java | 416 ++++++++++++++++++ .../soundvibe/lasher/map/model/FileType.java | 12 + .../lasher/map/model/RecordNode.java | 69 +++ .../lasher/map/model/UnsafeAccess.java | 8 + .../net/soundvibe/lasher/mmap/DataNode.java | 197 +++++++++ .../net/soundvibe/lasher/mmap/IndexNode.java | 50 +++ .../soundvibe/lasher/mmap/MemoryMapped.java | 274 ++++++++++++ .../soundvibe/lasher/serde/BytesSerde.java | 13 + .../soundvibe/lasher/serde/IntegerSerde.java | 17 + .../net/soundvibe/lasher/serde/LongSerde.java | 18 + .../net/soundvibe/lasher/serde/Serde.java | 9 + .../net/soundvibe/lasher/serde/Serdes.java | 11 + .../soundvibe/lasher/serde/StringSerde.java | 17 + .../net/soundvibe/lasher/serde/UUIDSerde.java | 24 + .../soundvibe/lasher/util/BytesSupport.java | 64 +++ .../soundvibe/lasher/util/FileSupport.java | 33 ++ .../java/net/soundvibe/lasher/util/Hash.java | 124 ++++++ .../net/soundvibe/lasher/util/Murmur3.java | 140 ++++++ .../soundvibe/lasher/map/LasherMapTest.java | 94 ++++ .../soundvibe/lasher/map/TimeLashTest.java | 83 ++++ .../soundvibe/lasher/map/core/LasherTest.java | 245 +++++++++++ .../performance/LasherPerformanceTest.java | 304 +++++++++++++ .../lasher/serde/BytesSerdeTest.java | 32 ++ .../lasher/serde/IntegerSerdeTest.java | 31 ++ .../soundvibe/lasher/serde/LongSerdeTest.java | 31 ++ .../lasher/serde/StringSerdeTest.java | 35 ++ .../soundvibe/lasher/serde/UUIDSerdeTest.java | 30 ++ 35 files changed, 3444 insertions(+) create mode 100644 .gitignore create mode 100644 .travis.yml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 pom.xml create mode 100644 src/main/java/net/soundvibe/lasher/map/LasherMap.java create mode 100644 src/main/java/net/soundvibe/lasher/map/TimeLash.java create mode 100644 src/main/java/net/soundvibe/lasher/map/core/BaseLinearHashMap.java create mode 100644 src/main/java/net/soundvibe/lasher/map/core/Lasher.java create mode 100644 src/main/java/net/soundvibe/lasher/map/model/FileType.java create mode 100644 src/main/java/net/soundvibe/lasher/map/model/RecordNode.java create mode 100644 src/main/java/net/soundvibe/lasher/map/model/UnsafeAccess.java create mode 100644 src/main/java/net/soundvibe/lasher/mmap/DataNode.java create mode 100644 src/main/java/net/soundvibe/lasher/mmap/IndexNode.java create mode 100644 src/main/java/net/soundvibe/lasher/mmap/MemoryMapped.java create mode 100644 src/main/java/net/soundvibe/lasher/serde/BytesSerde.java create mode 100644 src/main/java/net/soundvibe/lasher/serde/IntegerSerde.java create mode 100644 src/main/java/net/soundvibe/lasher/serde/LongSerde.java create mode 100644 src/main/java/net/soundvibe/lasher/serde/Serde.java create mode 100644 src/main/java/net/soundvibe/lasher/serde/Serdes.java create mode 100644 src/main/java/net/soundvibe/lasher/serde/StringSerde.java create mode 100644 src/main/java/net/soundvibe/lasher/serde/UUIDSerde.java create mode 100644 src/main/java/net/soundvibe/lasher/util/BytesSupport.java create mode 100644 src/main/java/net/soundvibe/lasher/util/FileSupport.java create mode 100644 src/main/java/net/soundvibe/lasher/util/Hash.java create mode 100644 src/main/java/net/soundvibe/lasher/util/Murmur3.java create mode 100644 src/test/java/net/soundvibe/lasher/map/LasherMapTest.java create mode 100644 src/test/java/net/soundvibe/lasher/map/TimeLashTest.java create mode 100644 src/test/java/net/soundvibe/lasher/map/core/LasherTest.java create mode 100644 src/test/java/net/soundvibe/lasher/map/performance/LasherPerformanceTest.java create mode 100644 src/test/java/net/soundvibe/lasher/serde/BytesSerdeTest.java create mode 100644 src/test/java/net/soundvibe/lasher/serde/IntegerSerdeTest.java create mode 100644 src/test/java/net/soundvibe/lasher/serde/LongSerdeTest.java create mode 100644 src/test/java/net/soundvibe/lasher/serde/StringSerdeTest.java create mode 100644 src/test/java/net/soundvibe/lasher/serde/UUIDSerdeTest.java diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9aad68c --- /dev/null +++ b/.gitignore @@ -0,0 +1,19 @@ +.project +.settings +.metadata +.classpath +.gradle +.idea +*.iml +*.ipr +*.iws +/build +*/build +out/ +*/bin/ +*/test-output/ +/ligradle +.DS_Store +/doc +/target +target/ diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..060f073 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,11 @@ +language: java + +sudo: false +dist: trusty + +matrix: + include: + - jdk: openjdk14 + +after_success: + - bash <(curl -s https://codecov.io/bash) diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..cb0004f --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + +Copyright [2020] [Linas Naginionis] + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..79f6512 --- /dev/null +++ b/README.md @@ -0,0 +1,66 @@ +Lasher +========== +[![Maven Central](https://maven-badges.herokuapp.com/maven-central/net.soundvibe/lasher/badge.svg)](https://maven-badges.herokuapp.com/maven-central/net.soundvibe/lasher) +[![Build Status](https://travis-ci.org/soundvibe/lasher.svg)](https://travis-ci.org/soundvibe/lasher) +[![codecov](https://codecov.io/gh/soundvibe/lasher/branch/master/graph/badge.svg)](https://codecov.io/gh/soundvibe/lasher) + +Lasher is an embeddable key-value store written in Java. + +What is Lasher? +------------------- +Lasher is very lightweight (has no dependencies) embeddable persistent key-value store with very fast performance. +Lasher incrementally rehashes stripes when needed, avoiding full table rehash which is typical for standard hash tables. +This helps to sustain high performance even when the number of elements in the table is very large. + +It is possible to store millions of elements in Lasher and use very little memory because all the data is persisted into memory mapped files. + +Lasher could be used instead of any regular in-memory hashmap without any performance decrease. +It is even faster than `ConcurrentHasMap` in our benchmarks and uses much less memory. + +Lasher stores consist of 2 binary files - index and data. + +LasherMap +------------------- +LasherMap implements `ConcurrentMap` for easier interoperability with java maps. + +TimeLash +------------------- +TimeLash is a time-series map backed by Lasher where data is partitioned by time intervals. +It supports very efficient data retention strategies. +To put or get values from the map, timestamp should be additionally provided, e.g.: + +```java +try (var timeLash = new TimeLash<>( + dir, //db directory + Duration.ofHours(6), //data retention duration + Serdes.STRING, //key serde + Serdes.STRING)) // value serde { + timeLash.put("foo", "bar", Instant.now()); + var bar = timeLash.get("foo", Instant.now()); +} +``` + +Artifacts + +Lasher is available on Maven Central, hence just add the following dependency: +``` + + net.soundvibe + lasher + 0.0.1 + +``` +Scala SBT +``` +libraryDependencies += "net.soundvibe" % "lasher" % "0.0.1" +``` + +Contributions +----------- + +Any helpful feedback is more than welcome. This includes feature requests, bug reports, pull requests, constructive feedback, etc. + +Copyright & License +------------------- + +Lasher © 2020 Linas Naginionis. Licensed under the terms of the Apache License, Version 2.0. \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..f22c28a --- /dev/null +++ b/pom.xml @@ -0,0 +1,202 @@ + + + 4.0.0 + + net.soundvibe + lasher + 0.0.1 + jar + lasher + Embeddable persistent key-value store + https://github.com/soundvibe/lasher + + + The Apache Software License, Version 2.0 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + + + + + scm:git:https://github.com/soundvibe/lasher.git + scm:git:https://github.com/soundvibe/lasher.git + https://github.com/soundvibe/lasher.git + HEAD + + + + linasnaginionis + Linas Naginionis + lnaginionis@gmail.com + + + + + 14 + UTF-8 + + + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots + + + ossrh + https://oss.sonatype.org/service/local/staging/deploy/maven2/ + + + + + + + + + org.junit.jupiter + junit-jupiter + 5.6.2 + test + + + + + + + + maven-surefire-plugin + 2.22.2 + + + **/Test*.java + **/*Test.java + **/*Tests.java + **/*TestCase.java + + + **/performance/** + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + ${java.version} + ${java.version} + + + + + maven-source-plugin + 3.2.1 + + + attach-sources + verify + + jar + + + + + + + maven-release-plugin + 2.5.3 + + deploy + false + true + + + + + org.jacoco + jacoco-maven-plugin + 0.8.5 + + + + prepare-agent + + + + report + test + + report + + + + + + + org.sonatype.plugins + nexus-staging-maven-plugin + 1.6.8 + true + + ossrh + https://oss.sonatype.org/ + true + + + + + + + + + release + + + + performRelease + true + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.0.1 + + + attach-javadocs + + jar + + + false + -Xdoclint:none + + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.6 + + + sign-artifacts + verify + + sign + + + + + + + + + + + diff --git a/src/main/java/net/soundvibe/lasher/map/LasherMap.java b/src/main/java/net/soundvibe/lasher/map/LasherMap.java new file mode 100644 index 0000000..0c405b2 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/map/LasherMap.java @@ -0,0 +1,159 @@ +package net.soundvibe.lasher.map; + +import net.soundvibe.lasher.map.core.Lasher; +import net.soundvibe.lasher.serde.Serde; + +import java.util.*; +import java.util.concurrent.ConcurrentMap; + +public class LasherMap extends AbstractMap implements ConcurrentMap, AutoCloseable { + + private final Lasher map; + private final Serde keySerde; + private final Serde valSerde; + + public LasherMap(Lasher map, Serde keySerde, Serde valSerde) { + this.map = map; + this.keySerde = keySerde; + this.valSerde = valSerde; + } + + @Override + public void close() { + map.close(); + } + + public void delete() { + map.delete(); + } + + @Override + public int size() { + return (int) map.size(); + } + + public long sizeLong() { + return map.size(); + } + + @Override + public boolean containsKey(Object key) { + return get(key) != null; + } + + @Override + public V get(Object key) { + var kBytes = keySerde.toBytes( (K)key); + var out = map.get(kBytes); + if (out == null) return null; + return valSerde.fromBytes(out); + } + + @Override + public V put(K key, V value) { + var old = map.put(keySerde.toBytes(key), valSerde.toBytes(value)); + if (old == null) return null; + return valSerde.fromBytes(old); + } + + @Override + public V remove(Object key) { + var kBytes = keySerde.toBytes((K)key); + var old = map.remove(kBytes); + if (old == null) return null; + return valSerde.fromBytes(old); + } + + @Override + public void clear() { + map.clear(); + } + + @Override + public Set> entrySet() { + return new EntrySet(); + } + + @Override + public V putIfAbsent(K key, V value) { + var old = map.putIfAbsent(keySerde.toBytes(key), valSerde.toBytes(value)); + if (old == null) return null; + return valSerde.fromBytes(old); + } + + @Override + public boolean remove(Object key, Object value) { + var kBytes = keySerde.toBytes((K)key); + var vBytes = valSerde.toBytes((V)value); + return map.remove(kBytes, vBytes); + } + + @Override + public boolean replace(K key, V oldValue, V newValue) { + var kBytes = keySerde.toBytes(key); + var oldValBytes = valSerde.toBytes(oldValue); + var newValBytes = valSerde.toBytes(newValue); + return map.replace(kBytes, oldValBytes, newValBytes); + } + + @Override + public V replace(K key, V value) { + var kBytes = keySerde.toBytes(key); + var vBytes = valSerde.toBytes(value); + var out = map.replace(kBytes, vBytes); + if (out == null) return null; + return valSerde.fromBytes(out); + } + + protected class EntrySet extends AbstractSet> { + @Override + public int size() { + return LasherMap.this.size(); + } + + @Override + public Iterator> iterator() { + return new Iterator<>() { + final Iterator> backingIt = map.iterator(); + + @Override + public boolean hasNext() { + return backingIt.hasNext(); + } + + @Override + public java.util.Map.Entry next() { + final Map.Entry e = backingIt.next(); + var k = keySerde.fromBytes(e.getKey()); + var v = valSerde.fromBytes(e.getValue()); + return new AbstractMap.SimpleImmutableEntry<>(k, v); + } + + @Override + public void remove() { + backingIt.remove(); + } + }; + } + + @Override + public boolean contains(Object o) { + if (!(o instanceof Map.Entry)) return false; + var e = (Map.Entry) o; + final V v = get(e.getKey()); + return v == null ? e.getValue() == null : v.equals(e.getValue()); + } + + @Override + public void clear() { + LasherMap.this.clear(); + } + + @Override + public boolean remove(Object o) { + if (!(o instanceof Map.Entry)) return false; + var e = (Map.Entry) o; + return LasherMap.this.remove(e.getKey(), e.getValue()); + } + } +} diff --git a/src/main/java/net/soundvibe/lasher/map/TimeLash.java b/src/main/java/net/soundvibe/lasher/map/TimeLash.java new file mode 100644 index 0000000..47f2d92 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/map/TimeLash.java @@ -0,0 +1,158 @@ +package net.soundvibe.lasher.map; + +import net.soundvibe.lasher.map.core.Lasher; +import net.soundvibe.lasher.serde.Serde; + +import java.nio.file.Path; +import java.time.*; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Stream; + +import static java.util.function.Predicate.not; +import static java.util.stream.Collectors.toList; + +public class TimeLash implements AutoCloseable, Iterable> { + + private final Path baseDir; + private final Serde keySerde; + private final Serde valSerde; + private final long retentionSecs; + private final AtomicLong watermark = new AtomicLong(); + private final Map> buckets; + private final long bucketSizeSeconds; + + private static final Duration DEFAULT_BUCKET_WINDOW = Duration.ofHours(1); + + public TimeLash(Path baseDir, Duration retention, Duration bucketWindow, Serde keySerde, Serde valSerde) { + this.baseDir = baseDir; + this.keySerde = keySerde; + this.valSerde = valSerde; + this.retentionSecs = retention.toSeconds(); + this.buckets = new ConcurrentHashMap<>(Math.max(10, (int)retention.toHours())); + this.bucketSizeSeconds = bucketWindow.toSeconds(); + } + + public TimeLash(Path baseDir, Duration retention, Serde keySerde, Serde valSerde) { + this(baseDir, retention, DEFAULT_BUCKET_WINDOW, keySerde, valSerde); + } + + public V get(K key, Instant timestamp) { + return get(key, timestamp.getEpochSecond()); + } + + public V get(K key, long timestamp) { + long idx = idxFromTimestamp(timestamp); + var map = buckets.get(idx); + if (map != null) { + return map.get(key); + } + return null; + } + + public V put(K key, V value, Instant timestamp) { + return put(key, value, timestamp.getEpochSecond()); + } + + /** + * Puts key value to the map based on given timestamp + * @param key key with which the specified value is to be associated + * @param value value to be associated with the specified key + * @param timestamp Number of seconds from the Java epoch of 1970-01-01T00:00:00Z. + * @return the previous value associated with key, or null if there was no mapping for key. + */ + public V put(K key, V value, long timestamp) { + var maxTime = watermark.updateAndGet(prev -> Math.max(prev, timestamp)); + long idx = idxFromTimestamp(timestamp); + var map = buckets.compute(idx, (k, oldMap) -> { + if (oldMap != null) return oldMap; + if (bucketInRange(idx, maxTime)) { + return createNewMap(idx); + } + return null; + }); + if (map == null) { + return null; + } + + if (map.isEmpty()) { + //check for expired entries + var expiredEntries = buckets.entrySet().stream() + .filter(not(e -> bucketInRange(e.getKey(), maxTime))) + .collect(toList()); + + expiredEntries.forEach(e -> { + deleteExpired(e.getValue()); + buckets.remove(e.getKey()); + }); + } + + return map.put(key, value); + } + + public V remove(K key, Instant timestamp) { + return remove(key, timestamp.getEpochSecond()); + } + + public V remove(K key, long timestamp) { + long idx = idxFromTimestamp(timestamp); + var map = buckets.get(idx); + if (map != null) { + return map.remove(key); + } + return null; + } + + public long size() { + return buckets.values().stream() + .mapToLong(LasherMap::sizeLong) + .sum(); + } + + public boolean containsKey(K key, Instant timestamp) { + return containsKey(key, timestamp.getEpochSecond()); + } + + public boolean containsKey(K key, long timestamp) { + return get(key, timestamp) != null; + } + + public Stream streamKeys() { + return buckets.values().stream() + .flatMap(map -> map.keySet().stream()); + } + + public Stream> stream() { + return buckets.values().stream() + .flatMap(map -> map.entrySet().stream()); + } + + @Override + public void close() { + buckets.forEach((k, m) -> m.close()); + buckets.clear(); + } + + private boolean bucketInRange(long bucket, long maxWatermark) { + return (maxWatermark - bucket) <= retentionSecs; + } + + private LasherMap createNewMap(long bucket) { + var lasher = new Lasher(baseDir.resolve(Long.toString(bucket))); + return new LasherMap<>(lasher, keySerde, valSerde); + } + + private synchronized void deleteExpired(LasherMap map) { + map.delete(); + } + + protected long idxFromTimestamp(long timestamp) { + return timestamp - (timestamp % bucketSizeSeconds); + } + + @Override + public Iterator> iterator() { + return stream().iterator(); + } +} diff --git a/src/main/java/net/soundvibe/lasher/map/core/BaseLinearHashMap.java b/src/main/java/net/soundvibe/lasher/map/core/BaseLinearHashMap.java new file mode 100644 index 0000000..09deb57 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/map/core/BaseLinearHashMap.java @@ -0,0 +1,246 @@ +package net.soundvibe.lasher.map.core; + +import net.soundvibe.lasher.mmap.*; + +import java.nio.file.Path; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static net.soundvibe.lasher.util.FileSupport.deleteDirectory; + +public abstract class BaseLinearHashMap implements AutoCloseable { + + static final int STRIPES = (int) Math.pow(2,8); + static final double LOAD_FACTOR = 0.75; + + private final long defaultFileLength /*1L << 28*/; + + final IndexNode index; + final DataNode data; + final Path baseDir; + + final AtomicLong dataWritePos = new AtomicLong(0L); + + final ReentrantReadWriteLock dataLock = new ReentrantReadWriteLock(); + + final Lock[] locks = new Lock[STRIPES]; + + final AtomicLong size = new AtomicLong(0L); + + long tableLength; + + private static final long HEADER_SIZE = 28L; + + final AtomicInteger rehashIndex = new AtomicInteger(0); + + private static final class Lock {} + + public BaseLinearHashMap(Path baseDir, long indexFileLength, long dataFileLength) { + this.baseDir = baseDir; + this.defaultFileLength = nextPowerOf2(dataFileLength); + for (int i = 0; i < STRIPES; i++) locks[i] = new Lock(); + baseDir.toFile().mkdirs(); + index = new IndexNode(baseDir, nextPowerOf2(indexFileLength)); + data = new DataNode(baseDir, this.defaultFileLength); + readHeader(); + } + + protected abstract void readHeader(); + + public abstract byte[] get(byte[] key); + + protected long getHeaderSize() { + return HEADER_SIZE; + } + + protected void writeHeader() { + dataLock.writeLock().lock(); + try { + data.putLong(0L, size()); + data.putLong(8L, tableLength); + data.putLong(16L, dataWritePos.get()); + data.putInt(24L, rehashIndex.get()); + } finally { + dataLock.writeLock().unlock(); + } + } + + public long nextPowerOf2(long i) { + if (i < defaultFileLength) return (defaultFileLength); + if ((i & (i - 1)) == 0) return i; + return (1 << (64 - (Long.numberOfLeadingZeros(i)))); + } + + /** + * Returns the lock for the stripe for the given hash. Synchronize of this + * object before mutating the map. + */ + protected Object lockForHash(long hash) { + return locks[(int) (hash & (STRIPES - 1L))]; + } + + /** + * Returns the bucket index for the given hash. + * This doesn't lock - because it depends on tableLength, callers should + * establish some lock that precludes a full rehash (read or write lock on + * any of the locks). + */ + protected long idxForHash(long hash) { + return (hash & (STRIPES - 1L)) < rehashIndex.get() + ? hash & (tableLength + tableLength - 1L) + : hash & (tableLength - 1L); + } + + /** + * Recursively locks all stripes, and doubles the size of the primary mapper. + * On Linux your filesystem probably makes this expansion a sparse operation. + */ + protected void completeExpansion(int idx) { + if (idx == STRIPES) { + rehashIndex.set(0); + tableLength *= 2; + } else { + synchronized (locks[idx]) { + completeExpansion(idx + 1); + } + } + } + + /** + * Perform incremental rehashing to keep the load under the threshold. + */ + protected void rehash() { + while (load() > LOAD_FACTOR) { + //If we've completed all rehashing, we need to expand the table & reset + //the counters. + if (rehashIndex.compareAndSet(STRIPES, STRIPES + 1)) { + completeExpansion(0); + return; + } + + //Otherwise, we attempt to grab the next index to process + int stripeToRehash; + while (true) { + stripeToRehash = rehashIndex.getAndIncrement(); + if (stripeToRehash == 0) { + index.doubleGrow(); + } + //If it's in the valid table range, we conceptually acquired a valid ticket + if (stripeToRehash < STRIPES) break; + //Otherwise we're in the middle of a reset - spin until it has completed. + while (rehashIndex.get() >= STRIPES) { + Thread.yield(); + if (load() < LOAD_FACTOR) return; + } + } + //We now have a valid ticket - we rehash all the indexes in the given stripe + synchronized (locks[stripeToRehash]) { + for (long idx = stripeToRehash; idx < tableLength; idx += STRIPES) { + rehashIdx(idx); + } + } + } + } + + /** + * Allocates the given amount of space in secondary storage, and returns a + * pointer to it. Expands secondary storage if necessary. + */ + protected long allocateData(long size) { + dataLock.readLock().lock(); + try { + while (true) { + final long out = dataWritePos.get(); + final long newDataPos = out + size; + if (newDataPos >= data.size()) { + //Goes to reallocation section + break; + } else { + if (dataWritePos.compareAndSet(out, newDataPos)) { + return out; + } + } + } + } finally { + dataLock.readLock().unlock(); + } + + dataLock.writeLock().lock(); + try { + if (dataWritePos.get() + size >= data.size()) { + data.doubleGrow(); + } + } finally { + dataLock.writeLock().unlock(); + } + return allocateData(size); + } + + /** + * Because all records in a bucket hash to their position or position + tableLength, + * we can incrementally rehash one bucket at a time. + * This does not need to acquire a lock; the calling rehash() method handles it. + */ + protected abstract void rehashIdx(long idx); + + private void clear(int i) { + if (i == STRIPES) { + this.dataLock.writeLock().lock(); + try { + this.index.clear(); + this.dataWritePos.set(getHeaderSize()); + this.size.set(0); + this.rehashIndex.set(0); + } finally { + this.dataLock.writeLock().unlock(); + } + } else { + synchronized (locks[i]) { + clear(i + 1); + } + } + } + + /** + * Removes all entries from the map, zeroing the primary file and marking + * the current position in the secondary as immediately after the header. + * Data is not actually removed from the secondary, but it will be + * overwritten on subsequent writes. + */ + public void clear() { + clear(0); + } + + /** + * Writes all header metadata and unmaps the backing mmap'd files. + */ + @Override + public void close() { + writeHeader(); + index.close(); + data.close(); + System.gc(); + } + + public void delete() { + close(); + deleteDirectory(baseDir); + } + + public long size() { + return size.get(); + } + + /** + * "Fullness" of the table. Some implementations may wish to override this + * to account for multiple records per bucket. + */ + public double load() { + return size.doubleValue() / (tableLength + ((double) tableLength / (STRIPES)) * rehashIndex.get()); + } + + public boolean containsKey(byte[] k) { + return get(k) != null; + } + +} diff --git a/src/main/java/net/soundvibe/lasher/map/core/Lasher.java b/src/main/java/net/soundvibe/lasher/map/core/Lasher.java new file mode 100644 index 0000000..702bb87 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/map/core/Lasher.java @@ -0,0 +1,416 @@ +package net.soundvibe.lasher.map.core; + +import net.soundvibe.lasher.map.model.RecordNode; +import net.soundvibe.lasher.util.Hash; + +import java.nio.file.Path; +import java.util.*; + +import static java.util.Objects.requireNonNull; + +public class Lasher extends BaseLinearHashMap { + + private static final long DEFAULT_FILE_LENGTH = 1L << 28; + + private static final long MB_32 = (long) Math.pow(2, 25L); + private static final long MB_128 = (long) Math.pow(2, 27L); + + private static final int INDEX_REC_SIZE = Long.BYTES; + + public Lasher(Path baseDir) { + this(baseDir, MB_128, MB_32); + } + + public Lasher(Path baseDir, long indexFileLength) { + super(baseDir, indexFileLength, DEFAULT_FILE_LENGTH); + } + + public Lasher(Path baseDir, long indexFileLength, long dataFileLength) { + super(baseDir, indexFileLength, dataFileLength); + } + + @Override + public byte[] get(byte[] key) { + requireNonNull(key, "key cannot be null"); + final long hash = Hash.murmurHash(key); + synchronized (lockForHash(hash)) { + final long idx = idxForHash(hash); + final long pos = idxToPos(idx); + final long adr = index.getDataAddress(pos); + if (adr == 0L) return null; + + var record = readDataRecord(adr); + while (true) { + if (record.keyEquals(key)) { + return record.val; + } else if (record.getNextRecordPos() != 0L) { + record = readDataRecord(record.getNextRecordPos()); + } else + return null; + } + } + } + + //This is the primary use case for a r/w lock + public byte[] putIfAbsent(byte[] key, byte[] value) { + requireNonNull(key, "key cannot be null"); + requireNonNull(value, "value cannot be null"); + var load = load(); + if (load > LOAD_FACTOR) rehash(); + final long hash = Hash.murmurHash(key); + + synchronized (lockForHash(hash)) { + final long idx = idxForHash(hash); + final long pos = idxToPos(idx); + final long adr = index.getDataAddress(pos); + long nextPos = 0L; + if (adr == 0L) { + final long insertPos = allocateNewRecord(key, value); + data.writeRecord(key, value, insertPos, nextPos); + index.putDataAddress(pos, insertPos); + size.incrementAndGet(); + return null; + } + var bucket = readDataRecord(adr); + while (true) { + if (bucket.keyEquals(key)) { + return bucket.val; + } else if (bucket.getNextRecordPos() != 0L) { + bucket = readDataRecord(bucket.getNextRecordPos()); + } else { + final long insertPos = allocateNewRecord(key, value); + data.writeRecord(key, value, insertPos, nextPos); + bucket.setNextRecordPos(insertPos, data); + size.incrementAndGet(); + return null; + } + } + } + } + + public byte[] put(byte[] key, byte[] value) { + requireNonNull(key, "key cannot be null"); + requireNonNull(value, "value cannot be null"); + if (load() > LOAD_FACTOR) { + rehash(); + } + + final long hash = Hash.murmurHash(key); + final long insertPos = allocateNewRecord(key, value); + + synchronized (lockForHash(hash)) { + final long idx = idxForHash(hash); + final long pos = idxToPos(idx); + if (pos >= index.size()) { + throw new IndexOutOfBoundsException("Pos: " + pos + " index size: " + index.size()); + } + final long adr = index.getDataAddress(pos); + long nextPos = 0L; + if (adr == 0L) { + data.writeRecord(key, value, insertPos, nextPos); + index.putDataAddress(pos, insertPos); + size.incrementAndGet(); + return null; + } + var bucket = readDataRecord(adr); + RecordNode prev = null; + while (true) { + nextPos = bucket.getNextRecordPos(); + if (bucket.keyEquals(key)) { + data.writeRecord(key, value, insertPos, nextPos); + if (prev == null) { + index.putDataAddress(pos, insertPos); + } else { + prev.setNextRecordPos(insertPos, data); + } + return bucket.val; + } else if (nextPos != 0L) { + prev = bucket; + bucket = readDataRecord(nextPos); + } else { + data.writeRecord(key, value, insertPos, nextPos); + bucket.setNextRecordPos(insertPos, data); + size.incrementAndGet(); + return null; + } + } + } + } + + public byte[] remove(byte[] key) { + requireNonNull(key, "key cannot be null"); + final long hash = Hash.murmurHash(key); + + synchronized (lockForHash(hash)) { + final long idx = idxForHash(hash); + final long pos = idxToPos(idx); + final long adr = index.getDataAddress(pos); + if (adr == 0L) return null; + + var bucket = readDataRecord(adr); + RecordNode prev = null; + while (true) { + if (bucket.keyEquals(key)) { + if (prev == null) { + index.putDataAddress(pos, bucket.getNextRecordPos()); + } else { + prev.setNextRecordPos(bucket.getNextRecordPos(), data); + } + size.decrementAndGet(); + return bucket.val; + } else if (bucket.getNextRecordPos() != 0L) { + prev = bucket; + bucket = readDataRecord(bucket.getNextRecordPos()); + } else { + return null; + } + } + } + } + + public boolean remove(byte[] key, byte[] value) { + requireNonNull(key, "key cannot be null"); + requireNonNull(value, "value cannot be null"); + final long hash = Hash.murmurHash(key); + + synchronized (lockForHash(hash)) { + final long idx = idxForHash(hash); + final long pos = idxToPos(idx); + final long adr = index.getDataAddress(pos); + if (adr == 0L) return false; + + var bucket = readDataRecord(adr); + RecordNode prev = null; + while (true) { + if (bucket.keyValueEquals(key, value)) { + if (prev == null) { + index.putDataAddress(pos, bucket.getNextRecordPos()); + } + else { + prev.setNextRecordPos(bucket.getNextRecordPos()); + } + size.decrementAndGet(); + return true; + } else if (bucket.getNextRecordPos() != 0L) { + prev = bucket; + bucket = readDataRecord(bucket.getNextRecordPos()); + } else return false; + } + } + } + + public boolean replace(byte[] key, byte[] prevVal, byte[] newVal) { + requireNonNull(key, "key cannot be null"); + requireNonNull(prevVal, "prevVal cannot be null"); + requireNonNull(newVal, "newVal cannot be null"); + + final long hash = Hash.murmurHash(key); + + synchronized (lockForHash(hash)) { + final long idx = idxForHash(hash); + final long pos = idxToPos(idx); + + final long adr = index.getDataAddress(pos); + if (adr == 0L) return false; + + var bucket = readDataRecord(adr); + RecordNode prev = null; + while (true) { + if (bucket.keyValueEquals(key, prevVal)) { + final long insertPos = allocateNewRecord(key, newVal); + data.writeRecord(key, newVal, insertPos, bucket.getNextRecordPos()); + if (prev == null) { + index.putDataAddress(pos, insertPos); + } else { + prev.setNextRecordPos(insertPos, data); + } + return true; + } else if (bucket.getNextRecordPos() != 0L) { + prev = bucket; + bucket = readDataRecord(bucket.getNextRecordPos()); + } else { + return false; + } + } + } + + } + + public byte[] replace(byte[] key, byte[] value) { + requireNonNull(key, "key cannot be null"); + requireNonNull(value, "value cannot be null"); + final long hash = Hash.murmurHash(key); + + synchronized (lockForHash(hash)) { + final long idx = idxForHash(hash); + final long pos = idxToPos(idx); + + final long adr = index.getDataAddress(pos); + if (adr == 0L) return null; + + var bucket = readDataRecord(adr); + RecordNode prev = null; + while (true) { + if (bucket.keyEquals(key)) { + final long insertPos = allocateNewRecord(key, value); + data.writeRecord(key, value, insertPos, bucket.getNextRecordPos()); + if (prev == null) { + index.putDataAddress(pos, insertPos); + } else { + prev.setNextRecordPos(insertPos, data); + } + return bucket.val; + } else if (bucket.getNextRecordPos() != 0L) { + prev = bucket; + bucket = readDataRecord(bucket.getNextRecordPos()); + } else { + return null; + } + } + } + } + + @Override + protected void readHeader() { + dataLock.readLock().lock(); + try { + final long size = data.getLong(0L); + final long bucketsInMap = data.getLong(8L); + final long lastSecondaryPos = data.getLong(16L); + final int rehashComplete = data.getInt(24L); + this.size.set(size); + this.tableLength = bucketsInMap == 0L ? (index.size() / INDEX_REC_SIZE) : bucketsInMap; + this.dataWritePos.set(lastSecondaryPos == 0L ? getHeaderSize() : lastSecondaryPos); + this.rehashIndex.set(rehashComplete); + } finally { + dataLock.readLock().unlock(); + } + } + + protected long idxToPos(long idx) { + return idx * INDEX_REC_SIZE; + } + + protected RecordNode readDataRecord(long pos) { + dataLock.readLock().lock(); + try { + return data.readRecord(pos); + } finally { + dataLock.readLock().unlock(); + } + } + + protected long allocateNewRecord(byte[] key, byte[] value) { + final long recordSize = data.headerSize() + key.length + (value == null ? 0L : value.length); + return allocateData(recordSize); + } + + @Override + protected void rehashIdx(long idx) { + final long indexPos = idxToPos(idx); + final long addr = index.getDataAddress(indexPos); + if (addr == 0L) return; + final long moveIdx = idx + tableLength; + final long moveIndexPos = idxToPos(moveIdx); + + var keepBuckets = new ArrayList(); + var moveBuckets = new ArrayList(); + var bucket = readDataRecord(addr); + while (true) { + long hash = Hash.murmurHash(bucket.key); + final long newIdx = hash & (tableLength + tableLength - 1L); + if (newIdx == idx) { + keepBuckets.add(bucket); + } else if (newIdx == moveIdx) { + moveBuckets.add(bucket); + } else { + throw new IllegalStateException("hash:" + hash + + ", idx:" + idx + + ", newIdx:" + newIdx + + ", tableLength:" + tableLength + + ", moveIdx=" + moveIdx + + ", primaryPos=" + indexPos + + ", bucket=" + bucket); + } + + if (bucket.getNextRecordPos() != 0L) { + bucket = readDataRecord(bucket.getNextRecordPos()); + } else { + break; + } + } + //Adjust chains + var keepInitialPos = updateChain(keepBuckets); + var moveInitialPos = updateChain(moveBuckets); + index.putDataAddress(indexPos, keepInitialPos); + index.putDataAddress(moveIndexPos, moveInitialPos); + } + + /** + * Cause each bucket to point to the subsequent one. Returns address of original, + * or 0 if the list was empty. + */ + protected long updateChain(List buckets) { + if (buckets.isEmpty()) return 0L; + for (int i = 0; i < buckets.size() - 1; i++) { + buckets.get(i) + .setNextRecordPos(buckets.get(i + 1).pos, data); + } + buckets.get(buckets.size() - 1) + .setNextRecordPos(0L, data); + return buckets.get(0).pos; + } + + public Iterator> iterator() { + return new LashIterator(); + } + + public final class LashIterator implements Iterator> { + private long nextIdx = 0L; + private long nextAddr = 0L; + private boolean finished = true; + private final long length; + + public LashIterator() { + this.length = rehashIndex.get() == 0L ? tableLength : tableLength * 2L; + for (nextIdx = 0L; nextIdx < length; nextIdx++) { + nextAddr = index.getDataAddress(idxToPos(nextIdx)); + if (nextAddr != 0L) { + finished = false; + break; + } + } + } + + @Override + public boolean hasNext() { + return !finished; + } + + @Override + public Map.Entry next() { + if (finished) throw new NoSuchElementException(); + final var node = readDataRecord(nextAddr); + advance(node); + return new AbstractMap.SimpleEntry<>(node.key, node.val); + } + + private void advance(RecordNode bucket) { + if (bucket.getNextRecordPos() != 0L) { + nextAddr = bucket.getNextRecordPos(); + finished = false; + return; + } + for (nextIdx = nextIdx + 1L; nextIdx < length; nextIdx++) { + final long pos = idxToPos(nextIdx); + nextAddr = index.getDataAddress(pos); + if (nextAddr != 0L) { + finished = false; + return; + } + } + finished = true; + } + } + +} diff --git a/src/main/java/net/soundvibe/lasher/map/model/FileType.java b/src/main/java/net/soundvibe/lasher/map/model/FileType.java new file mode 100644 index 0000000..e303822 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/map/model/FileType.java @@ -0,0 +1,12 @@ +package net.soundvibe.lasher.map.model; + +public enum FileType { + + INDEX("index.lasher"), DATA("data.lasher"); + + public final String filename; + + FileType(String filename) { + this.filename = filename; + } +} diff --git a/src/main/java/net/soundvibe/lasher/map/model/RecordNode.java b/src/main/java/net/soundvibe/lasher/map/model/RecordNode.java new file mode 100644 index 0000000..22d313d --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/map/model/RecordNode.java @@ -0,0 +1,69 @@ +package net.soundvibe.lasher.map.model; + +import net.soundvibe.lasher.mmap.DataNode; + +import java.util.*; + +public final class RecordNode { + public final long pos; + private long nextRecordPos; + public final byte[] key; + public final byte[] val; + + public RecordNode(long pos, long nextRecordPos, byte[] key, byte[] val) { + this.pos = pos; + this.nextRecordPos = nextRecordPos; + this.key = key; + this.val = val; + } + + public long getNextRecordPos() { + return this.nextRecordPos; + } + + public void setNextRecordPos(long nRecPos) { + this.nextRecordPos = nRecPos; + } + + public void setNextRecordPos(long nRecPos, DataNode dataNode) { + dataNode.writeNextRecordPos(pos, nRecPos); + this.nextRecordPos = nRecPos; + } + + public boolean keyEquals(byte[] k) { + return Arrays.equals(k, this.key); + } + + public boolean keyValueEquals(byte[] k, byte[] v) { + return keyEquals(k) && Arrays.equals(v, this.val); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final RecordNode that = (RecordNode) o; + return pos == that.pos && + nextRecordPos == that.nextRecordPos && + Arrays.equals(key, that.key) && + Arrays.equals(val, that.val); + } + + @Override + public int hashCode() { + int result = Objects.hash(pos, nextRecordPos); + result = 31 * result + Arrays.hashCode(key); + result = 31 * result + Arrays.hashCode(val); + return result; + } + + @Override + public String toString() { + return "RecordNode{" + + "pos=" + pos + + ", nextRecordPos=" + nextRecordPos + + ", keyLength=" + key.length + + ", valLength=" + (val == null ? -1 : val.length) + + '}'; + } +} diff --git a/src/main/java/net/soundvibe/lasher/map/model/UnsafeAccess.java b/src/main/java/net/soundvibe/lasher/map/model/UnsafeAccess.java new file mode 100644 index 0000000..7a2596c --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/map/model/UnsafeAccess.java @@ -0,0 +1,8 @@ +package net.soundvibe.lasher.map.model; + +public class UnsafeAccess extends RuntimeException { + + public UnsafeAccess(Throwable cause) { + super(cause); + } +} diff --git a/src/main/java/net/soundvibe/lasher/mmap/DataNode.java b/src/main/java/net/soundvibe/lasher/mmap/DataNode.java new file mode 100644 index 0000000..c493161 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/mmap/DataNode.java @@ -0,0 +1,197 @@ +package net.soundvibe.lasher.mmap; + +import net.soundvibe.lasher.map.model.*; +import net.soundvibe.lasher.util.BytesSupport; + +import java.nio.file.Path; +import java.util.Arrays; + +import static net.soundvibe.lasher.util.BytesSupport.*; + +public final class DataNode extends MemoryMapped { + + public DataNode(Path baseDir, long len) { + super(baseDir, FileType.DATA, len); + } + + private static final int DATA_HEADER_SIZE = 16; + + public int headerSize() { + return DATA_HEADER_SIZE; + } + + public RecordNode readRecord(long pos) { + if (pos >= size) { + throw new IndexOutOfBoundsException("Record pos: " + pos + " is out of total size range: " + size); + } + var header = new byte[DATA_HEADER_SIZE]; + getBytes(pos, header); + + var nextRecordPos = longFromBytes(header); + final int keyLen = intFromBytes(header,8); + final int valLen = intFromBytes(header, 12); + + if (keyLen < 0) { + throw new IndexOutOfBoundsException("KeyLen: " + keyLen + " for pos: " + pos + " and size:" + size); + } + + var dataLen = keyLen + Math.max(0, valLen); + var data = new byte[dataLen]; + getBytes(pos + DATA_HEADER_SIZE, data); + + var key = Arrays.copyOfRange(data, 0, keyLen); + byte[] val = null; + if (valLen != -1) { + val = Arrays.copyOfRange(data, keyLen, data.length); + } + return new RecordNode(pos, nextRecordPos, key, val); + } + +/* public void writeRecord(byte[] key, byte[] value, long pos, long nextRecPos) { + int valueLength = value == null ? 0 : value.length; + var buffer = ByteBuffer.allocate(DATA_HEADER_SIZE + key.length + valueLength); + buffer.order(BYTE_ORDER); + + buffer.putLong(nextRecPos); + buffer.putInt(key.length); + buffer.putInt(value == null ? -1 : value.length); + buffer.put(key); + if (value != null) { + buffer.put(value); + } + + putBytes(pos, buffer.array()); + }*/ + + public void writeRecord(byte[] key, byte[] value, long pos, long nextRecPos) { + writeNextRecordPos(pos, nextRecPos); + putInt(pos + 8, key.length); + putInt(pos + 12, value == null ? -1 : value.length); + putBytes(pos + 16, key); + if (value != null) { + putBytes(pos + 16 + key.length, value); + } + } + + public void writeNextRecordPos(long pos, long nextRecordPos) { + putLong(pos, nextRecordPos); + } + + public int getInt(long pos) { + var bufferIndex = resolveBufferIndex(pos); + var buffer = buffers[bufferIndex]; + var posBuffer = convertPos(pos, bufferIndex); + + if (posBuffer + Integer.BYTES > buffer.capacity()) { + var valBytes = new byte[Integer.BYTES]; + getBytes(pos, valBytes); + return BytesSupport.bytesToInt(valBytes); + } else { + return buffer.getInt(posBuffer); + } + } + + public void putInt(long pos, int val) { + if ((pos + Integer.BYTES) > size) { + throw new IllegalStateException(String.format("pos [%d] larger than total size [%d]", + pos + Integer.BYTES, size)); + } + + var bufferIndex = resolveBufferIndex(pos); + var buffer = buffers[bufferIndex]; + var posBuffer = convertPos(pos, bufferIndex); + + if (posBuffer + Integer.BYTES > buffer.capacity()) { + var valBytes = BytesSupport.intToBytes(val); + putBytes(pos, valBytes); + } else { + buffer.putInt(posBuffer, val); + } + } + + public void getBytes(long pos, byte[] data) { + if (pos + data.length > size) return; + + var bufferIndex = resolveBufferIndex(pos); + var buffer = buffers[bufferIndex]; + var posBuffer = convertPos(pos, bufferIndex); + var offset = 0; + var length = data.length; + + while (posBuffer + length > buffer.capacity()) { + var remaining = buffer.capacity() - posBuffer; + buffer.order(BYTE_ORDER); + buffer.get(posBuffer, data, offset, remaining); + bufferIndex++; + buffer = buffers[bufferIndex]; + posBuffer = 0; + offset += remaining; + length -= remaining; + } + buffer.order(BYTE_ORDER); + buffer.get(posBuffer, data, offset, length); + } + + public void putBytes(long pos, byte[] data) { + putBytes(pos, data, data.length); + } + + public void putBytes(long pos, byte[] data, int len) { + if ((pos + len) > size) { + throw new IllegalStateException(String.format("pos [%d] larger than total size [%d]", pos + len, size)); + } + + var bufferIndex = resolveBufferIndex(pos); + var buffer = buffers[bufferIndex]; + var posBuffer = convertPos(pos, bufferIndex); + var offset = 0; + var length = len; + + while (posBuffer + length > buffer.capacity()) { + var remaining = buffer.capacity() - posBuffer; + buffer.order(BYTE_ORDER); + buffer.put(posBuffer, data, offset, remaining); + bufferIndex++; + buffer = buffers[bufferIndex]; + posBuffer = 0; + offset += remaining; + length -= remaining; + } + + buffer.order(BYTE_ORDER); + buffer.put(posBuffer, data, offset, length); + } + + @Override + public long getLong(long pos) { + var bufferIndex = resolveBufferIndex(pos); + var buffer = buffers[bufferIndex]; + var posBuffer = convertPos(pos, bufferIndex); + + if (posBuffer + Long.BYTES > buffer.capacity()) { + var valBytes = new byte[Long.BYTES]; + getBytes(pos, valBytes); + return BytesSupport.bytesToLong(valBytes); + } else { + return buffer.getLong(posBuffer); + } + } + + @Override + public void putLong(long pos, long val) { + if ((pos + Long.BYTES) > size) { + throw new IllegalStateException(String.format("pos [%d] larger than total size [%d]", pos + Long.BYTES, size)); + } + + var bufferIndex = resolveBufferIndex(pos); + var buffer = buffers[bufferIndex]; + var posBuffer = convertPos(pos, bufferIndex); + + if (posBuffer + Long.BYTES > buffer.capacity()) { + var valBytes = BytesSupport.longToBytes(val); + putBytes(pos, valBytes); + } else { + buffer.putLong(posBuffer, val); + } + } +} diff --git a/src/main/java/net/soundvibe/lasher/mmap/IndexNode.java b/src/main/java/net/soundvibe/lasher/mmap/IndexNode.java new file mode 100644 index 0000000..f885b72 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/mmap/IndexNode.java @@ -0,0 +1,50 @@ +package net.soundvibe.lasher.mmap; + +import net.soundvibe.lasher.map.model.FileType; + +import java.nio.file.Path; + +public final class IndexNode extends MemoryMapped { + + public IndexNode(Path baseDir, long len) { + super(baseDir, FileType.INDEX, len); + } + + public long getDataAddress(long pos) { + return getLong(pos); + } + + public void putDataAddress(long pos, long dataAddress) { + putLong(pos, dataAddress); + } + + @Override + public long getLong(long pos) { + var bufferIndex = resolveBufferIndex(pos); + var buffer = buffers[bufferIndex]; + var posBuffer = convertPos(pos, bufferIndex); + if (posBuffer + Long.BYTES > buffer.capacity()) { + throw new IllegalStateException(String.format("Pos in buffer exceeds it's capacity: %d > %d", + posBuffer + Long.BYTES, buffer.capacity())); + } + return buffer.getLong(posBuffer); + } + + @Override + public void putLong(long pos, long val) { + if ((pos + Long.BYTES) > size) { + throw new IllegalStateException(String.format("pos [%d] larger than total size [%d]", pos + Long.BYTES, size)); + } + + var bufferIndex = resolveBufferIndex(pos); + var buffer = buffers[bufferIndex]; + var posBuffer = convertPos(pos, bufferIndex); + + if (posBuffer + Long.BYTES > buffer.capacity()) { + throw new IllegalStateException(String.format("Pos in buffer exceeds it's capacity: %d > %d", + posBuffer + Long.BYTES, buffer.capacity())); + } + + buffer.putLong(posBuffer, val); + } +} diff --git a/src/main/java/net/soundvibe/lasher/mmap/MemoryMapped.java b/src/main/java/net/soundvibe/lasher/mmap/MemoryMapped.java new file mode 100644 index 0000000..a78c185 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/mmap/MemoryMapped.java @@ -0,0 +1,274 @@ +package net.soundvibe.lasher.mmap; + +import net.soundvibe.lasher.map.model.*; +import sun.misc.Unsafe; + +import java.io.*; +import java.lang.reflect.*; +import java.nio.*; +import java.nio.channels.FileChannel; +import java.nio.file.*; +import java.util.*; + +import static net.soundvibe.lasher.util.BytesSupport.BYTE_ORDER; + +public abstract class MemoryMapped implements Closeable { + + private static final int CHUNK_SIZE = 256 * 1024 * 1024; //256 MB + private final FileType fileType; + protected MappedByteBuffer[] buffers; + protected long size; + private final Path baseDir; + private final long defaultLength; + + private static final Class UNSAFE_CLASS = resolveUnsafeClass(); + private static final Unsafe UNSAFE = resolveUnsafe(); + private static final Field ADDRESS_FIELD = resolveAddressField(); + + protected MemoryMapped(final Path baseDir, FileType fileType, long defaultLength) { + Objects.requireNonNull(baseDir, "baseDir is null"); + this.fileType = fileType; + this.baseDir = baseDir; + this.defaultLength = roundTo4096(defaultLength); + + var fileStats = readFileStats(baseDir, fileType, this.defaultLength); + this.size = Math.max(this.defaultLength, fileStats.totalSize); + this.buffers = fileStats.buffers; + if (fileStats.buffers.length == 0) { + mapAndResize(this.size); + } + } + + public abstract long getLong(long pos); + public abstract void putLong(long pos, long val); + + private static final class FileStats { + final long totalSize; + final MappedByteBuffer[] buffers; + + private FileStats(long totalSize, MappedByteBuffer[] buffers) { + this.totalSize = totalSize; + this.buffers = buffers; + } + } + + private FileStats readFileStats(final Path baseDir, FileType fileType, long defaultLength) { + var path = baseDir.resolve(fileType.filename); + if (Files.notExists(path)) { + return new FileStats(defaultLength, new MappedByteBuffer[0]); + } + + var file = path.toFile(); + var fileSize = file.length(); + + try { + try (var rf = new RandomAccessFile(file, "rw"); + var fc = rf.getChannel()) { + + var totalBuffersSize = (int) Math.ceil(Math.max(1d, (double) fileSize / CHUNK_SIZE)); + var totalBuffers = new MappedByteBuffer[totalBuffersSize]; + int index = 0; + for (long i = 0; i < fileSize; i+=CHUNK_SIZE) { + var remaining = fileSize - i; + var bufferSize = remaining < CHUNK_SIZE ? remaining : CHUNK_SIZE; + totalBuffers[index] = fc.map(FileChannel.MapMode.READ_WRITE, i, bufferSize); + totalBuffers[index].order(BYTE_ORDER); + index++; + } + return new FileStats(fileSize, totalBuffers); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public long size() { + return this.size; + } + + public void remap(long newLength) { + var newSize = roundTo4096(newLength); + mapAndResize(newSize); + this.size = newSize; + } + + public void doubleGrow() { + remap(this.size * 2); + } + + @Override + public void close() { + for (var buffer : buffers) { + if (buffer != null) { + buffer.force(); + unmap(buffer); + } + } + } + + public void clear() { + for (var buffer : buffers) { + if (buffer != null) { + try { + var addr = ADDRESS_FIELD.getLong(buffer); + UNSAFE.setMemory(addr, buffer.capacity(), (byte) 0); + } catch (IllegalAccessException e) { + throw new UnsafeAccess(e); + } + } + } + } + + private static void unmap(ByteBuffer byteBuffer) { + if (byteBuffer == null || !byteBuffer.isDirect()) return; + if (UNSAFE == null) { + throw new UnsupportedOperationException("Unsafe not supported on this platform"); + } + UNSAFE.invokeCleaner(byteBuffer); + } + + private static Class resolveUnsafeClass() { + try { + return Class.forName("sun.misc.Unsafe"); + } catch (Exception ex) { + try { + return Class.forName("jdk.internal.misc.Unsafe"); + } catch (ClassNotFoundException e) { + return null; + } + } + } + + private static Unsafe resolveUnsafe() { + if (UNSAFE_CLASS == null) throw new UnsupportedOperationException("Unsafe not supported on this platform"); + try { + var theUnsafeField = UNSAFE_CLASS.getDeclaredField("theUnsafe"); + theUnsafeField.setAccessible(true); + return (Unsafe) theUnsafeField.get(null); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new UnsafeAccess(e); + } + } + + private static Field resolveAddressField() { + try { + var field = Buffer.class.getDeclaredField("address"); + field.setAccessible(true); + return field; + } catch (Exception e) { + throw new UnsafeAccess(e); + } + } + + private static long roundTo4096(long i) { + return (i + 0xfffL) & ~0xfffL; + } + + private void mapAndResize(long newSize) { + var bufferIndex = findBufferIndex(newSize); + expandBuffers(bufferIndex, newSize); + resizeOlderBuffersIfNeeded(bufferIndex); + + long sizeToSet = bufferIndex == 0 ? newSize : resolveSize(newSize, bufferIndex); + buffers[bufferIndex] = mapBuffer(bufferIndex, (int)sizeToSet, newSize); + } + + private MappedByteBuffer mapBuffer(int bufferIndex, int bufferSize, long fileSize) { + try (var f = new RandomAccessFile(baseDir.resolve(fileType.filename).toFile(), "rw"); + var fc = f.getChannel()) { + if (f.length() < fileSize) { + f.setLength(fileSize); + this.size = fileSize; + } + + var pos = resolveBufferPos(bufferIndex); + var buffer = fc.map(FileChannel.MapMode.READ_WRITE, pos, bufferSize); + buffer.order(BYTE_ORDER); + return buffer; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private long resolveBufferPos(int bufferIndex) { + long result = 0L; + for (int i = 0; i < bufferIndex; i++) { + var buffer = buffers[i]; + if (buffer != null) { + result += buffer.capacity(); + } + } + return result; + } + + private void resizeOlderBuffersIfNeeded(int bufferIndex) { + if (bufferIndex > 0) { + for (int i = 0; i < bufferIndex; i++) { + if (buffers[i] == null || buffers[i].capacity() < CHUNK_SIZE) { + remapTo(i, CHUNK_SIZE); + } + } + } + } + + private void remapTo(int bufferIndex, long newSize) { + unmap(buffers[bufferIndex]); + try { + try (var f = new RandomAccessFile(baseDir.resolve(fileType.filename).toFile(), "rw"); + var fc = f.getChannel()) { + var pos = resolveBufferPos(bufferIndex); + buffers[bufferIndex] = fc.map(FileChannel.MapMode.READ_WRITE, pos, newSize); + buffers[bufferIndex].order(BYTE_ORDER); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private long resolveSize(long size, int bufferIndex) { + long newSize = size; + for (int i = 0; i < bufferIndex - 1; i++) { + if (buffers[i] != null) { + newSize -= buffers[i].capacity(); + } + } + + if (newSize < defaultLength) { + return defaultLength; + } + + return Math.min(CHUNK_SIZE, newSize); + } + + protected int convertPos(long absolutePos, int bufferIndex) { + long startPos = (long) CHUNK_SIZE * (long) bufferIndex; + int bufferPos = (int) (absolutePos - startPos); + if (bufferPos < 0 || bufferPos > CHUNK_SIZE) { + throw new IndexOutOfBoundsException("Buffer pos " + bufferPos + " is out of bounds: " + CHUNK_SIZE + " for pos: " + absolutePos + " and buffer index: " + bufferIndex); + } + return bufferPos; + } + + protected int findBufferIndex(long pos) { + return (int) Math.floorDiv(pos, CHUNK_SIZE); + } + + protected int resolveBufferIndex(long pos) { + int ix = (int) Math.floorDiv(pos, CHUNK_SIZE); + if (ix < 0 || ix >= buffers.length) { + throw new IndexOutOfBoundsException("Buffer index " + ix + " is out of total length: " + buffers.length + " for pos " + pos); + } + return ix; + } + + protected void expandBuffers(int newPartition, long newSize) { + if (newPartition + 1 > buffers.length) { + int oldLength = buffers.length; + buffers = Arrays.copyOf(buffers, newPartition + 1); + for (int i = oldLength; i < buffers.length - 1; i++) { + unmap(buffers[i]); + buffers[i] = mapBuffer(i, CHUNK_SIZE, newSize); + } + } + } +} diff --git a/src/main/java/net/soundvibe/lasher/serde/BytesSerde.java b/src/main/java/net/soundvibe/lasher/serde/BytesSerde.java new file mode 100644 index 0000000..3f3f3be --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/serde/BytesSerde.java @@ -0,0 +1,13 @@ +package net.soundvibe.lasher.serde; + +public final class BytesSerde implements Serde { + @Override + public byte[] toBytes(byte[] value) { + return value; + } + + @Override + public byte[] fromBytes(byte[] bytes) { + return bytes; + } +} diff --git a/src/main/java/net/soundvibe/lasher/serde/IntegerSerde.java b/src/main/java/net/soundvibe/lasher/serde/IntegerSerde.java new file mode 100644 index 0000000..977b793 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/serde/IntegerSerde.java @@ -0,0 +1,17 @@ +package net.soundvibe.lasher.serde; + +import static net.soundvibe.lasher.util.BytesSupport.*; + +public final class IntegerSerde implements Serde { + @Override + public byte[] toBytes(Integer value) { + if (value == null) return null; + return intToBytes(value); + } + + @Override + public Integer fromBytes(byte[] bytes) { + if (bytes == null || bytes.length == 0) return null; + return bytesToInt(bytes); + } +} diff --git a/src/main/java/net/soundvibe/lasher/serde/LongSerde.java b/src/main/java/net/soundvibe/lasher/serde/LongSerde.java new file mode 100644 index 0000000..80cc7d7 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/serde/LongSerde.java @@ -0,0 +1,18 @@ +package net.soundvibe.lasher.serde; + +import net.soundvibe.lasher.util.BytesSupport; + +public final class LongSerde implements Serde { + + @Override + public byte[] toBytes(Long value) { + if (value == null) return null; + return BytesSupport.longToBytes(value); + } + + @Override + public Long fromBytes(byte[] bytes) { + if (bytes == null || bytes.length == 0) return null; + return BytesSupport.bytesToLong(bytes); + } +} diff --git a/src/main/java/net/soundvibe/lasher/serde/Serde.java b/src/main/java/net/soundvibe/lasher/serde/Serde.java new file mode 100644 index 0000000..54786f2 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/serde/Serde.java @@ -0,0 +1,9 @@ +package net.soundvibe.lasher.serde; + +public interface Serde { + + byte[] toBytes(T value); + + T fromBytes(byte[] bytes); + +} diff --git a/src/main/java/net/soundvibe/lasher/serde/Serdes.java b/src/main/java/net/soundvibe/lasher/serde/Serdes.java new file mode 100644 index 0000000..01a9f1b --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/serde/Serdes.java @@ -0,0 +1,11 @@ +package net.soundvibe.lasher.serde; + +public interface Serdes { + + LongSerde LONG = new LongSerde(); + IntegerSerde INTEGER = new IntegerSerde(); + BytesSerde BYTES = new BytesSerde(); + StringSerde STRING = new StringSerde(); + UUIDSerde UUID = new UUIDSerde(); + +} diff --git a/src/main/java/net/soundvibe/lasher/serde/StringSerde.java b/src/main/java/net/soundvibe/lasher/serde/StringSerde.java new file mode 100644 index 0000000..08ea8d7 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/serde/StringSerde.java @@ -0,0 +1,17 @@ +package net.soundvibe.lasher.serde; + +import java.nio.charset.StandardCharsets; + +public final class StringSerde implements Serde { + @Override + public byte[] toBytes(String value) { + if (value == null) return null; + return value.getBytes(StandardCharsets.UTF_8); + } + + @Override + public String fromBytes(byte[] bytes) { + if (bytes == null) return null; + return new String(bytes, StandardCharsets.UTF_8); + } +} diff --git a/src/main/java/net/soundvibe/lasher/serde/UUIDSerde.java b/src/main/java/net/soundvibe/lasher/serde/UUIDSerde.java new file mode 100644 index 0000000..c859482 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/serde/UUIDSerde.java @@ -0,0 +1,24 @@ +package net.soundvibe.lasher.serde; + +import java.nio.ByteBuffer; +import java.util.UUID; + +public final class UUIDSerde implements Serde { + @Override + public byte[] toBytes(UUID value) { + if (value == null) return null; + var byteBuffer = ByteBuffer.wrap(new byte[16]); + byteBuffer.putLong(value.getLeastSignificantBits()); + byteBuffer.putLong(value.getMostSignificantBits()); + return byteBuffer.array(); + } + + @Override + public UUID fromBytes(byte[] bytes) { + if (bytes == null || bytes.length == 0) return null; + var byteBuffer = ByteBuffer.wrap(bytes); + var leastBits = byteBuffer.getLong(); + var mostBits = byteBuffer.getLong(); + return new UUID(mostBits, leastBits); + } +} diff --git a/src/main/java/net/soundvibe/lasher/util/BytesSupport.java b/src/main/java/net/soundvibe/lasher/util/BytesSupport.java new file mode 100644 index 0000000..b8dd6c5 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/util/BytesSupport.java @@ -0,0 +1,64 @@ +package net.soundvibe.lasher.util; + +import java.lang.invoke.*; +import java.nio.*; + +public final class BytesSupport { + + private BytesSupport() {} + + public static final ByteOrder BYTE_ORDER = ByteOrder.nativeOrder(); + + private static final VarHandle LONG_HANDLE = MethodHandles.byteArrayViewVarHandle(long[].class, BYTE_ORDER); + private static final VarHandle INT_HANDLE = MethodHandles.byteArrayViewVarHandle(int[].class, BYTE_ORDER); + + public static byte[] toBytes(long i) { + var result = new byte[8]; + LONG_HANDLE.set(result, 0, i); + return result; + } + + public static long longFromBytes(byte[] b) { + if (b == null) + return -1L; + + return (long)LONG_HANDLE.get(b, 0); + } + + public static int intFromBytes(byte[] b, int fromBytes) { + if (b == null) + return -1; + + return (int)INT_HANDLE.get(b, fromBytes); + } + + public static byte[] longToBytes(long i) { + var buf = ByteBuffer.allocate(Long.BYTES); + buf.order(BYTE_ORDER); + buf.putLong(i); + return buf.array(); + } + + public static long bytesToLong(byte[] b) { + if (b == null) + return -1; + var buf = ByteBuffer.wrap(b); + buf.order(BYTE_ORDER); + return buf.getLong(); + } + + public static byte[] intToBytes(int i) { + var buf = ByteBuffer.allocate(Integer.BYTES); + buf.order(BYTE_ORDER); + buf.putInt(i); + return buf.array(); + } + + public static int bytesToInt(byte[] b) { + if (b == null) + return -1; + var buf = ByteBuffer.wrap(b); + buf.order(BYTE_ORDER); + return buf.getInt(); + } +} \ No newline at end of file diff --git a/src/main/java/net/soundvibe/lasher/util/FileSupport.java b/src/main/java/net/soundvibe/lasher/util/FileSupport.java new file mode 100644 index 0000000..9fa0703 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/util/FileSupport.java @@ -0,0 +1,33 @@ +package net.soundvibe.lasher.util; + +import java.io.*; +import java.nio.file.*; +import java.nio.file.attribute.BasicFileAttributes; + +public final class FileSupport { + + private FileSupport() {} + + public static void deleteDirectory(Path dir) { + if (Files.exists(dir)) { + try { + Files.walkFileTree(dir, new SimpleFileVisitor<>() { + @Override + public FileVisitResult visitFile(Path path, BasicFileAttributes basicFileAttributes) throws IOException { + Files.delete(path); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path directory, IOException ioException) throws IOException { + if (ioException != null) throw ioException; + Files.delete(directory); + return FileVisitResult.CONTINUE; + } + }); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } +} diff --git a/src/main/java/net/soundvibe/lasher/util/Hash.java b/src/main/java/net/soundvibe/lasher/util/Hash.java new file mode 100644 index 0000000..b671432 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/util/Hash.java @@ -0,0 +1,124 @@ +package net.soundvibe.lasher.util; + +public final class Hash { + + private Hash() { + } + + private static final int HASH_SEED = 0xe17a1465; + + /** + * Return a number greater than i whose bottom n bits collide when hashed. + */ + public static long findCollision(long i, long nBits) { + final long mask = (1L << nBits) - 1; + final long targetHash = murmurHash(i) & mask; + + for (i = i + 1; i < Long.MAX_VALUE; i++) { + if ((murmurHash(i) & mask) == targetHash) { + return i; + } + } + return -1; + } + + /** + * Utility to hash a single value + */ + public static long murmurHash(long k) { + final long m = 0xc6a4a7935bd1e995L; + final int r = 47; + + long h = (HASH_SEED & 0xffffffffL) ^ (8 * m); + + k *= m; + k ^= k >>> r; + k *= m; + h ^= k; + h *= m; + h ^= h >>> r; + h *= m; + h ^= h >>> r; + + return h & 0x7fffffffffffffffL; + } + + public static long murmurHash(long[] data) { + final int length = data.length; + final long m = 0xc6a4a7935bd1e995L; + final int r = 47; + + long h = (HASH_SEED & 0xffffffffL) ^ (length * 8 * m); + + for (final long datum : data) { + long k = datum; + k *= m; + k ^= k >>> r; + k *= m; + h ^= k; + h *= m; + } + + h ^= h >>> r; + h *= m; + h ^= h >>> r; + + return h & 0x7fffffffffffffffL; + } + + public static long murmurHash(byte[] data) { + return murmurHash(HASH_SEED, data); + } + + /** + * Modified to always return positive number. Murmurhash2 with 64-bit output. + */ + public static long murmurHash(long hashSeed, byte[] data) { + final int length = data.length; + final long m = 0xc6a4a7935bd1e995L; + final int r = 47; + + long h = (hashSeed & 0xffffffffL) ^ (length * m); + + int length8 = length / 8; + + for (int i = 0; i < length8; i++) { + final int i8 = i * 8; + long k = ((long) data[i8] & 0xff) + (((long) data[i8 + 1] & 0xff) << 8) + + (((long) data[i8 + 2] & 0xff) << 16) + (((long) data[i8 + 3] & 0xff) << 24) + + (((long) data[i8 + 4] & 0xff) << 32) + (((long) data[i8 + 5] & 0xff) << 40) + + (((long) data[i8 + 6] & 0xff) << 48) + (((long) data[i8 + 7] & 0xff) << 56); + + k *= m; + k ^= k >>> r; + k *= m; + h ^= k; + h *= m; + } + + switch (length % 8) { + case 7: + h ^= (long) (data[(length & ~7) + 6] & 0xff) << 48; + case 6: + h ^= (long) (data[(length & ~7) + 5] & 0xff) << 40; + case 5: + h ^= (long) (data[(length & ~7) + 4] & 0xff) << 32; + case 4: + h ^= (long) (data[(length & ~7) + 3] & 0xff) << 24; + case 3: + h ^= (long) (data[(length & ~7) + 2] & 0xff) << 16; + case 2: + h ^= (long) (data[(length & ~7) + 1] & 0xff) << 8; + case 1: + h ^= (long) (data[length & ~7] & 0xff); + h *= m; + } + + h ^= h >>> r; + h *= m; + h ^= h >>> r; + + //Clear sign bit + return h & 0x7fffffffffffffffL; + } +} diff --git a/src/main/java/net/soundvibe/lasher/util/Murmur3.java b/src/main/java/net/soundvibe/lasher/util/Murmur3.java new file mode 100644 index 0000000..6b011e0 --- /dev/null +++ b/src/main/java/net/soundvibe/lasher/util/Murmur3.java @@ -0,0 +1,140 @@ +/* +* Copyright 2015 LinkedIn Corp. All rights reserved. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +*/ + +package net.soundvibe.lasher.util; + +import java.lang.invoke.*; +import java.nio.ByteOrder; + + +/** + * Hashing utility. + */ +public final class Murmur3 { + + private Murmur3() { } + + /** + * Returns the positive hash for the given bytes. + * + * @param bytes bytes to hash + * @return hash + */ + public static long hash(byte[] bytes) { + return hash32(bytes) & 0x7fffffff; + } + + public static int hash(byte[] bytes, int seed) { + return hash32(bytes, bytes.length, seed); + } + + // Constants for 32 bit variant + private static final int C1_32 = 0xcc9e2d51; + private static final int C2_32 = 0x1b873593; + private static final int R1_32 = 15; + private static final int R2_32 = 13; + private static final int M_32 = 5; + private static final int N_32 = 0xe6546b64; + + public static final int DEFAULT_SEED = 104729; + + //** MurMur3 ** + /** + * Generates 32 bit hash from byte array with the default seed. + * + * @param data - input byte array + * @return 32 bit hash + */ + public static int hash32(final byte[] data) { + return hash32(data, 0, data.length, DEFAULT_SEED); + } + + /** + * Generates 32 bit hash from byte array with the given length and seed. + * + * @param data - input byte array + * @param length - length of array + * @param seed - seed. (default 0) + * @return 32 bit hash + */ + public static int hash32(final byte[] data, final int length, final int seed) { + return hash32(data, 0, length, seed); + } + + private static final VarHandle INT_HANDLE = MethodHandles.byteArrayViewVarHandle(int[].class, ByteOrder.LITTLE_ENDIAN); + + private static int getIntLE(byte[] array, int offset) { + return (int)INT_HANDLE.get(array, offset); + } + + /** + * Generates 32 bit hash from byte array with the given length, offset and seed. + * + * @param data - input byte array + * @param offset - offset of data + * @param length - length of array + * @param seed - seed. (default 0) + * @return 32 bit hash + */ + public static int hash32(final byte[] data, final int offset, final int length, final int seed) { + int hash = seed; + final int nblocks = length >> 2; + + // body + for (int i = 0; i < nblocks; i++) { + final int i_4 = i << 2; + final int k = getIntLE(data, offset + i_4); + hash = mix32(k, hash); + } + + // tail + final int idx = nblocks << 2; + int k1 = 0; + switch (length - idx) { + case 3: + k1 ^= data[offset + idx + 2] << 16; + case 2: + k1 ^= data[offset + idx + 1] << 8; + case 1: + k1 ^= data[offset + idx]; + + // mix functions + k1 *= C1_32; + k1 = Integer.rotateLeft(k1, R1_32); + k1 *= C2_32; + hash ^= k1; + } + + return fmix32(length, hash); + } + + private static int mix32(int k, int hash) { + k *= C1_32; + k = Integer.rotateLeft(k, R1_32); + k *= C2_32; + hash ^= k; + return Integer.rotateLeft(hash, R2_32) * M_32 + N_32; + } + + private static int fmix32(final int length, int hash) { + hash ^= length; + hash ^= (hash >>> 16); + hash *= 0x85ebca6b; + hash ^= (hash >>> 13); + hash *= 0xc2b2ae35; + hash ^= (hash >>> 16); + + return hash; + } +} diff --git a/src/test/java/net/soundvibe/lasher/map/LasherMapTest.java b/src/test/java/net/soundvibe/lasher/map/LasherMapTest.java new file mode 100644 index 0000000..c3aa213 --- /dev/null +++ b/src/test/java/net/soundvibe/lasher/map/LasherMapTest.java @@ -0,0 +1,94 @@ +package net.soundvibe.lasher.map; + +import net.soundvibe.lasher.map.core.Lasher; +import net.soundvibe.lasher.serde.Serdes; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.jupiter.api.Assertions.*; + +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +class LasherMapTest { + + private static final long MB_32 = (long) Math.pow(2, 25L); + + @Test + void should_do_basic_operations(@TempDir Path tmpPath) { + try (var sut = new LasherMap<>( + new Lasher(tmpPath, MB_32, MB_32), + Serdes.STRING, Serdes.STRING)) { + assertNull(sut.get("foo")); + + assertNull(sut.put("foo", "value")); + + assertEquals("value", sut.get("foo")); + + assertEquals("value", sut.remove("foo" )); + + assertNull(sut.put("foo", "valueUpdated" )); + + assertEquals("valueUpdated" ,sut.get("foo" )); + + assertEquals("valueUpdated" , sut.put("foo" , "valueUpdatedAgain" )); + + assertEquals("valueUpdatedAgain" , sut.putIfAbsent("foo" , "valueUpdatedAgain" )); + assertNull(sut.putIfAbsent("foo2" , "value" )); + + assertTrue(sut.containsKey("foo2" )); + assertEquals("value" , sut.remove("foo2" )); + assertFalse(sut.containsKey("foo2" )); + + + assertNull(sut.putIfAbsent("foo2" , "value" )); + assertTrue(sut.remove("foo2" , "value" )); + + assertNull(sut.putIfAbsent("foo2" , "value" )); + assertEquals("value" , sut.replace("foo2" , "value2" )); + + assertTrue(sut.replace("foo2" , "value2" , "newValue" )); + + sut.clear(); + assertEquals(0L, sut.sizeLong()); + assertTrue(sut.isEmpty()); + + assertFalse(sut.containsKey("foo")); + assertFalse(sut.containsKey("foo2")); + } + } + + @Test + void should_iterate(@TempDir Path tmpPath) { + try (var sut = new LasherMap<>( + new Lasher(tmpPath, MB_32, MB_32), + Serdes.LONG, Serdes.LONG)) { + var rng = new Random(); + var m = new ConcurrentHashMap(1000); + long nInserts = 1000; + + for(long k=0; k { + counter.incrementAndGet(); + assertEquals(m.get(k).longValue(), v); + m.remove(k); + }); + + assertEquals(nInserts, counter.get() ); + assertTrue(m.isEmpty()); + assertEquals(nInserts, sut.size()); + for(long i=0; i(tmpDir, Duration.ofHours(24), Serdes.STRING, Serdes.STRING)) { + assertNull(sut.put("foo1", "bar1", Instant.parse("2020-02-03T06:10:00Z").getEpochSecond())); + assertNull(sut.put("foo2", "bar2", Instant.parse("2020-02-03T11:00:00Z").getEpochSecond())); + assertNull(sut.put("foo3", "bar3", Instant.parse("2020-02-03T12:23:00Z").getEpochSecond())); + assertNull(sut.put("foo4", "bar4", Instant.parse("2020-02-03T14:00:00Z").getEpochSecond())); + + assertEquals(4L, sut.size()); + assertEquals(4L, sut.stream().count()); + + var entries = sut.stream().collect(toSet()); + assertEquals(Set.of( + new SimpleEntry<>("foo1", "bar1"), + new SimpleEntry<>("foo2", "bar2"), + new SimpleEntry<>("foo3", "bar3"), + new SimpleEntry<>("foo4", "bar4") + ), entries); + + var keys = sut.streamKeys().collect(toSet()); + assertEquals(Set.of("foo1", "foo2", "foo3", "foo4"), keys); + } + } + + @Test + void should_expire_old_entries(@TempDir Path tmpDir) { + try (var sut = new TimeLash<>(tmpDir, Duration.ofHours(6), Serdes.STRING, Serdes.STRING)) { + assertNull(sut.put("foo1", "bar1", Instant.parse("2020-02-03T06:10:00Z").getEpochSecond())); + assertEquals("bar1", sut.get("foo1", Instant.parse("2020-02-03T06:11:00Z").getEpochSecond())); + + assertNull(sut.put("foo2", "bar2", Instant.parse("2020-02-03T11:00:00Z").getEpochSecond())); + assertEquals("bar2", sut.get("foo2", Instant.parse("2020-02-03T11:11:00Z").getEpochSecond())); + + assertNull(sut.put("foo3", "bar3", Instant.parse("2020-02-03T12:23:00Z").getEpochSecond())); + assertEquals("bar3", sut.get("foo3", Instant.parse("2020-02-03T12:11:00Z").getEpochSecond())); + + assertNull(sut.put("foo4", "bar4", Instant.parse("2020-02-03T14:00:00Z").getEpochSecond())); + assertNull(sut.get("foo1", Instant.parse("2020-02-03T06:10:00Z").getEpochSecond())); + assertEquals("bar4", sut.get("foo4", Instant.parse("2020-02-03T14:00:00Z").getEpochSecond())); + assertEquals("bar2", sut.get("foo2", Instant.parse("2020-02-03T11:11:00Z").getEpochSecond())); + assertEquals("bar3", sut.get("foo3", Instant.parse("2020-02-03T12:11:00Z").getEpochSecond())); + assertEquals(3L, sut.size()); + } + } + + @Test + void should_not_allow_adding_expired_keys(@TempDir Path tmpDir) { + try (var sut = new TimeLash<>(tmpDir, Duration.ofHours(1), Serdes.STRING, Serdes.STRING)) { + assertNull(sut.put("foo1", "bar1", Instant.parse("2020-02-03T06:10:10Z"))); + assertNull(sut.put("foo2", "bar2", Instant.parse("2020-02-03T05:10:00Z"))); + + assertNull(sut.get("foo2", Instant.parse("2020-02-03T05:10:00Z"))); + assertEquals("bar1", sut.get("foo1", Instant.parse("2020-02-03T06:10:10Z"))); + } + } + + @Test + void should_remove_entries(@TempDir Path tmpDir) { + try (var sut = new TimeLash<>(tmpDir, Duration.ofHours(1), Serdes.LONG, Serdes.STRING)) { + assertNull(sut.put(1L, "test1", Instant.now())); + assertEquals("test1", sut.get(1L, Instant.now())); + assertTrue(sut.containsKey(1L, Instant.now())); + assertEquals("test1", sut.remove(1L, Instant.now())); + } + } +} \ No newline at end of file diff --git a/src/test/java/net/soundvibe/lasher/map/core/LasherTest.java b/src/test/java/net/soundvibe/lasher/map/core/LasherTest.java new file mode 100644 index 0000000..b09ce71 --- /dev/null +++ b/src/test/java/net/soundvibe/lasher/map/core/LasherTest.java @@ -0,0 +1,245 @@ +package net.soundvibe.lasher.map.core; + +import net.soundvibe.lasher.util.*; +import org.junit.jupiter.api.*; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.jupiter.api.Assertions.*; + +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +class LasherTest { + + private static final long MB_32 = (long) Math.pow(2, 25L); + private static final long MB_64 = (long) Math.pow(2, 26L); + + @Test + void should_do_basic_operations(@TempDir Path tmpPath) { + try (var sut = new Lasher(tmpPath, MB_32, MB_32)) { + assertNull(sut.get("foo".getBytes())); + + assertNull(sut.put("foo".getBytes(), "value".getBytes())); + + assertArrayEquals("value".getBytes(),sut.get("foo".getBytes())); + + assertArrayEquals("value".getBytes(),sut.remove("foo".getBytes())); + + assertNull(sut.put("foo".getBytes(), "valueUpdated".getBytes())); + + assertArrayEquals("valueUpdated".getBytes(),sut.get("foo".getBytes())); + + assertArrayEquals("valueUpdated".getBytes(), sut.put("foo".getBytes(), "valueUpdatedAgain".getBytes())); + + assertEquals(MB_64, sut.nextPowerOf2(60 * 1024 * 1000)); + + assertArrayEquals("valueUpdatedAgain".getBytes(), sut.putIfAbsent("foo".getBytes(), "valueUpdatedAgain".getBytes())); + assertNull(sut.putIfAbsent("foo2".getBytes(), "value".getBytes())); + + assertTrue(sut.containsKey("foo2".getBytes())); + assertArrayEquals("value".getBytes(), sut.remove("foo2".getBytes())); + assertFalse(sut.containsKey("foo2".getBytes())); + + + assertNull(sut.putIfAbsent("foo2".getBytes(), "value".getBytes())); + assertTrue(sut.remove("foo2".getBytes(), "value".getBytes())); + + assertNull(sut.putIfAbsent("foo2".getBytes(), "value".getBytes())); + assertArrayEquals("value".getBytes(), sut.replace("foo2".getBytes(), "value2".getBytes())); + + assertTrue(sut.replace("foo2".getBytes(), "value2".getBytes(), "newValue".getBytes())); + + sut.clear(); + assertEquals(0L, sut.size()); + + assertFalse(sut.containsKey("foo".getBytes())); + assertFalse(sut.containsKey("foo2".getBytes())); + + assertThrows(NullPointerException.class, () -> sut.put(null, "value".getBytes())); + assertThrows(NullPointerException.class, () -> sut.put("key".getBytes(), null)); + } + } + + @Test + void should_rehash(@TempDir Path tmpPath) { + long fileSize = (long) Math.pow(2, 10L); + long lastIdx = 0L; + try (var sut = new Lasher(tmpPath, fileSize, fileSize)) { + assertEquals(0L, sut.rehashIndex.get()); + + for (long i = 0; i < 800; i++) { + lastIdx = i; + sut.put(BytesSupport.longToBytes(i), BytesSupport.longToBytes(i + 1)); + } + + assertNotEquals(0L, sut.rehashIndex.get()); + + for (long i = 0; i < 800; i++) { + assertArrayEquals(BytesSupport.longToBytes(i + 1),sut.get(BytesSupport.longToBytes(i))); + } + } catch (Exception e) { + System.out.println("Last index: " + lastIdx); + throw e; + } + } + + @Test + void should_rehash_overlapping(@TempDir Path tmpPath) { + long fileSize = (long) Math.pow(2, 8L); + try (var sut = new Lasher(tmpPath, fileSize, fileSize)) { + assertEquals(0L, sut.rehashIndex.get()); + long count = 100_000; + var bytes = new byte[19011]; + Arrays.fill(bytes, (byte)1); + + for (long i = 0; i < count; i++) { + sut.put(BytesSupport.longToBytes(i), bytes); + } + + assertNotEquals(0L, sut.rehashIndex.get()); + + for (long i = 0; i < count; i++) { + assertArrayEquals(bytes, sut.get(BytesSupport.longToBytes(i))); + } + System.out.println("Closing"); + } + } + + @Test + void should_read_from_store(@TempDir Path tmpPath) { + long fileSize = (long) Math.pow(2, 8L); + long count = 100_000; + var bytes = new byte[19011]; + Arrays.fill(bytes, (byte)1); + try (var sut = new Lasher(tmpPath, fileSize, fileSize)) { + assertEquals(0L, sut.rehashIndex.get()); + + for (long i = 0; i < count; i++) { + sut.put(BytesSupport.longToBytes(i), bytes); + } + + assertNotEquals(0L, sut.rehashIndex.get()); + + for (long i = 0; i < count; i++) { + assertArrayEquals(bytes, sut.get(BytesSupport.longToBytes(i))); + } + + assertEquals(count, sut.size()); + } + + try (var sut = new Lasher(tmpPath, fileSize, fileSize)) { + assertEquals(count, sut.size()); + for (long i = 0; i < count; i++) { + assertArrayEquals(bytes, sut.get(BytesSupport.longToBytes(i))); + } + } + } + + @Test + void should_iterate(@TempDir Path tmpPath) { + try (var sut = new Lasher(tmpPath, MB_32, MB_32)) { + var m = new ConcurrentHashMap(1000); + var rng = new Random(); + long nInserts = 1000; + + for(long k=0; k { + for(long i=0; i { + for(long i=recsPerThread; i { + for(long i=recsPerThread*2; i { + for(long i=recsPerThread*3; i { + var v = sut.get(entry.getKey()); + assertArrayEquals(entry.getValue(), v); + iterCount.incrementAndGet(); + }); + assertEquals(recs, iterCount.get()); + var elapsedGetMs = System.currentTimeMillis() - startGetMs; + System.out.printf("Iterated and retrieved %d rows in %d ms, %s rec/s%n", + iterCount.get(), elapsedGetMs, iterCount.get() / (elapsedGetMs / 1000d)); + } catch (Exception e) { + System.err.println("Got error on " + counter + " \n" + e); + throw e; + } finally { + var endMs = System.currentTimeMillis(); + var elapsedMs = endMs - startMs; + System.out.printf("Total %d rows in %d ms, %s rec/s%n", + counter, elapsedMs, counter / (elapsedMs / 1000d)); + var folderSize = getFolderSize(tmpPath.toFile()); + var used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.gc(); + System.out.println("Folder size after in MB: " + folderSize / 1024 / 1024d); + printFiles(tmpPath); + var usedAfter = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.out.printf("Before: %s MB, after: %s MB, afterGC: %s MB, used: %s MB%n", + toMB(usedBefore), toMB(used), toMB(usedAfter), toMB(usedAfter - usedBefore)); + } + + + } + + @RepeatedTest(1) + void performance_seq(@TempDir Path tmpPath) { + long recs = 20_000_000; + var counterFound = new AtomicLong(0L); + var counterInserted = new AtomicLong(0L); + + var rnd = new Random(); + var entries = LongStream.range(0, recs) + .mapToObj(k -> { + var bytes = new byte[Math.max(16, rnd.nextInt(256))]; + rnd.nextBytes(bytes); + return new AbstractMap.SimpleEntry<>(BytesSupport.toBytes(k), bytes); + }) + .collect(Collectors.toList()); + + System.gc(); + var usedBefore = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + + try (var sut = new Lasher(tmpPath)) { + var elapsedPutMs = measure(() -> entries.forEach(e -> { + sut.put(e.getKey(), e.getValue()); + counterInserted.incrementAndGet(); + })); + System.out.printf("Inserted %d rows in %d ms, %s ops/s%n", + counterInserted.get(), elapsedPutMs, counterInserted.get() / (elapsedPutMs / 1000d)); + + var elapsedGetAllMs = measure(() -> entries.forEach(e -> { + var expected = sut.get(e.getKey()); + if (expected != null) { + counterFound.incrementAndGet(); + } + })); + System.out.printf("Iterated %d and found %d rows in %d ms, %s rec/s%n", + recs, counterFound.get(), elapsedGetAllMs, recs / (elapsedGetAllMs / 1000d)); + + counterFound.set(0L); + var elapsedGetSomeRandomMs = measure(() -> entries.forEach(e -> { + var expected = sut.get(BytesSupport.toBytes(rnd.nextLong())); + if (expected != null) { + counterFound.incrementAndGet(); + } + })); + System.out.printf("Iterated %d and found %d rows in %d ms, %s rec/s%n", + recs, counterFound.get(), elapsedGetSomeRandomMs, recs / (elapsedGetSomeRandomMs / 1000d)); + var used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.gc(); + var usedAfter = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.out.printf("Before: %s MB, after: %s MB, afterGC: %s MB, used: %s MB%n", + toMB(usedBefore), toMB(used), toMB(usedAfter), toMB(usedAfter - usedBefore)); + printGCStats(); + } + } + + @RepeatedTest(3) + void performance_random(@TempDir Path tmpPath) { + long recs = 10_000_000; + var counterFound = new AtomicLong(0L); + var counterInserted = new AtomicLong(0L); + var rnd = new Random(); + var entries = rnd.longs(recs) + .mapToObj(k -> { + var bytes = new byte[Math.max(16, rnd.nextInt(256))]; + rnd.nextBytes(bytes); + return new AbstractMap.SimpleEntry<>(BytesSupport.toBytes(k), bytes); + }) + .collect(Collectors.toList()); + System.gc(); + var usedBefore = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + + try (var sut = new Lasher(tmpPath, 64L * 1024 * 1024)) { + var elapsedPutRandomMs = measure(() -> entries.forEach(e -> { + sut.put(e.getKey(), e.getValue()); + counterInserted.incrementAndGet(); + })); + System.out.printf("Inserted %d rows in %d ms, %s ops/s%n", + counterInserted.get(), elapsedPutRandomMs, counterInserted.get() / (elapsedPutRandomMs / 1000d)); + + var elapsedGetAllRandomMs = measure(() -> entries.forEach(e -> { + var expected = sut.get(e.getKey()); + if (expected != null) { + counterFound.incrementAndGet(); + } + })); + System.out.printf("Iterated %d and found %d rows in %d ms, %s rec/s%n", + recs, counterFound.get(), elapsedGetAllRandomMs, recs / (elapsedGetAllRandomMs / 1000d)); + + counterFound.set(0L); + var elapsedGetSomeRandomMs = measure(() -> entries.forEach(e -> { + var expected = sut.get(BytesSupport.toBytes(rnd.nextLong())); + if (expected != null) { + counterFound.incrementAndGet(); + } + })); + System.out.printf("Iterated %d and found %d rows in %d ms, %s rec/s%n", + recs, counterFound.get(), elapsedGetSomeRandomMs, recs / (elapsedGetSomeRandomMs / 1000d)); + var used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.gc(); + var usedAfter = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.out.printf("Before: %s MB, after: %s MB, afterGC: %s MB, used: %s MB%n", + toMB(usedBefore), toMB(used), toMB(usedAfter), toMB(usedAfter - usedBefore)); + } + } + + private long measure(Runnable runnable) { + var startGetMs = System.currentTimeMillis(); + runnable.run(); + return System.currentTimeMillis() - startGetMs; + } + + private static long toMB(long sizeinBytes) { + return sizeinBytes / 1024 / 1024; + } + + @RepeatedTest(3) + void performance_test_java_hashmap_random() { + long recs = 10_000_000; + var counterFound = new AtomicLong(0L); + var counterInserted = new AtomicLong(0L); + var rnd = new Random(); + var entries = rnd.longs(recs) + .mapToObj(k -> { + var bytes = new byte[Math.max(16, rnd.nextInt(256))]; + rnd.nextBytes(bytes); + return new AbstractMap.SimpleEntry<>(BytesSupport.toBytes(k), bytes); + }) + .collect(Collectors.toList()); + System.gc(); + var usedBefore = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + + var sut = new HashMap(1_000_000); + var elapsedPutRandomMs = measure(() -> entries.forEach(e -> { + sut.put(e.getKey(), e.getValue()); + counterInserted.incrementAndGet(); + })); + System.out.printf("Inserted %d rows in %d ms, %s ops/s%n", + counterInserted.get(), elapsedPutRandomMs, counterInserted.get() / (elapsedPutRandomMs / 1000d)); + + var elapsedGetAllRandomMs = measure(() -> entries.forEach(e -> { + var expected = sut.get(e.getKey()); + if (expected != null) { + counterFound.incrementAndGet(); + } + })); + System.out.printf("Iterated %d and found %d rows in %d ms, %s rec/s%n", + recs, counterFound.get(), elapsedGetAllRandomMs, recs / (elapsedGetAllRandomMs / 1000d)); + + counterFound.set(0L); + var elapsedGetSomeRandomMs = measure(() -> entries.forEach(e -> { + var expected = sut.get(BytesSupport.toBytes(rnd.nextLong())); + if (expected != null) { + counterFound.incrementAndGet(); + } + })); + System.out.printf("Iterated %d and found %d rows in %d ms, %s rec/s%n", + recs, counterFound.get(), elapsedGetSomeRandomMs, recs / (elapsedGetSomeRandomMs / 1000d)); + var used = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.gc(); + var usedAfter = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory(); + System.out.printf("Before: %s MB, after: %s MB, afterGC: %s MB, used: %s MB%n", + toMB(usedBefore), toMB(used), toMB(usedAfter), toMB(usedAfter - usedBefore)); + + } + + private long getFolderSize(File folder) { + long length = 0; + File[] files = folder.listFiles(); + if (files == null) return length; + + for (var file : files) { + if (file.isFile()) { + length += file.length(); + } else { + length += getFolderSize(file); + } + } + return length; + } + + private void printFiles(Path path) { + try (var fileStream = Files.list(path)) { + fileStream.forEach(p -> { + try { + System.out.printf("%s : %d KB%n", p, Files.size(p) / 1024); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + public void printGCStats() { + long totalGarbageCollections = 0; + long garbageCollectionTime = 0; + + for(var gc: ManagementFactory.getGarbageCollectorMXBeans()) { + long count = gc.getCollectionCount(); + + if(count >= 0) { + totalGarbageCollections += count; + } + + long time = gc.getCollectionTime(); + + if(time >= 0) { + garbageCollectionTime += time; + } + } + + System.out.println("Total Garbage Collections: " + totalGarbageCollections); + System.out.println("Total Garbage Collection Time (ms): " + garbageCollectionTime); + } + +} \ No newline at end of file diff --git a/src/test/java/net/soundvibe/lasher/serde/BytesSerdeTest.java b/src/test/java/net/soundvibe/lasher/serde/BytesSerdeTest.java new file mode 100644 index 0000000..7db4458 --- /dev/null +++ b/src/test/java/net/soundvibe/lasher/serde/BytesSerdeTest.java @@ -0,0 +1,32 @@ +package net.soundvibe.lasher.serde; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.*; + +import java.util.Random; +import java.util.stream.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +class BytesSerdeTest { + + private static final BytesSerde sut = new BytesSerde(); + + @ParameterizedTest + @NullSource + @MethodSource("bytesProvider") + void should_read_write_bytes(byte[] value) { + var bytes = sut.toBytes(value); + var actual = sut.fromBytes(bytes); + assertEquals(value, actual); + } + + static Stream bytesProvider() { + var random = new Random(); + return IntStream.range(0, 100) + .mapToObj(byte[]::new) + .peek(random::nextBytes); + } +} \ No newline at end of file diff --git a/src/test/java/net/soundvibe/lasher/serde/IntegerSerdeTest.java b/src/test/java/net/soundvibe/lasher/serde/IntegerSerdeTest.java new file mode 100644 index 0000000..9341ee5 --- /dev/null +++ b/src/test/java/net/soundvibe/lasher/serde/IntegerSerdeTest.java @@ -0,0 +1,31 @@ +package net.soundvibe.lasher.serde; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.*; + +import java.util.Random; +import java.util.stream.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +class IntegerSerdeTest { + + private static final IntegerSerde sut = new IntegerSerde(); + + @ParameterizedTest + @NullSource + @MethodSource("integerProvider") + void should_read_write_ints(Integer value) { + var bytes = sut.toBytes(value); + var actual = sut.fromBytes(bytes); + assertEquals(value, actual); + } + + static Stream integerProvider() { + var random = new Random(); + return IntStream.range(0, 100) + .mapToObj(i -> random.nextInt()); + } +} \ No newline at end of file diff --git a/src/test/java/net/soundvibe/lasher/serde/LongSerdeTest.java b/src/test/java/net/soundvibe/lasher/serde/LongSerdeTest.java new file mode 100644 index 0000000..6f77e44 --- /dev/null +++ b/src/test/java/net/soundvibe/lasher/serde/LongSerdeTest.java @@ -0,0 +1,31 @@ +package net.soundvibe.lasher.serde; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.*; + +import java.util.*; +import java.util.stream.*; + +import static org.junit.jupiter.api.Assertions.*; + +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +class LongSerdeTest { + + private static final LongSerde sut = new LongSerde(); + + @ParameterizedTest + @NullSource + @MethodSource("longProvider") + void should_read_write_longs(Long value) { + var bytes = sut.toBytes(value); + var actual = sut.fromBytes(bytes); + assertEquals(value, actual); + } + + static Stream longProvider() { + var random = new Random(); + return IntStream.range(0, 100) + .mapToObj(i -> random.nextLong()); + } +} \ No newline at end of file diff --git a/src/test/java/net/soundvibe/lasher/serde/StringSerdeTest.java b/src/test/java/net/soundvibe/lasher/serde/StringSerdeTest.java new file mode 100644 index 0000000..bd88dbd --- /dev/null +++ b/src/test/java/net/soundvibe/lasher/serde/StringSerdeTest.java @@ -0,0 +1,35 @@ +package net.soundvibe.lasher.serde; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.*; + +import java.util.Random; +import java.util.stream.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +class StringSerdeTest { + + private static final StringSerde sut = new StringSerde(); + + @ParameterizedTest + @NullSource + @MethodSource("stringProvider") + void should_read_write_strings(String value) { + var bytes = sut.toBytes(value); + var actual = sut.fromBytes(bytes); + assertEquals(value, actual); + } + + static Stream stringProvider() { + var random = new Random(); + return IntStream.range(0, 100) + .mapToObj(byte[]::new) + .map(bytes -> { + random.nextBytes(bytes); + return new String(bytes); + }); + } +} \ No newline at end of file diff --git a/src/test/java/net/soundvibe/lasher/serde/UUIDSerdeTest.java b/src/test/java/net/soundvibe/lasher/serde/UUIDSerdeTest.java new file mode 100644 index 0000000..f19841f --- /dev/null +++ b/src/test/java/net/soundvibe/lasher/serde/UUIDSerdeTest.java @@ -0,0 +1,30 @@ +package net.soundvibe.lasher.serde; + +import org.junit.jupiter.api.*; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.*; + +import java.util.UUID; +import java.util.stream.*; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +@DisplayNameGeneration(DisplayNameGenerator.ReplaceUnderscores.class) +class UUIDSerdeTest { + + private static final UUIDSerde sut = new UUIDSerde(); + + @ParameterizedTest + @NullSource + @MethodSource("uuidProvider") + void should_read_write_uuids(UUID uuid) { + var bytes = sut.toBytes(uuid); + var actual = sut.fromBytes(bytes); + assertEquals(uuid, actual); + } + + static Stream uuidProvider() { + return IntStream.range(0, 100) + .mapToObj(i -> UUID.randomUUID()); + } +} \ No newline at end of file