Skip to content

Commit

Permalink
#1228: Add (Manual)DataSender as client API of Timestamped Send Request
Browse files Browse the repository at this point in the history
Signed-off-by: adamsero <[email protected]>
Also-by: Simon Bernard <[email protected]>
  • Loading branch information
adamsero authored and sbernard31 committed Jul 1, 2022
1 parent fe18d83 commit 60a95c2
Show file tree
Hide file tree
Showing 16 changed files with 879 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.net.InetSocketAddress;
import java.security.cert.Certificate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -45,13 +46,16 @@
import org.eclipse.leshan.client.observer.LwM2mClientObserver;
import org.eclipse.leshan.client.observer.LwM2mClientObserverAdapter;
import org.eclipse.leshan.client.observer.LwM2mClientObserverDispatcher;
import org.eclipse.leshan.client.request.LwM2mRequestSender;
import org.eclipse.leshan.client.resource.LwM2mObjectEnabler;
import org.eclipse.leshan.client.resource.LwM2mObjectTree;
import org.eclipse.leshan.client.resource.LwM2mRootEnabler;
import org.eclipse.leshan.client.resource.RootEnabler;
import org.eclipse.leshan.client.resource.listener.ObjectListener;
import org.eclipse.leshan.client.resource.listener.ObjectsListenerAdapter;
import org.eclipse.leshan.client.send.NoDataException;
import org.eclipse.leshan.client.send.DataSender;
import org.eclipse.leshan.client.send.DataSenderManager;
import org.eclipse.leshan.client.send.SendService;
import org.eclipse.leshan.client.servers.ServerIdentity;
import org.eclipse.leshan.core.californium.EndpointFactory;
import org.eclipse.leshan.core.link.LinkSerializer;
Expand All @@ -62,10 +66,7 @@
import org.eclipse.leshan.core.node.codec.LwM2mDecoder;
import org.eclipse.leshan.core.node.codec.LwM2mEncoder;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.ReadCompositeRequest;
import org.eclipse.leshan.core.request.SendRequest;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.ReadCompositeResponse;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.core.response.SendResponse;
import org.eclipse.leshan.core.util.Validate;
Expand Down Expand Up @@ -93,13 +94,14 @@ public class LeshanClient implements LwM2mClient {
private final RegistrationEngine engine;
private final LwM2mClientObserverDispatcher observers;
private final LinkSerializer linkSerializer;
private final DataSenderManager dataSenderManager;

public LeshanClient(String endpoint, InetSocketAddress localAddress,
List<? extends LwM2mObjectEnabler> objectEnablers, Configuration coapConfig, Builder dtlsConfigBuilder,
List<Certificate> trustStore, EndpointFactory endpointFactory, RegistrationEngineFactory engineFactory,
BootstrapConsistencyChecker checker, Map<String, String> additionalAttributes,
Map<String, String> bsAdditionalAttributes, LwM2mEncoder encoder, LwM2mDecoder decoder,
ScheduledExecutorService sharedExecutor, LinkSerializer linkSerializer,
List<? extends LwM2mObjectEnabler> objectEnablers, List<DataSender> dataSenders, Configuration coapConfig,
Builder dtlsConfigBuilder, List<Certificate> trustStore, EndpointFactory endpointFactory,
RegistrationEngineFactory engineFactory, BootstrapConsistencyChecker checker,
Map<String, String> additionalAttributes, Map<String, String> bsAdditionalAttributes, LwM2mEncoder encoder,
LwM2mDecoder decoder, ScheduledExecutorService sharedExecutor, LinkSerializer linkSerializer,
LwM2mAttributeParser attributeParser) {

Validate.notNull(endpoint);
Expand All @@ -117,7 +119,7 @@ public LeshanClient(String endpoint, InetSocketAddress localAddress,
endpointFactory);
requestSender = createRequestSender(endpointsManager, sharedExecutor, encoder, objectTree.getModel(),
linkSerializer);

dataSenderManager = createDataSenderManager(dataSenders, rootEnabler, requestSender);
engine = engineFactory.createRegistratioEngine(endpoint, objectTree, endpointsManager, requestSender,
bootstrapHandler, observers, additionalAttributes, bsAdditionalAttributes,
getSupportedContentFormat(decoder, encoder), sharedExecutor);
Expand All @@ -140,6 +142,15 @@ protected LwM2mObjectTree createObjectTree(List<? extends LwM2mObjectEnabler> ob
return new LwM2mObjectTree(this, objectEnablers);
}

protected DataSenderManager createDataSenderManager(List<DataSender> dataSenders, LwM2mRootEnabler rootEnabler,
LwM2mRequestSender requestSender) {
Map<String, DataSender> dataSenderMap = new HashMap<>();
for (DataSender dataSender : dataSenders) {
dataSenderMap.put(dataSender.getName(), dataSender);
}
return new DataSenderManager(dataSenderMap, rootEnabler, requestSender);
}

protected LwM2mClientObserverDispatcher createClientObserverDispatcher() {
LwM2mClientObserverDispatcher observer = new LwM2mClientObserverDispatcher();
observer.addObserver(new LwM2mClientObserverAdapter() {
Expand Down Expand Up @@ -255,6 +266,7 @@ public void start() {
endpointsManager.start();
engine.start();
objectTree.start();
dataSenderManager.start();

if (LOG.isInfoEnabled()) {
LOG.info("Leshan client[endpoint:{}] started.", engine.getEndpoint());
Expand All @@ -264,6 +276,7 @@ public void start() {
@Override
public void stop(boolean deregister) {
LOG.info("Stopping Leshan Client ...");
dataSenderManager.stop();
engine.stop(deregister);
endpointsManager.stop();
objectTree.stop();
Expand All @@ -274,6 +287,7 @@ public void stop(boolean deregister) {
@Override
public void destroy(boolean deregister) {
LOG.info("Destroying Leshan client ...");
dataSenderManager.destroy();
engine.destroy(deregister);
endpointsManager.destroy();
requestSender.destroy();
Expand Down Expand Up @@ -303,36 +317,44 @@ public boolean triggerClientInitiatedBootstrap(boolean deregister) {
}

@Override
public SendResponse sendData(ServerIdentity server, ContentFormat format, List<String> paths, long timeoutInMs)
throws InterruptedException {
Validate.notNull(server);
Validate.notEmpty(paths);
public SendService getSendService() {
return new SendService() {

Map<LwM2mPath, LwM2mNode> collectedData = collectData(server, paths);
return requestSender.send(server, new SendRequest(format, collectedData, null), timeoutInMs);
}
@Override
public SendResponse sendData(ServerIdentity server, ContentFormat format, List<String> paths,
long timeoutInMs) throws InterruptedException {
Validate.notNull(server);
Validate.notEmpty(paths);

Map<LwM2mPath, LwM2mNode> collectedData = dataSenderManager.getCurrentValues(server,
LwM2mPath.getLwM2mPathList(paths));
return dataSenderManager.sendData(server, format, collectedData, timeoutInMs);
}

@Override
public void sendData(ServerIdentity server, ContentFormat format, List<String> paths, long timeoutInMs,
ResponseCallback<SendResponse> onResponse, ErrorCallback onError) {
Validate.notNull(server);
Validate.notEmpty(paths);
Validate.notNull(onResponse);
Validate.notNull(onError);

Map<LwM2mPath, LwM2mNode> collectedData = collectData(server, paths);
requestSender.send(server, new SendRequest(format, collectedData, null), timeoutInMs, onResponse, onError);
}
@Override
public void sendData(ServerIdentity server, ContentFormat format, List<String> paths, long timeoutInMs,
ResponseCallback<SendResponse> onResponse, ErrorCallback onError) {
Validate.notNull(server);
Validate.notEmpty(paths);
Validate.notNull(onResponse);
Validate.notNull(onError);

private Map<LwM2mPath, LwM2mNode> collectData(ServerIdentity server, List<String> paths) {
// format is not really used as this is an internal call, kind of HACK :/ ...
ContentFormat format = ContentFormat.SENML_CBOR;
ReadCompositeResponse response = rootEnabler.read(server, new ReadCompositeRequest(format, format, paths));
if (response.isSuccess()) {
return response.getContent();
}
throw new NoDataException("Unable to collect data for %s : %s / %s", paths, response.getCode(),
response.getErrorMessage());
Map<LwM2mPath, LwM2mNode> collectedData = dataSenderManager.getCurrentValues(server,
LwM2mPath.getLwM2mPathList(paths));
dataSenderManager.sendData(server, format, collectedData, onResponse, onError, timeoutInMs);

}

@Override
public DataSender getDataSender(String senderName) {
return dataSenderManager.getDataSender(senderName);
}

@Override
public <T extends DataSender> T getDataSender(String senderName, Class<T> senderSubType) {
return dataSenderManager.getDataSender(senderName, senderSubType);
}
};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import java.net.InetSocketAddress;
import java.security.cert.Certificate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -46,6 +48,7 @@
import org.eclipse.leshan.client.object.Server;
import org.eclipse.leshan.client.resource.LwM2mObjectEnabler;
import org.eclipse.leshan.client.resource.ObjectsInitializer;
import org.eclipse.leshan.client.send.DataSender;
import org.eclipse.leshan.core.LwM2mId;
import org.eclipse.leshan.core.californium.DefaultEndpointFactory;
import org.eclipse.leshan.core.californium.EndpointFactory;
Expand Down Expand Up @@ -73,6 +76,7 @@ public class LeshanClientBuilder {

private InetSocketAddress localAddress;
private List<? extends LwM2mObjectEnabler> objectEnablers;
private List<DataSender> dataSenders;

private Configuration coapConfig;
private Builder dtlsConfigBuilder;
Expand Down Expand Up @@ -138,6 +142,31 @@ public LeshanClientBuilder setObjects(List<? extends LwM2mObjectEnabler> objectE
return this;
}

/**
* Set the list of {@link DataSender} used by the client.
* <p>
* Note that each sender should have a different name.
*/
public LeshanClientBuilder setDataSenders(DataSender... dataSenders) {
this.dataSenders = Arrays.asList(dataSenders);

// check DataSender has name and this name is not shared by several senders
ArrayList<String> usedNames = new ArrayList<>();
for (int i = 0; i < dataSenders.length; i++) {
DataSender dataSender = dataSenders[i];
if (dataSender.getName() == null) {
throw new IllegalArgumentException(
String.format("%s at index %d have a null name.", dataSender.getClass().getSimpleName(), i));
}
if (usedNames.contains(dataSender.getName())) {
throw new IllegalArgumentException(String.format("name '%s' of %s at index %d is already used.",
dataSender.getName(), dataSender.getClass().getSimpleName(), i));
}
usedNames.add(dataSender.getName());
}
return this;
}

/**
* Set the {@link LwM2mEncoder} which will encode {@link LwM2mNode} with supported content format.
* <p>
Expand Down Expand Up @@ -320,6 +349,8 @@ public LeshanClient build() {
new Device("Eclipse Leshan", "model12345", "12345", EnumSet.of(BindingMode.U)));
objectEnablers = initializer.createAll();
}
if (dataSenders == null)
dataSenders = new ArrayList<>();
if (encoder == null)
encoder = new DefaultLwM2mEncoder();
if (decoder == null)
Expand Down Expand Up @@ -368,7 +399,7 @@ protected Connector createSecuredConnector(DtlsConnectorConfig dtlsConfig) {
localAddress, incompleteConfig.getAddress()));
}

return createLeshanClient(endpoint, localAddress, objectEnablers, coapConfig, dtlsConfigBuilder,
return createLeshanClient(endpoint, localAddress, objectEnablers, dataSenders, coapConfig, dtlsConfigBuilder,
this.trustStore, endpointFactory, engineFactory, bootstrapConsistencyChecker, additionalAttributes,
bsAdditionalAttributes, encoder, decoder, executor, linkSerializer, attributeParser);
}
Expand Down Expand Up @@ -403,14 +434,14 @@ protected Connector createSecuredConnector(DtlsConnectorConfig dtlsConfig) {
* @return the new {@link LeshanClient}
*/
protected LeshanClient createLeshanClient(String endpoint, InetSocketAddress localAddress,
List<? extends LwM2mObjectEnabler> objectEnablers, Configuration coapConfig, Builder dtlsConfigBuilder,
List<Certificate> trustStore, EndpointFactory endpointFactory, RegistrationEngineFactory engineFactory,
BootstrapConsistencyChecker checker, Map<String, String> additionalAttributes,
Map<String, String> bsAdditionalAttributes, LwM2mEncoder encoder, LwM2mDecoder decoder,
ScheduledExecutorService sharedExecutor, LinkSerializer linkSerializer,
List<? extends LwM2mObjectEnabler> objectEnablers, List<DataSender> dataSenders, Configuration coapConfig,
Builder dtlsConfigBuilder, List<Certificate> trustStore, EndpointFactory endpointFactory,
RegistrationEngineFactory engineFactory, BootstrapConsistencyChecker checker,
Map<String, String> additionalAttributes, Map<String, String> bsAdditionalAttributes, LwM2mEncoder encoder,
LwM2mDecoder decoder, ScheduledExecutorService sharedExecutor, LinkSerializer linkSerializer,
LwM2mAttributeParser attributeParser) {
return new LeshanClient(endpoint, localAddress, objectEnablers, coapConfig, dtlsConfigBuilder, trustStore,
endpointFactory, engineFactory, checker, additionalAttributes, bsAdditionalAttributes, encoder, decoder,
sharedExecutor, linkSerializer, attributeParser);
return new LeshanClient(endpoint, localAddress, objectEnablers, dataSenders, coapConfig, dtlsConfigBuilder,
trustStore, endpointFactory, engineFactory, checker, additionalAttributes, bsAdditionalAttributes,
encoder, decoder, sharedExecutor, linkSerializer, attributeParser);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,9 @@
*******************************************************************************/
package org.eclipse.leshan.client;

import java.util.List;

import org.eclipse.leshan.client.resource.LwM2mObjectTree;
import org.eclipse.leshan.client.send.NoDataException;
import org.eclipse.leshan.client.send.SendService;
import org.eclipse.leshan.client.servers.ServerIdentity;
import org.eclipse.leshan.core.node.codec.CodecException;
import org.eclipse.leshan.core.request.ContentFormat;
import org.eclipse.leshan.core.request.exception.InvalidRequestException;
import org.eclipse.leshan.core.request.exception.InvalidResponseException;
import org.eclipse.leshan.core.request.exception.RequestCanceledException;
import org.eclipse.leshan.core.request.exception.RequestRejectedException;
import org.eclipse.leshan.core.request.exception.SendFailedException;
import org.eclipse.leshan.core.request.exception.TimeoutException;
import org.eclipse.leshan.core.request.exception.UnconnectedPeerException;
import org.eclipse.leshan.core.response.ErrorCallback;
import org.eclipse.leshan.core.response.ResponseCallback;
import org.eclipse.leshan.core.response.SendResponse;

/**
* A Lightweight M2M client.
Expand Down Expand Up @@ -77,73 +63,9 @@ public interface LwM2mClient {
boolean triggerClientInitiatedBootstrap(boolean deregister);

/**
* Send Data synchronously to a LWM2M Server.
* <p>
* The "Send" operation is used by the LwM2M Client to send data to the LwM2M Server without explicit request by
* that Server.
* <p>
* If some data can not be collected before to send, this will be silently ignored.<br>
* If there is not data to send at all, {@link NoDataException} is raised.
*
* @param server to which data must be send
* @param format {@link ContentFormat} to use. It MUST be {@link ContentFormat#SENML_CBOR} or
* {@link ContentFormat#SENML_JSON}
* @param paths the list of LWM2M node path to send.
* @param timeoutInMs The global timeout to wait in milliseconds (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout)
* @return the LWM2M response. The response can be <code>null</code> if the timeout expires (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout).
*
* @throws InterruptedException if the thread was interrupted.
* @throws InvalidRequestException if send request can not be created.
* @throws CodecException if request payload can not be encoded.
* @throws NoDataException if we can not collect data for given list of path.
* @throws RequestRejectedException if the request is rejected by foreign peer.
* @throws RequestCanceledException if the request is cancelled.
* @throws SendFailedException if the request can not be sent. E.g. error at CoAP or DTLS/UDP layer.
* @throws InvalidResponseException if the response received is malformed.
* @throws UnconnectedPeerException if client is not connected (no dtls connection available).
*/
SendResponse sendData(ServerIdentity server, ContentFormat format, List<String> paths, long timeoutInMs)
throws InterruptedException;

/**
* Send Data asynchronously to a LWM2M Server.
* <p>
* The "Send" operation is used by the LwM2M Client to send data to the LwM2M Server without explicit request by
* that Server.
* <p>
* If some data can not be collected before to send, this will be silently ignored.<br>
* If there is not data to send at all, {@link NoDataException} is raised.
* <p>
* {@link ResponseCallback} and {@link ErrorCallback} are exclusively called.
*
* @param server to which data must be send
* @param format {@link ContentFormat} to use. It MUST be {@link ContentFormat#SENML_CBOR} or
* {@link ContentFormat#SENML_JSON}
* @param paths the list of LWM2M node path to send.
* @param timeoutInMs The global timeout to wait in milliseconds (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout)
* @param responseCallback a callback called when a response is received (successful or error response). This
* callback MUST NOT be null.
* @param errorCallback a callback called when an error or exception occurred when response is received. It can be :
* <ul>
* <li>{@link RequestRejectedException} if the request is rejected by foreign peer.</li>
* <li>{@link RequestCanceledException} if the request is cancelled.</li>
* <li>{@link SendFailedException} if the request can not be sent. E.g. error at CoAP or DTLS/UDP layer.</li>
* <li>{@link InvalidResponseException} if the response received is malformed.</li>
* <li>{@link UnconnectedPeerException} if client is not connected (no dtls connection available).</li>
* <li>{@link TimeoutException} if the timeout expires (see
* https://github.com/eclipse/leshan/wiki/Request-Timeout).</li>
* <li>or any other RuntimeException for unexpected issue.
* </ul>
* This callback MUST NOT be null.
* @throws CodecException if request payload can not be encoded.
* @throws NoDataException if we can not collect data for given list of path.
* @throws InvalidRequestException if send request can not be created.
* @return {@link SendService} which exposes API to send data to server.
*/
void sendData(ServerIdentity server, ContentFormat format, List<String> paths, long timeoutInMs,
ResponseCallback<SendResponse> responseCallback, ErrorCallback errorCallback);
SendService getSendService();

/**
* @return the {@link LwM2mObjectTree} containing all the object implemented by this client.
Expand Down
Loading

0 comments on commit 60a95c2

Please sign in to comment.