Skip to content

Commit

Permalink
OAK-11154: Read partial segments from SegmentWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicolapps committed Sep 27, 2024
1 parent cae54ed commit 67c2790
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
import org.apache.jackrabbit.oak.plugins.memory.ModifiedNodeState;
import org.apache.jackrabbit.oak.segment.RecordWriters.RecordWriter;
import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
import org.apache.jackrabbit.oak.segment.data.PartialSegmentState;
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.apache.jackrabbit.oak.spi.blob.BlobStore;
import org.apache.jackrabbit.oak.spi.state.DefaultNodeStateDiff;
Expand Down Expand Up @@ -156,6 +157,11 @@ public void flush() throws IOException {
writeOperationHandler.flush(store);
}

@Override
public @Nullable PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) {
return writeOperationHandler.readPartialSegmentState(sid);
}

@NotNull
RecordId writeMap(@Nullable final MapRecord base, @NotNull final Map<String, RecordId> changes) throws IOException {
return new SegmentWriteOperation(writeOperationHandler.getGCGeneration())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,18 @@
import java.io.IOException;
import java.io.PrintStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import org.apache.commons.io.HexDump;
import org.apache.jackrabbit.oak.segment.RecordNumbers.Entry;
import org.apache.jackrabbit.oak.segment.data.PartialSegmentState;
import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -65,7 +71,9 @@
* The behaviour of this class is undefined should the pre-allocated buffer be
* overrun be calling any of the write methods.
* <p>
* Instances of this class are <em>not thread safe</em>
* It is safe to call {@link #readPartialSegmentState(SegmentId)} concurrently with the write operations of a single
* writer (including several concurrent calls of this method). However, it is not safe to have concurrent writers,
* notably because the order in which {@code prepare} and {@code writeXYZ} methods are called matters.
*/
public class SegmentBufferWriter implements WriteOperationHandler {

Expand Down Expand Up @@ -150,7 +158,7 @@ public SegmentBufferWriter(@NotNull SegmentIdProvider idProvider,

@NotNull
@Override
public RecordId execute(@NotNull GCGeneration gcGeneration,
public synchronized RecordId execute(@NotNull GCGeneration gcGeneration,
@NotNull WriteOperation writeOperation)
throws IOException {
checkState(gcGeneration.equals(this.gcGeneration));
Expand All @@ -159,7 +167,7 @@ public RecordId execute(@NotNull GCGeneration gcGeneration,

@Override
@NotNull
public GCGeneration getGCGeneration() {
public synchronized GCGeneration getGCGeneration() {
return gcGeneration;
}

Expand Down Expand Up @@ -220,22 +228,22 @@ private void newSegment(SegmentStore store) throws IOException {
dirty = false;
}

public void writeByte(byte value) {
public synchronized void writeByte(byte value) {
position = BinaryUtils.writeByte(buffer, position, value);
dirty = true;
}

public void writeShort(short value) {
public synchronized void writeShort(short value) {
position = BinaryUtils.writeShort(buffer, position, value);
dirty = true;
}

public void writeInt(int value) {
public synchronized void writeInt(int value) {
position = BinaryUtils.writeInt(buffer, position, value);
dirty = true;
}

public void writeLong(long value) {
public synchronized void writeLong(long value) {
position = BinaryUtils.writeLong(buffer, position, value);
dirty = true;
}
Expand All @@ -245,7 +253,7 @@ public void writeLong(long value) {
*
* @param recordId the record ID.
*/
public void writeRecordId(RecordId recordId) {
public synchronized void writeRecordId(RecordId recordId) {
requireNonNull(recordId);
checkState(segmentReferences.size() + 1 < 0xffff,
"Segment cannot have more than 0xffff references");
Expand Down Expand Up @@ -278,7 +286,7 @@ private static String info(Segment segment) {
return info;
}

public void writeBytes(byte[] data, int offset, int length) {
public synchronized void writeBytes(byte[] data, int offset, int length) {
arraycopy(data, offset, buffer, position, length);
position += length;
dirty = true;
Expand Down Expand Up @@ -308,7 +316,7 @@ private String dumpSegmentBuffer() {
* enough space for a record. It can also be called explicitly.
*/
@Override
public void flush(@NotNull SegmentStore store) throws IOException {
public synchronized void flush(@NotNull SegmentStore store) throws IOException {
if (dirty) {
int referencedSegmentIdCount = segmentReferences.size();
BinaryUtils.writeInt(buffer, Segment.REFERENCED_SEGMENT_ID_COUNT_OFFSET, referencedSegmentIdCount);
Expand Down Expand Up @@ -381,7 +389,7 @@ public void flush(@NotNull SegmentStore store) throws IOException {
* @param store the {@code SegmentStore} instance to write full segments to
* @return a new record id
*/
public RecordId prepare(RecordType type, int size, Collection<RecordId> ids, SegmentStore store) throws IOException {
public synchronized RecordId prepare(RecordType type, int size, Collection<RecordId> ids, SegmentStore store) throws IOException {
checkArgument(size >= 0);
requireNonNull(ids);

Expand Down Expand Up @@ -459,4 +467,65 @@ public RecordId prepare(RecordType type, int size, Collection<RecordId> ids, Seg
return new RecordId(segment.getSegmentId(), recordNumber);
}

@Override
public synchronized @Nullable PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) {
if (segment == null || !segment.getSegmentId().equals(sid)) {
return null;
}

byte version = SegmentVersion.asByte(LATEST_VERSION);
int generation = gcGeneration.getGeneration();
int fullGeneration = gcGeneration.getFullGeneration();
boolean isCompacted = gcGeneration.isCompacted();

List<PartialSegmentState.SegmentReference> segmentReferencesList = StreamSupport.stream(segmentReferences.spliterator(), false)
.map(segmentId -> new PartialSegmentState.SegmentReference(segmentId.getMostSignificantBits(), segmentId.getLeastSignificantBits()))
.collect(Collectors.toUnmodifiableList());

List<PartialSegmentState.Record> records = getCurrentRecords();

return new PartialSegmentState(
version,
generation,
fullGeneration,
isCompacted,
segmentReferencesList,
records
);
}

/**
* Get the current records in the buffer, in descending order of their offset.
*
* <p>
* The contents of the record currently being written to can be incomplete. In this case,
* {@link PartialSegmentState.Record#contents()} will only contain the data that has been written so far.
*/
private @NotNull List<PartialSegmentState.Record> getCurrentRecords() {
List<PartialSegmentState.Record> result = new ArrayList<>();

Entry previousEntry = null;
for (Entry entry : recordNumbers) {
int currentRecordStart = entry.getOffset();

// Record in recordNumbers are sorted in descending order of offset
assert previousEntry == null || previousEntry.getOffset() >= currentRecordStart;

int nextRecordStart = previousEntry == null ? MAX_SEGMENT_SIZE : previousEntry.getOffset();
boolean isPartiallyWrittenRecord = position >= currentRecordStart;
int currentRecordEnd = isPartiallyWrittenRecord ? position : nextRecordStart;
result.add(
new PartialSegmentState.Record(
entry.getRecordNumber(),
entry.getType(),
currentRecordStart,
Arrays.copyOfRange(buffer, currentRecordStart, currentRecordEnd)
)
);

previousEntry = entry;
}

return List.copyOf(result);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import org.apache.jackrabbit.oak.api.Blob;
import org.apache.jackrabbit.oak.commons.Buffer;
import org.apache.jackrabbit.oak.segment.data.PartialSegmentState;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand All @@ -33,6 +34,19 @@ public interface SegmentWriter {

void flush() throws IOException;

/**
* Get the {@link PartialSegmentState partial state} of a segment
* that has started being written to but hasn’t been flushed yet.
*
* @param sid The ID of the segment
* @return The partial state or {@code null} if no partial state was found for the given segment ID.
* @throws UnsupportedOperationException if reading partial segment states is not supported.
*/
@Nullable
default PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) {
throw new UnsupportedOperationException("Trying to read partial segment state from a SegmentWriter that doesn’t support it.");
}

/**
* Write a blob (as list of block records)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@

import java.io.IOException;

import org.apache.jackrabbit.oak.segment.data.PartialSegmentState;
import org.jetbrains.annotations.NotNull;

import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
import org.jetbrains.annotations.Nullable;

/**
* A {@code WriteOperationHandler} executes {@link WriteOperation
Expand Down Expand Up @@ -73,4 +75,10 @@ RecordId execute(@NotNull GCGeneration gcGeneration, @NotNull WriteOperation wri
* @throws IOException
*/
void flush(@NotNull SegmentStore store) throws IOException;

/** @see SegmentWriter#readPartialSegmentState(SegmentId) */
@Nullable
default PartialSegmentState readPartialSegmentState(@NotNull SegmentId sid) {
throw new UnsupportedOperationException("Trying to read partial segment state from a WriteOperationHandler that doesn’t support it.");
}
}
Loading

0 comments on commit 67c2790

Please sign in to comment.