From e592179dcf52ca615b70516eee0edfdc99e613fa Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 22 Feb 2024 20:54:52 +0530 Subject: [PATCH 1/8] add modules to local requirements.txt --- scripts/earthengine/earthengine_image.py | 1 + scripts/earthengine/requirements.txt | 3 +++ scripts/floods/requirements.txt | 3 +++ 3 files changed, 7 insertions(+) diff --git a/scripts/earthengine/earthengine_image.py b/scripts/earthengine/earthengine_image.py index a0d7a1185c..6cbfa263c6 100644 --- a/scripts/earthengine/earthengine_image.py +++ b/scripts/earthengine/earthengine_image.py @@ -605,6 +605,7 @@ def ee_process(config) -> list: if config['ee_wait_task'] is True, else a list of tasks launched. ''' ee_tasks = [] + ee.Authenticate() ee.Initialize() config['ee_image_count'] = config.get('ee_image_count', 1) time_period = config.get('time_period', 'P1M') diff --git a/scripts/earthengine/requirements.txt b/scripts/earthengine/requirements.txt index 548a8288df..d0e7d0349f 100644 --- a/scripts/earthengine/requirements.txt +++ b/scripts/earthengine/requirements.txt @@ -4,12 +4,15 @@ datacommons earthengine-api geojson geopandas +geopy googleapi google-cloud-logging lxml openpyxl +rasterio rdp requests +requests_cache retry s2sphere shapely diff --git a/scripts/floods/requirements.txt b/scripts/floods/requirements.txt index 548a8288df..d0e7d0349f 100644 --- a/scripts/floods/requirements.txt +++ b/scripts/floods/requirements.txt @@ -4,12 +4,15 @@ datacommons earthengine-api geojson geopandas +geopy googleapi google-cloud-logging lxml openpyxl +rasterio rdp requests +requests_cache retry s2sphere shapely From b174b91788fd227909e07582f5c272acd6101434 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 22 Feb 2024 21:26:21 +0530 Subject: [PATCH 2/8] add bigquery --- scripts/earthengine/requirements.txt | 1 + scripts/floods/requirements.txt | 1 + 2 files changed, 2 insertions(+) diff --git a/scripts/earthengine/requirements.txt b/scripts/earthengine/requirements.txt index 
d0e7d0349f..eeb30941f7 100644 --- a/scripts/earthengine/requirements.txt +++ b/scripts/earthengine/requirements.txt @@ -1,4 +1,5 @@ # Requirements for Python code in this directory. +bigquery dataclasses datacommons earthengine-api diff --git a/scripts/floods/requirements.txt b/scripts/floods/requirements.txt index d0e7d0349f..eeb30941f7 100644 --- a/scripts/floods/requirements.txt +++ b/scripts/floods/requirements.txt @@ -1,4 +1,5 @@ # Requirements for Python code in this directory. +bigquery dataclasses datacommons earthengine-api From ab2d0fce11b97ac893e4bda3a642d4ca80daa83f Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Thu, 22 Feb 2024 22:45:33 +0530 Subject: [PATCH 3/8] prune requirements --- scripts/earthengine/requirements.txt | 4 ---- scripts/floods/requirements.txt | 4 ---- 2 files changed, 8 deletions(-) diff --git a/scripts/earthengine/requirements.txt b/scripts/earthengine/requirements.txt index eeb30941f7..23f7d8b613 100644 --- a/scripts/earthengine/requirements.txt +++ b/scripts/earthengine/requirements.txt @@ -4,18 +4,14 @@ dataclasses datacommons earthengine-api geojson -geopandas geopy -googleapi google-cloud-logging lxml openpyxl rasterio -rdp requests requests_cache retry s2sphere shapely urllib3 -xlrd diff --git a/scripts/floods/requirements.txt b/scripts/floods/requirements.txt index eeb30941f7..23f7d8b613 100644 --- a/scripts/floods/requirements.txt +++ b/scripts/floods/requirements.txt @@ -4,18 +4,14 @@ dataclasses datacommons earthengine-api geojson -geopandas geopy -googleapi google-cloud-logging lxml openpyxl rasterio -rdp requests requests_cache retry s2sphere shapely urllib3 -xlrd From fb7396ec54f069535ecc8a5de7cdd6f82e768bc5 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Fri, 23 Feb 2024 13:24:35 +0530 Subject: [PATCH 4/8] Add manifest for fires import --- .../executor/app/executor/validation.py | 2 +- scripts/earthengine/earthengine_image.py | 36 +++- scripts/fires/firms/fire_event_svobs.tmcf | 7 + 
scripts/fires/firms/fire_events.tmcf | 12 ++ .../firms/fire_events_pipeline_config.py | 194 ++++++++++++++++++ scripts/fires/firms/manifest.json | 18 ++ scripts/fires/firms/requirements.txt | 17 ++ 7 files changed, 282 insertions(+), 4 deletions(-) create mode 100644 scripts/fires/firms/fire_event_svobs.tmcf create mode 100644 scripts/fires/firms/fire_events.tmcf create mode 100644 scripts/fires/firms/fire_events_pipeline_config.py create mode 100644 scripts/fires/firms/manifest.json create mode 100644 scripts/fires/firms/requirements.txt diff --git a/import-automation/executor/app/executor/validation.py b/import-automation/executor/app/executor/validation.py index a16f0f8dcf..8fca36e609 100644 --- a/import-automation/executor/app/executor/validation.py +++ b/import-automation/executor/app/executor/validation.py @@ -196,7 +196,7 @@ def _is_import_spec_valid(import_spec, repo_dir, import_dir): f'import specification ({import_spec})') absolute_script_paths = [ - os.path.join(repo_dir, import_dir, path) + os.path.join(repo_dir, import_dir, path.split(' ')[0]) for path in import_spec.get('scripts', []) ] missing_paths = _filter_missing_paths(absolute_script_paths) diff --git a/scripts/earthengine/earthengine_image.py b/scripts/earthengine/earthengine_image.py index 3b5d60859b..78330fd34d 100644 --- a/scripts/earthengine/earthengine_image.py +++ b/scripts/earthengine/earthengine_image.py @@ -48,6 +48,12 @@ import sys import time +# Workaround for collection.Callable needed for ee.Initialize() +import collections +import collections.abc + +collections.Callable = collections.abc.Callable + from absl import app from absl import flags from absl import logging @@ -55,6 +61,7 @@ from datetime import datetime from datetime import timedelta from dateutil.relativedelta import relativedelta +from google.auth import compute_engine _SCRIPTS_DIR = os.path.dirname(__file__) sys.path.append(_SCRIPTS_DIR) @@ -69,6 +76,9 @@ from counters import Counters +flags.DEFINE_bool( + 
'ee_remote', False, + 'Set to True to use service account auth when running remotely') flags.DEFINE_string('ee_gcloud_project', 'datcom-import-automation-prod', 'Gcloud project with Earth Engine API enabled.') flags.DEFINE_string('ee_dataset', '', @@ -171,6 +181,8 @@ } EE_DEFAULT_CONFIG = { + # Auth mode + 'ee_remote': _FLAGS.ee_remote, # GCloud project 'ee_gcloud_project': _FLAGS.ee_gcloud_project, # Image loading settings. @@ -596,7 +608,26 @@ def export_ee_image_to_gcs(ee_image: ee.Image, config: dict = {}) -> str: return task -def ee_process(config) -> list: +def ee_init(config: dict): + '''Initialize Earth Engine APIs. + Args: + config: dict with the following parameters + ee_remote: bool if True uses EE service account auth. + ee_gcloud_project: Project to use with EE API. + ''' + ee.Authenticate() + # By default use local credentials + credentials = 'persistent' + if config.get('ee_remote'): + # Use the service account scope + scopes = ["https://www.googleapis.com/auth/earthengine"] + credentials = compute_engine.Credentials(scopes=scopes) + + ee.Initialize(credentials=credentials, + project=config.get('ee_gcloud_project')) + + +def ee_process(config: dict) -> list: '''Generate earth engine images and export to GCS. Called should wait for the task to complete. Args: @@ -609,8 +640,7 @@ def ee_process(config) -> list: if config['ee_wait_task'] is True, else a list of tasks launched. 
''' ee_tasks = [] - ee.Authenticate() - ee.Initialize(project=config.get('ee_gcloud_project')) + ee_init(config) config['ee_image_count'] = config.get('ee_image_count', 1) time_period = config.get('time_period', 'P1M') cur_date = utils.date_format_by_time_period(utils.date_today(), time_period) diff --git a/scripts/fires/firms/fire_event_svobs.tmcf b/scripts/fires/firms/fire_event_svobs.tmcf new file mode 100644 index 0000000000..2bb2ae164f --- /dev/null +++ b/scripts/fires/firms/fire_event_svobs.tmcf @@ -0,0 +1,7 @@ +Node: E:Events->E0 +typeOf: dcs:StatVarObservation +observationAbout: C:Events->dcid +observationDate: C:Events->observationDate +variableMeasured: dcs:Area_FireEvent +value: C:Events->area +unit: dcs:SquareKilometer diff --git a/scripts/fires/firms/fire_events.tmcf b/scripts/fires/firms/fire_events.tmcf new file mode 100644 index 0000000000..50e8f981c4 --- /dev/null +++ b/scripts/fires/firms/fire_events.tmcf @@ -0,0 +1,12 @@ +Node: E:Events->E0 +dcid: C:Events->dcid +typeOf: C:Events->typeOf +name: C:Events->name +startDate: C:Events->startDate +endDate: C:Events->endDate +observationPeriod: C:Events->observationPeriod +startLocation: C:Events->startLocation +affectedPlace: C:Events->affectedPlace +area: C:Events->area +observationDate: C:Events->observationDate +geoJsonCoordinates: C:Events->geoJsonCoordinates diff --git a/scripts/fires/firms/fire_events_pipeline_config.py b/scripts/fires/firms/fire_events_pipeline_config.py new file mode 100644 index 0000000000..d735d0b9ab --- /dev/null +++ b/scripts/fires/firms/fire_events_pipeline_config.py @@ -0,0 +1,194 @@ +# Config to generate FireEvent through the script: events_pipeline.py +{ + 'defaults': { + 'import_name': 'FIRMSFires', + # Set start_date to start of year to be processed. + # Defaults to Jan 1 of current year if left empty. + 'start_date': '', + # Aggregate events upto the end of the year from per-day source files. 
+ 'end_date': '{year}-12-31', + 'batch_days': 1, + 'time_period': 'P{batch_days}D', + + # GCS settings + 'gcs_project': 'datcom', + 'gcs_bucket': 'datcom-prod-imports', + 'gcs_folder': 'scripts/fires/firms', + }, + # State of previous run of the pipeline with input/output for each stage. + 'pipeline_state_file': + 'gs://datcom-prod-imports/scripts/fires/firms/flood_event_pipeline_state_{year}.py', + + # Pipeline stages to generate flood events. + 'stages': [ + # Download NASA FIRMS fires data using the API + { + 'stage': + 'download', + + # API key for NASA FIRMS data download + # Get a MAPS_KEY from https://firms.modaps.eosdis.nasa.gov/api/area/ + #'nasa_firms_api_key': + # '', + 'nasa_firms_api_key': + '712d4ebd9b976c57efddedad99e72cd3', + 'nasa_data_source': "VIIRS_SNPP_NRT", # upto last 60 days + #'nasa_data_source': + # "VIIRS_SNPP_SP", # older than 60 days + 'batch_days': + 1, + 'url': + "https://firms.modaps.eosdis.nasa.gov/api/area/csv/{nasa_firms_api_key}/{nasa_data_source}/world/{batch_days}/{start_date}", + # API rate limits downloads. + # retry downloads after 200 secs until a CSV with date is downloaded. + 'successful_response_regex': + '{year}', + 'retry_interval': + 200, + 'retry_count': + 10, + 'output_file': + 'gs://{gcs_bucket}/{gcs_folder}/download/{year}/{import_name}-download-{start_date}-{time_period}.csv', + 'skip_existing_output': + True, + }, + + # Add S2 cells to the downloaded CSV files. + { + 'stage': + 'raster_csv', + 'time_period': + 'P{batch_days}D', + 's2_level': + 10, + 'aggregate': + None, + 'input_data_filter': { + 'area': { + # pick max area for s2 cell. + # each fire in input is a fixed region. 
+ 'aggregate': 'max' + }, + }, + 'input_files': + 'gs://{gcs_bucket}/{gcs_folder}/download/{year}/{import_name}-download-{year}*.csv', + 'output_dir': + 'gs://{gcs_bucket}/{gcs_folder}/{stage}/{year}', + 'skip_existing_output': + True, + }, + + # Generate events from the CSV with fires in S2 cells + { + 'stage': + 'events', + + # Process all data files for the whole year. + 'input_files': + 'gs://{gcs_bucket}/{gcs_folder}/raster_csv/{year}/*{year}*.csv', + # Output events csv into a common folder with a year in filename, + # as the import automation can copy all files in a single folder. + 'output_dir': + 'gs://{gcs_bucket}/{gcs_folder}/{stage}/{import_name}-{stage}-{year}-without-usa-', + 'event_type': + 'FireEvent', + + # Input settings. + # Columns of input_csv that are added as event properties + 'data_columns': [ + 'area', 'frp', 'bright_ti4', 'bright_ti5', 'confidence' + ], + # Columns of input_csv that contains the s2 cell id. + 'place_column': + 's2CellId', + # Input column for date. + 'date_column': + 'acq_date', + + # Processing settings + # Maximum distance within which 2 events are merged. + 'max_overlap_distance_km': + 0, + # Maximum number of cells of same level in between 2 events to be merged. + 'max_overlap_place_hop': + 1, + # S2 level to which data is aggregated. + 's2_level': + 10, # Events are at resolution of level-10 S2 cells. + 'aggregate': + 'sum', # default aggregation for all properties + # Per property settings + 'property_config': { + 'area': { + 'aggregate': 'sum', + 'unit': 'SquareKilometer', + }, + 'affectedPlace': { + 'aggregate': 'list', + }, + }, + # Per property filter params for input data. + 'input_filter_config': { + 'confidence': { + 'regex': '[nh]', + } + }, + 'output_events_filter_config': { + 'AreaSqKm': { + # Only allow fire events with atleast 4sqkm (10%) of events. 
+ 'min': 4.0, + }, + 'affectedPlace': { + # Ignore fires in USA also generated by a different import + 'ignore': 'country/USA' + } + }, + # Per property settings + 'property_config': { + 'aggregate': 'max', + 'area': { + 'aggregate': 'sum', + 'unit': 'SquareKilometer', + }, + }, + # Treat events at the same location beyond 3 days as separate events. + 'max_event_interval_days': + 3, + # Limit time range for an event to 3 months, roughly a season + 'max_event_duration_days': + 90, + # Limit event affected region to 1000 L10 s2 cells, roughly 100K sqkm. + 'max_event_places': + 1000, + + # Enable DC API lookup for place properties + 'dc_api_enabled': + False, + 'dc_api_batch_size': + 200, + # Cache file for place properties like name, location, typeOf + # Cache is updated with new places looked up. + 'place_property_cache_file': + 'gs://datcom-prod-imports/place_cache/place_properties_cache_with_s2_10.pkl', + + # Output settings. + 'output_svobs': + True, + 'output_delimiter': + ',', + 'output_affected_place_polygon': + 'geoJsonCoordinates', + 'polygon_simplification_factor': + None, + 'output_geojson_string': + False, + + # Output svobs per place + # Place svobs generated by entity aggregation pipeline + 'output_place_svobs': + False, + 'output_place_svobs_properties': ['area', 'count'], + 'output_place_svobs_dates': ['YYYY-MM-DD', 'YYYY-MM', 'YYYY'], + }, + ], +} diff --git a/scripts/fires/firms/manifest.json b/scripts/fires/firms/manifest.json new file mode 100644 index 0000000000..b7d5764ff2 --- /dev/null +++ b/scripts/fires/firms/manifest.json @@ -0,0 +1,18 @@ +{ + "import_specifications": [{ + "import_name": + "NASA_VIIRSActiveFiresEvents", + "curator_emails": ["ajaits@google.com"], + "provenance_url": + "https://www.earthdata.nasa.gov/learn/find-data/near-real-time/firms/vnp14imgtdlnrt", + "provenance_description": + "Active fire data across the world as detected by VIIRS onboard the Suomi NPP satellite at 375m resolution.", + "scripts": 
["../../earthengine/events_pipeline.py --pipeline_config=fire_events_pipeline_config.py"], + "import_inputs": [{ + "template_mcf": "", + "cleaned_csv": "" + }], + "cron_schedule": + "0 5 * * *" + }] +} diff --git a/scripts/fires/firms/requirements.txt b/scripts/fires/firms/requirements.txt new file mode 100644 index 0000000000..23f7d8b613 --- /dev/null +++ b/scripts/fires/firms/requirements.txt @@ -0,0 +1,17 @@ +# Requirements for Python code in this directory. +bigquery +dataclasses +datacommons +earthengine-api +geojson +geopy +google-cloud-logging +lxml +openpyxl +rasterio +requests +requests_cache +retry +s2sphere +shapely +urllib3 From c81347f92c7efb62de56dc94d354627781709f8f Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Fri, 23 Feb 2024 17:14:18 +0530 Subject: [PATCH 5/8] update events script to support invocation from automation from a different folder --- .../executor/app/executor/import_executor.py | 6 ++-- scripts/earthengine/earthengine_image.py | 2 +- scripts/earthengine/events_pipeline.py | 2 +- .../earthengine/pipeline_stage_bigquery.py | 2 +- .../earthengine/pipeline_stage_download.py | 2 +- .../earthengine/pipeline_stage_earthengine.py | 2 +- scripts/earthengine/pipeline_stage_events.py | 2 +- .../earthengine/pipeline_stage_raster_csv.py | 2 +- scripts/earthengine/pipeline_stage_runner.py | 2 +- scripts/earthengine/process_events.py | 2 +- scripts/earthengine/raster_to_csv.py | 2 +- scripts/earthengine/utils.py | 2 +- scripts/fires/firms/README.md | 34 +++++++++++++++++++ .../firms/fire_events_pipeline_config.py | 4 +-- scripts/fires/firms/manifest.json | 2 +- 15 files changed, 50 insertions(+), 18 deletions(-) create mode 100644 scripts/fires/firms/README.md diff --git a/import-automation/executor/app/executor/import_executor.py b/import-automation/executor/app/executor/import_executor.py index e9323bd983..952d4b471b 100644 --- a/import-automation/executor/app/executor/import_executor.py +++ 
b/import-automation/executor/app/executor/import_executor.py @@ -664,12 +664,12 @@ def _run_with_timeout_async(args: List[str], ) # Log output continuously until the command completes. - for line in process.stdout: - stdout.append(line) - logging.info(f'Process stdout: {line}') for line in process.stderr: stderr.append(line) logging.info(f'Process stderr: {line}') + for line in process.stdout: + stdout.append(line) + logging.info(f'Process stdout: {line}') end_time = time.time() diff --git a/scripts/earthengine/earthengine_image.py b/scripts/earthengine/earthengine_image.py index 78330fd34d..05782a1f85 100644 --- a/scripts/earthengine/earthengine_image.py +++ b/scripts/earthengine/earthengine_image.py @@ -63,7 +63,7 @@ from dateutil.relativedelta import relativedelta from google.auth import compute_engine -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/earthengine/events_pipeline.py b/scripts/earthengine/events_pipeline.py index f654f87329..f071d3bab1 100644 --- a/scripts/earthengine/events_pipeline.py +++ b/scripts/earthengine/events_pipeline.py @@ -101,7 +101,7 @@ _FLAGS = flags.FLAGS -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/earthengine/pipeline_stage_bigquery.py b/scripts/earthengine/pipeline_stage_bigquery.py index afdf1c25ec..b97fdb72ad 100644 --- a/scripts/earthengine/pipeline_stage_bigquery.py +++ b/scripts/earthengine/pipeline_stage_bigquery.py @@ -20,7 +20,7 @@ from absl import logging from google.cloud import bigquery -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) 
sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/earthengine/pipeline_stage_download.py b/scripts/earthengine/pipeline_stage_download.py index 59c48fc468..7d11191a96 100644 --- a/scripts/earthengine/pipeline_stage_download.py +++ b/scripts/earthengine/pipeline_stage_download.py @@ -20,7 +20,7 @@ from absl import logging -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/earthengine/pipeline_stage_earthengine.py b/scripts/earthengine/pipeline_stage_earthengine.py index 9844bbc436..7f1bcfa09c 100644 --- a/scripts/earthengine/pipeline_stage_earthengine.py +++ b/scripts/earthengine/pipeline_stage_earthengine.py @@ -19,7 +19,7 @@ from absl import logging -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/earthengine/pipeline_stage_events.py b/scripts/earthengine/pipeline_stage_events.py index 4b28162726..5d19712f2f 100644 --- a/scripts/earthengine/pipeline_stage_events.py +++ b/scripts/earthengine/pipeline_stage_events.py @@ -19,7 +19,7 @@ from absl import logging -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/earthengine/pipeline_stage_raster_csv.py b/scripts/earthengine/pipeline_stage_raster_csv.py index 76dc1b89ca..804ee03d2f 100644 --- a/scripts/earthengine/pipeline_stage_raster_csv.py +++ 
b/scripts/earthengine/pipeline_stage_raster_csv.py @@ -21,7 +21,7 @@ from absl import logging -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/earthengine/pipeline_stage_runner.py b/scripts/earthengine/pipeline_stage_runner.py index 43a8b0ef28..d27d33eac7 100644 --- a/scripts/earthengine/pipeline_stage_runner.py +++ b/scripts/earthengine/pipeline_stage_runner.py @@ -23,7 +23,7 @@ from absl import logging -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/earthengine/process_events.py b/scripts/earthengine/process_events.py index 16810def14..0991f82995 100644 --- a/scripts/earthengine/process_events.py +++ b/scripts/earthengine/process_events.py @@ -58,7 +58,7 @@ _FLAGS = flags.FLAGS -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/earthengine/raster_to_csv.py b/scripts/earthengine/raster_to_csv.py index 26b2acef5e..631918295e 100644 --- a/scripts/earthengine/raster_to_csv.py +++ b/scripts/earthengine/raster_to_csv.py @@ -66,7 +66,7 @@ import rasterio import s2sphere -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/earthengine/utils.py b/scripts/earthengine/utils.py index 2c1673a780..2f014c7dd2 100644 --- 
a/scripts/earthengine/utils.py +++ b/scripts/earthengine/utils.py @@ -33,7 +33,7 @@ from s2sphere import Cell, CellId, LatLng from shapely.geometry import Polygon -_SCRIPTS_DIR = os.path.dirname(__file__) +_SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(_SCRIPTS_DIR) sys.path.append(os.path.dirname(_SCRIPTS_DIR)) sys.path.append(os.path.dirname(os.path.dirname(_SCRIPTS_DIR))) diff --git a/scripts/fires/firms/README.md b/scripts/fires/firms/README.md new file mode 100644 index 0000000000..6eb05eaac0 --- /dev/null +++ b/scripts/fires/firms/README.md @@ -0,0 +1,34 @@ +# Global fire events using NASA FIRMS dataset +This folder contains configs to generate global fire events using the fire data +from [NASA Fire Information for Resource Management System +(FIRMS)](https://firms.modaps.eosdis.nasa.gov/). + +The active and historical fires can be viewed on the [FIRMS Fire +Map](https://firms.modaps.eosdis.nasa.gov/). + +The [NASA API](https://firms.modaps.eosdis.nasa.gov/api/area/) provides the +fires data as a CSV with the location and area of each fire. This is used to +generate fire events by merging regions with fire that are next to each other +within a time window. + +The [events +pipeline](https://github.com/datacommonsorg/data/blob/master/scripts/earthengine/events_pipeline.py) +script is used with the `fire_events_pipeline_config.py` that downloads the +latest data from source incrementally and generates fire events for the current +year. + +To run the script, get an API key from +[NASA](https://firms.modaps.eosdis.nasa.gov/api/area/), add it to the config
`fire_events_pipeline_config.py`, update the GCS project and buckets in the +config or set the `output_file` to a local folder. 
+Then run the pipeline with the command: +``` +pip install -r requirements.txt +python3 ../../earthengine/events_pipeline.py --pipeline_config=fire_events_pipeline_config.py +``` + +This generates the following output files: + - events.{csv,tmcf}: Data for each fire event + - events-svobs.{csv,tmcf}: StatVarObservations for area of each fire event + - place-svobs.{csv,tmcf}: StatVarObservations for area and count of fires + across places diff --git a/scripts/fires/firms/fire_events_pipeline_config.py b/scripts/fires/firms/fire_events_pipeline_config.py index d735d0b9ab..5358f21ac5 100644 --- a/scripts/fires/firms/fire_events_pipeline_config.py +++ b/scripts/fires/firms/fire_events_pipeline_config.py @@ -28,10 +28,8 @@ # API key for NASA FIRMS data download # Get a MAPS_KEY from https://firms.modaps.eosdis.nasa.gov/api/area/ - #'nasa_firms_api_key': - # '', 'nasa_firms_api_key': - '712d4ebd9b976c57efddedad99e72cd3', + '', 'nasa_data_source': "VIIRS_SNPP_NRT", # upto last 60 days #'nasa_data_source': # "VIIRS_SNPP_SP", # older than 60 days diff --git a/scripts/fires/firms/manifest.json b/scripts/fires/firms/manifest.json index b7d5764ff2..f59e527d6e 100644 --- a/scripts/fires/firms/manifest.json +++ b/scripts/fires/firms/manifest.json @@ -7,7 +7,7 @@ "https://www.earthdata.nasa.gov/learn/find-data/near-real-time/firms/vnp14imgtdlnrt", "provenance_description": "Active fire data across the world as detected by VIIRS onboard the Suomi NPP satellite at 375m resolution.", - "scripts": ["../../earthengine/events_pipeline.py --pipeline_config=fire_events_pipeline_config.py"], + "scripts": ["../../earthengine/events_pipeline.py --pipeline_config=gs://datcom-prod-imports/scripts/fires/firms/fire_events_pipeline_config.py"], "import_inputs": [{ "template_mcf": "", "cleaned_csv": "" From 4513f2c590e0ec2dcbdd449b3d5b3adf9f6278e8 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Fri, 23 Feb 2024 17:18:23 +0530 Subject: [PATCH 6/8] fix path --- 
import-automation/executor/app/executor/validation.py | 2 +- scripts/earthengine/earthengine_image.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/import-automation/executor/app/executor/validation.py b/import-automation/executor/app/executor/validation.py index 8fca36e609..843c3ded2e 100644 --- a/import-automation/executor/app/executor/validation.py +++ b/import-automation/executor/app/executor/validation.py @@ -197,7 +197,7 @@ def _is_import_spec_valid(import_spec, repo_dir, import_dir): absolute_script_paths = [ os.path.join(repo_dir, import_dir, path.split(' ')[0]) - for path in import_spec.get('scripts', []) + for path in import_spec.get('scripts', []) if path ] missing_paths = _filter_missing_paths(absolute_script_paths) if missing_paths: diff --git a/scripts/earthengine/earthengine_image.py b/scripts/earthengine/earthengine_image.py index 05782a1f85..c4f61d1d70 100644 --- a/scripts/earthengine/earthengine_image.py +++ b/scripts/earthengine/earthengine_image.py @@ -51,7 +51,6 @@ # Workaround for collection.Callable needed for ee.Initialize() import collections import collections.abc - collections.Callable = collections.abc.Callable from absl import app From a33de749af81ef5e7017e71554b13cf1d532bf11 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Fri, 23 Feb 2024 17:38:18 +0530 Subject: [PATCH 7/8] fix lint --- .../executor/app/executor/validation.py | 6 +- scripts/earthengine/earthengine_image.py | 1 + .../firms/fire_events_pipeline_config.py | 195 +++++++++--------- 3 files changed, 103 insertions(+), 99 deletions(-) diff --git a/import-automation/executor/app/executor/validation.py b/import-automation/executor/app/executor/validation.py index 843c3ded2e..2b5ce54e72 100644 --- a/import-automation/executor/app/executor/validation.py +++ b/import-automation/executor/app/executor/validation.py @@ -196,8 +196,10 @@ def _is_import_spec_valid(import_spec, repo_dir, import_dir): f'import specification ({import_spec})') absolute_script_paths 
= [ - os.path.join(repo_dir, import_dir, path.split(' ')[0]) - for path in import_spec.get('scripts', []) if path + os.path.join(repo_dir, import_dir, + path.split(' ')[0]) + for path in import_spec.get('scripts', []) + if path ] missing_paths = _filter_missing_paths(absolute_script_paths) if missing_paths: diff --git a/scripts/earthengine/earthengine_image.py b/scripts/earthengine/earthengine_image.py index c4f61d1d70..05782a1f85 100644 --- a/scripts/earthengine/earthengine_image.py +++ b/scripts/earthengine/earthengine_image.py @@ -51,6 +51,7 @@ # Workaround for collection.Callable needed for ee.Initialize() import collections import collections.abc + collections.Callable = collections.abc.Callable from absl import app diff --git a/scripts/fires/firms/fire_events_pipeline_config.py b/scripts/fires/firms/fire_events_pipeline_config.py index 5358f21ac5..6eb6ce1be7 100644 --- a/scripts/fires/firms/fire_events_pipeline_config.py +++ b/scripts/fires/firms/fire_events_pipeline_config.py @@ -1,192 +1,193 @@ # Config to generate FireEvent through the script: events_pipeline.py { - 'defaults': { - 'import_name': 'FIRMSFires', + "defaults": { + "import_name": "FIRMSFires", # Set start_date to start of year to be processed. # Defaults to Jan 1 of current year if left empty. - 'start_date': '', + "start_date": "", # Aggregate events upto the end of the year from per-day source files. - 'end_date': '{year}-12-31', - 'batch_days': 1, - 'time_period': 'P{batch_days}D', + "end_date": "{year}-12-31", + "batch_days": 1, + "time_period": "P{batch_days}D", # GCS settings - 'gcs_project': 'datcom', - 'gcs_bucket': 'datcom-prod-imports', - 'gcs_folder': 'scripts/fires/firms', + "gcs_project": "datcom", + "gcs_bucket": "datcom-prod-imports", + "gcs_folder": "scripts/fires/firms", }, # State of previous run of the pipeline with input/output for each stage. 
- 'pipeline_state_file': - 'gs://datcom-prod-imports/scripts/fires/firms/flood_event_pipeline_state_{year}.py', + "pipeline_state_file": + "gs://datcom-prod-imports/scripts/fires/firms/flood_event_pipeline_state_{year}.py", # Pipeline stages to generate flood events. - 'stages': [ + "stages": [ # Download NASA FIRMS fires data using the API { - 'stage': - 'download', + "stage": + "download", # API key for NASA FIRMS data download # Get a MAPS_KEY from https://firms.modaps.eosdis.nasa.gov/api/area/ - 'nasa_firms_api_key': - '', - 'nasa_data_source': "VIIRS_SNPP_NRT", # upto last 60 days - #'nasa_data_source': - # "VIIRS_SNPP_SP", # older than 60 days - 'batch_days': + "nasa_firms_api_key": + "", + "nasa_data_source": + "VIIRS_SNPP_NRT", # upto last 60 days + # Use this if processing data older than 60 days + #"nasa_data_source": "VIIRS_SNPP_SP", + "batch_days": 1, - 'url': + "url": "https://firms.modaps.eosdis.nasa.gov/api/area/csv/{nasa_firms_api_key}/{nasa_data_source}/world/{batch_days}/{start_date}", # API rate limits downloads. # retry downloads after 200 secs until a CSV with date is downloaded. - 'successful_response_regex': - '{year}', - 'retry_interval': + "successful_response_regex": + "{year}", + "retry_interval": 200, - 'retry_count': + "retry_count": 10, - 'output_file': - 'gs://{gcs_bucket}/{gcs_folder}/download/{year}/{import_name}-download-{start_date}-{time_period}.csv', - 'skip_existing_output': + "output_file": + "gs://{gcs_bucket}/{gcs_folder}/download/{year}/{import_name}-download-{start_date}-{time_period}.csv", + "skip_existing_output": True, }, # Add S2 cells to the downloaded CSV files. { - 'stage': - 'raster_csv', - 'time_period': - 'P{batch_days}D', - 's2_level': + "stage": + "raster_csv", + "time_period": + "P{batch_days}D", + "s2_level": 10, - 'aggregate': + "aggregate": None, - 'input_data_filter': { - 'area': { + "input_data_filter": { + "area": { # pick max area for s2 cell. # each fire in input is a fixed region. 
- 'aggregate': 'max' + "aggregate": "max" }, }, - 'input_files': - 'gs://{gcs_bucket}/{gcs_folder}/download/{year}/{import_name}-download-{year}*.csv', - 'output_dir': - 'gs://{gcs_bucket}/{gcs_folder}/{stage}/{year}', - 'skip_existing_output': + "input_files": + "gs://{gcs_bucket}/{gcs_folder}/download/{year}/{import_name}-download-{year}*.csv", + "output_dir": + "gs://{gcs_bucket}/{gcs_folder}/{stage}/{year}", + "skip_existing_output": True, }, # Generate events from the CSV with fires in S2 cells { - 'stage': - 'events', + "stage": + "events", # Process all data files for the whole year. - 'input_files': - 'gs://{gcs_bucket}/{gcs_folder}/raster_csv/{year}/*{year}*.csv', + "input_files": + "gs://{gcs_bucket}/{gcs_folder}/raster_csv/{year}/*{year}*.csv", # Output events csv into a common folder with a year in filename, # as the import automation can copy all files in a single folder. - 'output_dir': - 'gs://{gcs_bucket}/{gcs_folder}/{stage}/{import_name}-{stage}-{year}-without-usa-', - 'event_type': - 'FireEvent', + "output_dir": + "gs://{gcs_bucket}/{gcs_folder}/{stage}/{import_name}-{stage}-{year}-without-usa-", + "event_type": + "FireEvent", # Input settings. # Columns of input_csv that are added as event properties - 'data_columns': [ - 'area', 'frp', 'bright_ti4', 'bright_ti5', 'confidence' + "data_columns": [ + "area", "frp", "bright_ti4", "bright_ti5", "confidence" ], # Columns of input_csv that contains the s2 cell id. - 'place_column': - 's2CellId', + "place_column": + "s2CellId", # Input column for date. - 'date_column': - 'acq_date', + "date_column": + "acq_date", # Processing settings # Maximum distance within which 2 events are merged. - 'max_overlap_distance_km': + "max_overlap_distance_km": 0, # Maximum number of cells of same level in between 2 events to be merged. - 'max_overlap_place_hop': + "max_overlap_place_hop": 1, # S2 level to which data is aggregated. - 's2_level': + "s2_level": 10, # Events are at resolution of level-10 S2 cells. 
- 'aggregate': - 'sum', # default aggregation for all properties + "aggregate": + "sum", # default aggregation for all properties # Per property settings - 'property_config': { - 'area': { - 'aggregate': 'sum', - 'unit': 'SquareKilometer', + "property_config": { + "area": { + "aggregate": "sum", + "unit": "SquareKilometer", }, - 'affectedPlace': { - 'aggregate': 'list', + "affectedPlace": { + "aggregate": "list", }, }, # Per property filter params for input data. - 'input_filter_config': { - 'confidence': { - 'regex': '[nh]', + "input_filter_config": { + "confidence": { + "regex": "[nh]", } }, - 'output_events_filter_config': { - 'AreaSqKm': { + "output_events_filter_config": { + "AreaSqKm": { # Only allow fire events with atleast 4sqkm (10%) of events. - 'min': 4.0, + "min": 4.0, }, - 'affectedPlace': { + "affectedPlace": { # Ignore fires in USA also generated by a different import - 'ignore': 'country/USA' + "ignore": "country/USA" } }, # Per property settings - 'property_config': { - 'aggregate': 'max', - 'area': { - 'aggregate': 'sum', - 'unit': 'SquareKilometer', + "property_config": { + "aggregate": "max", + "area": { + "aggregate": "sum", + "unit": "SquareKilometer", }, }, # Treat events at the same location beyond 3 days as separate events. - 'max_event_interval_days': + "max_event_interval_days": 3, # Limit time range for an event to 3 months, roughly a season - 'max_event_duration_days': + "max_event_duration_days": 90, # Limit event affected region to 1000 L10 s2 cells, roughly 100K sqkm. - 'max_event_places': + "max_event_places": 1000, # Enable DC API lookup for place properties - 'dc_api_enabled': + "dc_api_enabled": False, - 'dc_api_batch_size': + "dc_api_batch_size": 200, # Cache file for place properties like name, location, typeOf # Cache is updated with new places looked up. 
- 'place_property_cache_file': - 'gs://datcom-prod-imports/place_cache/place_properties_cache_with_s2_10.pkl', + "place_property_cache_file": + "gs://datcom-prod-imports/place_cache/place_properties_cache_with_s2_10.pkl", # Output settings. - 'output_svobs': + "output_svobs": True, - 'output_delimiter': - ',', - 'output_affected_place_polygon': - 'geoJsonCoordinates', - 'polygon_simplification_factor': + "output_delimiter": + ",", + "output_affected_place_polygon": + "geoJsonCoordinates", + "polygon_simplification_factor": None, - 'output_geojson_string': + "output_geojson_string": False, # Output svobs per place # Place svobs generated by entity aggregation pipeline - 'output_place_svobs': + "output_place_svobs": False, - 'output_place_svobs_properties': ['area', 'count'], - 'output_place_svobs_dates': ['YYYY-MM-DD', 'YYYY-MM', 'YYYY'], + "output_place_svobs_properties": ["area", "count"], + "output_place_svobs_dates": ["YYYY-MM-DD", "YYYY-MM", "YYYY"], }, ], } From 99325b37c9c2907c2b8159f2febbc42653b5d6e3 Mon Sep 17 00:00:00 2001 From: Ajai Tirumali Date: Fri, 23 Feb 2024 18:35:30 +0530 Subject: [PATCH 8/8] disable timeout for gunicorn --- import-automation/executor/Dockerfile | 2 +- import-automation/executor/app.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/import-automation/executor/Dockerfile b/import-automation/executor/Dockerfile index 1bba3650c7..ecb75719a3 100644 --- a/import-automation/executor/Dockerfile +++ b/import-automation/executor/Dockerfile @@ -27,4 +27,4 @@ RUN pip install -r /workspace/requirements.txt COPY app/. 
/workspace/app/ -CMD gunicorn --timeout 1800 --workers 5 -b :$PORT app.main:FLASK_APP +CMD gunicorn --timeout 0 --workers 5 -b :$PORT app.main:FLASK_APP diff --git a/import-automation/executor/app.yaml b/import-automation/executor/app.yaml index 2169ce2150..fc52629491 100644 --- a/import-automation/executor/app.yaml +++ b/import-automation/executor/app.yaml @@ -1,5 +1,5 @@ runtime: python37 -entrypoint: gunicorn --timeout 1800 -b :$PORT app.main:FLASK_APP +entrypoint: gunicorn --timeout 0 -b :$PORT app.main:FLASK_APP env_variables: EXECUTOR_PRODUCTION: "True" TMPDIR: "/tmp"