Skip to content

Commit

Permalink
#1025: Add Transport Layer abstraction at Server side.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbernard31 committed Oct 7, 2022
1 parent c17b973 commit 1ff5e64
Show file tree
Hide file tree
Showing 103 changed files with 4,630 additions and 2,941 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class LwM2mClientCoapResource extends LwM2mCoapResource {

public LwM2mClientCoapResource(String name, RegistrationEngine registrationEngine,
CaliforniumEndpointsManager endpointsManager) {
super(name);
super(name, null);
this.registrationEngine = registrationEngine;
this.endpointsManager = endpointsManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@
*/
public abstract class AsyncRequestObserver<T extends LwM2mResponse> extends CoapAsyncRequestObserver {

public AsyncRequestObserver(Request coapRequest, final ResponseCallback<T> responseCallback,
final ErrorCallback errorCallback, long timeoutInMs, ScheduledExecutorService executor) {
this(coapRequest, responseCallback, errorCallback, timeoutInMs, executor, new TemporaryExceptionTranslator());
}

/**
* A Californium message observer for a CoAP request helping to get results asynchronously dedicated for LWM2M
* requests.
Expand All @@ -49,8 +54,9 @@ public abstract class AsyncRequestObserver<T extends LwM2mResponse> extends Coap
* @param executor used to scheduled timeout tasks.
*/
public AsyncRequestObserver(Request coapRequest, final ResponseCallback<T> responseCallback,
final ErrorCallback errorCallback, long timeoutInMs, ScheduledExecutorService executor) {
super(coapRequest, null, errorCallback, timeoutInMs, executor);
final ErrorCallback errorCallback, long timeoutInMs, ScheduledExecutorService executor,
ExceptionTranslator exceptionTranslator) {
super(coapRequest, null, errorCallback, timeoutInMs, executor, exceptionTranslator);
this.responseCallback = new CoapResponseCallback() {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,10 @@

import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.elements.exception.EndpointUnconnectedException;
import org.eclipse.californium.scandium.dtls.DtlsHandshakeTimeoutException;
import org.eclipse.leshan.core.request.exception.RequestCanceledException;
import org.eclipse.leshan.core.request.exception.RequestRejectedException;
import org.eclipse.leshan.core.request.exception.SendFailedException;
import org.eclipse.leshan.core.request.exception.TimeoutException;
import org.eclipse.leshan.core.request.exception.TimeoutException.Type;
import org.eclipse.leshan.core.request.exception.UnconnectedPeerException;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,14 +51,15 @@ public class CoapAsyncRequestObserver extends AbstractRequestObserver {
private final long timeoutInMs;
private ScheduledFuture<?> cleaningTask;
private boolean cancelled = false;
private ScheduledExecutorService executor;
private final ScheduledExecutorService executor;
private final ExceptionTranslator exceptionTranslator;

// The Californium API does not ensure that message callback are exclusive
// meaning that you can get a onReponse call and a onCancel one.
// The CoapAsyncRequestObserver ensure that you will receive only one event.
// You get either 1 response or 1 error.
// This boolean is used to ensure this.
private AtomicBoolean eventRaised = new AtomicBoolean(false);
private final AtomicBoolean eventRaised = new AtomicBoolean(false);

private final AtomicBoolean responseTimedOut = new AtomicBoolean(false);

Expand All @@ -81,12 +78,14 @@ public class CoapAsyncRequestObserver extends AbstractRequestObserver {
* @param executor used to scheduled timeout tasks.
*/
public CoapAsyncRequestObserver(Request coapRequest, CoapResponseCallback responseCallback,
ErrorCallback errorCallback, long timeoutInMs, ScheduledExecutorService executor) {
ErrorCallback errorCallback, long timeoutInMs, ScheduledExecutorService executor,
ExceptionTranslator exceptionTranslator) {
super(coapRequest);
this.responseCallback = responseCallback;
this.errorCallback = errorCallback;
this.timeoutInMs = timeoutInMs;
this.executor = executor;
this.exceptionTranslator = exceptionTranslator;
}

@Override
Expand Down Expand Up @@ -155,17 +154,7 @@ public void onReject() {
public void onSendError(Throwable error) {
if (eventRaised.compareAndSet(false, true)) {
cancelCleaningTask();
if (error instanceof DtlsHandshakeTimeoutException) {
errorCallback.onError(new TimeoutException(Type.DTLS_HANDSHAKE_TIMEOUT, error,
"Request %s timeout : dtls handshake timeout", coapRequest.getURI()));
} else if (error instanceof EndpointUnconnectedException) {
errorCallback.onError(new UnconnectedPeerException(error,
"Unable to send request %s : peer is not connected (no DTLS connection)",
coapRequest.getURI()));
} else {
errorCallback
.onError(new SendFailedException(error, "Unable to send request %s", coapRequest.getURI()));
}
errorCallback.onError(exceptionTranslator.translate(coapRequest, error));
} else {
LOG.debug("onSendError callback ignored because an event was already raised for this request {}",
coapRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,10 @@

import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.elements.exception.EndpointUnconnectedException;
import org.eclipse.californium.scandium.dtls.DtlsHandshakeTimeoutException;
import org.eclipse.leshan.core.request.exception.RequestCanceledException;
import org.eclipse.leshan.core.request.exception.RequestRejectedException;
import org.eclipse.leshan.core.request.exception.SendFailedException;
import org.eclipse.leshan.core.request.exception.UnconnectedPeerException;
import org.eclipse.leshan.core.request.exception.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,20 +44,22 @@ public class CoapSyncRequestObserver extends AbstractRequestObserver {

private static final Logger LOG = LoggerFactory.getLogger(CoapSyncRequestObserver.class);

private CountDownLatch latch = new CountDownLatch(1);
private AtomicReference<Response> ref = new AtomicReference<>(null);
private AtomicBoolean coapTimeout = new AtomicBoolean(false);
private AtomicReference<RuntimeException> exception = new AtomicReference<>();
private long timeout;
private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<Response> ref = new AtomicReference<>(null);
private final AtomicBoolean coapTimeout = new AtomicBoolean(false);
private final AtomicReference<RuntimeException> exception = new AtomicReference<>();
private final long timeout;
private final ExceptionTranslator exceptionTranslator;

/**
* @param coapRequest The CoAP request to observe.
* @param timeoutInMs A response timeout(in millisecond) which is raised if neither a response or error happens (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout).
*/
public CoapSyncRequestObserver(Request coapRequest, long timeoutInMs) {
public CoapSyncRequestObserver(Request coapRequest, long timeoutInMs, ExceptionTranslator exceptionTranslator) {
super(coapRequest);
this.timeout = timeoutInMs;
this.exceptionTranslator = exceptionTranslator;
}

@Override
Expand Down Expand Up @@ -97,13 +97,13 @@ public void onReject() {

@Override
public void onSendError(Throwable error) {
if (error instanceof DtlsHandshakeTimeoutException) {
Exception e = exceptionTranslator.translate(coapRequest, error);
if (e instanceof TimeoutException) {
coapTimeout.set(true);
} else if (error instanceof EndpointUnconnectedException) {
exception.set(new UnconnectedPeerException(error,
"Unable to send request %s : peer is not connected (no DTLS connection)", coapRequest.getURI()));
} else if (e instanceof RuntimeException) {
exception.set((RuntimeException) e);
} else {
exception.set(new SendFailedException(error, "Request %s cannot be sent", coapRequest, error.getMessage()));
exception.set(new SendFailedException(e, "Request %s cannot be sent", coapRequest, e.getMessage()));
}
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*******************************************************************************
* Copyright (c) 2022 Sierra Wireless and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v20.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.html.
*
* Contributors:
* Sierra Wireless - initial API and implementation
*******************************************************************************/
package org.eclipse.leshan.core.californium;

import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.elements.exception.EndpointUnconnectedException;
import org.eclipse.leshan.core.request.exception.SendFailedException;
import org.eclipse.leshan.core.request.exception.UnconnectedPeerException;

public class DefaultExceptionTranslator implements ExceptionTranslator {

@Override
public Exception translate(Request coapRequest, Throwable error) {
if (error instanceof EndpointUnconnectedException) {
return new UnconnectedPeerException(error, "Unable to send request %s : peer is not connected",
coapRequest.getURI());
} else {
return new SendFailedException(error, "Unable to send request %s", coapRequest.getURI());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import org.eclipse.leshan.core.request.Identity;
import org.eclipse.leshan.core.util.Hex;

//TODO TL : to be delete when no more class use it (at the end of the refactoring)

/**
* Utility class used to handle Californium {@link EndpointContext} in Leshan.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016 Sierra Wireless and others.
* Copyright (c) 2022 Sierra Wireless and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
Expand All @@ -13,14 +13,14 @@
* Contributors:
* Sierra Wireless - initial API and implementation
*******************************************************************************/
package org.eclipse.leshan.server.californium.registration;
package org.eclipse.leshan.core.californium;

import org.eclipse.californium.core.observe.ObservationStore;
import org.eclipse.leshan.server.registration.RegistrationStore;
import org.eclipse.californium.core.coap.Request;

/**
* A registration store which is able to store Californium observation.
*/
public interface CaliforniumRegistrationStore extends RegistrationStore, ObservationStore {
public interface ExceptionTranslator {

/**
* Translate exception from underlying transport layer to LwM2m Exception.
*/
Exception translate(Request coapRequest, Throwable error);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.coap.CoAP.ResponseCode;
import org.eclipse.californium.core.coap.MediaTypeRegistry;
import org.eclipse.californium.core.coap.Message;
import org.eclipse.californium.core.coap.Request;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.network.Exchange;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.elements.EndpointContext;
import org.eclipse.leshan.core.californium.identity.IdentityHandler;
import org.eclipse.leshan.core.californium.identity.IdentityHandlerProvider;
import org.eclipse.leshan.core.request.Identity;
import org.eclipse.leshan.core.request.exception.InvalidRequestException;
import org.slf4j.Logger;
Expand All @@ -37,12 +40,15 @@ public class LwM2mCoapResource extends CoapResource {

private static final Logger LOG = LoggerFactory.getLogger(LwM2mCoapResource.class);

private final IdentityHandlerProvider identityHandlerProvider;

/**
* @param name the resource name
* @see CoapResource#CoapResource(String)
*/
public LwM2mCoapResource(String name) {
public LwM2mCoapResource(String name, IdentityHandlerProvider identityHandlerProvider) {
super(name);
this.identityHandlerProvider = identityHandlerProvider;
}

@Override
Expand All @@ -54,7 +60,7 @@ public void handleRequest(Exchange exchange) {
} catch (RuntimeException e) {
Request request = exchange.getRequest();
LOG.error("Exception while handling request [{}] on the resource {} from {}", request, getURI(),
extractIdentitySafely(request.getSourceContext()), e);
extractIdentitySafely(exchange, request), e);
exchange.sendResponse(new Response(ResponseCode.INTERNAL_SERVER_ERROR));
}
}
Expand Down Expand Up @@ -83,10 +89,10 @@ protected void handleInvalidRequest(Exchange exchange, String message, Throwable
if (LOG.isDebugEnabled()) {
if (error != null) {
LOG.debug("Invalid request [{}] received on the resource {} from {}", request, getURI(),
extractIdentitySafely(request.getSourceContext()), error);
extractIdentitySafely(exchange, request), error);
} else {
LOG.debug("Invalid request [{}] received on the resource {} from {} : {}", request, getURI(),
extractIdentitySafely(request.getSourceContext()), message);
extractIdentitySafely(exchange, request), message);
}
}

Expand All @@ -107,18 +113,30 @@ protected void handleInvalidRequest(Exchange exchange, String message, Throwable
* @throws IllegalStateException if we are not able to extract {@link Identity}.
*/
protected Identity extractIdentity(EndpointContext context) {
// TODO TL : to delete once server / client / bootstrap server will use new design
return EndpointContextUtil.extractIdentity(context);
}

protected Identity getForeignPeerIdentity(Exchange exchange, Message receivedMessage) {
IdentityHandler identityHandler = identityHandlerProvider.getIdentityHandler(exchange.getEndpoint());
if (identityHandler != null) {
return identityHandler.getIdentity(receivedMessage);
}
return null;
}

/**
* Create Leshan {@link Identity} from Californium {@link EndpointContext}.
*
* @param context The Californium {@link EndpointContext} to convert.
* @return The corresponding Leshan {@link Identity} or <code>null</code> if we didn't succeed to extract Identity.
*/
protected Identity extractIdentitySafely(EndpointContext context) {
protected Identity extractIdentitySafely(Exchange exchange, Message receivedMessage) {
try {
return extractIdentity(context);
if (identityHandlerProvider == null) {
return extractIdentity(receivedMessage.getSourceContext());
} else {
return getForeignPeerIdentity(exchange, receivedMessage);
}
} catch (RuntimeException e) {
LOG.error("Unable to extract identity", e);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.eclipse.californium.elements.PrincipalEndpointContextMatcher;
import org.eclipse.leshan.core.request.Identity;

// TODO TL: to be move in californium.identity package

/**
* LWM2M principal based endpoint context matcher.
*
Expand Down
Loading

0 comments on commit 1ff5e64

Please sign in to comment.