From 45f202355cd93660dd454041482dfa2ef02896e1 Mon Sep 17 00:00:00 2001 From: priyal1508 <54278892+priyal1508@users.noreply.github.com> Date: Wed, 11 Sep 2024 14:25:25 +0400 Subject: [PATCH] Add ADI Based Skillset (#11) Co-authored-by: Ben Constable --- .vscode/extensions.json | 6 + .vscode/launch.json | 15 + .vscode/settings.json | 7 + .vscode/tasks.json | 15 + README.md | 3 +- adi_function_app/.funcignore | 8 + .../README.md | 32 +- adi_function_app/adi_2_ai_search.py | 571 ++++++++++++++++++ adi_function_app/environment.py | 30 + adi_function_app/function_app.py | 126 ++++ adi_function_app/host.json | 16 + .../images/Indexing vs Indexing with ADI.png | Bin adi_function_app/key_phrase_extraction.py | 130 ++++ adi_function_app/pre_embedding_cleaner.py | 157 +++++ adi_function_app/requirements.txt | 21 + adi_function_app/storage_account.py | 86 +++ deploy_ai_search/README.md | 18 + deploy_ai_search/ai_search.py | 555 +++++++++++++++++ deploy_ai_search/deploy.py | 55 ++ deploy_ai_search/environment.py | 257 ++++++++ deploy_ai_search/rag_documents.py | 275 +++++++++ deploy_ai_search/requirements.txt | 5 + 22 files changed, 2376 insertions(+), 12 deletions(-) create mode 100644 .vscode/extensions.json create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json create mode 100644 .vscode/tasks.json create mode 100644 adi_function_app/.funcignore rename {ai_search_with_adi => adi_function_app}/README.md (85%) create mode 100644 adi_function_app/adi_2_ai_search.py create mode 100644 adi_function_app/environment.py create mode 100644 adi_function_app/function_app.py create mode 100644 adi_function_app/host.json rename {ai_search_with_adi => adi_function_app}/images/Indexing vs Indexing with ADI.png (100%) create mode 100644 adi_function_app/key_phrase_extraction.py create mode 100644 adi_function_app/pre_embedding_cleaner.py create mode 100644 adi_function_app/requirements.txt create mode 100644 adi_function_app/storage_account.py create mode 100644 deploy_ai_search/README.md create mode 100644 deploy_ai_search/ai_search.py create mode 100644 deploy_ai_search/deploy.py create mode 100644 deploy_ai_search/environment.py create mode 100644 deploy_ai_search/rag_documents.py create mode 100644 deploy_ai_search/requirements.txt diff --git a/.vscode/extensions.json b/.vscode/extensions.json new file mode 100644 index 0000000..cbbad0f --- /dev/null +++ b/.vscode/extensions.json @@ -0,0 +1,6 @@ +{ + "recommendations": [ + "ms-azuretools.vscode-azurefunctions", + "ms-python.python" + ] +} diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..7ff8568 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,15 @@ +{ + "configurations": [ + { + "connect": { + "host": "localhost", + "port": 9091 + }, + "name": "Attach to Python Functions", + "preLaunchTask": "func: host start", + "request": "attach", + "type": "debugpy" + } + ], + "version": "0.2.0" +} diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..4d62d59 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "azureFunctions.projectLanguage": "Python", + "azureFunctions.projectLanguageModel": 2, + "azureFunctions.projectRuntime": "~4", + "azureFunctions.scmDoBuildDuringDeployment": true, + "debug.internalConsoleOptions": "neverOpen" +} diff --git a/.vscode/tasks.json b/.vscode/tasks.json new file mode 100644 index 0000000..359b710 --- /dev/null +++ b/.vscode/tasks.json @@ -0,0 +1,15 @@ +{ + "tasks": [ + { + "command": "host start", + "isBackground": 
true, + "label": "func: host start", + "options": { + "cwd": "${workspaceFolder}/ai_search_with_adi_function_app" + }, + "problemMatcher": "$func-python-watch", + "type": "func" + } + ], + "version": "2.0.0" +} diff --git a/README.md b/README.md index 3fbf3e6..2b649a2 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,8 @@ It is intended that the plugins and skills provided in this repository, are adap ## Components - `./text2sql` contains an Multi-Shot implementation for Text2SQL generation and querying which can be used to answer questions backed by a database as a knowledge base. -- `./ai_search_with_adi` contains code for linking Azure Document Intelligence with AI Search to process complex documents with charts and images, and uses multi-modal models (gpt4o) to interpret and understand these. +- `./ai_search_with_adi_function_app` contains code for linking Azure Document Intelligence with AI Search to process complex documents with charts and images, and uses multi-modal models (gpt4o) to interpret and understand these. +- `./deploy_ai_search` provides an easy Python based utility for deploying an index, indexer and corresponding skillset for AI Search. The above components have been successfully used on production RAG projects to increase the quality of responses. The code provided in this repo is a sample of the implementation and should be adjusted before being used in production. diff --git a/adi_function_app/.funcignore b/adi_function_app/.funcignore new file mode 100644 index 0000000..f1110d3 --- /dev/null +++ b/adi_function_app/.funcignore @@ -0,0 +1,8 @@ +.git* +.vscode +__azurite_db*__.json +__blobstorage__ +__queuestorage__ +local.settings.json +test +.venv diff --git a/ai_search_with_adi/README.md b/adi_function_app/README.md similarity index 85% rename from ai_search_with_adi/README.md rename to adi_function_app/README.md index d43d1e7..3638b88 100644 --- a/ai_search_with_adi/README.md +++ b/adi_function_app/README.md @@ -38,35 +38,46 @@ The properties returned from the ADI Custom Skill are then used to perform the f ## Provided Notebooks \& Utilities -- `./ai_search.py`, `./deployment.py` provide an easy Python based utility for deploying an index, indexer and corresponding skillset for AI Search. -- `./function_apps/indexer` provides a pre-built Python function app that communicates with Azure Document Intelligence, Azure OpenAI etc to perform the Markdown conversion, extraction of figures, figure understanding and corresponding cleaning of Markdown. +- `./ai_search_with_adi_function_app` provides a pre-built Python function app that communicates with Azure Document Intelligence, Azure OpenAI etc to perform the Markdown conversion, extraction of figures, figure understanding and corresponding cleaning of Markdown. - `./rag_with_ai_search.ipynb` provides example of how to utilise the AI Search plugin to query the index. +## Deploying AI Search Setup + +To deploy the pre-built index and associated indexer / skillset setup, see instructions in `./ai_search/README.md`. + ## ADI Custom Skill -Deploy the associated function app and required resources. You can then experiment with the custom skill by sending an HTTP request in the AI Search JSON format to the `/adi_2_ai_search` HTTP endpoint. +Deploy the associated function app and required resources. You can then experiment with the custom skill by sending an HTTP request in the AI Search JSON format to the `/adi_2_deploy_ai_search` HTTP endpoint. 
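+
+For illustration, a minimal request in the custom skill format expected by `function_app.py` might look like the following (the storage account, container and file name below are placeholders); the optional `chunk_by_page` header controls whether page-wise chunking is applied:
+
+```json
+{
+    "values": [
+        {
+            "recordId": "0",
+            "data": {
+                "source": "https://{storage-account}.blob.core.windows.net/{container}/sample-document.pdf"
+            }
+        }
+    ]
+}
+```
+
+The skill responds in the same `values` shape, with the extracted Markdown returned under `data.extracted_content` for each record.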
To use with an index, either use the utility to configure a indexer in the provided form, or integrate the skill with your skillset pipeline. -### function_app.py +### Deployment Steps + +1. Update `.env` file with the associated values. Not all values are required dependent on whether you are using System / User Assigned Identities or a Key based authentication. Use this template to update the environment variables in the function app. +2. Make sure the infra and required identities are setup. This setup requires Azure Document Intelligence and GPT4o. +3. [Deploy your function app](https://learn.microsoft.com/en-us/azure/azure-functions/functions-deployment-technologies?tabs=windows) and test with a HTTP request. -`./function_apps/indexer/function_app.py` contains the HTTP entrypoints for the ADI skill and the other provided utility skills. +### Code Files -### adi_2_aisearch +#### function_app.py -`./function_apps/indexer/adi_2_aisearch.py` contains the methods for content extraction with ADI. The key methods are: +`./indexer/ai_search_with_adi_function_app.py` contains the HTTP entrypoints for the ADI skill and the other provided utility skills. -#### analyse_document +#### adi_2_aisearch + +`./indexer/adi_2_aisearch.py` contains the methods for content extraction with ADI. The key methods are: + +##### analyse_document This method takes the passed file, uploads it to ADI and retrieves the Markdown format. -#### process_figures_from_extracted_content +##### process_figures_from_extracted_content This method takes the detected figures, and crops them out of the page to save them as images. It uses the `understand_image_with_vlm` to communicate with Azure OpenAI to understand the meaning of the extracted figure. `update_figure_description` is used to update the original Markdown content with the description and meaning of the figure. -#### clean_adi_markdown +##### clean_adi_markdown This method performs the final cleaning of the Markdown contents. In this method, the section headings and page numbers are extracted for the content to be returned to the indexer. @@ -181,7 +192,6 @@ If `chunk_by_page` header is `False`: **Page wise analysis in ADI is recommended to avoid splitting tables / figures across multiple chunks, when the chunking is performed.** - ## Production Considerations Below are some of the considerations that should be made before using this custom skill in production: diff --git a/adi_function_app/adi_2_ai_search.py b/adi_function_app/adi_2_ai_search.py new file mode 100644 index 0000000..8597550 --- /dev/null +++ b/adi_function_app/adi_2_ai_search.py @@ -0,0 +1,571 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +from azure.identity import DefaultAzureCredential, get_bearer_token_provider +import base64 +from azure.core.credentials import AzureKeyCredential +from azure.ai.documentintelligence.aio import DocumentIntelligenceClient +from azure.ai.documentintelligence.models import AnalyzeResult, ContentFormat +import os +import re +import asyncio +import fitz +from PIL import Image +import io +import logging +from storage_account import StorageAccountHelper +import concurrent.futures +import json +from openai import AsyncAzureOpenAI +import openai +from environment import IdentityType, get_identity_type + + +def crop_image_from_pdf_page(pdf_path, page_number, bounding_box): + """ + Crops a region from a given page in a PDF and returns it as an image. + + :param pdf_path: Path to the PDF file. 
+ :param page_number: The page number to crop from (0-indexed). + :param bounding_box: A tuple of (x0, y0, x1, y1) coordinates for the bounding box. + :return: A PIL Image of the cropped area. + """ + doc = fitz.open(pdf_path) + page = doc.load_page(page_number) + + logging.debug(f"Bounding Box: {bounding_box}") + logging.debug(f"Page Number: {page_number}") + + # Cropping the page. The rect requires the coordinates in the format (x0, y0, x1, y1). + bbx = [x * 72 for x in bounding_box] + rect = fitz.Rect(bbx) + pix = page.get_pixmap(matrix=fitz.Matrix(300 / 72, 300 / 72), clip=rect) + + img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) + + if pix.width == 0 or pix.height == 0: + logging.error("Cropped image has 0 width or height.") + return None + + doc.close() + return img + + +def clean_adi_markdown( + markdown_text: str, page_no: int = None, remove_irrelevant_figures=False +): + """Clean Markdown text extracted by the Azure Document Intelligence service. + + Args: + ----- + markdown_text (str): The original Markdown text. + remove_irrelevant_figures (bool): Whether to remove all figures or just irrelevant ones. + + Returns: + -------- + str: The cleaned Markdown text. + """ + + output_dict = {} + comment_patterns = r"|||" + cleaned_text = re.sub(comment_patterns, "", markdown_text, flags=re.DOTALL) + + combined_pattern = r"(.*?)\n===|\n#+\s*(.*?)\n" + doc_metadata = re.findall(combined_pattern, cleaned_text, re.DOTALL) + doc_metadata = [match for group in doc_metadata for match in group if match] + + if remove_irrelevant_figures: + # Remove irrelevant figures + irrelevant_figure_pattern = r"\s*" + cleaned_text = re.sub( + irrelevant_figure_pattern, "", cleaned_text, flags=re.DOTALL + ) + + output_dict["content"] = cleaned_text + output_dict["sections"] = doc_metadata + + # add page number when chunk by page is enabled + if page_no is not None: + output_dict["page_number"] = page_no + + return output_dict + + +def update_figure_description(md_content, img_description, offset, length): + """ + Updates the figure description in the Markdown content. + + Args: + md_content (str): The original Markdown content. + img_description (str): The new description for the image. + idx (int): The index of the figure. + + Returns: + str: The updated Markdown content with the new figure description. + """ + + # Define the new string to replace the old content + new_string = f'' + + # Calculate the end index of the content to be replaced + end_index = offset + length + + # Ensure that the end_index does not exceed the length of the Markdown content + if end_index > len(md_content): + end_index = len(md_content) + + # Replace the old string with the new string + new_md_content = md_content[:offset] + new_string + md_content[end_index:] + + return new_md_content, len(new_string) + + +async def understand_image_with_gptv(image_base64, caption, tries_left=3): + """ + Generates a description for an image using the GPT-4V model. + + Parameters: + - image_base64 (str): image file. + - caption (str): The caption for the image. + + Returns: + - img_description (str): The generated description for the image. 
+ """ + + MAX_TOKENS = 2000 + api_version = os.environ.get("OpenAI__ApiVersion") + model = os.environ.get("OpenAI__MultiModalDeployment") + + if get_identity_type() != IdentityType.SYSTEM_ASSIGNED: + token_provider = get_bearer_token_provider( + DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default" + ) + api_key = None + elif get_identity_type() != IdentityType.USER_ASSIGNED: + token_provider = get_bearer_token_provider( + DefaultAzureCredential( + managed_identity_client_id=os.environ.get("FunctionApp__ClientId") + ), + "https://cognitiveservices.azure.com/.default", + ) + api_key = None + else: + token_provider = None + api_key = os.environ.get("OpenAI__ApiKey") + + system_prompt = """You are an expert in image analysis. Use your experience and skills to provided a detailed description of any provided images. You should FOCUS on what info can be inferred from the image and the meaning of the data inside the image. Draw actionable insights and conclusions from the image. + + If the image is a chart for instance, you should describe the data trends, patterns, and insights that can be drawn from the chart. + + If the image is a map, you should describe the geographical features, landmarks, and any other relevant information that can be inferred from the map. + + If the image is a diagram, you should describe the components, relationships, and any other relevant information that can be inferred from the diagram. + + Include any data points, labels, and other relevant information that can be inferred from the image. + + IMPORTANT: If the provided image is a logo or photograph, simply return 'Irrelevant Image'.""" + + user_input = "Describe this image with technical analysis. Provide a well-structured, description." + + if caption != "": + user_input += f" (note: it has image caption: {caption})" + + try: + async with AsyncAzureOpenAI( + api_key=api_key, + api_version=api_version, + azure_ad_token_provider=token_provider, + azure_endpoint=os.environ.get("OpenAI__Endpoint"), + ) as client: + # We send both image caption and the image body to GPTv for better understanding + response = await client.chat.completions.create( + model=model, + messages=[ + { + "role": "system", + "content": system_prompt, + }, + { + "role": "user", + "content": [ + { + "type": "text", + "text": user_input, + }, + { + "type": "image_url", + "image_url": { + "url": f"data:image/png;base64,{image_base64}" + }, + }, + ], + }, + ], + max_tokens=MAX_TOKENS, + ) + + img_description = response.choices[0].message.content + + logging.info(f"Image Description: {img_description}") + + return img_description + except openai.RateLimitError as e: + logging.error("OpenAI Rate Limit Error: %s", e) + + if tries_left > 0: + logging.info( + "Retrying understanding of image with %s tries left.", tries_left + ) + remaining_tries = tries_left - 1 + backoff = 20 ** (3 - remaining_tries) + await asyncio.sleep(backoff) + return await understand_image_with_gptv( + image_base64, caption, tries_left=remaining_tries + ) + else: + raise Exception("OpenAI Rate Limit Error: No retries left.") from e + except (openai.OpenAIError, openai.APIConnectionError) as e: + logging.error("OpenAI Error: %s", e) + + raise Exception("OpenAI Rate Limit Error: No retries left.") from e + + +def pil_image_to_base64(image, image_format="JPEG"): + """ + Converts a PIL image to a base64-encoded string. + + Args: + image (PIL.Image.Image): The image to be converted. + image_format (str): The format to save the image in. Defaults to "JPEG". 
+ + Returns: + str: The base64-encoded string representation of the image. + """ + if image.mode == "RGBA" and image_format == "JPEG": + image = image.convert("RGB") + buffered = io.BytesIO() + image.save(buffered, format=image_format) + return base64.b64encode(buffered.getvalue()).decode("utf-8") + + +async def mark_image_as_irrelevant(): + return "Irrelevant Image" + + +async def process_figures_from_extracted_content( + file_path: str, + markdown_content: str, + figures: list, + page_number: None | int = None, + page_offset: int = 0, +) -> str: + """Process the figures extracted from the content using ADI and send them for analysis. + + Args: + ----- + file_path (str): The path to the PDF file. + markdown_content (str): The extracted content in Markdown format. + figures (list): The list of figures extracted by the Azure Document Intelligence service. + page_number (int): The page number to process. If None, all pages are processed. + page_offset (int): The offset of the page. + + Returns: + -------- + str: The updated Markdown content with the figure descriptions.""" + + figure_spans = [] + image_understanding_tasks = [] + for idx, figure in enumerate(figures): + img_description = "" + logging.debug(f"Figure #{idx} has the following spans: {figure.spans}") + + caption_region = figure.caption.bounding_regions if figure.caption else [] + for region in figure.bounding_regions: + # Skip the region if it is not on the specified page + if page_number is not None and region.page_number != page_number: + continue + + if region not in caption_region: + # To learn more about bounding regions, see https://aka.ms/bounding-region + bounding_box = ( + region.polygon[0], # x0 (left) + region.polygon[1], # y0 (top) + region.polygon[4], # x1 (right) + region.polygon[5], # y1 (bottom) + ) + cropped_image = crop_image_from_pdf_page( + file_path, region.page_number - 1, bounding_box + ) # page_number is 1-indexed + + figure_spans.append(figure.spans[0]) + + if cropped_image is None: + image_understanding_tasks.append(mark_image_as_irrelevant()) + else: + image_base64 = pil_image_to_base64(cropped_image) + + image_understanding_tasks.append( + understand_image_with_gptv(image_base64, figure.caption.content) + ) + logging.info(f"\tDescription of figure {idx}: {img_description}") + break + + image_descriptions = await asyncio.gather(*image_understanding_tasks) + + logging.info(f"Image Descriptions: {image_descriptions}") + + running_offset = 0 + for figure_span, image_description in zip(figure_spans, image_descriptions): + starting_offset = figure_span.offset + running_offset - page_offset + markdown_content, desc_offset = update_figure_description( + markdown_content, image_description, starting_offset, figure_span.length + ) + running_offset += desc_offset + + return markdown_content + + +def create_page_wise_content(result: AnalyzeResult) -> list: + """Create a list of page-wise content extracted by the Azure Document Intelligence service. + + Args: + ----- + result (AnalyzeResult): The result of the document analysis. + + Returns: + -------- + list: A list of page-wise content extracted by the Azure Document Intelligence service. 
+ """ + + page_wise_content = [] + page_numbers = [] + page_offsets = [] + + for page_number, page in enumerate(result.pages): + page_content = result.content[ + page.spans[0]["offset"] : page.spans[0]["offset"] + page.spans[0]["length"] + ] + page_wise_content.append(page_content) + page_numbers.append(page_number + 1) + page_offsets.append(page.spans[0]["offset"]) + + return page_wise_content, page_numbers, page_offsets + + +async def analyse_document(file_path: str) -> AnalyzeResult: + """Analyse a document using the Azure Document Intelligence service. + + Args: + ----- + file_path (str): The path to the document to analyse. + + Returns: + -------- + AnalyzeResult: The result of the document analysis.""" + with open(file_path, "rb") as f: + file_read = f.read() + + if get_identity_type() == IdentityType.SYSTEM_ASSIGNED: + credential = DefaultAzureCredential() + elif get_identity_type() == IdentityType.USER_ASSIGNED: + credential = DefaultAzureCredential( + managed_identity_client_id=os.environ["FunctionApp__ClientId"] + ) + else: + credential = AzureKeyCredential(os.environ["AIService__Services__Key"]) + + async with DocumentIntelligenceClient( + endpoint=os.environ["AIService__Services__Endpoint"], + credential=credential, + ) as document_intelligence_client: + poller = await document_intelligence_client.begin_analyze_document( + model_id="prebuilt-layout", + analyze_request=file_read, + output_content_format=ContentFormat.MARKDOWN, + content_type="application/octet-stream", + ) + + result = await poller.result() + + if result is None or result.content is None or result.pages is None: + raise ValueError( + "Failed to analyze the document with Azure Document Intelligence." + ) + + return result + + +async def process_adi_2_ai_search(record: dict, chunk_by_page: bool = False) -> dict: + """Process the extracted content from the Azure Document Intelligence service and prepare it for Azure Search. + + Args: + ----- + record (dict): The record containing the extracted content. + chunk_by_page (bool): Whether to chunk the content by page. + + Returns: + -------- + dict: The processed content ready for Azure Search.""" + logging.info("Python HTTP trigger function processed a request.") + + storage_account_helper = StorageAccountHelper() + + try: + source = record["data"]["source"] + logging.info(f"Request Body: {record}") + except KeyError: + return { + "recordId": record["recordId"], + "data": {}, + "errors": [ + { + "message": "Failed to extract data with ADI. Pass a valid source in the request body.", + } + ], + "warnings": None, + } + else: + logging.info(f"Source: {source}") + + try: + source_parts = source.split("/") + blob = "/".join(source_parts[4:]) + logging.info(f"Blob: {blob}") + + container = source_parts[3] + + file_extension = blob.split(".")[-1] + target_file_name = f"{record['recordId']}.{file_extension}" + + temp_file_path, _ = await storage_account_helper.download_blob_to_temp_dir( + blob, container, target_file_name + ) + logging.info(temp_file_path) + except Exception as e: + logging.error(f"Failed to download the blob: {e}") + return { + "recordId": record["recordId"], + "data": {}, + "errors": [ + { + "message": f"Failed to download the blob. Check the source and try again. 
{e}", + } + ], + "warnings": None, + } + + try: + result = await analyse_document(temp_file_path) + except Exception as e: + logging.error(e) + logging.info("Sleeping for 10 seconds and retrying") + await asyncio.sleep(10) + try: + result = await analyse_document(temp_file_path) + except ValueError as inner_e: + logging.error(inner_e) + logging.error( + f"Failed to analyze the document with Azure Document Intelligence: {e}" + ) + logging.error( + "Failed to analyse %s with Azure Document Intelligence.", blob + ) + await storage_account_helper.add_metadata_to_blob( + blob, container, {"AzureSearch_Skip": "true"} + ) + return { + "recordId": record["recordId"], + "data": {}, + "errors": [ + { + "message": f"Failed to analyze the document with Azure Document Intelligence. This blob will now be skipped {inner_e}", + } + ], + "warnings": None, + } + except Exception as inner_e: + logging.error(inner_e) + logging.error( + "Failed to analyse %s with Azure Document Intelligence.", blob + ) + return { + "recordId": record["recordId"], + "data": {}, + "errors": [ + { + "message": f"Failed to analyze the document with Azure Document Intelligence. Check the logs and try again. {inner_e}", + } + ], + "warnings": None, + } + + try: + if chunk_by_page: + cleaned_result = [] + markdown_content, page_numbers, page_offsets = create_page_wise_content( + result + ) + content_with_figures_tasks = [ + process_figures_from_extracted_content( + temp_file_path, + page_content, + result.figures, + page_number=page_number, + page_offset=page_offset, + ) + for page_content, page_number, page_offset in zip( + markdown_content, page_numbers, page_offsets + ) + ] + content_with_figures = await asyncio.gather(*content_with_figures_tasks) + + with concurrent.futures.ProcessPoolExecutor() as executor: + futures = { + executor.submit( + clean_adi_markdown, page_content, page_number, True + ): page_content + for page_content, page_number in zip( + content_with_figures, page_numbers + ) + } + for future in concurrent.futures.as_completed(futures): + cleaned_result.append(future.result()) + + else: + markdown_content = result.content + content_with_figures = await process_figures_from_extracted_content( + temp_file_path, + markdown_content, + result.figures, + page_offset=0, + page_number=1, + ) + cleaned_result = clean_adi_markdown( + content_with_figures, remove_irrelevant_figures=True + ) + except Exception as e: + logging.error(e) + logging.error(f"Failed to process the extracted content: {e}") + return { + "recordId": record["recordId"], + "data": {}, + "errors": [ + { + "message": f"Failed to process the extracted content. Check the logs and try again. {e}", + } + ], + "warnings": None, + } + + logging.info("Document Extracted") + logging.info(f"Result: {cleaned_result}") + + src = { + "recordId": record["recordId"], + "data": {"extracted_content": cleaned_result}, + } + + json_str = json.dumps(src, indent=4) + + logging.info(f"final output: {json_str}") + + return src diff --git a/adi_function_app/environment.py b/adi_function_app/environment.py new file mode 100644 index 0000000..232254e --- /dev/null +++ b/adi_function_app/environment.py @@ -0,0 +1,30 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +import os +from enum import Enum + + +class IdentityType(Enum): + """The type of the indexer""" + + USER_ASSIGNED = "user_assigned" + SYSTEM_ASSIGNED = "system_assigned" + KEY = "key" + + +def get_identity_type() -> IdentityType: + """This function returns the identity type. 
+ + Returns: + IdentityType: The identity type + """ + identity = os.environ.get("IdentityType") + + if identity == "user_assigned": + return IdentityType.USER_ASSIGNED + elif identity == "system_assigned": + return IdentityType.SYSTEM_ASSIGNED + elif identity == "key": + return IdentityType.KEY + else: + raise ValueError("Invalid identity type") diff --git a/adi_function_app/function_app.py b/adi_function_app/function_app.py new file mode 100644 index 0000000..cca6005 --- /dev/null +++ b/adi_function_app/function_app.py @@ -0,0 +1,126 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +import azure.functions as func +import logging +import json +import asyncio + +from adi_2_ai_search import process_adi_2_ai_search +from pre_embedding_cleaner import process_pre_embedding_cleaner +from key_phrase_extraction import process_key_phrase_extraction + +logging.basicConfig(level=logging.DEBUG) +app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION) + + +@app.route(route="adi_2_ai_search", methods=[func.HttpMethod.POST]) +async def adi_2_ai_search(req: func.HttpRequest) -> func.HttpResponse: + """Extract the content from a document using ADI.""" + + try: + req_body = req.get_json() + values = req_body.get("values") + adi_config = req.headers + + chunk_by_page = adi_config.get("chunk_by_page", "False").lower() == "true" + logging.info(f"Chunk by Page: {chunk_by_page}") + except ValueError: + return func.HttpResponse( + "Please valid Custom Skill Payload in the request body", status_code=400 + ) + else: + logging.debug("Input Values: %s", values) + + record_tasks = [] + + for value in values: + record_tasks.append( + asyncio.create_task( + process_adi_2_ai_search(value, chunk_by_page=chunk_by_page) + ) + ) + + results = await asyncio.gather(*record_tasks) + logging.debug("Results: %s", results) + + return func.HttpResponse( + json.dumps({"values": results}), + status_code=200, + mimetype="application/json", + ) + + +@app.route(route="pre_embedding_cleaner", methods=[func.HttpMethod.POST]) +async def pre_embedding_cleaner(req: func.HttpRequest) -> func.HttpResponse: + """HTTP trigger for data cleanup function. + + Args: + req (func.HttpRequest): The HTTP request object. + + Returns: + func.HttpResponse: The HTTP response object.""" + logging.info("Python HTTP trigger data cleanup function processed a request.") + + try: + req_body = req.get_json() + values = req_body.get("values") + except ValueError: + return func.HttpResponse( + "Please valid Custom Skill Payload in the request body", status_code=400 + ) + else: + logging.debug("Input Values: %s", values) + + record_tasks = [] + + for value in values: + record_tasks.append( + asyncio.create_task(process_pre_embedding_cleaner(value)) + ) + + results = await asyncio.gather(*record_tasks) + logging.debug("Results: %s", results) + cleaned_tasks = {"values": results} + + return func.HttpResponse( + json.dumps(cleaned_tasks), status_code=200, mimetype="application/json" + ) + + +@app.route(route="key_phrase_extractor", methods=[func.HttpMethod.POST]) +async def key_phrase_extractor(req: func.HttpRequest) -> func.HttpResponse: + """HTTP trigger for data cleanup function. + + Args: + req (func.HttpRequest): The HTTP request object. 
+ + Returns: + func.HttpResponse: The HTTP response object.""" + logging.info("Python HTTP trigger data cleanup function processed a request.") + + try: + req_body = req.get_json() + values = req_body.get("values") + logging.info(req_body) + except ValueError: + return func.HttpResponse( + "Please valid Custom Skill Payload in the request body", status_code=400 + ) + else: + logging.debug("Input Values: %s", values) + + record_tasks = [] + + for value in values: + record_tasks.append( + asyncio.create_task(process_key_phrase_extraction(value)) + ) + + results = await asyncio.gather(*record_tasks) + logging.debug("Results: %s", results) + + return func.HttpResponse( + json.dumps({"values": results}), + status_code=200, + mimetype="application/json", + ) diff --git a/adi_function_app/host.json b/adi_function_app/host.json new file mode 100644 index 0000000..20e5f3c --- /dev/null +++ b/adi_function_app/host.json @@ -0,0 +1,16 @@ +{ + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle", + "version": "[4.*, 5.0.0)" + }, + "functionTimeout": "00:05:00", + "logging": { + "applicationInsights": { + "samplingSettings": { + "excludedTypes": "Request", + "isEnabled": true + } + } + }, + "version": "2.0" +} diff --git a/ai_search_with_adi/images/Indexing vs Indexing with ADI.png b/adi_function_app/images/Indexing vs Indexing with ADI.png similarity index 100% rename from ai_search_with_adi/images/Indexing vs Indexing with ADI.png rename to adi_function_app/images/Indexing vs Indexing with ADI.png diff --git a/adi_function_app/key_phrase_extraction.py b/adi_function_app/key_phrase_extraction.py new file mode 100644 index 0000000..c93d62a --- /dev/null +++ b/adi_function_app/key_phrase_extraction.py @@ -0,0 +1,130 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +import logging +import json +import os +from azure.ai.textanalytics.aio import TextAnalyticsClient +from azure.core.exceptions import HttpResponseError +from azure.core.credentials import AzureKeyCredential +import asyncio +from azure.identity import DefaultAzureCredential +from environment import IdentityType, get_identity_type + +MAX_TEXT_ELEMENTS = 5120 + + +def split_document(document: str, max_size: int) -> list[str]: + """Split a document into chunks of max_size. + + Args: + document (str): The document to split. + max_size (int): The maximum size of each chunk.""" + return [document[i : i + max_size] for i in range(0, len(document), max_size)] + + +async def extract_key_phrases_from_text( + data: list[str], max_key_phrase_count: int, retries_left: int = 3 +) -> list[str]: + """Extract key phrases from the text. + + Args: + data (list[str]): The text data. + max_key_phrase_count (int): The maximum number of key phrases to return. 
+ + Returns: + list[str]: The key phrases extracted from the text.""" + logging.info("Python HTTP trigger function processed a request.") + + key_phrase_list = [] + + if get_identity_type() == IdentityType.SYSTEM_ASSIGNED: + credential = DefaultAzureCredential() + elif get_identity_type() == IdentityType.USER_ASSIGNED: + credential = DefaultAzureCredential( + managed_identity_client_id=os.environ.get("FunctionApp__ClientId") + ) + else: + credential = AzureKeyCredential(os.environ.get("AIService__Services__Key")) + text_analytics_client = TextAnalyticsClient( + endpoint=os.environ.get("AIService__Services__Endpoint"), + credential=credential, + ) + + async with text_analytics_client: + try: + # Split large documents + split_documents = [] + for doc in data: + if len(doc) > MAX_TEXT_ELEMENTS: + split_documents.extend(split_document(doc, MAX_TEXT_ELEMENTS)) + else: + split_documents.append(doc) + + result = await text_analytics_client.extract_key_phrases(split_documents) + for idx, doc in enumerate(result): + if not doc.is_error: + key_phrase_list.extend(doc.key_phrases[:max_key_phrase_count]) + else: + raise Exception(f"Document {idx} error: {doc.error}") + except HttpResponseError as e: + if e.status_code == 429 and retries_left > 0: # Rate limiting error + wait_time = 2**retries_left # Exponential backoff + logging.info( + "%s Rate limit exceeded. Retrying in %s seconds...", e, wait_time + ) + await asyncio.sleep(wait_time) + return await extract_key_phrases_from_text( + data, max_key_phrase_count, retries_left - 1 + ) + else: + raise Exception(f"An error occurred: {e}") from e + + return key_phrase_list + + +async def process_key_phrase_extraction( + record: dict, max_key_phrase_count: int = 5 +) -> dict: + """Extract key phrases using azure ai services. + + Args: + record (dict): The record to process. + max_key_phrase_count(int): no of keywords to return + + Returns: + dict: extracted key words.""" + + try: + json_str = json.dumps(record, indent=4) + + logging.info(f"key phrase extraction Input: {json_str}") + extracted_record = { + "recordId": record["recordId"], + "data": {}, + "errors": None, + "warnings": None, + } + extracted_record["data"]["key_phrases"] = await extract_key_phrases_from_text( + [record["data"]["text"]], max_key_phrase_count + ) + except Exception as inner_e: + logging.error("key phrase extraction Error: %s", inner_e) + logging.error( + "Failed to extract key phrase. Check function app logs for more details of exact failure." + ) + return { + "recordId": record["recordId"], + "data": {}, + "errors": [ + { + "message": "Failed to extract key phrase. Check function app logs for more details of exact failure." + } + ], + "warnings": None, + } + else: + json_str = json.dumps(extracted_record, indent=4) + + logging.info(f"key phrase extraction output: {json_str}") + return extracted_record diff --git a/adi_function_app/pre_embedding_cleaner.py b/adi_function_app/pre_embedding_cleaner.py new file mode 100644 index 0000000..005954e --- /dev/null +++ b/adi_function_app/pre_embedding_cleaner.py @@ -0,0 +1,157 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
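+#
+# Pre-embedding cleaning helpers: `clean_text` strips figure tags, stop words and
+# special characters from chunk text, `get_section` / `clean_sections` pull section
+# headings out of the Markdown, and `process_pre_embedding_cleaner` wraps it all in
+# the AI Search custom skill record format.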
+import logging +import json +import nltk +import re +from nltk.tokenize import word_tokenize + +nltk.download("punkt") +nltk.download("stopwords") + + +def get_section(cleaned_text: str) -> list: + """ + Returns the section details from the content + + Args: + cleaned_text: The input text + + Returns: + list: The sections related to text + + """ + combined_pattern = r"(.*?)\n===|\n#+\s*(.*?)\n" + doc_metadata = re.findall(combined_pattern, cleaned_text, re.DOTALL) + doc_metadata = [match for group in doc_metadata for match in group if match] + return clean_sections(doc_metadata) + + +def clean_sections(sections: list) -> list: + """Cleans the sections by removing special characters and extra white spaces.""" + cleaned_sections = [re.sub(r"[=#]", "", match).strip() for match in sections] + + return cleaned_sections + + +def remove_markdown_tags(text: str, tag_patterns: dict) -> str: + """ + Remove specified Markdown tags from the text, keeping the contents of the tags. + + Args: + text: The input text containing Markdown tags. + tag_patterns: A dictionary where keys are tags and values are their specific patterns. + + Returns: + str: The text with specified tags removed. + """ + try: + for tag, pattern in tag_patterns.items(): + try: + # Replace the tags using the specific pattern, keeping the content inside the tags + text = re.sub(pattern, r"\1", text, flags=re.DOTALL) + except re.error as e: + logging.error(f"Regex error for tag '{tag}': {e}") + except Exception as e: + logging.error(f"An error occurred in remove_markdown_tags: {e}") + return text + + +def clean_text(src_text: str) -> str: + """This function performs following cleanup activities on the text, remove all unicode characters + remove line spacing,remove stop words, normalize characters + + Args: + src_text (str): The text to cleanup. + + Returns: + str: The clean text.""" + + try: + # Define specific patterns for each tag + tag_patterns = { + "figurecontent": r"", + "figure": r"
<figure>(.*?)</figure>",
+            "figures": r"\(figures/\d+\)(.*?)\(figures/\d+\)",
+            "figcaption": r"<figcaption>(.*?)</figcaption>
", + } + cleaned_text = remove_markdown_tags(src_text, tag_patterns) + + # remove line breaks + cleaned_text = re.sub(r"\n", "", cleaned_text) + + # remove stopwords + tokens = word_tokenize(cleaned_text, "english") + stop_words = nltk.corpus.stopwords.words("english") + filtered_tokens = [word for word in tokens if word not in stop_words] + cleaned_text = " ".join(filtered_tokens) + + # remove special characters + cleaned_text = re.sub(r"[^a-zA-Z\s]", "", cleaned_text) + + # remove extra white spaces + cleaned_text = " ".join([word for word in cleaned_text.split()]) + + # case normalization + cleaned_text = cleaned_text.lower() + except Exception as e: + logging.error(f"An error occurred in clean_text: {e}") + return "" + return cleaned_text + + +async def process_pre_embedding_cleaner(record: dict) -> dict: + """Cleanup the data using standard python libraries. + + Args: + record (dict): The record to cleanup. + + Returns: + dict: The clean record.""" + + try: + json_str = json.dumps(record, indent=4) + + logging.info(f"embedding cleaner Input: {json_str}") + + cleaned_record = { + "recordId": record["recordId"], + "data": {}, + "errors": None, + "warnings": None, + } + + # scenarios when page by chunking is enabled + if isinstance(record["data"]["chunk"], dict): + cleaned_record["data"]["cleaned_chunk"] = clean_text( + record["data"]["chunk"]["content"] + ) + cleaned_record["data"]["chunk"] = record["data"]["chunk"]["content"] + cleaned_record["data"]["cleaned_sections"] = clean_sections( + record["data"]["chunk"]["sections"] + ) + else: + cleaned_record["data"]["cleaned_chunk"] = clean_text( + record["data"]["chunk"] + ) + cleaned_record["data"]["chunk"] = record["data"]["chunk"] + cleaned_record["data"]["cleaned_sections"] = get_section( + record["data"]["chunk"] + ) + + except Exception as e: + logging.error("string cleanup Error: %s", e) + return { + "recordId": record["recordId"], + "data": {}, + "errors": [ + { + "message": "Failed to cleanup data. Check function app logs for more details of exact failure." + } + ], + "warnings": None, + } + json_str = json.dumps(cleaned_record, indent=4) + + logging.info(f"embedding cleaner output: {json_str}") + return cleaned_record diff --git a/adi_function_app/requirements.txt b/adi_function_app/requirements.txt new file mode 100644 index 0000000..a923e7f --- /dev/null +++ b/adi_function_app/requirements.txt @@ -0,0 +1,21 @@ +# DO NOT include azure-functions-worker in this file +# The Python Worker is managed by Azure Functions platform +# Manually managing azure-functions-worker may cause unexpected issues +python-dotenv +azure-functions +openai +azure-storage-blob +pandas +azure-identity +openpyxl +regex +nltk==3.8.1 +bs4 +azure-search +azure-search-documents +azure-ai-documentintelligence +azure-ai-textanalytics +azure-ai-vision-imageanalysis +PyMuPDF +aiohttp +Pillow diff --git a/adi_function_app/storage_account.py b/adi_function_app/storage_account.py new file mode 100644 index 0000000..8c5aa98 --- /dev/null +++ b/adi_function_app/storage_account.py @@ -0,0 +1,86 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
+ +import logging +import os +import tempfile +from azure.storage.blob.aio import BlobServiceClient +from azure.identity import DefaultAzureCredential +import urllib +from environment import IdentityType, get_identity_type + + +class StorageAccountHelper: + """Helper class for interacting with Azure Blob Storage.""" + + async def get_client(self): + """Get the BlobServiceClient object.""" + if get_identity_type() == IdentityType.SYSTEM_ASSIGNED: + endpoint = os.environ.get("StorageAccount__Endpoint") + credential = DefaultAzureCredential() + return BlobServiceClient(account_url=endpoint, credential=credential) + elif get_identity_type() == IdentityType.USER_ASSIGNED: + endpoint = os.environ.get("StorageAccount__Endpoint") + credential = DefaultAzureCredential( + managed_identity_client_id=os.environ.get("FunctionApp__ClientId") + ) + return BlobServiceClient(account_url=endpoint, credential=credential) + else: + endpoint = os.environ.get("StorageAccount__ConnectionString") + return BlobServiceClient(account_url=endpoint) + + async def add_metadata_to_blob( + self, source: str, container: str, metadata: dict + ) -> None: + """Add metadata to the blob. + + Args + source (str): The source of the blob. + container (str): The container of the blob. + metadata (dict): The metadata to add to the blob.""" + + blob = urllib.parse.unquote_plus(source) + + blob_service_client = await self.get_client() + async with blob_service_client: + async with blob_service_client.get_blob_client( + container=container, blob=blob + ) as blob_client: + await blob_client.set_blob_metadata(metadata) + + logging.info("Metadata Added") + + async def download_blob_to_temp_dir( + self, source: str, container: str, target_file_name + ) -> tuple[str, dict]: + """Download the file from the Azure Blob Storage. + + Args: + source (str): The source of the blob. + container (str): The container of the blob. + target_file_name (str): The target file name.""" + + blob = urllib.parse.unquote_plus(source) + + blob_service_client = await self.get_client() + async with blob_service_client: + async with blob_service_client.get_blob_client( + container=container, blob=blob + ) as blob_client: + blob_download = await blob_client.download_blob() + blob_contents = await blob_download.readall() + + blob_properties = await blob_client.get_blob_properties() + + logging.info("Blob Downloaded") + # Get the temporary directory + temp_dir = tempfile.gettempdir() + + # Define the temporary file path + temp_file_path = os.path.join(temp_dir, target_file_name) + + # Write the blob contents to the temporary file + with open(temp_file_path, "wb") as temp_file: + temp_file.write(blob_contents) + + return temp_file_path, blob_properties.metadata diff --git a/deploy_ai_search/README.md b/deploy_ai_search/README.md new file mode 100644 index 0000000..c124cd5 --- /dev/null +++ b/deploy_ai_search/README.md @@ -0,0 +1,18 @@ +# AI Search Indexing with Azure Document Intelligence - Pre-built Index Setup + +The associated scripts in this portion of the repository contains pre-built scripts to deploy the skillset with Azure Document Intelligence. + +## Steps + +1. Update `.env` file with the associated values. Not all values are required dependent on whether you are using System / User Assigned Identities or a Key based authentication. +2. Adjust `rag_documents.py` with any changes to the index / indexer. The `get_skills()` method implements the skills pipeline. Make any adjustments here in the skills needed to enrich the data source. +3. 
Run `deploy.py` with the following args: + + - `indexer_type rag`. This selects the `rag_documents` sub class. + - `enable_page_chunking True`. This determines whether page wise chunking is applied in ADI, or whether the inbuilt skill is used for TextSplit. **Page wise analysis in ADI is recommended to avoid splitting tables / figures across multiple chunks, when the chunking is performed.** + - `rebuild`. Whether to delete and rebuild the index. + - `suffix`. Optional parameter that will apply a suffix onto the deployed index and indexer. This is useful if you want deploy a test version, before overwriting the main version. + +## ai_search.py & environment.py + +This includes a variety of helper files and scripts to deploy the index setup. This is useful for CI/CD to avoid having to write JSON files manually or use the UI to deploy the pipeline. diff --git a/deploy_ai_search/ai_search.py b/deploy_ai_search/ai_search.py new file mode 100644 index 0000000..75a6bf0 --- /dev/null +++ b/deploy_ai_search/ai_search.py @@ -0,0 +1,555 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +import logging +from abc import ABC, abstractmethod +from azure.search.documents.indexes.models import ( + SearchIndex, + SearchableField, + VectorSearch, + VectorSearchProfile, + HnswAlgorithmConfiguration, + SemanticSearch, + NativeBlobSoftDeleteDeletionDetectionPolicy, + HighWaterMarkChangeDetectionPolicy, + WebApiSkill, + AzureOpenAIEmbeddingSkill, + AzureOpenAIVectorizer, + AzureOpenAIParameters, + SearchIndexer, + SearchIndexerSkillset, + SearchIndexerDataContainer, + SearchIndexerDataSourceConnection, + SearchIndexerDataSourceType, + OutputFieldMappingEntry, + InputFieldMappingEntry, + SynonymMap, + SplitSkill, + SearchIndexerIndexProjections, +) +from azure.core.exceptions import HttpResponseError +from azure.search.documents.indexes import SearchIndexerClient, SearchIndexClient +from environment import AISearchEnvironment, IdentityType + + +class AISearch(ABC): + """Handles the deployment of the AI search pipeline.""" + + def __init__( + self, + suffix: str | None = None, + rebuild: bool | None = False, + ): + """Initialize the AI search class + + Args: + suffix (str, optional): The suffix for the indexer. Defaults to None. If an suffix is provided, it is assumed to be a test indexer. + rebuild (bool, optional): Whether to rebuild the index. Defaults to False. + """ + + if not hasattr(self, "indexer_type"): + self.indexer_type = None # Needed to help mypy understand that indexer_type is defined in the child class + raise ValueError("indexer_type is not defined in the child class.") + + if rebuild is not None: + self.rebuild = rebuild + else: + self.rebuild = False + + # If suffix is None, then it is not a test indexer. Test indexer limits the rate of indexing and turns off the schedule. 
Useful for testing index changes + if suffix is None: + self.suffix = "" + self.test = False + else: + self.suffix = f"-{suffix}-test" + self.test = True + + self.environment = AISearchEnvironment(indexer_type=self.indexer_type) + + self._search_indexer_client = SearchIndexerClient( + self.environment.ai_search_endpoint, self.environment.ai_search_credential + ) + self._search_index_client = SearchIndexClient( + self.environment.ai_search_endpoint, self.environment.ai_search_credential + ) + + @property + def indexer_name(self): + """Get the indexer name for the indexer.""" + return f"{str(self.indexer_type.value)}-indexer{self.suffix}" + + @property + def skillset_name(self): + """Get the skillset name for the indexer.""" + return f"{str(self.indexer_type.value)}-skillset{self.suffix}" + + @property + def semantic_config_name(self): + """Get the semantic config name for the indexer.""" + return f"{str(self.indexer_type.value)}-semantic-config{self.suffix}" + + @property + def index_name(self): + """Get the index name for the indexer.""" + return f"{str(self.indexer_type.value)}-index{self.suffix}" + + @property + def data_source_name(self): + """Get the data source name for the indexer.""" + blob_container_name = self.environment.storage_account_blob_container_name + return f"{blob_container_name}-data-source{self.suffix}" + + @property + def vector_search_profile_name(self): + """Get the vector search profile name for the indexer.""" + return f"{str(self.indexer_type.value)}-vector-search-profile{self.suffix}" + + @property + def vectorizer_name(self): + """Get the vectorizer name.""" + return f"{str(self.indexer_type.value)}-vectorizer{self.suffix}" + + @property + def algorithm_name(self): + """Get the algorithm name""" + + return f"{str(self.indexer_type.value)}-algorithm{self.suffix}" + + @abstractmethod + def get_index_fields(self) -> list[SearchableField]: + """Get the index fields for the indexer. + + Returns: + list[SearchableField]: The index fields""" + + @abstractmethod + def get_semantic_search(self) -> SemanticSearch: + """Get the semantic search configuration for the indexer. + + Returns: + SemanticSearch: The semantic search configuration""" + + def get_skills(self) -> list: + """Get the skillset for the indexer. 
+ + Returns: + list: The skillsets used in the indexer""" + + return [] + + def get_indexer(self) -> SearchIndexer: + """Get the indexer for the indexer.""" + + return None + + def get_index_projections(self) -> SearchIndexerIndexProjections: + """Get the index projections for the indexer.""" + + return None + + def get_synonym_map_names(self) -> list[str]: + """Get the synonym map names for the indexer.""" + return [] + + def get_data_source(self) -> SearchIndexerDataSourceConnection: + """Get the data source for the indexer.""" + + if self.get_indexer() is None: + return None + + data_deletion_detection_policy = NativeBlobSoftDeleteDeletionDetectionPolicy() + + data_change_detection_policy = HighWaterMarkChangeDetectionPolicy( + high_water_mark_column_name="metadata_storage_last_modified" + ) + + container = SearchIndexerDataContainer( + name=self.environment.storage_account_blob_container_name + ) + + data_source_connection = SearchIndexerDataSourceConnection( + name=self.data_source_name, + type=SearchIndexerDataSourceType.AZURE_BLOB, + connection_string=self.environment.storage_account_connection_string, + container=container, + data_change_detection_policy=data_change_detection_policy, + data_deletion_detection_policy=data_deletion_detection_policy, + ) + + if self.environment.identity_type != IdentityType.KEY: + data_source_connection.identity = self.environment.ai_search_identity_id + + return data_source_connection + + def get_pre_embedding_cleaner_skill( + self, context, source, target_name="cleaned_chunk" + ) -> WebApiSkill: + """Get the custom skill for data cleanup. + + Args: + ----- + context (str): The context of the skill + inputs (List[InputFieldMappingEntry]): The inputs of the skill + outputs (List[OutputFieldMappingEntry]): The outputs of the skill + + Returns: + -------- + WebApiSkill: The custom skill for data cleanup""" + + if self.test: + batch_size = 2 + degree_of_parallelism = 2 + else: + batch_size = 16 + degree_of_parallelism = 16 + + pre_embedding_cleaner_skill_inputs = [ + InputFieldMappingEntry(name="chunk", source=source) + ] + + pre_embedding_cleaner_skill_outputs = [ + OutputFieldMappingEntry(name="cleaned_chunk", target_name=target_name), + OutputFieldMappingEntry(name="chunk", target_name="chunk"), + OutputFieldMappingEntry( + name="cleaned_sections", target_name="cleaned_sections" + ), + ] + + pre_embedding_cleaner_skill = WebApiSkill( + name="Pre Embedding Cleaner Skill", + description="Skill to clean the data before sending to embedding", + context=context, + uri=self.environment.get_custom_skill_function_url("pre_embedding_cleaner"), + timeout="PT230S", + batch_size=batch_size, + degree_of_parallelism=degree_of_parallelism, + http_method="POST", + inputs=pre_embedding_cleaner_skill_inputs, + outputs=pre_embedding_cleaner_skill_outputs, + ) + + if self.environment.identity_type != IdentityType.KEY: + pre_embedding_cleaner_skill.auth_identity = ( + self.environment.function_app_app_registration_resource_id + ) + + if self.environment.identity_type == IdentityType.USER_ASSIGNED: + pre_embedding_cleaner_skill.auth_identity = ( + self.environment.ai_search_user_assigned_identity + ) + + return pre_embedding_cleaner_skill + + def get_text_split_skill(self, context, source) -> SplitSkill: + """Get the skill for text split. 
+ + Args: + ----- + context (str): The context of the skill + inputs (List[InputFieldMappingEntry]): The inputs of the skill + outputs (List[OutputFieldMappingEntry]): The outputs of the skill + + Returns: + -------- + splitSKill: The skill for text split""" + + text_split_skill = SplitSkill( + name="Text Split Skill", + description="Skill to split the text before sending to embedding", + context=context, + text_split_mode="pages", + maximum_page_length=2000, + page_overlap_length=500, + inputs=[InputFieldMappingEntry(name="text", source=source)], + outputs=[OutputFieldMappingEntry(name="textItems", target_name="pages")], + ) + + return text_split_skill + + def get_adi_skill(self, chunk_by_page=False) -> WebApiSkill: + """Get the custom skill for adi. + + Returns: + -------- + WebApiSkill: The custom skill for adi""" + + if self.test: + batch_size = 1 + degree_of_parallelism = 4 + else: + # Depending on your GPT Token limit, you may need to adjust the batch size and degree of parallelism + batch_size = 1 + degree_of_parallelism = 8 + + if chunk_by_page: + output = [ + OutputFieldMappingEntry(name="extracted_content", target_name="pages") + ] + else: + output = [ + OutputFieldMappingEntry( + name="extracted_content", target_name="extracted_content" + ) + ] + + adi_skill = WebApiSkill( + name="ADI Skill", + description="Skill to generate ADI", + context="/document", + uri=self.environment.get_custom_skill_function_url("adi"), + timeout="PT230S", + batch_size=batch_size, + degree_of_parallelism=degree_of_parallelism, + http_method="POST", + http_headers={"chunk_by_page": chunk_by_page}, + inputs=[ + InputFieldMappingEntry( + name="source", source="/document/metadata_storage_path" + ) + ], + outputs=output, + ) + + if self.environment.identity_type != IdentityType.KEY: + adi_skill.auth_identity = ( + self.environment.function_app_app_registration_resource_id + ) + + if self.environment.identity_type == IdentityType.USER_ASSIGNED: + adi_skill.auth_identity = self.environment.ai_search_user_assigned_identity + + return adi_skill + + def get_vector_skill( + self, context, source, target_name="vector" + ) -> AzureOpenAIEmbeddingSkill: + """Get the vector skill for the indexer. + + Returns: + AzureOpenAIEmbeddingSkill: The vector skill for the indexer""" + + embedding_skill_inputs = [ + InputFieldMappingEntry(name="text", source=source), + ] + embedding_skill_outputs = [ + OutputFieldMappingEntry(name="embedding", target_name=target_name) + ] + + vector_skill = AzureOpenAIEmbeddingSkill( + name="Vector Skill", + description="Skill to generate embeddings", + context=context, + deployment_id=self.environment.open_ai_embedding_deployment, + model_name=self.environment.open_ai_embedding_model, + resource_uri=self.environment.open_ai_endpoint, + inputs=embedding_skill_inputs, + outputs=embedding_skill_outputs, + dimensions=self.environment.open_ai_embedding_dimensions, + ) + + if self.environment.identity_type == IdentityType.KEY: + vector_skill.api_key = self.environment.open_ai_api_key + elif self.environment.identity_type == IdentityType.USER_ASSIGNED: + vector_skill.auth_identity = ( + self.environment.ai_search_user_assigned_identity + ) + + return vector_skill + + def get_key_phrase_extraction_skill(self, context, source) -> WebApiSkill: + """Get the key phrase extraction skill. 
+ + Args: + ----- + context (str): The context of the skill + source (str): The source of the skill + + Returns: + -------- + WebApiSkill: The key phrase extraction skill""" + + if self.test: + batch_size = 4 + degree_of_parallelism = 4 + else: + batch_size = 16 + degree_of_parallelism = 16 + + key_phrase_extraction_skill_inputs = [ + InputFieldMappingEntry(name="text", source=source), + ] + key_phrase_extraction__skill_outputs = [ + OutputFieldMappingEntry(name="key_phrases", target_name="keywords") + ] + key_phrase_extraction_skill = WebApiSkill( + name="Key phrase extraction API", + description="Skill to extract keyphrases", + context=context, + uri=self.environment.get_custom_skill_function_url("key_phrase_extraction"), + timeout="PT230S", + batch_size=batch_size, + degree_of_parallelism=degree_of_parallelism, + http_method="POST", + inputs=key_phrase_extraction_skill_inputs, + outputs=key_phrase_extraction__skill_outputs, + ) + + if self.environment.identity_type != IdentityType.KEY: + key_phrase_extraction_skill.auth_identity = ( + self.environment.function_app_app_registration_resource_id + ) + + if self.environment.identity_type == IdentityType.USER_ASSIGNED: + key_phrase_extraction_skill.auth_identity = ( + self.environment.ai_search_user_assigned_identity + ) + + return key_phrase_extraction_skill + + def get_vector_search(self) -> VectorSearch: + """Get the vector search configuration for compass. + + Args: + indexer_type (str): The type of the indexer + + Returns: + VectorSearch: The vector search configuration + """ + + open_ai_params = AzureOpenAIParameters( + resource_uri=self.environment.open_ai_endpoint, + model_name=self.environment.open_ai_embedding_model, + deployment_id=self.environment.open_ai_embedding_deployment, + ) + + if self.environment.identity_type == IdentityType.KEY: + open_ai_params.api_key = self.environment.open_ai_api_key + elif self.environment.identity_type == IdentityType.USER_ASSIGNED: + open_ai_params.auth_identity = ( + self.environment.ai_search_user_assigned_identity + ) + + vector_search = VectorSearch( + algorithms=[ + HnswAlgorithmConfiguration(name=self.algorithm_name), + ], + profiles=[ + VectorSearchProfile( + name=self.vector_search_profile_name, + algorithm_configuration_name=self.algorithm_name, + vectorizer=self.vectorizer_name, + ) + ], + vectorizers=[ + AzureOpenAIVectorizer( + name=self.vectorizer_name, + azure_open_ai_parameters=open_ai_params, + ), + ], + ) + + return vector_search + + def deploy_index(self): + """This function deploys index""" + + index_fields = self.get_index_fields() + vector_search = self.get_vector_search() + semantic_search = self.get_semantic_search() + index = SearchIndex( + name=self.index_name, + fields=index_fields, + vector_search=vector_search, + semantic_search=semantic_search, + ) + if self.rebuild: + self._search_index_client.delete_index(self.index_name) + self._search_index_client.create_or_update_index(index) + + logging.info("%s index created", index.name) + + def deploy_skillset(self): + """This function deploys the skillset.""" + skills = self.get_skills() + + if len(skills) == 0: + logging.warning("No skills defined. 
Skipping skillset deployment.")
+
+            return
+
+        index_projections = self.get_index_projections()
+
+        skillset = SearchIndexerSkillset(
+            name=self.skillset_name,
+            description="Skillset to chunk documents and generate embeddings",
+            skills=skills,
+            index_projections=index_projections,
+        )
+
+        self._search_indexer_client.create_or_update_skillset(skillset)
+
+        logging.info("%s skillset created", skillset.name)
+
+    def deploy_data_source(self):
+        """This function deploys the data source."""
+        data_source = self.get_data_source()
+
+        if data_source is None:
+            logging.warning("Data source not defined. Skipping data source deployment.")
+
+            return
+
+        result = self._search_indexer_client.create_or_update_data_source_connection(
+            data_source
+        )
+
+        logging.info("%s data source created", result.name)
+
+    def deploy_indexer(self):
+        """This function deploys the indexer."""
+        indexer = self.get_indexer()
+
+        if indexer is None:
+            logging.warning("Indexer not defined. Skipping indexer deployment.")
+
+            return
+
+        result = self._search_indexer_client.create_or_update_indexer(indexer)
+
+        logging.info("%s indexer created", result.name)
+
+    def run_indexer(self):
+        """This function runs the indexer."""
+        self._search_indexer_client.run_indexer(self.indexer_name)
+
+        logging.info(
+            "%s is running. If queries return no results, please wait a bit and try again.",
+            self.indexer_name,
+        )
+
+    def reset_indexer(self):
+        """This function resets the indexer."""
+        self._search_indexer_client.reset_indexer(self.indexer_name)
+
+        logging.info("%s reset.", self.indexer_name)
+
+    def deploy_synonym_map(self):
+        """This function deploys the synonym map."""
+
+        synonym_maps = self.get_synonym_map_names()
+        if len(synonym_maps) > 0:
+            for synonym_map in synonym_maps:
+                try:
+                    synonym_map = SynonymMap(name=synonym_map, synonyms="")
+                    self._search_index_client.create_or_update_synonym_map(synonym_map)
+                except HttpResponseError as e:
+                    logging.error("Unable to deploy synonym map. %s", e)
+
+    def deploy(self):
+        """This function deploys the whole AI search pipeline."""
+        self.deploy_data_source()
+        self.deploy_synonym_map()
+        self.deploy_index()
+        self.deploy_skillset()
+        self.deploy_indexer()
+
+        logging.info("%s setup deployed", self.indexer_type.value)
diff --git a/deploy_ai_search/deploy.py b/deploy_ai_search/deploy.py
new file mode 100644
index 0000000..7254c03
--- /dev/null
+++ b/deploy_ai_search/deploy.py
@@ -0,0 +1,55 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT License.
+import argparse
+from rag_documents import RagDocumentsAISearch
+
+
+def deploy_config(arguments: argparse.Namespace):
+    """Deploy the indexer configuration based on the arguments passed.
+ + Args: + arguments (argparse.Namespace): The arguments passed to the script""" + if arguments.indexer_type == "rag": + index_config = RagDocumentsAISearch( + suffix=arguments.suffix, + rebuild=arguments.rebuild, + enable_page_by_chunking=arguments.enable_page_chunking, + ) + else: + raise ValueError("Invalid Indexer Type") + + index_config.deploy() + + if arguments.rebuild: + index_config.reset_indexer() + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Process some arguments.") + parser.add_argument( + "--indexer_type", + type=str, + required=True, + help="Type of Indexer want to deploy.", + ) + parser.add_argument( + "--rebuild", + type=bool, + required=False, + help="Whether want to delete and rebuild the index", + ) + parser.add_argument( + "--enable_page_chunking", + type=bool, + required=False, + help="Whether want to enable chunking by page in adi skill, if no value is passed considered False", + ) + parser.add_argument( + "--suffix", + type=str, + required=False, + help="Suffix to be attached to indexer objects", + ) + + args = parser.parse_args() + deploy_config(args) diff --git a/deploy_ai_search/environment.py b/deploy_ai_search/environment.py new file mode 100644 index 0000000..d7cbdb4 --- /dev/null +++ b/deploy_ai_search/environment.py @@ -0,0 +1,257 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +import os +from dotenv import find_dotenv, load_dotenv +from enum import Enum +from azure.identity import DefaultAzureCredential +from azure.core.credentials import AzureKeyCredential +from azure.search.documents.indexes.models import SearchIndexerDataUserAssignedIdentity + + +class IndexerType(Enum): + """The type of the indexer""" + + RAG_DOCUMENTS = "rag-documents" + + +class IdentityType(Enum): + """The type of the indexer""" + + USER_ASSIGNED = "user_assigned" + SYSTEM_ASSIGNED = "system_assigned" + KEY = "key" + + +class AISearchEnvironment: + """This class is used to get the environment variables for the AI search service.""" + + def __init__(self, indexer_type: IndexerType): + """Initialize the AISearchEnvironment class. + + Args: + indexer_type (IndexerType): The type of the indexer + """ + load_dotenv(find_dotenv()) + + self.indexer_type = indexer_type + + @property + def normalised_indexer_type(self) -> str: + """This function returns the normalised indexer type. + + Returns: + str: The normalised indexer type + """ + + normalised_indexer_type = ( + self.indexer_type.value.replace("-", " ").title().replace(" ", "") + ) + + return normalised_indexer_type + + @property + def identity_type(self) -> IdentityType: + """This function returns the identity type. + + Returns: + IdentityType: The identity type + """ + identity = os.environ.get("IdentityType").lower() + + if identity == "user_assigned": + return IdentityType.USER_ASSIGNED + elif identity == "system_assigned": + return IdentityType.SYSTEM_ASSIGNED + elif identity == "key": + return IdentityType.KEY + else: + raise ValueError("Invalid identity type") + + @property + def ai_search_endpoint(self) -> str: + """This function returns the ai search endpoint. + + Returns: + str: The ai search endpoint + """ + return os.environ.get("AIService__AzureSearchOptions__Endpoint") + + @property + def ai_search_identity_id(self) -> str: + """This function returns the ai search identity id. 
+ + Returns: + str: The ai search identity id + """ + return os.environ.get("AIService__AzureSearchOptions__Identity__ClientId") + + @property + def ai_search_user_assigned_identity(self) -> SearchIndexerDataUserAssignedIdentity: + """This function returns the ai search user assigned identity. + + Returns: + SearchIndexerDataUserAssignedIdentity: The ai search user assigned identity + """ + user_assigned_identity = SearchIndexerDataUserAssignedIdentity( + user_assigned_identity=os.environ.get( + "AIService__AzureSearchOptions__Identity__FQName" + ) + ) + return user_assigned_identity + + @property + def ai_search_credential(self) -> DefaultAzureCredential | AzureKeyCredential: + """This function returns the ai search credential. + + Returns: + DefaultAzureCredential | AzureKeyCredential: The ai search credential + """ + if self.identity_type == IdentityType.SYSTEM_ASSIGNED: + return DefaultAzureCredential() + elif self.identity_type == IdentityType.USER_ASSIGNED: + return DefaultAzureCredential( + managed_identity_client_id=self.ai_search_identity_id + ) + else: + return AzureKeyCredential( + os.environ.get("AIService__AzureSearchOptions__Key") + ) + + @property + def open_ai_api_key(self) -> str: + """This function returns the open ai api key. + + Returns: + str: The open ai api key + """ + return os.environ.get("OpenAI__ApiKey") + + @property + def open_ai_endpoint(self) -> str: + """This function returns the open ai endpoint. + + Returns: + str: The open ai endpoint + """ + return os.environ.get("OpenAI__Endpoint") + + @property + def open_ai_embedding_model(self) -> str: + """This function returns the open ai embedding model. + + Returns: + str: The open ai embedding model + """ + return os.environ.get("OpenAI__EmbeddingModel") + + @property + def open_ai_embedding_deployment(self) -> str: + """This function returns the open ai embedding deployment. + + Returns: + str: The open ai embedding deployment + """ + return os.environ.get("OpenAI__EmbeddingDeployment") + + @property + def storage_account_connection_string(self) -> str: + """This function returns the blob connection string. 
If the identity type is user_assigned or system_assigned, it returns the FQEndpoint, otherwise it returns the ConnectionString""" + if self.identity_type in [ + IdentityType.SYSTEM_ASSIGNED, + IdentityType.USER_ASSIGNED, + ]: + return os.environ.get("StorageAccount__FQEndpoint") + else: + return os.environ.get("StorageAccount__ConnectionString") + + @property + def storage_account_blob_container_name(self) -> str: + """ + This function returns azure blob container name + """ + + return os.environ.get( + f"StorageAccount__{self.normalised_indexer_type}__Container" + ) + + @property + def function_app_end_point(self) -> str: + """ + This function returns function app endpoint + """ + return os.environ.get("FunctionApp__Endpoint") + + @property + def function_app_key(self) -> str: + """ + This function returns function app key + """ + return os.environ.get("FunctionApp__Key") + + @property + def function_app_app_registration_resource_id(self) -> str: + """ + This function returns function app app registration resource id + """ + return os.environ.get("FunctionApp__AppRegistrationResourceId") + + @property + def function_app_pre_embedding_cleaner_route(self) -> str: + """ + This function returns function app data cleanup function name + """ + return os.environ.get("FunctionApp__PreEmbeddingCleaner__FunctionName") + + @property + def function_app_adi_route(self) -> str: + """ + This function returns function app adi name + """ + return os.environ.get("FunctionApp__ADI__FunctionName") + + @property + def function_app_key_phrase_extractor_route(self) -> str: + """ + This function returns function app keyphrase extractor name + """ + return os.environ.get("FunctionApp__KeyPhraseExtractor__FunctionName") + + @property + def open_ai_embedding_dimensions(self) -> str: + """ + This function returns dimensions for embedding model. + + Returns: + str: The dimensions for embedding model + """ + + return os.environ.get("OpenAI__EmbeddingDimensions") + + @property + def use_private_endpoint(self) -> bool: + """ + This function returns true if private endpoint is used + """ + return ( + os.environ.get("AIService__AzureSearchOptions__UsePrivateEndpoint").lower() + == "true" + ) + + def get_custom_skill_function_url(self, skill_type: str): + """ + Get the function app url that is hosting the custom skill + """ + if skill_type == "pre_embedding_cleaner": + route = self.function_app_pre_embedding_cleaner_route + elif skill_type == "adi": + route = self.function_app_adi_route + elif skill_type == "key_phrase_extraction": + route = self.function_app_key_phrase_extractor_route + else: + raise ValueError(f"Invalid skill type: {skill_type}") + + full_url = ( + f"{self.function_app_end_point}/api/{route}?code={self.function_app_key}" + ) + + return full_url diff --git a/deploy_ai_search/rag_documents.py b/deploy_ai_search/rag_documents.py new file mode 100644 index 0000000..5afa932 --- /dev/null +++ b/deploy_ai_search/rag_documents.py @@ -0,0 +1,275 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. 
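+#
+# This module defines RagDocumentsAISearch, the AISearch subclass selected by
+# deploy.py for the "rag" indexer type. It builds the index fields, semantic
+# configuration, skillset, index projections and indexer used by the RAG
+# documents pipeline.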
+ +from azure.search.documents.indexes.models import ( + SearchFieldDataType, + SearchField, + SearchableField, + SemanticField, + SemanticPrioritizedFields, + SemanticConfiguration, + SemanticSearch, + InputFieldMappingEntry, + SearchIndexer, + FieldMapping, + IndexingParameters, + IndexingParametersConfiguration, + SearchIndexerIndexProjections, + SearchIndexerIndexProjectionSelector, + SearchIndexerIndexProjectionsParameters, + IndexProjectionMode, + SimpleField, + BlobIndexerDataToExtract, + IndexerExecutionEnvironment, +) +from ai_search import AISearch +from environment import ( + IndexerType, +) + + +class RagDocumentsAISearch(AISearch): + """This class is used to deploy the rag document index.""" + + def __init__( + self, + suffix: str | None = None, + rebuild: bool | None = False, + enable_page_by_chunking=False, + ): + """Initialize the RagDocumentsAISearch class. This class implements the deployment of the rag document index. + + Args: + suffix (str, optional): The suffix for the indexer. Defaults to None. If an suffix is provided, it is assumed to be a test indexer. + rebuild (bool, optional): Whether to rebuild the index. Defaults to False. + """ + self.indexer_type = IndexerType.RAG_DOCUMENTS + super().__init__(suffix, rebuild) + + if enable_page_by_chunking is not None: + self.enable_page_by_chunking = enable_page_by_chunking + else: + self.enable_page_by_chunking = False + + def get_index_fields(self) -> list[SearchableField]: + """This function returns the index fields for rag document. + + Returns: + list[SearchableField]: The index fields for rag document""" + + fields = [ + SimpleField(name="Id", type=SearchFieldDataType.String, filterable=True), + SearchableField( + name="Title", type=SearchFieldDataType.String, filterable=True + ), + SearchableField( + name="ChunkId", + type=SearchFieldDataType.String, + key=True, + analyzer_name="keyword", + ), + SearchableField( + name="Chunk", + type=SearchFieldDataType.String, + sortable=False, + filterable=False, + facetable=False, + ), + SearchableField( + name="Sections", + type=SearchFieldDataType.String, + collection=True, + ), + SearchField( + name="ChunkEmbedding", + type=SearchFieldDataType.Collection(SearchFieldDataType.Single), + vector_search_dimensions=self.environment.open_ai_embedding_dimensions, + vector_search_profile_name=self.vector_search_profile_name, + ), + SearchableField( + name="Keywords", type=SearchFieldDataType.String, collection=True + ), + SearchableField( + name="SourceUri", + type=SearchFieldDataType.String, + sortable=True, + filterable=True, + facetable=True, + ), + ] + + if self.enable_page_by_chunking: + fields.extend( + [ + SearchableField( + name="PageNumber", + type=SearchFieldDataType.Int64, + sortable=True, + filterable=True, + facetable=True, + ) + ] + ) + + return fields + + def get_semantic_search(self) -> SemanticSearch: + """This function returns the semantic search configuration for rag document + + Returns: + SemanticSearch: The semantic search configuration""" + + semantic_config = SemanticConfiguration( + name=self.semantic_config_name, + prioritized_fields=SemanticPrioritizedFields( + title_field=SemanticField(field_name="Title"), + content_fields=[SemanticField(field_name="Chunk")], + keywords_fields=[ + SemanticField(field_name="Keywords"), + SemanticField(field_name="Sections"), + ], + ), + ) + + semantic_search = SemanticSearch(configurations=[semantic_config]) + + return semantic_search + + def get_skills(self) -> list: + """Get the skillset for the indexer. 
+ + Returns: + list: The skillsets used in the indexer""" + + adi_skill = self.get_adi_skill(self.enable_page_by_chunking) + + text_split_skill = self.get_text_split_skill( + "/document", "/document/extracted_content/content" + ) + + pre_embedding_cleaner_skill = self.get_pre_embedding_cleaner_skill( + "/document/pages/*", "/document/pages/*" + ) + + key_phrase_extraction_skill = self.get_key_phrase_extraction_skill( + "/document/pages/*", "/document/pages/*/cleaned_chunk" + ) + + embedding_skill = self.get_vector_skill( + "/document/pages/*", "/document/pages/*/cleaned_chunk" + ) + + if self.enable_page_by_chunking: + skills = [ + adi_skill, + pre_embedding_cleaner_skill, + key_phrase_extraction_skill, + embedding_skill, + ] + else: + skills = [ + adi_skill, + text_split_skill, + pre_embedding_cleaner_skill, + key_phrase_extraction_skill, + embedding_skill, + ] + + return skills + + def get_index_projections(self) -> SearchIndexerIndexProjections: + """This function returns the index projections for rag document.""" + mappings = [ + InputFieldMappingEntry(name="Chunk", source="/document/pages/*/chunk"), + InputFieldMappingEntry( + name="ChunkEmbedding", + source="/document/pages/*/vector", + ), + InputFieldMappingEntry(name="Title", source="/document/Title"), + InputFieldMappingEntry(name="SourceUri", source="/document/SourceUri"), + InputFieldMappingEntry( + name="Keywords", source="/document/pages/*/keywords" + ), + InputFieldMappingEntry( + name="Sections", source="/document/pages/*/cleaned_sections" + ), + ] + + if self.enable_page_by_chunking: + mappings.extend( + [ + InputFieldMappingEntry( + name="PageNumber", source="/document/pages/*/page_number" + ) + ] + ) + + index_projections = SearchIndexerIndexProjections( + selectors=[ + SearchIndexerIndexProjectionSelector( + target_index_name=self.index_name, + parent_key_field_name="Id", + source_context="/document/pages/*", + mappings=mappings, + ), + ], + parameters=SearchIndexerIndexProjectionsParameters( + projection_mode=IndexProjectionMode.SKIP_INDEXING_PARENT_DOCUMENTS + ), + ) + + return index_projections + + def get_indexer(self) -> SearchIndexer: + """This function returns the indexer for rag document. 
+ + Returns: + SearchIndexer: The indexer for rag document""" + + # Only place on schedule if it is not a test deployment + if self.test: + schedule = None + batch_size = 4 + else: + schedule = {"interval": "PT15M"} + batch_size = 16 + + if self.environment.use_private_endpoint: + execution_environment = IndexerExecutionEnvironment.PRIVATE + else: + execution_environment = IndexerExecutionEnvironment.STANDARD + + indexer_parameters = IndexingParameters( + batch_size=batch_size, + configuration=IndexingParametersConfiguration( + data_to_extract=BlobIndexerDataToExtract.ALL_METADATA, + query_timeout=None, + execution_environment=execution_environment, + fail_on_unprocessable_document=False, + fail_on_unsupported_content_type=False, + index_storage_metadata_only_for_oversized_documents=True, + indexed_file_name_extensions=".pdf,.pptx,.docx,.xlsx,.txt,.png,.jpg,.jpeg", + ), + max_failed_items=5, + ) + + indexer = SearchIndexer( + name=self.indexer_name, + description="Indexer to index documents and generate embeddings", + skillset_name=self.skillset_name, + target_index_name=self.index_name, + data_source_name=self.data_source_name, + schedule=schedule, + field_mappings=[ + FieldMapping( + source_field_name="metadata_storage_name", target_field_name="Title" + ), + FieldMapping( + source_field_name="metadata_storage_path", + target_field_name="SourceUri", + ), + ], + parameters=indexer_parameters, + ) + + return indexer diff --git a/deploy_ai_search/requirements.txt b/deploy_ai_search/requirements.txt new file mode 100644 index 0000000..4a3e38d --- /dev/null +++ b/deploy_ai_search/requirements.txt @@ -0,0 +1,5 @@ +python-dotenv +azure-search-documents==11.6.0b4 +azure-storage-blob +azure-identity +azure-mgmt-web
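For reference, the deployment utility added above is intended to be run from the deploy_ai_search directory, along the lines of the following (argument values are illustrative; note that --rebuild and --enable_page_chunking are declared with type=bool, so any non-empty value is parsed as True):

    python deploy.py --indexer_type rag --rebuild True --enable_page_chunking True --suffix test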