diff --git a/webhook-doc-status-sample/Dockerfile b/webhook-doc-status-sample/Dockerfile new file mode 100644 index 0000000..4a8d78e --- /dev/null +++ b/webhook-doc-status-sample/Dockerfile @@ -0,0 +1,13 @@ +FROM --platform=linux/amd64 python:3-alpine + +WORKDIR /app + +COPY requirements.txt ./ + +RUN pip install --upgrade pip && \ + pip install -r requirements.txt && \ + rm requirements.txt + +COPY main.py ./ + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"] \ No newline at end of file diff --git a/webhook-doc-status-sample/README.md b/webhook-doc-status-sample/README.md new file mode 100644 index 0000000..3fbbccf --- /dev/null +++ b/webhook-doc-status-sample/README.md @@ -0,0 +1,62 @@ +# Document Processing application using Document Status webhook + +Document Processing application that utilizes Watson Discovery collection and webhook support of Document Status API. +This is just a sample application, not production code. + +## Requirements +- Instance of Watson Discovery Plus/Enterprise plan on IBM Cloud. + +## Setup Instructions + +### Deploy the document processing application to Code Engine +In this tutorial, we will use [IBM Cloud Code Engine](https://www.ibm.com/cloud/code-engine) as the infrastructure for the application of document processing which receives the document processing status events. Of course, you can deploy the application in any environment you like. + +1. [Create a project](https://cloud.ibm.com/docs/codeengine?topic=codeengine-manage-project#create-a-project) of Code Engine. +2. [Deploy the application](https://cloud.ibm.com/docs/codeengine?topic=codeengine-app-source-code) from this repository source code. + - In **Create application**, click **Specify build details** and enter the following: + - Source + - Code repo URL: `https://github.com/watson-developer-cloud/doc-tutorial-downloads` + - Code repo access: `None` + - Branch name: `master` + - Context directory: `discovery-data/webhook-doc-status-sample` + - Strategy + - Strategy: `Dockerfile` + - Output + - Enter your container image registry information. + - Set **Min number of instances** and **Max number of instances** to `1`. +3. [Add service binding](https://cloud.ibm.com/docs/codeengine?topic=codeengine-bind-services) to the application. + - In **IBM Cloud service instance**, specify the service instance of Watson Discovery Plus/Enterprise plan on IBM Cloud +4. Confirm that the application status changes to **Ready**. + +### Configure Discovery collection +1. Create a project. +2. Create a collection in the project and apply the document status webhook to the collection. `{webhook-doc-status-sample-url}` is URL to the deployed application. +```sh +curl -X POST {auth} \ + '{url}/v2/projects/{project_id}/collections?version=2023-03-31' \ + --header 'Content-Type: application/json' \ + --data-raw '{ + "name":"DocProc App", + "webhooks": { + "document_status": [ + { + "url": "{webhook-doc-status-sample-url}/webhook" + } + ] + } +}' +``` + +### Process documents +Process a document and return it for realtime use. +The file is stored in the collection and is processed according to the collection's configuration settings. To remove the processed documents in the collection, you need to remove them manually via Tooling or API. + +Example: + +```sh +curl -X POST \ + '{webhook-doc-status-sample-url}/projects/{project_id}/collections/{collection_id}/extract' \ + -H 'accept: application/json' \ + -H 'Content-Type: multipart/form-data' \ + -F 'file=@sample.pdf;type=application/pdf' +``` diff --git a/webhook-doc-status-sample/main.py b/webhook-doc-status-sample/main.py new file mode 100644 index 0000000..0e1962f --- /dev/null +++ b/webhook-doc-status-sample/main.py @@ -0,0 +1,171 @@ +from dataclasses import dataclass +import json +from asyncio import Future +import asyncio +from typing import Any, BinaryIO, Dict +import logging + +from fastapi import FastAPI, Request, UploadFile +from fastapi.responses import JSONResponse + +from ibm_watson import DiscoveryV2 +from ibm_watson.discovery_v2 import QueryLargePassages + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + +app = FastAPI() + +# in-memory store for mapping (project_id, collection_id, document_id) to Future object +docproc_requests: dict[(str, str, str), Future] = {} + +discovery = DiscoveryV2(version='2023-03-31') + + +@app.post("/webhook") +async def webhook( + request: Request, +): + status_code = 200 + try: + body = await request.json() + except json.decoder.JSONDecodeError: + content = await request.body() + body = f"Invalid JSON or no body. Body was: {str(content)}" + status_code = 400 + if status_code == 200: + event = body["event"] + response_body: dict[str, Any] = {} + if event == "ping": + response_body["accepted"] = True + elif event == "document.status": + data = body["data"] + project_id = data['project_id'] + collection_id = data['collection_id'] + status = data["status"] + if status in set(["available", "failed"]): + for document_id in data["document_ids"]: + # resume the suspended document processing request + notify_document_completion_status(project_id, collection_id, document_id, status) + response_body["accepted"] = True + else: + status_code = 400 + return JSONResponse(content=response_body, status_code=status_code) + + +@app.post("/projects/{project_id}/collections/{collection_id}/extract") +async def post_and_extraction( + project_id: str, + collection_id: str, + file: UploadFile +): + # Ingest the received document into the underlying Discovery project/collection + logger.info(f'using project/collection {project_id}/{collection_id}') + document_id = add_document(project_id, collection_id, file.file, file.filename) + + # Wait until the ingested document become available + logger.info(f'waiting for {document_id} become available') + available = await wait_document_completion(project_id, collection_id, document_id) + + # Retrieve the processed document + logger.info(f'{document_id} is available:{available}') + document = get_document(project_id, collection_id, document_id) + return JSONResponse(content=document) + + +def add_document( + project_id: str, + collection_id: str, + file: BinaryIO, + filename: Any +): + response = discovery.add_document(project_id, collection_id, file=file, filename=filename) + document_id = response.get_result()['document_id'] + return document_id + + +def get_document( + project_id: str, + collection_id: str, + document_id: str, +): + response = discovery.query(project_id=project_id, collection_ids=[collection_id], filter=f'document_id::{document_id}', passages=QueryLargePassages(enabled=False)) + document = response.get_result()['results'][0] + return document + + +async def wait_document_completion( + project_id: str, + collection_id: str, + document_id: str, +): + global docproc_requests + docproc_request = Future() + key = (project_id, collection_id, document_id) + docproc_requests[key] = docproc_request + + # Start a background task to pull the processing status periodically when the collection is not configured with document status webhook + if not is_webhook_status_enabled(project_id, collection_id): + asyncio.create_task(wait_document_available(project_id, collection_id, document_id)) + + # Wait until the document become available or failed + status = await docproc_request + return status == "available" + + +def is_webhook_status_enabled( + project_id: str, + collection_id: str +): + webhook = discovery.get_collection(project_id, collection_id).get_result().get('webhooks') + return (webhook is not None) and ('document_status' in webhook) + + +async def wait_document_available( + project_id: str, + collection_id: str, + document_id: str +): + # Pull the document processing status periodically (1 sec interval) and wait until the completion + while(discovery.list_documents( + project_id, + collection_id, + parent_document_id=document_id, + count=0, + status='pending,processing' + ).get_result()['matching_results'] != 0 + ): + await asyncio.sleep(1) + + # Retrieve the document processing status + status = discovery.get_document( + project_id, + collection_id, + document_id + ).get_result()['status'] + + # Then, notify it + notify_document_completion_status( + project_id, + collection_id, + document_id, + status + ) + + +def notify_document_completion_status( + project_id: str, + collection_id: str, + document_id: str, + status: str +): + global docproc_requests + key = (project_id, collection_id, document_id) + docproc_request = docproc_requests.get(key) + if docproc_request: + docproc_request.set_result(status) + docproc_requests.pop(key) + + + + diff --git a/webhook-doc-status-sample/requirements.txt b/webhook-doc-status-sample/requirements.txt new file mode 100644 index 0000000..07452ce --- /dev/null +++ b/webhook-doc-status-sample/requirements.txt @@ -0,0 +1,4 @@ +fastapi>=0.110.0,<0.111.0 +uvicorn>=0.27.0,<0.28.0 +python-multipart>=0.0.9,<0.0.10 +ibm-watson>=8.0.0 \ No newline at end of file