Use the upload.id as parallel_idx #720

Merged 1 commit on Sep 30, 2024
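Summary: this PR removes the pre-computed `parallel_idx` plumbing. The helper functions in `helpers/parallel_upload_processing.py` that guessed session numbers up front (`next_session_number` / `get_parallel_session_ids`) are deleted, `build_report_from_raw_content` and `process_raw_upload` drop their `parallel_idx` parameters, `build_sessions` keys sessions by `session.id` instead of `upload.order_number`, and the id already carried on the `Session` is kept when it is merged via `report.add_session(..., use_id_from_session=True)`. Below is a minimal sketch of the resulting merge step, assuming (as the title implies) that the id on the `Session` is ultimately derived from the `Upload` row; the `Session` import path is an assumption, the rest follows names visible in the diff:

```python
# Sketch only -- not the exact worker code.
from shared.reports.resources import Report  # Report.add_session / next_session_number live here
from shared.utils.sessions import Session    # assumed location; SessionType is imported from here in the diff


def merge_session(report: Report, session: Session) -> int:
    # The session already carries the id we want to keep (derived from the upload),
    # so add_session() is told to reuse it rather than renumber the session.
    _sessionid, session = report.add_session(session, use_id_from_session=True)
    return session.id
```

The integration test below then pins an upload row to a very large primary key (`2**33 + …`) so that any code still assuming small, dense session numbers fails loudly.
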
64 changes: 0 additions & 64 deletions helpers/parallel_upload_processing.py
@@ -1,68 +1,4 @@
import copy

import sentry_sdk
from shared.utils.sessions import SessionType

from database.models.reports import Upload


# Copied from shared/reports/resources.py Report.next_session_number()
def next_session_number(session_dict):
start_number = len(session_dict)
while start_number in session_dict or str(start_number) in session_dict:
start_number += 1
return start_number


# Copied and cut down from worker/services/report/raw_upload_processor.py
# this version stripped out all the ATS label stuff
def _adjust_sessions(
original_sessions: dict,
to_merge_flags,
current_yaml,
):
session_ids_to_fully_delete = []
flags_under_carryforward_rules = [
f for f in to_merge_flags if current_yaml.flag_has_carryfoward(f)
]
if flags_under_carryforward_rules:
for sess_id, curr_sess in original_sessions.items():
if curr_sess.session_type == SessionType.carriedforward:
if curr_sess.flags:
if any(
f in flags_under_carryforward_rules for f in curr_sess.flags
):
session_ids_to_fully_delete.append(sess_id)
if session_ids_to_fully_delete:
# delete sessions from dict
for id in session_ids_to_fully_delete:
original_sessions.pop(id, None)
return


def get_parallel_session_ids(
sessions, argument_list, db_session, report_service, commit_yaml
):
num_sessions = len(argument_list)

mock_sessions = copy.deepcopy(sessions) # the sessions already in the report
get_parallel_session_ids = []

# iterate over all uploads, get the next session id, and adjust sessions (remove CFF logic)
for i in range(num_sessions):
next_session_id = next_session_number(mock_sessions)

upload_pk = argument_list[i]["upload_pk"]
upload = db_session.query(Upload).filter_by(id_=upload_pk).first()
to_merge_session = report_service.build_session(upload)
flags = upload.flag_names

mock_sessions[next_session_id] = to_merge_session
_adjust_sessions(mock_sessions, flags, commit_yaml)

get_parallel_session_ids.append(next_session_id)

return get_parallel_session_ids


@sentry_sdk.trace
35 changes: 14 additions & 21 deletions services/report/__init__.py
@@ -199,7 +199,7 @@ def has_initialized_report(self, commit: Commit) -> bool:

@sentry_sdk.trace
def initialize_and_save_report(
self, commit: Commit, report_code: str = None
self, commit: Commit, report_code: str | None = None
) -> CommitReport:
"""
Initializes the commit report
@@ -410,7 +410,7 @@ def build_sessions(self, commit: Commit) -> dict[int, Session]:
Does not include CF sessions if there is also an upload session with the same
flag name.
"""
sessions = {}
sessions: dict[int, Session] = {}

carryforward_sessions = {}
uploaded_flags = set()
@@ -429,9 +429,9 @@ def build_sessions(self, commit: Commit) -> dict[int, Session]:
for upload in report_uploads:
session = self.build_session(upload)
if session.session_type == SessionType.carriedforward:
carryforward_sessions[upload.order_number] = session
carryforward_sessions[session.id] = session
else:
sessions[upload.order_number] = session
sessions[session.id] = session
uploaded_flags |= set(session.flags)

for sid, session in carryforward_sessions.items():
@@ -861,7 +861,6 @@ def build_report_from_raw_content(
report: Report,
raw_report_info: RawReportInfo,
upload: Upload,
parallel_idx=None,
) -> ProcessingResult:
"""
Processes an upload on top of an existing report `master` and returns
@@ -872,23 +871,18 @@
"""
commit = upload.report.commit
flags = upload.flag_names
service = upload.provider
build_url = upload.build_url
build = upload.build_code
job = upload.job_code
name = upload.name
archive_url = upload.storage_path
reportid = upload.external_id

session = Session(
provider=service,
build=build,
job=job,
name=name,
provider=upload.provider,
build=upload.build_code,
job=upload.job_code,
name=upload.name,
time=int(time()),
flags=flags,
archive=archive_url,
url=build_url,
url=upload.build_url,
)
result = ProcessingResult(session=session)

@@ -907,7 +901,6 @@ def build_report_from_raw_content(
reportid=reportid,
commit_yaml=self.current_yaml.to_dict(),
archive_url=archive_url,
in_parallel=parallel_idx is not None,
),
)
result.error = ProcessingError(
@@ -943,13 +936,11 @@ def build_report_from_raw_content(
raw_report,
flags,
session,
upload=upload,
parallel_idx=parallel_idx,
upload,
)
result.report = process_result.report
log.info(
"Successfully processed report"
+ (" (in parallel)" if parallel_idx is not None else ""),
"Successfully processed report",
extra=dict(
session=session.id,
ci=f"{session.provider}:{session.build}:{session.job}",
@@ -1054,7 +1045,7 @@ def update_upload_with_processing_result(
error_params=error.params,
)
db_session.add(error_obj)
db_session.flush()
db_session.flush()

@sentry_sdk.trace
def save_report(self, commit: Commit, report: Report, report_code=None):
@@ -1189,6 +1180,8 @@ def save_full_report(
upload_totals.update_from_totals(
session.totals, precision=precision, rounding=rounding
)
db_session.flush()

return res

@sentry_sdk.trace
45 changes: 16 additions & 29 deletions services/report/raw_upload_processor.py
@@ -19,12 +19,9 @@

log = logging.getLogger(__name__)


# This is a lambda function to return different objects
def DEFAULT_LABEL_INDEX():
return {
SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index: SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label
}
DEFAULT_LABEL_INDEX = {
SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index: SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label
}


@dataclass
@@ -47,7 +44,6 @@
flags,
session: Session,
upload: Upload | None = None,
parallel_idx=None,
) -> UploadProcessingResult:
toc, env = None, None

@@ -71,23 +67,15 @@
else:
ignored_file_lines = None

# Get a sessionid to merge into
# anything merged into the original_report
# will take on this sessionid
# But we don't actually merge yet in case the report is empty.
# This is done to avoid garbage sessions to build up in the report
# How can you be sure this will be the sessionid used when you actually merge it? Remember that this piece of code runs inside a lock u.u
if parallel_idx is not None:
sessionid = parallel_idx
else:
sessionid = report.next_session_number()
session.id = sessionid
if env:
session.env = dict([e.split("=", 1) for e in env.split("\n") if "=" in e])

if flags:
session.flags = flags

sessionid = report.next_session_number()
session.id = sessionid

# [javascript] check for both coverage.json and coverage/coverage.lcov
skip_files = set()
for report_file in raw_reports.get_uploaded_files():
@@ -105,7 +93,7 @@
if should_use_encoded_labels:
# We initialize the labels_index (which defaults to {}) to force the special label
# to always be index 0
temporary_report.labels_index = DEFAULT_LABEL_INDEX()
temporary_report.labels_index = dict(DEFAULT_LABEL_INDEX)

[Codecov patch annotation: added line services/report/raw_upload_processor.py#L96 was not covered by tests]

joined = True
for flag in flags or []:
@@ -154,7 +142,7 @@

if (
should_use_encoded_labels
and temporary_report.labels_index == DEFAULT_LABEL_INDEX()
and temporary_report.labels_index == DEFAULT_LABEL_INDEX
):
# This means that, even though this report _could_ use encoded labels,
# none of the reports processed contributed any new labels to it.
@@ -163,9 +151,7 @@

# Now we actually add the session to the original_report
# Because we know that the processing was successful
sessionid, session = report.add_session(
session, use_id_from_session=parallel_idx is not None
)
_sessionid, session = report.add_session(session, use_id_from_session=True)
# Adjust sessions removed carryforward sessions that are being replaced
if session.flags:
session_adjustment = clear_carryforward_sessions(
@@ -191,12 +177,13 @@
}
if original_report.labels_index is None:
original_report.labels_index = {}
labels_index = original_report.labels_index

if (
SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index
not in original_report.labels_index
not in labels_index
):
original_report.labels_index[
labels_index[
SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_index
] = SpecialLabelsEnum.CODECOV_ALL_LABELS_PLACEHOLDER.corresponding_label

@@ -206,17 +193,17 @@
if label_or_id in reverse_index_cache:
return reverse_index_cache[label_or_id]
# Search for label in the report index
for idx, label in original_report.labels_index.items():
for idx, label in labels_index.items():
if label == label_or_id:
reverse_index_cache[label] = idx
return idx
# Label is not present. Add to index.
# Notice that this never picks index 0, that is reserved for the special label
new_index = max(original_report.labels_index.keys()) + 1
new_index = max(labels_index.keys()) + 1
reverse_index_cache[label_or_id] = new_index
# It's OK to update this here because it's inside the
# UploadProcessing lock, so it's exclusive access
original_report.labels_index[new_index] = label_or_id
labels_index[new_index] = label_or_id
return new_index

for report_file in original_report:
@@ -238,7 +225,7 @@
Uses the original_report as reference, and fixes the to_merge_report as needed
it also extends the original_report.labels_index with new labels as needed.
"""
if to_merge_report.labels_index is None:
if to_merge_report.labels_index is None or original_report.labels_index is None:
# The new report doesn't have labels to fix
return

23 changes: 21 additions & 2 deletions tasks/tests/integration/test_upload_e2e.py
@@ -1,4 +1,5 @@
import json
import random
from functools import partial
from typing import Iterable
from uuid import uuid4
@@ -40,6 +41,8 @@ def write_raw_upload(
redis_key = f"uploads/{repoid}/{commitid}"
redis.lpush(redis_key, upload)

return upload_json


def lines(lines: Iterable[tuple[int, ReportLine]]) -> list[tuple[int, int]]:
return list(((lineno, line.coverage) for lineno, line in lines))
@@ -209,16 +212,32 @@ def test_full_upload(
commitid,
)

do_upload(
report_service = ReportService({})
commit_report = report_service.initialize_and_save_report(commit)

upload_id = 2**33 + int(random.random() * 2**15)

first_upload_json = do_upload(
b"""
a.rs
<<<<<< network
# path=coverage.lcov
SF:a.rs
DA:1,1
end_of_record
"""
""",
{"upload_id": upload_id},
)

first_upload = report_service.create_report_upload(first_upload_json, commit_report)
first_upload.flags = []
dbsession.flush()

# force the upload to have a really high ID:
dbsession.execute(
f"UPDATE reports_upload SET id={upload_id} WHERE id={first_upload.id}"
)

do_upload(
b"""
a.rs
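One closing note on the test change: the forced id (`2**33 + int(random.random() * 2**15)`) sits well past the signed 32-bit range, presumably so that any component that truncates or assumes small, sequential session ids would break. A tiny sketch of that boundary (the rationale is an inference, not stated in the PR):

```python
import random

# The test's forced primary key is comfortably beyond the signed 32-bit maximum.
upload_id = 2**33 + int(random.random() * 2**15)
assert upload_id > 2**31 - 1  # 2147483647, the largest signed 32-bit integer
```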