Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add analysis functions for M&E results #13

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
164 changes: 163 additions & 1 deletion ipod/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import Tuple
from typing import Dict, List, Tuple, Union

import numpy as np
import pyarrow as pa
Expand Down Expand Up @@ -356,3 +356,165 @@ def assign_duplicate_observations(
filtered_orbit_members = qv.defragment(filtered_orbit_members)

return filtered, filtered_orbit_members


class MergeSummary(qv.Table):
old_orbit_id = qv.LargeStringColumn()
merged_orbit_id = qv.LargeStringColumn()
old_obs_count = qv.Int64Column()
old_destination_orbit_obs_count = qv.Int64Column()
num_obs_carried_over = qv.Int64Column()
merged_orbit_obs_count = qv.Int64Column()


def identify_merged_dropped_orbits(
members_initial: FittedOrbitMembers,
orbits_initial: FittedOrbits,
members_me: FittedOrbitMembers,
orbits_me: FittedOrbits,
) -> Tuple[MergeSummary, list[str]]:
missing_orbits = orbits_initial.apply_mask(
pc.invert(
pc.is_in(orbits_initial.column("orbit_id"), orbits_me.orbit_id.unique())
)
)
missing_ids = missing_orbits.orbit_id.to_pylist()

# determine which orbits now contain obs from the subsumed orbits
merges: Dict[str, List[Union[str, int, None]]] = {
"old_orbit_id": [],
"merged_orbit_id": [],
"old_obs_count": [],
"old_destination_orbit_obs_count": [],
"num_obs_carried_over": [],
"merged_orbit_obs_count": [],
}
orbits_removed = []
for id in missing_ids:
old_members = members_initial.where(
pc.equal(members_initial.column("orbit_id"), id)
)
# now find what orbits contains these in the result members
obs_ids = old_members.obs_id
new_members = members_me.apply_mask(pc.is_in(members_me.obs_id, obs_ids))
if len(new_members) == 0:
orbits_removed.append(id)
# now find the orbit ids of these members
new_orbit_ids = new_members.orbit_id

# a few cases arise here, we could have one or more
# orbits that contain the obs from the old orbit

# this denotes a partial merge
for new_id in new_orbit_ids.unique().to_pylist():
members_merged_orbit = members_me.apply_mask(
pc.equal(members_me.column("orbit_id"), new_id)
)
merges["old_orbit_id"].append(id)
merges["merged_orbit_id"].append(new_id)
merges["old_obs_count"].append(len(obs_ids))
merges["old_destination_orbit_obs_count"].append(
len(
members_initial.where(
pc.equal(members_initial.column("orbit_id"), new_id)
)
)
)
merges["num_obs_carried_over"].append(
len(new_members.where(pc.equal(new_members.column("orbit_id"), new_id)))
)
merges["merged_orbit_obs_count"].append(len(members_merged_orbit))

return (
MergeSummary.from_kwargs(
old_orbit_id=merges["old_orbit_id"],
merged_orbit_id=merges["merged_orbit_id"],
old_obs_count=merges["old_obs_count"],
old_destination_orbit_obs_count=merges["old_destination_orbit_obs_count"],
num_obs_carried_over=merges["num_obs_carried_over"],
merged_orbit_obs_count=merges["merged_orbit_obs_count"],
),
orbits_removed,
)


class MERecoverySummary(qv.Table):
old_orbit_id = qv.LargeStringColumn()
merged_orbit_id = qv.LargeStringColumn()
expected_num_obs = qv.Int64Column()
number_observations_matched = qv.Int64Column()
total_obs_resulting_linkage = qv.Int64Column()


def analyze_me_output(
members_expected: FittedOrbitMembers,
members_initial: FittedOrbitMembers,
orbits_initial: FittedOrbits,
orbits_me: FittedOrbits,
members_me: FittedOrbitMembers,
) -> Tuple[MergeSummary, MERecoverySummary, list[str]]:
"""
Analyze the output of the merge and extend process. This function will return
summaries of merged orbits, orbits that were removed, and the results of ME
run based on an expected set of linkages (members_expected).
"""

# first find the merged/removed orbits
merge_summary, orbits_removed = identify_merged_dropped_orbits(
members_initial, orbits_initial, members_me, orbits_me
)

# now find the deltas from the expected members
me_results: Dict[str, List[Union[str, int, None]]] = {
"old_orbit_id": [],
"merged_orbit_id": [],
"expected_num_obs": [],
"number_observations_matched": [],
"total_obs_resulting_linkage": [],
}
for orb_id in members_expected.orbit_id.unique().to_pylist():
orbit_members = members_expected.where(
pc.equal(members_expected.column("orbit_id"), orb_id)
)
obs_ids = orbit_members.obs_id
# find obs in post-me members that match these ids
matching_members = members_me.apply_mask(pc.is_in(members_me.obs_id, obs_ids))
# Case where we only match to one orbit
if len(matching_members.orbit_id.unique()) == 1:
resulting_orbit_members_complete = members_me.where(
pc.equal(members_me.column("orbit_id"), matching_members.orbit_id[0])
)
me_results["old_orbit_id"].append(orb_id)
me_results["merged_orbit_id"].append(matching_members.orbit_id[0])
me_results["expected_num_obs"].append(len(orbit_members))
me_results["number_observations_matched"].append(len(matching_members))
me_results["total_obs_resulting_linkage"].append(
len(resulting_orbit_members_complete)
)
elif len(matching_members.orbit_id.unique()) > 1:
# case where an expected linkage was split between several orbits
for new_id in matching_members.orbit_id.unique():
resulting_orbit_members_complete = members_me.where(
pc.equal(members_me.column("orbit_id"), new_id)
)
me_results["old_orbit_id"].append(orb_id)
me_results["merged_orbit_id"].append(new_id)
me_results["expected_num_obs"].append(len(orbit_members))
me_results["number_observations_matched"].append(
len(
matching_members.where(
pc.equal(matching_members.column("orbit_id"), new_id)
)
)
)
me_results["total_obs_resulting_linkage"].append(
len(resulting_orbit_members_complete)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we don't need the conditional and can just use the logic where we have more than one unique orbit id. We will also need to control for when 0 orbit members match the expected orbit id.

me_recovery_summary = MERecoverySummary.from_kwargs(
old_orbit_id=me_results["old_orbit_id"],
merged_orbit_id=me_results["merged_orbit_id"],
expected_num_obs=me_results["expected_num_obs"],
number_observations_matched=me_results["number_observations_matched"],
total_obs_resulting_linkage=me_results["total_obs_resulting_linkage"],
)
return merge_summary, me_recovery_summary, orbits_removed
Loading