Skip to content

Commit

Permalink
Merge branch 'additional_heavy_logging' of https://github.com/onyx-do…
Browse files Browse the repository at this point in the history
…t-app/onyx into bugfix/indexing-query-2
  • Loading branch information
Richard Kuo (Danswer) committed Feb 19, 2025
2 parents 357d3d1 + 4f6d115 commit da1f128
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,19 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str) -> bool | None
monitor_ccpair_permissions_taskset(
tenant_id, key_bytes, r, db_session
)
task_logger.info(f"check_for_doc_permissions_sync finished: tenant={tenant_id}")
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Unexpected check_for_doc_permissions_sync exception: tenant={tenant_id} {error_msg}"
)
task_logger.exception(
f"Unexpected check_for_doc_permissions_sync exception: tenant={tenant_id}"
)
finally:
if lock_beat.owned():
lock_beat.release()
Expand Down Expand Up @@ -282,13 +289,19 @@ def try_creating_permissions_sync_task(
redis_connector.permissions.set_fence(payload)

payload_id = payload.id
except Exception:
task_logger.exception(f"Unexpected exception: cc_pair={cc_pair_id}")
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Unexpected try_creating_permissions_sync_task exception: cc_pair={cc_pair_id} {error_msg}"
)
return None
finally:
if lock.owned():
lock.release()

task_logger.info(
f"try_creating_permissions_sync_task finished: cc_pair={cc_pair_id} payload_id={payload_id}"
)
return payload_id


Expand Down Expand Up @@ -439,6 +452,10 @@ def connector_permission_sync_generator_task(
redis_connector.permissions.generator_complete = tasks_generated

except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Permission sync exceptioned: cc_pair={cc_pair_id} payload_id={payload_id} {error_msg}"
)
task_logger.exception(
f"Permission sync exceptioned: cc_pair={cc_pair_id} payload_id={payload_id}"
)
Expand Down Expand Up @@ -512,13 +529,20 @@ def update_external_document_permissions_task(
f"elapsed={elapsed:.2f}"
)

except Exception:
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Exception in update_external_document_permissions_task: connector_id={connector_id} doc_id={doc_id} {error_msg}"
)
task_logger.exception(
f"Exception in update_external_document_permissions_task: "
f"connector_id={connector_id} doc_id={doc_id}"
)
return False

task_logger.info(
f"update_external_document_permissions_task finished: connector_id={connector_id} doc_id={doc_id}"
)
return True


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,17 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Unexpected check_for_external_group_sync exception: tenant={tenant_id} {error_msg}"
)
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
finally:
if lock_beat.owned():
lock_beat.release()

task_logger.info(f"check_for_external_group_sync finished: tenant={tenant_id}")
return True


Expand Down Expand Up @@ -267,12 +272,19 @@ def try_creating_external_group_sync_task(
redis_connector.external_group_sync.set_fence(payload)

payload_id = payload.id
except Exception:
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Unexpected try_creating_external_group_sync_task exception: cc_pair={cc_pair_id} {error_msg}"
)
task_logger.exception(
f"Unexpected exception while trying to create external group sync task: cc_pair={cc_pair_id}"
)
return None

task_logger.info(
f"try_creating_external_group_sync_task finished: cc_pair={cc_pair_id} payload_id={payload_id}"
)
return payload_id


Expand Down Expand Up @@ -405,6 +417,14 @@ def connector_external_group_sync_generator_task(
sync_status=SyncStatus.SUCCESS,
)
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id} {error_msg}"
)
task_logger.exception(
f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id}"
)

msg = f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id}"
task_logger.exception(msg)
emit_background_error(msg + f"\n\n{e}", cc_pair_id=cc_pair_id)
Expand Down
16 changes: 12 additions & 4 deletions backend/onyx/background/celery/tasks/pruning/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,14 @@ def check_for_pruning(self: Task, *, tenant_id: str | None) -> bool | None:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(f"Unexpected pruning check exception: {error_msg}")
task_logger.exception("Unexpected exception during pruning check")
finally:
if lock_beat.owned():
lock_beat.release()

task_logger.info(f"check_for_pruning finished: tenant={tenant_id}")
return True


Expand Down Expand Up @@ -301,13 +303,19 @@ def try_creating_prune_generator_task(
redis_connector.prune.set_fence(payload)

payload_id = payload.id
except Exception:
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Unexpected try_creating_prune_generator_task exception: cc_pair={cc_pair.id} {error_msg}"
)
task_logger.exception(f"Unexpected exception: cc_pair={cc_pair.id}")
return None
finally:
if lock.owned():
lock.release()

task_logger.info(
f"try_creating_prune_generator_task finished: cc_pair={cc_pair.id} payload_id={payload_id}"
)
return payload_id


Expand Down
3 changes: 3 additions & 0 deletions backend/onyx/background/celery/tasks/shared/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,8 @@ def document_by_cc_pair_cleanup_task(
)
return False

error_msg = str(e).replace("\n", " ")
task_logger.warning(f"Unexpected exception: doc={document_id} {error_msg}")
task_logger.exception(f"Unexpected exception: doc={document_id}")

if self.request.retries < DOCUMENT_BY_CC_PAIR_CLEANUP_MAX_RETRIES:
Expand Down Expand Up @@ -219,6 +221,7 @@ def document_by_cc_pair_cleanup_task(
mark_document_as_modified(document_id, db_session)
return False

task_logger.info(f"document_by_cc_pair_cleanup_task finished: doc={document_id}")
return True


Expand Down
74 changes: 74 additions & 0 deletions backend/onyx/connectors/blob/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,22 @@

import boto3 # type: ignore
from botocore.client import Config # type: ignore
from botocore.exceptions import ClientError
from botocore.exceptions import NoCredentialsError
from botocore.exceptions import PartialCredentialsError
from mypy_boto3_s3 import S3Client # type: ignore

from onyx.configs.app_configs import INDEX_BATCH_SIZE
from onyx.configs.constants import BlobType
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.interfaces import CredentialExpiredError
from onyx.connectors.interfaces import GenerateDocumentsOutput
from onyx.connectors.interfaces import InsufficientPermissionsError
from onyx.connectors.interfaces import LoadConnector
from onyx.connectors.interfaces import PollConnector
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import UnexpectedError
from onyx.connectors.models import ConnectorMissingCredentialError
from onyx.connectors.models import Document
from onyx.connectors.models import Section
Expand Down Expand Up @@ -240,6 +247,73 @@ def poll_source(

return None

def validate_connector_settings(self) -> None:
if self.s3_client is None:
raise ConnectorMissingCredentialError(
"Blob storage credentials not loaded."
)

if not self.bucket_name:
raise ConnectorValidationError(
"No bucket name was provided in connector settings."
)

try:
# We only fetch one object/page as a light-weight validation step.
# This ensures we trigger typical S3 permission checks (ListObjectsV2, etc.).
self.s3_client.list_objects_v2(
Bucket=self.bucket_name, Prefix=self.prefix, MaxKeys=1
)

except NoCredentialsError:
raise ConnectorMissingCredentialError(
"No valid blob storage credentials found or provided to boto3."
)
except PartialCredentialsError:
raise ConnectorMissingCredentialError(
"Partial or incomplete blob storage credentials provided to boto3."
)
except ClientError as e:
error_code = e.response["Error"].get("Code", "")
status_code = e.response["ResponseMetadata"].get("HTTPStatusCode")

# Most common S3 error cases
if error_code in [
"AccessDenied",
"InvalidAccessKeyId",
"SignatureDoesNotMatch",
]:
if status_code == 403 or error_code == "AccessDenied":
raise InsufficientPermissionsError(
f"Insufficient permissions to list objects in bucket '{self.bucket_name}'. "
"Please check your bucket policy and/or IAM policy."
)
if status_code == 401 or error_code == "SignatureDoesNotMatch":
raise CredentialExpiredError(
"Provided blob storage credentials appear invalid or expired."
)

raise CredentialExpiredError(
f"Credential issue encountered ({error_code})."
)

if error_code == "NoSuchBucket" or status_code == 404:
raise ConnectorValidationError(
f"Bucket '{self.bucket_name}' does not exist or cannot be found."
)

raise ConnectorValidationError(
f"Unexpected S3 client error (code={error_code}, status={status_code}): {e}"
)

except Exception as e:
# Catch-all for anything not captured by the above
# Since we are unsure of the error and it may not disable the connector,
# raise an unexpected error (does not disable connector)
raise UnexpectedError(
f"Unexpected error during blob storage settings validation: {e}"
)


if __name__ == "__main__":
credentials_dict = {
Expand Down
10 changes: 6 additions & 4 deletions backend/onyx/connectors/hubspot/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,18 @@ def _process_tickets(
contact = api_client.crm.contacts.basic_api.get_by_id(
contact_id=contact.id
)
associated_emails.append(contact.properties["email"])
email = contact.properties.get("email")
if email is not None:
associated_emails.append(email)

if notes:
for note in notes.results:
note = api_client.crm.objects.notes.basic_api.get_by_id(
note_id=note.id, properties=["content", "hs_body_preview"]
)
if note.properties["hs_body_preview"] is None:
continue
associated_notes.append(note.properties["hs_body_preview"])
preview = note.properties.get("hs_body_preview")
if preview is not None:
associated_notes.append(preview)

associated_emails_str = " ,".join(associated_emails)
associated_notes_str = " ".join(associated_notes)
Expand Down
54 changes: 54 additions & 0 deletions backend/onyx/connectors/slack/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@
from onyx.configs.constants import DocumentSource
from onyx.connectors.interfaces import CheckpointConnector
from onyx.connectors.interfaces import CheckpointOutput
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.interfaces import CredentialExpiredError
from onyx.connectors.interfaces import GenerateSlimDocumentOutput
from onyx.connectors.interfaces import InsufficientPermissionsError
from onyx.connectors.interfaces import SecondsSinceUnixEpoch
from onyx.connectors.interfaces import SlimConnector
from onyx.connectors.interfaces import UnexpectedError
from onyx.connectors.models import BasicExpertInfo
from onyx.connectors.models import ConnectorCheckpoint
from onyx.connectors.models import ConnectorFailure
Expand Down Expand Up @@ -666,6 +670,56 @@ def load_from_checkpoint(
)
return checkpoint

def validate_connector_settings(self) -> None:
if self.client is None:
raise ConnectorMissingCredentialError("Slack credentials not loaded.")

try:
# Minimal API call to confirm we can list channels
# We set limit=1 for a lightweight check
response = self.client.conversations_list(limit=1, types=["public_channel"])
# Just ensure Slack responded "ok: True"
if not response.get("ok", False):
error_msg = response.get("error", "Unknown error from Slack")
if error_msg == "invalid_auth":
raise ConnectorValidationError(
f"Invalid or expired Slack bot token ({error_msg})."
)
elif error_msg == "not_authed":
raise CredentialExpiredError(
f"Invalid or expired Slack bot token ({error_msg})."
)
raise UnexpectedError(f"Slack API returned a failure: {error_msg}")

except SlackApiError as e:
slack_error = e.response.get("error", "")
if slack_error == "missing_scope":
# The needed scope is typically "channels:read" or "groups:read"
# for viewing channels. The error message might also contain the
# specific scope needed vs. provided.
raise InsufficientPermissionsError(
"Slack bot token lacks the necessary scope to list channels. "
"Please ensure your Slack app has 'channels:read' (or 'groups:read' for private channels) enabled."
)
elif slack_error == "invalid_auth":
raise CredentialExpiredError(
f"Invalid or expired Slack bot token ({slack_error})."
)
elif slack_error == "not_authed":
raise CredentialExpiredError(
f"Invalid or expired Slack bot token ({slack_error})."
)
else:
# Generic Slack error
raise UnexpectedError(
f"Unexpected Slack error '{slack_error}' during settings validation."
)
except Exception as e:
# Catch-all for unexpected exceptions
raise UnexpectedError(
f"Unexpected error during Slack settings validation: {e}"
)


if __name__ == "__main__":
import os
Expand Down
Loading

0 comments on commit da1f128

Please sign in to comment.