-
Notifications
You must be signed in to change notification settings - Fork 9
/
testTypeSimple.py
100 lines (77 loc) · 2.7 KB
/
testTypeSimple.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import shlex
from dataclasses import dataclass
from ktoolbox import common
import task
import tftbase
from task import ClientTask
from task import ServerTask
from task import TaskOperation
from testSettings import TestSettings
from testType import TestTypeHandler
from tftbase import BaseOutput
from tftbase import FlowTestOutput
from tftbase import TestType
@dataclass(frozen=True)
class TestTypeHandlerSimple(TestTypeHandler):
    """Test-type handler bound to ``TestType.SIMPLE``.

    Produces a ``SimpleServer`` / ``SimpleClient`` pair for a given
    ``TestSettings``.
    """

    def __init__(self) -> None:
        """Initialise the base handler with ``TestType.SIMPLE``."""
        super().__init__(TestType.SIMPLE)

    def _create_server_client(self, ts: TestSettings) -> tuple[ServerTask, ClientTask]:
        """Build the server task and a client task that references it.

        Args:
            ts: Test settings handed to both tasks.

        Returns:
            A ``(server, client)`` tuple.
        """
        # The server is created first because the client takes it as an argument.
        server_task = SimpleServer(ts=ts)
        return server_task, SimpleClient(ts=ts, server=server_task)
# Register one TestTypeHandlerSimple instance with TestTypeHandler when this
# module is imported.
TestTypeHandler.register_test_type(TestTypeHandlerSimple())
# First command-line token used by both SimpleServer.cmd_line_args() and
# SimpleClient.cmd_line_args().
CMD_SIMPLE_TCP_SERVER_CLIENT = "simple-tcp-server-client"
class SimpleServer(task.ServerTask):
    """Server task that runs ``CMD_SIMPLE_TCP_SERVER_CLIENT`` with ``--server``."""

    def cmd_line_args(self) -> list[str]:
        """Return the server command as an argument list.

        The fixed arguments come first (listen address ``0.0.0.0`` and
        ``self.port``), followed by any extra arguments from the server
        section of the test configuration.
        """
        argv = [
            CMD_SIMPLE_TCP_SERVER_CLIENT,
            "--server",
            "--addr",
            "0.0.0.0",
            "--port",
            f"{self.port}",
        ]
        # ``args`` may be falsy (e.g. None); in that case nothing is appended.
        argv.extend(self.ts.cfg_descr.get_server().args or ())
        return argv

    def get_template_args(self) -> dict[str, str | list[str]]:
        """Return the parent's template arguments, plus ``"args"`` when persistent.

        When ``self.exec_persistent`` is truthy the command line is added
        under the ``"args"`` key, overriding any value the parent supplies.
        """
        overrides: dict[str, str | list[str]] = (
            {"args": self.cmd_line_args()} if self.exec_persistent else {}
        )
        merged = dict(super().get_template_args())
        merged.update(overrides)
        return merged

    def _create_setup_operation_get_thread_action_cmd(self) -> str:
        """Return the server command line as a single shell-quoted string."""
        argv = self.cmd_line_args()
        return shlex.join(argv)

    def _create_setup_operation_get_cancel_action_cmd(self) -> str:
        """Return the shell command used for the cancel action."""
        cancel_cmd = "killall python3"
        return cancel_cmd
class SimpleClient(task.ClientTask):
    """Client task that runs ``CMD_SIMPLE_TCP_SERVER_CLIENT`` against a target."""

    def cmd_line_args(self) -> list[str]:
        """Return the client command as an argument list.

        The fixed arguments come first (target IP, port and duration),
        followed by any extra arguments from the client section of the
        test configuration.
        """
        argv = [
            CMD_SIMPLE_TCP_SERVER_CLIENT,
            "--addr",
            f"{self.get_target_ip()}",
            "--port",
            f"{self.port}",
            "--duration",
            f"{self.get_duration()}",
        ]
        # ``args`` may be falsy (e.g. None); in that case nothing is appended.
        argv.extend(self.ts.cfg_descr.get_client().args or ())
        return argv

    def _create_task_operation(self) -> TaskOperation:
        """Build the ``TaskOperation`` that executes the client command.

        The command string is computed once, when the operation is created,
        and captured by the thread action.
        """
        argv = self.cmd_line_args()
        cmd = shlex.join(argv)

        def _thread_action() -> BaseOutput:
            # Wait on the shared barrier before running the command.
            self.ts.clmo_barrier.wait()
            exec_result = self.run_oc_exec(cmd)
            # Signal that the client has finished once the command returns.
            self.ts.event_client_finished.set()

            succeeded = exec_result.success
            metadata = self.ts.get_test_metadata()
            payload = {
                "result": common.dataclass_to_dict(exec_result),
            }
            return FlowTestOutput(
                success=succeeded,
                tft_metadata=metadata,
                command=cmd,
                result=payload,
                bitrate_gbps=tftbase.Bitrate.NA,
            )

        return TaskOperation(log_name=self.log_name, thread_action=_thread_action)