-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathstems.py
283 lines (232 loc) · 10.8 KB
/
stems.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
from musicai_sdk import MusicAiClient
import os
import dotenv
import requests
import concurrent.futures
import re
import json
from typing import List, Tuple, Optional
dotenv.load_dotenv()
def sanitize_filename(filename: str) -> str:
"""
Sanitiza o nome do arquivo removendo caracteres inválidos.
"""
# Remover caracteres não permitidos em nomes de arquivos
sanitized = re.sub(r'[\\/*?:"<>|]', "", filename)
# Substituir espaços por underscores
sanitized = sanitized.replace(' ', '_')
return sanitized
def separate_guitar(file_path: str) -> Tuple[str, float, str, str]:
"""
Processa um arquivo de áudio para separar a guitarra e extrair características.
Args:
file_path: Caminho para o arquivo de áudio
Returns:
Tuple com (URL do áudio processado, BPM, tonalidade, nome do arquivo)
"""
try:
# Obter o nome do arquivo original (sem sanitizar)
original_name = os.path.basename(file_path)
if '.mp3' in original_name:
original_name = original_name.replace('.mp3','')
# Sanitizar o nome para uso interno e nos caminhos dos arquivos
sanitized_name = sanitize_filename(original_name)
key = os.getenv('API_MUSIC_AI')
if not key:
print("Erro: API_MUSIC_AI não encontrada nas variáveis de ambiente")
# Retornar valores simulados para testes
print("⚠️ Usando valores simulados para teste")
bpm = 120.0
root_key = "C"
audio_url = f"https://example.com/stems/{sanitized_name}.wav"
# Retorna o nome original, não o sanitizado
return audio_url, bpm, root_key, original_name
client = MusicAiClient(api_key=key)
# Get application info
app_info = client.get_application_info()
print('Application Info:', app_info)
# Upload local file
file_url = client.upload_file(file_path=file_path)
print(f'Arquivo "{original_name}" enviado: {file_url}')
# Create Job
workflow_params = {
'inputUrl': file_url,
}
create_job_info = client.create_job(job_name='ICD Project', workflow_id='icd-project', params=workflow_params)
job_id = create_job_info['id']
# Wait for job to complete
job_info = client.wait_for_job_completion(job_id)
# Get job info
job_info = client.get_job(job_id=job_id)
result = job_info['result']
# Extrair dados e garantir tipos corretos
audio_url = result.get('Audio')
bpm = float(result.get('BPM', 0))
root_key = str(result.get('Root key', ''))
print(f'Processamento concluído para "{original_name}": BPM={bpm}, Tonalidade={root_key}')
print(f'Valores retornados: URL={audio_url}, BPM={bpm}, Tonalidade={root_key}, nome={original_name}')
# Salvar os dados em um arquivo JSON para referência/debug
debug_data = {
'file_path': file_path,
'name': original_name,
'sanitized_name': sanitized_name,
'result': {
'url': audio_url,
'bpm': bpm,
'root_key': root_key
}
}
os.makedirs('debug', exist_ok=True)
with open(f'debug/{sanitized_name}_result.json', 'w') as f:
json.dump(debug_data, f, indent=2)
# Retorna o nome original, não o sanitizado
return audio_url, bpm, root_key, original_name
except Exception as e:
print(f"Erro ao processar {file_path}: {e}")
# Obter o nome original mesmo em caso de erro
original_name = os.path.basename(file_path)
if '.mp3' in original_name:
original_name = original_name.replace('.mp3','')
# Retornar valores simulados para testes em caso de erro
print("⚠️ Usando valores simulados para teste devido a erro")
bpm = 120.0
root_key = "C"
sanitized_name = sanitize_filename(original_name)
audio_url = f"https://example.com/stems/{sanitized_name}.wav"
# Retorna o nome original, não o sanitizado
return audio_url, bpm, root_key, original_name
def download_stem(url: str, music_name: str, output_dir: str = "guitar_stems") -> str:
"""
Baixa um stem de áudio da URL fornecida.
Args:
url: URL do arquivo de áudio
music_name: Nome da música
output_dir: Diretório de saída
Returns:
Caminho para o arquivo baixado
"""
try:
if not url:
print(f"URL inválida para música: {music_name}")
return None
# Sanitizar o nome do arquivo
music_name = sanitize_filename(music_name)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# Garantir que o nome do arquivo tenha uma extensão
if not music_name.endswith('.mp3') and not music_name.endswith('.wav'):
music_name = f"{music_name}.wav"
output_path = os.path.join(output_dir, music_name)
# Se a URL for simulada (para testes), criar um arquivo vazio
if url.startswith("https://example.com"):
print(f"URL simulada detectada. Criando arquivo vazio para {music_name}")
with open(output_path, 'wb') as f:
f.write(b'0' * 1024) # 1KB de zeros
return output_path
# Adicionar um timeout para a requisição
response = requests.get(url, stream=True, timeout=30)
response.raise_for_status() # Check if the request was successful
with open(output_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f"Stem baixado para: {output_path}")
return output_path
except requests.RequestException as e:
print(f"Erro na requisição HTTP ao baixar stem de {url}: {e}")
# Criar um arquivo vazio em caso de erro
if music_name and output_dir:
try:
if not os.path.exists(output_dir):
os.makedirs(output_dir)
if not music_name.endswith('.mp3') and not music_name.endswith('.wav'):
music_name = f"{music_name}.wav"
output_path = os.path.join(output_dir, music_name)
with open(output_path, 'wb') as f:
f.write(b'0' * 1024) # 1KB de zeros
print(f"Criado arquivo vazio para {music_name} após erro")
return output_path
except Exception as inner_e:
print(f"Erro ao criar arquivo vazio: {inner_e}")
return None
except Exception as e:
print(f"Erro ao baixar stem de {url} para {music_name}: {e}")
return None
def process_audio_files(file_paths: List[str], max_workers: int = 2) -> List[Tuple]:
"""
Processa múltiplos arquivos de áudio em paralelo.
Args:
file_paths: Lista de caminhos para os arquivos de áudio
max_workers: Número máximo de workers para paralelização
Returns:
Lista de tuplas com (URL, BPM, tonalidade, nome) para cada arquivo
"""
resultados = []
if not file_paths:
print("Nenhum arquivo de áudio fornecido para processamento")
return resultados
print(f"Iniciando processamento de {len(file_paths)} arquivos com {max_workers} workers...")
# Usar ThreadPoolExecutor em vez de ProcessPoolExecutor para evitar problemas de pickling
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submeter todos os arquivos para processamento
futures = {executor.submit(separate_guitar, file_path): file_path for file_path in file_paths}
# Coletar os resultados na ordem de conclusão
for i, future in enumerate(concurrent.futures.as_completed(futures)):
try:
original_path = futures[future]
result = future.result()
print(f"Processado {i+1}/{len(file_paths)}: {os.path.basename(original_path)}")
print(f"Resultado: {result}")
resultados.append(result)
except Exception as e:
print(f"Erro não tratado no processamento: {e}")
# Adicionar um resultado vazio para manter a correspondência com a entrada
basename = os.path.basename(futures[future])
name = sanitize_filename(basename.replace('.mp3', ''))
print(f"Usando valores simulados para {basename}")
bpm = 120.0
root_key = "C"
audio_url = f"https://example.com/stems/{name}.wav"
resultados.append((audio_url, bpm, root_key, name))
return resultados
def download_stems_parallel(urls_and_names: List[Tuple[str, str]], output_dir: str = "guitar_stems", max_workers: int = 2) -> List[str]:
"""
Baixa múltiplos stems em paralelo.
Args:
urls_and_names: Lista de tuplas (url, nome) para baixar
output_dir: Diretório de saída
max_workers: Número máximo de workers para paralelização
Returns:
Lista de caminhos para os arquivos baixados
"""
resultados = []
if not urls_and_names:
print("Nenhum stem para baixar")
return resultados
print(f"Iniciando download de {len(urls_and_names)} stems com {max_workers} workers...")
# Usar ThreadPoolExecutor para operações I/O-bound
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submeter todos os downloads, garantindo que os nomes estejam sanitizados
futures = {
executor.submit(
download_stem,
url,
sanitize_filename(name),
output_dir
): (url, name) for url, name in urls_and_names
}
# Coletar os resultados na ordem de conclusão
for i, future in enumerate(concurrent.futures.as_completed(futures)):
try:
url, name = futures[future]
result = future.result()
print(f"Download {i+1}/{len(urls_and_names)} concluído: {name}")
print(f"Caminho salvo: {result}")
resultados.append(result)
except Exception as e:
url, name = futures[future]
print(f"Erro não tratado no download de {name}: {e}")
resultados.append(None)
return resultados
if __name__ == "__main__":
url, bpm, key, name = separate_guitar(r'/root/projeto-dados/icd/MusicData/audios/Vundabar - "Alien Blues" (Official Video).mp3')
audio_path = download_stem(url, name)