-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathconftest.py
1038 lines (923 loc) · 42.1 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
# _*_ coding:utf-8 _*_
# SPDX-FileCopyrightText: 2023 UnionTech Software Technology Co., Ltd.
# SPDX-License-Identifier: GPL-2.0-only
# pylint: disable=C0301,R0912,C0413,R0914,W0212,R1702,R0915
# pylint: disable=C0114,W0621,C0411,C0412,R1706,E0401
import re
import sys
from os import environ
from setting.globalconfig import GlobalConfig
# Set display/pipenv related environment variables before the remaining
# imports below are executed.
environ["DISPLAY"] = ":0"
environ["PIPENV_VERBOSITY"] = "-1"
environ["XAUTHORITY"] = f"{GlobalConfig.HOME}/.Xauthority"
from setting.globalconfig import SystemPath
# Append every SystemPath entry to sys.path, skipping ones already present.
for i in SystemPath:
    if i.value not in sys.path:
        sys.path.append(i.value)
from os import system
from os import remove
from os import makedirs
from os import walk
from os.path import exists
from os.path import join
from os.path import splitext
from enum import Enum
from time import sleep
from collections import deque, Counter
from datetime import datetime
from json import dumps
from re import findall
from shutil import copyfile
from multiprocessing import Process
from concurrent.futures import wait
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ALL_COMPLETED
import allure
import pytest
from _pytest.mark import Mark
from _pytest.terminal import TerminalReporter
from funnylog.conf import setting as log_setting
# letmego is an optional dependency: HAS_LETMEGO records whether it could be
# imported, and when it can, its settings are filled in from GlobalConfig.
try:
    import letmego

    HAS_LETMEGO = True
    letmego.conf.setting.PASSWORD = GlobalConfig.PASSWORD
    letmego.conf.setting.RUNNING_MAN_FILE = f"{GlobalConfig.REPORT_PATH}/_running_man.log"
    letmego.conf.setting.DEBUG = GlobalConfig.LETMEGO_DEBUG
except ModuleNotFoundError:
    HAS_LETMEGO = False
# Copy the log output path and the class-name filters from GlobalConfig into
# the funnylog settings object.
log_setting.LOG_FILE_PATH = GlobalConfig.REPORT_PATH
log_setting.CLASS_NAME_STARTSWITH = GlobalConfig.CLASS_NAME_STARTSWITH
log_setting.CLASS_NAME_ENDSWITH = GlobalConfig.CLASS_NAME_ENDSWITH
log_setting.CLASS_NAME_CONTAIN = GlobalConfig.CLASS_NAME_CONTAIN
from setting import skipif
from setting.globalconfig import ConfStr
from setting.globalconfig import FixedCsvTitle
from src import logger
from src.plugins.allure_report_extend import AllureReportExtend
from src.plugins import emoji_hooks
from src.cmdctl import CmdCtl
from src.pms._base import write_case_result
from src.pms._base import runs_id_cmd_log
from src.pms.task import Task
from src.pms.suite import Suite
from src.pms.send2pms import Send2Pms
from src.recording_screen import recording_screen
# Separator placed around section titles in the per-test log lines.
FLAG_FEEL = "=" * 10
# Newline constant, used inside f-string expressions in this module.
LN = "\n"
class LabelType(Enum):
    """Map the case-level labels L1-L4 to allure severity levels.

    pytest_runtest_makereport looks members up by marker name
    (``LabelType[mark.name]``) for markers coming from the case-level column.
    """

    L1 = allure.severity_level.BLOCKER
    L2 = allure.severity_level.CRITICAL
    L3 = allure.severity_level.NORMAL
    L4 = allure.severity_level.MINOR
def add_mark(item, name: str = "", args: tuple = (), kwargs: dict = None):
    """Append a marker to ``item.own_markers``.

    Args:
        item: collected test item whose ``own_markers`` list is extended.
        name: marker name.
        args: positional arguments stored on the marker.
        kwargs: keyword arguments stored on the marker; ``None`` (the
            default) is stored as an empty dict so the marker always carries
            a mapping.
    """
    item.own_markers.append(
        Mark(name=name, args=args, kwargs={} if kwargs is None else kwargs)
    )
def write_json(session):
    """Return True when case results must be written for PMS back-filling.

    That is the case when ``--send_pms`` is set together with either
    ``--task_id`` or ``--suite_id``.
    """
    opts = session.config.option
    if not opts.send_pms:
        return False
    return bool(opts.task_id or opts.suite_id)
def auto_send(session):
    """Return True when both ``--send_pms`` and ``--trigger`` are set."""
    opts = session.config.option
    if opts.send_pms and opts.trigger:
        return True
    return False
def async_send(session):
    """Return True when results are sent to PMS asynchronously.

    Requires ``--send_pms`` to equal ``ConfStr.ASYNC`` and ``--trigger`` to
    equal ``ConfStr.AUTO``.
    """
    opts = session.config.option
    if not opts.send_pms == ConfStr.ASYNC.value:
        return False
    return bool(opts.trigger == ConfStr.AUTO.value)
def finish_send(session):
    """Return True when results are sent to PMS once the session finishes.

    Requires ``--send_pms`` to equal ``ConfStr.FINISH`` and ``--trigger`` to
    equal ``ConfStr.AUTO``.
    """
    opts = session.config.option
    if not opts.send_pms == ConfStr.FINISH.value:
        return False
    return bool(opts.trigger == ConfStr.AUTO.value)
def pytest_addoption(parser):
    """Register the custom command-line options used by this conftest."""

    def add_store_option(flag, help_text, default=""):
        # Every plain option is a string "store" option; only default differs.
        parser.addoption(flag, action="store", default=default, help=help_text)

    add_store_option("--clean", "是否清理环境&杀进程", default="no")
    add_store_option("--log_level", "终端日志输出级别", default=GlobalConfig.LOG_LEVEL)
    try:
        add_store_option("--noskip", "skip-xxx标签不生效")
        add_store_option("--ifixed", "fixed-xxx标签不生效")
    except ValueError:
        # Registration of these two options is best-effort.
        pass

    for flag, help_text in (
        ("--max_fail", "最大失败次数"),
        ("--record_failed_case", "失败录屏从第几次失败开始录制视频"),
        ("--send_pms", "用例数据回填"),
        ("--task_id", "测试单id"),
        ("--trigger", "数据回填的触发者"),
        ("--suite_id", "pms的测试套件ID"),
        ("--pms_user", "登录pms的账号"),
        ("--pms_password", "登录pms的密码"),
        ("--top", "过程中记录top命令中的值"),
    ):
        add_store_option(flag, help_text)

    parser.addoption(
        "--duringfail",
        action="store_true",
        dest="duringfail",
        default=False,
        help="出现错误时立即显示",
    )
    parser.addoption("--repeat", action="store", default=1, type=int, help="用例重复执行的次数")

    for flag, help_text in (
        ("--export_csv_file", "导出csv文件"),
        ("--line", "业务线(CI)"),
        ("--app_name", "执行的应用名称"),
        ("--slaves", "远程测试机"),
        ("--autostart", "重启类场景开启letmego执行方案"),
    ):
        add_store_option(flag, help_text)
def pytest_cmdline_main(config):
    """Initialise logging and copy its formats into pytest's log options.

    The log configuration is initialised here to fix the log format in the
    HTML report. ``config.option.log_level`` is left as given on the command
    line (the former self-assignment was a no-op and has been dropped).
    """
    log_info = logger(config.option.log_level)
    config.option.log_format = log_info.log_format
    config.option.log_date_format = log_info.date_format
def pytest_addhooks(pluginmanager):
    """Register the hook specifications declared in ``emoji_hooks``."""
    pluginmanager.add_hookspecs(emoji_hooks)
@pytest.hookimpl(trylast=True)
def pytest_configure(config):
    """Swap in the fail-immediately terminal reporter when ``--duringfail`` is set.

    Nothing is done on xdist workers (detected via ``config.workerinput``),
    when the option is off, or when no terminal reporter is registered.
    """
    if hasattr(config, "workerinput"):
        return  # xdist worker
    reporter = config.pluginmanager.getplugin("terminalreporter")
    if config.option.duringfail and reporter:
        custom_reporter = DuringfailingTerminalReporter(reporter)
        # Replace the standard reporter: unregister the original (not the new
        # object, which was never registered) and register the replacement
        # under the same name so lookups of "terminalreporter" keep working.
        config.pluginmanager.unregister(reporter)
        config.pluginmanager.register(custom_reporter, "terminalreporter")
def pytest_sessionstart(session):
    """Prepare per-session state before collection starts.

    Logs the display protocol, stores the start time, prepares the PMS result
    paths and the async sender pool when requested, and starts a background
    process recording ``top`` output when ``--top`` is given.
    """
    if GlobalConfig.IS_WAYLAND:
        display_server = GlobalConfig.DisplayServer.wayland
    else:
        display_server = GlobalConfig.DisplayServer.x11
    logger.info(f"当前系统显示协议为 {display_server.title()}")

    option = session.config.option
    option.start_time = datetime.now()
    pms_user = option.pms_user
    pms_password = option.pms_password
    task_id = option.task_id
    suite_id = option.suite_id

    if write_json(session):
        session.case_res_path = Send2Pms.case_res_path(task_id or suite_id)
        session.data_send_result_csv = Send2Pms.data_send_result_csv(task_id or suite_id)

    if pms_user and pms_password and async_send(session):
        session.all_thread_task = []
        session.t_executor = ThreadPoolExecutor()

    if option.collectonly or not option.top:
        return

    def record_top():
        # Runs in the child process: dump the head of top's output to a log.
        top_log_path = f"{GlobalConfig.REPORT_PATH}/logs"
        if not exists(top_log_path):
            makedirs(top_log_path)
        system(
            f"{GlobalConfig.top_cmd} | grep ^top -A {int(session.config.option.top) + 6} > "
            f"{top_log_path}/top_{GlobalConfig.TIME_STRING}.log"
        )

    session.p = Process(target=record_top, args=())
    session.p.start()
@pytest.hookimpl(trylast=True)
def pytest_generate_tests(metafunc):
    """Parametrize a test so that it runs ``repeat`` times.

    The count comes from a ``repeat`` marker when present, otherwise from
    ``--repeat``. Nothing is added when the count is 1 or less.
    """
    repeat = metafunc.config.option.repeat
    repeat_marker = metafunc.definition.get_closest_marker("repeat")
    if repeat_marker is not None:
        repeat = int(repeat_marker.args[0])
    if not repeat > 1:
        return

    fixture_name = "__pytest_repeat_step_number"
    metafunc.fixturenames.append(fixture_name)

    def ids(i, number=repeat):
        # "<current>-<total>", e.g. "1-3".
        return f"{i + 1}-{number}"

    metafunc.parametrize(fixture_name, range(repeat), indirect=True, ids=ids)
def pytest_collection_modifyitems(session):
    """Apply CSV-driven markers and filtering to the collected items.

    For every collected item the function name / file name are parsed as
    ``test_<csv_name>_<id>``, the row with that id is looked up in
    ``<csv_name>.csv`` and its cells are turned into markers (skip, skipif,
    PMS ids, custom tags). Items may be removed from ``session.items`` when
    they are flagged as removed, when they are not part of the requested PMS
    suite/task, or when they have no CSV row during an allure run.

    Raises:
        ValueError: both ``--suite_id`` and ``--task_id`` are given, or one
            of them is given without PMS credentials.
        IndexError: a CSV row is too short to hold the "removed" column.
    """
    walk_dir = (
        f"{GlobalConfig.APPS_PATH}/{session.config.option.app_name}"
        if session.config.option.app_name
        and exists(f"{GlobalConfig.APPS_PATH}/{session.config.option.app_name}")
        else GlobalConfig.APPS_PATH
    )
    csv_path_dict, no_youqu_mark = walk_apps(walk_dir)
    if not csv_path_dict:
        return
    user = session.config.option.pms_user
    password = session.config.option.pms_password
    suite_id = session.config.option.suite_id
    task_id = session.config.option.task_id
    # csv path -> {case id: row cells (without the id column)} plus the
    # per-file column indexes stored under the ConfStr.*_INDEX keys.
    containers = {}
    # csv path -> header titles of that file. Kept per file so a marker title
    # is never taken from a different CSV that happened to be loaded last.
    csv_headers = {}
    suite_runs_ids = suit_id_deque = task_runs_ids = task_id_deque = None
    skip_index = fixed_index = removed_index = pms_id_index = None
    if suite_id and task_id:
        raise ValueError("suite_id 和 task_id 不能同时存在~")
    if suite_id or task_id:
        if not (user and password):
            raise ValueError("pms_user 或 pms_password 未传入")
    if suite_id:
        suite_runs_ids, suit_id_deque = get_runs_id_deque(user, password, Suite, "suite", suite_id)
        print(
            f"{LN}测试套件: https://pms.uniontech.com/zentao/testsuite-view-{suite_id}.html"
            f"{LN}关联的用例:{LN}{f'{LN}'.join([runs_id_cmd_log(i) for i in suite_runs_ids])}"
        )
    elif task_id:
        task_runs_ids, task_id_deque = get_runs_id_deque(user, password, Task, "task", task_id)
        print(
            f"{LN}测试单: https://pms.uniontech.com/testtask-cases-{task_id}.html"
            f"{LN}关联的用例:{LN}{f'{LN}'.join([runs_id_cmd_log(i) for i in task_runs_ids])}"
        )
    # Iterate over a reversed copy so items can be removed from session.items.
    for item in session.items[::-1]:
        item.name = item.name.encode("utf-8").decode("unicode_escape")
        item._nodeid = item.nodeid.encode("utf-8").decode("unicode_escape")
        if no_youqu_mark:
            # Items below a directory containing NOYOUQUMARK are left untouched.
            continue_flag = False
            for app_abspath in no_youqu_mark:
                if app_abspath in item.fspath.strpath:
                    continue_flag = True
                    break
            if continue_flag:
                continue
        try:
            csv_name, _id = findall(r"test_(.*?)_(\d+)", item.name)[0]
            _case_name, _case_id = findall(r"test_(.*?)_(\d+)", item.fspath.purebasename)[0]
            if _id != _case_id:
                raise ValueError
            if _case_name != csv_name:
                raise FileNotFoundError
        except IndexError:
            skip_text = f"用例函数名称缺少用例id:[{item.nodeid}]"
            logger.error(skip_text)
            add_mark(item, ConfStr.SKIP.value, (skip_text,), {})
        except ValueError:
            skip_text = f"用例py文件的id与用例函数的id不一致:[{item.nodeid}]"
            logger.error(skip_text)
            add_mark(item, ConfStr.SKIP.value, (skip_text,), {})
        except FileNotFoundError:
            logger.error(f"用例py文件的名称与用例函数的名称不一致:[{item.nodeid}]")
            session.items.remove(item)
        else:
            csv_path = csv_path_dict.get(csv_name)
            if not csv_path:
                if "asan" not in csv_name:
                    logger.error(f"{csv_name}.csv 文件不存在!")
                continue
            if not containers.get(csv_path):
                with open(csv_path, "r", encoding="utf-8") as _f:
                    txt_list = _f.readlines()
                if not txt_list:
                    continue
                header_titles = txt_list[0].strip().split(",")
                # Locate the fixed columns through the CSV header. Indexes are
                # shifted by one because the ID column is excluded from rows.
                for index, title in enumerate(header_titles):
                    if title.strip() == FixedCsvTitle.skip_reason.value:
                        skip_index = index - 1
                    elif title.strip() == FixedCsvTitle.fixed.value:
                        fixed_index = index - 1
                    elif title.strip() == FixedCsvTitle.removed.value:
                        removed_index = index - 1
                    elif title.strip().strip("*") == FixedCsvTitle.pms_case_id.value.strip("*"):
                        pms_id_index = index - 1
                taglines = [txt.strip().split(",") for txt in txt_list[1:]]
                id_tags_dict = {f"{int(i[0]):0>3}": i[1:] for i in taglines if i[0]}
                # Each CSV file keeps its own set of column indexes.
                containers[csv_path] = id_tags_dict
                containers[csv_path][ConfStr.SKIP_INDEX.value] = skip_index
                containers[csv_path][ConfStr.FIXED_INDEX.value] = fixed_index
                containers[csv_path][ConfStr.REMOVED_INDEX.value] = removed_index
                containers[csv_path][ConfStr.PMS_ID_INDEX.value] = pms_id_index
                csv_headers[csv_path] = header_titles
                # Reset the indexes for the next CSV file.
                skip_index = fixed_index = removed_index = pms_id_index = None
            # Running by task/suite needs the "PMS case id" column; without it
            # the case is not run at all.
            if (task_id or suite_id) and containers[csv_path][ConfStr.PMS_ID_INDEX.value] is None:
                session.items.remove(item)
                continue
            tags = containers.get(csv_path).get(_id)
            if tags:
                try:
                    if containers[csv_path][ConfStr.REMOVED_INDEX.value] is not None and tags[
                        containers[csv_path][ConfStr.REMOVED_INDEX.value]
                    ].strip('"').startswith(f"{ConfStr.REMOVED.value}-"):
                        session.items.remove(item)
                        continue
                except IndexError as exc:
                    logger.error(
                        f"\ncsv_path:\t{csv_path}\ntags:\t{tags}\n"
                        f"error_tag_index:\t{containers[csv_path][ConfStr.REMOVED_INDEX.value]}"
                    )
                    raise IndexError from exc
                for index, tag in enumerate(tags):
                    if tag:
                        tag = tag.strip('"')
                        # Handle the "skip reason" column first.
                        if index == containers[csv_path][ConfStr.SKIP_INDEX.value]:
                            # Tag starts with "skip-"; --noskip disables all skips.
                            if not session.config.option.noskip and tag.startswith(
                                f"{ConfStr.SKIP.value}-"
                            ):
                                # A "fixed-" tag overrides the skip; --ifixed
                                # ("ignore fixed") disables that override:
                                # 1. without --ifixed a case marked fixed runs
                                #    even if it is also marked skip-;
                                # 2. with --ifixed the fixed tag is ignored and
                                #    only the skip tag decides.
                                try:
                                    if (
                                        not session.config.option.ifixed
                                        and containers[csv_path][ConfStr.FIXED_INDEX.value]
                                        is not None
                                        and tags[containers[csv_path][ConfStr.FIXED_INDEX.value]]
                                        .strip('"')
                                        .startswith(f"{ConfStr.FIXED.value}-")
                                    ):
                                        continue
                                except IndexError:
                                    # Out of range: the row has no fixed tag (or
                                    # it is misplaced), so skip as usual.
                                    pass
                                add_mark(item, ConfStr.SKIP.value, (tag,), {})
                            elif (
                                not session.config.option.noskip
                                and f"{ConfStr.SKIPIF.value}_" in tag
                            ):
                                tag_list = tag.split("&&")
                                for _tag in tag_list:
                                    skip_method, param = _tag.strip(" ").split("-", maxsplit=1)
                                    if hasattr(skipif, skip_method):
                                        skip_result = getattr(skipif, skip_method)(param)
                                        add_mark(
                                            item,
                                            ConfStr.SKIPIF.value,
                                            (skip_result,),
                                            {"reason": _tag},
                                        )
                                    else:
                                        logger.error(
                                            f"未找到判断是否跳过的自定义方法 <{skip_method}>"
                                        )
                                        add_mark(
                                            item,
                                            ConfStr.SKIP.value,
                                            (f"未找到判断是否跳过的自定义方法 <{skip_method}>",),
                                            {},
                                        )
                        else:  # not the skip column
                            # Handle the PMS case id column.
                            if containers[csv_path][ConfStr.PMS_ID_INDEX.value] == index:
                                if suite_runs_ids:
                                    if tag not in suit_id_deque:
                                        session.items.remove(item)
                                        continue
                                    add_run_case_id(session, item, tag, suite_runs_ids)
                                elif task_runs_ids:
                                    if tag not in task_id_deque:
                                        session.items.remove(item)
                                        continue
                                    add_run_case_id(session, item, tag, task_runs_ids)
                            # Every other cell becomes a marker named after the
                            # cell, carrying its column title as argument.
                            try:
                                mark_title = csv_headers[csv_path][index + 1]
                            except IndexError:
                                # A tag was written without a matching header.
                                mark_title = ""
                            add_mark(item, tag, (mark_title,), {})
                    else:  # empty cell
                        # An empty PMS id cannot be matched against a task/suite.
                        if (task_id or suite_id) and containers[csv_path][
                            ConfStr.PMS_ID_INDEX.value
                        ] == index:
                            session.items.remove(item)
                            continue
            else:
                if session.config.option.allure_report_dir:
                    # In batch runs, cases without a CSV row are not executed.
                    logger.error(f"<{item.name}> csv文件中未标记,强制跳过")
                    session.items.remove(item)
    if session.config.option.autostart:
        for item in session.items[::-1]:
            _reruns = None
            if hasattr(session.config.option, "reruns"):
                _reruns = session.config.option.reruns
            if HAS_LETMEGO:
                if letmego.read_testcase_running_status(item, reruns=_reruns):
                    session.items.remove(item)
    if (suite_id or task_id) and session.items:
        print("\n即将执行的用例:")
        for item in session.items:
            for mark in item.own_markers:
                if mark.args == (FixedCsvTitle.pms_case_id.value,):
                    print(f"case_id: {mark.name}, case_name: {item.name}")
                    break
    print()  # keep the following log output on its own line
def _format_duration(total_seconds):
    """Render seconds as '<h>小时<m>分<s>秒', leaving out zero hours/minutes."""
    minutes, seconds = divmod(int(total_seconds), 60)
    hours, minutes = divmod(minutes, 60)
    hour_text = f"{hours}小时" if hours else ""
    minute_text = f"{minutes}分" if minutes else ""
    return f"{hour_text}{minute_text}{seconds}秒"


def pytest_collection_finish(session):
    """Print a collection summary and derive session-wide limits.

    Sets ``session.item_count`` and ``session.sessiontimeout``, turns the
    ``--max_fail`` ratio into pytest's ``maxfail`` and, in collect-only mode
    with ``--export_csv_file``, writes the collected cases and their markers
    to a CSV file under ``GlobalConfig.REPORT_PATH``.
    """
    option = session.config.option
    session.item_count = len(session.items)

    def is_skipped(item):
        # Skipped means: a skip marker, or a skipif marker that evaluated True.
        return any(
            mark.name == ConfStr.SKIP.value
            or (mark.name == ConfStr.SKIPIF.value and mark.args == (True,))
            for mark in item.own_markers
        )

    runnable_items = [item for item in session.items if not is_skipped(item)]
    has_skipped_case = len(runnable_items) != len(session.items)

    runnable_count_text = f"(剔除跳过: {len(runnable_items)})" if has_skipped_case else ""
    print(f"用例收集数量:\t{session.item_count} {runnable_count_text}")
    file_count = len({item.fspath for item in session.items})
    runnable_file_count = len({item.fspath for item in runnable_items})
    runnable_file_text = f"(剔除跳过: {runnable_file_count})" if has_skipped_case else ""
    print(f"用例文件数量:\t{file_count} {runnable_file_text}")

    # "reruns" and "timeout" are only present when the matching plugin has
    # registered them, so read them defensively.
    reruns = getattr(option, "reruns", None)
    timeout = getattr(option, "timeout", None)

    if reruns and not option.collectonly:
        print(f"失败重跑次数:\t{reruns}")
    if option.max_fail and not option.collectonly:
        # --max_fail is a ratio of the collected cases.
        option.maxfail = int(float(option.max_fail) * session.item_count)
        print(f"最大失败次数:\t{option.maxfail}")

    session.sessiontimeout = 0
    if timeout and not option.collectonly:
        print(f"用例超时时间:\t{timeout}s ({_format_duration(timeout)})")
        # Session timeout: items with their own timeout marker contribute
        # that value, every other item contributes the global timeout.
        marked_count = 0
        marked_timeout_total = 0
        for item in session.items:
            for mark in item.own_markers:
                if mark.name != "timeout":
                    continue
                if mark.args:
                    marked_timeout_total += mark.args[0]
                    marked_count += 1
                break
        session.sessiontimeout = (
            (session.item_count - marked_count) * timeout
        ) + marked_timeout_total
        print(
            f"会话超时时间:\t{session.sessiontimeout}s "
            f"({_format_duration(session.sessiontimeout)})"
        )

    if option.collectonly and option.export_csv_file:
        header = GlobalConfig.EXPORT_CSV_HEARD.split(",")
        rows = ["用例名称," + GlobalConfig.EXPORT_CSV_HEARD + LN]
        for item in session.items:
            # One column for the node id plus one per exported header title.
            case_info = [item.nodeid.split("[")[0]] + ["" for _ in header]
            for mark in item.own_markers:
                try:
                    column = header.index(mark.args[0]) + 1
                except (ValueError, IndexError):
                    continue
                case_info[column] = mark.name
            rows.append(",".join(case_info) + LN)
        # Drop duplicate rows (parametrized cases) keeping first-seen order.
        unique_rows = list(dict.fromkeys(rows))
        if not exists(GlobalConfig.REPORT_PATH):
            makedirs(GlobalConfig.REPORT_PATH)
        with open(
            f"{GlobalConfig.REPORT_PATH}/{option.export_csv_file}",
            "w+",
            encoding="utf-8",
        ) as _f:
            _f.writelines(unique_rows)
def pytest_runtest_setup(item):
    """Log the test banner and start optional recording / PMS sending.

    Screen recording starts once ``item.execution_count`` reaches
    ``--record_failed_case`` + 1. When the option is empty or not a number,
    or the item has no ``execution_count``, recording is simply not started.
    """
    if HAS_LETMEGO and hasattr(item, "execution_count"):
        letmego.conf.setting.EXECUTION_COUNT = item.execution_count
    print()  # keep the first log line of the test on its own line
    position = item.session.items.index(item) + 1
    current_item_count = f"[{position}/{item.session.item_count}] "
    try:
        current_item_percent = f"{position / int(item.session.item_count) * 100:.0f}%"
    except (TypeError, ValueError, ZeroDivisionError):
        # Progress is cosmetic; leave it out when the total is unusable.
        current_item_percent = ""
    try:
        rerun_text = f" | <重跑第{item.execution_count - 1}次>" if item.execution_count > 1 else ""
    except AttributeError:
        rerun_text = ""
    logger.info(
        f"{LN}{FLAG_FEEL} {item.function.__name__} || "
        f"{str(item.function.__doc__).replace(LN, '').replace(' ', '')}{rerun_text} "
        f"{FLAG_FEEL} {current_item_count} {current_item_percent}"
    )
    try:
        record_threshold = int(item.config.option.record_failed_case) + 1
    except (TypeError, ValueError):
        # Option not given (default "") or not numeric: recording disabled.
        record_threshold = None
    try:
        if record_threshold is not None and item.execution_count >= record_threshold:
            logger.info("开启录屏")
            item.record = {}
            # The recorder is a generator; keep it so it can be finished later.
            item.record["object"] = recording_screen(f"{item.name}_{item.execution_count}")
            # First step of the generator yields the recording file path.
            item.record["image_path"] = next(item.record["object"])
            sleep(3)  # wait 3 seconds for a better recording
    except AttributeError:
        pass
    if item.config.option.pms_user and item.config.option.pms_password:

        def send2pms(case_res_path, data_send_result_csv):
            Send2Pms(
                user=item.config.option.pms_user,
                password=item.config.option.pms_password,
            ).send2pms(case_res_path, data_send_result_csv)

        if async_send(item.session):
            task = item.session.t_executor.submit(
                send2pms, item.session.case_res_path, item.session.data_send_result_csv
            )
            item.session.all_thread_task.append(task)
# pylint: disable=unused-argument
def pytest_runtest_call(item):
    """Log a separator marking the start of the test body."""
    logger.info(f"{FLAG_FEEL} case body {FLAG_FEEL}")
def pytest_runtest_teardown(item):
    """Log the teardown separator and enforce the session timeout.

    Raises:
        item.session.Interrupted: the time elapsed since the recorded session
            start exceeds ``session.sessiontimeout`` (0 disables the check).
    """
    logger.info(f"{FLAG_FEEL} teardown {FLAG_FEEL}")
    sessiontimeout = item.session.sessiontimeout
    if sessiontimeout:
        duration = datetime.now() - item.session.config.option.start_time
        # total_seconds() covers the whole interval; timedelta.seconds only
        # holds the sub-day remainder and would wrap after 24 hours.
        elapsed = int(duration.total_seconds())
        if elapsed > int(sessiontimeout):
            # Split the seconds into hours / minutes / seconds.
            _min, sec = divmod(elapsed, 60)
            hour, _min = divmod(_min, 60)
            raise item.session.Interrupted(f"会话超时({hour}小时{_min}分{sec}秒),用例强制终止!")
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(item, call):
    """Enrich the allure report and persist results after each test phase.

    * setup: turns CSV markers into allure severity / testcase link / tags
      and writes the case result when setup failed.
    * call: logs the outcome, writes the case result and the letmego status.
    * call/teardown with recording active: stores assertion artefacts and
      attaches screenshot, template/OCR images and the video to allure.

    Recording is active once ``item.execution_count`` reaches
    ``--record_failed_case`` + 1; an empty or non-numeric option disables it.
    """
    out = yield
    report = out.get_result()
    if report.when == "setup":
        for mark in item.own_markers:
            if mark.name == "parametrize":
                continue
            if mark.args:
                if mark.args[0] == FixedCsvTitle.case_level.value:
                    try:
                        allure.dynamic.severity(LabelType[mark.name].value)
                    except KeyError:
                        # Unknown level label: fall back to L3.
                        allure.dynamic.severity(LabelType.L3.value)
                elif mark.args[0] == FixedCsvTitle.pms_case_id.value:
                    testcase_url = f"https://pms.uniontech.com/testcase-view-{mark.name}.html"
                    allure.dynamic.testcase(testcase_url)
                    logger.info(testcase_url)
                else:
                    allure.dynamic.tag(mark.name)
        if report.outcome == "failed":
            if write_json(item.session):
                write_case_result(item, report)
    if report.when == "call":
        logger.info(f"运行结果: {str(report.outcome).upper()}")
        if write_json(item.session):
            # Whenever data is back-filled (automatically or manually) the
            # JSON result has to be written.
            write_case_result(item, report)
        if HAS_LETMEGO and item.config.option.autostart:
            letmego.write_testcase_running_status(item, report)
    try:
        record_threshold = int(item.config.option.record_failed_case) + 1
    except (TypeError, ValueError):
        # Option not given (default "") or not numeric: recording disabled.
        record_threshold = None
    try:
        if record_threshold is not None and item.execution_count >= record_threshold:
            if report.when == "call":
                # Remember the outcome of the recorded run.
                item.record["result"] = report.outcome
                try:
                    # Template images used by a failed image assertion.
                    item.record["template"] = call.excinfo.value.args[0].args[1:]
                except (IndexError, KeyError, AttributeError):
                    # Otherwise look for the image of an OCR region.
                    try:
                        pic = call.excinfo.value.args[0][1]
                        if isinstance(pic, tuple):
                            item.record["ocr"] = call.excinfo.value.args[0][1]
                    except (IndexError, AttributeError, TypeError):
                        # Not an OCR assertion.
                        pass
            elif report.when == "teardown":
                try:
                    sleep(3)
                    # Advance the recorder generator so it saves the video.
                    next(item.record["object"])
                except StopIteration:
                    if item.record.get("result") == ConfStr.PASSED.value:
                        # The recorded run passed: the video is not needed.
                        try:
                            remove(item.record["image_path"])
                        except FileNotFoundError:
                            pass
                    else:
                        if exists(GlobalConfig.SCREEN_CACHE):
                            screen_png = f"{splitext(item.record['image_path'])[0]}.png"
                            copyfile(GlobalConfig.SCREEN_CACHE, screen_png)
                            allure.attach.file(
                                screen_png,
                                name="屏幕截图",
                                attachment_type=allure.attachment_type.PNG,
                            )
                        try:
                            for index, tem in enumerate(item.record["template"]):
                                template = f"{splitext(item.record['image_path'])[0]}_template_{index}.png"
                                CmdCtl.run_cmd(f"cp {tem}.png {template}")
                                allure.attach.file(
                                    template,
                                    name="模板图片",
                                    attachment_type=allure.attachment_type.PNG,
                                )
                        except (FileNotFoundError, KeyError):
                            # Not an image-recognition failure.
                            pass
                        try:
                            template = f"{splitext(item.record['image_path'])[0]}_ocr_.png"
                            CmdCtl.run_cmd(f"cp {item.record['ocr']} {template}")
                            allure.attach.file(
                                template,
                                name="OCR识别区域",
                                attachment_type=allure.attachment_type.PNG,
                            )
                        except KeyError:
                            # No OCR region was recorded.
                            pass
                        allure.attach.file(
                            item.record["image_path"],
                            name="用例视频",
                            attachment_type=allure.attachment_type.MP4,
                        )
                    logger.info(
                        "结束录屏! "
                        f"{'重跑用例测试成功,删除视频录像' if item.record.get('result') == ConfStr.PASSED.value else ''}"
                    )
    except (AttributeError, KeyError):
        # No execution_count / record on the item: recording was never started.
        pass
def pytest_report_teststatus(report, config):
    """Return ``(category, short, verbose)`` with emoji texts for a report.

    Setup/teardown reports only produce a status when they failed ("error")
    or were skipped; call reports always produce one, named after the report
    outcome. Every other case returns None.
    """
    phase = report.when

    # Setup and teardown: only errors and skips are reported.
    if phase in ("setup", "teardown"):
        if report.failed:
            texts = config.hook.pytest_emoji_error(config=config, head_line=report.head_line)
            short, verbose = texts
            return "error", short, verbose
        if report.skipped:
            texts = config.hook.pytest_emoji_skipped(config=config, head_line=report.head_line)
            short, verbose = texts
            return "skipped", short, verbose
        return None

    if phase != "call":
        return None

    # Test body: passed / skipped / failed, checked in that order.
    if report.passed:
        emoji_hook_name = "pytest_emoji_passed"
    elif report.skipped:
        emoji_hook_name = "pytest_emoji_skipped"
    elif report.failed:
        emoji_hook_name = "pytest_emoji_failed"
    else:
        return report.outcome, "", ""
    emoji_hook = getattr(config.hook, emoji_hook_name)
    short, verbose = emoji_hook(config=config, head_line=report.head_line)
    return report.outcome, short, verbose
def pytest_sessionfinish(session):
    """Write the JSON summaries, send results to PMS and clean up.

    One result per test file is derived from the terminal reporter's stats
    (a non-passing report overrides an earlier "pass" for the same file) and
    written to ``detail_report.json``, ``summarize.json`` and
    ``ci_result.json``. Afterwards pending PMS uploads are completed, the
    ``top`` recorder process is stopped and the temporary directory removed.
    """
    tr = session.config.pluginmanager.get_plugin("terminalreporter")
    execute = {}
    for _, items in tr.stats.items():
        for item in items:
            if hasattr(item, "outcome"):
                default_result = {"result": "blocked", "longrepr": "None"}
                if item.outcome == ConfStr.PASSED.value:
                    default_result["result"] = "pass"
                elif item.outcome == ConfStr.SKIPPED.value:
                    default_result["result"] = "skip"
                elif item.outcome == ConfStr.RERUN.value:
                    continue
                else:
                    default_result["result"] = "fail"
                    default_result["longrepr"] = item.longreprtext
                item_name = item.fspath
                # Keep the first result per file, but let any non-passing
                # report replace a stored "pass".
                if not execute.get(item_name) or (
                    item.outcome != ConfStr.PASSED.value
                    and execute.get(item_name).get("result") == "pass"
                ):
                    execute[item_name] = default_result
    if execute:
        json_report_path = join(GlobalConfig.JSON_REPORT_PATH, "json")
        if not exists(json_report_path):
            makedirs(json_report_path)
        with open(f"{json_report_path}/detail_report.json", "w", encoding="utf-8") as _f:
            _f.write(dumps(execute, indent=2, ensure_ascii=False))
        res = Counter(entry.get("result") for entry in execute.values())
        with open(f"{json_report_path}/summarize.json", "w", encoding="utf-8") as _f:
            _f.write(
                dumps(
                    {
                        "total": sum(res.values()),
                        "pass": res.get("pass", 0),
                        "fail": res.get("fail", 0),
                        "skip": res.get("skip", 0),
                    },
                    indent=2,
                    ensure_ascii=False,
                )
            )
    if session.config.option.allure_report_dir:
        AllureReportExtend.environment_info(session, execute)
    # To be removed later.
    if execute:
        with open(f"{GlobalConfig.ROOT_DIR}/ci_result.json", "w", encoding="utf-8") as _f:
            _f.write(dumps(execute, indent=2, ensure_ascii=False))
    if session.config.option.pms_user and session.config.option.pms_password:

        def send2pms(case_res_path, data_send_result_csv):
            Send2Pms(
                user=session.config.option.pms_user,
                password=session.config.option.pms_password,
            ).send2pms(case_res_path, data_send_result_csv)

        if async_send(session):
            # Let the queued uploads finish, then send once more so results
            # written after the last submission are included.
            wait(session.all_thread_task, return_when=ALL_COMPLETED)
            send2pms(session.case_res_path, session.data_send_result_csv)
            session.t_executor.shutdown()
        if finish_send(session):
            send2pms(session.case_res_path, session.data_send_result_csv)
    if not session.config.option.collectonly and session.config.option.top:
        session.p.terminate()
        system(
            f"ps -ef | grep '{GlobalConfig.top_cmd}' | "
            "cut -c 9-15 | xargs kill -9 > /dev/null 2>&1"
        )
        # Process.close() raises ValueError while the child is still running,
        # so wait for the terminated process (and force-kill it if needed).
        session.p.join(timeout=5)
        if session.p.is_alive():
            session.p.kill()
            session.p.join()
        session.p.close()
    if exists(GlobalConfig.TMPDIR):
        # Remove the temporary template images.
        CmdCtl.run_cmd(
            f"echo '{GlobalConfig.PASSWORD}' | sudo -S rm -rf {GlobalConfig.TMPDIR}",
            interrupt=False,
            out_debug_flag=False,
            command_log=False,
        )
# pylint: disable=unused-argument
def pytest_emoji_passed(config, head_line):
    """Return the (short, verbose) status texts for a passed test."""
    short_text = "【 {} {} || ✔ 】\n".format(datetime.now(), head_line)
    verbose_text = "【 {} {} || PASSED ✔ 】\n".format(datetime.now(), head_line)
    return short_text, verbose_text
# pylint: disable=unused-argument
def pytest_emoji_failed(config, head_line):
    """Return the (short, verbose) status texts for a failed test."""
    short_text = "【 {} {} || ✘ 】\n".format(datetime.now(), head_line)
    verbose_text = "【 {} {} || FAILED ✘ 】\n".format(datetime.now(), head_line)
    return short_text, verbose_text
# pylint: disable=unused-argument
def pytest_emoji_skipped(config, head_line):
    """Return the (short, verbose) status texts for a skipped test."""
    short_text = "【 {} {} || 🙄☛ 】\n".format(datetime.now(), head_line)
    verbose_text = "【 {} {} || SKIPPED 🙄☞ 】\n".format(datetime.now(), head_line)
    return short_text, verbose_text
# pylint: disable=unused-argument
def pytest_emoji_error(config, head_line):
    """Return the (short, verbose) status texts for a setup/teardown error."""
    short_text = "【 {} {} || ☹ 】\n".format(datetime.now(), head_line)
    verbose_text = "【 {} {} || ERROR ☹ 】\n".format(datetime.now(), head_line)
    return short_text, verbose_text
class DuringfailingTerminalReporter(TerminalReporter):
    """Terminal reporter that prints failures as soon as they happen."""

    def __init__(self, reporter):
        """Build the reporter from an existing terminal reporter.

        Args:
            reporter: the terminal reporter being replaced; its config and
                terminal writer are reused.
        """
        # TerminalReporter.__init__ expects the pytest Config, not a reporter.
        TerminalReporter.__init__(self, reporter.config)
        self._tw = reporter._tw

    def pytest_collectreport(self, report):
        """Show errors raised during collection immediately."""
        TerminalReporter.pytest_collectreport(self, report)
        if report.failed:
            if self.isatty:
                self.rewrite("")
            self.print_failure(report)

    def pytest_runtest_logreport(self, report):
        """Show failures and errors raised while running tests immediately."""
        TerminalReporter.pytest_runtest_logreport(self, report)
        if report.failed and not hasattr(report, "wasxfail"):
            if self.verbosity <= 0:
                self._tw.line()
            self.print_failure(report)

    def summary_failures(self):
        """Print nothing: failures were already shown when they occurred."""

    def summary_errors(self):
        """Print nothing: errors were already shown when they occurred."""

    def print_failure(self, report):
        """Print one failed report, honouring the ``--tb`` style."""
        if self.config.option.tbstyle != "no":
            if self.config.option.tbstyle == "line":
                line = self._getcrashline(report)
                self.write_line(line)
            else:
                msg = self._getfailureheadline(report)
                if report.when == "collect":
                    msg = "ERROR collecting " + msg
                elif report.when == "setup":
                    msg = "ERROR at setup of " + msg
                elif report.when == "teardown":
                    msg = "ERROR at teardown of " + msg
                self.write_sep("_", msg)
                if not self.config.getvalue("usepdb"):
                    self._outrep_summary(report)
def get_runs_id_deque(user, password, class_obj, func, _id):
    """Fetch the cases linked to a PMS suite/task and collect their ids.

    Args:
        user: PMS account name.
        password: PMS account password.
        class_obj: callable taking ``(user, password)`` and returning an
            object that offers a ``get_<func>_data(_id)`` method.
        func: kind of container, used to build the method name
            (``"suite"`` or ``"task"`` in this module).
        _id: id of the suite/task.

    Returns:
        ``(runs_ids, id_deque)``: the fetched records and a deque holding the
        ``case_id`` and ``from_case_id`` of every record.

    Raises:
        ValueError: credentials are missing or no case is linked to ``_id``.
    """
    if not (user and password):
        raise ValueError("缺少PMS用户名或密码")
    runs_ids = getattr(class_obj(user, password), f"get_{func}_data")(_id)
    if not runs_ids:
        # Name the real container kind instead of always saying "task_id".
        raise ValueError(f"{func}_id: {_id}, 没有获取关联的用例")
    id_deque = deque()
    for run_info in runs_ids:
        id_deque.extend((run_info.get("case_id"), run_info.get("from_case_id")))
    return runs_ids, id_deque
def add_run_case_id(session, item, tag, runs_ids):
    """Mark ``item`` with the run/from case ids of the record matching ``tag``.

    Only done when results are going to be back-filled (``auto_send``); the
    first record whose ``case_id`` or ``from_case_id`` equals ``tag`` wins.
    """
    if not auto_send(session):
        return
    for run_info in runs_ids:
        case_id = run_info.get("case_id")
        from_case_id = run_info.get("from_case_id")
        run_case_id = run_info.get("run_case_id")
        if tag not in (case_id, from_case_id):
            continue
        add_mark(item, run_case_id, ("run_case_id",), {})
        add_mark(item, from_case_id, ("from_case_id",), {})
        return
@pytest.fixture
def __pytest_repeat_step_number(request):
    """Return the current repeat step, or None when the test is not repeated.

    The repeat count is taken from the ``repeat`` marker when one is present
    (coerced with ``int`` exactly like pytest_generate_tests does), otherwise
    from ``--repeat``.
    """
    marker = request.node.get_closest_marker("repeat")
    if marker is not None:
        repeat = int(marker.args[0])
    else:
        repeat = request.config.option.repeat
    if repeat > 1:
        return request.param
    return None
def walk_apps(walk_dir):
    """Scan ``walk_dir`` for case CSV files and NOYOUQUMARK directories.

    Returns:
        ``(csv_path_dict, no_youqu_mark)``: CSV base name -> path for every
        ``*.csv`` except ``case_list.csv``, and a dict whose keys are the
        directories containing a ``NOYOUQUMARK`` file (CSV files directly in
        such a directory are not registered).
    """
    marked_dirs = {}
    csv_by_name = {}
    for current_dir, _subdirs, filenames in walk(walk_dir):
        if "NOYOUQUMARK" in filenames and not marked_dirs.get(current_dir):
            marked_dirs[current_dir] = True
            continue
        csv_by_name.update(
            (splitext(filename)[0], f"{current_dir}/{filename}")
            for filename in filenames
            if filename.endswith(".csv") and filename != "case_list.csv"
        )
    return csv_by_name, marked_dirs
@pytest.fixture(scope='module')
def native_page():
    """Yield a Playwright page in a fresh, maximised, headed Chromium.

    The context ignores HTTPS errors and uses no fixed viewport. Context,
    browser and Playwright driver are shut down after the module's tests.
    """
    from playwright.sync_api import sync_playwright

    playwright = sync_playwright().start()
    chromium = playwright.chromium.launch(headless=False, args=['--start-maximized'])
    browser_context = chromium.new_context(ignore_https_errors=True, no_viewport=True)
    yield browser_context.new_page()
    # Teardown: release resources in reverse order of creation.
    browser_context.close()
    chromium.close()
    playwright.stop()
@pytest.fixture(scope='module')
def page():
from playwright.sync_api import sync_playwright
driver = sync_playwright().start()
browser = driver.chromium.launch_persistent_context(