diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..11041c7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.egg-info diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7230ab9 --- /dev/null +++ b/LICENSE @@ -0,0 +1,24 @@ +Copyright (C) 2003-2017 Tiziano Zito , Jakob Jordan +Copyright (C) 2017-2023 ASPP + +# This is free software and comes without any warranty, to +# the extent permitted by applicable law. You can redistribute it +# and/or modify it under the terms of the Do What The Fuck You Want +# To Public License, Version 2, as published by Sam Hocevar. +# http://www.wtfpl.net + +Full license text: + +DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE + Version 2, December 2004 + + Copyright (C) 2004 Sam Hocevar + + Everyone is permitted to copy and distribute verbatim or modified + copies of this license document, and changing it is allowed as long + as the name is changed. + + DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE + TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION + + 0. You just DO WHAT THE FUCK YOU WANT TO. diff --git a/massmail b/massmail deleted file mode 100755 index 662997a..0000000 --- a/massmail +++ /dev/null @@ -1,327 +0,0 @@ -#!/usr/bin/env python -# Script to send mass email -# -# Copyright (C) 2003-2017 Tiziano Zito , Jakob Jordan -# -# This script is free software and comes without any warranty, to -# the extent permitted by applicable law. You can redistribute it -# and/or modify it under the terms of the Do What The Fuck You Want -# To Public License, Version 2, as published by Sam Hocevar. -# http://www.wtfpl.net -# -# Full license text: -# -# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE -# Version 2, December 2004. -# -# Everyone is permitted to copy and distribute verbatim or modified -# copies of this license document, and changing it is allowed as long -# as the name is changed. -# -# DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE -# TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION -# -# 0. You just DO WHAT THE FUCK YOU WANT TO. - -import smtplib, getopt, sys, os, email, getpass -import email.header -import email.mime.text -import email.utils -import io -import hashlib -import contextlib -import tempfile -import time -import subprocess -import base64 -import re - -PROGNAME = os.path.basename(sys.argv[0]) -USAGE = """Send mass mail -Usage: - %s [...] PARAMETER_FILE < BODY - -Options: - -F FROM set the From: header for all messages. - Must be ASCII. This argument is required - - -S SUBJECT set the Subject: header for all messages - - -B BCC set the Bcc: header for all messages. - Must be ASCII - - -C CC set the Cc: header for all messages. - Must be ASCII - - -R In-Reply-To set the In-Reply-To: takes a Message-ID as input - - -s SEPARATOR set field separator in parameter file, - default: ";" - - -e ENCODING set PARAMETER_FILE *and* BODY character set - encoding, default: "UTF-8". Note that if you fuck - up this one, your email will be full of rubbish: - You have been warned! - - -z SERVER the SMTP server to use. This argument is required - - -P PORT the SMTP port to use. Must be between 1 and 65535. - - -u SMTP user name. If not set, use anonymous SMTP - connection - - -p SMTP password. If not set you will be prompted for one - - -h show this usage message - -Notes: - The message body is read from standard input or - typed in interactively (exit with ^D) and keywords are subsituted with values - read from a parameter file. The first line of the parameter file defines - the keywords. The keyword $EMAIL$ must always be present and contains a comma - separated list of email addresses. - Keep in mind shell escaping when setting headers with white spaces or special - characters. - Character set encodings are those supported by python. - -Examples: - -* Example of a parameter file: - -$NAME$; $SURNAME$; $EMAIL$ -John; Smith; j@guys.com -Anne; Joyce; a@girls.com - -* Example of body: - -Dear $NAME$ $SURNAME$, - -I think you are a great guy/girl! - -Cheers, - -My self. - -* Example usage: - -%s -F "Great Guy " -S "You are a great guy" -B "Not so great Guy " parameter-file < body - -"""%(PROGNAME, PROGNAME) - -def error(s): - sys.stderr.write(PROGNAME+': ') - sys.stderr.write(s+'\n') - sys.stderr.flush() - sys.exit(-1) - -def parse_command_line_options(arguments): - """Parse options. - - Returns a dictionary of options. - - Arguments are checked for validity. - """ - try: - opts, args = getopt.getopt(arguments, "hs:F:S:B:C:R:e:u:p:P:z:") - except getopt.GetoptError as err: - error(str(err)+USAGE) - - # set default options - options = { - 'sep': u';', - 'from': '', - 'subject': '', - 'bcc': '', - 'cc': '', - 'encoding': 'utf-8', - 'smtpuser': None, - 'smtppassword': None, - 'server': None, - 'port': 0, - 'in_reply_to': '', - } - - for option, value in opts: - if option == "-e": - options['encoding'] = value - if option == "-s": - options['sep'] = value - elif option == "-F": - options['from'] = value - elif option == "-S": - options['subject'] = value - elif option == "-B": - options['bcc'] = value - elif option == "-C": - options['cc'] = value - elif option == "-R": - options['in_reply_to'] = value - elif option == "-h": - print(USAGE) - exit(0) - elif option == "-u": - options['smtpuser'] = value - elif option == "-p": - options['smtppassword'] = value - elif option == "-P": - options['port'] = int(value) - elif option == "-z": - options['server'] = value - - if len(args) == 0: - error('You must specify a parameter file') - - if len(options['from']) == 0: - error('You must set a from address with option -F') - - if options['server'] is None: - error('You must set a SMTP server with option -z') - - if options['sep'] == ",": - error('Separator can not be a comma') - - # set filenames of parameter and mail body - options['fn_parameters'] = args[0] - - return options - -def parse_parameter_file(options): - pars_fh = open(options['fn_parameters'],'rt') - pars = pars_fh.read() - pars_fh.close() - - if options['in_reply_to'] and not options['in_reply_to'].startswith('<'): - options['in_reply_to'] = '<{}>'.format(options['in_reply_to']) - - # split lines - pars = pars.splitlines() - - # get keywords from first line - keywords_list = [key.strip() for key in pars[0].split(options['sep'])] - - # fail immediately if no EMAIL keyword is found - if '$EMAIL$' not in keywords_list: - error('No $EMAIL$ keyword found in %s'%options['fn_parameters']) - - # check that all keywords start and finish with a '$' character - for key in keywords_list: - if not key.startswith('$') or not key.endswith('$'): - error('Keyword "%s" malformed: should be $KEYWORD$'%key) - - # gather all values - email_count = 0 - keywords = dict([(key, []) for key in keywords_list]) - for count, line in enumerate(pars[1:]): - # ignore empty lines - if len(line) == 0: - continue - values = [key.strip() for key in line.split(options['sep'])] - if len(values) != len(keywords_list): - error(('Line %d in "%s" malformed: %d values found instead of' - ' %d: %s'%(count+1,options['fn_parameters'],len(values),len(keywords_list),line)).encode(options['encoding'])) - for i, key in enumerate(keywords_list): - keywords[key].append(values[i]) - email_count += 1 - - return keywords, email_count - -def create_email_bodies(options, keywords, email_count, body): - # find keywords and substitute with values - # we need to create email_count bodies - msgs = {} - - for i in range(email_count): - lbody = re.sub(r'\$\w+\$', lambda m: keywords[m.group(0)][i], body) - - # encode body - lbody = email.mime.text.MIMEText(lbody.encode(options['encoding']), 'plain', options['encoding']) - msgs[keywords['$EMAIL$'][i]] = lbody - - return msgs - -def add_email_headers(options, msgs): - # msgs is now a dictionary with {emailaddr:body} - # we need to add the headers - - for emailaddr in msgs: - msg = msgs[emailaddr] - msg['To'] = str(emailaddr) - msg['From'] = email.header.Header(options['from']) - if options['subject']: - msg['Subject'] = email.header.Header(options['subject'].encode(options['encoding']), options['encoding']) - if options['in_reply_to']: - msg['In-Reply-To'] = email.header.Header(options['in_reply_to']) - if options['cc']: - msg['Cc'] = email.header.Header(options['cc']) - # add the required date header - msg['Date'] = email.utils.formatdate(localtime=True) - # add a unique message-id - msg['Message-ID'] = email.header.Header(generate_unique_id(msg)) - msgs[emailaddr] = msg - - return None - -def generate_unique_id(msg): - # Get headers and body in the message, hash it and convert to base64 - prefix = hashlib.sha256(msg.get_payload(decode=True)).hexdigest()[:16] - message_id = email.utils.make_msgid(idstring=prefix) - return message_id - -def send_messages(options, msgs): - - for emailaddr in msgs: - emails = [e.strip() for e in emailaddr.split(',')] - if len(options['bcc']) > 0: - emails.append(options['bcc']) - print('This email will be sent to:', ', '.join(emails)) - [print(hdr+':', value) for hdr, value in msgs[emailaddr].items()] - print() - print(msgs[emailaddr].get_payload(decode=True).decode(options['encoding'])) - - # ask for confirmation before really sending stuff - # we need to read input from the terminal, because stdin is taken already - sys.stdin = open('/dev/tty') - ans = input('Send the emails above? Type "Y" to confirm! ') - if ans != 'Y': - error('OK, exiting without sending anything!') - - print() - - server = smtplib.SMTP(options['server'], port=options['port']) - - if options['smtpuser'] is not None: - try: - server.starttls() - # get password if needed - if options['smtppassword'] is None: - options['smtppassword'] = getpass.getpass('Enter password for %s: '%options['smtpuser']) - server.login(options['smtpuser'], options['smtppassword']) - except Exception as err: - error(str(err)) - - print() - - for emailaddr in msgs: - print('Sending email to:', emailaddr) - emails = [e.strip() for e in emailaddr.split(',')] - if len(options['bcc']) > 0: - emails.append(options['bcc']) - try: - out = server.sendmail(options['from'], emails, msgs[emailaddr].as_string()) - except Exception as err: - error(str(err)) - - if len(out) != 0: - error(str(out)) - - server.close() - -def main(arguments): - options = parse_command_line_options(arguments) - keywords, email_count = parse_parameter_file(options) - msgs = create_email_bodies(options, keywords, email_count, sys.stdin.read()) - add_email_headers(options, msgs) - send_messages(options, msgs) - -if __name__ == '__main__': - main(sys.argv[1:]) diff --git a/massmail.py b/massmail.py deleted file mode 120000 index d662c24..0000000 --- a/massmail.py +++ /dev/null @@ -1 +0,0 @@ -massmail \ No newline at end of file diff --git a/massmail/__init__.py b/massmail/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/massmail/massmail.py b/massmail/massmail.py new file mode 100755 index 0000000..3e7936e --- /dev/null +++ b/massmail/massmail.py @@ -0,0 +1,310 @@ +#!/usr/bin/env python3 +import email +import mimetypes +import pathlib +import re +import smtplib + +import rich.prompt +import rich.panel +import rich.progress +from rich import print as rprint +import click +import email_validator + + +def parse_parameter_file(parameter_file): + name = parameter_file.name + pars = parameter_file.read() + pars = pars.splitlines() + + # get keywords from first line + key_list = [key.strip() for key in pars[0].split(';')] + + # fail immediately if no EMAIL keyword is found + if '$EMAIL$' not in key_list: + raise click.ClickException(f'No $EMAIL$ keyword found in {name}') + + # check that all keywords start and finish with a '$' character + for key in key_list: + if not key.startswith('$') or not key.endswith('$'): + raise click.ClickException(f'Keyword {key=} malformed in {name}: should be $KEY$') + + # gather all values + keys = dict([(key, []) for key in key_list]) + for count, line in enumerate(pars[1:]): + # ignore empty lines + if len(line) == 0: + continue + values = [value.strip() for value in line.split(';')] + if len(values) != len(key_list): + raise click.ClickException(f'Line {count+2} in {name} malformed: ' + f'{len(values)} found instead of {len(key_list)}') + #for idx, value in enumerate(values): + # # do not allow for empty values + # if value == '': + # raise click.ClickException(f'Line {count+1} in {name} malformed: ' + # f'empty value for key {key_list[idx]}') + for i, key in enumerate(key_list): + keys[key].append(values[i]) + + # validate email addresses + for idx, emails in enumerate(keys['$EMAIL$']): + # split into individual email addresses + validated = [] + for email in emails.split(','): + # remove spaces + email = email.strip() + validated.append(validate_email_address(email, + errstr=f'Line {idx+1} in {name}:\n')) + # store the validated and normalized email addresses back into the dict + keys['$EMAIL$'][idx] = ','.join(validated) + return keys + +def create_email_bodies(body_file, keys, fromh, subject, cc, bcc, inreply_to, attachment): + msgs = {} + body_text = body_file.read() + warned_once = False + # collect attachments once and then attach them to every single message + attachments = {} + for path in attachment: + # guess the MIME type based on file extension only... + mime, encoding = mimetypes.guess_type(path, strict=False) + # if no guess or if the type is already encoded, the MIME type is octet-stream + if mime is None or encoding is not None: + mime = 'application/octet-stream' + maintype, subtype = mime.split('/', 1) + data = path.read_bytes() + attachments[path.name] = (data, maintype, subtype) + + for i, emails in enumerate(keys['$EMAIL$']): + # find keywords and substitute with values + + # The following line does this: + # - Assume that the parameter file has an header like + # $NAME$;$SURNAME$;$EMAIL$ + # - Assume that line "i" from the parameter file look like this + # Gorilla; Blushing; email@gorillas.org + # - Then the following like is equivalent to having a loop over the keys + # from the parameter file: ($NAME$, $SURNAME$) + # - The m in the lambda is going to be the re.match object for $NAME$ at the + # first iteration, and the re.match object for $SURNAME$ at the second one + # - at every iteration, m.group(0) is the string of the matching key, i.e. + # it is "$NAME$" at the first iteration and "$SURNAME$" at the second one + # - keys[m.group(0)] is then equivalent, at the first iteration, to + # keys["$NAME$"] == ["Gorilla", "Othername1", "Othername2"] + # - keys[.group(0)][i] is then equivalent, at the first iteration, to + # keys["$NAME$"][i] == "Gorilla" + # if we assume that "Gorilla" is on line i + # - at the second iteration, we will have: + # keys[m.group(0)] == keys["$SURNAME$"] == ["Blushing", "Othersurname1",....] + # keys[m.group(0)][i] = keys["$SURNAME$"][i] == "Blushing" + # - we only have two iterations for re.sub, because we only have two keys + # in the body matching the regexp r'\$\w+\$' + # - so at the end all the values corresponding to the keys will be inserted + # into the body_text in place of the key + # - at the next iteration of i, we are going to select a different line from + # the parameter file, and generate a new email with different substitutions + try: + body = re.sub(r'\$\w+\$', lambda m: keys[m.group(0)][i], body_text) + except KeyError as err: + # an unknown key was detected + raise click.ClickException(f'Unknown key in body file {body_file.name}: {err}') + # warn if no keys were found + if body == body_text and not warned_once: + rprint(f'[bold][red]WARNING:[/red] no keys found in body file {body_file.name}[/bold]') + warned_once = True + + msg = email.message.EmailMessage() + msg.set_content(body) + msg['To'] = emails + msg['From'] = fromh + msg['Subject'] = subject + if inreply_to: + msg['In-Reply-To'] = inreply_to + if cc: + msg['Cc'] = cc + if bcc: + msg['Bcc'] = bcc + # add the required date header + msg['Date'] = email.utils.localtime() + # add a unique message-id + msg['Message-ID'] = email.utils.make_msgid() + # add attachments + for name, (data, mtyp, styp) in attachments.items(): + msg.add_attachment(data, filename=name, maintype=mtyp, subtype=styp) + msgs[emails] = msg + + return msgs + +def send_messages(msgs, server, user, password): + # print one example email for confirmation + ex_addr = list(msgs.keys())[0] + msg = msgs[ex_addr] + panel = [] + for hdr, value in msg.items(): + if hdr in ('From', 'Subject', 'Cc', 'Bcc', 'In-Reply-To'): + panel.append(f'[yellow]{hdr}[/yellow]: [red bold]{value}[/red bold]') + else: + panel.append(f'[yellow]{hdr}[/yellow]: {value}') + for attachment in msg.iter_attachments(): + name = attachment.get_filename() + content_type = attachment.get_content_type() + panel.append(f'[magenta]Attachment[/magenta] ([cyan]{content_type}[/cyan]): {name}') + body = msg.get_body().get_content() + panel.append(f'\n{body}') + rprint(rich.panel.Panel.fit('\n'.join(panel))) + + # ask for confirmation before really sending stuff + rprint(f'[bold]About to send {len(msgs)} email messages like the one above…[/bold]') + if not rich.prompt.Confirm.ask(f'[bold]Send?[/bold]'): + #if not click.confirm('Send the emails above?', default=None): + raise click.ClickException('Aborted! We did not send anything!') + + print() + + servername = server.split(':')[0] + try: + server = smtplib.SMTP(server) + except Exception as err: + raise click.ClickException(f'Can not connect to "{servername}": {err}') + + try: + server.starttls() + except Exception as err: + raise click.ClickException(f'Could not STARTTLS with "{servername}": {err}') + + if user is not None: + try: + # get password if needed + if password is None: + password = rich.prompt.Prompt.ask(f'Enter password for ' + f'[bold]{user}[/bold] ' + f'on [bold]{servername}[/bold]', + password=True) + server.login(user, password) + except Exception as err: + raise click.ClickException(f'Can not login to {servername}: {err}') + + print() + + for emailaddr in rich.progress.track(msgs, description="[green]Sending:[/green]"): + rprint(f'Sending to: [bold]{emailaddr}[/bold]') + try: + out = server.send_message(msgs[emailaddr]) + except Exception as err: + raise click.ClickException(f'Can not send email: {err}') + + if len(out) != 0: + raise click.ClickException(f'Can not send email: {err}') + + server.quit() + +def validate_inreply_to(context, param, value): + if value is None: + return None + if not (value.startswith('<') and value.endswith('>')): + raise click.BadParameter(f"must be enclosed in brackets (): {value}!") + return value + +def validate_email_address(value, errstr=''): + # we support two kind of email address: + # 1. x@y.org + # 2. Blushing Gorilla + if address := re.match(r'(.+)<(\S+)>', value): + # we are dealing with form 2 + # extract the email address + prefix = address.group(1) # here we get "Blushing Gorilla " + address = address.group(2) # here we get x@y.org + else: + # we are dealing with form 1 + prefix = None + address = value + # validate the email address + try: + emailinfo = email_validator.validate_email(address, check_deliverability=False) + except email_validator.EmailNotValidError as e: + raise click.BadParameter(errstr+f"{value!r} is not a valid email address:\n{str(e)}") + # support different versions of email-validator + try: + email = emailinfo.normalized # version >= 2.0 + except AttributeError: + email = emailinfo.email # version <= 1.3 + if prefix: + return f'{prefix}<{email}>' + else: + return email + + +# a custom click parameter type to represent email addresses +class Email(click.ParamType): + name = "Email" + + def convert(self, value, param, ctx): + # validate the email address + try: + return validate_email_address(value) + except click.BadParameter as e: + self.fail(str(e), param, ctx) + +@click.command(context_settings={'help_option_names': ['-h', '--help'], + 'max_content_width': 120}) + +### REQUIRED OPTIONS ### +@click.option('-F', '--from', 'fromh', required=True, type=Email(), help='set the From: header') +@click.option('-S', '--subject', required=True, help='set the Subject: header') +@click.option('-Z', '--server', required=True, help='the SMTP server to use') +@click.option('-P', '--parameter', 'parameter_file', required=True, + type=click.File(mode='rt', encoding='utf8', errors='strict'), + help='set the parameter file (see above for an example)') +@click.option('-B', '--body', 'body_file', required=True, + type=click.File(mode='rt', encoding='utf8', errors='strict'), + help='set the email body file (see above for an example)') + +### OPTIONALS ### +@click.option('-b', '--bcc', type=Email(), help='set the Bcc: header') +@click.option('-c', '--cc', type=Email(), help='set the Cc: header') +@click.option('-r', '--inreply-to', callback=validate_inreply_to, metavar="", + help='set the In-Reply-to: header. Set it to a Message-ID.') +@click.option('-u', '--user', help='SMTP user name. If not set, use anonymous SMTP connection') +@click.option('-p', '--password', help='SMTP password. If not set you will be prompted for one') +@click.option('-a', '--attachment', help='add attachment [repeat for multiple attachments]', + multiple=True, type=click.Path(exists=True, dir_okay=False, + readable=True, path_type=pathlib.Path)) + +### MAIN SCRIPT ### +def main(fromh, subject, server, parameter_file, body_file, bcc, cc, inreply_to, + user, password, attachment): + """Send mass mail + + Example: + + \b + massmail --from "Blushing Gorilla " --subject "Invitation to the jungle" --server mail.example.com:587 --user user@example.com -P parm.csv -B body.txt + + parm.csv (semi-colon separated): + + \b + $NAME$; $SURNAME$; $EMAIL$ + John; Smith; j@monkeys.com + Anne and Mary; Joyce; a@donkeys.com, m@donkeys.com + + body.txt: + + \b + Dear $NAME$ $SURNAME$, + we kindly invite you to join us in the jungle + Cheers, + Gorilla + + Notes: + + Values from the parameter file (parm.csv) are inserted in the body text (body.txt). The keyword $EMAIL$ must always be present in the parameter files and contains a comma separated list of email addresses. Keep in mind shell escaping when setting headers with white spaces or special characters. Both files must be UTF8 encoded! + """ + keys = parse_parameter_file(parameter_file) + msgs = create_email_bodies(body_file, keys, fromh, subject, cc, bcc, inreply_to, attachment) + send_messages(msgs, server, user, password) + +if __name__ == '__main__': + import sys + main(sys.argv[1:]) diff --git a/massmail/test_massmail.py b/massmail/test_massmail.py new file mode 100644 index 0000000..bb7e160 --- /dev/null +++ b/massmail/test_massmail.py @@ -0,0 +1,491 @@ +import email as email_module +import subprocess +import sys +import time + +from massmail.massmail import main as massmail +import click.testing +import pytest + +# 4096 bit, valid until year 2123 +# generated with: +# openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 36500 -nodes -subj '/CN=localhost' +TLS_KEY="""-----BEGIN PRIVATE KEY----- +MIIJQwIBADANBgkqhkiG9w0BAQEFAASCCS0wggkpAgEAAoICAQDL+HYMRZ21L8Eh +aR0wBgiK9ww/51iNYSKVffXphnt77B1o5A80m98B3v9epY48jE+Cbytl9PxlWHcn +jlGWqO/xSFiETmJokRcBr1s8Ij0tZYc2PuPwpLm+wN3rn1JX4wuskSXaxnDtUgZg +t7GYIsVxpg5qIdvMiV/68sho0pG9Y4LSw+mwy2rIXOwpCWuV0PKOq64++oK2P1N7 +x+B0iyWHvQd+SLKK7yLQr2Sh5iPq14KJkBe2mWjoujtGxo8Cp4aYE1nVAF1VV0Bx +Psp8xsKo6DEcljIw3MfI4gdYUYaxQPWmWb9141iMCoeCkAqrnSSUFdRf6QUhfesu +mNGyIx/d7iE5LsTT9i0kMdemM5R3b9IEwD6EXlJDOVKTZnw2tKqbBnzeIpeRF/qZ +HevOnhLc6AewJacEURFG2TNgkxCJ3SwFdzgIWbYz64J85Q8Bu3e0s4WUjl2XRgsa +IDAYn6tYWndMbGiHfjx60R153lIxHWAu+bAsV15WDI+s1wJT7RjG0Xs37WEcGhK4 +4eFdMtb9cKmWGFIFEhWx/6iFwRk/uPptE+QgMiuEkydUidw/6hQ0vJKnwFRCOcFa +ri4SrUgpgMjKsxNyAosGvi+cE7U7XmtbOOpKGeL1y05JFnUucaQfuCLzaiiI1V/R +HlaeU0ZNk3EQl09coNCzJtjxhHDvcwIDAQABAoICAFoP48Poa0npA5xmhuJBD72Y +dvqygnmupbAfdZk+7cBakePSK1qd5pqzZcvbSxI2HBdqUd3LjjSLmtVG9ISTJJtD +x/3nhHFKez+dt6m2LpAgb9MGcuw7N97f1z1mVFwFHw776hyPGabYXIORKKQV2luj +qGK5f41xLQWn9NDABWT8DvRUWBfdwdEloos+Ixh8MdXIPYCGaXfiP1D9AQFEvXYR +g8EBhYBuNc+yWjtYXIyhyvxFuQrB8z4rmOfX3aac5QO6K6Su7Ac2Jvi58nuk4afm +GmVWdmP34Gk1UGvxV4lltvHUWANMNrljHtGKG4QKN3ABsYwF3mOa72DcTl8bPkKc +peQxa3ZD0BqGeZhD7vUBsLKi1lzk9vxPoAWHcQGFztUSkPu2Nzmi1SJmhu3rPNa/ +eCJD8TzSotVR7sph2ERb5qtTe3kv1vM3pRCGKr8/yx5BoYokzsT1vqDOmUQLN+YF +zopLAlmC2iyPwhYVL1UsfH8L1+acffmlXZ7G2+RXIRzyq3IOHVi499UvLnnhP2E3 +QQmPziKNr1zMuabg0kr9vDlToUY9r6VRlx/ioOs6QlfUBJc7B6/2ElEFupFR2TlF +Lo+JZPgmybE9kAg2dYtQMLoVn9LEkDH1XzXM+cZvIgox7hBCVUvoKO6woA93fLGd +YB71OFyw15FVzsQI+cyNAoIBAQDcFY3a9qPSs3vQahQ2PtYOEv3FcWSJsDvYX8Dh +EeSNu+qCJhzHTXCoFwI91L0IOiOWI2xxBor/5ma2pH5toRfx6Vw9ygskKGK3gPlO +OXkDAypVWky9is09yp80cyx+1M5f/USZfhphN3rQhqWRyvV5M75Ibe3Sis60llS1 +aIV4hmTNZ9sQ0jAGnbL8Ym5Yq55eNX/whEEi1xZSfAuho+ChEBkh8/syRKdTlERc +8EGUip7PIr1KqAxXembcYVP+MbmIPlypzL5Th7vsIj5eBbcLdrhdDY/sBq5730p3 +19pMye49fdkx33eNSkBe6eanUdy4a77VnL9yuItwRDJtdGRPAoIBAQDtQbpmOQgn +WMfUaUuTDIEm9DcCAuRp4IPEHdZ8GpmAgYpjhrKAVeF72OjNglyCtYBHQ3GLOFZt +TqtibPHg+WS6JeBeXJze7STP4IO3JgaspEbO5Rj8wJEwHTZn7BZIX85RJrUduMUQ +oxKgBtMsSrN1IUjHGgOz7NMQRP/j+Beiej1beP/8BJIhGkONlvN9hUM1V6jSM6qy +Vyzm/vJjkT11THe7J36oB23Bncf3hC8Ii14TTBEohHDAbEjSeu0ZQGMc81zKQuyI +zRatFGNKwh52LHyw9zKI0C9bfXUVXe8zNe4e9dpdmsKBx3NoRirYiT/d2YDcrnzh +DjAULX+hcCWdAoIBADaM0SBYybpL6oB6CpB3eq76XhQ2Sukl2W+ELFadDL1kuneP +4sozk5zWNyQEOuZzIqbwGMzbBlDvVr4mf3/E0h6P7OET1zcbG3zIZwLQlAH/ItsN +CsBgSwbp1hQ2B+1X6d8482voKbm2+qX8+cTtPXLRNHTXan8pEJsKN+zO/2YkSY/w +EghVULoTFG4iJ5+qyhInyJJg9ZQhI9NGE8v4xpClYNVdmAGZqq+4rEks89RRl5NX +1PtQM97q49vz89GpmYb/jhA4Q2SI3DdnNXYwjHI29vN5jRa/gTgK3HZf9ifaVUbA +jrkh3owSv2nHJ/iI/eBoNGDV/U3+F/G3tZgTpVkCggEBAIZjjrPMZkPzU+2LXxWC +Jb3s4yOug7c9RyXVSOKvJnfV6I+LgpyTCM/gA640wzX+nRTArRYQ6VOtFgMAdtna +KiYOwlJw3yKe7RUatUEOtwUfYERdHJQ+d37rbR/caJrCOdlZtYmKWYWc+TXP59nU +zmXwXor4v1QxNzSmANQeeTS9TPf9R/J2nFdHyy/uaymUTIdwid3XCj9Ohc6qZp3j +bQ5+K+vE6UdAPflH6DbZltKeLsF7etSagEteirk+jAKbqAiECPFAiz7J/Kg5Pizg +W+TQOij7PJKmaczG+YUK2i0FxUWgOPqAaOCeG07bP/W7eIOvagCWjYHlSXKEeyD0 +pzkCggEBAKHAvy4SKqDZdEZcOZPMEmRIPX+BO0Ld58RE5Guc/X4zAA+CjWmUT6Cn +aHT3YQ/6bKaLKBLmsthRfKEyOA6tPlYqsRWGLv62AK0ZaYqzTypd/NGNO8jCtP07 +IkNlwVYq+Bx2mwqnCZev7R2QeJSzUpAAIRlNt0oCP3yAaoJXY3ulluBZSgjwrds7 +w+ZLd6cs7RoOwMYxfHmKAy+rlcpkds+IDEj0Gxtzy+S9GjN6Yxve9eumN6cepmCn +/qQRqjk4wJOo0oH9JS62L+0b75NRoVa7iciW8Ao22+aZYpRFY+27Nh3pR7PnJsHM +sBuZkVRS+PJe57phZv06aFFrv2VcXXg= +-----END PRIVATE KEY----- +""" + +TLS_CERT="""-----BEGIN CERTIFICATE----- +MIIFCzCCAvOgAwIBAgIULLLAwLBYIJVgJ/39zhjZq+YnzkcwDQYJKoZIhvcNAQEL +BQAwFDESMBAGA1UEAwwJbG9jYWxob3N0MCAXDTIzMDQxNjEyMjIyMFoYDzIxMjMw +MzIzMTIyMjIwWjAUMRIwEAYDVQQDDAlsb2NhbGhvc3QwggIiMA0GCSqGSIb3DQEB +AQUAA4ICDwAwggIKAoICAQDL+HYMRZ21L8EhaR0wBgiK9ww/51iNYSKVffXphnt7 +7B1o5A80m98B3v9epY48jE+Cbytl9PxlWHcnjlGWqO/xSFiETmJokRcBr1s8Ij0t +ZYc2PuPwpLm+wN3rn1JX4wuskSXaxnDtUgZgt7GYIsVxpg5qIdvMiV/68sho0pG9 +Y4LSw+mwy2rIXOwpCWuV0PKOq64++oK2P1N7x+B0iyWHvQd+SLKK7yLQr2Sh5iPq +14KJkBe2mWjoujtGxo8Cp4aYE1nVAF1VV0BxPsp8xsKo6DEcljIw3MfI4gdYUYax +QPWmWb9141iMCoeCkAqrnSSUFdRf6QUhfesumNGyIx/d7iE5LsTT9i0kMdemM5R3 +b9IEwD6EXlJDOVKTZnw2tKqbBnzeIpeRF/qZHevOnhLc6AewJacEURFG2TNgkxCJ +3SwFdzgIWbYz64J85Q8Bu3e0s4WUjl2XRgsaIDAYn6tYWndMbGiHfjx60R153lIx +HWAu+bAsV15WDI+s1wJT7RjG0Xs37WEcGhK44eFdMtb9cKmWGFIFEhWx/6iFwRk/ +uPptE+QgMiuEkydUidw/6hQ0vJKnwFRCOcFari4SrUgpgMjKsxNyAosGvi+cE7U7 +XmtbOOpKGeL1y05JFnUucaQfuCLzaiiI1V/RHlaeU0ZNk3EQl09coNCzJtjxhHDv +cwIDAQABo1MwUTAdBgNVHQ4EFgQU3PuRab4JVPITf+ICU5HVOMpW0gIwHwYDVR0j +BBgwFoAU3PuRab4JVPITf+ICU5HVOMpW0gIwDwYDVR0TAQH/BAUwAwEB/zANBgkq +hkiG9w0BAQsFAAOCAgEAOkEwnaMyxc7NdJedwk5sl6yoP/yY/rKDKA1yCEwgolkF +k73F5VcaOcVhwoA/a2h8AwYOQTGv2YNZnRxLskooyKkLrbHoZn34/YO1kgaXAm/p +mtiVsdaoFxUrcE7GXkJDTz4cC8wHc8BFrK6xESk8FYl07DCjIK9kFBzTGLV234kR +OLpvFAoFbvVWlWDT3VLIGjP6EIp7JFE/neTpB8PguZ7dcNdgtoiWz19Pv3JYoWWj +DTSZXJgu6OUjVxWC0T1VBz/uv5n9GNEjLa23hdzhztigmcrOKgrX43cTzNhnu62R +6Tx1HUtkswvxNzhaEKt3TP6Jde7mP5J6DKi1HpfKuGDizFJzoEkmWQNfeCzWniw8 +W01qwU2xZ9AETQB2uAWoHF0vKRX8ZuzOBhK1vW9HVtSZFHJZ3OaS5/5TNvts8fvs +eK5DIjBOr2RQqw+V9JYoBPr3t8EUE1cBSI6THlkWMDvXm5wal0tZOF7HGjCxXnHY +Z7tS2M2dLHwsnOultNrtZn+zjSmmce2/YQzNqZcax5G9kTuU88OguqcDdB3pcWzY +U1iY1r9bTWpfIASnDpYz3RbE9jp5+ZgNjhA6l5lXk9/CHBQWyxL9d2Khp+3IV0t9 +TfvhHNbB/2KiW9Si3G4bfcyPrH60O9yCfmbLDUjRgH2UjbOIefWKcKyAz/ZA5Y0= +-----END CERTIFICATE----- +""" + +# return a locally running SMTP server. This fixture kills the server after the +# test run is finished, i.e. not after every test! +# The resulting server runs at localhost:8025 +@pytest.fixture(scope="module") +def server(tmp_path_factory): + tlsdir = tmp_path_factory.mktemp('tlsdir') + key = tlsdir / 'key' + key.write_text(TLS_KEY) + cert = tlsdir / 'cert' + cert.write_text(TLS_CERT) + server = subprocess.Popen([sys.executable, + '-m', 'aiosmtpd', + '--nosetuid', + '--debug', + '--tlscert', str(cert), + '--tlskey', str(key), + '--listen', 'localhost:8025', + '--class', 'aiosmtpd.handlers.Debugging', 'stderr'], + stdin=None, + text=False, + stderr=subprocess.PIPE, + stdout=None, + bufsize=0, + env={'AIOSMTPD_CONTROLLER_TIMEOUT':'0'}) + # give the smtp server 100 milliseconds to startup + time.sleep(0.1) + yield server + server.terminate() + +# return a "good" parameter file +@pytest.fixture +def parm(tmp_path): + header = '$NAME$;$SURNAME$;$EMAIL$' + row1 = 'Alice;Joyce;donkeys@jungle.com' + f = tmp_path / 'parms.csv' + f.write_text(header+'\n'+row1) + yield f + f.unlink() + +# return a "good" body file +@pytest.fixture +def body(tmp_path): + text = """Dear $NAME$ $SURNAME$, + + we kindly invite you to join us in the jungle + + Cheers, + Gorilla + """ + f = tmp_path / 'body.txt' + f.write_text(text) + yield f + f.unlink() + +def parse_smtp(server): + # we can not just issue a blank .read() because that would would block until + # server.stderr is closed, which only happens after the server has exited + # so we request 1MB (1024*1024 = 2^20 bytes = 1048576) to be sure + smtp = server.stderr.read(1048576) + protocol = [] + emails = [] + for line in smtp.splitlines(): + if line.startswith(b'INFO:mail.log:'): + # this is a line of protocol: SMTP server <-> SMTP client + protocol.append(line) + elif line.startswith(b'---------- MESSAGE FOLLOWS ----------'): + # this lines starts a new email message + email = [] + elif line.startswith(b'------------ END MESSAGE ------------'): + # this line closes an email message + # join all collected lines + email = b'\n'.join(email) # we are dealing with bytes here + # parse the whole thing to get a EmailMessage object from the email module + emails.append(email_module.message_from_bytes(email, + policy=email_module.policy.default)) + else: + # collect this line, it belongs to the current email message + email.append(line) + protocol = b'\n'.join(protocol) + # protocol chat should always be ascii, if we get a UnicodeDecodeError here + # we'll have to examine the server output to understand what's up + protocol = protocol.decode('ascii') + return protocol, emails + +# wrapper for running the massmail script and parse the SMTP server output +def cli(server, parm, body, opts={}, opts_list=[], errs=False, output=False): + # set default options + options = { + '--from' : 'Blushing Gorilla ', + '--subject' : 'Invitation to the jungle', + '--server' : 'localhost:8025', + '--parameter' : str(parm), + '--body' : str(body), + } + # options can be passed by a test as a dictionary or as a list + # as a dictionary + options.update(opts) + opts = [] + for option, value in options.items(): + opts.extend((option, value)) + # we need to allow passing options as a list for those options that can be + # passed multiple times, e.g. --attachment, where a dictionary wouldn't work + opts.extend(opts_list) + # now we have all default options + options passed by the test + # instantiate a click Runner + script = click.testing.CliRunner() + result = script.invoke(massmail, opts, input='y\n') + if errs: + # we expect errors, so do not interact with the SMTP server at all + # and read the errors from the script instead + assert result.exit_code != 0 + return result.output + else: + # parse the output of the SMTP server which is running in the background + protocol, emails = parse_smtp(server) + # in case of unexpected error, let's print the output so we have a chance + # to debug without touching the code here + if result.exit_code != 0: + print(result.output) + print(protocol) + assert result.exit_code == 0 + if output: + # return the protocol text and a list of emails and the output of the script + return protocol, emails, result.output + else: + # return the protocol text and a list of emails + return protocol, emails + +# just test that the cli is working and we get the right help text +def test_help(): + result = click.testing.CliRunner().invoke(massmail, ['-h']) + assert result.exit_code == 0 + assert 'Usage:' in result.output + assert 'Example:' in result.output + +def test_regular_sending(server, parm, body): + protocol, emails = cli(server, parm, body) + email = emails[0] + + # check that the envelope is correct + assert 'sender: gorilla@jungle.com' in protocol + assert 'recip: donkeys@jungle.com' in protocol + + # email is ASCII, so the transfer encoding should be 7bit + assert email['Content-Transfer-Encoding'] == '7bit' + + # check that the headers are correct + assert email['From'] == 'Blushing Gorilla ' + assert email['To'] == 'donkeys@jungle.com' + assert email['Subject'] == 'Invitation to the jungle' + assert 'Date' in email + assert 'Message-ID' in email + assert email.get_charsets() == ['utf-8'] + assert email.get_content_type() == 'text/plain' + + # check that we have inserted the right values for our keys + text = email.get_content() + assert 'Dear Alice Joyce' in text + assert 'we kindly invite you to join us in the jungle' in text + +def test_unicode_body_sending(server, parm, body): + # add some unicode text to the body + with body.open('at') as bodyf: + bodyf.write('\nÜni©ödę¿\n') + protocol, emails = cli(server, parm, body) + email = emails[0] + # unicode characters force the transfer encoding to 8bit + assert email['Content-Transfer-Encoding'] == '8bit' + text = email.get_content() + assert 'Üni©ödę¿' in text + +def test_wild_unicode_body_sending(server, parm, body): + # add some unicode text to the body + with body.open('at') as bodyf: + bodyf.write('\nœ´®†¥¨ˆøπ¬˚∆˙©ƒ∂ßåΩ≈ç√∫˜µ≤ユーザーコードa😀\n') + protocol, emails = cli(server, parm, body) + email = emails[0] + # because we use unicode characters that don't fit in one byte, + # the email will be encoded in base64 for tranfer + assert email['Content-Transfer-Encoding'] == 'base64' + # we have to trust the internal email_module machinery + # to perform the proper decoding + text = email.get_content() + assert 'œ´®†¥¨ˆøπ¬˚∆˙©ƒ∂ßåΩ≈ç√∫˜µ≤ユーザーコードa😀' in text + +def test_unicode_subject(server, parm, body): + opts = {'--subject' : 'Üni©ödę¿' } + protocol, emails = cli(server, parm, body, opts=opts) + email = emails[0] + assert email['Subject'] == 'Üni©ödę¿' + +def test_unicode_from(server, parm, body): + opts = { '--from' : 'Üni©ödę¿ ' } + protocol, emails = cli(server, parm, body, opts=opts) + email = emails[0] + assert email['From'] == 'Üni©ödę¿ ' + +def test_unicode_several_reciepients(server, parm, body): + # add some unicode text to the body + with parm.open('at') as parmf: + parmf.write('\nJohn; Smith; j@monkeys.com\n') + protocol, emails = cli(server, parm, body) + + assert len(emails) == 2 + assert 'sender: gorilla@jungle.com' in protocol + assert 'recip: donkeys@jungle.com' in protocol + assert 'recip: j@monkeys.com' in protocol + + assert emails[0]['To'] == 'donkeys@jungle.com' + assert emails[1]['To'] == 'j@monkeys.com' + assert 'Dear Alice Joyce' in emails[0].get_content() + assert 'Dear John Smith' in emails[1].get_content() + +def test_unicode_parm(server, parm, body): + # add some unicode text to the body + with parm.open('at') as parmf: + parmf.write('\nÜni©ödę¿; Smith; j@monkeys.com\n') + protocol, emails = cli(server, parm, body) + assert 'Dear Üni©ödę¿ Smith' in emails[1].get_content() + +def test_multiple_recipients_in_one_row(server, parm, body): + # add some unicode text to the body + with parm.open('at') as parmf: + parmf.write('\nAnne and Mary; Joyce; a@donkeys.com, m@donkeys.com\n') + protocol, emails = cli(server, parm, body) + assert len(emails) == 2 + assert 'sender: gorilla@jungle.com' in protocol + assert 'recip: donkeys@jungle.com' in protocol + assert 'recip: a@donkeys.com' in protocol + assert 'recip: m@donkeys.com' in protocol + + assert emails[0]['To'] == 'donkeys@jungle.com' + assert emails[1]['To'] == 'a@donkeys.com, m@donkeys.com' + assert 'Dear Alice Joyce' in emails[0].get_content() + assert 'Dear Anne and Mary Joyce' in emails[1].get_content() + + +def test_parm_malformed_keys(server, parm, body): + parm.write_text("""$NAME;$SURNAME$;$EMAIL$ + test;test;test@test.com""") + output = cli(server, parm, body, errs=True) + assert '$NAME' in output + assert 'malformed' in output + parm.write_text("""$NAME$;SURNAME$;$EMAIL$ + test;test;test@test.com""") + output = cli(server, parm, body, errs=True) + assert 'SURNAME$' in output + assert 'malformed' in output + +def test_missing_email_in_parm(server, parm, body): + parm.write_text("""$NAME$;$SURNAME$ + test;test""") + assert 'No $EMAIL$' in cli(server, parm, body, errs=True) + +def test_too_many_values_in_parm(server, parm, body): + with parm.open('at') as parmf: + parmf.write('\nMario;Rossi;j@monkeys.com;too much\n') + output = cli(server, parm, body, errs=True) + assert 'Line 2' in output + assert '4 found instead of 3' in output + +def test_missing_values_in_parm(server, parm, body): + with parm.open('at') as parmf: + parmf.write('\nMario;j@monkeys.com\n') + output = cli(server, parm, body, errs=True) + assert 'Line 2' in output + assert '2 found instead of 3' in output + +def test_empty_values_in_parm(server, parm, body): + with parm.open('at') as parmf: + parmf.write('\nMario;;j@monkeys.com\n') + output = cli(server, parm, body, errs=True) + assert 'Line 2' in output + assert 'empty value for key $SURNAME$' in output + +def test_unknown_key_in_body(server, parm, body): + # add some unknown key to the body + with body.open('at') as bodyf: + bodyf.write('\n$UNKNOWN$\n') + output = cli(server, parm, body, errs=True) + assert 'Unknown key in body' in output + assert '$UNKNOWN$' in output + +def test_no_key_in_body(server, parm, body): + # add some unknown key to the body + with body.open('wt') as bodyf: + bodyf.write('No keys here!\n') + protocol, emails, output = cli(server, parm, body, output=True) + assert 'WARNING: no keys found in body file' in output + assert 'No keys here!' in emails[0].get_content() + + +def test_server_offline(server, parm, body): + opts = {'--server' : 'noserver:25' } + assert 'Can not connect to' in cli(server, parm, body, opts=opts, errs=True) + +def test_server_wrong_authentication(server, parm, body): + opts = {'--user' : 'noone', '--password' : 'nopass' } + assert 'Can not login' in cli(server, parm, body, opts=opts, errs=True) + +def test_bcc(server, parm, body): + opts = {'--bcc' : 'x@monkeys.com'} + protocol, emails = cli(server, parm, body, opts=opts) + + assert len(emails) == 1 + assert 'sender: gorilla@jungle.com' in protocol + assert 'recip: donkeys@jungle.com' in protocol + assert 'recip: x@monkeys.com' in protocol + + assert emails[0]['To'] == 'donkeys@jungle.com' + assert 'Bcc' not in emails[0] + +def test_cc(server, parm, body): + opts = {'--cc' : 'Markus Murkis '} + protocol, emails = cli(server, parm, body, opts=opts) + + assert len(emails) == 1 + assert 'sender: gorilla@jungle.com' in protocol + assert 'recip: donkeys@jungle.com' in protocol + assert 'recip: x@monkeys.com' in protocol + + assert emails[0]['To'] == 'donkeys@jungle.com' + assert emails[0]['Cc'] == 'Markus Murkis ' + +def test_in_reply_to(server, parm, body): + opts = {'--inreply-to' : ''} + protocol, emails = cli(server, parm, body, opts=opts) + + assert len(emails) == 1 + assert emails[0]['In-Reply-To'] == '' + +def test_invalid_in_reply_to(server, parm, body): + opts = {'--inreply-to' : 'abc'} + output = cli(server, parm, body, opts=opts, errs=True) + assert 'Invalid value' in output + assert 'brackets' in output + +def test_validate_from(server, parm, body): + opts = {'--from' : 'invalid@email'} + assert 'is not a valid email' in cli(server, parm, body, opts=opts, errs=True) + opts = {'--from' : 'Blushing Gorilla'} + assert 'is not a valid email' in cli(server, parm, body, opts=opts, errs=True) + opts = {'--from' : 'Blushing Gorilla '} + assert 'is not a valid email' in cli(server, parm, body, opts=opts, errs=True) + +def test_invalid_email_in_parm(server, parm, body): + with parm.open('at') as parmf: + parmf.write('\nMario;Rossi;j@monkeys\n') + output = cli(server, parm, body, errs=True) + assert 'is not a valid email' in output + assert 'Line 2' in output + +def test_rich_email_address_in_parm(server, parm, body): + with parm.open('at') as parmf: + parmf.write('\nMario;Rossi;Mario Rossi \n') + protocol, emails = cli(server, parm, body) + assert 'recip: j@monkeys.org' in protocol + assert 'Mario Rossi' in emails[1]['To'] + assert 'j@monkeys.org' in emails[1]['To'] + +def test_attach_png_and_pdf(server, parm, body, tmp_path): + # create a little PNG (greyscale, 1x1 pixel) from: + # https://garethrees.org/2007/11/14/pngcrush/ + png_bytes = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x01\x00\x00\x00\x007n\xf9$\x00\x00\x00\nIDATx\x9cc`\x00\x00\x00\x02\x00\x01H\xaf\xa4q\x00\x00\x00\x00IEND\xaeB`\x82' + png_file = tmp_path / 'test.png' + png_file.write_bytes(png_bytes) + + # create a little PDF (stackoverflow + adaptations) + pdf_bytes = b"%PDF-1.2 \n9 0 obj\n<<\n>>\nstream\nBT/ 32 Tf(text)' ET\nendstream\nendobj\n4 0 obj\n<<\n/Type /Page\n/Parent 5 0 R\n/Contents 9 0 R\n>>\nendobj\n5 0 obj\n<<\n/Kids [4 0 R ]\n/Count 1\n/Type /Pages\n/MediaBox [ 0 0 250 50 ]\n>>\nendobj\n3 0 obj\n<<\n/Pages 5 0 R\n/Type /Catalog\n>>\nendobj\ntrailer\n<<\n/Root 3 0 R\n>>\n%%EOF" + pdf_file = tmp_path / 'test.pdf' + pdf_file.write_bytes(pdf_bytes) + + opts = ('--attachment', str(png_file), '--attachment', str(pdf_file)) + protocol, emails = cli(server, parm, body, opts_list=opts) + email = emails[0] + assert 'Dear Alice Joyce' in email.get_body().get_content() + attachments = list(email.iter_attachments()) + assert attachments[0].get_content_type() == 'image/png' + assert attachments[0].get_filename() == 'test.png' + assert attachments[0].get_content() == png_bytes + assert attachments[1].get_content_type() == 'application/pdf' + assert attachments[1].get_filename() == 'test.pdf' + assert attachments[1].get_content() == pdf_bytes + +def test_unknown_attachment_type(server, parm, body, tmp_path): + random_bytes = b'\x9diou\xd5\x12\xdf/\x03\xf8' + fl = tmp_path / 'dummy' + fl.write_bytes(random_bytes) + + opts = {'--attachment' : str(fl)} + protocol, emails = cli(server, parm, body, opts=opts) + attachments = list(emails[0].iter_attachments()) + assert attachments[0].get_content_type() == 'application/octet-stream' + assert attachments[0].get_content() == random_bytes diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..544660e --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,18 @@ +[project] +name = "massmail" +version = "0.2" +description = "Send mass mail" +authors = [ { name="ASPP", email ="info@aspp.school" } ] +license = { file = "LICENSE" } +requires-python = ">=3.6" +dependencies = [ "aiosmtpd", "click", "email-validator", "rich", "pytest" ] + +[project.scripts] +massmail = "massmail.massmail:main" + +[tool.setuptools] +packages = ["massmail"] + +[build-system] +requires = ["setuptools>=42"] +build-backend = "setuptools.build_meta" diff --git a/test_massmail.py b/test_massmail.py deleted file mode 100644 index 5ba1203..0000000 --- a/test_massmail.py +++ /dev/null @@ -1,25 +0,0 @@ -import tempfile - -import massmail - -def test_dummy(): - pass - -def test_command_help(): - import pytest - with pytest.raises(SystemExit): - massmail.main(['-h']) - -def test_parse_parameter_file(): - expected_keywords = {u'$VALUE$': [u'this is a test'], u'$EMAIL$': [u'testrecv@test']} - with tempfile.NamedTemporaryFile('wt') as f: - f.write('$EMAIL$;$VALUE$\ntestrecv@test;this is a test') - f.flush() - cmd_options = [ - '-F', 'testfrom@test', - '-z', 'localhost', - f.name, - ] - options = massmail.parse_command_line_options(cmd_options) - keywords, email_count = massmail.parse_parameter_file(options) - assert keywords == expected_keywords diff --git a/test_sending.py b/test_sending.py deleted file mode 100644 index 5f9a859..0000000 --- a/test_sending.py +++ /dev/null @@ -1,87 +0,0 @@ -import contextlib -import email -import io -import sys -import os -import time -import subprocess -import base64 -import tempfile - -import massmail - -@contextlib.contextmanager -def replace_stdin(text): - input = io.StringIO(text) - old = sys.stdin - sys.stdin, old = input, sys.stdin - try: - yield - finally: - sys.stdin = old - -@contextlib.contextmanager -def fake_smtp_server(address): - devnull = open(os.devnull, 'w') - server = subprocess.Popen(['python2', - '-m', 'smtpd', - '-n', - '-d', - '-c', 'DebuggingServer', - address], - stdin=devnull, - stdout=devnull, - stderr=subprocess.PIPE) - try: - time.sleep(1) - yield server - finally: - server.terminate() - -def test_local_sending(): - parameter_string = '$EMAIL$;$NAME$;$VALUE$\ntestrecv@test.org;TestName;531' - email_body = 'Dear $NAME$,\nthis is a test: $VALUE$\nBest regards' - email_to = 'testrecv@test.org' - email_from = 'testfrom@test.org' - email_subject = 'Test Subject' - email_encoding = 'utf-8' - - expected_email = email.mime.text.MIMEText('Dear TestName,\nthis is a test: 531\nBest regards'.encode(email_encoding), 'plain', email_encoding) - expected_email['To'] = email_to - expected_email['From'] = email_from - expected_email['Subject'] = email.header.Header(email_subject.encode(email_encoding), email_encoding) - - with tempfile.NamedTemporaryFile('wt') as f: - f.write(parameter_string) - f.flush() - cmd_options = [ - '-F', email_from, - '-S', email_subject, - '-z', 'localhost', - '-e', email_encoding, - f.name - ] - options = massmail.parse_command_line_options(cmd_options) - keywords, email_count = massmail.parse_parameter_file(options) - msgs = massmail.create_email_bodies(options, keywords, email_count, email_body) - massmail.add_email_headers(options, msgs) - assert msgs['testrecv@test.org'].as_string() == expected_email.as_string() - -def test_fake_sending(): - address = 'localhost:8025' - with tempfile.NamedTemporaryFile('wt') as f: - f.write('$EMAIL$;$VALUE$\ntestrecv@test;this is a test') - f.flush() - - with fake_smtp_server(address) as server: - with replace_stdin('EMAIL=$EMAIL$\nVALUE=$VALUE$'): - massmail.main(['-F', 'fake@foobar.com', - '-z', address, - f.name]) - - output = server.stderr.read() - assert b'MAIL FROM:' in output - assert b'RCPT TO:' in output - - encoded = base64.b64encode(b'EMAIL=testrecv@test\nVALUE=this is a test') - assert encoded in output