-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathworker.py
143 lines (123 loc) · 4.02 KB
/
worker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python3
# -*- coding: UTF-8 -*-
"""
worker.py
Author: Brett Lykins ([email protected])
Description: Handle launching of rq workers
"""
from argparse import ArgumentParser, Namespace
from logging import basicConfig, getLogger
from naas.library.netmiko_lib import netmiko_send_command, netmiko_send_config # noqa F401
from redis import Redis
from rq import Connection, Worker, Queue
from multiprocessing import Process
from socket import gethostname
from time import sleep
from typing import Optional, Sequence
# Module-level logger used by both main() and worker_launch()
logger = getLogger("naas_worker")
def main() -> None:
    """
    Launch the rq worker processes and wait for them to exit

    The worker count, queue list, Redis host/port/password, start-up delay and
    log level are all read from the CLI via arg_parsing().
    :return:
    """
    # Parse some args
    args = arg_parsing()

    # Setup logging
    basicConfig(
        level=args.log_level,
        format="[%(asctime)s] [%(process)d] [%(levelname)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S %z",
    )

    # Sleep for the configured delay to allow Redis to come up
    logger.debug("Sleeping %s seconds to allow Redis to initialize.", args.sleep)
    sleep(args.sleep)

    # Launch the workers, one process each, named naas_<hostname>_<n> with n starting at 1
    logger.debug("Creating %s workers", args.workers)
    processes = []
    hostname = gethostname()
    for w in range(1, args.workers + 1):
        proc = Process(
            target=worker_launch,
            kwargs={
                "name": f"naas_{hostname}_{w}",
                "queues": args.queues,
                "redis_host": args.redis,
                "redis_port": args.port,
                "redis_pw": args.auth_password,
                "log_level": args.log_level,
            },
        )
        processes.append(proc)
        proc.start()

    # Wait on the children explicitly rather than relying on multiprocessing's
    # exit-time join of non-daemonic processes
    for proc in processes:
        proc.join()
def arg_parsing(argv: Optional[Sequence[str]] = None) -> Namespace:
    """
    Parse the CLI arguments and return them in an Argparse Namespace

    :param argv: Argument strings to parse. When None (the default) argparse reads sys.argv[1:].
    :return: Namespace carrying workers, queues, redis, port, auth_password, sleep and log_level
    """
    argparser = ArgumentParser(description="RQ Multi-worker Launcher")
    argparser.add_argument(
        "workers", type=int, nargs="?", default=100, help="The number of workers to launch. Default: 100"
    )
    argparser.add_argument(
        "-q",
        "--queues",
        type=str,
        nargs="+",
        # nargs="+" produces a list, so the default is a list too and callers always see one shape
        default=["naas"],
        help="What queue(s) are we working out of? Default: naas",
    )
    argparser.add_argument(
        "-r", "--redis", type=str, default="redis", help="What Redis server are we using? Default: redis"
    )
    argparser.add_argument(
        "-p", "--port", type=int, default=6379, help="What port is the Redis server listening on? Default: 6379"
    )
    argparser.add_argument(
        "-a", "--auth_password", type=str, help="Password if the Redis server requires authentication."
    )
    argparser.add_argument(
        "-s",
        "--sleep",
        type=int,
        default=10,
        # nargs="?" accepts a bare "-s"; without const that would store None instead of a number
        const=10,
        nargs="?",
        help="How many seconds to sleep to give Redis a chance to initialize. Default: 10",
    )
    argparser.add_argument(
        "-l",
        "--log_level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="INFO",
        help="What log-level are we to log at",
    )
    return argparser.parse_args(argv)
def worker_launch(
    name: str, queues: Sequence[Queue], redis_host: str, redis_port: int, log_level: str, redis_pw: Optional[str] = None
) -> None:
    """
    Function for launching an rq worker

    Builds a Redis connection, then starts a single rq Worker on it and runs
    the worker's work() loop.

    :param name: Name given to the rq worker
    :param queues: Queue(s) for the worker to watch (main() passes the values parsed from --queues)
    :param redis_host: Host of the Redis server
    :param redis_port: Port of the Redis server
    :param log_level: Logging level handed to the worker's work() call
    :param redis_pw: Redis password; omitted from the connection arguments when empty or None
    :return:
    """
    # Initialize our Redis connection
    logger.debug("Initializing Redis connection to redis://%s:%s", redis_host, redis_port)
    redis_conn_dict = {"host": redis_host, "port": redis_port}
    if redis_pw:
        redis_conn_dict["password"] = redis_pw
    redis_conn = Redis(**redis_conn_dict)

    # rq's Connection context manager pushes the connection for implicit lookups
    # but yields no value, so "as" would bind None. Keep our own handle and pass
    # it to the Worker explicitly.
    with Connection(connection=redis_conn):
        logger.debug(
            "Starting rq worker %s, with connection to redis://%s:%s, to watch the following queue(s): %s",
            name,
            redis_host,
            redis_port,
            queues,
        )
        w = Worker(queues=queues, name=name, connection=redis_conn)
        w.work(logging_level=log_level)
# Launch the workers only when this file is executed as a script
if __name__ == "__main__":
    main()