Showing 7 changed files with 753 additions and 247 deletions.
@@ -0,0 +1,107 @@
# %%
import os

import torch
from args import parse_args

args = parse_args()

# %%
root_path = args.root_path
region = args.region

data_path = f"{region}/cctorch"
result_path = f"{region}/cctorch/ccpairs"

# data_path = f"{region}/cctorch_ca"
# result_path = f"{region}/cctorch_ca/ccpairs"

# data_path = f"{region}/cctorch_gamma"
# result_path = f"{region}/cctorch_gamma/ccpairs"

os.makedirs(f"{root_path}/{result_path}", exist_ok=True)

## based on GPU memory
batch = 1_024
block_size1 = 1_000_000
block_size2 = 1_000_000

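# hypoDD's dt.ct groups differential times under header lines of the form
# "# ev1 ev2 ..."; only the two event IDs are needed to build the pair list.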
||
if args.dtct_pair: | ||
dt_ct = f"{root_path}/{region}/hypodd/dt.ct" | ||
pair_list = f"{root_path}/{region}/hypodd/pairs.txt" | ||
lines = [] | ||
with open(dt_ct, "r") as fp: | ||
for line in fp: | ||
if line.startswith("#"): | ||
ev1, ev2 = line.split()[1:3] | ||
if ev1 > ev2: | ||
ev1, ev2 = ev2, ev1 | ||
lines.append(f"{ev1},{ev2}\n") | ||
|
||
print(f"Number of pairs from hypodd dt.ct: {len(lines)}") | ||
with open(f"{root_path}/{region}/hypodd/pairs.txt", "w") as fp: | ||
fp.writelines(lines) | ||
base_cmd = f"../CCTorch/run.py --pair_list={root_path}/{region}/hypodd/pairs.txt --data_path1={root_path}/{region}/cctorch/template.dat --data_format1=memmap --config={root_path}/{region}/cctorch/config.json --batch_size={batch} --block_size1={block_size1} --block_size2={block_size2} --result_path={root_path}/{result_path}" | ||

else:
    base_cmd = (
        f"../CCTorch/run.py --pair_list={root_path}/{data_path}/pairs.txt --data_path1={root_path}/{data_path}/template.dat --data_format1=memmap "
        f"--data_list1={root_path}/{data_path}/cctorch_picks.csv "
        f"--events_csv={root_path}/{data_path}/cctorch_events.csv --picks_csv={root_path}/{data_path}/cctorch_picks.csv --stations_csv={root_path}/{data_path}/cctorch_stations.csv "
        f"--config={root_path}/{data_path}/config.json --batch_size={batch} --block_size1={block_size1} --block_size2={block_size2} "
        f"--result_path={root_path}/{result_path}"
    )


if torch.cuda.is_available():
    device = "cuda"
    num_gpu = torch.cuda.device_count()
elif torch.backends.mps.is_available():
    device = "mps"
    num_gpu = 0
else:
    device = "cpu"
    num_gpu = 0

if num_gpu > 0:
    cmd = f"torchrun --standalone --nproc_per_node {num_gpu} {base_cmd} --device={device}"
else:
    cmd = f"python {base_cmd} --device={device}"
print(cmd)
os.system(cmd)

# %%
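# Merge per-rank outputs: with torchrun, each rank writes its own
# CC_{rank:03d}_{num_gpu:03d}.csv; rank 0 contributes the header row and
# later ranks are appended without it (tail -n +2).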
for rank in range(num_gpu):
    if not os.path.exists(f"{root_path}/{result_path}/CC_{rank:03d}_{num_gpu:03d}.csv"):
        continue
    if rank == 0:
        cmd = f"cat {root_path}/{result_path}/CC_{rank:03d}_{num_gpu:03d}.csv > {root_path}/{data_path}/dtcc.csv"
    else:
        cmd = f"tail -n +2 {root_path}/{result_path}/CC_{rank:03d}_{num_gpu:03d}.csv >> {root_path}/{data_path}/dtcc.csv"
    print(cmd)
    os.system(cmd)


cmd = f"cat {root_path}/{result_path}/CC_*_{num_gpu:03d}_dt.cc > {root_path}/{data_path}/dt.cc"
print(cmd)
os.system(cmd)

# # %%
# os.chdir(f"{root_path}/{region}/cctorch")
# source_file = f"ccpairs/CC_{num_gpu:03d}_dt.cc"
# target_file = "dt.cc"
# print(f"{source_file} -> {target_file}")
# if os.path.lexists(target_file):
#     os.remove(target_file)
# os.symlink(source_file, target_file)

# source_file = f"ccpairs/CC_{num_gpu:03d}.csv"
# target_file = "dtcc.csv"
# print(f"{source_file} -> {target_file}")
# if os.path.lexists(target_file):
#     os.remove(target_file)
# os.symlink(source_file, target_file)
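
The script imports parse_args from a local args module that is not part of this diff. A minimal sketch of the interface it presumably exposes, based only on the attributes read above (root_path, region, dtct_pair); the defaults are assumptions:

# args.py (hypothetical sketch)
import argparse


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--root_path", type=str, default="local")  # assumed default
    parser.add_argument("--region", type=str, default="demo")  # assumed default
    parser.add_argument("--dtct_pair", action="store_true")  # build pairs from hypoDD dt.ct
    return parser.parse_args()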
@@ -0,0 +1,117 @@
import argparse
import time
from concurrent.futures import ThreadPoolExecutor

import sky


# NUM_NODES = 8
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("--num_nodes", type=int, default=32)
    parser.add_argument("--year", type=int, default=2023)
    parser.add_argument("--region", type=str, default="CA")
    return parser.parse_args()


args = parse_args()
NUM_NODES = args.num_nodes
YEAR = args.year
REGION = args.region

task = sky.Task(
    name="run_adloc",
    setup="""
echo "Begin setup."
echo export WANDB_API_KEY=$WANDB_API_KEY >> ~/.bashrc
pip install -U h5py tqdm wandb pandas scipy scikit-learn numpy==1.26.4
pip install -U fsspec gcsfs s3fs
pip install -U obspy pyproj
pip install -e /opt/ADLoc
""",
run=""" | ||
num_nodes=`echo "$SKYPILOT_NODE_IPS" | wc -l` | ||
master_addr=`echo "$SKYPILOT_NODE_IPS" | head -n1` | ||
if [ "$SKYPILOT_NODE_RANK" == "0" ]; then | ||
ls -al /opt | ||
ls -al /data | ||
ls -al ./ | ||
fi | ||
python cut_templates_cc.py --num_node $NUM_NODES --node_rank $NODE_RANK --year $YEAR | ||
""", | ||
workdir=".", | ||
num_nodes=1, | ||
envs={"NUM_NODES": NUM_NODES, "NODE_RANK": 0, "YEAR": YEAR}, | ||
) | ||

task.set_file_mounts(
    {
        "/opt/ADLoc": "../../ADLoc",
    },
)
# task.set_storage_mounts({
#     '/remote/imagenet/': sky.Storage(name='my-bucket',
#                                      source='/local/imagenet'),
# })
task.set_resources(
    sky.Resources(
        cloud=sky.GCP(),
        region="us-west1",  # GCP
        # region="us-west-2",  # AWS
        accelerators=None,
        cpus=16,
        disk_tier="low",
        disk_size=50,  # GB
        memory=None,
        use_spot=False,
    ),
)

# for NODE_RANK in range(NUM_NODES):
#     task.update_envs({"NODE_RANK": NODE_RANK})
#     cluster_name = f"cctorch-{NODE_RANK:02d}"
#     print(f"Launching cluster {cluster_name}-{NUM_NODES}...")
#     sky.jobs.launch(
#         task,
#         name=f"{cluster_name}",
#     )

jobs = []
try:
    sky.status(refresh=True)
except Exception as e:
    print(e)

with ThreadPoolExecutor(max_workers=NUM_NODES) as executor:
    for NODE_RANK in range(NUM_NODES):

        task.update_envs({"NODE_RANK": NODE_RANK})
        cluster_name = f"cctorch-{YEAR}-{NODE_RANK:02d}"

        status = sky.status(cluster_names=[f"{cluster_name}"], refresh=True)
        if len(status) > 0:
            # Tear down clusters stuck in INIT; enable autostop on healthy ones.
            if status[0]["status"].value == "INIT":
                sky.down(f"{cluster_name}")
            if (not status[0]["to_down"]) and (not status[0]["status"].value == "INIT"):
                sky.autostop(f"{cluster_name}", idle_minutes=10, down=True)
            print(f"Cluster {cluster_name}/{NUM_NODES} already exists.")
            continue

        status = sky.status(cluster_names=[f"{cluster_name}"])
        if len(status) == 0:
            print(f"Launching cluster {cluster_name}/{NUM_NODES}...")
            jobs.append(
                executor.submit(
                    sky.launch,
                    task,
                    cluster_name=f"{cluster_name}",
                    idle_minutes_to_autostop=10,
                    down=True,
                    detach_setup=True,
                    detach_run=True,
                )
            )
            time.sleep(5)

for job in jobs:
    print(job.result())
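
Each cluster runs cut_templates_cc.py with its own NODE_RANK; that worker script is not part of this diff. A minimal sketch of the rank-based sharding it presumably performs (the round-robin split and the day range are assumptions):

# Hypothetical sketch of the sharding inside cut_templates_cc.py.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--num_node", type=int, default=1)
parser.add_argument("--node_rank", type=int, default=0)
parser.add_argument("--year", type=int, default=2023)
args = parser.parse_args()

# Round-robin split of day-of-year indices across nodes (assumption).
jdays = list(range(1, 366))
my_jdays = jdays[args.node_rank :: args.num_node]
print(f"node {args.node_rank}/{args.num_node}: {len(my_jdays)} days of {args.year}")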
@@ -0,0 +1,76 @@
# %%
import json
import os
from concurrent.futures import ThreadPoolExecutor

import fsspec
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from tqdm import tqdm

# %%
if __name__ == "__main__":

    # %%
    result_path = "results/"
    if not os.path.exists(result_path):
        os.makedirs(result_path)

    # %%
    protocol = "gs"
    token_json = f"{os.environ['HOME']}/.config/gcloud/application_default_credentials.json"
    with open(token_json, "r") as fp:
        token = json.load(fp)

    bucket = "quakeflow_catalog"
    folder = "Cal/cctorch"

    fs = fsspec.filesystem(protocol, token=token)
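    # token is the gcloud Application Default Credentials JSON created by
    # `gcloud auth application-default login`; gcsfs accepts it as a dict via token=.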

    # %%
    def plot_templates(templates, events, picks):
        # Demean and normalize each trace before plotting.
        templates = templates - np.nanmean(templates, axis=-1, keepdims=True)
        std = np.std(templates, axis=-1, keepdims=True)
        std[std == 0] = 1.0
        templates = templates / std

        plt.figure(figsize=(10, 10))
        plt.imshow(templates[:, -1, 0, :], origin="lower", aspect="auto", vmin=-0.3, vmax=0.3, cmap="RdBu_r")
        plt.colorbar()
        plt.show()
# %% | ||
years = [2023] | ||
|
||
for year in years: | ||
num_jday = 366 if (year % 4 == 0 and year % 100 != 0) or year % 400 == 0 else 365 | ||
|
||
for jday in range(1, num_jday + 1): | ||
|
||
if not fs.exists(f"{bucket}/{folder}/{year}/template_{jday:03d}.dat"): | ||
continue | ||
|

            with fs.open(f"{bucket}/{folder}/{year}/cctorch_picks_{jday:03d}.csv", "r") as fp:
                picks = pd.read_csv(fp, dtype=str)
            with fs.open(f"{bucket}/{folder}/{year}/cctorch_events_{jday:03d}.csv", "r") as fp:
                events = pd.read_csv(fp, dtype=str)
            with fs.open(f"{bucket}/{folder}/{year}/config_{jday:03d}.json", "r") as fp:
                config = json.load(fp)
            with fs.open(f"{bucket}/{folder}/{year}/template_{jday:03d}.dat", "rb") as fp:
                templates = np.frombuffer(fp.read(), dtype=np.float32).reshape(tuple(config["template_shape"]))

            print(f"events: {len(events):,}")
            print(f"picks: {len(picks):,}")
            print(f"templates: {templates.shape}")

            picks.to_csv(f"{result_path}/picks_{year:04d}_{jday:03d}.csv", index=False)
            events.to_csv(f"{result_path}/events_{year:04d}_{jday:03d}.csv", index=False)
            np.save(f"{result_path}/templates_{year:04d}_{jday:03d}.npy", templates)

            plot_templates(templates, events, picks)

            # break

# %%
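
The template .dat files are raw float32 arrays whose shape is recorded in the matching config_*.json. For a local copy, the same array can be opened lazily instead of reading all bytes; a minimal sketch (file names are placeholders):

# Hypothetical local-read sketch: np.memmap avoids loading the whole file.
import json

import numpy as np

with open("config_001.json") as fp:  # placeholder path
    shape = tuple(json.load(fp)["template_shape"])
templates = np.memmap("template_001.dat", dtype=np.float32, mode="r", shape=shape)
print(templates.shape)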