diff --git a/README.rst b/README.rst index 8665487..e56b75f 100644 --- a/README.rst +++ b/README.rst @@ -94,7 +94,7 @@ Email attributes Email has 2 basic body variants: text and html. Sender can choose to include: one, other, both or neither(rare). -MailMessage and MailAttachment public attributes are cached by functools.lru_cache +MailMessage and MailAttachment public attributes are cached by functools.cached_property .. code-block:: python @@ -428,7 +428,8 @@ Big thanks to people who helped develop this library: `K900 `_, `homoLudenus `_, `sphh `_, -`bh `_ +`bh `_, +`tomasmach `_ Help the project ---------------- diff --git a/docs/dev_notes.txt b/docs/dev_notes.txt index d6abb73..b3b2264 100644 --- a/docs/dev_notes.txt +++ b/docs/dev_notes.txt @@ -40,6 +40,11 @@ icons 📨 📬 📪 📭 📫 ✉ 📧 🖂 🖃 🖅 📩 +fetch +===== +FULL - Macro equivalent to: (FLAGS INTERNALDATE RFC822.SIZE ENVELOPE BODY) + + code ==== def eml_to_python_structs(eml_path: str): diff --git a/docs/release_notes.rst b/docs/release_notes.rst index 9aa6e37..407f38e 100644 --- a/docs/release_notes.rst +++ b/docs/release_notes.rst @@ -1,3 +1,10 @@ +1.9.1 +===== +* Replaced: functools.lru_cache to functools.cached_property +* Replaced: .format() to f'' +* Optimized: speed for imap_utf7 +* Replaced: typing.AnyStr to utils.StrOrBytes + 1.9.0 ===== * Added: __str__ to MailAttachment diff --git a/docs/todo.txt b/docs/todo.txt index c6444d4..8d4fc3d 100644 --- a/docs/todo.txt +++ b/docs/todo.txt @@ -1,7 +1,4 @@ -FULL - Macro equivalent to: (FLAGS INTERNALDATE RFC822.SIZE ENVELOPE BODY) - add servers for tests rambler GMX Mail diff --git a/imap_tools/__init__.py b/imap_tools/__init__.py index c5f7482..d9562fb 100644 --- a/imap_tools/__init__.py +++ b/imap_tools/__init__.py @@ -11,4 +11,4 @@ from .utils import EmailAddress from .errors import * -__version__ = '1.9.0' +__version__ = '1.9.1' diff --git a/imap_tools/errors.py b/imap_tools/errors.py index 29b9bd8..ce52a73 100644 --- a/imap_tools/errors.py +++ b/imap_tools/errors.py @@ -21,8 +21,8 @@ def __init__(self, command_result: tuple, expected: Any): self.expected = expected def __str__(self): - return 'Response status "{exp}" expected, but "{typ}" received. Data: {data}'.format( - exp=self.expected, typ=self.command_result[0], data=str(self.command_result[1])) + return (f'Response status "{self.expected}" expected, ' + f'but "{self.command_result[0]}" received. Data: {str(self.command_result[1])}') class MailboxFolderSelectError(UnexpectedCommandStatusError): diff --git a/imap_tools/folder.py b/imap_tools/folder.py index 937c458..da30e5f 100644 --- a/imap_tools/folder.py +++ b/imap_tools/folder.py @@ -1,9 +1,9 @@ import re -from typing import AnyStr, Optional, Iterable, List, Dict, Tuple +from typing import Optional, Iterable, List, Dict, Tuple from .imap_utf7 import utf7_decode from .consts import MailBoxFolderStatusOptions -from .utils import check_command_status, pairs_to_dict, encode_folder +from .utils import check_command_status, pairs_to_dict, encode_folder, StrOrBytes from .errors import MailboxFolderStatusValueError, MailboxFolderSelectError, MailboxFolderCreateError, \ MailboxFolderRenameError, MailboxFolderDeleteError, MailboxFolderStatusError, MailboxFolderSubscribeError @@ -24,8 +24,7 @@ def __init__(self, name: str, delim: str, flags: Tuple[str, ...]): self.flags = flags def __repr__(self): - return "{}(name={}, delim={}, flags={})".format( - self.__class__.__name__, repr(self.name), repr(self.delim), repr(self.flags)) + return f"{self.__class__.__name__}(name={repr(self.name)}, delim={repr(self.delim)}, flags={repr(self.flags)})" def __eq__(self, other): return all(getattr(self, i) == getattr(other, i) for i in self.__slots__) @@ -38,7 +37,7 @@ def __init__(self, mailbox): self.mailbox = mailbox self._current_folder = None - def set(self, folder: AnyStr, readonly: bool = False) -> tuple: + def set(self, folder: StrOrBytes, readonly: bool = False) -> tuple: """Select current folder""" result = self.mailbox.client.select(encode_folder(folder), readonly) check_command_status(result, MailboxFolderSelectError) @@ -49,7 +48,7 @@ def exists(self, folder: str) -> bool: """Checks whether a folder exists on the server.""" return len(self.list('', folder)) > 0 - def create(self, folder: AnyStr) -> tuple: + def create(self, folder: StrOrBytes) -> tuple: """ Create folder on the server. Use email box delimiter to separate folders. Example for "|" delimiter: "folder|sub folder" @@ -67,20 +66,20 @@ def get(self) -> Optional[str]: """ return self._current_folder - def rename(self, old_name: AnyStr, new_name: AnyStr) -> tuple: + def rename(self, old_name: StrOrBytes, new_name: StrOrBytes) -> tuple: """Rename folder from old_name to new_name""" result = self.mailbox.client._simple_command( 'RENAME', encode_folder(old_name), encode_folder(new_name)) check_command_status(result, MailboxFolderRenameError) return result - def delete(self, folder: AnyStr) -> tuple: + def delete(self, folder: StrOrBytes) -> tuple: """Delete folder""" result = self.mailbox.client._simple_command('DELETE', encode_folder(folder)) check_command_status(result, MailboxFolderDeleteError) return result - def status(self, folder: Optional[AnyStr] = None, options: Optional[Iterable[str]] = None) -> Dict[str, int]: + def status(self, folder: Optional[StrOrBytes] = None, options: Optional[Iterable[str]] = None) -> Dict[str, int]: """ Get the status of a folder :param folder: mailbox folder, current folder if None @@ -97,7 +96,7 @@ def status(self, folder: Optional[AnyStr] = None, options: Optional[Iterable[str if opt not in MailBoxFolderStatusOptions.all: raise MailboxFolderStatusValueError(str(opt)) status_result = self.mailbox.client._simple_command( - command, encode_folder(folder), '({})'.format(' '.join(options))) + command, encode_folder(folder), f'({" ".join(options)})') check_command_status(status_result, MailboxFolderStatusError) result = self.mailbox.client._untagged_response(status_result[0], status_result[1], command) check_command_status(result, MailboxFolderStatusError) @@ -105,7 +104,7 @@ def status(self, folder: Optional[AnyStr] = None, options: Optional[Iterable[str values = status_data.decode().split('(')[-1].split(')')[0].split(' ') return {k: int(v) for k, v in pairs_to_dict(values).items() if str(v).isdigit()} - def list(self, folder: AnyStr = '', search_args: str = '*', subscribed_only: bool = False) -> List[FolderInfo]: + def list(self, folder: StrOrBytes = '', search_args: str = '*', subscribed_only: bool = False) -> List[FolderInfo]: """ Get a listing of folders on the server :param folder: mailbox folder, if empty - get from root @@ -148,7 +147,7 @@ def list(self, folder: AnyStr = '', search_args: str = '*', subscribed_only: boo )) return result - def subscribe(self, folder: AnyStr, value: bool) -> tuple: + def subscribe(self, folder: StrOrBytes, value: bool) -> tuple: """subscribe/unsubscribe to folder""" method = self.mailbox.client.subscribe if value else self.mailbox.client.unsubscribe result = method(encode_folder(folder)) diff --git a/imap_tools/imap_utf7.py b/imap_tools/imap_utf7.py index 6e8c810..c7214c5 100644 --- a/imap_tools/imap_utf7.py +++ b/imap_tools/imap_utf7.py @@ -8,7 +8,10 @@ """ import binascii -from typing import Iterable, MutableSequence +from typing import MutableSequence + +AMPERSAND_ORD = ord('&') +HYPHEN_ORD = ord('-') # ENCODING @@ -17,10 +20,10 @@ def _modified_base64(value: str) -> bytes: return binascii.b2a_base64(value.encode('utf-16be')).rstrip(b'\n=').replace(b'/', b',') -def _do_b64(_in: Iterable[str], r: MutableSequence[bytes]): +def _do_b64(_in: MutableSequence[str], r: MutableSequence[bytes]): if _in: r.append(b'&' + _modified_base64(''.join(_in)) + b'-') - del _in[:] + _in.clear() def utf7_encode(value: str) -> bytes: @@ -48,20 +51,20 @@ def _modified_unbase64(value: bytearray) -> str: def utf7_decode(value: bytes) -> str: res = [] - decode_arr = bytearray() + encoded_chars = bytearray() for char in value: - if char == ord('&') and not decode_arr: - decode_arr.append(ord('&')) - elif char == ord('-') and decode_arr: - if len(decode_arr) == 1: + if char == AMPERSAND_ORD and not encoded_chars: + encoded_chars.append(AMPERSAND_ORD) + elif char == HYPHEN_ORD and encoded_chars: + if len(encoded_chars) == 1: res.append('&') else: - res.append(_modified_unbase64(decode_arr[1:])) - decode_arr = bytearray() - elif decode_arr: - decode_arr.append(char) + res.append(_modified_unbase64(encoded_chars[1:])) + encoded_chars = bytearray() + elif encoded_chars: + encoded_chars.append(char) else: res.append(chr(char)) - if decode_arr: - res.append(_modified_unbase64(decode_arr[1:])) + if encoded_chars: + res.append(_modified_unbase64(encoded_chars[1:])) return ''.join(res) diff --git a/imap_tools/mailbox.py b/imap_tools/mailbox.py index 8aa2ef1..14665a0 100644 --- a/imap_tools/mailbox.py +++ b/imap_tools/mailbox.py @@ -2,14 +2,14 @@ import imaplib import datetime from collections import UserString -from typing import AnyStr, Optional, List, Iterable, Sequence, Union, Tuple, Iterator +from typing import Optional, List, Iterable, Sequence, Union, Tuple, Iterator from .message import MailMessage from .folder import MailBoxFolderManager from .idle import IdleManager from .consts import UID_PATTERN, PYTHON_VERSION_MINOR from .utils import clean_uids, check_command_status, chunks, encode_folder, clean_flags, check_timeout_arg_support, \ - chunks_crop + chunks_crop, StrOrBytes from .errors import MailboxStarttlsError, MailboxLoginError, MailboxLogoutError, MailboxNumbersError, \ MailboxFetchError, MailboxExpungeError, MailboxDeleteError, MailboxCopyError, MailboxFlagError, \ MailboxAppendError, MailboxUidsError, MailboxTaggedResponseError @@ -18,7 +18,7 @@ # 20Mb is enough for search response with about 2 000 000 message numbers imaplib._MAXLINE = 20 * 1024 * 1024 # 20Mb -Criteria = Union[AnyStr, UserString] +Criteria = Union[StrOrBytes, UserString] class BaseMailBox: @@ -80,7 +80,7 @@ def login_utf8(self, username: str, password: str, initial_folder: Optional[str] def xoauth2(self, username: str, access_token: str, initial_folder: Optional[str] = 'INBOX') -> 'BaseMailBox': """Authenticate to account using OAuth 2.0 mechanism""" - auth_string = 'user={}\1auth=Bearer {}\1\1'.format(username, access_token) + auth_string = f'user={username}\1auth=Bearer {access_token}\1\1' result = self.client.authenticate('XOAUTH2', lambda x: auth_string) # noqa check_command_status(result, MailboxLoginError) if initial_folder is not None: @@ -133,7 +133,7 @@ def uids(self, criteria: Criteria = 'ALL', charset: str = 'US-ASCII', encoded_criteria = criteria if type(criteria) is bytes else str(criteria).encode(charset) if sort: sort = (sort,) if isinstance(sort, str) else sort - uid_result = self.client.uid('SORT', '({})'.format(' '.join(sort)), charset, encoded_criteria) + uid_result = self.client.uid('SORT', f'({" ".join(sort)})', charset, encoded_criteria) else: uid_result = self.client.uid('SEARCH', 'CHARSET', charset, encoded_criteria) # *charset are opt here check_command_status(uid_result, MailboxUidsError) @@ -187,8 +187,8 @@ def fetch(self, criteria: Criteria = 'ALL', charset: str = 'US-ASCII', limit: Op :param sort: criteria for sort messages on server, use SortCriteria constants. Charset arg is important for sort :return generator: MailMessage """ - message_parts = "(BODY{}[{}] UID FLAGS RFC822.SIZE)".format( - '' if mark_seen else '.PEEK', 'HEADER' if headers_only else '') + message_parts = \ + f"(BODY{'' if mark_seen else '.PEEK'}[{'HEADER' if headers_only else ''}] UID FLAGS RFC822.SIZE)" limit_range = slice(0, limit) if type(limit) is int else limit or slice(None) assert type(limit_range) is slice uids = tuple((reversed if reverse else iter)(self.uids(criteria, charset, sort)))[limit_range] @@ -218,7 +218,7 @@ def delete(self, uid_list: Union[str, Iterable[str]]) -> Optional[Tuple[tuple, t expunge_result = self.expunge() return store_result, expunge_result - def copy(self, uid_list: Union[str, Iterable[str]], destination_folder: AnyStr) -> Optional[tuple]: + def copy(self, uid_list: Union[str, Iterable[str]], destination_folder: StrOrBytes) -> Optional[tuple]: """ Copy email messages into the specified folder Do nothing on empty uid_list @@ -231,7 +231,7 @@ def copy(self, uid_list: Union[str, Iterable[str]], destination_folder: AnyStr) check_command_status(copy_result, MailboxCopyError) return copy_result - def move(self, uid_list: Union[str, Iterable[str]], destination_folder: AnyStr) -> Optional[Tuple[tuple, tuple]]: + def move(self, uid_list: Union[str, Iterable[str]], destination_folder: StrOrBytes) -> Optional[Tuple[tuple, tuple]]: """ Move email messages into the specified folder Do nothing on empty uid_list @@ -256,14 +256,13 @@ def flag(self, uid_list: Union[str, Iterable[str]], flag_set: Union[str, Iterabl if not uid_str: return None store_result = self.client.uid( - 'STORE', uid_str, ('+' if value else '-') + 'FLAGS', - '({})'.format(' '.join(clean_flags(flag_set)))) + 'STORE', uid_str, ('+' if value else '-') + 'FLAGS', f'({" ".join(clean_flags(flag_set))})') check_command_status(store_result, MailboxFlagError) expunge_result = self.expunge() return store_result, expunge_result def append(self, message: Union[MailMessage, bytes], - folder: AnyStr = 'INBOX', + folder: StrOrBytes = 'INBOX', dt: Optional[datetime.datetime] = None, flag_set: Optional[Union[str, Iterable[str]]] = None) -> tuple: """ @@ -281,7 +280,7 @@ def append(self, message: Union[MailMessage, bytes], cleaned_flags = clean_flags(flag_set or []) typ, dat = self.client.append( encode_folder(folder), # noqa - '({})'.format(' '.join(cleaned_flags)) if cleaned_flags else None, + f'({" ".join(cleaned_flags)})' if cleaned_flags else None, dt or datetime.datetime.now(timezone), # noqa message if type(message) is bytes else message.obj.as_bytes() ) @@ -339,10 +338,10 @@ def __init__(self, host='', port=993, timeout=None, keyfile=None, certfile=None, def _get_mailbox_client(self) -> imaplib.IMAP4: if PYTHON_VERSION_MINOR < 9: - return imaplib.IMAP4_SSL(self._host, self._port, self._keyfile, self._certfile, self._ssl_context) + return imaplib.IMAP4_SSL(self._host, self._port, self._keyfile, self._certfile, self._ssl_context) # noqa elif PYTHON_VERSION_MINOR < 12: return imaplib.IMAP4_SSL( - self._host, self._port, self._keyfile, self._certfile, self._ssl_context, self._timeout) + self._host, self._port, self._keyfile, self._certfile, self._ssl_context, self._timeout) # noqa else: return imaplib.IMAP4_SSL(self._host, self._port, ssl_context=self._ssl_context, timeout=self._timeout) diff --git a/imap_tools/message.py b/imap_tools/message.py index d7a3129..ef249e9 100644 --- a/imap_tools/message.py +++ b/imap_tools/message.py @@ -4,7 +4,7 @@ import imaplib import datetime from itertools import chain -from functools import lru_cache +from functools import cached_property from email.header import decode_header from email.message import _parseparam, _unquotevalue # noqa from typing import Tuple, Dict, Optional, List @@ -29,7 +29,7 @@ def from_bytes(cls, raw_message_data: bytes): def __str__(self): repl = '[\t\n\r\f\v]' - return '{}, {}, {}'.format(self.date, re.sub(repl, '', self.from_), re.sub(repl, '', self.subject)) + return f'{self.date}, {re.sub(repl, "", self.from_)}, {re.sub(repl, "", self.subject)}' @staticmethod def _get_message_data_parts(fetch_data: list) -> (bytes, bytes, List[bytes]): @@ -52,8 +52,7 @@ def _get_message_data_parts(fetch_data: list) -> (bytes, bytes, List[bytes]): raw_message_data = fetch_item[1] return raw_message_data, raw_uid_data, raw_flag_data - @property - @lru_cache() + @cached_property def uid(self) -> Optional[str]: """Message UID""" # _raw_uid_data - zimbra, yandex, gmail, gmx @@ -64,8 +63,7 @@ def uid(self) -> Optional[str]: return uid_match.group('uid') return None - @property - @lru_cache() + @cached_property def size_rfc822(self) -> int: """RFC822 message size from server, bytes count, 0 if not found""" for raw_flag_item in self._raw_flag_data: @@ -74,14 +72,12 @@ def size_rfc822(self) -> int: return int(size_match.group('size')) return 0 - @property - @lru_cache() + @cached_property def size(self) -> int: """Message size, bytes count""" return len(bytes(self.obj)) - @property - @lru_cache() + @cached_property def flags(self) -> Tuple[str, ...]: """ Message flags @@ -92,8 +88,7 @@ def flags(self) -> Tuple[str, ...]: result.extend(imaplib.ParseFlags(raw_flag_item)) return tuple(i.decode().strip() for i in result) # noqa - @property - @lru_cache() + @cached_property def subject(self) -> str: """Message subject""" if 'subject' in self.obj: @@ -101,81 +96,68 @@ def subject(self) -> str: return ''.join(decode_value(*head_part) for head_part in decode_header(raw)) return '' - @property - @lru_cache() + @cached_property def from_values(self) -> Optional[EmailAddress]: """Sender (all data)""" result_set = parse_email_addresses(self.obj['From'] or '') return result_set[0] if result_set else None - @property - @lru_cache() + @cached_property def from_(self) -> str: """Sender email""" return self.from_values.email if self.from_values else '' - @property - @lru_cache() + @cached_property def to_values(self) -> Tuple[EmailAddress, ...]: """Recipients (all data)""" return tuple(chain(*(parse_email_addresses(i or '') for i in self.obj.get_all('To', [])))) - @property - @lru_cache() + @cached_property def to(self) -> Tuple[str, ...]: """Recipients emails""" return tuple(i.email for i in self.to_values) - @property - @lru_cache() + @cached_property def cc_values(self) -> Tuple[EmailAddress, ...]: """Carbon copy (all data)""" return tuple(chain(*(parse_email_addresses(i or '') for i in self.obj.get_all('Cc', [])))) - @property - @lru_cache() + @cached_property def cc(self) -> Tuple[str, ...]: """Carbon copy emails""" return tuple(i.email for i in self.cc_values) - @property - @lru_cache() + @cached_property def bcc_values(self) -> Tuple[EmailAddress, ...]: """Blind carbon copy (all data)""" return tuple(chain(*(parse_email_addresses(i or '') for i in self.obj.get_all('Bcc', [])))) - @property - @lru_cache() + @cached_property def bcc(self) -> Tuple[str, ...]: """Blind carbon copy emails""" return tuple(i.email for i in self.bcc_values) - @property - @lru_cache() + @cached_property def reply_to_values(self) -> Tuple[EmailAddress, ...]: """Reply-to emails (all data)""" return tuple(chain(*(parse_email_addresses(i or '') for i in self.obj.get_all('Reply-To', [])))) - @property - @lru_cache() + @cached_property def reply_to(self) -> Tuple[str, ...]: """Reply-to emails""" return tuple(i.email for i in self.reply_to_values) - @property - @lru_cache() + @cached_property def date_str(self) -> str: """Message sent date string as is""" return str(self.obj['Date'] or '') - @property - @lru_cache() + @cached_property def date(self) -> datetime.datetime: """Message sent date""" return parse_email_date(self.date_str) - @property - @lru_cache() + @cached_property def text(self) -> str: """Plain text of the mail message""" results = [] @@ -186,8 +168,7 @@ def text(self) -> str: results.append(decode_value(part.get_payload(decode=True), part.get_content_charset())) return ''.join(results) - @property - @lru_cache() + @cached_property def html(self) -> str: """HTML text of the mail message""" results = [] @@ -199,8 +180,7 @@ def html(self) -> str: results.append(replace_html_ct_charset(html, 'utf-8')) return ''.join(results) - @property - @lru_cache() + @cached_property def headers(self) -> Dict[str, Tuple[str, ...]]: """ Message headers @@ -211,8 +191,7 @@ def headers(self) -> Dict[str, Tuple[str, ...]]: result.setdefault(key.lower(), []).append(val) return {k: tuple(v) for k, v in result.items()} - @property - @lru_cache() + @cached_property def attachments(self) -> List['MailAttachment']: """ Mail message attachments list @@ -236,10 +215,9 @@ def __init__(self, part): self.part = part def __str__(self): - return '<{} | {} | {} | {}>'.format(self.filename, self.content_type, self.content_disposition, self.content_id) + return f'<{self.filename} | {self.content_type} | {self.content_disposition} | {self.content_id}>' - @property - @lru_cache() + @cached_property def filename(self) -> str: """ Attachment filename @@ -280,26 +258,22 @@ def filename(self) -> str: return attempt_1_filename - @property - @lru_cache() + @cached_property def content_id(self) -> str: if 'Content-ID' in self.part: raw = self.part['Content-ID'] return ''.join(decode_value(*head_part) for head_part in decode_header(raw)).lstrip('<').rstrip('>') return '' - @property - @lru_cache() + @cached_property def content_type(self) -> str: return self.part.get_content_type() - @property - @lru_cache() + @cached_property def content_disposition(self) -> str: return self.part.get_content_disposition() or '' - @property - @lru_cache() + @cached_property def payload(self) -> bytes: payload = self.part.get_payload(decode=True) if payload: @@ -318,8 +292,7 @@ def payload(self) -> bytes: # could not find payload return b'' - @property - @lru_cache() + @cached_property def size(self) -> int: """Attachment size, bytes count""" return len(self.payload) diff --git a/imap_tools/query.py b/imap_tools/query.py index 28d6267..f8b38c0 100644 --- a/imap_tools/query.py +++ b/imap_tools/query.py @@ -15,17 +15,17 @@ class Header: def __init__(self, name: str, value: str): if not isinstance(name, str): - raise TypeError('Header-name expected str value, "{}" received'.format(type(name))) + raise TypeError(f'Header-name expected str value, "{type(name)}" received') self.name = quote(name) if not isinstance(value, str): - raise TypeError('Header-value expected str value, "{}" received'.format(type(value))) + raise TypeError(f'Header-value expected str value, "{type(value)}" received') self.value = quote(value) def __str__(self): - return '{0.name}: {0.value}'.format(self) + return f'{self.name}: {self.value}' def __lt__(self, other): - return '{0.name}{0.value}'.format(self) < '{0.name}{0.value}'.format(other) + return f'{self.name}{self.value}' < f'{other.name}{other.value}' class UidRange: @@ -51,7 +51,7 @@ def __init__(self, start: str, end: Optional[str] = None): raise TypeError('UidRange end arg must be str with digits or *') def __str__(self): - return '{}{}'.format(self.start, ':{}'.format(self.end) if self.end else '') + return f'{self.start}{f":{self.end}" if self.end else ""}' class LogicOperator(UserString): @@ -90,11 +90,11 @@ def __init__( self.converted_strings = converted_strings for val in converted_strings: if not any(isinstance(val, t) for t in (str, UserString)): - raise TypeError('Unexpected type "{}" for converted part, str like obj expected'.format(type(val))) + raise TypeError(f'Unexpected type "{type(val)}" for converted part, str like obj expected') unconverted_dict = {k: v for k, v in locals().items() if k in SEARCH_KEYS and v is not None} self.converted_params = ParamConverter(unconverted_dict).convert() if not any((self.converted_strings, self.converted_params)): - raise ValueError('{} expects params'.format(self.__class__.__name__)) + raise ValueError(f'{self.__class__.__name__} expects params') super().__init__(self.combine_params()) def combine_params(self) -> str: @@ -104,7 +104,7 @@ def combine_params(self) -> str: @staticmethod def prefix_join(operator: str, params: Iterable[str]) -> str: """Join params by prefix notation rules, enclose group in parentheses""" - return '({})'.format(functools.reduce(lambda a, b: '{}{} {}'.format(operator, a, b), params)) + return f'({functools.reduce(lambda a, b: f"{operator}{a} {b}", params)})' class AND(LogicOperator): @@ -125,7 +125,7 @@ class NOT(LogicOperator): """Inverts the result of a logical expression""" def combine_params(self) -> str: - return 'NOT {}'.format(self.prefix_join('', itertools.chain(self.converted_strings, self.converted_params))) + return f'NOT {self.prefix_join("", itertools.chain(self.converted_strings, self.converted_params))}' class ParamConverter: @@ -161,45 +161,45 @@ def convert(self) -> List[str]: converted = [] for key, raw_val in sorted(self.params.items(), key=lambda x: x[0]): for val in sorted(self._gen_values(key, raw_val)): - convert_func = getattr(self, 'convert_{}'.format(key), None) + convert_func = getattr(self, f'convert_{key}', None) if not convert_func: - raise KeyError('"{}" is an invalid parameter.'.format(key)) + raise KeyError(f'"{key}" is an invalid parameter.') converted.append(convert_func(key, val)) return converted @classmethod def format_date(cls, value: datetime.date) -> str: """To avoid locale affects""" - return '{}-{}-{}'.format(value.day, SHORT_MONTH_NAMES[value.month - 1], value.year) + return f'{value.day}-{SHORT_MONTH_NAMES[value.month - 1]}-{value.year}' @staticmethod def cleaned_str(key: str, value: str) -> str: if type(value) is not str: - raise TypeError('"{}" expected str value, "{}" received'.format(key, type(value))) + raise TypeError(f'"{key}" expected str value, "{type(value)}" received') return str(value) @staticmethod def cleaned_date(key: str, value: datetime.date) -> datetime.date: if type(value) is not datetime.date: - raise TypeError('"{}" expected datetime.date value, "{}" received'.format(key, type(value))) + raise TypeError(f'"{key}" expected datetime.date value, "{type(value)}" received') return value @staticmethod def cleaned_bool(key: str, value: bool) -> bool: if type(value) is not bool: - raise TypeError('"{}" expected bool value, "{}" received'.format(key, type(value))) + raise TypeError(f'"{key}" expected bool value, "{type(value)}" received') return bool(value) @staticmethod def cleaned_true(key: str, value: bool) -> True: if value is not True: - raise TypeError('"{}" expected "True", "{}" received'.format(key, type(value))) + raise TypeError(f'"{key}" expected "True", "{type(value)}" received') return True @staticmethod def cleaned_uint(key: str, value: int) -> int: if type(value) is not int or int(value) < 0: - raise TypeError('"{}" expected int value >= 0, "{}" received'.format(key, type(value))) + raise TypeError(f'"{key}" expected int value >= 0, "{type(value)}" received') return int(value) @staticmethod @@ -211,12 +211,12 @@ def cleaned_uid(key: str, value: Union[str, Iterable[str], UidRange]) -> str: try: return clean_uids(value) except TypeError as e: - raise TypeError('{} parse error: {}'.format(key, str(e))) + raise TypeError(f'{key} parse error: {str(e)}') @staticmethod def cleaned_header(key: str, value: Header) -> Header: if not isinstance(value, Header): - raise TypeError('"{}" expected Header (H) value, "{}" received'.format(key, type(value))) + raise TypeError(f'"{key}" expected Header (H) value, "{type(value)}" received') return value def convert_answered(self, key, value) -> str: @@ -241,89 +241,89 @@ def convert_deleted(self, key, value) -> str: def convert_keyword(self, key, value) -> str: """Messages with the specified keyword flag set. (KEYWORD)""" - return 'KEYWORD {}'.format(self.cleaned_str(key, value)) + return f'KEYWORD {self.cleaned_str(key, value)}' def convert_no_keyword(self, key, value) -> str: """Messages that do not have the specified keyword flag set. (UNKEYWORD)""" - return 'UNKEYWORD {}'.format(self.cleaned_str(key, value)) + return f'UNKEYWORD {self.cleaned_str(key, value)}' def convert_from_(self, key, value) -> str: """Messages that contain the specified string in the envelope structure's FROM field.""" - return 'FROM {}'.format(quote(self.cleaned_str(key, value))) + return f'FROM {quote(self.cleaned_str(key, value))}' def convert_to(self, key, value) -> str: """Messages that contain the specified string in the envelope structure's TO field.""" - return 'TO {}'.format(quote(self.cleaned_str(key, value))) + return f'TO {quote(self.cleaned_str(key, value))}' def convert_subject(self, key, value) -> str: """Messages that contain the specified string in the envelope structure's SUBJECT field.""" - return 'SUBJECT {}'.format(quote(self.cleaned_str(key, value))) + return f'SUBJECT {quote(self.cleaned_str(key, value))}' def convert_body(self, key, value) -> str: """Messages that contain the specified string in the body of the message.""" - return 'BODY {}'.format(quote(self.cleaned_str(key, value))) + return f'BODY {quote(self.cleaned_str(key, value))}' def convert_text(self, key, value) -> str: """Messages that contain the specified string in the header or body of the message.""" - return 'TEXT {}'.format(quote(self.cleaned_str(key, value))) + return f'TEXT {quote(self.cleaned_str(key, value))}' def convert_bcc(self, key, value) -> str: """Messages that contain the specified string in the envelope structure's BCC field.""" - return 'BCC {}'.format(quote(self.cleaned_str(key, value))) + return f'BCC {quote(self.cleaned_str(key, value))}' def convert_cc(self, key, value) -> str: """Messages that contain the specified string in the envelope structure's CC field.""" - return 'CC {}'.format(quote(self.cleaned_str(key, value))) + return f'CC {quote(self.cleaned_str(key, value))}' def convert_date(self, key, value) -> str: """ Messages whose internal date (disregarding time and timezone) is within the specified date. (ON) """ - return 'ON {}'.format(self.format_date(self.cleaned_date(key, value))) + return f'ON {self.format_date(self.cleaned_date(key, value))}' def convert_date_gte(self, key, value) -> str: """ Messages whose internal date (disregarding time and timezone) is within or later than the specified date. (SINCE) """ - return 'SINCE {}'.format(self.format_date(self.cleaned_date(key, value))) + return f'SINCE {self.format_date(self.cleaned_date(key, value))}' def convert_date_lt(self, key, value) -> str: """ Messages whose internal date (disregarding time and timezone) is earlier than the specified date. (BEFORE) """ - return 'BEFORE {}'.format(self.format_date(self.cleaned_date(key, value))) + return f'BEFORE {self.format_date(self.cleaned_date(key, value))}' def convert_sent_date(self, key, value) -> str: """ Messages whose [RFC-2822] Date: header (disregarding time and timezone) is within the specified date. (SENTON) """ - return 'SENTON {}'.format(self.format_date(self.cleaned_date(key, value))) + return f'SENTON {self.format_date(self.cleaned_date(key, value))}' def convert_sent_date_gte(self, key, value) -> str: """ Messages whose [RFC-2822] Date: header (disregarding time and timezone) is within or later than the specified date. (SENTSINCE) """ - return 'SENTSINCE {}'.format(self.format_date(self.cleaned_date(key, value))) + return f'SENTSINCE {self.format_date(self.cleaned_date(key, value))}' def convert_sent_date_lt(self, key, value) -> str: """ Messages whose [RFC-2822] Date: header (disregarding time and timezone) is earlier than the specified date. (SENTBEFORE) """ - return 'SENTBEFORE {}'.format(self.format_date(self.cleaned_date(key, value))) + return f'SENTBEFORE {self.format_date(self.cleaned_date(key, value))}' def convert_size_gt(self, key, value) -> str: """Messages with an [RFC-2822] size larger than the specified number of octets. (LARGER)""" - return 'LARGER {}'.format(self.cleaned_uint(key, value)) + return f'LARGER {self.cleaned_uint(key, value)}' def convert_size_lt(self, key, value) -> str: """Messages with an [RFC-2822] size smaller than the specified number of octets. (SMALLER)""" - return 'SMALLER {}'.format(self.cleaned_uint(key, value)) + return f'SMALLER {self.cleaned_uint(key, value)}' def convert_new(self, key, value) -> str: """ @@ -358,14 +358,14 @@ def convert_header(self, key, value) -> str: If the string to search is zero-length, this matches all messages that have a header line with the specified field-name regardless of the contents. """ - return 'HEADER {0.name} {0.value}'.format(self.cleaned_header(key, value)) + return f'HEADER {self.cleaned_header(key, value).name} {self.cleaned_header(key, value).value}' def convert_uid(self, key, value) -> str: """Messages with unique identifiers corresponding to the specified unique identifier set.""" - return 'UID {}'.format(self.cleaned_uid(key, value)) + return f'UID {self.cleaned_uid(key, value)}' def convert_gmail_label(self, key, value) -> str: - return 'X-GM-LABELS {}'.format(quote(self.cleaned_str(key, value))) + return f'X-GM-LABELS {quote(self.cleaned_str(key, value))}' SEARCH_KEYS = tuple(i.replace('convert_', '') for i in dir(ParamConverter) if 'convert_' in i) diff --git a/imap_tools/utils.py b/imap_tools/utils.py index a62a8a9..d215e23 100644 --- a/imap_tools/utils.py +++ b/imap_tools/utils.py @@ -4,11 +4,13 @@ from itertools import zip_longest from email.utils import getaddresses, parsedate_to_datetime from email.header import decode_header, Header -from typing import AnyStr, Union, Optional, Tuple, Iterable, Any, List, Dict, Iterator +from typing import Union, Optional, Tuple, Iterable, Any, List, Dict, Iterator from .consts import SHORT_MONTH_NAMES, MailMessageFlags from .imap_utf7 import utf7_encode +StrOrBytes = Union[str, bytes] + def clean_uids(uid_set: Union[str, Iterable[str]]) -> str: """ @@ -27,9 +29,9 @@ def clean_uids(uid_set: Union[str, Iterable[str]]) -> str: # check uid types for uid in uid_set: if type(uid) is not str: - raise TypeError('uid "{}" is not string'.format(str(uid))) + raise TypeError(f'uid "{str(uid)}" is not string') if not re.match(r'^[\d*:]+$', uid.strip()): - raise TypeError('Wrong uid: "{}"'.format(uid)) + raise TypeError(f'Wrong uid: "{uid}"') return ','.join(i.strip() for i in uid_set) @@ -46,7 +48,7 @@ def check_command_status(command_result: tuple, exception: type, expected='OK'): raise exception(command_result=command_result, expected=expected) -def decode_value(value: AnyStr, encoding: Optional[str] = None) -> str: +def decode_value(value: StrOrBytes, encoding: Optional[str] = None) -> str: """Converts value to utf-8 encoding""" if isinstance(encoding, str): encoding = encoding.lower() @@ -68,11 +70,10 @@ def __init__(self, name: str, email: str): @property def full(self): - return '{} <{}>'.format(self.name, self.email) if self.name and self.email else self.name or self.email + return f'{self.name} <{self.email}>' if self.name and self.email else self.name or self.email def __repr__(self): - return "{}(name={}, email={})".format( - self.__class__.__name__, repr(self.name), repr(self.email)) + return f"{self.__class__.__name__}(name={repr(self.name)}, email={repr(self.email)})" def __eq__(self, other): return all(getattr(self, i) == getattr(other, i) for i in self.__slots__) @@ -124,7 +125,7 @@ def parse_email_date(value: str) -> datetime.datetime: group = match.groupdict() day, month, year = group['date'].split() time_values = group['time'].split(':') - zone_sign = int('{}1'.format(group.get('zone_sign') or '+')) + zone_sign = int(f'{group.get("zone_sign") or "+"}1') zone = group['zone'] try: return datetime.datetime( @@ -144,7 +145,7 @@ def parse_email_date(value: str) -> datetime.datetime: return datetime.datetime(1900, 1, 1) -def quote(value: AnyStr) -> AnyStr: +def quote(value: StrOrBytes) -> StrOrBytes: if isinstance(value, str): return '"' + value.replace('\\', '\\\\').replace('"', '\\"') + '"' else: @@ -158,19 +159,7 @@ def pairs_to_dict(items: List[Any]) -> Dict[Any, Any]: return dict((items[i * 2], items[i * 2 + 1]) for i in range(len(items) // 2)) -def chunks(iterable: Iterable[Any], n: int, fill_value: Optional[Any] = None) -> Iterator[Tuple[Any, ...]]: - """ - Group data into fixed-length chunks or blocks - [iter(iterable)]*n creates one iterator, repeated n times in the list - izip_longest then effectively performs a round-robin of "each" (same) iterator - Examples: - chunks('ABCDEFGH', 3, '?') --> [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'H', '?')] - chunks([1, 2, 3, 4, 5], 2) --> [(1, 2), (3, 4), (5, None)] - """ - return zip_longest(*[iter(iterable)] * n, fillvalue=fill_value) - - -def encode_folder(folder: AnyStr) -> bytes: +def encode_folder(folder: StrOrBytes) -> bytes: """Encode folder name""" if isinstance(folder, bytes): return folder @@ -188,7 +177,7 @@ def clean_flags(flag_set: Union[str, Iterable[str]]) -> List[str]: upper_sys_flags = tuple(i.upper() for i in MailMessageFlags.all) for flag in flag_set: if not type(flag) is str: - raise ValueError('Flag - str value expected, but {} received'.format(type(flag_set))) + raise ValueError(f'Flag - str value expected, but {type(flag_set)} received') if flag.upper() not in upper_sys_flags and flag.startswith('\\'): raise ValueError('Non system flag must not start with "\\"') return flag_set @@ -207,7 +196,7 @@ def replace_html_ct_charset(html: str, new_charset: str) -> str: meta = meta_ct_match.group(0) meta_new = re.sub( pattern=r'charset\s*=\s*[a-zA-Z0-9_:.+-]+', - repl='charset={}'.format(new_charset), + repl=f'charset={new_charset}', string=meta, count=1, flags=re.IGNORECASE @@ -216,6 +205,18 @@ def replace_html_ct_charset(html: str, new_charset: str) -> str: return html +def chunks(iterable: Iterable[Any], n: int, fill_value: Optional[Any] = None) -> Iterator[Tuple[Any, ...]]: + """ + Group data into fixed-length chunks or blocks + [iter(iterable)]*n creates one iterator, repeated n times in the list + izip_longest then effectively performs a round-robin of "each" (same) iterator + Examples: + chunks('ABCDEFGH', 3, '?') --> [('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'H', '?')] + chunks([1, 2, 3, 4, 5], 2) --> [(1, 2), (3, 4), (5, None)] + """ + return zip_longest(*[iter(iterable)] * n, fillvalue=fill_value) + + def chunks_crop(lst: iter, n: int) -> iter: """ Yield successive n-sized chunks from lst.