Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional heavy logging #4058

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
7 changes: 7 additions & 0 deletions backend/ee/onyx/db/connector_credential_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from onyx.configs.constants import DocumentSource
from onyx.db.connector_credential_pair import get_connector_credential_pair
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
from onyx.db.models import Connector
from onyx.db.models import ConnectorCredentialPair
from onyx.db.models import UserGroup__ConnectorCredentialPair
Expand Down Expand Up @@ -36,6 +37,7 @@ def get_cc_pairs_by_source(
db_session: Session,
source_type: DocumentSource,
only_sync: bool,
only_valid: bool,
) -> list[ConnectorCredentialPair]:
"""
Get all cc_pairs for a given source type, optionally restricted to those with SYNC access (only_sync) and/or those whose status is ACTIVE (only_valid)
Expand All @@ -51,6 +53,11 @@ def get_cc_pairs_by_source(
if only_sync:
query = query.filter(ConnectorCredentialPair.access_type == AccessType.SYNC)

if only_valid:
query = query.filter(
ConnectorCredentialPair.status == ConnectorCredentialPairStatus.ACTIVE
)

cc_pairs = query.all()
return cc_pairs

Expand Down
11 changes: 11 additions & 0 deletions backend/ee/onyx/external_permissions/google_drive/doc_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ def _fetch_permissions_for_permission_ids(
user_email=(owner_email or google_drive_connector.primary_admin_email),
)

# We continue on 404 or 403 because the document may not exist or the user may not have access to it
fetched_permissions = execute_paginated_retrieval(
retrieval_function=drive_service.permissions().list,
list_key="permissions",
fileId=doc_id,
fields="permissions(id, emailAddress, type, domain)",
supportsAllDrives=True,
continue_on_404_or_403=True,
)

permissions_for_doc_id = []
Expand Down Expand Up @@ -104,7 +106,13 @@ def _get_permissions_from_slim_doc(
user_emails: set[str] = set()
group_emails: set[str] = set()
public = False
skipped_permissions = 0

for permission in permissions_list:
if not permission:
skipped_permissions += 1
continue

permission_type = permission["type"]
if permission_type == "user":
user_emails.add(permission["emailAddress"])
Expand All @@ -121,6 +129,9 @@ def _get_permissions_from_slim_doc(
elif permission_type == "anyone":
public = True

logger.info(
f"Skipped {skipped_permissions} permissions for document {slim_doc.id}– {len(permissions_list)} permissions fetched"
)
drive_id = permission_info.get("drive_id")
group_ids = group_emails | ({drive_id} if drive_id is not None else set())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.connectors.factory import validate_ccpair_for_user
from onyx.db.connector import mark_cc_pair_as_permissions_synced
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.connector_credential_pair import update_connector_credential_pair
from onyx.db.document import upsert_document_by_connector_credential_pair
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.enums import AccessType
Expand Down Expand Up @@ -193,12 +195,19 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str) -> bool | None
monitor_ccpair_permissions_taskset(
tenant_id, key_bytes, r, db_session
)
task_logger.info(f"check_for_doc_permissions_sync finished: tenant={tenant_id}")
except SoftTimeLimitExceeded:
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Unexpected check_for_doc_permissions_sync exception: tenant={tenant_id} {error_msg}"
)
task_logger.exception(
f"Unexpected check_for_doc_permissions_sync exception: tenant={tenant_id}"
)
finally:
if lock_beat.owned():
lock_beat.release()
Expand Down Expand Up @@ -282,13 +291,19 @@ def try_creating_permissions_sync_task(
redis_connector.permissions.set_fence(payload)

payload_id = payload.id
except Exception:
task_logger.exception(f"Unexpected exception: cc_pair={cc_pair_id}")
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Unexpected try_creating_permissions_sync_task exception: cc_pair={cc_pair_id} {error_msg}"
)
return None
finally:
if lock.owned():
lock.release()

task_logger.info(
f"try_creating_permissions_sync_task finished: cc_pair={cc_pair_id} payload_id={payload_id}"
)
return payload_id


Expand Down Expand Up @@ -388,6 +403,30 @@ def connector_permission_sync_generator_task(
f"No connector credential pair found for id: {cc_pair_id}"
)

try:
created = validate_ccpair_for_user(
cc_pair.connector.id,
cc_pair.credential.id,
db_session,
tenant_id,
enforce_creation=False,
)
if not created:
task_logger.warning(
f"Unable to create connector credential pair for id: {cc_pair_id}"
)
except Exception:
task_logger.exception(
f"validate_ccpair_permissions_sync exceptioned: cc_pair={cc_pair_id}"
)
update_connector_credential_pair(
db_session=db_session,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
status=ConnectorCredentialPairStatus.INVALID,
)
raise

source_type = cc_pair.connector.source

doc_sync_func = DOC_PERMISSIONS_FUNC_MAP.get(source_type)
Expand Down Expand Up @@ -439,6 +478,10 @@ def connector_permission_sync_generator_task(
redis_connector.permissions.generator_complete = tasks_generated

except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Permission sync exceptioned: cc_pair={cc_pair_id} payload_id={payload_id} {error_msg}"
)
task_logger.exception(
f"Permission sync exceptioned: cc_pair={cc_pair_id} payload_id={payload_id}"
)
Expand Down Expand Up @@ -512,13 +555,20 @@ def update_external_document_permissions_task(
f"elapsed={elapsed:.2f}"
)

except Exception:
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Exception in update_external_document_permissions_task: connector_id={connector_id} doc_id={doc_id} {error_msg}"
)
task_logger.exception(
f"Exception in update_external_document_permissions_task: "
f"connector_id={connector_id} doc_id={doc_id}"
)
return False

task_logger.info(
f"update_external_document_permissions_task finished: connector_id={connector_id} doc_id={doc_id}"
)
return True


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.connectors.factory import validate_ccpair_for_user
from onyx.db.connector import mark_cc_pair_as_external_group_synced
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
from onyx.db.connector_credential_pair import update_connector_credential_pair
from onyx.db.engine import get_session_with_current_tenant
from onyx.db.enums import AccessType
from onyx.db.enums import ConnectorCredentialPairStatus
Expand Down Expand Up @@ -148,7 +151,7 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool
for source in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC:
# These are ordered by cc_pair id so the first one is the one we want
cc_pairs_to_dedupe = get_cc_pairs_by_source(
db_session, source, only_sync=True
db_session, source, only_sync=True, only_valid=True
)
# We only want to sync one cc_pair per source type
# in GROUP_PERMISSIONS_IS_CC_PAIR_AGNOSTIC so we dedupe here
Expand Down Expand Up @@ -195,12 +198,17 @@ def check_for_external_group_sync(self: Task, *, tenant_id: str | None) -> bool
task_logger.info(
"Soft time limit exceeded, task is being terminated gracefully."
)
except Exception:
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Unexpected check_for_external_group_sync exception: tenant={tenant_id} {error_msg}"
)
task_logger.exception(f"Unexpected exception: tenant={tenant_id}")
finally:
if lock_beat.owned():
lock_beat.release()

task_logger.info(f"check_for_external_group_sync finished: tenant={tenant_id}")
return True


Expand Down Expand Up @@ -267,12 +275,19 @@ def try_creating_external_group_sync_task(
redis_connector.external_group_sync.set_fence(payload)

payload_id = payload.id
except Exception:
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"Unexpected try_creating_external_group_sync_task exception: cc_pair={cc_pair_id} {error_msg}"
)
task_logger.exception(
f"Unexpected exception while trying to create external group sync task: cc_pair={cc_pair_id}"
)
return None

task_logger.info(
f"try_creating_external_group_sync_task finished: cc_pair={cc_pair_id} payload_id={payload_id}"
)
return payload_id


Expand Down Expand Up @@ -361,12 +376,37 @@ def connector_external_group_sync_generator_task(
cc_pair = get_connector_credential_pair_from_id(
db_session=db_session,
cc_pair_id=cc_pair_id,
eager_load_credential=True,
)
if cc_pair is None:
raise ValueError(
f"No connector credential pair found for id: {cc_pair_id}"
)

try:
created = validate_ccpair_for_user(
cc_pair.connector.id,
cc_pair.credential.id,
db_session,
tenant_id,
enforce_creation=False,
)
if not created:
task_logger.warning(
f"Unable to create connector credential pair for id: {cc_pair_id}"
)
except Exception:
task_logger.exception(
f"validate_ccpair_permissions_sync exceptioned: cc_pair={cc_pair_id}"
)
update_connector_credential_pair(
db_session=db_session,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
status=ConnectorCredentialPairStatus.INVALID,
)
raise

source_type = cc_pair.connector.source

ext_group_sync_func = GROUP_PERMISSIONS_FUNC_MAP.get(source_type)
Expand All @@ -378,8 +418,18 @@ def connector_external_group_sync_generator_task(
logger.info(
f"Syncing external groups for {source_type} for cc_pair: {cc_pair_id}"
)

external_user_groups: list[ExternalUserGroup] = ext_group_sync_func(cc_pair)
external_user_groups: list[ExternalUserGroup] = []
try:
external_user_groups = ext_group_sync_func(cc_pair)
except ConnectorValidationError as e:
msg = f"Error syncing external groups for {source_type} for cc_pair: {cc_pair_id} {e}"
update_connector_credential_pair(
db_session=db_session,
connector_id=cc_pair.connector.id,
credential_id=cc_pair.credential.id,
status=ConnectorCredentialPairStatus.INVALID,
)
raise e

logger.info(
f"Syncing {len(external_user_groups)} external user groups for {source_type}"
Expand All @@ -405,6 +455,14 @@ def connector_external_group_sync_generator_task(
sync_status=SyncStatus.SUCCESS,
)
except Exception as e:
error_msg = str(e).replace("\n", " ")
task_logger.warning(
f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id} {error_msg}"
)
task_logger.exception(
f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id}"
)

msg = f"External group sync exceptioned: cc_pair={cc_pair_id} payload_id={payload.id}"
task_logger.exception(msg)
emit_background_error(msg + f"\n\n{e}", cc_pair_id=cc_pair_id)
Expand Down
12 changes: 11 additions & 1 deletion backend/onyx/background/celery/tasks/indexing/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
from onyx.configs.constants import OnyxRedisConstants
from onyx.configs.constants import OnyxRedisLocks
from onyx.configs.constants import OnyxRedisSignals
from onyx.connectors.interfaces import ConnectorValidationError
from onyx.connectors.exceptions import ConnectorValidationError
from onyx.db.connector import mark_ccpair_with_indexing_trigger
from onyx.db.connector_credential_pair import fetch_connector_credential_pairs
from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id
Expand Down Expand Up @@ -77,6 +77,7 @@
from shared_configs.configs import INDEXING_MODEL_SERVER_PORT
from shared_configs.configs import MULTI_TENANT
from shared_configs.configs import SENTRY_DSN
from shared_configs.contextvars import CURRENT_TENANT_ID_CONTEXTVAR

logger = setup_logger()

Expand Down Expand Up @@ -617,6 +618,12 @@ def connector_indexing_task(
This will cause the primary worker to abort the indexing attempt and clean up.
"""

tenant_id_from_context = CURRENT_TENANT_ID_CONTEXTVAR.get()
logger.info(f"connector_indexing_task with tenant_id={tenant_id_from_context}")
logger.info(
f"connector_indexing_task with args={index_attempt_id}, {cc_pair_id}, {search_settings_id}, {is_ee}, {tenant_id}"
)

# Since connector_indexing_proxy_task spawns a new process using this function as
# the entrypoint, we init Sentry here.
if SENTRY_DSN:
Expand Down Expand Up @@ -924,6 +931,7 @@ def connector_indexing_proxy_task(
task_logger.error("self.request.id is None!")

client = SimpleJobClient()
task_logger.info(f"submitting connector_indexing_task with tenant_id={tenant_id}")

job = client.submit(
connector_indexing_task,
Expand Down Expand Up @@ -1070,6 +1078,7 @@ def connector_indexing_proxy_task(

if not index_attempt.is_finished():
continue

except Exception:
# If the DB raised an exception, just restart the check.
# Polling the index attempt status doesn't need to be strongly consistent.
Expand All @@ -1079,6 +1088,7 @@ def connector_indexing_proxy_task(
)
)
continue

except Exception as e:
result.status = IndexingWatchdogTerminalStatus.WATCHDOG_EXCEPTIONED
if isinstance(e, ConnectorValidationError):
Expand Down
Loading
Loading