diff --git a/.github/workflows/export-data-to-csv.yml b/.github/workflows/export-data-to-csv.yml
new file mode 100644
index 0000000000..4d2f2507c9
--- /dev/null
+++ b/.github/workflows/export-data-to-csv.yml
@@ -0,0 +1,55 @@
+---
+name: Export Data to CSV
+on:
+  # schedule:
+  #   Monthly, on the 5th, at 8am UTC (3am EST)
+  #   - cron: '0 8 5 * *'
+  workflow_dispatch:
+    inputs:
+      environment:
+        required: true
+        type: choice
+        description: The environment the workflow should run on.
+        options:
+          - dev
+          - staging
+          - preview
+          - production
+
+jobs:
+  scheduled-data-export:
+    if: ${{ github.event_name == 'schedule' }}
+    strategy:
+      matrix:
+        environments: ["production"] # For now, just do the scheduled job on production to save space
+    name: Run data export on ${{ matrix.environments }}
+    runs-on: ubuntu-latest
+    environment: ${{ matrix.environments }}
+    env:
+      space: ${{ matrix.environments }}
+    steps:
+      - name: Run Command
+        uses: cloud-gov/cg-cli-tools@main
+        with:
+          cf_username: ${{ secrets.CF_USERNAME }}
+          cf_password: ${{ secrets.CF_PASSWORD }}
+          cf_org: gsa-tts-oros-fac
+          cf_space: ${{ env.space }}
+          command: cf run-task gsa-fac -k 2G -m 2G --name export_data_to_csv --command "python manage.py export_data"
+
+  dispatch-data-export:
+    if: ${{ github.event.inputs.environment != '' }}
+    name: Run data export on ${{ inputs.environment }}
+    runs-on: ubuntu-latest
+    environment: ${{ inputs.environment }}
+    env:
+      space: ${{ inputs.environment }}
+    steps:
+      - name: Run Command
+        uses: cloud-gov/cg-cli-tools@main
+        with:
+          cf_username: ${{ secrets.CF_USERNAME }}
+          cf_password: ${{ secrets.CF_PASSWORD }}
+          cf_org: gsa-tts-oros-fac
+          cf_space: ${{ env.space }}
+          command: cf run-task gsa-fac -k 2G -m 2G --name export_data_to_csv --command "python manage.py export_data"
diff --git a/backend/requirements.txt b/backend/requirements.txt
index 001cc55b9b..c10366fda1 100644
--- a/backend/requirements.txt
+++ b/backend/requirements.txt
@@ -1193,6 +1193,14 @@ six==1.16.0 \
     #   orderedmultidict
     #   pyjwkest
     #   python-dateutil
+sling==1.3.4 \
+    --hash=sha256:70541da99b48313a0cfd5e61c52d984e4d50113b7945308697d32cd50a6bb183
+    # via -r ./requirements/requirements.in
+sling-linux-arm64==1.3.4 \
+    --hash=sha256:08977df437742b415232e8c09c2f7f43181a3b764bef601fa6ebd369f6777783
+sling-linux-amd64==1.3.4 \
+    --hash=sha256:d5f3b71fb191e8b9663f88890c464d84ab4c36df3a5fd6148a51cdd2ea9a5099
+    # via sling
 sqlalchemy==2.0.36 \
     --hash=sha256:03e08af7a5f9386a43919eda9de33ffda16b44eb11f3b313e6822243770e9763 \
     --hash=sha256:0572f4bd6f94752167adfd7c1bed84f4b240ee6203a95e05d1e208d488d0d436 \
diff --git a/backend/requirements/requirements.in b/backend/requirements/requirements.in
index d179734444..7a8d4a8915 100644
--- a/backend/requirements/requirements.in
+++ b/backend/requirements/requirements.in
@@ -36,6 +36,7 @@ python-slugify
 pyyaml
 requests>=2.32.3
 sqlalchemy
+sling
 sqlparse>=0.5.0
 types-python-dateutil==2.9.0.20240821
 uritemplate
diff --git a/backend/support/management/commands/export_data.py b/backend/support/management/commands/export_data.py
new file mode 100644
index 0000000000..498cbf1869
--- /dev/null
+++ b/backend/support/management/commands/export_data.py
@@ -0,0 +1,138 @@
+import os
+import logging
+
+from config import settings
+from datetime import datetime
+from django.core.management.base import BaseCommand, CommandError
+from sling import Replication, ReplicationStream
+
+from support.decorators import newrelic_timing_metric
+from dissemination.summary_reports import restricted_model_names
+
+logger = logging.getLogger(__name__)
+
+S3_CONNECTION = f"""{{
+    "type": "s3",
+    "bucket": "{settings.AWS_PRIVATE_STORAGE_BUCKET_NAME}",
+    "access_key_id": "{settings.AWS_PRIVATE_ACCESS_KEY_ID}",
+    "secret_access_key": "{settings.AWS_PRIVATE_SECRET_ACCESS_KEY}",
+    "endpoint": "{settings.AWS_S3_ENDPOINT_URL}"
+    }}
+    """
+DB_URL = os.environ.get("DATABASE_URL")
+FAC_DB_URL = (
+    f"{DB_URL}?sslmode=disable" if settings.ENVIRONMENT in ["LOCAL", "TEST"] else DB_URL
+)
+DEFAULT_OPTIONS = {
+    "target_options": {
+        "format": "csv",
+        "compression": "gzip",
+        "file_max_rows": 0,
+    }
+}
+
+
+class StreamGenerator:
+    EXCLUDE_NONPUBLIC_QUERY = (
+        "select * from {table_name} where report_id in ("
+        " select dg.report_id from public.dissemination_general dg"
+        " where dg.audit_year = '{audit_year}' and dg.is_public = 'true' )"
+    )
+
+    UNRESTRICTED_QUERY = (
+        "select * from {table_name} where report_id in ("
+        " select dg.report_id from public.dissemination_general dg"
+        " where dg.audit_year = '{audit_year}')"
+    )
+
+    def __init__(self, table_name, friendly_name, query_override=None):
+        self.table_name = table_name
+        self.friendly_name = friendly_name
+
+        restricted_tables = [
+            "dissemination_" + model for model in restricted_model_names
+        ]
+        default_query = (
+            self.EXCLUDE_NONPUBLIC_QUERY
+            if table_name in restricted_tables
+            else self.UNRESTRICTED_QUERY
+        )
+        self.query = query_override or default_query
+
+    def generate_stream(self, audit_year):
+        return (
+            f"{self.table_name}.{audit_year}",
+            ReplicationStream(
+                object=f"bulk_export/{{MM}}/{audit_year}_{self.friendly_name}.csv",
+                sql=self.query.format(
+                    table_name=self.table_name, audit_year=audit_year
+                ),
+                mode="full-refresh",
+                target_options={"format": "csv"},
+            ),
+        )
+
+
+STREAM_GENERATORS = [
+    StreamGenerator(
+        friendly_name="General",
+        table_name="dissemination_general",
+        query_override="select * from dissemination_general where audit_year = '{audit_year}'",
+    ),
+    StreamGenerator(
+        friendly_name="AdditionalEIN", table_name="dissemination_additionalein"
+    ),
+    StreamGenerator(
+        friendly_name="AdditionalUEI", table_name="dissemination_additionaluei"
+    ),
+    StreamGenerator(
+        friendly_name="CorrectiveActionPlans", table_name="dissemination_captext"
+    ),
+    StreamGenerator(
+        friendly_name="FederalAward", table_name="dissemination_federalaward"
+    ),
+    StreamGenerator(friendly_name="Finding", table_name="dissemination_finding"),
+    StreamGenerator(
+        friendly_name="FindingText", table_name="dissemination_findingtext"
+    ),
+    StreamGenerator(friendly_name="Note", table_name="dissemination_note"),
+    StreamGenerator(
+        friendly_name="PassThrough", table_name="dissemination_passthrough"
+    ),
+    StreamGenerator(
+        friendly_name="SecondaryAuditor", table_name="dissemination_secondaryauditor"
+    ),
+]
+
+
+@newrelic_timing_metric("data_export")
+def _run_data_export():
+    logger.info("Begin exporting data")
+    # We may want to consider only exporting the past X years instead of hardcoding 2016.
+    # This will only export data that exists, so doing +2 just in case some data comes in early.
+    years = range(2016, datetime.today().year + 2)
+    streams = {}
+    for stream_generator in STREAM_GENERATORS:
+        for year in years:
+            streams.update([stream_generator.generate_stream(year)])
+
+    replication = Replication(
+        source="FAC_DB",
+        target="BULK_DATA_EXPORT",
+        streams=streams,
+        defaults=DEFAULT_OPTIONS,
+        env=dict(FAC_DB=FAC_DB_URL, BULK_DATA_EXPORT=S3_CONNECTION),
+        debug=settings.DEBUG,
+    )
+    logger.info(f"Exporting {len(streams)} streams")
+    replication.run()
+    logger.info("Successfully exported data")
+
+
+class Command(BaseCommand):
+    def handle(self, *args, **kwargs):
+        try:
+            _run_data_export()
+        except Exception as ex:
+            logger.error("An error occurred while exporting data", exc_info=ex)
+            raise CommandError("Error while exporting data")