Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(10): refactor processing lib to MoviePy #12

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,26 @@
"Bitwarden",
"compand",
"crossfade",
"crossfadein",
"crossfading",
"dotenv",
"escription",
"fadein",
"ffprobe",
"httpx",
"lavfi",
"libx",
"loudnorm",
"lxml",
"mdfind",
"perfcounter",
"strerror",
"studylight",
"subclip",
"udio",
"versetxt",
"videoclips",
"videofile",
"xfade"
]
}
90 changes: 47 additions & 43 deletions process-video.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
import os
import subprocess
import time
from utils.config import parse_video_parameters
from utils.constants import OUTPUT_BASE_DIR
from utils.file import (
create_and_change_directory,
delete_dir,
ensure_dir_exists,
is_valid_file,
)
from utils.helpers import print_error, print_info, print_success
from utils.media import (
apply_video_compression,
check_and_download,
crossfade_videos,
crossfade_videos_with_pymovie,
download_media_from_youtube,
get_video_settings,
get_video_upload_date,
)
from utils.types import PathsDict
from utils.time import format_seconds_to_readable
from utils.ui import confirm_parameters


def main() -> None:
warmup_perfcounter = time.perf_counter()

print_info("Processing video...")

ensure_dir_exists("tmp")
Expand All @@ -36,64 +39,65 @@ def main() -> None:
)

try:
check_and_download(intro_url, "./tmp/intro.mp4")
check_and_download(outro_url, "./tmp/outro.mp4")
start_perfcounter = time.perf_counter()

# set the config vars and paths
upload_date = get_video_upload_date(youtube_url)
output_dir = create_and_change_directory(upload_date)
downloaded_video_file = download_media_from_youtube(
youtube_url, start_time, end_time, "video"
)

paths: PathsDict = {
"intro": {
"raw": "./tmp/intro.mp4",
"compressed": "./tmp/01-intro_compressed.mp4",
"crossfaded": "./tmp/04-intro-base_crossfaded.mp4",
},
"base": {
"raw": f"./tmp/base_downloaded_raw.mp4",
"compressed": "./tmp/02-base_compressed.mp4",
"crossfaded": "./tmp/05-base-output_crossfaded.mp4",
},
"outro": {
"raw": "./tmp/outro.mp4",
"compressed": "./tmp/03-outro_compressed.mp4",
},
}
intro_clip_path = os.path.join(OUTPUT_BASE_DIR, "intro.mp4")
outro_clip_path = os.path.join(OUTPUT_BASE_DIR, "outro.mp4")
final_path = os.path.join(output_dir, f"{upload_date}_final.mp4")

# download the video clips
check_and_download(intro_url, intro_clip_path)
check_and_download(outro_url, outro_clip_path)
downloaded_video_file = download_media_from_youtube(
youtube_url, start_time, end_time, upload_date, "video"
)

if not is_valid_file(downloaded_video_file):
raise ValueError(
f"The file {downloaded_video_file} was not created or is too small. Please check for errors."
)

# apply "standard loudness" to all clips individually
for key in paths.keys():
apply_video_compression(paths[key]["raw"], paths[key]["compressed"])
downloading_perfcounter = time.perf_counter()

base_settings = get_video_settings(paths["base"]["compressed"])
downloaded_video_compressed = f"./tmp/{upload_date}_base_compressed.mp4"
apply_video_compression(downloaded_video_file, downloaded_video_compressed)

# crossfade the intro to base
crossfade_videos(
paths["intro"]["compressed"],
paths["base"]["compressed"],
1,
paths["intro"]["crossfaded"],
base_settings,
)
compression_perfcounter = time.perf_counter()

# crossfade the intro/base result to outro
crossfade_videos(
paths["intro"]["crossfaded"],
paths["outro"]["compressed"],
1,
# stitch the clips together
crossfade_videos_with_pymovie(
[
intro_clip_path,
downloaded_video_compressed,
outro_clip_path,
],
2,
final_path,
base_settings,
)
crossfade_perfcounter = time.perf_counter()

print_success("Video processing complete.")

end_perfcounter = time.perf_counter()

# print the performance timing
print_success(
f"Downloading took : {format_seconds_to_readable(downloading_perfcounter - start_perfcounter)} seconds."
)
print_success(
f"Compressing took : {format_seconds_to_readable(compression_perfcounter - downloading_perfcounter)} seconds."
)
print_success(
f"Crossfading took : {format_seconds_to_readable(crossfade_perfcounter - compression_perfcounter)} seconds."
)
print_success(
f"Total process took: {format_seconds_to_readable(end_perfcounter - start_perfcounter)}."
)

except ValueError as ve:
print_error(f"File validation error: {ve}")
except Exception as e:
Expand Down
11 changes: 11 additions & 0 deletions utils/file.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import shutil
import sys
from utils.constants import OUTPUT_BASE_DIR

Expand All @@ -13,6 +14,16 @@ def ensure_dir_exists(path: str):
return directory


def delete_dir(path: str):
directory = os.path.join(".", path)
try:
shutil.rmtree(directory)
print(f"Directory '{directory}' has been deleted.")
except OSError as e:
print(f"Error deleting filepath {path}: {e.strerror} - {e.filename}")
return directory


def ensure_file_exists(filename):
if not os.path.exists(filename):
print(f"Error: The file '{filename}' does not exist. Exiting.")
Expand Down
102 changes: 69 additions & 33 deletions utils/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,19 @@
import subprocess
import requests
import os
from moviepy.editor import AudioFileClip, concatenate_audioclips, AudioClip
from moviepy.editor import (
AudioFileClip,
concatenate_audioclips,
AudioClip,
VideoFileClip,
CompositeVideoClip,
afx,
vfx,
)
from typing import Literal

from utils.file import ensure_dir_exists
from utils.helpers import run_command
from utils.helpers import print_info, run_command
from utils.time import is_valid_time
from utils.constants import SCRIPT_DIR

Expand Down Expand Up @@ -82,51 +90,42 @@ def download_media_from_youtube(
youtube_url: str,
start_time: str,
end_time: str,
upload_date: str,
media_type: Literal["audio", "video"] = "audio",
) -> str:
if not is_valid_time(start_time) or not is_valid_time(end_time):
raise ValueError("Invalid time format")

# Define filename based on media type
file_extension = "mp3" if media_type == "audio" else "mp4"
media_filename = os.path.join(
"tmp",
f"base_downloaded_raw.{'mp3' if media_type == 'audio' else 'mp4'}",
"tmp", f"{upload_date}_base_downloaded_raw.{file_extension}"
)

# Configuring command for different media types
command = [
"yt-dlp",
"--progress",
"-o",
media_filename,
"--format",
"bestaudio" if media_type == "audio" else "bestvideo+bestaudio",
]

if media_type == "audio":
command = [
"yt-dlp",
"--progress",
"-x",
"--audio-format",
"mp3",
"-o",
media_filename,
youtube_url,
]
command.extend(["-x", "--audio-format", "mp3"])
elif media_type == "video":
command = [
"yt-dlp",
"--progress",
"--format",
"bestvideo+bestaudio",
"--merge-output-format",
"mp4",
"-o",
media_filename,
youtube_url,
]
else:
raise ValueError("Invalid media type specified")
command.append("--merge-output-format")
command.append("mp4")

# Apply trimming if necessary
if start_time != "00:00:00" or end_time != "00:00:00":
command.extend(
["--postprocessor-args", f"ffmpeg:-ss {start_time} -to {end_time}"]
postprocessor_args = (
f"ffmpeg:-ss {start_time} -to {end_time} -avoid_negative_ts make_zero"
)
if media_type == "video":
postprocessor_args += " -c:v libx264 -c:a aac" # Re-encoding for video
command.extend(["--postprocessor-args", postprocessor_args])

command.append(youtube_url)

# Execute download command
try:
subprocess.run(command, check=True)
except subprocess.CalledProcessError as e:
Expand Down Expand Up @@ -318,6 +317,42 @@ def crossfade_videos(
)


def crossfade_videos_with_pymovie(video_paths, crossfade_duration, output_path):
# Ensure there are at least two videos to crossfade
if len(video_paths) < 2:
raise ValueError("At least two video paths are required for crossfading.")

# Initialize list to hold video clips with their start times
clips = []
total_duration = 0 # Keep track of the total duration after each clip is added

# Load video clips and set their start times
for i, path in enumerate(video_paths):
clip = VideoFileClip(path)
# If it's not the first clip, set it to start where the last one starts to fade out
if i > 0:
start_time = max(0, total_duration - crossfade_duration)
clip = clip.set_start(start_time).crossfadein(crossfade_duration)
else:
clip = clip.set_start(0)

clips.append(clip)
# Update total_duration to the end of the current clip, not including the crossfade
total_duration += clip.duration - crossfade_duration if i > 0 else clip.duration

# Create the crossfade clip by overlaying the clips
final_clip = CompositeVideoClip(clips, size=clips[0].size)

# Write the output video file with crossfade
final_clip.write_videofile(
output_path,
codec="libx264",
audio_codec="aac",
audio_fps=48000,
threads=8,
)


def trim_base_video(video_path, segment_duration, base_settings):
tmp_dir = ensure_dir_exists("tmp")

Expand Down Expand Up @@ -442,6 +477,7 @@ def download_video(s3_url, local_path):

def check_and_download(video_url, file_path):
if not os.path.exists(file_path):
print_info(f"File {file_path} doesn't exist, downloading...")
download_video(video_url, file_path)
else:
print(f"File {file_path} already exists. Skipping download.")
7 changes: 7 additions & 0 deletions utils/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,10 @@ def get_formatted_date(input_date):
formatted_date = date_obj.strftime("%A, %B %d, %Y")

return formatted_date


def format_seconds_to_readable(seconds):
"""Converts a number of seconds to a formatted string showing hours, minutes, and seconds."""
hours, remainder = divmod(seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return f"{int(hours)} hours, {int(minutes)} minutes, {seconds:.2f} seconds"
14 changes: 4 additions & 10 deletions utils/types.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
from typing import Optional, TypedDict


class VideoPaths(TypedDict):
raw: str
compressed: str
crossfaded: Optional[str]
from typing import TypedDict


class PathsDict(TypedDict, total=False):
intro: VideoPaths
base: VideoPaths
outro: VideoPaths
intro: str
base: str
outro: str