Skip to content

Commit

Permalink
[fix][broker] Fix broker OOM when upload a large package. (#22989)
Browse files Browse the repository at this point in the history
(cherry picked from commit da2a191)
  • Loading branch information
shibd committed Jul 2, 2024
1 parent b6a8c03 commit 4bae01d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.distributedlog.LogRecord;
Expand All @@ -38,6 +36,7 @@ class DLOutputStream {

private final DistributedLogManager distributedLogManager;
private final AsyncLogWriter writer;
private final byte[] readBuffer = new byte[8192];
private long offset = 0L;

private DLOutputStream(DistributedLogManager distributedLogManager, AsyncLogWriter writer) {
Expand All @@ -50,42 +49,38 @@ static CompletableFuture<DLOutputStream> openWriterAsync(DistributedLogManager d
return distributedLogManager.openAsyncLogWriter().thenApply(w -> new DLOutputStream(distributedLogManager, w));
}

private CompletableFuture<List<LogRecord>> getRecords(InputStream inputStream) {
CompletableFuture<List<LogRecord>> future = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
byte[] readBuffer = new byte[8192];
List<LogRecord> records = new ArrayList<>();
try {
int read = 0;
while ((read = inputStream.read(readBuffer)) != -1) {
log.info("write something into the ledgers offset: {}, length: {}", offset, read);
ByteBuf writeBuf = Unpooled.copiedBuffer(readBuffer, 0, read);
offset += writeBuf.readableBytes();
LogRecord record = new LogRecord(offset, writeBuf);
records.add(record);
}
future.complete(records);
} catch (IOException e) {
log.error("Failed to get all records from the input stream", e);
future.completeExceptionally(e);
private void writeAsyncHelper(InputStream is, CompletableFuture<DLOutputStream> result) {
try {
int read = is.read(readBuffer);
if (read != -1) {
log.info("write something into the ledgers offset: {}, length: {}", offset, read);
final ByteBuf writeBuf = Unpooled.wrappedBuffer(readBuffer, 0, read);
offset += writeBuf.readableBytes();
final LogRecord record = new LogRecord(offset, writeBuf);
writer.write(record).thenAccept(v -> writeAsyncHelper(is, result))
.exceptionally(e -> {
result.completeExceptionally(e);
return null;
});
} else {
result.complete(this);
}
});
return future;
} catch (IOException e) {
log.error("Failed to get all records from the input stream", e);
result.completeExceptionally(e);
}
}

/**
* Write all input stream data to the distribute log.
*
* @param inputStream the data we need to write
* @return
* @return CompletableFuture<DLOutputStream>
*/
CompletableFuture<DLOutputStream> writeAsync(InputStream inputStream) {
return getRecords(inputStream)
.thenCompose(this::writeAsync);
}

private CompletableFuture<DLOutputStream> writeAsync(List<LogRecord> records) {
return writer.writeBulk(records).thenApply(ignore -> this);
CompletableFuture<DLOutputStream> result = new CompletableFuture<>();
writeAsyncHelper(inputStream, result);
return result;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,18 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.anyList;
Expand All @@ -53,9 +54,8 @@ public void setup() {
when(dlm.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
when(writer.markEndOfStream()).thenReturn(CompletableFuture.completedFuture(null));
when(writer.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
when(writer.writeBulk(anyList()))
.thenReturn(CompletableFuture.completedFuture(
Collections.singletonList(CompletableFuture.completedFuture(DLSN.InitialDLSN))));
when(writer.write(any(LogRecord.class)))
.thenReturn(CompletableFuture.completedFuture(DLSN.InitialDLSN));
}

@AfterMethod(alwaysRun = true)
Expand All @@ -75,7 +75,7 @@ public void writeInputStreamData() throws ExecutionException, InterruptedExcepti
.thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
.thenCompose(DLOutputStream::closeAsync)).get();

verify(writer, times(1)).writeBulk(anyList());
verify(writer, times(1)).write(any(LogRecord.class));
verify(writer, times(1)).markEndOfStream();
verify(writer, times(1)).asyncClose();
verify(dlm, times(1)).asyncClose();
Expand All @@ -91,7 +91,7 @@ public void writeBytesArrayData() throws ExecutionException, InterruptedExceptio
.thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
.thenCompose(DLOutputStream::closeAsync)).get();

verify(writer, times(1)).writeBulk(anyList());
verify(writer, times(1)).write(any(LogRecord.class));
verify(writer, times(1)).markEndOfStream();
verify(writer, times(1)).asyncClose();
verify(dlm, times(1)).asyncClose();
Expand All @@ -104,7 +104,7 @@ public void writeLongBytesArrayData() throws ExecutionException, InterruptedExce
.thenCompose(w -> w.writeAsync(new ByteArrayInputStream(data))
.thenCompose(DLOutputStream::closeAsync)).get();

verify(writer, times(1)).writeBulk(anyList());
verify(writer, times(4)).write(any(LogRecord.class));
verify(writer, times(1)).markEndOfStream();
verify(writer, times(1)).asyncClose();
verify(dlm, times(1)).asyncClose();
Expand Down

0 comments on commit 4bae01d

Please sign in to comment.