-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsync.py
206 lines (171 loc) · 5.85 KB
/
sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import argparse
import os
import shutil
import subprocess

import essentia.standard as es
import numpy as np
from PIL import Image

from easing import CubicEaseInOut, LinearInOut, QuadEaseInOut
def get_durations(beat_frames, ms_per_beat, n_frames, interpolation="linear"):
    """Calculate per-output-frame durations (in ms) that sync the GIF to the beat.

    Between each pair of consecutive beat frames, the total time of one beat
    (``ms_per_beat``) is distributed across the intermediate frames according
    to the chosen easing curve. The last segment wraps around to the first
    beat frame, so the loop closes seamlessly.

    Args:
        beat_frames: Sorted, zero-indexed GIF frame indices to land on beats.
        ms_per_beat: Milliseconds between consecutive beats.
        n_frames: Total number of frames in the GIF.
        interpolation: One of "linear", "cubic", or "quadratic".

    Returns:
        A list of ``n_frames`` durations in milliseconds, index-aligned with
        the GIF's frames (i.e. rotated so position 0 is GIF frame 0).

    Raises:
        ValueError: If ``interpolation`` is not a supported method.
    """
    # Dispatch table: pick the easing class once, instead of building a
    # LinearInOut that may immediately be discarded. Doubles as validation.
    easing_classes = {
        "linear": LinearInOut,
        "cubic": CubicEaseInOut,
        "quadratic": QuadEaseInOut,
    }
    try:
        easing_cls = easing_classes[interpolation]
    except KeyError:
        raise ValueError(f"{interpolation=} not supported") from None

    durations = []
    for ix, frame in enumerate(beat_frames):
        # Segment ends at the next beat frame; the final segment wraps past
        # the end of the GIF back to the first beat frame.
        next_frame = (
            beat_frames[ix + 1]
            if ix < len(beat_frames) - 1
            else n_frames + beat_frames[0]
        )
        n = next_frame - frame
        ease = easing_cls(start=0, end=ms_per_beat, duration=n)
        # Cumulative eased times at 0..n; per-frame duration is the delta
        # between consecutive cumulative times.
        times = [ease(t) for t in np.arange(0, n + 1)]
        durations.extend(later - earlier for earlier, later in zip(times, times[1:]))

    # The segment list starts at beat_frames[0]; rotate it so that index 0
    # corresponds to GIF frame 0.
    aligned = durations[beat_frames[0]:] + durations[:beat_frames[0]]
    if any(d < 2 for d in aligned):
        print(
            "WARNING: Durations less than 2ms are not processed well by ffmpeg (TODO: source?).\nTry using fewer beat frames or a different interpolation method."
        )
    return aligned
if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="""Estimates the tempo of an audio file, then reassembles the frames of a GIF to sync its movement
to the beat.""",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--audio_filepath",
        type=str,
        required=True,  # required: used for both BPM estimation and the final mux
        help="The path to the audio file.",
    )
    parser.add_argument(
        "--gif_filepath",
        type=str,
        required=True,
        help="The path to the gif.",
    )
    parser.add_argument(
        "--bpm",
        type=float,
        help="The BPM of the audio. Will be estimated if not passed.",
    )
    parser.add_argument(
        "--beat_frames",
        nargs="+",
        type=int,  # parse to int here instead of converting manually later
        required=True,
        help="The indices (zero-indexed) of the GIF frames to align with the beat.",
    )
    parser.add_argument(
        "--tempo_multiplier",
        type=float,
        default=1.0,
        help="A multiplier applied to the extracted tempo. Speeds up or slows down the animation.",
    )
    parser.add_argument(
        "--interpolation",
        type=str,
        default="linear",
        choices=["linear", "cubic", "quadratic"],  # fail fast at the CLI, not deep in get_durations
        help="The method of interpolation to use. Options: [linear, cubic, quadratic]",
    )
    parser.add_argument(
        "--output_directory",
        type=str,
        default=".",
        help="The directory to which the output will be saved.",
    )
    args = parser.parse_args()

    audio_filepath = args.audio_filepath
    global_bpm = args.bpm
    if not global_bpm:
        # Only load audio when we actually need to estimate the tempo.
        # TempoCNN expects 11.025 kHz mono input.
        audio_11khz = es.MonoLoader(filename=audio_filepath, sampleRate=11025)()
        global_bpm, local_bpm, local_probs = es.TempoCNN(
            graphFilename="tempocnn/deeptemp-k16-3.pb"
        )(audio_11khz)
        if global_bpm == 0:
            raise RuntimeError(f"Could not estimate BPM from {audio_filepath}.")
        print(f"Estimated BPM: {global_bpm}")

    # Convert BPM (optionally scaled) into a per-beat duration in ms.
    beats_per_second = global_bpm / 60
    beats_per_second *= args.tempo_multiplier
    ms_per_beat = 1000 / beats_per_second

    # Load gif and compute each output frame's duration.
    gif_filepath = args.gif_filepath
    im = Image.open(gif_filepath)
    beat_frames = args.beat_frames
    durations = get_durations(
        beat_frames, ms_per_beat, im.n_frames, interpolation=args.interpolation
    )

    # Create intermediate image & metadata files for ffmpeg in a temporary directory
    gif_name = os.path.splitext(os.path.basename(gif_filepath))[0]
    tmpdir = f"tmp_{gif_name}"
    os.makedirs(tmpdir, exist_ok=True)
    tmp_txt = os.path.join(tmpdir, "input.txt")
    tmp_vid = os.path.join(tmpdir, "tmp.mov")
    with open(tmp_txt, "w") as fh:
        try:
            # Iterate GIF frames; PIL signals the end with EOFError on seek.
            while True:
                ix = im.tell()
                print(f"Saving frame {ix}")
                img_filename = f"{ix}.png"
                im.save(
                    os.path.join(tmpdir, img_filename),
                    duration=durations[ix],
                    disposal=3,  # 3: Restore to previous content
                )
                # The concat demuxer reads file/duration pairs from input.txt.
                fh.write(f"file '{img_filename}'\n")
                fh.write(f"duration {durations[ix]}ms\n")
                im.seek(ix + 1)
        except EOFError:
            pass  # past the last frame: all frames written

    # Stitch the images together into a video
    # TODO: preserve transparency channels of input PNGs when concatenating
    subprocess.check_call(
        [
            "ffmpeg",
            "-f",
            "concat",
            "-i",
            tmp_txt,
            "-vsync",
            "vfr",
            "-pix_fmt",
            "yuv420p",
            "-y",
            tmp_vid,
        ]
    )

    audio_name = os.path.splitext(os.path.basename(audio_filepath))[0]
    output_filepath = os.path.join(
        args.output_directory, f"{audio_name}_{gif_name}.mp4"
    )
    # Add audio and loop the video to the length of the audio
    subprocess.check_call(
        [
            "ffmpeg",
            "-stream_loop",
            "-1",
            "-i",
            tmp_vid,
            "-i",
            audio_filepath,
            "-c:v",
            "copy",
            "-shortest",
            "-map",
            "0:v:0",
            "-map",
            "1:a:0",
            "-y",
            output_filepath,
        ]
    )

    # Clean up temporary files portably (no shelling out to `rm`).
    shutil.rmtree(tmpdir)
    print(f"Result saved to {output_filepath}")