utils.py
import base64
import concurrent.futures
from contextlib import closing
import hashlib
import io
import json
import os
import re
import tempfile
import time
import urllib.request
from html import escape
import chardet
from PIL import Image
from opensearchpy import NotFoundError
from opensearchpy.helpers import bulk
import pypandoc
from pypdf import PdfReader
import pptx
from libs import between_xml_tag
from config import Config
from clients import Clients
from usage import ModelUsage
class ImageNotFoundError(Exception):
pass
class Utils:
def __init__(self, config: Config, clients: Clients):
self.config = config
self.clients = clients
self.usage = ModelUsage()
def get_embedding(self, image_base64: str | None = None, input_text: str | None = None, multimodal: bool = False) -> list[float] | None:
"""
Generate an embedding vector for the given image and/or text input using Amazon Bedrock.
This method can handle text-only, image-only, or multimodal (text + image) inputs.
It selects the appropriate embedding model based on the input types and the multimodal flag.
Args:
image_base64 (str | None, optional): Base64-encoded image string. Defaults to None.
input_text (str | None, optional): Text input for embedding. Defaults to None.
multimodal (bool, optional): Flag to force multimodal embedding. Defaults to False.
Returns:
list[float] | None: The embedding vector as a list of floats, or None if no valid input is provided.
Raises:
Exception: If there's an error in the Bedrock API call.
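Example:
Illustrative usage only; assumes a Utils instance ('utils') built with a valid Config and Clients,
and that the configured Bedrock embedding models are accessible.
>>> vector = utils.get_embedding(input_text="A red bicycle leaning against a wall")
>>> isinstance(vector, list)  # length depends on the configured embedding model
True
>>> utils.get_embedding() is None  # no valid input provided
True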
"""
use_multimodal = False
body = {}
if input_text is not None:
body["inputText"] = input_text
if image_base64 is not None:
body["inputImage"] = image_base64
if multimodal or 'inputImage' in body:
bedrock_runtime_client = self.clients.bedrock_runtime_client_embedding_multimodal_model
embedding_model_id = self.config.EMBEDDING_MULTIMODAL_MODEL
use_multimodal = True
elif 'inputText' in body:
bedrock_runtime_client = self.clients.bedrock_runtime_client_embedding_text_model
embedding_model_id = self.config.EMBEDDING_TEXT_MODEL
else:
return None
response = bedrock_runtime_client.invoke_model(
body=json.dumps(body),
modelId=embedding_model_id,
accept="application/json", contentType="application/json",
)
response_body = json.loads(response.get('body').read())
error_message = response_body.get('message')
if error_message is not None:
print(error_message)
print(f"Body: {body}")
embedding_vector = response_body.get('embedding')
input_text_token_count = response_body.get('inputTextTokenCount')
if use_multimodal:
self.usage.update('inputMultimodalTokenCount', input_text_token_count)
else:
self.usage.update('inputTextTokenCount', input_text_token_count)
if 'inputImage' in body:
self.usage.update('inputImageCount', 1)
return embedding_vector
def add_to_multimodal_index(self, image: dict, image_base64: str) -> None:
"""
Add an image and its metadata to the multimodal index in OpenSearch.
This method computes an embedding vector for the image and its description,
then indexes this information along with other image metadata in OpenSearch.
It uses multimodal capabilities to create a combined embedding of the image and its textual description.
Args:
image (dict): A dictionary containing image metadata including:
- format (str): The image format (e.g., 'png', 'jpeg')
- filename (str): The name of the image file
- description (str): A textual description of the image
- id (str): A unique identifier for the image
image_base64 (str): The base64-encoded string representation of the image
Returns:
None. The method prints the indexing result to the console.
Raises:
Exception: If there's an error during the indexing process, including any exceptions
raised by the OpenSearch client or the get_embedding method.
Note:
This method assumes that the OpenSearch client is properly initialized and configured.
The index name is determined by the configuration (self.config.MULTIMODAL_INDEX_NAME).
"""
embedding_vector = self.get_embedding(image_base64=image_base64, input_text=image['description'])
document = {
"format": image['format'],
"filename": image['filename'],
"description": image['description'],
"embedding_vector": embedding_vector,
}
response = self.clients.opensearch_client.index(
index=self.config.MULTIMODAL_INDEX_NAME,
body=document,
id=image['id'],
refresh=True,
)
print(f"Multimodel index result: {response['result']}")
def store_image(self, image_format: str, image_base64: str, import_image_id:str=''):
"""
Store an image in the file system and index it in the multimodal database.
This method takes a base64-encoded image, stores it in the file system,
generates a description using a text model, and indexes it in the multimodal database.
Args:
image_format (str): The format of the image (e.g., 'png', 'jpeg').
image_base64 (str): The base64-encoded string of the image.
import_image_id (str, optional): An ID to use for importing an existing image. Defaults to ''.
Returns:
dict: A dictionary containing the image metadata if successful, None if there's an ID mismatch.
Note:
- If the image already exists in the index, it returns the existing metadata without re-indexing.
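Example:
Illustrative usage only; assumes a Utils instance ('utils'), a base64-encoded PNG ('png_base64'),
and configured Bedrock and OpenSearch clients.
>>> image = utils.store_image('png', png_base64)
>>> sorted(image.keys())
['description', 'filename', 'format', 'id']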
"""
image_bytes = base64.b64decode(image_base64)
image_id = self.get_image_hash(image_bytes)
if import_image_id != '' and import_image_id != image_id:
print(f"Image ID mismatch: {import_image_id} != computed {image_id }")
return None
image_filename = self.config.IMAGE_PATH + image_id + '.' + image_format
if not os.path.exists(image_filename):
with open(image_filename, 'wb') as f:
f.write(image_bytes)
image = self.get_image_by_id(image_id)
if type(image) is dict:
print("Image already indexed.")
return image
if image is not None:
error_message = image
return error_message
# Short description to fit in multimodal embeddings
image_description = self.get_image_description(image_bytes, image_format)
image = {
"id": image_id,
"format": image_format,
"filename": image_filename,
"description": image_description,
}
self.add_to_multimodal_index(image, image_base64)
return image
def invoke_lambda_function(self, function_name: str, event: dict) -> tuple[dict, float]:
"""
Invoke an AWS Lambda function and return its output.
This method invokes a specified AWS Lambda function with the given event data,
retrieves the response, and returns the decoded output.
Args:
function_name (str): The name or ARN of the Lambda function to invoke.
event (dict): The event data to pass to the Lambda function.
Returns:
tuple[dict, float]: The decoded output from the Lambda function and the approximate elapsed time in seconds.
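Example:
Illustrative usage only; the function name and event payload below are placeholders, not values defined by this project.
>>> result, elapsed = utils.invoke_lambda_function('my-tool-function', {'query': 'hello'})
>>> isinstance(elapsed, float)
True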
"""
try:
start_time = time.time()
response = self.clients.lambda_client.invoke(
FunctionName=function_name,
Payload=json.dumps(event)
)
except Exception as e:
end_time = time.time()
elapsed_time = end_time - start_time
error_message = f"Error: {e}"
print(error_message)
return { "output": error_message }, elapsed_time
finally:
end_time = time.time()
elapsed_time = end_time - start_time
self.usage.update('functionCalls', 1)
self.usage.update('functionApproximateElapsedTime', elapsed_time)
print(self.usage)
# Get output from response
payload = response['Payload'].read().decode('utf-8')
body = json.loads(payload).get('body', '{}')
result = json.loads(body) # Contains the output (str) and images (list) keys
return result, elapsed_time
def get_image_bytes(self, image_source: str | bytes, format: str = "JPEG", max_image_size: int | None = None, max_image_dimension: int | None = None) -> bytes:
"""
Retrieve image bytes from a source and optionally resize the image.
This method can handle both URL and local file path sources. It will
resize the image if it exceeds the specified maximum size or dimension.
Args:
image_source (str | bytes): URL, local path, base64-encoded string, or raw bytes of the image.
format (str, optional): Image format to use when saving the image. Defaults to "JPEG".
max_image_size (int, optional): Maximum allowed size of the image in bytes.
max_image_dimension (int, optional): Maximum allowed dimension (width or height) of the image.
Returns:
bytes: The image data as bytes, potentially resized.
Note:
If resizing is necessary, the function will progressively reduce the image size
until it meets the specified constraints. The resized image is saved in JPEG format.
"""
def check_if_base64_image(image_source: str | bytes) -> io.BytesIO | None:
if not isinstance(image_source, str):
return None
try:
decoded_data = base64.b64decode(image_source)
original_image_bytes = io.BytesIO(decoded_data)
Image.open(original_image_bytes) # This fails if the image is not valid
return original_image_bytes
except Exception:
return None
image_bytes = check_if_base64_image(image_source)
if image_bytes is None:
if isinstance(image_source, bytes):
image_bytes = io.BytesIO(image_source)
elif image_source.startswith(('http://', 'https://')):
print(f"Downloading image from URL: {image_source}")
# Download image from URL
with urllib.request.urlopen(image_source) as response:
image_bytes = io.BytesIO(response.read())
else:
# Open image from local path
print(f"Opening image from local path: {image_source}")
image_bytes = io.BytesIO()
with open(image_source, 'rb') as f:
image_bytes.write(f.read())
else:
print("Image is base64 encoded.")
original_image_bytes = io.BytesIO(image_bytes.getvalue())
image_size = len(image_bytes.getvalue())
divide_by = 1
while True:
image_bytes.seek(0) # Reset the file pointer to the beginning
with Image.open(original_image_bytes) as img:
if divide_by > 1:
resize_comment = f"Divided by {divide_by}"
img = img.resize(tuple(x // divide_by for x in img.size))
image_bytes = io.BytesIO()
img.save(image_bytes, format=format)
image_size = image_bytes.tell()
else:
resize_comment = "Original"
print(f"{resize_comment} size {image_size} bytes, dimensions {img.size}")
if ((max_image_size is None or image_size <= max_image_size) and
(max_image_dimension is None or all(s <= max_image_dimension for s in img.size))):
print("Image within required size and dimensions.")
break
divide_by *= 2
return image_bytes.getvalue()
def get_image_base64(self, image_source: str, format: str = "JPEG", max_image_size: int | None = None, max_image_dimension: int | None = None) -> str:
"""
Convert an image to a base64-encoded string, with optional resizing.
Args:
image_source (str): URL or local path of the image.
format (str, optional): Image format to use when saving the image. Defaults to "JPEG".
max_image_size (int, optional): Maximum allowed size of the image in bytes.
max_image_dimension (int, optional): Maximum allowed dimension (width or height) of the image.
Returns:
str: Base64-encoded string representation of the image.
Note:
This function uses get_image_bytes to retrieve and potentially resize the image
before encoding it to base64.
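Example:
Illustrative usage only; the path and size limits below are placeholders.
>>> b64 = utils.get_image_base64('/tmp/photo.jpg', format='JPEG', max_image_size=5*1024*1024, max_image_dimension=2048)
>>> isinstance(b64, str)
True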
"""
image_bytes = self.get_image_bytes(image_source, format, max_image_size, max_image_dimension)
return base64.b64encode(image_bytes).decode('utf-8')
def get_image_hash(self, image_bytes: bytes) -> str:
"""
Compute a SHA-256 hash for the given image bytes.
This method takes the raw bytes of an image and computes a unique
hash value using the SHA-256 algorithm. This hash can be used as a
unique identifier for the image content.
Args:
image_bytes (bytes): The raw bytes of the image.
Returns:
str: A hexadecimal string representation of the SHA-256 hash.
Note:
This function is deterministic, meaning the same image bytes
will always produce the same hash value.
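Example:
Deterministic for the same bytes; the result is a 64-character hexadecimal string.
>>> digest = utils.get_image_hash(b'example-bytes')
>>> len(digest)
64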
"""
hash_obj = hashlib.sha256()
hash_obj.update(image_bytes)
return hash_obj.hexdigest()
def get_image_description(self, image_bytes: bytes, image_format: str, detailed: bool = False) -> str:
"""
Generate a description for an image using the AI model.
This method uses an AI model to analyze the given image and generate a textual description.
The description can be either brief or detailed based on the 'detailed' parameter.
Args:
image_bytes (bytes): The raw bytes of the image to be described.
image_format (str): The format of the image (e.g., 'png', 'jpeg', 'gif').
detailed (bool, optional): If True, generate a more comprehensive and detailed description.
If False, generate a brief description. Defaults to False.
Returns:
str: A textual description of the image generated by the AI model.
Note:
The quality and accuracy of the description depend on the capabilities of the underlying AI model.
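Example:
Illustrative usage only; assumes 'image_bytes' holds a valid JPEG and the configured text model is accessible.
>>> description = utils.get_image_description(image_bytes, 'jpeg', detailed=False)
>>> isinstance(description, str)
True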
"""
if detailed:
prompt = self.config.DETAILED_IMAGE_DESCRIPTION_PROMPT
else:
prompt = self.config.SHORT_IMAGE_DESCRIPTION_PROMPT
messages = [{
"role": "user",
"content": [
{
"image": {
"format": image_format,
"source": {"bytes": image_bytes}
}
},
{"text": prompt}
],
}]
print(f"Generating {'detailed ' if detailed else ''}image description...")
image_description = self.invoke_text_model(messages, return_last_message_only=True)
print(f"Image description: {image_description}")
return image_description
def get_image_by_id(self, image_id: str, return_base64: bool = False) -> dict|None|str:
"""
Retrieve image metadata from the multimodal index by its ID.
This method queries the OpenSearch index to fetch metadata for an image
with the specified ID. It can optionally return the image data as a base64-encoded string.
Args:
image_id (str): The unique identifier of the image to retrieve.
return_base64 (bool, optional): If True, include the base64-encoded image data
in the returned dictionary. Defaults to False.
Returns:
dict: A dictionary containing image metadata if the image is found. The dictionary
includes keys such as 'format', 'filename', 'description', and 'id'.
If return_base64 is True, it also includes a 'base64' key with the image data.
None: If the image with the given ID is not found in the index.
str: An error message if any other error occurs during retrieval.
Raises:
Exception: Other errors during retrieval are caught and returned as an error message string.
"""
try:
response = self.clients.opensearch_client.get(
id=image_id,
index=self.config.MULTIMODAL_INDEX_NAME,
_source_includes=["format", "filename", "description"],
)
image = response['_source']
image['id'] = image_id
if return_base64:
image['base64'] = self.get_image_base64(image['filename'], format=image['format'])
return image
except NotFoundError:
return None
except Exception as ex:
error_message = f"Error: {ex}"
print(error_message)
return error_message
def invoke_text_model(self, messages: list[dict], system_prompt: str | None = None, temperature: float = 0, tools: list[dict] | None = None, return_last_message_only: bool = False) -> dict | str:
"""
Invoke the text model using Amazon Bedrock's converse API.
This method prepares the request body, handles retries for throttling exceptions,
and processes the response from the model.
Args:
messages (list): List of message dictionaries to be sent to the model.
system_prompt (str, optional): System prompt to be added to the request. Defaults to None.
temperature (float, optional): Temperature setting for the model. Defaults to 0.
tools (optional): An object whose tools_json attribute lists the tools to be passed to the model. Defaults to None.
return_last_message_only (bool, optional): If True, returns only the last message from the model. Defaults to False.
Returns:
dict or str: If return_last_message_only is False, returns the full response dictionary.
If True, returns only the text of the last message from the model.
In case of an error, returns an error message string.
Raises:
Exception: Propagates any exceptions not related to throttling.
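Example:
Illustrative usage only; assumes the configured Bedrock text model is accessible.
Messages follow the Bedrock Converse API format used elsewhere in this class.
>>> messages = [{"role": "user", "content": [{"text": "Summarize this repository in one sentence."}]}]
>>> reply = utils.invoke_text_model(messages, system_prompt="Be concise.", return_last_message_only=True)
>>> isinstance(reply, str)
True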
"""
converse_body = {
"modelId": self.config.TEXT_MODEL,
"messages": messages,
"inferenceConfig": {
"maxTokens": self.config.MAX_TOKENS,
"temperature": temperature,
},
}
if system_prompt is not None:
converse_body["system"] = [{"text": system_prompt}]
if tools:
converse_body["toolConfig"] = {"tools": tools.tools_json}
print("Thinking...")
# To handle throttling retries
retry_wait_time = self.config.MIN_RETRY_WAIT_TIME
retry_flag = True
while retry_flag and retry_wait_time <= self.config.MAX_RETRY_WAIT_TIME:
try:
response = self.clients.bedrock_runtime_client_text_model.converse(**converse_body)
retry_flag = False
except Exception as ex:
print(ex)
# Botocore client errors carry a 'response' attribute with the error code
if getattr(ex, 'response', {}).get('Error', {}).get('Code') == 'ThrottlingException':
print(f"Waiting {retry_wait_time} seconds...")
time.sleep(retry_wait_time)
# Double the wait time for the next try
retry_wait_time *= 2
print("Retrying...")
else:
# Handle other client errors
error_message = f"Error: {ex}"
print(error_message)
return error_message
if retry_flag:
# Retries exhausted while throttled: return an error instead of referencing an undefined response
error_message = "Error: maximum retry wait time exceeded while throttled."
print(error_message)
return error_message
print(f"Stop reason: {response['stopReason']}")
for metrics, value in response['usage'].items():
self.usage.update(metrics, value)
print(self.usage)
if return_last_message_only:
response_message = response['output']['message']
last_message = response_message['content'][0]['text']
return last_message
return response
def add_to_text_index(self, text: str, id: str, metadata: dict, metadata_delete: dict|None=None) -> None:
"""
Add text content to the text index in OpenSearch.
This method processes the input text, splits it into chunks, computes embeddings,
and indexes the documents in OpenSearch. It can optionally delete existing content
based on metadata before indexing new content.
Args:
text (str): The text content to be indexed.
id (str): A unique identifier for the text content.
metadata (dict): Additional metadata to be stored with the text.
metadata_delete (dict|None, optional): Metadata used to delete existing content
before indexing. Defaults to None.
Returns:
None
Behavior:
1. If metadata_delete is provided, it deletes existing content matching that metadata.
2. Splits the input text into chunks.
3. Processes each chunk in parallel, computing embeddings.
4. Indexes all processed chunks in bulk to OpenSearch.
5. Prints information about the indexing process.
Note:
This function relies on external functions split_text_for_collection and get_embedding.
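Example:
Illustrative usage only; the metadata keys below are placeholders chosen for the example.
>>> utils.add_to_text_index(
...     "Long document text...",
...     id="doc-001",
...     metadata={"source": "upload", "title": "Notes"},
...     metadata_delete={"source": "upload"},  # removes previously indexed content from the same source
... )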
"""
if metadata_delete is not None:
# Delete previous content
delete_query = {
"query": {
"match": metadata_delete
}
}
try:
response = self.clients.opensearch_client.delete_by_query(
index=self.config.TEXT_INDEX_NAME,
body=delete_query,
)
deleted = response['deleted']
if deleted > 0:
print(f"Deleted previous content: {deleted}")
except Exception as ex:
error_message = f"Error deleting previous content: {ex}"
print(error_message)
def process_chunk(i, chunk, metadata, id):
formatted_metadata = '\n '.join([f"{key}: {value}" for key, value in metadata.items()])
chunk = f"{formatted_metadata}\n\n{chunk}"
text_embedding = self.get_embedding(input_text=chunk)
document = {
"id": f"{id}_{i}",
"document": chunk,
"embedding_vector": text_embedding,
}
document = document | metadata
return document
chunks = self.split_text_for_collection(text)
print(f"Split into {len(chunks)} chunks")
# Compute embeddings
avg_chunk_length = sum(len(chunk) for chunk in chunks) / len(chunks)
min_chunk_length = min(len(chunk) for chunk in chunks)
max_chunk_length = max(len(chunk) for chunk in chunks)
print(f"Embedding {len(chunks)} chunks with min/average/max {min_chunk_length}/{round(avg_chunk_length)}/{max_chunk_length} characters...")
documents = []
with concurrent.futures.ThreadPoolExecutor(max_workers=self.config.MAX_WORKERS) as executor:
futures = [executor.submit(process_chunk, i + 1, chunk, metadata, id) for i, chunk in enumerate(chunks)]
for future in concurrent.futures.as_completed(futures):
document = future.result()
documents.append(document)
print(f"Indexing {len(documents)} chunks...")
success, failed = bulk(
self.clients.opensearch_client,
documents,
index=self.config.TEXT_INDEX_NAME,
raise_on_exception=True
)
print(f"Indexed {success} documents successfully, {len(failed)} documents failed.")
def split_text_for_collection(self, text: str) -> list[str]:
"""
Split the input text into chunks suitable for indexing or processing.
This function splits the input text into chunks based on sentence boundaries
and length constraints. It aims to create chunks that are between MIN_CHUNK_LENGTH
and MAX_CHUNK_LENGTH characters long, while trying to keep sentences together.
Args:
text (str): The input text to be split into chunks.
Returns:
list: A list of text chunks, where each chunk is a string.
Note:
- The function uses regular expressions to split the text into sentences.
- It attempts to keep sentences together in chunks when possible.
- The constants MIN_CHUNK_LENGTH and MAX_CHUNK_LENGTH should be defined
elsewhere in the code to control the size of the chunks.
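Example:
Chunk boundaries depend on MIN_CHUNK_LENGTH and MAX_CHUNK_LENGTH in the configuration.
>>> chunks = utils.split_text_for_collection("First sentence. Second sentence. Third sentence.")
>>> all(isinstance(chunk, str) for chunk in chunks)
True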
"""
chunks = []
sentences = re.split(r'\. |\n|[)}\]][^a-zA-Z0-9]*[({\[]', text)
chunk = ''
next_chunk = ''
for sentence in sentences:
sentence = sentence.strip(' \n')
if len(chunk) < self.config.MAX_CHUNK_LENGTH:
chunk += sentence + "\n"
if len(chunk) > self.config.MIN_CHUNK_LENGTH:
next_chunk += sentence + "\n"
else:
if len(chunk) > 0:
chunks.append(chunk)
chunk = next_chunk
next_chunk = ''
if len(chunk) > 0:
chunks.append(chunk)
return chunks
def process_image_placeholders(self, text: str, forPreview: bool = False) -> str:
"""
Replace image placeholders with markdown to display the image.
Args:
text (str): A string containing text and image placeholders of the form [image_id: <id>].
forPreview (bool, optional): If True, link images through the local Gradio file endpoint instead of a relative file path. Defaults to False.
Returns:
str: The text with image placeholders replaced by markdown image links.
Raises:
ImageNotFoundError: If a referenced image_id is not found in the image catalog.
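Example:
Illustrative usage only; '<image_id>' stands for a hash previously returned by store_image.
>>> markdown = utils.process_image_placeholders("See the diagram [image_id: <image_id>] for details.")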
"""
def replace_image(match):
image_id = match.group(1)
image = self.get_image_by_id(image_id)
if isinstance(image, dict):
if forPreview:
link = f"http://127.0.0.1:7860/gradio_api/file={image["filename"]}"
else:
link = os.path.relpath(os.path.join('..', image["filename"]))
return f'![{escape(image["description"])}]({link})'
else:
error_message = f"Image with 'image_id' {image_id} not found in the image catalog."
print(error_message)
raise ImageNotFoundError(error_message)
pattern = r'\[image_id:\s*([^\s\]]+)\s*\]'
return re.sub(pattern, replace_image, text)
def process_image_and_file_placeholders_for_chat(self, text: str) -> list[dict]:
"""
Replace image placeholders with a list of dictionaries representing text and images.
Args:
text (str): A string containing text and image placeholders.
Returns:
list[dict]: A list of dictionaries, each representing either text or an image.
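Example:
Illustrative usage only; the placeholder values stand for an indexed image ID and a file under OUTPUT_PATH.
>>> parts = utils.process_image_and_file_placeholders_for_chat(
...     "Here is the chart [image_id: <image_id>] and the raw data [file: results.csv]"
... )
>>> [part["format"] for part in parts]
['text', 'file', 'text', 'file']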
"""
pattern = r'\[(image_id|file):\s*([^\s\]]+)\s*\]'
parts = re.split(pattern, text)
result = []
for i, part in enumerate(parts):
part = part.strip()
match i % 3:
case 0:
if len(part) > 0: # Only add non-empty text parts
result.append({"format": "text", "text": part})
case 1:
part_type = part
case 2:
match part_type:
case "image_id":
image_id = part
image = self.get_image_by_id(image_id)
result.append({"format": "file", "filename": image['filename'], "description": image['description']})
case "file":
basename = part
filename = self.config.OUTPUT_PATH + basename
result.append({"format": "file", "filename": filename, "description": basename })
case _:
print(f"Unknown part type: {part_type}")
return result
def get_file_name_and_extension(self, full_file_name: str) -> tuple[str, str]:
"""
Extract the file name and extension from a full file path.
This function takes a full file path and returns the file name without the extension
and the extension separately. The extension is returned in lowercase without the leading dot.
Args:
full_file_name (str): The full path of the file including the file name and extension.
Returns:
tuple: A tuple containing two elements:
- file_name (str): The name of the file without the extension.
- extension (str): The file extension in lowercase without the leading dot.
If there's no extension, an empty string is returned.
Example:
>>> get_file_name_and_extension('/path/to/myfile.txt')
('myfile', 'txt')
>>> get_file_name_and_extension('document.PDF')
('document', 'pdf')
>>> get_file_name_and_extension('image')
('image', '')
"""
file_name, extension = os.path.splitext(os.path.basename(full_file_name))
if len(extension) > 0:
extension = extension[1:].lower() # Remove the leading '.' and make it lowercase
return file_name, extension
def is_text_file(self, file_path: str) -> bool:
"""
Check if a file is likely to be a text file based on its content.
This function uses chardet to detect the encoding of the file.
If an encoding is detected with high confidence, it's likely a text file.
Args:
file_path (str): The path to the file to be checked.
Returns:
bool: True if the file is likely to be a text file, False otherwise.
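Example:
Illustrative usage only; the paths are placeholders and results depend on the detected encoding confidence.
>>> utils.is_text_file('notes.txt')
True
>>> utils.is_text_file('photo.jpg')
False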
"""
# Read a sample of the file content
sample_size = 1024 # Adjust this value as needed
try:
with open(file_path, 'rb') as f:
raw_data = f.read(sample_size)
except IOError:
return False
# If the sample is empty, consider it as non-text
if not raw_data:
return False
# Use chardet to detect the encoding
result = chardet.detect(raw_data)
# Check if an encoding was detected with high confidence
if result['encoding'] is not None and result['confidence'] > 0.7:
return True
return False
def process_pdf_document(self, file: str) -> str:
"""
Process a PDF document and extract text and images.
This function uses the PyPDF library to extract text from a PDF file.
It also processes images within the PDF and stores them in the image catalog when HANDLE_IMAGES_IN_DOCUMENTS is True.
Args:
file (str): The path to the PDF file to be processed.
Returns:
str: The text content of the PDF file.
"""
text_pages = []
print(f"Processing PDF file: {file}")
reader = PdfReader(file)
for index, page in enumerate(reader.pages):
text = page.extract_text((0, 90))
if self.config.HANDLE_IMAGES_IN_DOCUMENTS:
for image in page.images:
print(f"Processing image: {image.name}")
image_format = image.name.split('.')[-1].lower()
if image_format == 'jpg': # Quick fix
image_format = 'jpeg'
if image_format in self.config.IMAGE_FORMATS:
detailed_description = self.get_image_description(image.data, image_format, detailed=True)
image_base64 = self.get_image_base64(
image.data,
format=image_format,
max_image_size=self.config.MAX_CHAT_IMAGE_SIZE,
max_image_dimension=self.config.MAX_CHAT_IMAGE_DIMENSIONS
)
stored_image = self.store_image(image_format, image_base64)
text += "\n" + between_xml_tag(f"Name '{image.name}':\nImage store image_id: {stored_image['id']}\nDetailed description: {detailed_description}", 'image')
text_pages.append(between_xml_tag(text, 'page', {'id': index}))
return "\n".join(text_pages)
def process_pptx_document(self, file: str) -> str:
"""
Process a PowerPoint document and extract text and images.
This function uses the python-pptx library to extract text from a PowerPoint file.
It also processes images within the PowerPoint presentation and stores them in the image catalog when HANDLE_IMAGES_IN_DOCUMENTS is True.
Args:
file (str): The path to the PowerPoint file to be processed.
Returns:
str: The text content of the PowerPoint file.
"""
def extract_text_from_shape(shape):
if shape.has_text_frame:
text = shape.text
if shape.is_placeholder:
ph_type = type(shape.placeholder_format.type)
text = between_xml_tag(text, 'placeholder', { 'type': ph_type.__name__ })
return text
elif shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.GROUP:
return " ".join([extract_text_from_shape(subshape) for subshape in shape.shapes])
elif shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.TABLE:
return "\n".join([cell.text for row in shape.table.rows for cell in row.cells])
return ""
presentation = pptx.Presentation(file)
extracted_text = []
for slide_number, slide in enumerate(presentation.slides, start=1):
slide_content = [f"Slide {slide_number}:"]
# Extract text from shapes
for shape in slide.shapes:
text = extract_text_from_shape(shape)
if text:
slide_content.append(text)
# Extract notes
if slide.has_notes_slide:
notes_text = slide.notes_slide.notes_text_frame.text
if notes_text:
slide_content.append(f"Notes: {notes_text}")
# Extract image and chart information
for shape in slide.shapes:
if shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.PICTURE:
slide_content.append(f"[Image: {shape.name}]")
if self.config.HANDLE_IMAGES_IN_DOCUMENTS:
image = shape.image
content_type = image.content_type
format = content_type.split('/')[1].lower() # No need to fix jpeg
if format in self.config.IMAGE_FORMATS:
image_base64 = self.get_image_base64(image.blob, format=format, max_image_size=self.config.MAX_CHAT_IMAGE_SIZE, max_image_dimension=self.config.MAX_CHAT_IMAGE_DIMENSIONS)
image = self.store_image(format, image_base64)
elif shape.shape_type == pptx.enum.shapes.MSO_SHAPE_TYPE.CHART:
slide_content.append(f"[Chart: {shape.chart.chart_type}]")
slide_text = between_xml_tag("\n".join(slide_content), 'slide', { 'slide_number': slide_number })
extracted_text.append(slide_text)
return "\n\n".join(between_xml_tag(extracted_text, 'presentation', { 'file': file }))
def process_other_document_formats(self, file: str) -> str:
"""
Process a non-PDF document and extract text and images.
This function uses the pypandoc library to convert the document to reStructuredText.
It also processes images within the document and stores them in the image catalog when HANDLE_IMAGES_IN_DOCUMENTS is True.
Args:
file (str): The path to the document to be processed.
Returns:
str: The text content of the document.
"""
with tempfile.TemporaryDirectory() as temp_dir:
file_text = pypandoc.convert_file(
file,
'rst',
extra_args=[f'--extract-media={temp_dir}']
)
if self.config.HANDLE_IMAGES_IN_DOCUMENTS:
for root, _dirs, files in os.walk(temp_dir):
for media_file in files: # Renamed to avoid shadowing the 'file' argument
format = media_file.split('.')[-1].lower()
if format == 'jpg': # Quick fix
format = 'jpeg'
if format in self.config.IMAGE_FORMATS:
image_path = os.path.join(root, media_file)
with open(image_path, 'rb') as img_file:
image_bytes = img_file.read()
detailed_description = self.get_image_description(image_bytes, format, detailed=True)
image_base64 = self.get_image_base64(
image_bytes,
format=format,
max_image_size=self.config.MAX_CHAT_IMAGE_SIZE,
max_image_dimension=self.config.MAX_CHAT_IMAGE_DIMENSIONS
)
image = self.store_image(format, image_base64)
file_text += f"\n\nExtracted image:\nimage_id: {image['id']}\ndescription: {detailed_description}"
return file_text
def synthesize_speech(self, text: str, voice: str, use_ssml: bool = False) -> bytes:
"""
Synthesize speech from text using Amazon Polly.
This function uses Amazon Polly (via boto3) to synthesize speech from the given text
using the specified voice. The synthesized speech is returned as raw MP3 audio bytes.
Args:
text (str): The text to be synthesized.
voice (str): The Polly voice ID to be used for synthesis.
use_ssml (bool, optional): If True, the text is interpreted as SSML. Defaults to False.
Returns:
bytes: The synthesized speech as MP3 audio data.
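Example:
Illustrative usage only; assumes the chosen voice is available with the Polly generative engine.
>>> audio = utils.synthesize_speech('<speak>Hello there.</speak>', voice='Ruth', use_ssml=True)
>>> isinstance(audio, bytes)
True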
"""
response = self.clients.polly_client.synthesize_speech(
Engine='generative',
OutputFormat='mp3',
VoiceId=voice,
Text=text,
TextType='ssml' if use_ssml else 'text',
)
with closing(response["AudioStream"]) as audio_stream:
audio_data = audio_stream.read()
return audio_data
def delete_index(self, index_name: str) -> None:
"""
Delete an index from OpenSearch if it exists.
Args:
index_name (str): The name of the index to be deleted.
Note:
This function prints the result of the deletion attempt or any exception that occurs.
"""
if self.clients.opensearch_client.indices.exists(index=index_name):
try:
_ = self.clients.opensearch_client.indices.delete(
index=index_name,
)
print(f"Index {index_name} deleted.")
except Exception as ex:
print(ex)
def create_index(self, index_name: str, index_config: dict) -> None:
"""
Create an index in OpenSearch if it doesn't already exist.
Args:
index_name (str): The name of the index to be created.
index_config (dict): The configuration for the index.
Returns:
None
Raises:
Exception: If there's an error during index creation.
Note:
This function prints the result of the creation attempt.
If the index already exists, no action is taken.
"""
if not self.clients.opensearch_client.indices.exists(index=index_name):
try:
_ = self.clients.opensearch_client.indices.create(
index=index_name,
body=index_config,
)
print(f"Index {index_name} created.")
except Exception as ex:
print(ex)
def print_index_info(self, index_name: str) -> None:
"""
Print information about the multimodal index.
This function attempts to retrieve and print the configuration of the
MULTIMODAL_INDEX_NAME index from OpenSearch. If successful, it prints