Skip to content

Commit

Permalink
Add support for visiting buffers backed by byte[] arrays
Browse files Browse the repository at this point in the history
- getBytes calls setBytes with a byte[] argument for
  heap ByteBufs
  • Loading branch information
lhotari committed Feb 1, 2024
1 parent 19e33ca commit 86d002d
Show file tree
Hide file tree
Showing 14 changed files with 120 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static void visitBuffers(ByteBuf buffer, int offset, int length, ByteBufV
*/
public interface ByteBufVisitorCallback {
void visitBuffer(ByteBuf visitBuffer, int visitIndex, int visitLength);
void visitArray(byte[] visitArray, int visitIndex, int visitLength);
}

/**
Expand All @@ -93,14 +94,23 @@ private static void doRecursivelyVisitBuffers(ByteBuf buffer, int offset, int le
// visit the wrapped buffers recursively if the buffer is not backed by an array or memory address
// and the max depth has not been reached
if (depth < maxDepth && !buffer.hasMemoryAddress() && !buffer.hasArray()) {
visitBuffersImpl(buffer, offset, length, (visitBuffer, visitIndex, visitLength) -> {
if (visitBuffer == buffer && visitIndex == offset && visitLength == length) {
// visit the buffer since it was already passed to visitBuffersImpl and further recursion
// would cause unnecessary recursion up to the max depth of recursion
callback.visitBuffer(visitBuffer, visitIndex, visitLength);
} else {
// use the doRecursivelyVisitBuffers method to visit the wrapped buffer, possibly recursively
doRecursivelyVisitBuffers(visitBuffer, visitIndex, visitLength, callback, maxDepth, depth + 1);
visitBuffersImpl(buffer, offset, length, new ByteBufVisitorCallback() {
@Override
public void visitBuffer(ByteBuf visitBuffer, int visitIndex, int visitLength) {
if (visitBuffer == buffer && visitIndex == offset && visitLength == length) {
// visit the buffer since it was already passed to visitBuffersImpl and further recursion
// would cause unnecessary recursion up to the max depth of recursion
callback.visitBuffer(visitBuffer, visitIndex, visitLength);
} else {
// use the doRecursivelyVisitBuffers method to visit the wrapped buffer, possibly recursively
doRecursivelyVisitBuffers(visitBuffer, visitIndex, visitLength, callback, maxDepth, depth + 1);
}
}

@Override
public void visitArray(byte[] visitArray, int visitIndex, int visitLength) {
// visit the array
callback.visitArray(visitArray, visitIndex, visitLength);
}
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,9 @@ void populateValueAndReset(int digest, ByteBuf buf) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, data, offset, len);
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
return Crc32cIntChecksum.resumeChecksum(digest, buffer, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ interface CRC32Digest {
long getValueAndReset();

void update(ByteBuf buf, int offset, int len);
void update(byte[] buffer, int offset, int len);
}

private static final FastThreadLocal<CRC32Digest> crc = new FastThreadLocal<CRC32Digest>() {
Expand Down Expand Up @@ -67,6 +68,12 @@ int internalUpdate(int digest, ByteBuf data, int offset, int len) {
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
crc.get().update(buffer, offset, len);
return 0;
}

@Override
boolean isInt32Digest() {
// This is stored as 8 bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,30 @@ public abstract class DigestManager {

abstract int internalUpdate(int digest, ByteBuf buffer, int offset, int len);

abstract int internalUpdate(int digest, byte[] buffer, int offset, int len);

final int update(int digest, ByteBuf buffer, int offset, int len) {
MutableInt digestRef = new MutableInt(digest);
ByteBufVisitor.visitBuffers(buffer, offset, len,
(ByteBuf childBuffer, int childIndex, int childLength) -> {
if (childLength > 0) {
// recursively visit the sub buffer and update the digest
int updatedDigest = internalUpdate(digestRef.intValue(), childBuffer, childIndex, childLength);
digestRef.setValue(updatedDigest);
new ByteBufVisitor.ByteBufVisitorCallback() {
@Override
public void visitBuffer(ByteBuf visitBuffer, int visitIndex, int visitLength) {
if (visitLength > 0) {
// recursively visit the sub buffer and update the digest
int updatedDigest =
internalUpdate(digestRef.intValue(), visitBuffer, visitIndex, visitLength);
digestRef.setValue(updatedDigest);
}
}

@Override
public void visitArray(byte[] visitArray, int visitIndex, int visitLength) {
if (visitLength > 0) {
// update the digest with the array
int updatedDigest =
internalUpdate(digestRef.intValue(), visitArray, visitIndex, visitLength);
digestRef.setValue(updatedDigest);
}
}
});
return digestRef.intValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,32 @@ public void update(ByteBuf buf, int index, int length) {
crcValue = (int) updateByteBuffer.invoke(null, crcValue, buf.memoryAddress(), index, length);
} else if (buf.hasArray()) {
// Use the internal method to update from array based
crcValue = (int) updateBytes.invoke(null, crcValue, buf.array(), buf.arrayOffset() + index, length);
crcValue = updateArray(crcValue, buf.array(), buf.arrayOffset() + index, length);
} else {
// Fallback to data copy if buffer is not contiguous
byte[] b = new byte[length];
buf.getBytes(index, b, 0, length);
crcValue = (int) updateBytes.invoke(null, crcValue, b, 0, b.length);
crcValue = updateArray(crcValue, b, 0, length);
}
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private static int updateArray(int crcValue, byte[] buf, int offset, int length)
throws IllegalAccessException, InvocationTargetException {
return (int) updateBytes.invoke(null, crcValue, buf, offset, length);
}

@Override
public void update(byte[] buffer, int offset, int len) {
try {
crcValue = (int) updateBytes.invoke(null, crcValue, buffer, offset, len);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}

private static final Method updateByteBuffer;
private static final Method updateBytes;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ int internalUpdate(int digest, ByteBuf buffer, int offset, int len) {
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
return 0;
}

@Override
void populateValueAndReset(int digest, ByteBuf buffer) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ int internalUpdate(int digest, ByteBuf data, int offset, int len) {
return 0;
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
mac.get().update(buffer, offset, len);
return 0;
}

@Override
boolean isInt32Digest() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,9 @@ public long getValueAndReset() {
public void update(ByteBuf buf, int offset, int len) {
crc.update(buf.slice(offset, len).nioBuffer());
}

@Override
public void update(byte[] buffer, int offset, int len) {
crc.update(buffer, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ void populateValueAndReset(int digest, ByteBuf buf) {
int internalUpdate(int digest, ByteBuf data, int offset, int len) {
return intHash.resume(digest, data, offset, len);
}

@Override
int internalUpdate(int digest, byte[] buffer, int offset, int len) {
return intHash.resume(digest, buffer, offset, len);
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,27 @@ public static int resumeChecksum(int previousChecksum, ByteBuf payload) {
/**
* Computes incremental checksum with input previousChecksum and input payload
*
* @param previousChecksum : previously computed checksum
* @param payload
* @return
* @param previousChecksum the previously computed checksum
* @param payload the data for which the checksum is to be computed
* @param offset the starting position in the payload
* @param len the number of bytes to include in the checksum computation
* @return the updated checksum
*/
public static int resumeChecksum(int previousChecksum, ByteBuf payload, int offset, int len) {
return CRC32C_HASH.resume(previousChecksum, payload, offset, len);
}

/**
* Computes incremental checksum with input previousChecksum and input payload
*
* @param previousChecksum the previously computed checksum
* @param payload the data for which the checksum is to be computed
* @param offset the starting position in the payload
* @param len the number of bytes to include in the checksum computation
* @return the updated checksum
*/
public static int resumeChecksum(int previousChecksum, byte[] payload, int offset, int len) {
return CRC32C_HASH.resume(previousChecksum, payload, offset, len);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,6 @@ public interface IntHash {
int resume(int current, ByteBuf buffer);

int resume(int current, ByteBuf buffer, int offset, int len);

int resume(int current, byte[] buffer, int offset, int len);
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public int resume(int current, ByteBuf buffer, int offset, int len) {
return hash.resume(current, buffer.slice(offset, len).nioBuffer());
}
}

@Override
public int resume(int current, byte[] buffer, int offset, int len) {
return hash.resume(current, buffer, offset, len);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ private int resume(int current, long address, int offset, int length) {
}
}

private int resume(int current, byte[] array, int offset, int length) {
@Override
public int resume(int current, byte[] array, int offset, int length) {
try {
return (int) UPDATE_BYTES.invoke(null, current, array, offset, offset + length);
} catch (IllegalAccessException | InvocationTargetException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ public int resume(int current, ByteBuf buffer, int offset, int len) {
return hash.resume(current, buffer.slice(offset, len).nioBuffer());
}
}

@Override
public int resume(int current, byte[] buffer, int offset, int len) {
return hash.resume(current, buffer, offset, len);
}
}

0 comments on commit 86d002d

Please sign in to comment.