-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwebhook-cd.py
executable file
·175 lines (151 loc) · 6.12 KB
/
webhook-cd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!./venv/bin/python
#
# Author: jon4hz
# Date: 24.03.2021
# Desc: Webhook for cd
###################################################################################################
# logging
import logging

# Logging is configured before the remaining imports so that an import
# failure below is reported through the same formatter as everything else.
logging.basicConfig(
    level=logging.INFO,
    format="{asctime} [{levelname:8}] {module}: {message}",
    style="{",
)

try:
    from hashlib import sha1
    import hmac
    import docker
    import importlib
    from flask import Flask, request
    from waitress import serve
    import config
except ImportError as e:
    logging.error(f'Could not import all modules - {e}')
    # Exit with a non-zero status so the failure is visible to whatever
    # launched the script; the bare ``exit()`` used before reported success
    # and relied on the helper injected by the ``site`` module.
    raise SystemExit(1)

# Flask application object the webhook route is registered on.
app = Flask(__name__)
def verify_signature(req, secret) -> bool:
    """Check the ``X-Hub-Signature`` header of ``req`` against ``secret``.

    The expected value is the hex HMAC-SHA1 of the raw request body
    (``req.data``) keyed with ``secret``. The comparison is done with
    ``hmac.compare_digest``.

    Args:
        req: Request object exposing ``headers`` (with a ``get`` method) and
            ``data`` (the raw body as bytes).
        secret: Shared webhook secret as ``str``.

    Returns:
        True if the received signature matches the expected one, False in
        every other case (missing header, mismatch, or any error while
        computing the signature). This function never raises.
    """
    try:
        header = req.headers.get('X-Hub-Signature')
        if not header:
            # Handle the missing header explicitly instead of relying on an
            # AttributeError from calling ``.split`` on ``None``.
            logging.warning('Webhook request without X-Hub-Signature header')
            return False
        received_sign = header.split('sha1=')[-1].strip()
        expected_sign = hmac.new(secret.encode(), req.data, sha1).hexdigest()
        return hmac.compare_digest(received_sign, expected_sign)
    except Exception as e:
        # Deliberately broad: any failure here must mean "not verified".
        logging.error(f'Could not verify webhook signature - {e}')
        return False
def set_properties(old, new):
    """Collect the settings for spawning a new container in place of ``old``.

    Adapted from ouroboros (thanks!).

    Args:
        old: Container being replaced. ``old.name`` and ``old.attrs`` (with
            its ``Config`` and ``HostConfig`` entries) are read.
        new: Image the replacement should use. Its first tag becomes the
            image reference.

    Returns:
        dict of keyword arguments describing the replacement container.
    """
    cfg = old.attrs['Config']
    exposed_ports = cfg.get('ExposedPorts')
    volumes = cfg.get('Volumes')

    # ``new.tags`` can be empty (image without tags); indexing it would raise
    # IndexError, so fall back to the reference the old container was using.
    image = new.tags[0] if new.tags else cfg['Image']

    return {
        'name': old.name,
        'hostname': cfg['Hostname'],
        'user': cfg['User'],
        'detach': True,
        'domainname': cfg['Domainname'],
        'tty': cfg['Tty'],
        # Keys look like "80/tcp"; they are split into (port, protocol).
        'ports': None if not exposed_ports else [
            (spec.split('/')[0], spec.split('/')[1]) for spec in exposed_ports
        ],
        'volumes': None if not volumes else list(volumes),
        'working_dir': cfg['WorkingDir'],
        'image': image,
        'command': cfg['Cmd'],
        'host_config': old.attrs['HostConfig'],
        'labels': cfg['Labels'],
        'entrypoint': cfg['Entrypoint'],
        'environment': cfg['Env'],
        'healthcheck': cfg.get('Healthcheck'),
    }
def get_containers(container_names):
    """Return the container objects for ``container_names``.

    Each name is looked up through the module-level Docker ``client``. Names
    for which the lookup raises ``docker.errors.NotFound`` are logged and
    left out of the result.
    """
    found = []
    for container_name in container_names:
        try:
            match = client.containers.get(container_name)
        except docker.errors.NotFound as err:
            logging.error(err)
        else:
            found.append(match)
    return found
def remove_container(container):
    """Remove ``container``; a Docker API error is logged, not raised."""
    try:
        container.remove()
    except docker.errors.APIError as err:
        logging.error(err)
def stop_container(container):
    """Stop ``container`` by calling its ``kill()`` method.

    A Docker API error is logged, not raised.
    """
    try:
        container.kill()
    except docker.errors.APIError as err:
        logging.error(err)
def recreate_containers(container_names):
    """Pull the image of each named container and recreate it if it changed.

    For every container returned by ``get_containers`` the configured image
    is pulled. When the pulled image id differs from the id of the image the
    container currently uses, the container is killed, removed, created again
    from the properties built by ``set_properties``, attached to the networks
    of the old container and started. Otherwise only an info message is
    logged.
    """
    for container in get_containers(container_names):
        running_image = container.image
        image_ref = container.attrs['Config']['Image']
        pulled_image = client.images.pull(image_ref)

        if pulled_image.id == running_image.id:
            logging.info(f'No new image found for container {container.name}')
            continue

        properties = set_properties(container, pulled_image)

        # remove old container
        stop_container(container)
        remove_container(container)

        # create new container
        created = client.api.create_container(**properties)
        new_container = client.containers.get(created.get("Id"))

        # connect the new container to all networks of the old container
        old_networks = container.attrs['NetworkSettings']['Networks']
        for network_config in old_networks.values():
            network = client.networks.get(network_config['NetworkID'])

            # Best-effort disconnect before connecting with the old settings;
            # an API error here is ignored.
            try:
                network.disconnect(new_container.id, force=True)
            except docker.errors.APIError:
                pass

            connect_kwargs = {
                'container': new_container,
                'aliases': network_config['Aliases'],
                'links': network_config['Links'],
            }
            if network_config['IPAMConfig']:
                connect_kwargs['ipv4_address'] = network_config['IPAddress']
                connect_kwargs['ipv6_address'] = network_config['GlobalIPv6Address']

            try:
                network.connect(**connect_kwargs)
            except docker.errors.APIError as err:
                retry_without_addresses = any(
                    marker in str(err)
                    for marker in ['user configured subnets', 'user defined networks']
                )
                if retry_without_addresses:
                    # Drop the fixed addresses (only when set) and try again.
                    for address_key in ('ipv4_address', 'ipv6_address'):
                        if connect_kwargs.get(address_key):
                            del connect_kwargs[address_key]
                    network.connect(**connect_kwargs)
                else:
                    logging.error('Unable to attach updated container to network "%s". Error: %s', network.name, err)

        new_container.start()
        logging.info(f'Successfully updated container {container.name}')
@app.route('/webhooks/containers', methods=['POST'])
def main():
    """Handle a webhook call: verify its signature and recreate containers.

    The ``config`` module is reloaded on every call; a reload failure is
    logged and the previously loaded values are used. Returns
    ``('Forbidden', 403)`` when the signature check fails and
    ``('Success', 200)`` once ``recreate_containers`` has been attempted,
    even if it raised (the error is logged).
    """
    if request.method != 'POST':
        return 'Not allowed', 405

    try:
        importlib.reload(config)
    except Exception as err:
        logging.error(f'Could not reload config - {err}')

    if not verify_signature(request, config.WEBHOOK_SECRET):
        return 'Forbidden', 403

    try:
        recreate_containers(config.DOCKER_CONTAINERS)
    except Exception as err:
        logging.error(err)
    return 'Success', 200
if __name__ == '__main__':
    # create docker client
    try:
        client = docker.from_env()
    except docker.errors.DockerException as e:
        logging.error(f'Could not create docker client - {e}')
        # Without a client the ``client`` global stays undefined and every
        # webhook would fail while still answering 200, so stop here instead
        # of starting the server.
        raise SystemExit(1)
    # create webhook server
    try:
        serve(app, host='0.0.0.0', port=8888)
    except Exception as e:
        logging.error(f'Could not create webhook server - {e}')
        # Report the failure through the exit status as well as the log.
        raise SystemExit(1)