Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a nice wrapper for zoomus #76

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions scripts/lib/zoom.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@

from functools import wraps
from typing import Dict
from urllib.parse import urlparse
import os.path
import requests
from requests import Response
from zoomus import ZoomClient


ZOOM_DOCS_URL = 'https://developers.zoom.us/docs/api/'


class ZoomError(Exception):
response: Response
data: Dict
code: int = 0
message: str

def __init__(self, response, message=None):
self.response = response
try:
self.data = response.json()
except Exception:
self.data = {}

self.message = message or self.data.pop('message', 'Zoom API error')
self.code = self.data.pop('code', 0)
super().__init__(
f'{self.message} '
f'(code={self.code}, http_status={self.response.status_code}) '
f'Check the docs for details: {ZOOM_DOCS_URL}.'
)


class ZoomResponse:
response: Response
_data: Dict = None

def __init__(self, response: Response):
self.response = response

def __getitem__(self, name):
return self.data[name]

@property
def data(self):
if self._data is None:
self._data = self.response.json()

return self._data

@property
def text(self):
return self.response.text

def raise_for_status(self):
self.raise_for_response(self.response)

@classmethod
def is_error(cls, response: Response) -> bool:
return response.status_code >= 400

@classmethod
def raise_for_response(cls, response: Response):
if cls.is_error(response):
raise ZoomError(response)


def wrap_method_with_parsing(original):
@wraps(original)
def wrapper(*args, **kwargs):
result = original(*args, **kwargs)
if isinstance(result, Response):
result = ZoomResponse(result)
result.raise_for_status()

return result

return wrapper


def wrap_component_with_parsing(component):
for name in dir(component):
if not name.startswith('_'):
original = getattr(component, name)
if callable(original):
setattr(component, name, wrap_method_with_parsing(original))


class FancyZoom(ZoomClient):
"""
Wraps a zoomus ZoomClient so that nice exception objects are raised for bad
responses and good JSON responses are pre-parsed.

Examples
--------
>>> instance = FancyZoon(CLIENT_ID, CLIENT_SECRET, ACCOUNT_ID)

Get a user by ID. The response data is readily available:

>>> instance.user.get(id='abc123')['id'] == 'abc123'

Raises an exception for errors instead of returning a response object:

>>> try:
>>> client.user.get(id='bad_id')
>>> except ZoomError as error:
>>> error.response.status_code == 404
>>> error.code == 1001
>>> error.message == 'User does not exist: bad_id'
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

for component in self.components.values():
wrap_component_with_parsing(component)

def get_file(self, url: str) -> Response:
# Note the token info in the client isn't really *public*, but it's
# not explicitly private, either. Use `config[]` syntax instead of
# `config.get()` so we get an exception if things have changed and
# this data is no longer available.
response = requests.get(url, stream=True, headers={
'Authorization': f'Bearer {self.config['token']}'
})
ZoomResponse.raise_for_response(response)
return response

def download_file(self, url: str, download_directory: str) -> str:
response = self.get_file(url)
resolved_url = response.url
filename = urlparse(resolved_url).path.split('/')[-1]
filepath = os.path.join(download_directory, filename)
if os.path.exists(filepath):
response.close()
return

with open(filepath, 'wb') as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)

return filepath
92 changes: 19 additions & 73 deletions scripts/upload_zoom_recordings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@
from datetime import datetime
import os
import re
import requests
import subprocess
import sys
import tempfile
from typing import Dict
from urllib.parse import urlparse
from zoomus import ZoomClient
from lib.zoom import FancyZoom, ZoomError
from lib.constants import VIDEO_CATEGORY_IDS, ZOOM_ROLES
from lib.youtube import get_youtube_client, upload_video, add_video_to_playlist, validate_youtube_credentials

Expand Down Expand Up @@ -65,35 +63,6 @@ def is_truthy(x):
DRY_RUN = is_truthy(os.environ.get('EDGI_DRY_RUN', ''))


class ZoomError(Exception):
def __init__(self, response, message=None):
try:
data = response.json()
except Exception:
data = {}

if not message:
message = data.pop('message', 'Zoom API error!')

data['http_status'] = response.status_code
full_message = f'{message} ({data!r}) Check the docs for details: https://developers.zoom.us/docs/api/.'
super().__init__(full_message)

@classmethod
def is_error(cls, response):
return response.status_code >= 400

@classmethod
def raise_if_error(cls, response, message=None):
if cls.is_error(response):
raise cls(response, message)

@classmethod
def parse_or_raise(cls, response, message=None) -> Dict:
cls.raise_if_error(response, message)
return response.json()


def fix_date(date_string: str) -> str:
date = date_string
index = date.find('Z')
Expand All @@ -106,31 +75,8 @@ def pretty_date(date_string: str) -> str:
return datetime.strptime(date_string, '%Y-%m-%dT%H:%M:%SZ').strftime('%b %-d, %Y')


def download_zoom_file(client: ZoomClient, url: str, download_directory: str) -> str:
# Note the token info in the client isn't really *public*, but it's
# not explicitly private, either. Use `config[]` syntax instead of
# `config.get()` so we get an exception if things have changed and
# this data is no longer available.
r = requests.get(url, stream=True, headers={
'Authorization': f'Bearer {client.config['token']}'
})
r.raise_for_status()
resolved_url = r.url
filename = urlparse(resolved_url).path.split('/')[-1]
filepath = os.path.join(download_directory, filename)
if os.path.exists(filepath):
r.close()
return
with open(filepath, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
if chunk: # filter out keep-alive new chunks
f.write(chunk)

return filepath


def meeting_had_no_participants(client: ZoomClient, meeting: Dict) -> bool:
participants = ZoomError.parse_or_raise(client.past_meeting.get_participants(meeting_id=meeting['uuid']))['participants']
def meeting_had_no_participants(client: FancyZoom, meeting: Dict) -> bool:
participants = client.past_meeting.get_participants(meeting_id=meeting['uuid'])['participants']

return all(
any(p.search(u['name']) for p in ZOOM_IGNORE_USER_NAMES)
Expand Down Expand Up @@ -183,15 +129,15 @@ def main():
print('Please use `python scripts/auth.py` to re-authorize.')
return sys.exit(1)

zoom = ZoomClient(ZOOM_CLIENT_ID, ZOOM_CLIENT_SECRET, ZOOM_ACCOUNT_ID)
zoom = FancyZoom(ZOOM_CLIENT_ID, ZOOM_CLIENT_SECRET, ZOOM_ACCOUNT_ID)

# Official meeting recordings we will upload belong to the account owner.
zoom_user_id = zoom.user.list(role_id=ZOOM_ROLES['owner']).json()['users'][0]['id']
zoom_user_id = zoom.user.list(role_id=ZOOM_ROLES['owner'])['users'][0]['id']

with tempfile.TemporaryDirectory() as tmpdirname:
print(f'Creating tmp dir: {tmpdirname}\n')

meetings = ZoomError.parse_or_raise(zoom.recording.list(user_id=zoom_user_id))['meetings']
meetings = zoom.recording.list(user_id=zoom_user_id)['meetings']
meetings = sorted(meetings, key=lambda m: m['start_time'])
# Filter recordings less than 1 minute
meetings = filter(lambda m: m['duration'] > 1, meetings)
Expand All @@ -211,11 +157,11 @@ def main():
if meeting_had_no_participants(zoom, meeting):
print(' Deleting recording: nobody attended this meeting.')
if not DRY_RUN:
response = zoom.recording.delete(meeting_id=meeting['uuid'], action='trash')
if response.status_code < 300:
try:
zoom.recording.delete(meeting_id=meeting['uuid'], action='trash')
print(' 🗑️ Deleted recording.')
else:
print(f' ❌ {ZoomError(response)}')
except ZoomError as error:
print(f' ❌ {error}')
continue

videos = [file for file in meeting['recording_files']
Expand All @@ -232,7 +178,7 @@ def main():
for file in videos:
url = file['download_url']
print(f' Download from {url}...')
filepath = download_zoom_file(zoom, url, tmpdirname)
filepath = zoom.download_file(url, tmpdirname)

if video_has_audio(filepath):
recording_date = fix_date(meeting['start_time'])
Expand Down Expand Up @@ -281,15 +227,15 @@ def main():

if ZOOM_DELETE_AFTER_UPLOAD and not DRY_RUN:
# Just delete the video for now, since that takes the most storage space.
response = zoom.recording.delete_single_recording(
meeting_id=file['meeting_id'],
recording_id=file['id'],
action='trash'
)
if response.status_code == 204:
try:
zoom.recording.delete_single_recording(
meeting_id=file['meeting_id'],
recording_id=file['id'],
action='trash'
)
print(f' 🗑️ Deleted {file["file_type"]} file from Zoom for recording: {meeting["topic"]}')
else:
print(f' ❌ {ZoomError(response)}')
except ZoomError as error:
print(f' ❌ {error}')


if __name__ == '__main__':
Expand Down