-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy pathstorage.py
717 lines (612 loc) · 28.2 KB
/
storage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
import json
import os
from typing import List, Dict, Optional
from datetime import datetime, timedelta
import traceback
import logging
import redis
from utils import create_redis_client
import uuid
class StorageHandler:
# Chaves Redis para webhooks
WEBHOOK_KEY = "webhook_redirects" # Chave para armazenar os webhooks
WEBHOOK_STATS_KEY = "webhook_stats" # Chave para estatísticas
def __init__(self):
# Configuração de logger
self.logger = logging.getLogger("StorageHandler")
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.DEBUG)
self.logger.info("StorageHandler inicializado.")
# Conexão com o Redis
self.redis = create_redis_client()
# Retenção de logs e backups
self.log_retention_hours = int(os.getenv('LOG_RETENTION_HOURS', 48))
self.backup_retention_days = int(os.getenv('BACKUP_RETENTION_DAYS', 7))
# Garantir valores padrão para configurações de idioma
if not self.redis.exists(self._get_redis_key("auto_translation")):
self.redis.set(self._get_redis_key("auto_translation"), "false")
if not self.redis.exists(self._get_redis_key("auto_language_detection")):
self.redis.set(self._get_redis_key("auto_language_detection"), "false")
def _get_redis_key(self, key):
return f"transcrevezap:{key}"
def add_log(self, level: str, message: str, metadata: dict = None):
log_entry = {
"timestamp": datetime.now().isoformat(),
"level": level,
"message": message,
"metadata": json.dumps(metadata) if metadata else None
}
self.redis.lpush(self._get_redis_key("logs"), json.dumps(log_entry))
self.redis.ltrim(self._get_redis_key("logs"), 0, 999) # Manter apenas os últimos 1000 logs
self.logger.log(getattr(logging, level.upper(), logging.INFO), f"{message} | Metadata: {metadata}")
def get_allowed_groups(self) -> List[str]:
return self.redis.smembers(self._get_redis_key("allowed_groups"))
def add_allowed_group(self, group: str):
self.redis.sadd(self._get_redis_key("allowed_groups"), group)
def remove_allowed_group(self, group: str):
self.redis.srem(self._get_redis_key("allowed_groups"), group)
def get_blocked_users(self) -> List[str]:
return self.redis.smembers(self._get_redis_key("blocked_users"))
def add_blocked_user(self, user: str):
self.redis.sadd(self._get_redis_key("blocked_users"), user)
def remove_blocked_user(self, user: str):
self.redis.srem(self._get_redis_key("blocked_users"), user)
def get_statistics(self) -> Dict:
total_processed = int(self.redis.get(self._get_redis_key("total_processed")) or 0)
last_processed = self.redis.get(self._get_redis_key("last_processed"))
daily_count = json.loads(self.redis.get(self._get_redis_key("daily_count")) or "{}")
group_count = json.loads(self.redis.get(self._get_redis_key("group_count")) or "{}")
user_count = json.loads(self.redis.get(self._get_redis_key("user_count")) or "{}")
error_count = int(self.redis.get(self._get_redis_key("error_count")) or 0)
success_rate = float(self.redis.get(self._get_redis_key("success_rate")) or 100.0)
return {
"total_processed": total_processed,
"last_processed": last_processed,
"stats": {
"daily_count": daily_count,
"group_count": group_count,
"user_count": user_count,
"error_count": error_count,
"success_rate": success_rate,
}
}
def can_process_message(self, remote_jid):
try:
allowed_groups = self.get_allowed_groups()
blocked_users = self.get_blocked_users()
if remote_jid in blocked_users:
return False
if "@g.us" in remote_jid and remote_jid not in allowed_groups:
return False
return True
except Exception as e:
self.logger.error(f"Erro ao verificar se pode processar mensagem: {e}")
return False
def record_processing(self, remote_jid):
try:
# Incrementar total processado
self.redis.incr(self._get_redis_key("total_processed"))
# Atualizar último processamento
self.redis.set(self._get_redis_key("last_processed"), datetime.now().isoformat())
# Atualizar contagem diária
today = datetime.now().strftime("%Y-%m-%d")
daily_count = json.loads(self.redis.get(self._get_redis_key("daily_count")) or "{}")
daily_count[today] = daily_count.get(today, 0) + 1
self.redis.set(self._get_redis_key("daily_count"), json.dumps(daily_count))
# Atualizar contagem de grupo ou usuário
if "@g.us" in remote_jid:
group_count = json.loads(self.redis.get(self._get_redis_key("group_count")) or "{}")
group_count[remote_jid] = group_count.get(remote_jid, 0) + 1
self.redis.set(self._get_redis_key("group_count"), json.dumps(group_count))
else:
user_count = json.loads(self.redis.get(self._get_redis_key("user_count")) or "{}")
user_count[remote_jid] = user_count.get(remote_jid, 0) + 1
self.redis.set(self._get_redis_key("user_count"), json.dumps(user_count))
# Atualizar taxa de sucesso
total = int(self.redis.get(self._get_redis_key("total_processed")) or 0)
errors = int(self.redis.get(self._get_redis_key("error_count")) or 0)
success_rate = ((total - errors) / total) * 100 if total > 0 else 100
self.redis.set(self._get_redis_key("success_rate"), success_rate)
except Exception as e:
self.logger.error(f"Erro ao registrar processamento: {e}")
def record_error(self):
self.redis.incr(self._get_redis_key("error_count"))
def clean_old_logs(self):
try:
cutoff_time = datetime.now() - timedelta(hours=self.log_retention_hours)
logs = self.redis.lrange(self._get_redis_key("logs"), 0, -1)
for log in logs:
log_entry = json.loads(log)
if datetime.fromisoformat(log_entry["timestamp"]) < cutoff_time:
self.redis.lrem(self._get_redis_key("logs"), 0, log)
else:
break # Assumindo que os logs estão ordenados por tempo
except Exception as e:
self.logger.error(f"Erro ao limpar logs antigos: {e}")
def backup_data(self):
try:
data = {
"allowed_groups": list(self.get_allowed_groups()),
"blocked_users": list(self.get_blocked_users()),
"statistics": self.get_statistics(),
}
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_key = f"backup:{timestamp}"
self.redis.set(backup_key, json.dumps(data))
self.redis.expire(backup_key, self.backup_retention_days * 24 * 60 * 60) # Expira após os dias de retenção
except Exception as e:
self.logger.error(f"Erro ao criar backup: {e}")
def clean_old_backups(self):
try:
for key in self.redis.scan_iter("backup:*"):
if self.redis.ttl(key) <= 0:
self.redis.delete(key)
except Exception as e:
self.logger.error(f"Erro ao limpar backups antigos: {e}")
# Método de rotação de chaves groq
def get_groq_keys(self) -> List[str]:
"""Obtém todas as chaves GROQ armazenadas."""
return list(self.redis.smembers(self._get_redis_key("groq_keys")))
def add_groq_key(self, key: str):
"""Adiciona uma nova chave GROQ ao conjunto."""
if key and key.startswith("gsk_"):
self.redis.sadd(self._get_redis_key("groq_keys"), key)
return True
return False
def remove_groq_key(self, key: str):
"""Remove uma chave GROQ do conjunto."""
self.redis.srem(self._get_redis_key("groq_keys"), key)
def get_next_groq_key(self) -> str:
"""
Obtém a próxima chave GROQ no sistema de rodízio.
Utiliza um contador no Redis para controlar a rotação.
"""
keys = self.get_groq_keys()
if not keys:
return None
# Obtém e incrementa o contador de rodízio
counter = int(self.redis.get(self._get_redis_key("groq_key_counter")) or "0")
next_counter = (counter + 1) % len(keys)
self.redis.set(self._get_redis_key("groq_key_counter"), str(next_counter))
return keys[counter % len(keys)]
def get_penalized_until(self, key: str) -> Optional[datetime]:
"""
Retorna o timestamp até quando a chave está penalizada, ou None se não estiver penalizada.
"""
penalized_key = self._get_redis_key(f"groq_key_penalized_{key}")
penalized_until = self.redis.get(penalized_key)
if penalized_until:
return datetime.fromisoformat(penalized_until)
return None
def penalize_key(self, key: str, penalty_duration: int):
"""
Penaliza uma chave por um tempo determinado (em segundos).
"""
penalized_key = self._get_redis_key(f"groq_key_penalized_{key}")
penalized_until = datetime.utcnow() + timedelta(seconds=penalty_duration)
self.redis.set(penalized_key, penalized_until.isoformat())
self.redis.expire(penalized_key, penalty_duration) # Expira a chave após o tempo de penalidade
self.add_log("INFO", "Chave GROQ penalizada", {
"key": key,
"penalized_until": penalized_until.isoformat()
})
def get_message_settings(self):
"""Obtém as configurações de mensagens."""
return {
"summary_header": self.redis.get(self._get_redis_key("summary_header")) or "🤖 *Resumo do áudio:*",
"transcription_header": self.redis.get(self._get_redis_key("transcription_header")) or "🔊 *Transcrição do áudio:*",
"output_mode": self.redis.get(self._get_redis_key("output_mode")) or "both",
"character_limit": int(self.redis.get(self._get_redis_key("character_limit")) or "500"),
}
def save_message_settings(self, settings: dict):
"""Salva as configurações de mensagens."""
for key, value in settings.items():
self.redis.set(self._get_redis_key(key), str(value))
def get_process_mode(self):
"""Retorna o modo de processamento configurado"""
mode = self.redis.get(self._get_redis_key("process_mode")) or "all"
self.logger.debug(f"Modo de processamento atual: {mode}")
return mode
def get_contact_language(self, contact_id: str) -> str:
"""
Obtém o idioma configurado para um contato específico.
O contact_id pode vir com ou sem @s.whatsapp.net
"""
# Remover @s.whatsapp.net se presente
contact_id = contact_id.split('@')[0]
return self.redis.hget(self._get_redis_key("contact_languages"), contact_id)
def set_contact_language(self, contact_id: str, language: str):
"""
Define o idioma para um contato específico
"""
# Remover @s.whatsapp.net se presente
contact_id = contact_id.split('@')[0]
self.redis.hset(self._get_redis_key("contact_languages"), contact_id, language)
self.logger.info(f"Idioma {language} definido para o contato {contact_id}")
def get_all_contact_languages(self) -> dict:
"""
Retorna um dicionário com todos os contatos e seus idiomas configurados
"""
return self.redis.hgetall(self._get_redis_key("contact_languages"))
def remove_contact_language(self, contact_id: str):
"""
Remove a configuração de idioma de um contato
"""
contact_id = contact_id.split('@')[0]
self.redis.hdel(self._get_redis_key("contact_languages"), contact_id)
self.logger.info(f"Configuração de idioma removida para o contato {contact_id}")
def get_auto_language_detection(self) -> bool:
"""
Verifica se a detecção automática de idioma está ativada
"""
return self.redis.get(self._get_redis_key("auto_language_detection")) == "true"
def set_auto_language_detection(self, enabled: bool):
"""
Ativa ou desativa a detecção automática de idioma
"""
self.redis.set(self._get_redis_key("auto_language_detection"), str(enabled).lower())
self.logger.info(f"Detecção automática de idioma {'ativada' if enabled else 'desativada'}")
def get_auto_translation(self) -> bool:
"""
Verifica se a tradução automática está ativada
"""
return self.redis.get(self._get_redis_key("auto_translation")) == "true"
def set_auto_translation(self, enabled: bool):
"""
Ativa ou desativa a tradução automática
"""
self.redis.set(self._get_redis_key("auto_translation"), str(enabled).lower())
self.logger.info(f"Tradução automática {'ativada' if enabled else 'desativada'}")
def record_language_usage(self, language: str, from_me: bool, auto_detected: bool = False):
"""
Registra estatísticas de uso de idiomas
Args:
language: Código do idioma (ex: 'pt', 'en')
from_me: Se o áudio foi enviado por nós
auto_detected: Se o idioma foi detectado automaticamente
"""
try:
# Validar idioma
if not language:
self.add_log("WARNING", "Tentativa de registrar uso sem idioma definido")
return
# Incrementar contagem total do idioma
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_total",
1
)
# Incrementar contagem por direção (enviado/recebido)
direction = 'sent' if from_me else 'received'
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_{direction}",
1
)
# Se foi detecção automática, registrar
if auto_detected:
self.redis.hincrby(
self._get_redis_key("language_stats"),
f"{language}_auto_detected",
1
)
# Registrar última utilização
self.redis.hset(
self._get_redis_key("language_stats"),
f"{language}_last_used",
datetime.now().isoformat()
)
# Log detalhado
self.add_log("DEBUG", "Uso de idioma registrado", {
"language": language,
"direction": direction,
"auto_detected": auto_detected
})
except Exception as e:
self.add_log("ERROR", "Erro ao registrar uso de idioma", {
"error": str(e),
"type": type(e).__name__
})
def get_language_statistics(self) -> Dict:
"""
Obtém estatísticas de uso de idiomas
"""
try:
stats_raw = self.redis.hgetall(self._get_redis_key("language_stats"))
# Organizar estatísticas por idioma
stats = {}
for key, value in stats_raw.items():
lang, metric = key.split('_', 1)
if lang not in stats:
stats[lang] = {}
if metric == 'last_used':
stats[lang][metric] = value
else:
stats[lang][metric] = int(value)
return stats
except Exception as e:
self.logger.error(f"Erro ao obter estatísticas de idioma: {e}")
return {}
def cache_language_detection(self, contact_id: str, language: str, confidence: float = 1.0):
"""
Armazena em cache o idioma detectado para um contato
"""
contact_id = contact_id.split('@')[0]
cache_data = {
'language': language,
'confidence': confidence,
'timestamp': datetime.now().isoformat(),
'auto_detected': True
}
self.redis.hset(
self._get_redis_key("language_detection_cache"),
contact_id,
json.dumps(cache_data)
)
def get_cached_language(self, contact_id: str) -> Dict:
"""
Obtém o idioma em cache para um contato
Retorna None se não houver cache ou se estiver expirado
"""
contact_id = contact_id.split('@')[0]
cached = self.redis.hget(
self._get_redis_key("language_detection_cache"),
contact_id
)
if not cached:
return None
try:
data = json.loads(cached)
# Verificar se o cache expirou (24 horas)
cache_time = datetime.fromisoformat(data['timestamp'])
if datetime.now() - cache_time > timedelta(hours=24):
return None
return data
except:
return None
def get_webhook_redirects(self) -> List[Dict]:
"""Obtém todos os webhooks de redirecionamento cadastrados."""
webhooks_raw = self.redis.hgetall(self._get_redis_key("webhook_redirects"))
webhooks = []
for webhook_id, data in webhooks_raw.items():
webhook_data = json.loads(data)
webhook_data['id'] = webhook_id
webhooks.append(webhook_data)
return webhooks
def validate_webhook_url(self, url: str) -> bool:
"""Valida se a URL do webhook é acessível."""
try:
from urllib.parse import urlparse
parsed = urlparse(url)
return all([parsed.scheme, parsed.netloc])
except Exception as e:
self.logger.error(f"URL inválida: {url} - {str(e)}")
return False
def add_webhook_redirect(self, url: str, description: str = "") -> str:
"""
Adiciona um novo webhook de redirecionamento.
Retorna o ID do webhook criado.
"""
webhook_id = str(uuid.uuid4())
webhook_data = {
"url": url,
"description": description,
"created_at": datetime.now().isoformat(),
"status": "active",
"error_count": 0,
"success_count": 0,
"last_success": None,
"last_error": None
}
self.redis.hset(
self._get_redis_key("webhook_redirects"),
webhook_id,
json.dumps(webhook_data)
)
return webhook_id
def clean_webhook_data(self, webhook_id: str):
"""
Remove todos os dados relacionados a um webhook específico do Redis.
Args:
webhook_id: ID do webhook a ser limpo
"""
try:
# Lista de chaves relacionadas ao webhook que precisam ser removidas
keys_to_remove = [
f"webhook_failed_{webhook_id}", # Entregas falhas
f"webhook_stats_{webhook_id}", # Estatísticas específicas
]
# Remove cada chave associada ao webhook
for key in keys_to_remove:
full_key = self._get_redis_key(key)
self.redis.delete(full_key)
self.logger.debug(f"Chave removida: {full_key}")
self.logger.info(f"Dados do webhook {webhook_id} limpos com sucesso")
except Exception as e:
self.logger.error(f"Erro ao limpar dados do webhook {webhook_id}: {str(e)}")
raise
def remove_webhook_redirect(self, webhook_id: str):
"""Remove um webhook de redirecionamento e todos os seus dados associados."""
try:
# Primeiro remove os dados associados
self.clean_webhook_data(webhook_id)
# Depois remove o webhook em si
self.redis.hdel(self._get_redis_key("webhook_redirects"), webhook_id)
self.logger.info(f"Webhook {webhook_id} removido com sucesso")
except Exception as e:
self.logger.error(f"Erro ao remover webhook {webhook_id}: {str(e)}")
raise
def update_webhook_stats(self, webhook_id: str, success: bool, error_message: str = None):
"""Atualiza as estatísticas de um webhook."""
try:
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
if success:
webhook_data["success_count"] += 1
webhook_data["last_success"] = datetime.now().isoformat()
else:
webhook_data["error_count"] += 1
webhook_data["last_error"] = {
"timestamp": datetime.now().isoformat(),
"message": error_message
}
self.redis.hset(
self._get_redis_key("webhook_redirects"),
webhook_id,
json.dumps(webhook_data)
)
except Exception as e:
self.logger.error(f"Erro ao atualizar estatísticas do webhook {webhook_id}: {e}")
def retry_failed_webhooks(self):
"""Tenta reenviar webhooks que falharam nas últimas 24h."""
webhooks = self.get_webhook_redirects()
for webhook in webhooks:
if webhook.get("last_error"):
error_time = datetime.fromisoformat(webhook["last_error"]["timestamp"])
if datetime.now() - error_time < timedelta(hours=24):
# Tentar reenviar
pass
def test_webhook(self, url: str) -> tuple[bool, str]:
"""
Testa um webhook antes de salvá-lo.
Retorna uma tupla (sucesso, mensagem)
"""
try:
import aiohttp
import asyncio
async def _test_webhook():
async with aiohttp.ClientSession() as session:
test_payload = {
"test": True,
"timestamp": datetime.now().isoformat(),
"message": "Teste de conexão do TranscreveZAP"
}
async with session.post(
url,
json=test_payload,
headers={"Content-Type": "application/json"},
timeout=10
) as response:
return response.status, await response.text()
status, response = asyncio.run(_test_webhook())
if status in [200, 201, 202]:
return True, "Webhook testado com sucesso!"
return False, f"Erro no teste: Status {status} - {response}"
except Exception as e:
return False, f"Erro ao testar webhook: {str(e)}"
def get_webhook_health(self, webhook_id: str) -> dict:
"""
Calcula métricas de saúde do webhook
"""
try:
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
total_requests = webhook_data["success_count"] + webhook_data["error_count"]
if total_requests == 0:
return {
"health_status": "unknown",
"error_rate": 0,
"success_rate": 0,
"total_requests": 0
}
error_rate = (webhook_data["error_count"] / total_requests) * 100
success_rate = (webhook_data["success_count"] / total_requests) * 100
# Definir status de saúde
if error_rate >= 50:
health_status = "critical"
elif error_rate >= 20:
health_status = "warning"
else:
health_status = "healthy"
return {
"health_status": health_status,
"error_rate": error_rate,
"success_rate": success_rate,
"total_requests": total_requests
}
except Exception as e:
self.logger.error(f"Erro ao calcular saúde do webhook {webhook_id}: {e}")
return None
def retry_webhook(self, webhook_id: str, payload: dict) -> bool:
"""
Tenta reenviar um payload para um webhook específico mantendo o payload original intacto.
Args:
webhook_id: ID do webhook para reenvio
payload: Payload original para reenvio
Returns:
bool: True se o reenvio foi bem sucedido, False caso contrário
"""
try:
import aiohttp
import asyncio
webhook_data = json.loads(
self.redis.hget(self._get_redis_key("webhook_redirects"), webhook_id)
)
async def _retry_webhook():
async with aiohttp.ClientSession() as session:
headers = {
"Content-Type": "application/json",
"X-TranscreveZAP-Forward": "true",
"X-TranscreveZAP-Webhook-ID": webhook_id,
"X-TranscreveZAP-Retry": "true"
}
async with session.post(
webhook_data["url"],
json=payload, # Envia o payload original sem modificações
headers=headers,
timeout=10
) as response:
return response.status in [200, 201, 202]
success = asyncio.run(_retry_webhook())
if success:
self.update_webhook_stats(webhook_id, True)
else:
self.update_webhook_stats(webhook_id, False, "Falha no retry")
return success
except Exception as e:
self.logger.error(f"Erro no retry do webhook {webhook_id}: {e}")
return False
def get_failed_deliveries(self, webhook_id: str) -> List[Dict]:
"""
Retorna lista de entregas falhas para um webhook
"""
key = self._get_redis_key(f"webhook_failed_{webhook_id}")
failed = self.redis.lrange(key, 0, -1)
return [json.loads(x) for x in failed]
def add_failed_delivery(self, webhook_id: str, payload: dict):
"""
Registra uma entrega falha para retry posterior
"""
key = self._get_redis_key(f"webhook_failed_{webhook_id}")
failed_delivery = {
"timestamp": datetime.now().isoformat(),
"payload": payload,
"retry_count": 0
}
self.redis.lpush(key, json.dumps(failed_delivery))
# Manter apenas as últimas 100 falhas
self.redis.ltrim(key, 0, 99)
def get_llm_provider(self) -> str:
"""Returns active LLM provider (groq or openai)"""
return self.redis.get(self._get_redis_key("active_llm_provider")) or "groq"
def set_llm_provider(self, provider: str):
"""Sets active LLM provider"""
if provider not in ["groq", "openai"]:
raise ValueError("Provider must be 'groq' or 'openai'")
self.redis.set(self._get_redis_key("active_llm_provider"), provider)
def get_openai_keys(self) -> List[str]:
"""Get stored OpenAI API keys"""
return list(self.redis.smembers(self._get_redis_key("openai_keys")))
def add_openai_key(self, key: str):
"""Add OpenAI API key"""
if key and key.startswith("sk-"):
self.redis.sadd(self._get_redis_key("openai_keys"), key)
return True
return False