Skip to content
This repository has been archived by the owner on Apr 29, 2024. It is now read-only.

Commit

Permalink
Fix and simplify confluence data retrieval (#8)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Kamil K. Lemański <[email protected]>
  • Loading branch information
id-ilych and kml authored Apr 5, 2024
1 parent 5ae881a commit afe2fb1
Showing 1 changed file with 49 additions and 147 deletions.
196 changes: 49 additions & 147 deletions confluence_integration/retrieve_space.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
from persistqueue import Queue
from configuration import persist_page_processing_queue_path
from confluence_integration.confluence_client import ConfluenceClient
import requests
import logging
from threading import Thread


# Initialize Confluence API
Expand All @@ -22,126 +20,32 @@
)


# Get top level pages from a space
def get_top_level_ids(space_key):
def get_space_page_ids(space_key):
"""
Retrieve IDs of top-level pages in a specified Confluence space.
Args:
space_key (str): The key of the Confluence space.
Returns:
list: A list of page IDs for the top-level pages in the space.
"""
top_level_pages = confluence.get_all_pages_from_space(space_key)
return [page['id'] for page in top_level_pages]


# Get child pages from a page
def get_child_ids(item_id, content_type):
"""
Retrieve IDs of child items (pages or comments) for a given Confluence item.
Args:
item_id (str): The ID of the Confluence page or comment.
content_type (str): Type of content to retrieve ('page' or 'comment').
Returns:
list: A list of IDs for child items.
"""
try:
child_items = confluence.get_page_child_by_type(item_id, type=content_type)
return [child['id'] for child in child_items]
except requests.exceptions.HTTPError as e:
logging.error(f"Error retrieving child items for item ID {item_id}: {e}")
return []


def get_all_page_ids_recursive(space_key):
"""
Recursively retrieves all page IDs in a given space, including child pages, with minimal threading
for initial child page fetching.
Retrieves all page IDs in a given space, including child pages.
Args:
space_key (str): The key of the Confluence space.
Returns:
list: A list of all page IDs in the space.
"""
all_pages = []
top_level_ids = get_top_level_ids(space_key)
threads = []

def get_child_pages_recursively(page_id):
"""
Inner function to recursively get child page IDs
"""
child_pages = []
child_page_ids = get_child_ids(page_id, content_type='page')

# Error handling added within the recursive fetching process
for child_id in child_page_ids:
try:
child_pages.append(child_id)
child_pages.extend(get_child_pages_recursively(child_id))
except Exception as e:
logging.error(f"Error fetching child pages for {child_id}: {e}")

return child_pages

def fetch_and_store_child_pages(top_level_id):
"""
Fetches child pages for a given top-level page and stores them in all_pages.
Designed to be run in a separate thread.
"""
nonlocal all_pages
page_ids = []
start = 0
limit = 50
while True:
try:
all_pages.append(top_level_id)
all_pages.extend(get_child_pages_recursively(top_level_id))
chunk = confluence.get_all_pages_from_space(space_key, start=start, limit=limit)
start += limit
except Exception as e:
logging.error(f"Error processing top-level page {top_level_id}: {e}")

# Start a thread for each top-level page to fetch its child pages
for top_level_id in top_level_ids:
thread = Thread(target=fetch_and_store_child_pages, args=(top_level_id,))
threads.append(thread)
thread.start()

# Wait for all threads to complete
for thread in threads:
thread.join()

return all_pages

def get_all_comment_ids_recursive(page_id):
"""
Recursively retrieves all comment IDs for a given Confluence page.
logging.error(f"Error fetching pages (space_key={space_key} start={start} limit={limit}): {e}")
break

Args:
page_id (str): The ID of the Confluence page.
Returns:
list: A list of all comment IDs for the page.
"""
page_ids.extend([page["id"] for page in chunk])
if len(chunk) < limit:
break

def get_child_comment_ids_recursively(comment_id):
# Inner function to recursively get child comment IDs
child_comment_ids = [] # Use a separate list to accumulate child comment IDs
try:
immediate_child_ids = get_child_ids(comment_id, content_type='comment')
for child_id in immediate_child_ids:
child_comment_ids.append(child_id)
child_comment_ids.extend(get_child_comment_ids_recursively(child_id))
except requests.exceptions.HTTPError as e:
logging.error(f"Error retrieving child comments for comment ID {comment_id}: {e}")
return child_comment_ids

all_comment_ids = []
top_level_comment_ids = get_child_ids(page_id, content_type='comment')
for comment_id in top_level_comment_ids:
all_comment_ids.append(comment_id)
all_comment_ids.extend(get_child_comment_ids_recursively(comment_id))
return all_comment_ids
return page_ids


def choose_space():
Expand Down Expand Up @@ -176,19 +80,19 @@ def strip_html_tags(content):
return soup.get_text()


def check_date_filter(update_date, all_page_ids):
def check_date_filter(update_date, space_page_ids):
"""
Filter pages based on their last updated date.
Args:
update_date (datetime): The threshold date for filtering. Pages updated after this date will be included.
all_page_ids (list): A list of page IDs to be filtered.
space_page_ids (list): A list of page IDs to be filtered.
Returns:
list: A list of page IDs that were last updated on or after the specified update_date.
"""
updated_pages = []
for page_id in all_page_ids:
for page_id in space_page_ids:
try:
page_history = confluence.history(page_id) # directly use page_id
except Exception as e:
Expand Down Expand Up @@ -219,24 +123,36 @@ def format_page_content_for_llm(page_data):
return content


def get_comment_content(comment_id):
def get_page_comments_content(page_id):
"""
Retrieve the content of a comment.
Retrieve and format the content of all comments on a Confluence page.
Args:
comment_id (str): The ID of the comment.
page_id (str): The ID of the Confluence page.
Returns:
str: The content of the comment.
str: A string containing the content of all comments on the page.
"""
try:
comment = confluence.get_page_by_id(comment_id, expand='body.storage')
comment_content = comment.get('body', {}).get('storage', {}).get('value', '')
comment_text = strip_html_tags(comment_content)
return comment_text
except Exception as e:
logging.error(f"Error retrieving content for comment ID {comment_id}: {e}")
return "" # Return empty string if an error occurs
result = []
start = 0
limit = 25
while True:
try:
chunk = confluence.get_page_comments(page_id, depth='all', start=start, limit=limit, expand='body.storage')['results']
except Exception as e:
logging.error(f"Error fetching page comments (page_id={page_id}) start={start} limit={limit}): {e}")
break

for comment in chunk:
content = comment['body']['storage']['value']
content = strip_html_tags(content)
result.append(content)

start += limit
if len(chunk) < limit:
break

return '\n'.join(result)


def process_page(page_id, space_key, file_manager, page_content_map):
Expand All @@ -261,11 +177,7 @@ def process_page(page_id, space_key, file_manager, page_content_map):
created_date = page['history']['createdDate']
last_updated = page['version']['when']
page_content = strip_html_tags(page.get('body', {}).get('storage', {}).get('value', ''))
page_comments_content = ""
page_comment_ids = get_all_comment_ids_recursive(page_id)

for comment_id in page_comment_ids:
page_comments_content += get_comment_content(comment_id)
page_comments_content = get_page_comments_content(page_id)

page_data = {
'spaceKey': space_key,
Expand Down Expand Up @@ -310,29 +222,19 @@ def get_space_content(space_key, update_date=None):
list: A list of IDs of all pages that were processed.
"""

all_page_ids = get_all_page_ids_recursive(space_key)
all_page_ids = set(all_page_ids)
space_page_ids = get_space_page_ids(space_key)
space_page_ids = set(space_page_ids)

if update_date is not None:
all_page_ids = check_date_filter(update_date, all_page_ids)
space_page_ids = check_date_filter(update_date, space_page_ids)

# Setting up the persist-queue
queue_path = os.path.join(persist_page_processing_queue_path, space_key)
page_queue = Queue(queue_path)
# make all page ids a set to eliminate duplicates
for page_id in all_page_ids:
for page_id in space_page_ids:
page_queue.put(page_id)

print(f"Enqueued {len(all_page_ids)} pages for processing.")
print(f"Pages queued {all_page_ids}")
return space_key


if __name__ == "__main__":

# Initial space retrieve
space_key = choose_space()
get_space_content(space_key)
# Initial space retrieve
# Space update retrieve
# get_space_content(update_date=datetime(2023, 12, 1, 0, 0, 0))
print(f"Enqueued {len(space_page_ids)} pages for processing.")
print(f"Pages queued {space_page_ids}")
return space_key

0 comments on commit afe2fb1

Please sign in to comment.