diff --git a/scripts/lib/zoom.py b/scripts/lib/zoom.py new file mode 100644 index 0000000..a0958be --- /dev/null +++ b/scripts/lib/zoom.py @@ -0,0 +1,144 @@ + +from functools import wraps +from typing import Dict +from urllib.parse import urlparse +import os.path +import requests +from requests import Response +from zoomus import ZoomClient + + +ZOOM_DOCS_URL = 'https://developers.zoom.us/docs/api/' + + +class ZoomError(Exception): + response: Response + data: Dict + code: int = 0 + message: str + + def __init__(self, response, message=None): + self.response = response + try: + self.data = response.json() + except Exception: + self.data = {} + + self.message = message or self.data.pop('message', 'Zoom API error') + self.code = self.data.pop('code', 0) + super().__init__( + f'{self.message} ' + f'(code={self.code}, http_status={self.response.status_code}) ' + f'Check the docs for details: {ZOOM_DOCS_URL}.' + ) + + +class ZoomResponse: + response: Response + _data: Dict = None + + def __init__(self, response: Response): + self.response = response + + def __getitem__(self, name): + return self.data[name] + + @property + def data(self): + if self._data is None: + self._data = self.response.json() + + return self._data + + @property + def text(self): + return self.response.text + + def raise_for_status(self): + self.raise_for_response(self.response) + + @classmethod + def is_error(cls, response: Response) -> bool: + return response.status_code >= 400 + + @classmethod + def raise_for_response(cls, response: Response): + if cls.is_error(response): + raise ZoomError(response) + + +def wrap_method_with_parsing(original): + @wraps(original) + def wrapper(*args, **kwargs): + result = original(*args, **kwargs) + if isinstance(result, Response): + result = ZoomResponse(result) + result.raise_for_status() + + return result + + return wrapper + + +def wrap_component_with_parsing(component): + for name in dir(component): + if not name.startswith('_'): + original = getattr(component, name) + if callable(original): + setattr(component, name, wrap_method_with_parsing(original)) + + +class FancyZoom(ZoomClient): + """ + Wraps a zoomus ZoomClient so that nice exception objects are raised for bad + responses and good JSON responses are pre-parsed. + + Examples + -------- + >>> instance = FancyZoon(CLIENT_ID, CLIENT_SECRET, ACCOUNT_ID) + + Get a user by ID. The response data is readily available: + + >>> instance.user.get(id='abc123')['id'] == 'abc123' + + Raises an exception for errors instead of returning a response object: + + >>> try: + >>> client.user.get(id='bad_id') + >>> except ZoomError as error: + >>> error.response.status_code == 404 + >>> error.code == 1001 + >>> error.message == 'User does not exist: bad_id' + """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + for component in self.components.values(): + wrap_component_with_parsing(component) + + def get_file(self, url: str) -> Response: + # Note the token info in the client isn't really *public*, but it's + # not explicitly private, either. Use `config[]` syntax instead of + # `config.get()` so we get an exception if things have changed and + # this data is no longer available. + response = requests.get(url, stream=True, headers={ + 'Authorization': f'Bearer {self.config['token']}' + }) + ZoomResponse.raise_for_response(response) + return response + + def download_file(self, url: str, download_directory: str) -> str: + response = self.get_file(url) + resolved_url = response.url + filename = urlparse(resolved_url).path.split('/')[-1] + filepath = os.path.join(download_directory, filename) + if os.path.exists(filepath): + response.close() + return + + with open(filepath, 'wb') as f: + for chunk in response.iter_content(chunk_size=1024): + if chunk: # filter out keep-alive new chunks + f.write(chunk) + + return filepath diff --git a/scripts/upload_zoom_recordings.py b/scripts/upload_zoom_recordings.py index dd715d0..e3cdd6c 100644 --- a/scripts/upload_zoom_recordings.py +++ b/scripts/upload_zoom_recordings.py @@ -28,13 +28,11 @@ from datetime import datetime import os import re -import requests import subprocess import sys import tempfile from typing import Dict -from urllib.parse import urlparse -from zoomus import ZoomClient +from lib.zoom import FancyZoom, ZoomError from lib.constants import VIDEO_CATEGORY_IDS, ZOOM_ROLES from lib.youtube import get_youtube_client, upload_video, add_video_to_playlist, validate_youtube_credentials @@ -65,35 +63,6 @@ def is_truthy(x): DRY_RUN = is_truthy(os.environ.get('EDGI_DRY_RUN', '')) -class ZoomError(Exception): - def __init__(self, response, message=None): - try: - data = response.json() - except Exception: - data = {} - - if not message: - message = data.pop('message', 'Zoom API error!') - - data['http_status'] = response.status_code - full_message = f'{message} ({data!r}) Check the docs for details: https://developers.zoom.us/docs/api/.' - super().__init__(full_message) - - @classmethod - def is_error(cls, response): - return response.status_code >= 400 - - @classmethod - def raise_if_error(cls, response, message=None): - if cls.is_error(response): - raise cls(response, message) - - @classmethod - def parse_or_raise(cls, response, message=None) -> Dict: - cls.raise_if_error(response, message) - return response.json() - - def fix_date(date_string: str) -> str: date = date_string index = date.find('Z') @@ -106,31 +75,8 @@ def pretty_date(date_string: str) -> str: return datetime.strptime(date_string, '%Y-%m-%dT%H:%M:%SZ').strftime('%b %-d, %Y') -def download_zoom_file(client: ZoomClient, url: str, download_directory: str) -> str: - # Note the token info in the client isn't really *public*, but it's - # not explicitly private, either. Use `config[]` syntax instead of - # `config.get()` so we get an exception if things have changed and - # this data is no longer available. - r = requests.get(url, stream=True, headers={ - 'Authorization': f'Bearer {client.config['token']}' - }) - r.raise_for_status() - resolved_url = r.url - filename = urlparse(resolved_url).path.split('/')[-1] - filepath = os.path.join(download_directory, filename) - if os.path.exists(filepath): - r.close() - return - with open(filepath, 'wb') as f: - for chunk in r.iter_content(chunk_size=1024): - if chunk: # filter out keep-alive new chunks - f.write(chunk) - - return filepath - - -def meeting_had_no_participants(client: ZoomClient, meeting: Dict) -> bool: - participants = ZoomError.parse_or_raise(client.past_meeting.get_participants(meeting_id=meeting['uuid']))['participants'] +def meeting_had_no_participants(client: FancyZoom, meeting: Dict) -> bool: + participants = client.past_meeting.get_participants(meeting_id=meeting['uuid'])['participants'] return all( any(p.search(u['name']) for p in ZOOM_IGNORE_USER_NAMES) @@ -183,15 +129,15 @@ def main(): print('Please use `python scripts/auth.py` to re-authorize.') return sys.exit(1) - zoom = ZoomClient(ZOOM_CLIENT_ID, ZOOM_CLIENT_SECRET, ZOOM_ACCOUNT_ID) + zoom = FancyZoom(ZOOM_CLIENT_ID, ZOOM_CLIENT_SECRET, ZOOM_ACCOUNT_ID) # Official meeting recordings we will upload belong to the account owner. - zoom_user_id = zoom.user.list(role_id=ZOOM_ROLES['owner']).json()['users'][0]['id'] + zoom_user_id = zoom.user.list(role_id=ZOOM_ROLES['owner'])['users'][0]['id'] with tempfile.TemporaryDirectory() as tmpdirname: print(f'Creating tmp dir: {tmpdirname}\n') - meetings = ZoomError.parse_or_raise(zoom.recording.list(user_id=zoom_user_id))['meetings'] + meetings = zoom.recording.list(user_id=zoom_user_id)['meetings'] meetings = sorted(meetings, key=lambda m: m['start_time']) # Filter recordings less than 1 minute meetings = filter(lambda m: m['duration'] > 1, meetings) @@ -211,11 +157,11 @@ def main(): if meeting_had_no_participants(zoom, meeting): print(' Deleting recording: nobody attended this meeting.') if not DRY_RUN: - response = zoom.recording.delete(meeting_id=meeting['uuid'], action='trash') - if response.status_code < 300: + try: + zoom.recording.delete(meeting_id=meeting['uuid'], action='trash') print(' 🗑️ Deleted recording.') - else: - print(f' ❌ {ZoomError(response)}') + except ZoomError as error: + print(f' ❌ {error}') continue videos = [file for file in meeting['recording_files'] @@ -232,7 +178,7 @@ def main(): for file in videos: url = file['download_url'] print(f' Download from {url}...') - filepath = download_zoom_file(zoom, url, tmpdirname) + filepath = zoom.download_file(url, tmpdirname) if video_has_audio(filepath): recording_date = fix_date(meeting['start_time']) @@ -281,15 +227,15 @@ def main(): if ZOOM_DELETE_AFTER_UPLOAD and not DRY_RUN: # Just delete the video for now, since that takes the most storage space. - response = zoom.recording.delete_single_recording( - meeting_id=file['meeting_id'], - recording_id=file['id'], - action='trash' - ) - if response.status_code == 204: + try: + zoom.recording.delete_single_recording( + meeting_id=file['meeting_id'], + recording_id=file['id'], + action='trash' + ) print(f' 🗑️ Deleted {file["file_type"]} file from Zoom for recording: {meeting["topic"]}') - else: - print(f' ❌ {ZoomError(response)}') + except ZoomError as error: + print(f' ❌ {error}') if __name__ == '__main__':