-
Notifications
You must be signed in to change notification settings - Fork 19
/
SimulationNoGUI.py
238 lines (186 loc) · 8.53 KB
/
SimulationNoGUI.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
# -*- coding: utf-8 -*-
"""
Name: SimulationNoGUI.py
Brief description: Overrides some methods to implement the no-GUI version of DEVSimPy and runs simulations from a .dsp file
in batch mode
Author(s): A-T. Luciani <[email protected]>, capocchi <[email protected]>
Version: 1.0
Last modified: 2015.01.11 by L. Capocchi
GENERAL NOTES AND REMARKS:
GLOBAL VARIABLES AND FUNCTIONS:
"""
import os
import sys
import time
import builtins
import traceback
import gettext
import json
import pusher
# Conventional short alias for the gettext lookup function.
_ = gettext.gettext

# Make the 'Domain' directory (resolved against the current working
# directory) importable, without adding it twice.
path = os.path.join('Domain')
if path not in sys.path:
    sys.path.append(path)
class Printer:
    """Rewrite the current terminal line with a new value.

    Instantiating the class is the whole operation: the constructor writes a
    carriage return and the ANSI "erase to end of line" sequence, followed by
    the text form of ``data``, then flushes stdout so the update shows up
    immediately.
    """

    def __init__(self, data):
        """Print ``data`` in place of whatever is on the current line.

        Args:
            data: any object; it is converted with ``str()`` (which, unlike a
                direct ``data.__str__()`` call, also works for class objects).
        """
        # "\r" moves the cursor back to column 0 and "\x1b[K" clears the line.
        sys.stdout.write("\r\x1b[K" + str(data))
        sys.stdout.flush()
def yes(prompt:str = 'Please enter Yes/No: ')->bool:
    """Ask a yes/no question on the console until a valid answer is given.

    Args:
        prompt: text displayed before each read.

    Returns:
        True for "yes"/"y" and False for "no"/"n" (case-insensitive,
        surrounding whitespace ignored). False is also returned when the
        prompt is interrupted (Ctrl-C) or when stdin is exhausted, so an
        unattended batch run neither crashes nor loops forever here.
    """
    while True:
        try:
            answer = input(prompt)
        except (KeyboardInterrupt, EOFError):
            # No answer can be obtained: treat it as a refusal.
            return False

        answer = answer.strip().lower()
        if answer in ('yes', 'y'):
            return True
        if answer in ('no', 'n'):
            return False
        # Anything else: ask again.
class SimuPusher():
    """Publish simulation events to the user through the Pusher service.

    Every event is triggered on a channel named after the simulation.
    """

    # NOTE(security): these credentials are committed in the source tree.
    # They are kept only as backward-compatible defaults; supply real values
    # through the constructor arguments or the environment instead.
    DEFAULT_APP_ID = '178867'
    DEFAULT_KEY = 'c2d255356f53779e6020'
    DEFAULT_SECRET = '9d41a54d45d25274df63'

    def __init__(self, simu_name, app_id=None, key=None, secret=None):
        """Create the Pusher client bound to the simulation channel.

        Args:
            simu_name: name of the simulation, used as the channel name.
            app_id: Pusher application id. Falls back to the PUSHER_APP_ID
                environment variable, then to the built-in default.
            key: Pusher key (PUSHER_KEY, then the built-in default).
            secret: Pusher secret (PUSHER_SECRET, then the built-in default).
        """
        # app_id/key/secret might be linked to user TBC
        self.app_id = app_id or os.environ.get('PUSHER_APP_ID') or self.DEFAULT_APP_ID
        self.key = key or os.environ.get('PUSHER_KEY') or self.DEFAULT_KEY
        self.secret = secret or os.environ.get('PUSHER_SECRET') or self.DEFAULT_SECRET
        self.pusher = pusher.Pusher(app_id=self.app_id,
                                    key=self.key,
                                    secret=self.secret,
                                    ssl=True,
                                    port=443)
        self.channel = simu_name

    def push(self, event, data):
        """Trigger ``event`` on the simulation channel.

        Args:
            event: name of the event.
            data: payload; it is serialized with ``json.dumps`` before being
                handed to the Pusher client.
        """
        self.pusher.trigger(self.channel, event, json.dumps(data))
class PrintPusher():
    """Console stand-in for a pusher: events are printed on stdout."""

    def __init__(self, simu_name):
        """Accept the simulation name; it is not used by this class."""

    def push(self, event, data):
        """Print ``data`` as one JSON line on stdout.

        Args:
            event: name of the event (ignored).
            data: JSON-serializable payload.
        """
        payload = json.dumps(data)
        print(payload)
def _stop_interaction_manager(manager):
    """Stop and join the interaction manager thread, if one was started."""
    if manager is not None:
        manager.stop()
        manager.join()


def makeSimulation(master, T, simu_name:str="simu", is_remote:bool=False, json_trace:bool=True):
    """Run a DEVS simulation in batch (no-GUI) mode and wait for it to end.

    Args:
        master: DEVS model to simulate. A falsy value aborts immediately.
            A tuple is treated as the description of a model that could not
            be built: it is reported as JSON on stdout and nothing is run.
        T: final simulation time; handed to ``runSimulation`` and used to
            compute the progress ratio.
        simu_name: names the interaction socket ('socket_<simu_name>') and
            the failure report ('logs/<simu_name>.report').
        is_remote: when true, an ``InteractionManager`` is started next to
            the simulation thread and stopped when the run is over.
        json_trace: when false, the elapsed wall-clock time is echoed on the
            terminal while the simulation is running.

    Returns:
        True when the simulation ran without raising; False when no model
        was given, the model was not built, or an exception occurred.
    """
    from InteractionSocket import InteractionManager

    json_report = {
        'date': time.strftime("%c"),
        'summary': "Simulation in batch mode with %s" % builtins.__dict__['DEFAULT_DEVS_DIRNAME'],
        'mode': 'no-gui',
        'time': T,
        'success': True,
        'output': [],
    }

    if not master:
        return False

    json_report['devs_instance'] = str(master)

    if isinstance(master, tuple):
        json_report['summary'] += "...DEVS instance not created: %s\n" % str(master)
        # The model was not built: do not advertise a success.
        json_report['success'] = False
        sys.stdout.write(json.dumps(json_report))
        return False

    json_report['summary'] += "...DEVS instance created"

    # Start Simulation
    json_report['summary'] += "...Performing DEVS simulation"

    CPUduration = 0.0
    interactionManager = None
    # Pause between two liveness checks, so that the waiting loop does not
    # spin at full speed while the simulation thread is working.
    poll_interval = 0.01

    try:
        # # Pusher service for Simulation --> User communication
        # simuPusher = SimuPusher(simu_name) if is_remote else PrintPusher(simu_name)
        # ### Get live stream URL if exist :
        # for m in [a for a in master.getComponentSet() if hasattr(a, 'plotUrl') and a.plotUrl != '']:
        #     json_report['output'].append({'label':m.name, 'plotUrl':m.plotUrl})
        # ### Get live stream URL if exist :
        # for m in [a for a in master.getComponentSet() if hasattr(a, 'pusherChannel')]:
        #     m.pusherChannel = simu_name
        #     json_report['output'].append({'label':m.name, 'pusherChannel':m.pusherChannel})
        # # Send to user
        # simuPusher.push('live_streams', {'live_streams': json_report['output']})

        sim = runSimulation(master, T)
        thread = sim.Run()

        if is_remote:
            # Socket service for WebService <--> Simulation communication
            socket_id = 'socket_' + simu_name
            interactionManager = InteractionManager(socket_id=socket_id, simulation_thread=thread)
            interactionManager.start()

        first_real_time = time.time()
        progress = 0

        # With no time limit (NTL) there is nothing to wait for here.
        if not builtins.__dict__['NTL']:
            final_time = float(T)
            # Older thread objects expose isAlive(), newer ones is_alive().
            is_alive = thread.isAlive if hasattr(thread, 'isAlive') else thread.is_alive

            while is_alive():
                CPUduration = time.time() - first_real_time

                if final_time != 0:
                    new_progress = 100.0 * (float(thread.model.timeLast) / final_time)
                else:
                    new_progress = 100.0

                # Progress is tracked in steps of more than 5 points; the
                # notification itself is currently disabled.
                if new_progress - progress > 5:
                    progress = new_progress
                    # simuPusher.push('progress', {'progress':progress})

                if not json_trace:
                    Printer(CPUduration)

                time.sleep(poll_interval)

        _stop_interaction_manager(interactionManager)
        # simuPusher.push('progress', {'progress':100})

    except BaseException:
        # Deliberately as broad as the bare "except:" it replaces: a failed
        # or interrupted (Ctrl-C) run must still stop the socket thread and
        # leave a report behind.
        json_report['summary'] += " *** EXCEPTION raised in simulation ***"
        json_report['success'] = False
        sys.stderr.write(traceback.format_exc())

        _stop_interaction_manager(interactionManager)

        ### if simu_name is given, a log file is written in the logs dir
        # NOTE(review): the report is only written on failure; the
        # success-path report below is commented out - confirm intended.
        if simu_name:
            os.makedirs('logs', exist_ok=True)
            with open(os.path.join('logs', simu_name + '.report'), 'w') as f:
                f.write(json.dumps(json_report))

        # A failed run must not be reported as completed/successful.
        return False

    json_report['summary'] += "...DEVS simulation completed!"
    json_report['duration'] = CPUduration

    # ### inform that data file has been generated
    # json_report['output'] = []
    # for m in [a for a in master.getComponentSet() if hasattr(a, 'fileName')]:
    #     for i in range(len(m.IPorts)):
    #         fn ='%s%s.dat'%(m.fileName,str(i))
    #         if os.path.exists(fn):
    #             json_report['output'].append({'label':m.name+'_port_' + str(i),
    #                                           'filename':os.path.basename(fn)})
    # for m in [a for a in master.getComponentSet() if hasattr(a, 'plotUrl')]:
    #     json_report['output'].append({'label':m.name, 'plotUrl':m.plotUrl})
    # ### if simu_name is given, a log file is written in the logs dir
    # if simu_name:
    #     os.makedirs('logs', exist_ok=True)
    #     with open(os.path.join('logs',simu_name+'.report'), 'w') as f:
    #         f.write(json.dumps(json_report))

    return True
class runSimulation:
    """Prepare and launch the simulation of a DEVS master model.

    The simulation options (no time limit, dynamic structure, real time and
    default strategy) are read from the ``builtins`` namespace when the
    object is created.
    """

    def __init__(self, master, time):
        """Store the model, the final time and the global options.

        Args:
            master: DEVS model to simulate.
            time: final simulation time (converted with ``float`` in Run).
        """
        settings = vars(builtins)

        # local copy
        self.master, self.time = master, time

        ### No time limit simulation (defined in the builtin dict from the .devsimpy file)
        self.ntl = settings['NTL']

        # simulator strategy
        self.selected_strategy = DEFAULT_SIM_STRATEGY
        self.dynamic_structure_flag = settings['DYNAMIC_STRUCTURE']
        self.real_time_flag = settings['REAL_TIME']

        ### profiling simulation
        self.prof = self.verbose = False

        # thread, timer and counter used for the simulation percentage
        self.thread = None
        self.count = 10.0
        self.stdioWin = None

    ###
    def Run(self):
        """Create the simulation thread and return it.

        Before the thread is created, the output file name of every
        'To_Disk' component is rewritten so that it lives in the directory
        of the saved diagram and is prefixed with the diagram name.

        Returns:
            The object built by ``simulator_factory``, also stored in
            ``self.thread``.
        """
        assert self.master is not None

        ### Supports several simulations without reopening a SimulationDialog,
        ### when the thread is not started (not during a suspend)
        # if self.thread is not None and not self.thread.thread_suspend:
        diagram = self.master.getBlockModel()

        ########################################################################
        ### TODO: rework how the path of the To_Disk results is recorded
        for component in self.master.getComponentSet():
            if str(component) != 'To_Disk':
                continue

            # Directory of the saved diagram, without tabs and spaces.
            dir_fn = os.path.dirname(diagram.last_name_saved).replace('\t', '').replace(' ', '')
            # label = m.getBlockModel()
            # Diagram file name up to its first dot.
            diagram_stem = os.path.basename(diagram.last_name_saved).split('.')[0]
            result_name = os.path.basename(component.fileName)
            component.fileName = os.path.join(dir_fn, "%s_%s" % (diagram_stem, result_name))
        ########################################################################

        if self.master:
            from Patterns.Factory import simulator_factory

            # The final time only applies when a time limit is in effect.
            if not self.ntl:
                self.master.FINAL_TIME = float(self.time)

            self.thread = simulator_factory(
                self.master,
                self.selected_strategy,
                self.prof,
                self.ntl,
                self.verbose,
                self.dynamic_structure_flag,
                self.real_time_flag,
            )

        return self.thread