-
Notifications
You must be signed in to change notification settings - Fork 71
/
Copy pathtest_iotedgedev_simulator.py
133 lines (97 loc) · 4.66 KB
/
test_iotedgedev_simulator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import os
import pytest
import shutil
import time
from iotedgedev.version import PY35
from iotedgedev.envvars import EnvVars
from iotedgedev.output import Output
from .utility import get_docker_os_type
from .utility import get_platform_type
from .utility import runner_invoke
pytestmark = pytest.mark.e2e
output = Output()
envvars = EnvVars(output)
env_file_name = envvars.get_dotenv_file()
env_file_path = envvars.get_dotenv_path(env_file_name)
root_dir = os.getcwd()
tests_dir = os.path.join(root_dir, 'tests')
test_solution = 'test_solution'
test_solution_dir = os.path.join(tests_dir, test_solution)
@pytest.fixture(scope="module", autouse=True)
def create_solution(request):
os.chdir(tests_dir)
result = runner_invoke(['solution', 'new', test_solution])
assert 'AZURE IOT EDGE SOLUTION CREATED' in result.output
shutil.copyfile(env_file_path, os.path.join(test_solution_dir, env_file_name))
os.chdir(test_solution_dir)
def clean():
os.chdir(root_dir)
time.sleep(5)
shutil.rmtree(test_solution_dir, ignore_errors=True)
shutil.rmtree(test_solution_dir+"_shared_lib", ignore_errors=True)
runner_invoke(['simulator', 'stop'])
request.addfinalizer(clean)
return
# @pytest.mark.skipif(get_docker_os_type() == 'windows', reason='Simulator does not support windows container')
# def test_setup():
# result = runner_invoke(['simulator', 'setup'])
#
# assert 'Setup IoT Edge Simulator successfully.' in result.output
# @pytest.mark.skipif(get_docker_os_type() == 'windows', reason='Simulator does not support windows container')
# def test_setup_with_iothub():
# result = runner_invoke(['simulator', 'setup', '-i', os.getenv("IOTHUB_CONNECTION_STRING")])
#
# assert 'Setup IoT Edge Simulator successfully.' in result.output
# @pytest.mark.skipif(get_docker_os_type() == 'windows', reason='Simulator does not support windows container')
# def test_start_single():
# result = runner_invoke(['simulator', 'start', '-i', 'input1'])
# assert 'IoT Edge Simulator has been started in single module mode.' in result.output
# @pytest.mark.skipif(get_docker_os_type() == 'windows', reason='Simulator does not support windows container')
# def test_modulecred():
# result = runner_invoke(['simulator', 'modulecred'])
#
# assert 'EdgeHubConnectionString=HostName=' in result.output
# assert 'EdgeModuleCACertificateFile=' in result.output
# @pytest.mark.skipif(get_docker_os_type() == 'windows', reason='Simulator does not support windows container')
# def test_stop(capfd):
# runner_invoke(['simulator', 'stop'])
# out, err = capfd.readouterr()
#
# assert 'IoT Edge Simulator has been stopped successfully.' in out
# @pytest.mark.skipif(get_docker_os_type() == 'windows', reason='Simulator does not support windows container')
# def test_start_solution(capfd):
# result = runner_invoke(['simulator', 'start', '-s', '-b', '-f', 'deployment.template.json'])
# out, err = capfd.readouterr()
#
# assert 'BUILD COMPLETE' in result.output
# assert 'IoT Edge Simulator has been started in solution mode.' in out
# @pytest.mark.skipif(get_docker_os_type() == 'windows', reason='Simulator does not support windows container')
# def test_start_solution_with_setup(capfd):
# result = runner_invoke(['simulator', 'start', '--setup', '-s', '-b', '-f', 'deployment.template.json'])
# out, err = capfd.readouterr()
# assert 'Setup IoT Edge Simulator successfully.' in result.output
# assert 'BUILD COMPLETE' in result.output
# assert 'IoT Edge Simulator has been started in solution mode.' in out
# @pytest.mark.skipif(get_docker_os_type() == 'windows', reason='Simulator does not support windows container')
# def test_monitor(capfd):
# try:
# result = runner_invoke(['monitor', '--timeout', '30'])
# out, err = capfd.readouterr()
# # Assert output from simulator
# sim_match = 'timeCreated'
# if not PY35:
# assert 'Monitoring events from device' in out
# assert sim_match in out
# else:
# assert not err
# assert sim_match in result.output
# finally:
# test_stop(capfd)
# @pytest.mark.skipif(get_docker_os_type() == 'windows', reason='Simulator does not support windows container')
# def test_start_solution_with_deployment(capfd):
# platform_type = get_platform_type()
# deployment_file_path = os.path.join(test_solution_dir, 'config', 'deployment.' + platform_type + '.json')
# runner_invoke(['simulator', 'start', '-f', deployment_file_path])
# out, err = capfd.readouterr()
# assert 'IoT Edge Simulator has been started in solution mode.' in out
# test_monitor(capfd)