This repository has been archived by the owner on Jan 3, 2019. It is now read-only.
forked from karenc/cnx-vagrant
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsmtp_server.py
executable file
·72 lines (60 loc) · 2.33 KB
/
smtp_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#!/usr/bin/env python
# A smtp server that uses e-mail-tester.appspot.com api to send
# emails.
# Based on https://djangosnippets.org/snippets/96/
import asyncore
import datetime
import email
import json
from smtpd import SMTPServer
import urllib2
EMAIL_API_KEY = 'cnx-test'
def _get_content(mail):
text = ''
html = ''
if mail.is_multipart():
for part in mail.get_payload():
if part.get_content_type() == 'multipart/alternative':
return self._get_content(part)
if part.get_content_charset() is None:
continue
charset = part.get_content_charset()
_text = part.get_payload(decode=True).decode(str(charset))
if part.get_content_type() == 'text/plain':
text = _text
elif part.get_content_type() == 'text/html':
html = _text
else:
text = mail.get_payload(decode=True).decode(
str(mail.get_content_charset()))
return text, html
class MySMTPServer(SMTPServer):
def process_message(self, peer, mailfrom, rcpttos, data):
# peer: ('127.0.0.1', 33360) mailfrom: '[email protected]' rcpttos: ['[email protected]']
message = email.message_from_string(data)
subject = message.get('Subject')
text, html = _get_content(message)
data = json.dumps({
'to': rcpttos,
'subject': subject,
'body': text,
'html': html})
req = urllib2.Request(
'https://e-mail-tester.appspot.com/send-mail/{}'
.format(EMAIL_API_KEY),
data,
{'Content-Type': 'application/json'})
with open('smtpd.log', 'a') as f:
f.write('{} To: {} Subject: {} Response: {}\n'.format(
datetime.datetime.now().isoformat(),
rcpttos,
subject,
urllib2.urlopen(req).read()))
def run():
server = MySMTPServer(('localhost', 25), None)
try:
asyncore.loop()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
run()