lint & typing
atolopko-czi committed Jan 9, 2024
1 parent 8611fcc commit f4bd2a5
Showing 1 changed file with 17 additions and 16 deletions.
@@ -68,16 +68,16 @@ def compute_all(cube_path: str, query_filter: str, treatment: str, n_threads: in
     # compute each feature group in parallel
     n_feature_groups = min(len(features), n_threads)
     feature_groups = [features.tolist() for features in np.array_split(np.array(features), n_feature_groups)]
-    print(f"computing for {len(obs_groups_df)} obs groups ({obs_groups_df.n_obs.sum()} cells) and {len(features)} features using {n_feature_groups} processes, {len(features) // n_feature_groups} features/process")
+    print(
+        f"computing for {len(obs_groups_df)} obs groups ({obs_groups_df.n_obs.sum()} cells) and {len(features)} features using {n_feature_groups} processes, {len(features) // n_feature_groups} features/process"
+    )

     # make treatment variable be in the first column of the design matrix
     variables = [treatment] + [covariate for covariate in CUBE_LOGICAL_DIMS_OBS if covariate != treatment]
     design = pd.get_dummies(obs_groups_df[variables], drop_first=True, dtype=int)

     result_groups = ProcessPoolExecutor(max_workers=n_threads).map(
-        partial(compute_for_features, cube_path, design, obs_groups_df),
-        feature_groups,
-        range(len(feature_groups))
+        partial(compute_for_features, cube_path, design, obs_groups_df), feature_groups, range(len(feature_groups))
     )

     # flatten results
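This hunk only reflows the fan-out in compute_all, but the pattern is worth spelling out: features are split into roughly equal groups with np.array_split, and ProcessPoolExecutor.map zips the group list with its indices, calling the partially-applied worker once per group. Below is a minimal runnable sketch of that pattern, outside the diff, with a stand-in work function and made-up names in place of compute_for_features and the cube inputs:

from concurrent.futures import ProcessPoolExecutor
from functools import partial
from typing import List, Tuple

import numpy as np


def work(tag: str, features: List[str], group_key: int) -> List[Tuple[str, int]]:
    # stand-in for compute_for_features: one result tuple per feature
    return [(f"{tag}:{feature}", group_key) for feature in features]


if __name__ == "__main__":
    features = [f"gene_{i}" for i in range(10)]
    n_threads = 4
    n_feature_groups = min(len(features), n_threads)
    feature_groups = [g.tolist() for g in np.array_split(np.array(features), n_feature_groups)]

    # map(fn, groups, keys) zips the two iterables, so each worker call
    # receives (tag, one feature group, that group's index)
    result_groups = ProcessPoolExecutor(max_workers=n_threads).map(
        partial(work, "demo"), feature_groups, range(len(feature_groups))
    )

    # flatten the per-group lists into one result list, as compute_all does
    results = [row for group in result_groups for row in group]
    print(results[:3])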
@@ -86,30 +86,33 @@ def compute_all(cube_path: str, query_filter: str, treatment: str, n_threads: in
     return pd.DataFrame(results, columns=["feature_id", "coef", "z", "pval"], copy=False).set_index("feature_id")


-def get_features(cube_path):
+def get_features(cube_path: str) -> List[str]:
     feature_id_path = os.path.join(cube_path, "feature_ids.json")
     if os.path.isfile(feature_id_path):
         with open(feature_id_path) as f:
             features = json.load(f)
     else:
-        with tiledb.open(os.path.join(cube_path, ESTIMATORS_ARRAY), "r",
-                         config={"soma.init_buffer_bytes": 2 ** 32}) as estimators_array:
-            features = estimators_array.query(attrs=[], dims=["feature_id"]).df[:][
-                "feature_id"].drop_duplicates().tolist()
+        with tiledb.open(
+            os.path.join(cube_path, ESTIMATORS_ARRAY), "r", config={"soma.init_buffer_bytes": 2**32}
+        ) as estimators_array:
+            features = (
+                estimators_array.query(attrs=[], dims=["feature_id"]).df[:]["feature_id"].drop_duplicates().tolist()
+            )
         with open(feature_id_path, "w") as f:
             json.dump(features, f)
-    return features
+    return cast(List[str], features)


-def compute_for_features(cube_path: str, design: pd.DataFrame, obs_groups_df: pd.DataFrame, features: List[str],
-                         feature_group_key: int) -> List[Tuple[str, np.float32, np.float32, np.float32]]:
+def compute_for_features(
+    cube_path: str, design: pd.DataFrame, obs_groups_df: pd.DataFrame, features: List[str], feature_group_key: int
+) -> List[Tuple[str, np.float32, np.float32, np.float32]]:
     print(f"computing for feature group {feature_group_key}, {features[0]}..{features[-1]}...")
     estimators = query_estimators(cube_path, obs_groups_df, features)
     cell_counts = obs_groups_df["n_obs"].values
     obs_group_joinids = obs_groups_df[["obs_group_joinid"]]

     result = [
-        (feature, *compute_for_feature(cell_counts, obs_group_joinids, design, estimators, feature))
+        (feature, *compute_for_feature(cell_counts, obs_group_joinids, design, estimators, feature))  # type:ignore
         for feature in features
     ]
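The signature changes in this hunk are pure typing; the substantive fixes are the cast on get_features's return, needed because json.load returns Any, and the # type:ignore on the tuple splat. A small self-contained sketch of the cache-to-JSON-and-cast pattern, with a hypothetical expensive_query standing in for the TileDB read:

import json
import os
from typing import List, cast


def expensive_query() -> List[str]:
    # stand-in for the TileDB feature_id query
    return ["ENSG00000000003", "ENSG00000000005"]


def cached_feature_ids(cache_path: str) -> List[str]:
    if os.path.isfile(cache_path):
        with open(cache_path) as f:
            feature_ids = json.load(f)  # json.load() returns Any
    else:
        feature_ids = expensive_query()
        with open(cache_path, "w") as f:
            json.dump(feature_ids, f)
    # cast() narrows Any to List[str] for the type checker; it has no runtime effect
    return cast(List[str], feature_ids)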

@@ -129,9 +132,7 @@ def compute_for_feature(
     feature_estimators = estimators[estimators.feature_id == feature][["obs_group_joinid", "mean", "sem"]]

     # ensure estimators are available for all obs groups (for when feature had no expression data for some obs groups)
-    feature_estimators = obs_group_joinids.merge(
-        feature_estimators, on="obs_group_joinid", how="left"
-    )
+    feature_estimators = obs_group_joinids.merge(feature_estimators, on="obs_group_joinid", how="left")
     m = cast(npt.NDArray[np.float32], feature_estimators["mean"].fillna(1e-3).values)
     sem = cast(npt.NDArray[np.float32], feature_estimators["sem"].fillna(1e-4).values)
