Merge pull request #1045 from nasa/issue_1030_empty_orbit_file_check
Empty orbit file check during SLC ingest
hhlee445 authored Dec 17, 2024
2 parents a999b04 + 9f70322 commit 3da74d1
Showing 19 changed files with 481 additions and 522 deletions.
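
At its core, this change adds a guard that scans a downloaded dataset directory for zero-byte orbit (.EOF) files and fails the download job early, before the SCIFLO/PGE step. A condensed sketch of that pattern, using the same names that appear in the diff below (a standalone illustration, not the exact repository code):

import glob
from os.path import abspath, getsize, join

def check_for_empty_orbit_files(dataset_dir):
    """Raise if any downloaded .EOF orbit file under dataset_dir is empty."""
    orbit_file_pattern = join(abspath(dataset_dir), "*.EOF")
    for orbit_file_path in glob.iglob(orbit_file_pattern):
        if getsize(orbit_file_path) == 0:
            raise RuntimeError(
                f"Orbit file {orbit_file_path} was downloaded but is empty; "
                "retry the download job once a valid orbit file is available."
            )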
143 changes: 0 additions & 143 deletions data_subscriber/asf_download.py

This file was deleted.

157 changes: 150 additions & 7 deletions data_subscriber/asf_slc_download.py
@@ -1,25 +1,155 @@

import glob
import json
import netrc
import os
from datetime import datetime, timedelta
from pathlib import PurePath, Path
from os.path import abspath, getsize, join

import requests

from data_subscriber import ionosphere_download
from data_subscriber.asf_download import DaacDownloadAsf
from data_subscriber.download import DaacDownload
from data_subscriber.url import (
_has_url, _to_urls, _to_https_urls, _slc_url_to_chunk_id, form_batch_id
)
from tools import stage_orbit_file
from tools.stage_ionosphere_file import IonosphereFileNotFoundException
from tools.stage_orbit_file import (parse_orbit_time_range_from_safe,
NoQueryResultsException,
NoSuitableOrbitFileException,
T_ORBIT,
ORBIT_PAD)
from util.dataspace_util import (NoQueryResultsException,
NoSuitableOrbitFileException,
DEFAULT_DATASPACE_ENDPOINT)


class AsfDaacSlcDownload(DaacDownload):

def __init__(self, provider):
super().__init__(provider)
self.daac_s3_cred_settings_key = "SLC_DOWNLOAD"

def perform_download(self, session: requests.Session, es_conn, downloads: list[dict], args, token, job_id):
for download in downloads:
if not _has_url(download):
continue

if args.transfer_protocol == "https":
product_url = _to_https_urls(download)
else:
product_url = _to_urls(download)

self.logger.info("Processing product_url=%s", product_url)
product_id = _slc_url_to_chunk_id(product_url, str(download['revision_id']))

product_download_dir = self.downloads_dir / product_id
product_download_dir.mkdir(exist_ok=True)

if args.dry_run:
self.logger.info("args.dry_run=%s. Skipping download.", args.dry_run)
continue

if product_url.startswith("s3"):
product = product_filepath = self.download_product_using_s3(
product_url, token, target_dirpath=product_download_dir.resolve(), args=args
)
else:
product = product_filepath = self.download_asf_product(
product_url, token, product_download_dir
)

self.logger.info("Marking %s as downloaded.", product_filepath)
self.logger.debug("download['id']=%s", download['id'])

es_conn.mark_product_as_downloaded(download['id'], job_id)

self.logger.debug(f"product_url_downloaded={product_url}")

additional_metadata = {}

try:
additional_metadata['processing_mode'] = download['processing_mode']
except KeyError:
self.logger.warning("processing_mode not found in the slc_catalog ES index")

if download.get("intersects_north_america"):
self.logger.info("Adding intersects_north_america to dataset metadata")
additional_metadata["intersects_north_america"] = True

dataset_dir = self.extract_one_to_one(product, self.cfg, working_dir=Path.cwd(),
extra_metadata=additional_metadata,
name_postscript='-r'+str(download['revision_id']))

self.update_pending_dataset_with_index_name(dataset_dir, '-r' + str(download['revision_id']))

class AsfDaacSlcDownload(DaacDownloadAsf):
def download_orbit_file(self, dataset_dir, product_filepath, additional_metadata):
# Rename the dataset_dir to match the pattern with revision_id
new_dataset_dir = dataset_dir.parent / form_batch_id(dataset_dir.name, str(download['revision_id']))
self.logger.debug("new_dataset_dir=%s", str(new_dataset_dir))

os.rename(str(dataset_dir), str(new_dataset_dir))

self.download_orbit_file(new_dataset_dir, product_filepath)

# We've observed cases where the orbit file download seems to complete
# successfully, but the resulting files are empty, causing the PGE/SAS to crash.
# Check for any empty files now, so we can fail during this download job
# rather than during the SCIFLO job.
self.check_for_empty_orbit_files(new_dataset_dir)

if additional_metadata['processing_mode'] in ("historical", "reprocessing"):
self.logger.info(
"Processing mode is %s. Attempting to download ionosphere correction file.",
additional_metadata['processing_mode']
)
self.download_ionosphere_file(new_dataset_dir, product_filepath)

self.logger.info("Removing %s", product_filepath)
product_filepath.unlink(missing_ok=True)

def download_asf_product(self, product_url, token: str, target_dirpath: Path):
self.logger.info("Requesting from %s", product_url)

asf_response = self._handle_url_redirect(product_url, token)
asf_response.raise_for_status()

product_filename = PurePath(product_url).name
product_download_path = target_dirpath / product_filename

with open(product_download_path, "wb") as file:
file.write(asf_response.content)

return product_download_path.resolve()

def update_pending_dataset_with_index_name(self, dataset_dir: PurePath, postscript):
self.logger.info("Updating dataset's dataset.json with index name")

with Path(dataset_dir / f"{dataset_dir.name}{postscript}.dataset.json").open("r") as fp:
dataset_json: dict = json.load(fp)

with Path(dataset_dir / f"{dataset_dir.name}{postscript}.met.json").open("r") as fp:
met_dict: dict = json.load(fp)

dataset_json.update(
{
"index": {
"suffix": (
"{version}_{dataset}-{date}".format(version=dataset_json["version"],
dataset=met_dict["ProductType"],
date=datetime.utcnow().strftime("%Y.%m"))
).lower()  # suffix index name with `-YYYY.MM`
}
}
)

with Path(dataset_dir / f"{dataset_dir.name}.dataset.json").open("w") as fp:
json.dump(dataset_json, fp)
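
For illustration with hypothetical values: a dataset.json version of "v2.0" and a met.json ProductType of "SLC", processed in December 2024, would produce the index suffix "v2.0_slc-2024.12".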

def download_orbit_file(self, dataset_dir, product_filepath):
self.logger.info("Downloading associated orbit file")

# Get the PCM username/password for authentication to Copernicus Dataspace
username, _, password = netrc.netrc().authenticators('dataspace.copernicus.eu')
username, _, password = netrc.netrc().authenticators(DEFAULT_DATASPACE_ENDPOINT)

(_, safe_start_time, safe_stop_time) = parse_orbit_time_range_from_safe(product_filepath)
safe_start_datetime = datetime.strptime(safe_start_time, "%Y%m%dT%H%M%S")
@@ -101,6 +231,20 @@ def download_orbit_file(self, dataset_dir, product_filepath, additional_metadata

self.logger.info("Added orbit file(s) to dataset")

def check_for_empty_orbit_files(self, dataset_dir):
self.logger.info("Checking for empty orbit file downloads within %s", dataset_dir)

orbit_file_pattern = join(abspath(dataset_dir), "*.EOF")

for orbit_file_path in glob.iglob(orbit_file_pattern):
if getsize(orbit_file_path) == 0:
raise RuntimeError(
f"Orbit file {orbit_file_path} was downloaded but empty.\n"
f"This download job will need to be retried once a valid orbit "
f"file is available."
)
else:
self.logger.info("All downloaded orbit files are non-empty")
def download_ionosphere_file(self, dataset_dir, product_filepath):
try:
output_ionosphere_filepath = ionosphere_download.download_ionosphere_correction_file(
@@ -130,4 +274,3 @@ def update_pending_dataset_metadata_with_ionosphere_metadata(self, dataset_dir:

with Path(dataset_dir / f"{dataset_dir.name}.met.json").open("w") as fp:
json.dump(met_json, fp)
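
The test changes in this commit are not shown in full here. As a hypothetical illustration only, a pytest-style sketch of how the new empty-orbit-file guard could be exercised in isolation; the stand-in "self" object and the sample orbit filename are assumptions, and the real tests in the repository may differ:

import logging
from types import SimpleNamespace

import pytest

from data_subscriber.asf_slc_download import AsfDaacSlcDownload


def test_check_for_empty_orbit_files_raises_on_empty_eof(tmp_path):
    # Create a zero-byte orbit file in a temporary dataset directory.
    (tmp_path / "S1A_OPER_AUX_POEORB_SAMPLE.EOF").write_bytes(b"")

    # check_for_empty_orbit_files only touches self.logger, so a minimal
    # stand-in object is enough to call the unbound method directly.
    fake_self = SimpleNamespace(logger=logging.getLogger(__name__))

    with pytest.raises(RuntimeError):
        AsfDaacSlcDownload.check_for_empty_orbit_files(fake_self, tmp_path)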

31 changes: 3 additions & 28 deletions data_subscriber/download.py
@@ -1,4 +1,4 @@
import logging

import shutil
from datetime import datetime
from pathlib import PurePath, Path
@@ -17,37 +17,12 @@
from data_subscriber.cmr import Provider, CMR_TIME_FORMAT
from data_subscriber.query import DateTimeRange
from data_subscriber.url import _to_batch_id, _to_orbit_number
from tools.stage_orbit_file import fatal_code
from util.backoff_util import fatal_code
from util.conf_util import SettingsConf

logger = logging.getLogger(__name__)
from util.edl_util import SessionWithHeaderRedirection

AWS_REGION = "us-west-2"

class SessionWithHeaderRedirection(requests.Session):
"""
Borrowed from https://wiki.earthdata.nasa.gov/display/EL/How+To+Access+Data+With+Python
"""

def __init__(self, username, password, auth_host):
super().__init__()
self.auth = (username, password)
self.auth_host = auth_host

# Overrides from the library to keep headers when redirected to or from
# the NASA auth host.
def rebuild_auth(self, prepared_request, response):
headers = prepared_request.headers
url = prepared_request.url

if "Authorization" in headers:
original_parsed = requests.utils.urlparse(response.request.url)
redirect_parsed = requests.utils.urlparse(url)
if (original_parsed.hostname != redirect_parsed.hostname) and \
redirect_parsed.hostname != self.auth_host and \
original_parsed.hostname != self.auth_host:
del headers["Authorization"]


class DaacDownload:

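
The SessionWithHeaderRedirection helper removed above now comes from util.edl_util (see the new import near the top of this file). A minimal usage sketch, assuming the relocated class keeps the constructor shown in the removed code and that urs.earthdata.nasa.gov is the intended Earthdata Login host:

import netrc

from util.edl_util import SessionWithHeaderRedirection

EDL_HOST = "urs.earthdata.nasa.gov"  # assumed Earthdata Login auth host

# Credentials read from ~/.netrc, mirroring how the orbit-file download
# reads Copernicus Dataspace credentials elsewhere in this PR.
username, _, password = netrc.netrc().authenticators(EDL_HOST)

session = SessionWithHeaderRedirection(username, password, EDL_HOST)

# rebuild_auth keeps the Authorization header across redirects to/from the
# auth host, so protected downloads behave like a normal requests.Session.
response = session.get("https://example.com/protected/product.zip")
response.raise_for_status()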
2 changes: 1 addition & 1 deletion tests/10TFP.geojson
2 changes: 1 addition & 1 deletion tests/california_opera.geojson
2 changes: 1 addition & 1 deletion tests/calval_test_frame_only.geojson
2 changes: 1 addition & 1 deletion tests/cslc-s1_priority_framebased.geojson