Skip to content

Commit

Permalink
#4 Replay sessions in loop implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
vadzim-kazak committed Apr 20, 2018
1 parent ec3ce03 commit 04278fe
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 6 deletions.
6 changes: 3 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ services:
- 61613:61613
services:
build: ./services
image: ibagroup/iot-data-simulator-services:1.0.1
image: ibagroup/iot-data-simulator-services:1.1.0
container_name: iot-ds-services
depends_on:
- mongodb
Expand All @@ -64,7 +64,7 @@ services:
- 8083:8083
ui:
build: ./ui
image: ibagroup/iot-data-simulator-ui:1.0.1
image: ibagroup/iot-data-simulator-ui:1.1.0
container_name: iot-ds-ui
depends_on:
- services
Expand All @@ -79,7 +79,7 @@ services:
command: ["npm", "start"]
data-sender:
build: ./data-sender
image: ibagroup/iot-data-simulator-sender:1.0.1
image: ibagroup/iot-data-simulator-sender:1.1.0
container_name: iot-ds-data-sender
depends_on:
- rabbitmq
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.iba.iot.datasimulator.session.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.iba.iot.datasimulator.session.model.active.filter.DatasetFilter;
import com.iba.iot.datasimulator.session.model.active.generator.Generator;
import com.iba.iot.datasimulator.session.model.active.injector.DeviceInjector;
Expand Down Expand Up @@ -29,7 +30,7 @@ public class SessionCreateUpdateRequest {
/** **/
private int ticksNumber;

/** **/
@JsonProperty(value="isReplayLooped")
private boolean isReplayLooped;

@Valid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ public enum ActiveSessionState {

COMPLETED("completed"),

STOPPED("stopped"),

FAILED("failed");

/** **/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.iba.iot.datasimulator.session.model.active.command;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import javax.validation.constraints.NotNull;

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class ActiveSessionManagementCommand {

@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,52 @@

import com.iba.iot.datasimulator.session.model.active.ActiveSessionStatus;

/**
*
*/
public interface ActiveSession {

/**
*
*/
void start();

/**
*
*/
void pause();

/**
*
*/
void resume();

/**
*
*/
void stop();

/**
*
* @param error
*/
void registerError(String error);

/**
*
* @return
*/
ActiveSessionStatus getStatus();

/**
*
* @return
*/
boolean isReplayLooped();

/**
*
* @return
*/
boolean isStopped();
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ public class ActiveSessionEntity implements ActiveSession {
/** **/
private Collection<String> errors = new ArrayList<>();

/** **/
private boolean isStopped;

/**
*
* @param session
Expand Down Expand Up @@ -114,6 +117,7 @@ public void resume() {
public void stop() {

logger.info(">>> Session {} has been stopped", sessionId);
isStopped = true;
dataProducer.stop();
}

Expand Down Expand Up @@ -222,4 +226,14 @@ private void sendSessionFailedPayload() {
private void sendSessionCompletedPayload() {
payloadSender.send(sessionId, new ActiveSessionPayload(ActiveSessionState.COMPLETED), session.getTargetSystem());
}

@Override
public boolean isReplayLooped() {
return session.isReplayLooped();
}

@Override
public boolean isStopped() {
return isStopped;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,14 @@ private void sendAnalyticsMessage(ActiveSessionAnalyticTag tag, String messagePo

messagingTemplate.convertAndSend(StompUtil.getSessionAnalyticsTopic(sessionId), activeSessionAnalyticsMessage);
}

@Override
public boolean isReplayLooped() {
return false;
}

@Override
public boolean isStopped() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public IntervalTimerProcessor(Timer timer) {
public long getWaitInterval(String previousEntry, String nextEntry) {

if (StringUtils.isEmpty(previousEntry) && StringUtils.isNotEmpty(nextEntry)) {
return 0;
return 500;
}

Long value = this.timer.getValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@

import com.iba.iot.datasimulator.session.model.active.ActiveSessionState;
import com.iba.iot.datasimulator.session.model.active.ActiveSessionStatus;
import com.iba.iot.datasimulator.session.model.active.command.ActiveSessionManagementCommand;
import com.iba.iot.datasimulator.session.model.active.command.SessionManagementCommand;
import com.iba.iot.datasimulator.session.model.active.message.ActiveSessionErrorMessage;
import com.iba.iot.datasimulator.session.model.active.message.ActiveSessionStatusMessage;
import com.iba.iot.datasimulator.session.service.active.entity.ActiveSession;
import com.iba.iot.datasimulator.session.service.active.manager.ActiveSessionManager;
import com.iba.iot.datasimulator.session.util.ActiveSessionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -29,6 +32,9 @@ public class ActiveSessionWatcherImpl implements ActiveSessionWatcher {
@Qualifier("sessionStore")
private Map<String, ActiveSession> sessionStore;

@Autowired
private ActiveSessionManager activeSessionManager;

@RabbitListener(queues = "sessionStates")
@Override
public void processCompletedSessions(ActiveSessionStatusMessage message) {
Expand All @@ -37,14 +43,29 @@ public void processCompletedSessions(ActiveSessionStatusMessage message) {
if (status.getState() == ActiveSessionState.COMPLETED) {

String sessionId = status.getSessionId();
if (ActiveSessionUtil.getSessionState(sessionStore.get(sessionId)) == ActiveSessionState.COMPLETED) {
ActiveSession session = sessionStore.get(sessionId);
if (ActiveSessionUtil.getSessionState(session) == ActiveSessionState.COMPLETED) {

logger.debug("Removing session {} from session store as competed.", sessionId);
sessionStore.remove(sessionId);

if (session.isReplayLooped() && !session.isStopped()) {
restartSession(sessionId);
}
}
}
}

/**
*
* @param sessionId
*/
private void restartSession(String sessionId) {

logger.debug(">>> Restarting session {} on completion due to looped session replay setting.");
activeSessionManager.manage(sessionId, new ActiveSessionManagementCommand(SessionManagementCommand.START));
}

@RabbitListener(queues = "sessionErrors")
@Override
public void processSessionError(ActiveSessionErrorMessage message) {
Expand Down

0 comments on commit 04278fe

Please sign in to comment.