-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
459 lines (416 loc) · 24.7 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
import os
from groq import Groq
import streamlit as st
from PIL import Image
import tempfile
import base64
import pypdfium2 as pdfium
from io import BytesIO
from pytesseract import image_to_string
import pytesseract
import cv2
import numpy as np
import json
from streamlit_ace import st_ace # Importando o componente Ace
import re # Importado para regex no parsing
# Configurar o caminho do Tesseract OCR
pytesseract.pytesseract.tesseract_cmd = r'/usr/bin/tesseract'
# Função para codificar bytes de imagem em base64
def encode_image(image_bytes):
return base64.b64encode(image_bytes).decode('utf-8')
# Função para inverter as cores da imagem
def invert_image_color(image_bytes):
# Converter bytes de imagem em um array NumPy
nparr = np.frombuffer(image_bytes, np.uint8)
# Decodificar a imagem para o formato OpenCV
image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
# Inverter as cores
inverted_image = 255 - image
# Codificar a imagem invertida de volta para bytes
_, buffer = cv2.imencode('.jpg', inverted_image)
inverted_image_bytes = buffer.tobytes()
return inverted_image_bytes
# Função para combinar duas imagens horizontalmente
def combine_images(image_bytes1, image_bytes2):
# Abrir as duas imagens
image1 = Image.open(BytesIO(image_bytes1))
image2 = Image.open(BytesIO(image_bytes2))
# Garantir que ambas as imagens tenham a mesma altura
if image1.height != image2.height:
# Redimensionar a segunda imagem para a altura da primeira
aspect_ratio = image2.width / image2.height
new_width = int(image1.height * aspect_ratio)
image2 = image2.resize((new_width, image1.height))
# Combinar as imagens horizontalmente
combined_image = Image.new('RGB', (image1.width + image2.width, image1.height))
combined_image.paste(image1, (0, 0))
combined_image.paste(image2, (image1.width, 0))
# Salvar a imagem combinada em bytes
byte_arr = BytesIO()
combined_image.save(byte_arr, format='JPEG', optimize=True)
combined_image_bytes = byte_arr.getvalue()
return combined_image_bytes
# Função para converter PDF em imagens (sem aplicar inversão de cores)
def convert_pdf_to_images(file_path, scale=300/72):
pdf_file = pdfium.PdfDocument(file_path)
page_indices = [i for i in range(len(pdf_file))]
renderer = pdf_file.render(
pdfium.PdfBitmap.to_pil,
page_indices=page_indices,
scale=scale,
)
list_final_images = []
for i, image in zip(page_indices, renderer):
# Converter imagem PIL para bytes
image_byte_array = BytesIO()
image.save(image_byte_array, format='JPEG', optimize=True)
image_bytes = image_byte_array.getvalue()
list_final_images.append({i: image_bytes})
return list_final_images
# Função para extrair texto usando o Tesseract OCR (usando imagens originais)
def extract_text_with_pytesseract(list_dict_final_images):
image_list = [list(data.values())[0] for data in list_dict_final_images]
image_content = []
for index, image_bytes in enumerate(image_list):
# Abrir imagem a partir dos bytes
image = Image.open(BytesIO(image_bytes))
# Realizar OCR
raw_text = str(image_to_string(image, lang='por'))
image_content.append(raw_text)
return "\n".join(image_content)
def main():
st.set_page_config(layout="wide") # Ajusta o layout para largura total
st.title('OFM Extractor | llama 3.2')
# Inicializar o estado do Streamlit para armazenar o JSON gerado, editor e contador de versões
if 'generated_json' not in st.session_state:
st.session_state.generated_json = ""
if 'ace_json_editor' not in st.session_state:
st.session_state.ace_json_editor = ""
if 'last_uploaded_file' not in st.session_state:
st.session_state.last_uploaded_file = None
if 'images_list' not in st.session_state:
st.session_state.images_list = []
if 'ocr_text' not in st.session_state:
st.session_state.ocr_text = ""
if 'editor_version' not in st.session_state:
st.session_state.editor_version = 0 # Inicializa o contador de versões
# Obter a chave de API da variável de ambiente
api_key = os.getenv('GROQ_API_KEY')
if not api_key:
st.error('A chave de API da Groq não foi encontrada. Verifique as configurações de segredos no Streamlit Cloud.')
return
# Inicializar o cliente Groq com a chave de API
client = Groq(api_key=api_key)
uploaded_file = st.file_uploader("Escolha uma imagem ou PDF...", type=["jpg", "jpeg", "png", "pdf"])
# Verificar se um novo arquivo foi carregado
if uploaded_file is not None:
# Verificar se o arquivo atual é diferente do último carregado
if st.session_state.last_uploaded_file != uploaded_file.name:
# Atualizar o último arquivo carregado
st.session_state.last_uploaded_file = uploaded_file.name
# Limpar o JSON gerado anteriormente, editor e outros estados
st.session_state.generated_json = ""
st.session_state.ace_json_editor = ""
st.session_state.images_list = []
st.session_state.ocr_text = ""
st.session_state.editor_version += 1 # Incrementa o contador de versões
else:
# Se nenhum arquivo estiver carregado, limpar todos os estados
if st.session_state.generated_json != "":
st.session_state.generated_json = ""
if st.session_state.last_uploaded_file is not None:
st.session_state.last_uploaded_file = None
if st.session_state.images_list != []:
st.session_state.images_list = []
if st.session_state.ocr_text != "":
st.session_state.ocr_text = ""
if st.session_state.ace_json_editor != "":
st.session_state.ace_json_editor = ""
if st.session_state.editor_version != 0:
st.session_state.editor_version = 0 # Reseta o contador de versões
if uploaded_file is not None:
try:
with st.spinner('Processando o arquivo...'):
if uploaded_file.type == "application/pdf":
# É um PDF
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
tmp_file.write(uploaded_file.getbuffer())
tmp_file_path = tmp_file.name
# Converter PDF em imagens (sem inversão de cores)
st.session_state.images_list = convert_pdf_to_images(tmp_file_path)
# Extrair texto usando OCR das imagens originais
st.session_state.ocr_text = extract_text_with_pytesseract(st.session_state.images_list)
# Obter a primeira imagem (página) do PDF
first_image_bytes = list(st.session_state.images_list[0].values())[0]
# Aplicar inversão de cores à primeira imagem para o LLM
enhanced_image_bytes = invert_image_color(first_image_bytes)
# Combinar as imagens original e invertida
combined_image_bytes = combine_images(first_image_bytes, enhanced_image_bytes)
# Codificar a imagem combinada em base64
base64_combined_image = encode_image(combined_image_bytes)
else:
# É uma imagem
image_bytes = uploaded_file.getvalue()
# Realizar OCR na imagem original
image = Image.open(BytesIO(image_bytes))
st.session_state.ocr_text = image_to_string(image, lang='por')
# Aplicar inversão de cores à imagem para o LLM
enhanced_image_bytes = invert_image_color(image_bytes)
# Combinar as imagens original e invertida
combined_image_bytes = combine_images(image_bytes, enhanced_image_bytes)
# Codificar a imagem combinada em base64
base64_combined_image = encode_image(combined_image_bytes)
# Definições dos campos do JSON (mantido igual)
field_definitions = (
"Definições dos campos do JSON a serem preenchidos:\n\n"
"1. numeroRps (String) - Número do RPS que gerou a nota fiscal de saída de serviço. Obrigatório: Sim\n"
"2. numeroNota (String) - Número da nota fiscal de saída de serviço. Obrigatório: Sim\n"
"3. dataEmissao (String) - Data de emissão da nota fiscal de saída (Formato: DD/MM/YYYY HH24:MI:SS). Obrigatório: Sim\n"
"4. codigoSerie (String) - Código da série da nota fiscal de serviço. Obrigatório: Não\n"
"5. descricaoSerie (String) - Descrição da série da nota fiscal de serviço. Obrigatório: Não\n"
"6. codigoModelo (String) - Código do modelo da nota fiscal de serviço. Obrigatório: Sim\n"
"7. descricaoModelo (String) - Descrição do modelo da nota fiscal de serviço. Obrigatório: Não\n"
"8. cnpjCliente (String) - CNPJ do cliente da nota fiscal de serviço. Obrigatório: Não\n"
"9. razaoCliente (String) - Razão social do cliente da nota fiscal de serviço. Obrigatório: Não\n"
"10. codIbgeEstadoServico (String) - Código IBGE do estado da execução do serviço. Obrigatório: Sim\n"
"11. codIbgeCidadeServico (String) - Código IBGE da cidade da execução do serviço. Obrigatório: Sim\n"
"12. tipoTributacaoIss (String) - Tipo de Tributação do ISS (1 a 9). Obrigatório: Sim\n"
" Valores possíveis:\n"
" 1. Tributado no Município\n"
" 2. Tributado fora do Município\n"
" 3. Tributado no Município Isento\n"
" 4. Tributado fora do Município Isento\n"
" 5. Tributado no Município Imune\n"
" 6. Tributado fora do Município Imune\n"
" 7. Tributado no Município Suspensa\n"
" 8. Tributado fora do Município Suspensa\n"
" 9. Exp Servicos\n"
"13. valorNotaFiscal (BigDecimal) - Valor da nota fiscal de serviço. Obrigatório: Sim\n"
"14. valorMulta (BigDecimal) - Valor da multa na nota fiscal de serviço. Obrigatório: Não\n"
"15. valorDesconto (BigDecimal) - Valor do desconto na nota fiscal de serviço. Obrigatório: Não\n"
"16. termoRecebimento (String) - Descrição do termo de recebimento da nota fiscal de serviço integrado com o Fusion (Oracle). Obrigatório: Não\n"
"17. observacao (String) - Descrição da observação da nota fiscal de serviço. Obrigatório: Sim\n\n"
"Servicos:\n"
"1. codigoTipoServico (String) - Código do Tipo de Serviço na nota fiscal. Obrigatório: Sim\n"
"2. descricaoTipoServico (String) - Descrição do Tipo de Serviço na nota fiscal. Obrigatório: Sim\n"
"3. codigoServico (String) - Código do Serviço na nota fiscal. Obrigatório: Sim\n"
"4. descricaoServico (String) - Descrição do Serviço na nota fiscal. Obrigatório: Sim\n"
"5. quantidadeServico (BigDecimal) - Quantidade do serviço na nota fiscal. Obrigatório: Sim\n"
"6. valorServico (BigDecimal) - Valor do serviço na nota fiscal. Obrigatório: Sim\n"
"7. valorTotalServico (BigDecimal) - Valor total do serviço na nota fiscal. Obrigatório: Sim\n\n"
"ImpostosRetido (Se houver):\n"
"1. indicadorImposto (String) - Tipo de imposto (ex: COFINS, PIS/PASEP, ISS, INSS-PJ, INSS-PF, IRRF-PF, IRRF-PJ, CSLL). Obrigatório se houver informação no documento.\n"
"2. codigoReceita (String) - Código da Receita. Obrigatório: Não\n"
"3. indicadorRetencao (String) - Indica se o imposto possui Retenção. Obrigatório se houver informação no documento.\n"
"4. vlrBaseImposto (BigDecimal) - Valor base do Imposto. Obrigatório se houver informação no documento.\n"
"5. aliquotaImposto (BigDecimal) - Alíquota do Imposto. Obrigatório se houver informação no documento.\n"
"6. vlrImposto (BigDecimal) - Valor do Imposto. Obrigatório se houver informação no documento.\n\n"
"Titulos:\n"
"1. numeroTitulo (String) - Informar o número do título. Obrigatório: Não\n"
"2. dataVencimento (Data) - Data de vencimento do título (Formato: DD/MM/YYYY). Obrigatório: Não\n"
"3. cnpjCpfCredorTitulo (String) - CNPJ/CPF do credor do título. Obrigatório: Não\n"
"4. valorTitulo (BigDecimal) - Valor do título. Obrigatório: Não\n"
"5. indicadorTipoTitulo (String) - Tipo do título ('P' - Título do credor principal, 'R' - Título de retenção). Obrigatório: Não\n"
)
# Construir o conteúdo da mensagem com o entendimento de que os arquivos podem não seguir um padrão
message_content = [
{
"type": "text",
"text": (
"Por favor, analise o documento fornecido e extraia todas as informações relevantes necessárias para preencher o JSON abaixo. "
"Note que os arquivos fornecidos podem não seguir um padrão específico, portanto, é importante buscar as informações pertinentes para preencher o JSON, independentemente do formato do documento. "
"Utilize tanto o conteúdo das imagens (original e tratada) quanto o texto extraído via OCR fornecido abaixo. "
"Corrija todas as incongruências entre o OCR e as imagens para chegar ao melhor resultado possível. "
"Preencha o JSON abaixo com os dados extraídos. "
"Não inclua descrições adicionais, apenas preencha o JSON seguindo exatamente a estrutura apresentada. "
"Sempre responda **apenas com o JSON**, sem incluir qualquer texto adicional ou explicações.\n\n"
"Certifique-se de preencher todos os campos com as informações extraídas do documento fornecido e siga a estrutura exata para garantir a compatibilidade com o sistema de integração. "
"Por favor, concentre-se apenas nas informações do arquivo; não invente dados. Para os campos que não tiverem informação no arquivo, deixe vazio como \"\".\n\n"
f"{field_definitions}\n"
"Conteúdo extraído via OCR (pode conter erros):\n"
f"{st.session_state.ocr_text}\n\n"
"Estrutura do JSON a ser preenchido, caso não tenha informações suficientes de seção, exiba apenas o array vazio []:\n"
"[\n"
" {\n"
" \"numeroRps\": \"\",\n"
" \"numeroNota\": \"\",\n"
" \"dataEmissao\": \"\",\n"
" \"codigoSerie\": \"\",\n"
" \"descricaoSerie\": \"\",\n"
" \"codigoModelo\": \"\",\n"
" \"descricaoModelo\": \"\",\n"
" \"cnpjCliente\": \"\",\n"
" \"razaoCliente\": \"\",\n"
" \"codIbgeEstadoServico\": \"\",\n"
" \"codIbgeCidadeServico\": \"\",\n"
" \"tipoTributacaoIss\": \"\",\n"
" \"valorNotaFiscal\": 0,\n"
" \"valorMulta\": 0,\n"
" \"valorDesconto\": 0,\n"
" \"termoRecebimento\": \"\",\n"
" \"observacao\": \"\",\n"
" \"Servicos\": [\n"
" {\n"
" \"codigoTipoServico\": \"\",\n"
" \"descricaoTipoServico\": \"\",\n"
" \"codigoServico\": \"\",\n"
" \"descricaoServico\": \"\",\n"
" \"quantidadeServico\": 0,\n"
" \"valorServico\": 0,\n"
" \"valorTotalServico\": 0,\n"
" \"cstSpedEfdSaida\": 0,\n"
" \"aliqPisSpedEfdSaida\": 0.0,\n"
" \"aliqCofinsSpedEfdSaida\": 0.0\n"
" }\n"
" ],\n"
" \"CodigoReceita\": [\n"
" {\n"
" \"codigoReceita\": \"\"\n"
" },\n"
" {\n"
" \"codigoReceita\": \"\"\n"
" }\n"
" ],\n"
" \"ImpostosRetido\": [\n"
" {\n"
" \"indicadorImposto\": \"\",\n"
" \"codigoReceita\": \"\",\n"
" \"indicadorRetencao\": \"\",\n"
" \"vlrBaseImposto\": 0,\n"
" \"aliquotaImposto\": 0.0,\n"
" \"vlrImposto\": 0.0\n"
" }\n"
" ],\n"
" \"Titulos\": [\n"
" {\n"
" \"numeroTitulo\": \"\",\n"
" \"dataVencimento\": \"\",\n"
" \"cnpjCpfCredorTitulo\": \"\",\n"
" \"valorTitulo\": 0,\n"
" \"indicadorTipoTitulo\": \"\"\n"
" }\n"
" ]\n"
" }\n"
"]"
)
}
]
# Adicionar a imagem combinada ao conteúdo da mensagem
message_content.append(
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_combined_image}"
}
}
)
# Função para gerar o JSON usando a API Groq
def generate_json():
completion = client.chat.completions.create(
model="llama-3.2-11b-vision-preview",
messages=[
{
"role": "user",
"content": message_content
}
],
temperature=0, # Definir como 0 para saída determinística
max_tokens=8000, # Ajuste conforme necessário
top_p=1,
stream=False,
stop=None,
)
raw_response = completion.choices[0].message.content
# Extrair apenas o JSON da resposta usando regex
json_match = re.search(r'\[.*\]', raw_response, re.DOTALL)
if json_match:
raw_json = json_match.group(0)
else:
raw_json = raw_response # Fallback se não encontrar padrão
try:
# Parsear o JSON retornado para garantir que está válido
parsed_json = json.loads(raw_json)
# Reformatar o JSON com indentação
pretty_json = json.dumps(parsed_json, indent=4, ensure_ascii=False)
# Atualizar tanto o JSON gerado quanto o editor de JSON
st.session_state.generated_json = pretty_json
st.session_state.ace_json_editor = pretty_json
return pretty_json
except json.JSONDecodeError as e:
st.error(f"Erro ao parsear o JSON retornado pela API: {e}")
return raw_json # Retorna o JSON bruto mesmo que inválido
# Se o JSON ainda não foi gerado, faça a geração inicial
if st.session_state.generated_json == "":
with st.spinner('Gerando o JSON...'):
st.session_state.generated_json = generate_json()
# Exibir a imagem e o JSON lado a lado
col1, col2 = st.columns([1, 1]) # Ajuste as proporções conforme necessário
with col1:
st.subheader('Documento')
if uploaded_file.type == "application/pdf":
num_pages = len(st.session_state.images_list)
page_number = st.selectbox(
"Selecione a página do PDF para visualizar:",
options=range(1, num_pages + 1),
index=0
)
selected_image_bytes = list(st.session_state.images_list[page_number - 1].values())[0]
st.image(selected_image_bytes, caption=f'Página {page_number} do PDF carregado.', use_column_width=True)
else:
st.image(image_bytes, caption='Imagem carregada.', use_column_width=True)
with col2:
st.subheader('JSON Editor')
st.markdown('Para aplicar as alterações, utilize **CTRL+ENTER**.') # Descrição adicionada
# Botões lado a lado após a descrição
col_buttons = st.columns(2)
with col_buttons[0]:
# Botão para regenerar o JSON
if st.button('Regenerar JSON'):
with st.spinner('Regenerando o JSON...'):
st.session_state.generated_json = generate_json()
st.success('JSON regenerado com sucesso!')
with col_buttons[1]:
# Botão para baixar o JSON
try:
parsed_json = json.loads(st.session_state.generated_json)
json_valid = True
except json.JSONDecodeError as e:
json_valid = False
st.error(f"O JSON fornecido não é válido: {e}")
if json_valid:
st.download_button(
label="Baixar JSON",
data=json.dumps(parsed_json, indent=4, ensure_ascii=False),
file_name='extracted_data.json',
mime='application/json'
)
else:
st.warning("Corrija os erros no JSON para habilitar o download.")
# Editor de código Ace para edição do JSON com destaque de sintaxe
edited_json = st_ace(
value=st.session_state.ace_json_editor,
language='json',
theme='twilight', # Você pode escolher outros temas disponíveis
key=f'ace_json_editor_{st.session_state.editor_version}', # Chave única baseada na versão
height=900, # Aumentei a altura para melhor visualização
font_size=18,
show_gutter=True,
show_print_margin=True,
wrap=True,
)
# Atualizar o JSON no session_state se houver alterações
if edited_json and edited_json != st.session_state.ace_json_editor:
try:
parsed_json = json.loads(edited_json)
# Reformatar o JSON com indentação
pretty_json = json.dumps(parsed_json, indent=4, ensure_ascii=False)
st.session_state.generated_json = pretty_json
st.session_state.ace_json_editor = pretty_json
except json.JSONDecodeError as e:
# Se o JSON for inválido, manter o texto editado como está
st.session_state.ace_json_editor = edited_json
except Exception as e:
st.error('Ocorreu um erro no processamento. Por favor, tente novamente.')
st.write(str(e))
if __name__ == '__main__':
main()