From a5360903e87a663a8440b72e8e29b8dd6158ccb2 Mon Sep 17 00:00:00 2001
From: Dean MacGregor
Date: Tue, 28 May 2024 10:16:38 -0400
Subject: [PATCH] dl_stream and email try

---
Review notes (not part of the commit message):
- stream_dl: the retry call now forwards **httpx_extras instead of passing
  one keyword named httpx_extras, and the outer call returns once the retry
  has finished instead of committing its own stale block list.
- email_utility: only a missing "azuremail" variable is tolerated at import.
- Hunk sizes and the diffstat were recomputed for the revised lines; the
  commit id above and the blob ids on the index lines are those of the
  original commit.

 pyproject.toml                        |  2 +-
 requirements.txt                      |  3 +-
 src/dean_utils/utils/az_utils.py      | 74 +++++++++++++++++++++++++++
 src/dean_utils/utils/email_utility.py | 11 +++-
 4 files changed, 87 insertions(+), 3 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index 264fb81..380342d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "dean_utils"
-version="0.0.17"
+version="0.0.18"
 authors=[
   { name="Dean MacGregor", email="powertrading121@gmail.com"}
 ]
diff --git a/requirements.txt b/requirements.txt
index 15fcd7c..8db5d22 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,4 +5,5 @@ adlfs
 fsspec
 azure-storage-queue
 python-multipart
-azure-communication-email
\ No newline at end of file
+azure-communication-email
+httpx
\ No newline at end of file
diff --git a/src/dean_utils/utils/az_utils.py b/src/dean_utils/utils/az_utils.py
index 915752f..ce2da96 100644
--- a/src/dean_utils/utils/az_utils.py
+++ b/src/dean_utils/utils/az_utils.py
@@ -4,10 +4,18 @@
 from azure.storage.queue.aio import QueueServiceClient as QSC
 from azure.storage.queue import TextBase64EncodePolicy
 import azure.storage.blob as asb
+from azure.storage.blob.aio import BlobClient
+from azure.storage.blob import BlobBlock
+from azure.core.exceptions import HttpResponseError
 import asyncio
 from typing import List, Optional
 import fsspec
+import httpx
 from datetime import datetime, timedelta, timezone
+from typing import TypeAlias, Literal
+from uuid import uuid4
+
+HTTPX_METHODS: TypeAlias = Literal["GET", "POST"]
 
 
 def def_cos(db_name, client_name):
@@ -69,6 +77,72 @@ def __init__(self, connection_string=os.environ["Synblob"]):
         }
         self.stor = stor
 
+    async def stream_dl(
+        self,
+        client: httpx.AsyncClient,
+        method: HTTPX_METHODS,
+        url: str,
+        path: str,
+        /,
+        recurs=False,
+        **httpx_extras,
+    ) -> None:
+        """
+        Help on method stream_dl
+
+            async stream_dl(client, method, url, path, /, recurs=False, **httpx_extras)
+        Download file streaming in chunks in async as downloader and to a Blob
+
+        Parameters
+        ----------
+        client: httpx.AsyncClient
+            The httpx Async Client object to use
+        method:
+            The HTTP method whether GET or POST
+        url:
+            The URL to download
+        path:
+            The full path to Azure file being saved, as container/blob
+        recurs:
+            Internal flag, True when this call is already the single retry
+        **httpx_extras
+            Any extra arguments to be sent to client.stream
+        """
+        async with BlobClient.from_connection_string(
+            self.connection_string, *(path.split("/", maxsplit=1))
+        ) as target, client.stream(method, url, **httpx_extras) as resp:
+            resp.raise_for_status()
+            block_list = []
+            async for chunk in resp.aiter_bytes():
+                block_id = uuid4().hex
+                try:
+                    await target.stage_block(block_id=block_id, data=chunk)
+                except HttpResponseError as err:
+                    if "The specified blob or block content is invalid." not in str(
+                        err
+                    ):
+                        raise
+                    await asyncio.sleep(1)
+                    # Throw away the partial upload before retrying or giving up
+                    await target.commit_block_list([])
+                    await target.delete_blob()
+                    if recurs is not False:
+                        # This call is already the retry, so do not try again
+                        raise
+                    await self.stream_dl(
+                        client,
+                        method,
+                        url,
+                        path,
+                        recurs=True,
+                        **httpx_extras,
+                    )
+                    # The retry call uploaded and committed the blob itself, so
+                    # stop here rather than commit this call's stale block list
+                    return
+                block_list.append(BlobBlock(block_id=block_id))
+            await target.commit_block_list(block_list)
+
     async def exists(self, path: str):
         """
         Help on method _exists in module adlfs.spec:
diff --git a/src/dean_utils/utils/email_utility.py b/src/dean_utils/utils/email_utility.py
index 0efc4ea..480ad66 100644
--- a/src/dean_utils/utils/email_utility.py
+++ b/src/dean_utils/utils/email_utility.py
@@ -7,6 +7,10 @@
 from azure.communication.email import EmailClient
 
 
+class MissingEnvVars(Exception):
+    """Raised when a required environment variable is not set."""
+
+
 def send_email(from_email: str, to_email: str, subject: str, msg: str) -> None:
     """
     # Send email using smtp.gmail.com, password must be stored in env variable gmail_pw
@@ -43,7 +47,10 @@ def send_email(from_email: str, to_email: str, subject: str, msg: str) -> None:
         server.sendmail(from_email, to_email, mimemsg.as_string())
 
 
-email_client = EmailClient.from_connection_string(os.environ["azuremail"])
+try:
+    email_client = EmailClient.from_connection_string(os.environ["azuremail"])
+except KeyError:  # azuremail is not set; az_send reports it when called
+    email_client = None
 
 
 def az_send(
@@ -53,6 +60,8 @@ def az_send(
     from_email: str = None,
     to_email: str = None,
 ) -> None:
+    if email_client is None:
+        raise MissingEnvVars("missing azuremail var")
     if os.environ["error_email"] is not None and to_email is None:
         to_email = os.environ["error_email"]
     if os.environ["from_email"] is not None and from_email is None: