-
Notifications
You must be signed in to change notification settings - Fork 0
/
get_files_from_imap.py
118 lines (106 loc) · 4.14 KB
/
get_files_from_imap.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import imaplib
import configparser
import os
import email.header
import logging
from tqdm import tqdm
from argh import *
from pprint import pprint
class imap_file_crawler():
    """Download e-mail attachments from an IMAP mailbox into a local folder.

    Intended call order: ``connect()``, one or more ``crawl()`` calls, then
    ``disconnect()``.
    """

    def __init__(self, hostname, username, password, download_folder, port=993, loglevel="WARNING"):
        """Store the connection settings; no network connection is opened here.

        Args:
            hostname: IMAP server to connect to (over SSL).
            username: account name used for login.
            password: account password used for login.
            download_folder: directory the attachments are written to.
            port: SSL port of the IMAP server.
            loglevel: level handed to ``logging.basicConfig``.
        """
        logging.basicConfig(level=loglevel)
        self.hostname = hostname
        self.port = port
        self.username = username
        self.password = password
        self.download_folder = download_folder
        # Set by connect(); None means "not connected".
        self.connection = None

    def connect(self):
        """Open the SSL connection and log in; the session is kept on ``self.connection``.

        Raises:
            imaplib.IMAP4.error: if the server rejects the login. The socket
                is closed before the error propagates.
        """
        logging.info(f"Connecting to {self.hostname}")
        connection = imaplib.IMAP4_SSL(self.hostname, port=self.port)
        logging.info(f"Logging in as {self.username}")
        try:
            connection.login(self.username, self.password)
        except Exception:
            # Do not leak the already opened socket when the login fails.
            connection.shutdown()
            raise
        self.connection = connection

    def disconnect(self):
        """Log out from the server; safe to call when not connected or repeatedly."""
        connection = getattr(self, 'connection', None)
        if connection is None:
            return
        # Forget the session first so a second call never logs out on a
        # closed socket, even if logout() itself raises.
        self.connection = None
        connection.logout()

    @staticmethod
    def trueValidator(dummy):
        """Validator that accepts every filename."""
        return True

    @staticmethod
    def _as_validator(validator):
        """Return ``validator`` as a callable; a non-callable becomes a constant verdict."""
        if callable(validator):
            return validator
        verdict = bool(validator)
        return lambda _filename: verdict

    def _attachment_path(self, filename):
        """Return the target path inside ``download_folder``, or None if the name is unusable."""
        # The name is taken from the e-mail and therefore untrusted: keep only
        # its last path component so it cannot escape the download folder.
        safe_name = os.path.basename(filename.replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            return None
        return os.path.join(self.download_folder, safe_name)

    def _save_attachments(self, email_message, validator):
        """Write every accepted attachment of one parsed message to disk.

        Files that already exist in the download folder are not overwritten.
        """
        accept = self._as_validator(validator)
        for part in email_message.walk():
            if part.get_content_maintype() == 'multipart':
                logging.debug("skipped multipart")
                continue
            disposition = part.get('Content-Disposition')
            if disposition is None:
                logging.debug("skipped Content-Disposition=None")
                continue
            if disposition == "inline":
                logging.debug("skipped Content-Disposition=inline")
                continue
            filename = part.get_filename()
            if filename is None:
                logging.debug("skipped part without filename: %s", part.items())
                continue
            if not accept(filename):
                continue
            att_path = self._attachment_path(filename)
            if att_path is None:
                logging.warning("skipped attachment with unusable filename %r", filename)
                continue
            if os.path.isfile(att_path):
                continue
            # Decode before opening the file so a part without a decodable
            # payload does not leave an empty file behind.
            payload = part.get_payload(decode=True)
            if payload is None:
                logging.debug("skipped %r: no decodable payload", filename)
                continue
            with open(att_path, 'wb') as fp:
                fp.write(payload)

    def crawl(self, folder='INBOX', imap_filter='ALL', validator=True):
        """Save the attachments of every message in ``folder`` matching ``imap_filter``.

        Args:
            folder: IMAP mailbox to open; it is opened read-only.
            imap_filter: criterion passed to the IMAP SEARCH command.
            validator: callable that receives the attachment filename as
                declared in the e-mail and returns a truthy value when the
                file should be saved. A non-callable value is used as a
                constant verdict, so the default ``True`` saves everything.

        Raises:
            Exception: any error while fetching or saving a message is
                re-raised after the connection has been closed.
        """
        self.connection.select(folder, readonly=True)
        typ, msg_ids = self.connection.search(None, imap_filter)
        # The id list may be missing entirely when nothing matched.
        id_list = msg_ids[0].split() if msg_ids and msg_ids[0] else []
        for msg_id in tqdm(id_list):
            # Reset per message so the error dump below never shows stale data.
            email_data = None
            try:
                result, email_data = self.connection.fetch(msg_id, '(RFC822)')
                if email_data[0] is None:
                    logging.warning(f"{msg_id} is None, result was: {result}")
                    continue
                email_message = email.message_from_bytes(email_data[0][1])
                self._save_attachments(email_message, validator)
            except Exception:
                if email_data is not None:
                    pprint(email_data)
                self.disconnect()
                raise
# Example validator that checks whether a file has a ".pdf" extension.
def isPDF(filename):
    """Return True when ``filename`` ends with ".pdf", ignoring letter case."""
    # Attachment names often carry an upper-case extension ("SCAN.PDF").
    return filename.casefold().endswith(".pdf")
# Sub-command "cli": connect with the given credentials and download every
# attachment accepted by isPDF from one IMAP folder, then disconnect.
# NOTE: the parameter annotations and the docstring are kept verbatim because
# argh presumably uses them as command-line help text.
@named('cli')
@arg('-v', '--verbosity', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
def run(hostname : "Mailserver hostname", username : "Mail account username",
        password : "Mail account password",
        download_folder : "Folder to which the files will be downloaded" = "/tmp",
        port : "Mailserver SSL port" = 993,
        folder : "The imap folder you want to crawl" = "INBOX",
        verbosity : "Changes output verbosity" = "WARNING"):
    '''
    run with parameters supplied to command
    '''
    # Build the crawler outside the try block: if the constructor raised
    # inside it, the finally clause would hit an unbound ``crawler`` and
    # replace the real error with a NameError.
    crawler = imap_file_crawler(hostname, username, password, download_folder, port, verbosity)
    try:
        crawler.connect()
        crawler.crawl(folder=folder, validator=isPDF)
    finally:
        crawler.disconnect()
# Sub-command "config" (alias "cfg"): read the server, account and download
# settings from an ini file and delegate to run().
# Sections/keys read: [general] verbose, [server] hostname/port,
# [account] username/password/folder, [download] folder.
# NOTE: the parameter annotations and the docstring are kept verbatim because
# argh presumably uses them as command-line help text.
@named('config')
@aliases('cfg')
@arg('-v', '--verbosity', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
def run_with_configfile(configfile : "Config file in ini format" = "config.ini",
                        verbosity : "Changes output verbosity" = ""):
    '''
    run with parameters from config file
    '''
    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open; fail loudly
    # here instead of with a misleading "No section" error further down.
    if not config.read(configfile):
        raise FileNotFoundError(f"Config file not found or unreadable: {configfile}")
    # A verbosity given on the command line wins over the config file.
    if not verbosity:
        verbosity = config.get('general', 'verbose', fallback="WARNING")
    hostname = config.get('server', 'hostname')
    # Read the port as an integer, matching the type of run()'s default.
    port = config.getint('server', 'port', fallback=993)
    username = config.get('account', 'username')
    password = config.get('account', 'password')
    imap_folder = config.get('account', 'folder', fallback='INBOX')
    download_folder = config.get('download', 'folder', fallback='/tmp')
    run(hostname, username, password, download_folder, port, imap_folder, verbosity)
if __name__ == '__main__':
    # Register both entry points as sub-commands and hand control to argh.
    parser = ArghParser()
    parser.add_commands([run_with_configfile, run])
    dispatch(parser)