From afe2fb1b9329c42ca9a1529023cfb07f4cbde698 Mon Sep 17 00:00:00 2001 From: Ilya Denisov Date: Fri, 5 Apr 2024 18:12:54 +0500 Subject: [PATCH] Fix and simplify confluence data retrieval (#8) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --------- Co-authored-by: Kamil K. LemaƄski --- confluence_integration/retrieve_space.py | 196 ++++++----------------- 1 file changed, 49 insertions(+), 147 deletions(-) diff --git a/confluence_integration/retrieve_space.py b/confluence_integration/retrieve_space.py index 7aa2b477..e8b9e6b7 100644 --- a/confluence_integration/retrieve_space.py +++ b/confluence_integration/retrieve_space.py @@ -9,9 +9,7 @@ from persistqueue import Queue from configuration import persist_page_processing_queue_path from confluence_integration.confluence_client import ConfluenceClient -import requests import logging -from threading import Thread # Initialize Confluence API @@ -22,45 +20,9 @@ ) -# Get top level pages from a space -def get_top_level_ids(space_key): +def get_space_page_ids(space_key): """ - Retrieve IDs of top-level pages in a specified Confluence space. - - Args: - space_key (str): The key of the Confluence space. - - Returns: - list: A list of page IDs for the top-level pages in the space. - """ - top_level_pages = confluence.get_all_pages_from_space(space_key) - return [page['id'] for page in top_level_pages] - - -# Get child pages from a page -def get_child_ids(item_id, content_type): - """ - Retrieve IDs of child items (pages or comments) for a given Confluence item. - - Args: - item_id (str): The ID of the Confluence page or comment. - content_type (str): Type of content to retrieve ('page' or 'comment'). - - Returns: - list: A list of IDs for child items. - """ - try: - child_items = confluence.get_page_child_by_type(item_id, type=content_type) - return [child['id'] for child in child_items] - except requests.exceptions.HTTPError as e: - logging.error(f"Error retrieving child items for item ID {item_id}: {e}") - return [] - - -def get_all_page_ids_recursive(space_key): - """ - Recursively retrieves all page IDs in a given space, including child pages, with minimal threading - for initial child page fetching. + Retrieves all page IDs in a given space, including child pages. Args: space_key (str): The key of the Confluence space. @@ -68,80 +30,22 @@ def get_all_page_ids_recursive(space_key): Returns: list: A list of all page IDs in the space. """ - all_pages = [] - top_level_ids = get_top_level_ids(space_key) - threads = [] - - def get_child_pages_recursively(page_id): - """ - Inner function to recursively get child page IDs - """ - child_pages = [] - child_page_ids = get_child_ids(page_id, content_type='page') - - # Error handling added within the recursive fetching process - for child_id in child_page_ids: - try: - child_pages.append(child_id) - child_pages.extend(get_child_pages_recursively(child_id)) - except Exception as e: - logging.error(f"Error fetching child pages for {child_id}: {e}") - - return child_pages - - def fetch_and_store_child_pages(top_level_id): - """ - Fetches child pages for a given top-level page and stores them in all_pages. - Designed to be run in a separate thread. - """ - nonlocal all_pages + page_ids = [] + start = 0 + limit = 50 + while True: try: - all_pages.append(top_level_id) - all_pages.extend(get_child_pages_recursively(top_level_id)) + chunk = confluence.get_all_pages_from_space(space_key, start=start, limit=limit) + start += limit except Exception as e: - logging.error(f"Error processing top-level page {top_level_id}: {e}") - - # Start a thread for each top-level page to fetch its child pages - for top_level_id in top_level_ids: - thread = Thread(target=fetch_and_store_child_pages, args=(top_level_id,)) - threads.append(thread) - thread.start() - - # Wait for all threads to complete - for thread in threads: - thread.join() - - return all_pages - -def get_all_comment_ids_recursive(page_id): - """ - Recursively retrieves all comment IDs for a given Confluence page. + logging.error(f"Error fetching pages (space_key={space_key} start={start} limit={limit}): {e}") + break - Args: - page_id (str): The ID of the Confluence page. - - Returns: - list: A list of all comment IDs for the page. - """ + page_ids.extend([page["id"] for page in chunk]) + if len(chunk) < limit: + break - def get_child_comment_ids_recursively(comment_id): - # Inner function to recursively get child comment IDs - child_comment_ids = [] # Use a separate list to accumulate child comment IDs - try: - immediate_child_ids = get_child_ids(comment_id, content_type='comment') - for child_id in immediate_child_ids: - child_comment_ids.append(child_id) - child_comment_ids.extend(get_child_comment_ids_recursively(child_id)) - except requests.exceptions.HTTPError as e: - logging.error(f"Error retrieving child comments for comment ID {comment_id}: {e}") - return child_comment_ids - - all_comment_ids = [] - top_level_comment_ids = get_child_ids(page_id, content_type='comment') - for comment_id in top_level_comment_ids: - all_comment_ids.append(comment_id) - all_comment_ids.extend(get_child_comment_ids_recursively(comment_id)) - return all_comment_ids + return page_ids def choose_space(): @@ -176,19 +80,19 @@ def strip_html_tags(content): return soup.get_text() -def check_date_filter(update_date, all_page_ids): +def check_date_filter(update_date, space_page_ids): """ Filter pages based on their last updated date. Args: update_date (datetime): The threshold date for filtering. Pages updated after this date will be included. - all_page_ids (list): A list of page IDs to be filtered. + space_page_ids (list): A list of page IDs to be filtered. Returns: list: A list of page IDs that were last updated on or after the specified update_date. """ updated_pages = [] - for page_id in all_page_ids: + for page_id in space_page_ids: try: page_history = confluence.history(page_id) # directly use page_id except Exception as e: @@ -219,24 +123,36 @@ def format_page_content_for_llm(page_data): return content -def get_comment_content(comment_id): +def get_page_comments_content(page_id): """ - Retrieve the content of a comment. + Retrieve and format the content of all comments on a Confluence page. Args: - comment_id (str): The ID of the comment. + page_id (str): The ID of the Confluence page. Returns: - str: The content of the comment. + str: A string containing the content of all comments on the page. """ - try: - comment = confluence.get_page_by_id(comment_id, expand='body.storage') - comment_content = comment.get('body', {}).get('storage', {}).get('value', '') - comment_text = strip_html_tags(comment_content) - return comment_text - except Exception as e: - logging.error(f"Error retrieving content for comment ID {comment_id}: {e}") - return "" # Return empty string if an error occurs + result = [] + start = 0 + limit = 25 + while True: + try: + chunk = confluence.get_page_comments(page_id, depth='all', start=start, limit=limit, expand='body.storage')['results'] + except Exception as e: + logging.error(f"Error fetching page comments (page_id={page_id}) start={start} limit={limit}): {e}") + break + + for comment in chunk: + content = comment['body']['storage']['value'] + content = strip_html_tags(content) + result.append(content) + + start += limit + if len(chunk) < limit: + break + + return '\n'.join(result) def process_page(page_id, space_key, file_manager, page_content_map): @@ -261,11 +177,7 @@ def process_page(page_id, space_key, file_manager, page_content_map): created_date = page['history']['createdDate'] last_updated = page['version']['when'] page_content = strip_html_tags(page.get('body', {}).get('storage', {}).get('value', '')) - page_comments_content = "" - page_comment_ids = get_all_comment_ids_recursive(page_id) - - for comment_id in page_comment_ids: - page_comments_content += get_comment_content(comment_id) + page_comments_content = get_page_comments_content(page_id) page_data = { 'spaceKey': space_key, @@ -310,29 +222,19 @@ def get_space_content(space_key, update_date=None): list: A list of IDs of all pages that were processed. """ - all_page_ids = get_all_page_ids_recursive(space_key) - all_page_ids = set(all_page_ids) + space_page_ids = get_space_page_ids(space_key) + space_page_ids = set(space_page_ids) if update_date is not None: - all_page_ids = check_date_filter(update_date, all_page_ids) + space_page_ids = check_date_filter(update_date, space_page_ids) # Setting up the persist-queue queue_path = os.path.join(persist_page_processing_queue_path, space_key) page_queue = Queue(queue_path) # make all page ids a set to eliminate duplicates - for page_id in all_page_ids: + for page_id in space_page_ids: page_queue.put(page_id) - print(f"Enqueued {len(all_page_ids)} pages for processing.") - print(f"Pages queued {all_page_ids}") - return space_key - - -if __name__ == "__main__": - - # Initial space retrieve - space_key = choose_space() - get_space_content(space_key) - # Initial space retrieve - # Space update retrieve - # get_space_content(update_date=datetime(2023, 12, 1, 0, 0, 0)) + print(f"Enqueued {len(space_page_ids)} pages for processing.") + print(f"Pages queued {space_page_ids}") + return space_key \ No newline at end of file