From a0a1678fafe27c68776edd82c400ac5e3bab8657 Mon Sep 17 00:00:00 2001 From: Vadzim Kazak Date: Fri, 20 Apr 2018 16:12:08 +0300 Subject: [PATCH 1/3] #4 Working on session loop replay. --- .../session/factory/session/SessionFactoryImpl.java | 1 + .../java/com/iba/iot/datasimulator/session/model/Session.java | 3 +++ .../session/model/SessionCreateUpdateRequest.java | 3 +++ 3 files changed, 7 insertions(+) diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/factory/session/SessionFactoryImpl.java b/services/src/main/java/com/iba/iot/datasimulator/session/factory/session/SessionFactoryImpl.java index 05e9328..ab7f58a 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/factory/session/SessionFactoryImpl.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/factory/session/SessionFactoryImpl.java @@ -47,6 +47,7 @@ public Session buildFromCreateUpdateRequest(SessionCreateUpdateRequest sessionCr session.setTimer(sessionCreateUpdateRequest.getTimer()); session.setDatasetFilter(sessionCreateUpdateRequest.getDatasetFilter()); session.setTicksNumber(sessionCreateUpdateRequest.getTicksNumber()); + session.setReplayLooped(sessionCreateUpdateRequest.isReplayLooped()); String definitionId = sessionCreateUpdateRequest.getDataDefinitionId(); if (StringUtils.isNotBlank(definitionId)) { diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/model/Session.java b/services/src/main/java/com/iba/iot/datasimulator/session/model/Session.java index 56869d3..48e56e9 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/model/Session.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/model/Session.java @@ -64,6 +64,9 @@ public class Session implements ModelEntity { @JsonView(SessionViews.Short.class) private int ticksNumber; + @JsonView(SessionViews.Short.class) + private boolean isReplayLooped; + @JsonView(SessionViews.Short.class) @Embedded @Valid diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/model/SessionCreateUpdateRequest.java b/services/src/main/java/com/iba/iot/datasimulator/session/model/SessionCreateUpdateRequest.java index 7fc1155..3f49108 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/model/SessionCreateUpdateRequest.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/model/SessionCreateUpdateRequest.java @@ -29,6 +29,9 @@ public class SessionCreateUpdateRequest { /** **/ private int ticksNumber; + /** **/ + private boolean isReplayLooped; + @Valid private DatasetFilter datasetFilter; From ec3ce0332610fa62d0dd5b349fc7b87c91278284 Mon Sep 17 00:00:00 2001 From: Evgeny Sorokin Date: Fri, 20 Apr 2018 16:57:17 +0300 Subject: [PATCH 2/3] #4 UI: auto-replay field added to session data producing options --- .../session/SessionWizard/SessionWizard.js | 109 ++++++++++-------- .../Form/inputs/MaterialCheckbox.js | 40 +++++++ .../app/shared/models/session/SessionEntry.js | 14 ++- .../app/shared/stores/session/TimerStore.js | 11 ++ 4 files changed, 121 insertions(+), 53 deletions(-) create mode 100644 ui/public-src/src/app/shared/components/Form/inputs/MaterialCheckbox.js diff --git a/ui/public-src/src/app/screens/Main/components/session/SessionWizard/SessionWizard.js b/ui/public-src/src/app/screens/Main/components/session/SessionWizard/SessionWizard.js index f84a6e7..e5c5563 100644 --- a/ui/public-src/src/app/screens/Main/components/session/SessionWizard/SessionWizard.js +++ b/ui/public-src/src/app/screens/Main/components/session/SessionWizard/SessionWizard.js @@ -32,6 +32,7 @@ import SystemsList from "../../system/SystemsListContainer"; import MaterialTextField from "components/Form/inputs/MaterialTextField"; import MaterialSelect from "components/Form/inputs/MaterialSelect"; +import MaterialCheckbox from 'components/Form/inputs/MaterialCheckbox'; import EditorField from "components/Form/Editor"; import SchemaNode from "../../definition/DefinitionWizard/SchemaStep/SchemaConstructor/SchemaNode"; @@ -255,7 +256,7 @@ export default class SessionWizard extends React.Component { let type = field.$type || field.type; - if (field.key === "ticksNumber") + if (field.key === "ticksNumber" || field.key === 'isReplayLooped') return; if (type === "select") { @@ -310,60 +311,66 @@ export default class SessionWizard extends React.Component { {store.timerStore.isAdvancedPanelOpen && (

- { - - } + + {datasetFilterStore.shouldShowDatasetFilter && (
{ - - - {datasetFilterStore.isCustomFunctionSelected && ( -
- -
- )} - {datasetFilterStore.isDatasetEntrySelected && ( -
- - -
- )} -
+
+ + Dataset filter + + + + {datasetFilterStore.isCustomFunctionSelected && ( +
+ +
+ )} + {datasetFilterStore.isDatasetEntrySelected && ( +
+ + +
+ )} +
+
}
)} diff --git a/ui/public-src/src/app/shared/components/Form/inputs/MaterialCheckbox.js b/ui/public-src/src/app/shared/components/Form/inputs/MaterialCheckbox.js new file mode 100644 index 0000000..7e35bff --- /dev/null +++ b/ui/public-src/src/app/shared/components/Form/inputs/MaterialCheckbox.js @@ -0,0 +1,40 @@ +import React from "react"; +import { observer } from "mobx-react"; +import glamorous from "glamorous"; + +import Checkbox from "material-ui/Checkbox"; +import { InputLabel } from "material-ui/Input"; +import { FormControl, FormControlLabel } from "material-ui/Form"; + +const InputContainer = glamorous.div({ + paddingBottom: "15px", + position: "relative" +}); +const Error = glamorous.small({ + color: "red", + display: "block", + marginLeft: "5px", + position: "absolute" +}); + +export default observer( + ({ field, placeholder = null, label, style, disabled, checked, value }) => ( + + + + } + label={field.label} + /> + + {field.error} + + ) +); diff --git a/ui/public-src/src/app/shared/models/session/SessionEntry.js b/ui/public-src/src/app/shared/models/session/SessionEntry.js index 07b8826..b419634 100644 --- a/ui/public-src/src/app/shared/models/session/SessionEntry.js +++ b/ui/public-src/src/app/shared/models/session/SessionEntry.js @@ -21,12 +21,14 @@ export default class SessionEntry { @observable devices = []; @observable paths = []; @observable ticksNumber; + @observable isReplayLooped = false; constructor( { id, name, ticksNumber, + isReplayLooped = false, dataDefinition, timer = {}, generator = { type: generatorTypes.jsFunction }, @@ -40,6 +42,7 @@ export default class SessionEntry { this.id = id; this.name = name; this.ticksNumber = ticksNumber; + this.isReplayLooped = isReplayLooped; this.dataDefinition = new DefinitionEntry({ ...dataDefinition @@ -70,6 +73,11 @@ export default class SessionEntry { this.ticksNumber = ticksNumber; } + @action.bound + setIsReplayLooped(isLooped) { + this.isReplayLooped = isLooped; + } + @action.bound setDefinition(definition) { this.dataDefinition = definition; @@ -198,12 +206,13 @@ export default class SessionEntry { @computed get data() { - let { id, name, ticksNumber, timer, ...params } = this.toJSON(); + let { id, name, isReplayLooped, ticksNumber, timer, ...params } = this.toJSON(); let data = { name, ticksNumber, - timer + timer, + isReplayLooped }; if (id) { @@ -235,6 +244,7 @@ export default class SessionEntry { let params = toJS({ id: this.id, name: this.name, + isReplayLooped: this.isReplayLooped }); if(this.ticksNumber) { diff --git a/ui/public-src/src/app/shared/stores/session/TimerStore.js b/ui/public-src/src/app/shared/stores/session/TimerStore.js index 2cd0bda..f1784da 100644 --- a/ui/public-src/src/app/shared/stores/session/TimerStore.js +++ b/ui/public-src/src/app/shared/stores/session/TimerStore.js @@ -277,6 +277,17 @@ export default class TimerStore { } }, value: untracked(() => session.ticksNumber || '') + }, + { + key: 'isReplayLooped', + name: 'auto-replay', + label: 'Auto-replay', + hooks: { + onChange: field => { + session.setIsReplayLooped(field.value); + } + }, + value: untracked(() => session.isReplayLooped) } ]; From 04278fefc462e919bec8ec49f31ab55b64a8ca73 Mon Sep 17 00:00:00 2001 From: Vadzim Kazak Date: Fri, 20 Apr 2018 19:19:07 +0300 Subject: [PATCH 3/3] #4 Replay sessions in loop implementation. --- docker-compose.yml | 6 ++-- .../model/SessionCreateUpdateRequest.java | 3 +- .../model/active/ActiveSessionState.java | 2 ++ .../ActiveSessionManagementCommand.java | 4 +++ .../service/active/entity/ActiveSession.java | 35 +++++++++++++++++++ .../active/entity/ActiveSessionEntity.java | 14 ++++++++ .../active/entity/DummyActiveSession.java | 10 ++++++ .../timer/IntervalTimerProcessor.java | 2 +- .../watcher/ActiveSessionWatcherImpl.java | 23 +++++++++++- 9 files changed, 93 insertions(+), 6 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index b5b054b..9c5bd58 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -39,7 +39,7 @@ services: - 61613:61613 services: build: ./services - image: ibagroup/iot-data-simulator-services:1.0.1 + image: ibagroup/iot-data-simulator-services:1.1.0 container_name: iot-ds-services depends_on: - mongodb @@ -64,7 +64,7 @@ services: - 8083:8083 ui: build: ./ui - image: ibagroup/iot-data-simulator-ui:1.0.1 + image: ibagroup/iot-data-simulator-ui:1.1.0 container_name: iot-ds-ui depends_on: - services @@ -79,7 +79,7 @@ services: command: ["npm", "start"] data-sender: build: ./data-sender - image: ibagroup/iot-data-simulator-sender:1.0.1 + image: ibagroup/iot-data-simulator-sender:1.1.0 container_name: iot-ds-data-sender depends_on: - rabbitmq diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/model/SessionCreateUpdateRequest.java b/services/src/main/java/com/iba/iot/datasimulator/session/model/SessionCreateUpdateRequest.java index 3f49108..9c19411 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/model/SessionCreateUpdateRequest.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/model/SessionCreateUpdateRequest.java @@ -1,5 +1,6 @@ package com.iba.iot.datasimulator.session.model; +import com.fasterxml.jackson.annotation.JsonProperty; import com.iba.iot.datasimulator.session.model.active.filter.DatasetFilter; import com.iba.iot.datasimulator.session.model.active.generator.Generator; import com.iba.iot.datasimulator.session.model.active.injector.DeviceInjector; @@ -29,7 +30,7 @@ public class SessionCreateUpdateRequest { /** **/ private int ticksNumber; - /** **/ + @JsonProperty(value="isReplayLooped") private boolean isReplayLooped; @Valid diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/model/active/ActiveSessionState.java b/services/src/main/java/com/iba/iot/datasimulator/session/model/active/ActiveSessionState.java index c1de350..180cc94 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/model/active/ActiveSessionState.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/model/active/ActiveSessionState.java @@ -11,6 +11,8 @@ public enum ActiveSessionState { COMPLETED("completed"), + STOPPED("stopped"), + FAILED("failed"); /** **/ diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/model/active/command/ActiveSessionManagementCommand.java b/services/src/main/java/com/iba/iot/datasimulator/session/model/active/command/ActiveSessionManagementCommand.java index 2464fe5..ab5ce9c 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/model/active/command/ActiveSessionManagementCommand.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/model/active/command/ActiveSessionManagementCommand.java @@ -1,12 +1,16 @@ package com.iba.iot.datasimulator.session.model.active.command; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; import lombok.ToString; import javax.validation.constraints.NotNull; @Data @ToString +@AllArgsConstructor +@NoArgsConstructor public class ActiveSessionManagementCommand { @NotNull diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/ActiveSession.java b/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/ActiveSession.java index 800bbf7..6bc93a3 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/ActiveSession.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/ActiveSession.java @@ -2,17 +2,52 @@ import com.iba.iot.datasimulator.session.model.active.ActiveSessionStatus; +/** + * + */ public interface ActiveSession { + /** + * + */ void start(); + /** + * + */ void pause(); + /** + * + */ void resume(); + /** + * + */ void stop(); + /** + * + * @param error + */ void registerError(String error); + /** + * + * @return + */ ActiveSessionStatus getStatus(); + + /** + * + * @return + */ + boolean isReplayLooped(); + + /** + * + * @return + */ + boolean isStopped(); } diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/ActiveSessionEntity.java b/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/ActiveSessionEntity.java index 857f415..ab136df 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/ActiveSessionEntity.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/ActiveSessionEntity.java @@ -61,6 +61,9 @@ public class ActiveSessionEntity implements ActiveSession { /** **/ private Collection errors = new ArrayList<>(); + /** **/ + private boolean isStopped; + /** * * @param session @@ -114,6 +117,7 @@ public void resume() { public void stop() { logger.info(">>> Session {} has been stopped", sessionId); + isStopped = true; dataProducer.stop(); } @@ -222,4 +226,14 @@ private void sendSessionFailedPayload() { private void sendSessionCompletedPayload() { payloadSender.send(sessionId, new ActiveSessionPayload(ActiveSessionState.COMPLETED), session.getTargetSystem()); } + + @Override + public boolean isReplayLooped() { + return session.isReplayLooped(); + } + + @Override + public boolean isStopped() { + return isStopped; + } } diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/DummyActiveSession.java b/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/DummyActiveSession.java index 5a2af39..2e93470 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/DummyActiveSession.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/service/active/entity/DummyActiveSession.java @@ -141,4 +141,14 @@ private void sendAnalyticsMessage(ActiveSessionAnalyticTag tag, String messagePo messagingTemplate.convertAndSend(StompUtil.getSessionAnalyticsTopic(sessionId), activeSessionAnalyticsMessage); } + + @Override + public boolean isReplayLooped() { + return false; + } + + @Override + public boolean isStopped() { + return false; + } } diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/service/active/processing/timer/IntervalTimerProcessor.java b/services/src/main/java/com/iba/iot/datasimulator/session/service/active/processing/timer/IntervalTimerProcessor.java index edd5959..a389b99 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/service/active/processing/timer/IntervalTimerProcessor.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/service/active/processing/timer/IntervalTimerProcessor.java @@ -37,7 +37,7 @@ public IntervalTimerProcessor(Timer timer) { public long getWaitInterval(String previousEntry, String nextEntry) { if (StringUtils.isEmpty(previousEntry) && StringUtils.isNotEmpty(nextEntry)) { - return 0; + return 500; } Long value = this.timer.getValue(); diff --git a/services/src/main/java/com/iba/iot/datasimulator/session/service/active/watcher/ActiveSessionWatcherImpl.java b/services/src/main/java/com/iba/iot/datasimulator/session/service/active/watcher/ActiveSessionWatcherImpl.java index 3561bc9..1ba3726 100644 --- a/services/src/main/java/com/iba/iot/datasimulator/session/service/active/watcher/ActiveSessionWatcherImpl.java +++ b/services/src/main/java/com/iba/iot/datasimulator/session/service/active/watcher/ActiveSessionWatcherImpl.java @@ -2,9 +2,12 @@ import com.iba.iot.datasimulator.session.model.active.ActiveSessionState; import com.iba.iot.datasimulator.session.model.active.ActiveSessionStatus; +import com.iba.iot.datasimulator.session.model.active.command.ActiveSessionManagementCommand; +import com.iba.iot.datasimulator.session.model.active.command.SessionManagementCommand; import com.iba.iot.datasimulator.session.model.active.message.ActiveSessionErrorMessage; import com.iba.iot.datasimulator.session.model.active.message.ActiveSessionStatusMessage; import com.iba.iot.datasimulator.session.service.active.entity.ActiveSession; +import com.iba.iot.datasimulator.session.service.active.manager.ActiveSessionManager; import com.iba.iot.datasimulator.session.util.ActiveSessionUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +32,9 @@ public class ActiveSessionWatcherImpl implements ActiveSessionWatcher { @Qualifier("sessionStore") private Map sessionStore; + @Autowired + private ActiveSessionManager activeSessionManager; + @RabbitListener(queues = "sessionStates") @Override public void processCompletedSessions(ActiveSessionStatusMessage message) { @@ -37,14 +43,29 @@ public void processCompletedSessions(ActiveSessionStatusMessage message) { if (status.getState() == ActiveSessionState.COMPLETED) { String sessionId = status.getSessionId(); - if (ActiveSessionUtil.getSessionState(sessionStore.get(sessionId)) == ActiveSessionState.COMPLETED) { + ActiveSession session = sessionStore.get(sessionId); + if (ActiveSessionUtil.getSessionState(session) == ActiveSessionState.COMPLETED) { logger.debug("Removing session {} from session store as competed.", sessionId); sessionStore.remove(sessionId); + + if (session.isReplayLooped() && !session.isStopped()) { + restartSession(sessionId); + } } } } + /** + * + * @param sessionId + */ + private void restartSession(String sessionId) { + + logger.debug(">>> Restarting session {} on completion due to looped session replay setting."); + activeSessionManager.manage(sessionId, new ActiveSessionManagementCommand(SessionManagementCommand.START)); + } + @RabbitListener(queues = "sessionErrors") @Override public void processSessionError(ActiveSessionErrorMessage message) {