Skip to content

Issue #13043 - review of websocket Flushers for 12.1 #13250

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: jetty-12.1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -795,14 +795,9 @@ public Flusher(Configuration configuration)
}

@Override
protected void forwardFrame(Frame frame, Callback callback, boolean batch)
protected void forwardFrame(OutgoingEntry entry)
{
OutgoingEntry currentEntry = new OutgoingEntry.Builder(getCurrentEntry())
.frame(frame)
.callback(callback)
.batch(batch)
.build();
negotiated.getExtensions().sendFrame(currentEntry);
negotiated.getExtensions().sendFrame(entry);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.eclipse.jetty.websocket.core.OutgoingEntry;
import org.eclipse.jetty.websocket.core.WebSocketComponents;
import org.eclipse.jetty.websocket.core.util.DemandChain;
import org.eclipse.jetty.websocket.core.util.DemandingFlusher;
import org.eclipse.jetty.websocket.core.util.FragmentingFlusher;
import org.eclipse.jetty.websocket.core.util.WebSocketDemander;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,21 +37,16 @@ public class FragmentExtension extends AbstractExtension implements DemandChain
private static final Logger LOG = LoggerFactory.getLogger(FragmentExtension.class);

private final FragmentingFlusher outgoingFlusher;
private final DemandingFlusher incomingFlusher;
private final WebSocketDemander incomingFlusher;
private final Configuration configuration = new Configuration.ConfigurationCustomizer();

public FragmentExtension()
{
outgoingFlusher = new FragmentingFlusher(configuration)
{
@Override
protected void forwardFrame(Frame frame, Callback callback, boolean batch)
protected void forwardFrame(OutgoingEntry entry)
{
OutgoingEntry entry = new OutgoingEntry.Builder(getCurrentEntry())
.frame(frame)
.callback(callback)
.batch(batch)
.build();
nextOutgoingFrame(entry);
}
};
Expand Down Expand Up @@ -97,8 +92,10 @@ public void init(ExtensionConfig config, WebSocketComponents components)
configuration.setMaxFrameSize(maxLength);
}

public class FragmentingDemandingFlusher extends DemandingFlusher
public class FragmentingDemandingFlusher extends WebSocketDemander
{
private ByteBuffer _payload;

public FragmentingDemandingFlusher()
{
super(FragmentExtension.this::nextIncomingFrame);
Expand All @@ -107,40 +104,42 @@ public FragmentingDemandingFlusher()
@Override
protected boolean handle(Frame frame, Callback callback, boolean first)
{
long maxFrameSize = configuration.getMaxFrameSize();
if (first)
{
if (frame.isControlFrame())
if (frame.isControlFrame() || maxFrameSize <= 0 || frame.getPayloadLength() <= maxFrameSize)
{
emitFrame(frame, callback);
return true;
}

// Slice the payload so we don't modify the original.
_payload = frame.getPayload().slice();
}

ByteBuffer payload = frame.getPayload();
int remaining = payload.remaining();
long maxFrameSize = configuration.getMaxFrameSize();
int remaining = _payload.remaining();
int fragmentSize = (int)Math.min(remaining, maxFrameSize);

boolean continuation = (frame.getOpCode() == OpCode.CONTINUATION) || !first;
Frame fragment = new Frame(continuation ? OpCode.CONTINUATION : frame.getOpCode());
byte opCode = (frame.getOpCode() == OpCode.CONTINUATION || !first) ? OpCode.CONTINUATION : frame.getOpCode();
Frame fragment = new Frame(opCode);
boolean finished = (maxFrameSize <= 0 || remaining <= maxFrameSize);
fragment.setFin(frame.isFin() && finished);

if (finished)
{
// If finished we don't need to fragment, forward original payload.
fragment.setPayload(payload);
fragment.setPayload(_payload);
_payload = null;
}
else
{
// Slice the fragmented payload from the buffer.
int limit = payload.limit();
int newLimit = payload.position() + fragmentSize;
payload.limit(newLimit);
ByteBuffer payloadFragment = payload.slice();
payload.limit(limit);
int limit = _payload.limit();
int newLimit = _payload.position() + fragmentSize;
_payload.limit(newLimit);
ByteBuffer payloadFragment = _payload.slice();
_payload.limit(limit);
fragment.setPayload(payloadFragment);
payload.position(newLimit);
_payload.position(newLimit);
if (LOG.isDebugEnabled())
LOG.debug("Fragmented {}->{}", frame, fragment);
}
Expand All @@ -159,5 +158,12 @@ protected boolean handle(Frame frame, Callback callback, boolean first)
emitFrame(fragment, payloadCallback);
return finished;
}

@Override
protected void onCompleteFailure(Throwable cause)
{
super.onCompleteFailure(cause);
_payload = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException;
import org.eclipse.jetty.websocket.core.exception.ProtocolException;
import org.eclipse.jetty.websocket.core.util.DemandChain;
import org.eclipse.jetty.websocket.core.util.DemandingFlusher;
import org.eclipse.jetty.websocket.core.util.TransformingFlusher;
import org.eclipse.jetty.websocket.core.util.WebSocketDemander;
import org.eclipse.jetty.websocket.core.util.WebSocketFlusher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -58,7 +58,6 @@ public class PerMessageDeflateExtension extends AbstractExtension implements Dem
private final IncomingFlusher incomingFlusher;
private DeflaterPool.Entry deflaterHolder;
private InflaterPool.Entry inflaterHolder;
private boolean incomingCompressed;

private ExtensionConfig configRequested;
private ExtensionConfig configNegotiated;
Expand Down Expand Up @@ -264,57 +263,37 @@ public void demand()
incomingFlusher.demand();
}

private class OutgoingFlusher extends TransformingFlusher
private class OutgoingFlusher extends WebSocketFlusher
{
private boolean _first;
private Frame _frame;

@Override
protected boolean onFrame(Frame frame, Callback callback, boolean batch)
protected boolean onFrame(OutgoingEntry entry, boolean first)
{
if (frame.isControlFrame())
if (first)
{
OutgoingEntry entry = new OutgoingEntry.Builder(getCurrentEntry())
.frame(frame)
.callback(callback)
.batch(batch)
.build();
nextOutgoingFrame(entry);
return true;
}

_first = true;
_frame = frame;

// Provide the frames payload as input to the Deflater.
getDeflater().setInput(frame.getPayload().slice());
callback.succeeded();
return false;
}
if (entry.getFrame().isControlFrame())
{
nextOutgoingFrame(entry);
return true;
}

@Override
protected boolean transform(Callback callback)
{
boolean finished = deflate(callback);
_first = false;
// Provide the frames payload as input to the Deflater.
getDeflater().setInput(entry.getFrame().getPayload().slice());
}

boolean finished = deflate(entry, first);
if (finished)
{
_frame = null;
getDeflater().setInput(BufferUtil.EMPTY_BUFFER);
}

return finished;
}

private boolean deflate(Callback callback)
private boolean deflate(OutgoingEntry entry, boolean first)
{
// Get a buffer for the deflated payload.
long maxFrameSize = getConfiguration().getMaxFrameSize();
int bufferSize = (maxFrameSize <= 0) ? deflateBufferSize : (int)Math.min(maxFrameSize, deflateBufferSize);
RetainableByteBuffer buffer = getByteBufferPool().acquire(bufferSize, false);
ByteBuffer byteBuffer = buffer.getByteBuffer();
callback = Callback.from(callback, buffer::release);
Callback callback = Callback.from(entry.getCallback(), buffer::release);

// Fill up the buffer with a max length of bufferSize;
boolean finished = false;
Expand Down Expand Up @@ -342,44 +321,45 @@ private boolean deflate(Callback callback)
}
}

Frame frame = entry.getFrame();
ByteBuffer payload = byteBuffer;
if (payload.hasRemaining())
{
// Handle tail bytes generated by SYNC_FLUSH.
if (finished && _frame.isFin() && endsWithTail(payload))
if (finished && frame.isFin() && endsWithTail(payload))
{
payload.limit(payload.limit() - TAIL_BYTES.length);
if (LOG.isDebugEnabled())
LOG.debug("payload (TAIL_DROP_FIN_ONLY) = {}", BufferUtil.toDetailString(payload));
}
}
else if (_frame.isFin())
else if (frame.isFin())
{
// Special case: 7.2.3.6. Generating an Empty Fragment Manually
// https://tools.ietf.org/html/rfc7692#section-7.2.3.6
payload = ByteBuffer.wrap(new byte[]{0x00});
}

if (LOG.isDebugEnabled())
LOG.debug("Compressed {}: payload:{}", _frame, payload.remaining());
LOG.debug("Compressed {}: payload:{}", frame, payload.remaining());

Frame chunk = new Frame(_first ? _frame.getOpCode() : OpCode.CONTINUATION);
chunk.setRsv1(_first && _frame.getOpCode() != OpCode.CONTINUATION);
Frame chunk = new Frame(first ? frame.getOpCode() : OpCode.CONTINUATION);
chunk.setRsv1(first && frame.getOpCode() != OpCode.CONTINUATION);
chunk.setPayload(payload);
chunk.setFin(_frame.isFin() && finished);
chunk.setFin(frame.isFin() && finished);

OutgoingEntry entry = new OutgoingEntry.Builder(getCurrentEntry())
nextOutgoingFrame(new OutgoingEntry.Builder(entry)
.frame(chunk)
.callback(callback)
.build();
nextOutgoingFrame(entry);
.build());
return finished;
}
}

private class IncomingFlusher extends DemandingFlusher
private class IncomingFlusher extends WebSocketDemander
{
private boolean _tailBytes;
private boolean _incomingCompressed;
private AtomicReference<RetainableByteBuffer> _payloadRef;

public IncomingFlusher()
Expand All @@ -404,7 +384,7 @@ protected boolean handle(Frame frame, Callback callback, boolean first)
{
case OpCode.TEXT:
case OpCode.BINARY:
incomingCompressed = frame.isRsv1();
_incomingCompressed = frame.isRsv1();
break;

case OpCode.CONTINUATION:
Expand All @@ -416,7 +396,7 @@ protected boolean handle(Frame frame, Callback callback, boolean first)
break;
}

if (!incomingCompressed)
if (!_incomingCompressed)
{
emitFrame(frame, callback);
return true;
Expand Down Expand Up @@ -513,6 +493,7 @@ private boolean inflate(Frame frame, Callback callback, boolean first) throws Da
@Override
protected void onCompleteFailure(Throwable cause)
{
super.onCompleteFailure(cause);
releasePayload(_payloadRef);
}

Expand Down
Loading