Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add binary option to WebSockets #1598

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -150,24 +150,27 @@ public final Object[] websocket(IArguments args) throws LuaException {
String address;
Map<?, ?> headerTable;
Optional<Double> timeoutArg;
boolean binary;

if (args.get(0) instanceof Map) {
var options = args.getTable(0);
address = getStringField(options, "url");
headerTable = optTableField(options, "headers", Collections.emptyMap());
timeoutArg = optRealField(options, "timeout");
binary = optBooleanField(options, "binary", false);
} else {
address = args.getString(0);
headerTable = args.optTable(1, Collections.emptyMap());
timeoutArg = Optional.empty();
binary = args.optBoolean(2, false);
}

var headers = getHeaders(headerTable);
var timeout = getTimeout(timeoutArg);

try {
var uri = WebsocketClient.parseUri(address);
if (!new Websocket(websockets, apiEnvironment, uri, address, headers, timeout).queue(Websocket::connect)) {
if (!new Websocket(websockets, apiEnvironment, uri, address, headers, timeout, binary).queue(Websocket::connect)) {
throw new LuaException("Too many websockets already open");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ public class Websocket extends Resource<Websocket> implements WebsocketClient {
private final String address;
private final HttpHeaders headers;
private final int timeout;
private final boolean binary;

public Websocket(ResourceGroup<Websocket> limiter, IAPIEnvironment environment, URI uri, String address, HttpHeaders headers, int timeout) {
public Websocket(ResourceGroup<Websocket> limiter, IAPIEnvironment environment, URI uri, String address, HttpHeaders headers, int timeout, boolean binary) {
super(limiter);
this.environment = environment;
this.uri = uri;
this.address = address;
this.headers = headers;
this.timeout = timeout;
this.binary = binary;
}

public void connect() {
Expand Down Expand Up @@ -184,4 +186,9 @@ public void sendBinary(ByteBuffer message) {
var channel = channel();
if (channel != null) channel.writeAndFlush(new BinaryWebSocketFrame(Unpooled.wrappedBuffer(message)));
}

@Override
public boolean isBinary() {
return binary;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public interface WebsocketClient extends Closeable {
*/
void sendBinary(ByteBuffer message);

/**
* Determine whether the websocket sends binary messages by default.
*
* @return Whether the websocket sends binary messages by default.
*/
default boolean isBinary() {
return false;
}

/**
* Parse an address, ensuring it is a valid websocket URI.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import dan200.computercraft.core.apis.IAPIEnvironment;
import dan200.computercraft.core.apis.http.options.Options;

import java.nio.charset.StandardCharsets;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Arrays;
import java.util.Optional;

Expand Down Expand Up @@ -79,7 +81,21 @@ public final void send(Coerced<String> message, Optional<Boolean> binary) throws
if (binary.orElse(false)) {
websocket.sendBinary(LuaValues.encode(text));
} else {
websocket.sendText(text);
var data = text;
if (websocket.isBinary()) {
// Try to convert the string from UTF-8 bytes to UTF-16 codepoints.
// If this fails, fall back to normal ANSI.
try {
var buf = LuaValues.encode(text);
var bytes = new byte[buf.capacity()];
buf.get(bytes);
data = new String(bytes, StandardCharsets.UTF_8);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be better to use a CharsetDecoder here instead, and then fail on CharacterCodingException errors, instead of replacing invalid characters with U+FFFD. Not sure, leave as is, will have a think about it.

} catch (UnsupportedCharsetException ignored) {
// Suppress warnings.
data = text;
}
}
websocket.sendText(data);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ public void channelRead0(ChannelHandlerContext ctx, Object msg) {
}

var frame = (WebSocketFrame) msg;
if (frame instanceof TextWebSocketFrame textFrame) {
if (websocket.isBinary() || frame instanceof BinaryWebSocketFrame) {
var converted = NetworkUtils.toBytes(frame.content());

websocket.environment().observe(Metrics.WEBSOCKET_INCOMING, converted.length);
websocket.environment().queueEvent(MESSAGE_EVENT, websocket.address(), converted, frame instanceof BinaryWebSocketFrame);
} else if (frame instanceof TextWebSocketFrame textFrame) {
var data = textFrame.text();

websocket.environment().observe(Metrics.WEBSOCKET_INCOMING, data.length());
websocket.environment().queueEvent(MESSAGE_EVENT, websocket.address(), data, false);
} else if (frame instanceof BinaryWebSocketFrame) {
var converted = NetworkUtils.toBytes(frame.content());

websocket.environment().observe(Metrics.WEBSOCKET_INCOMING, converted.length);
websocket.environment().queueEvent(MESSAGE_EVENT, websocket.address(), converted, true);
} else if (frame instanceof CloseWebSocketFrame closeFrame) {
websocket.close(closeFrame.statusCode(), closeFrame.reasonText());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ local function check_websocket_options(options, body)
check_key(options, "url", "string")
check_key(options, "headers", "table", true)
check_key(options, "timeout", "number", true)
check_key(options, "binary", "boolean", true)
end


Expand All @@ -299,18 +300,19 @@ these options behave.
@see websocket_success
@see websocket_failure
]]
function websocketAsync(url, headers)
function websocketAsync(url, headers, binary)
local actual_url
if type(url) == "table" then
check_websocket_options(url)
actual_url = url.url
else
expect(1, url, "string")
expect(2, headers, "table", "nil")
expect(3, binary, "boolean", "nil")
actual_url = url
end

local ok, err = nativeWebsocket(url, headers)
local ok, err = nativeWebsocket(url, headers, binary)
if not ok then
os.queueEvent("websocket_failure", actual_url, err)
end
Expand Down Expand Up @@ -355,18 +357,19 @@ from above are passed in as fields instead (for instance,
ws.close()

]]
function websocket(url, headers)
function websocket(url, headers, binary)
local actual_url
if type(url) == "table" then
check_websocket_options(url)
actual_url = url.url
else
expect(1, url, "string")
expect(2, headers, "table", "nil")
expect(3, binary, "boolean", "nil")
actual_url = url
end

local ok, err = nativeWebsocket(url, headers)
local ok, err = nativeWebsocket(url, headers, binary)
if not ok then return ok, err end

while true do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,32 @@ class TestHttpApi {
}
}

@Test
fun `Supports binary websockets`() {
runServer {
LuaTaskRunner.runTest {
val httpApi = addApi(HTTPAPI(environment))
assertThat("http.websocket succeeded", httpApi.websocket(ObjectArguments(WS_URL, null, true)), array(equalTo(true)))

val connectEvent = pullEvent()
assertThat(connectEvent, array(equalTo("websocket_success"), equalTo(WS_URL), isA(WebsocketHandle::class.java)))

val websocket = connectEvent[2] as WebsocketHandle
websocket.send(Coerced("Hello \u00E2\u0098\u00BA"), Optional.of(false))

val message = websocket.receive(Optional.empty()).await()
// The string is converted to bytes because it's technically sent as a byte array.
// This difference doesn't matter in Lua, but it does here.
assertThat("Received a return message", message, array(equalTo("HELLO \u263A".toByteArray()), equalTo(false)))

websocket.close()

val closeEvent = pullEventOrTimeout(500.milliseconds, "websocket_closed")
assertThat("No event was queued", closeEvent, equalTo(null))
}
}
}

@Test
fun `Queues an event when the socket is externally closed`() {
runServer { stop ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@ public class TWebsocket extends Resource<TWebsocket> implements WebsocketClient
private final IAPIEnvironment environment;
private final URI uri;
private final String address;
private final boolean binary;

private @Nullable WebSocket websocket;

public TWebsocket(ResourceGroup<TWebsocket> limiter, IAPIEnvironment environment, URI uri, String address, HttpHeaders headers, int timeout) {
public TWebsocket(ResourceGroup<TWebsocket> limiter, IAPIEnvironment environment, URI uri, String address, HttpHeaders headers, int timeout, boolean binary) {
super(limiter);
this.environment = environment;
this.uri = uri;
this.address = address;
this.binary = binary;
}

public void connect() {
Expand Down Expand Up @@ -76,6 +78,11 @@ public void sendBinary(ByteBuffer message) {
websocket.send(array);
}

@Override
public boolean isBinary() {
return binary;
}

@Override
protected void dispose() {
super.dispose();
Expand Down