diff --git a/.dockerignore b/.dockerignore index 9138dfd7c..dbeecbc40 100644 --- a/.dockerignore +++ b/.dockerignore @@ -4,6 +4,7 @@ data data_genbank data_flu data_flu_small +data_flu_genbank data_gisaid_flu data_gisaid_rsv data_genbank_rsv diff --git a/.gcloudignore b/.gcloudignore index 439c793b6..0aa052d51 100644 --- a/.gcloudignore +++ b/.gcloudignore @@ -23,6 +23,7 @@ data_az data_ma data_flu data_flu_small +data_flu_genbank data_gisaid_flu data_gisaid_rsv dist diff --git a/.gitignore b/.gitignore index 57e76e5ce..ad73646b8 100644 --- a/.gitignore +++ b/.gitignore @@ -133,6 +133,7 @@ example_data_genbank/*/lineage_treetime/*.pdf data data_genbank +data_flu_genbank example_data_genbank/rsv/** example_data_genbank/flu/** example_data_genbank/sars2/** diff --git a/config/config_sars2_alpha.yaml b/config/config_sars2_alpha.yaml index c74312d85..08e96ea7e 100644 --- a/config/config_sars2_alpha.yaml +++ b/config/config_sars2_alpha.yaml @@ -15,10 +15,10 @@ static_data_folder: "static_data/sars2" # Path to folder with data to use in development # This path is relative to the project root -example_data_folder: "example_data_genbank" +example_data_folder: "data" # Database for this virus -postgres_db: "cg_alpha" +postgres_db: "cg_gisaid" # ------------------ # INGEST @@ -72,16 +72,44 @@ metadata_cols: title: "Originating lab" submitting_lab: title: "Submitting lab" + # PANGO metadata + conflict: + title: "PANGO conflict" + ambiguity_score: + title: "PANGO ambiguity score" + scorpio_call: + title: "scorpio call" + scorpio_support: + title: "scorpio support" + scorpio_conflict: + title: "scorpio conflict" + scorpio_notes: + title: "scorpio notes" + pangolin_is_designated: + title: "PANGO is_designated" + pangolin_qc_status: + title: "PANGO QC status" + pangolin_qc_notes: + title: "PANGO QC notes" + pangolin_note: + title: "pangolin note" group_cols: lineage: name: "lineage" - title: "Lineage" + title: "PANGO Lineage" description: "" link: title: "(Lineage Descriptions)" href: "https://cov-lineages.org/descriptions.html" show_collapse_options: true + gisaid_lineage: + name: "gisaid_lineage" + title: "PANGO Lineage (GISAID)" + description: "PANGO assignments from GISAID" + link: + title: "(Lineage Descriptions)" + href: "https://cov-lineages.org/descriptions.html" clade: name: "clade" title: "Clade" @@ -91,6 +119,10 @@ group_cols: href: "https://www.gisaid.org/references/statements-clarifications/clade-and-lineage-nomenclature-aids-in-genomic-epidemiology-of-active-hcov-19-viruses/" show_collapse_options: false +# AZ report options +report_gene: "S" +report_group_col: "lineage" + # Surveillance plot options # see: workflow_main/scripts/surveillance.py surv_group_col: "lineage" diff --git a/config/config_sars2_gisaid.yaml b/config/config_sars2_gisaid.yaml index 425f4cdac..345c2efa7 100644 --- a/config/config_sars2_gisaid.yaml +++ b/config/config_sars2_gisaid.yaml @@ -72,6 +72,27 @@ metadata_cols: title: "Originating lab" submitting_lab: title: "Submitting lab" + # PANGO metadata + conflict: + title: "PANGO conflict" + ambiguity_score: + title: "PANGO ambiguity score" + scorpio_call: + title: "scorpio call" + scorpio_support: + title: "scorpio support" + scorpio_conflict: + title: "scorpio conflict" + scorpio_notes: + title: "scorpio notes" + pangolin_is_designated: + title: "PANGO is_designated" + pangolin_qc_status: + title: "PANGO QC status" + pangolin_qc_notes: + title: "PANGO QC notes" + pangolin_note: + title: "pangolin note" group_cols: lineage: diff --git a/config/config_sars2_gisaid_private.yaml b/config/config_sars2_gisaid_private.yaml index a7dd66954..0f7b2c8cc 100644 --- a/config/config_sars2_gisaid_private.yaml +++ b/config/config_sars2_gisaid_private.yaml @@ -72,6 +72,27 @@ metadata_cols: title: "Originating lab" submitting_lab: title: "Submitting lab" + # PANGO metadata + conflict: + title: "PANGO conflict" + ambiguity_score: + title: "PANGO ambiguity score" + scorpio_call: + title: "scorpio call" + scorpio_support: + title: "scorpio support" + scorpio_conflict: + title: "scorpio conflict" + scorpio_notes: + title: "scorpio notes" + pangolin_is_designated: + title: "PANGO is_designated" + pangolin_qc_status: + title: "PANGO QC status" + pangolin_qc_notes: + title: "PANGO QC notes" + pangolin_note: + title: "pangolin note" group_cols: lineage: diff --git a/package-lock.json b/package-lock.json index e1862237e..bad27744f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "covidcg", - "version": "2.7.5-pango", + "version": "2.7.6-qual-rc1", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index d31b17c59..dc850ca94 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "covidcg", - "version": "2.7.5-pango", + "version": "2.7.6-qual-rc1", "description": "", "engines": { "node": ">=8", diff --git a/services/server/cg_server/db_seed/insert_sequences.py b/services/server/cg_server/db_seed/insert_sequences.py index 3b2fa1e2e..d40ef8335 100644 --- a/services/server/cg_server/db_seed/insert_sequences.py +++ b/services/server/cg_server/db_seed/insert_sequences.py @@ -76,7 +76,6 @@ def insert_sequences(conn, data_path, schema="public"): fasta_path = Path(data_path) / "fasta_processed" with conn.cursor() as cur: - cur.execute(sql.SQL("SET search_path TO {};").format(sql.Identifier(schema))) # The SARS-CoV-2 genomes are ~30kb, which seems like it @@ -112,7 +111,6 @@ def insert_sequences(conn, data_path, schema="public"): chunk_size = 10_000 for i, fasta_file in enumerate(sorted(fasta_path.iterdir())): - # if fasta_file.name in loaded_files: # print("Skipping: ", fasta_file.name) # continue @@ -136,6 +134,10 @@ def insert_sequences(conn, data_path, schema="public"): if not line and i < (len(lines) - 1): continue + # If we're on the last line, but the entry is empty, then skip + if not line and i == (len(lines) - 1) and not cur_seq: + continue + # If not the name of an entry, add this line to the current sequence # (some FASTA files will have multiple lines per sequence) if line[0] != ">": @@ -144,7 +146,7 @@ def insert_sequences(conn, data_path, schema="public"): # Start of another entry = end of the previous entry if line[0] == ">" or i == (len(lines) - 1): # Avoid capturing the first one and pushing an empty sequence - if cur_entry: + if cur_entry and cur_seq: buffer.write( cur_entry + "," + cur_seq + "," + fasta_file.name + "\n" ) diff --git a/services/server/cg_server/db_seed/seed.py b/services/server/cg_server/db_seed/seed.py index 2767a62bf..c0036769a 100644 --- a/services/server/cg_server/db_seed/seed.py +++ b/services/server/cg_server/db_seed/seed.py @@ -66,7 +66,6 @@ def df_to_sql(cur, df, table, index_label=None): def seed_database(conn, schema="public"): with conn.cursor() as cur: - cur.execute(sql.SQL("SET search_path TO {};").format(sql.Identifier(schema))) cur.execute("DROP EXTENSION IF EXISTS intarray;") @@ -195,7 +194,6 @@ def seed_database(conn, schema="public"): mutation_fields = ["dna", "gene_aa", "protein_aa"] for grouping in group_mutation_frequencies.keys(): - # Get references reference_names = sorted(group_mutation_frequencies[grouping].keys()) @@ -256,7 +254,6 @@ def seed_database(conn, schema="public"): # Build colormaps for grouping in config["group_cols"].keys(): - # Collect unique group names group_names = [] for reference in global_group_counts.keys(): @@ -296,12 +293,27 @@ def seed_database(conn, schema="public"): print("done") print("Writing sequence metadata...", end="", flush=True) + + isolate_df = pd.read_json(data_path / "isolate_data.json") + isolate_df["collection_date"] = pd.to_datetime(isolate_df["collection_date"]) + isolate_df["submission_date"] = pd.to_datetime(isolate_df["submission_date"]) + # print(isolate_df.columns) + # Make a column for each metadata field metadata_cols = [] metadata_col_defs = [] for field in list(config["metadata_cols"].keys()) + loc_levels: metadata_col_defs.append(sql.SQL(f"{field} INTEGER NOT NULL")) metadata_cols.append(field) + + # Make columns for sequence metadata, if they exist + if "length" in isolate_df.columns: + metadata_cols.append("length") + metadata_col_defs.append(sql.SQL("length INTEGER NOT NULL")) + if "percent_ambiguous" in isolate_df.columns: + metadata_cols.append("percent_ambiguous") + metadata_col_defs.append(sql.SQL("percent_ambiguous REAL NOT NULL")) + metadata_col_defs = sql.SQL(",\n").join(metadata_col_defs) # Make a column for each grouping @@ -338,11 +350,6 @@ def seed_database(conn, schema="public"): ) ) - isolate_df = pd.read_json(data_path / "isolate_data.json") - isolate_df["collection_date"] = pd.to_datetime(isolate_df["collection_date"]) - isolate_df["submission_date"] = pd.to_datetime(isolate_df["submission_date"]) - # print(isolate_df.columns) - # Partition settings min_date = isolate_df["collection_date"].min() # Round latest sequence to the nearest partition break @@ -423,8 +430,7 @@ def seed_database(conn, schema="public"): "segments", "accession_ids", ] - + list(config["metadata_cols"].keys()) - + loc_levels + + metadata_cols + list( filter(lambda x: x != "subtype", config["group_cols"].keys()) ) # Avoid duplicate subtype index @@ -480,7 +486,9 @@ def seed_database(conn, schema="public"): # Clean up the reference name as a SQL ident - no dots reference_name_sql = reference_name.replace(".", "_") - reference_partition_name = f"seqmut_{mutation_field}_{reference_name_sql}" + reference_partition_name = ( + f"seqmut_{mutation_field}_{reference_name_sql}" + ) # Create reference partition cur.execute( @@ -555,13 +563,11 @@ def seed_database(conn, schema="public"): "subtype", "reference", ] - + list(config["metadata_cols"].keys()) - + loc_levels + + metadata_cols + list( filter(lambda x: x != "subtype", config["group_cols"].keys()) ) # Avoid duplicate subtype index ): - cur.execute( sql.SQL( "CREATE INDEX {index_name} ON {table_name}({field});" @@ -756,13 +762,11 @@ def seed_database(conn, schema="public"): "subtype", "reference", ] - + list(config["metadata_cols"].keys()) - + loc_levels + + metadata_cols + list( filter(lambda x: x != "subtype", config["group_cols"].keys()) ) # Avoid duplicate subtype index ): - cur.execute( sql.SQL( "CREATE INDEX {index_name} ON {table_name}({field});" diff --git a/services/server/cg_server/download/genomes.py b/services/server/cg_server/download/genomes.py index 75fa74fcd..c5891c5e8 100644 --- a/services/server/cg_server/download/genomes.py +++ b/services/server/cg_server/download/genomes.py @@ -33,6 +33,8 @@ def download_genomes(conn, req): req.get("selected_metadata_fields", None), req.get("selected_group_fields", None), req.get("selected_reference", None), + req.get("sequence_length", None), + req.get("percent_ambiguous", None), ) cur.execute( diff --git a/services/server/cg_server/download/metadata.py b/services/server/cg_server/download/metadata.py index 1d4fffbc2..241a80160 100644 --- a/services/server/cg_server/download/metadata.py +++ b/services/server/cg_server/download/metadata.py @@ -16,7 +16,6 @@ def download_metadata(conn, req): - selected_reference = req.get("selected_reference", None) if not selected_reference: raise Exception("No reference specified") @@ -34,6 +33,8 @@ def download_metadata(conn, req): req.get("subm_end_date", None), req.get("selected_metadata_fields", None), req.get("selected_group_fields", None), + req.get("sequence_length", None), + req.get("percent_ambiguous", None), selected_reference, ) @@ -179,7 +180,10 @@ def download_metadata(conn, req): cur.execute(query) - res_df = pd.DataFrame.from_records(cur.fetchall(), columns=sequence_cols,) + res_df = pd.DataFrame.from_records( + cur.fetchall(), + columns=sequence_cols, + ) # Replace mutation IDs with names for mutation_field in ["dna", "gene_aa", "protein_aa"]: diff --git a/services/server/cg_server/query/group_mutation_frequencies.py b/services/server/cg_server/query/group_mutation_frequencies.py index d16ad6c9f..fa199f664 100644 --- a/services/server/cg_server/query/group_mutation_frequencies.py +++ b/services/server/cg_server/query/group_mutation_frequencies.py @@ -36,7 +36,9 @@ def query_group_mutation_frequencies(conn, req): "mutation_str", ] if mutation_type == "gene_aa" or mutation_type == "protein_aa": - mutation_cols = ["feature",] + mutation_cols + mutation_cols = [ + "feature", + ] + mutation_cols mutation_cols_expr = sql.SQL(",\n").join( [ @@ -94,10 +96,14 @@ def query_group_mutation_frequencies_dynamic(conn, req): mutation_table = "dna_mutation" elif mutation_type == "gene_aa": mutation_table = "gene_aa_mutation" - mutation_cols = ["feature",] + mutation_cols + mutation_cols = [ + "feature", + ] + mutation_cols elif mutation_type == "protein_aa": mutation_table = "protein_aa_mutation" - mutation_cols = ["feature",] + mutation_cols + mutation_cols = [ + "feature", + ] + mutation_cols sequence_where_filter = build_sequence_location_where_filter( group_key, @@ -108,6 +114,8 @@ def query_group_mutation_frequencies_dynamic(conn, req): req.get("subm_end_date", None), req.get("selected_metadata_fields", None), req.get("selected_group_fields", None), + req.get("sequence_length", None), + req.get("percent_ambiguous", None), selected_reference, ) sequence_mutation_table = "sequence_" + mutation_table @@ -167,4 +175,3 @@ def query_group_mutation_frequencies_dynamic(conn, req): ) return res.to_json(orient="records") - diff --git a/services/server/cg_server/query/metadata.py b/services/server/cg_server/query/metadata.py index e10f21fff..6bb001278 100644 --- a/services/server/cg_server/query/metadata.py +++ b/services/server/cg_server/query/metadata.py @@ -14,7 +14,6 @@ def query_metadata(conn, req): with conn.cursor() as cur: - sequence_where_filter = build_sequence_location_where_filter( req.get("group_key", None), get_loc_level_ids(req), @@ -25,6 +24,8 @@ def query_metadata(conn, req): req.get("selected_metadata_fields", None), req.get("selected_group_fields", None), req.get("selected_reference", None), + req.get("sequence_length", None), + req.get("percent_ambiguous", None), ) # Iterate over each metadata column, and aggregate counts diff --git a/services/server/cg_server/query/selection.py b/services/server/cg_server/query/selection.py index ab393af5b..60387f8c2 100644 --- a/services/server/cg_server/query/selection.py +++ b/services/server/cg_server/query/selection.py @@ -113,6 +113,8 @@ def build_sequence_where_filter( selected_metadata_fields=None, selected_group_fields=None, selected_reference=None, + sequence_length=None, + percent_ambiguous=None, ): """Build query for filtering sequences based on user's location/date selection and selected metadata fields @@ -138,6 +140,10 @@ def build_sequence_where_filter( - Values are a list of group values, i.e., ["B.1.617.2", "BA.1"] selected_reference: str - Reference name (e.g., "NC_012920.1") + sequence_length: pair of integers + - (min_length, max_length) + percent_ambiguous: pair of floats + - (min_percent, max_percent) Returns ------- @@ -230,12 +236,50 @@ def build_sequence_where_filter( else: group_filters = sql.SQL("") + if sequence_length: + sequence_length_filter = [] + if sequence_length[0] is not None: + sequence_length_filter.append( + sql.SQL('"length" >= {}').format(sql.Literal(sequence_length[0])) + ) + if sequence_length[1] is not None: + sequence_length_filter.append( + sql.SQL('"length" <= {}').format(sql.Literal(sequence_length[1])) + ) + sequence_length_filter = sql.Composed( + [sql.SQL(" AND "), sql.SQL(" AND ").join(sequence_length_filter)] + ) + else: + sequence_length_filter = sql.SQL("") + + if percent_ambiguous: + percent_ambiguous_filter = [] + if percent_ambiguous[0] is not None: + percent_ambiguous_filter.append( + sql.SQL('"percent_ambiguous" >= {}').format( + sql.Literal(percent_ambiguous[0]) + ) + ) + if percent_ambiguous[1] is not None: + percent_ambiguous_filter.append( + sql.SQL('"percent_ambiguous" <= {}').format( + sql.Literal(percent_ambiguous[1]) + ) + ) + percent_ambiguous_filter = sql.Composed( + [sql.SQL(" AND "), sql.SQL(" AND ").join(percent_ambiguous_filter)] + ) + else: + percent_ambiguous_filter = sql.SQL("") + sequence_where_filter = sql.SQL( """ {metadata_filters} {group_filters} "collection_date" >= {start_date} AND "collection_date" <= {end_date} {submission_date_filter} + {sequence_length_filter} + {percent_ambiguous_filter} """ ).format( metadata_filters=metadata_filters, @@ -243,6 +287,8 @@ def build_sequence_where_filter( start_date=sql.Literal(pd.to_datetime(start_date)), end_date=sql.Literal(pd.to_datetime(end_date)), submission_date_filter=submission_date_filter, + sequence_length_filter=sequence_length_filter, + percent_ambiguous_filter=percent_ambiguous_filter, ) return sequence_where_filter @@ -283,7 +329,8 @@ def build_sequence_location_where_filter(group_key, loc_level_ids, *args, **kwar continue loc_where.append( sql.SQL("({loc_level_col} = ANY({loc_ids}))").format( - loc_level_col=sql.Identifier(loc_level), loc_ids=sql.Literal(loc_ids), + loc_level_col=sql.Identifier(loc_level), + loc_ids=sql.Literal(loc_ids), ) ) @@ -370,7 +417,10 @@ def count_coverage( cur.execute(coverage_query) - coverage_df = pd.DataFrame.from_records(cur.fetchall(), columns=["ind", "count"],) + coverage_df = pd.DataFrame.from_records( + cur.fetchall(), + columns=["ind", "count"], + ) if dna_or_aa != constants["DNA_OR_AA"]["DNA"]: if coordinate_mode == constants["COORDINATE_MODES"]["COORD_GENE"]: @@ -417,7 +467,6 @@ def query_and_aggregate(conn, req): selected_protein = req.get("selected_protein", None) with conn.cursor() as cur: - main_query = [] for loc_level in constants["GEO_LEVELS"].values(): loc_ids = req.get(loc_level, None) @@ -433,6 +482,8 @@ def query_and_aggregate(conn, req): req.get("selected_metadata_fields", None), req.get("selected_group_fields", None), req.get("selected_reference", None), + req.get("sequence_length", None), + req.get("percent_ambiguous", None), ) sequence_where_filter = sql.SQL( "{prior} AND {loc_level_col} = ANY({loc_ids})" @@ -536,6 +587,8 @@ def query_and_aggregate(conn, req): req.get("selected_metadata_fields", None), req.get("selected_group_fields", None), req.get("selected_reference", None), + req.get("sequence_length", None), + req.get("percent_ambiguous", None), ) coverage_df = count_coverage( cur, diff --git a/services/server/cg_server/query/variant_table.py b/services/server/cg_server/query/variant_table.py index 379a5cdd5..0f97a6022 100644 --- a/services/server/cg_server/query/variant_table.py +++ b/services/server/cg_server/query/variant_table.py @@ -50,6 +50,8 @@ def build_variant_table(conn, req): req.get("subm_end_date", None), req.get("selected_metadata_fields", None), req.get("selected_group_fields", None), + req.get("sequence_length", None), + req.get("percent_ambiguous", None), selected_reference, ) diff --git a/src/components/Modals/SelectSequencesModal.js b/src/components/Modals/SelectSequencesModal.js index 3055523b1..6dfcd3e7c 100644 --- a/src/components/Modals/SelectSequencesModal.js +++ b/src/components/Modals/SelectSequencesModal.js @@ -26,6 +26,7 @@ import CoordinateSelect from '../Selection/CoordinateSelect'; import DateSelect from '../Selection/DateSelect'; import GroupSelect from '../Selection/GroupSelect'; import MetaFieldSelect from '../Selection/MetaFieldSelect'; +import QualitySelect from '../Selection/QualitySelect'; import LoadingSpinner from '../Common/LoadingSpinner'; import { @@ -81,6 +82,8 @@ const SelectSequencesContent = observer(({ onRequestClose }) => { submEndDate: configStore.submEndDate, selectedGroupFields: configStore.selectedGroupFields, selectedMetadataFields: configStore.selectedMetadataFields, + sequenceLengthRange: configStore.sequenceLengthRange, + percentAmbiguousRange: configStore.percentAmbiguousRange, ageRange: configStore.ageRange, }); @@ -443,6 +446,23 @@ const SelectSequencesContent = observer(({ onRequestClose }) => { selectedMetadataFields, }); }; + + const updateQualityFilters = (field, range) => { + const { sequenceLengthRange, percentAmbiguousRange } = metaPending; + if (field === 'sequenceLengthRange') { + sequenceLengthRange[0] = range[0]; + sequenceLengthRange[1] = range[1]; + } else if (field === 'percentAmbiguousRange') { + percentAmbiguousRange[0] = range[0]; + percentAmbiguousRange[1] = range[1]; + } + setMetaPending({ + ...metaPending, + sequenceLengthRange, + percentAmbiguousRange, + }); + }; + // const updateAgeRange = (ageRange) => { // setPending({ // ...pending, @@ -637,6 +657,11 @@ const SelectSequencesContent = observer(({ onRequestClose }) => { updateSelectedGroupFields={updateSelectedGroupFields} /> + + { + // Only render this if we have the quality filters available + if (config['virus'] !== 'sars2') { + return ''; + } + + const handleChange = (field, position, event) => { + //console.log(field, position, event.target.value); + let rng; + if (field === 'sequenceLengthRange') { + rng = sequenceLengthRange; + } else if (field === 'percentAmbiguousRange') { + rng = percentAmbiguousRange; + } + rng[position] = + event.target.value === '' ? null : parseFloat(event.target.value); + + updateQualityFilters(field, rng); + }; + + const qualityFilterItems = []; + + qualityFilterItems.push( + + Sequence Length (bases) + + + + + + + + + + ); + + qualityFilterItems.push( + + % Ambiguous (% N) + + + + + + + + + + ); + + return ( + + {' '} + + Sequence Quality + + + {qualityFilterItems} + + ); + } +); + +QualitySelect.propTypes = { + sequenceLengthRange: PropTypes.arrayOf(PropTypes.number), + percentAmbiguousRange: PropTypes.arrayOf(PropTypes.number), + updateQualityFilters: PropTypes.func, +}; + +export default QualitySelect; diff --git a/src/components/Selection/QualitySelect.styles.js b/src/components/Selection/QualitySelect.styles.js new file mode 100644 index 000000000..76d515ab0 --- /dev/null +++ b/src/components/Selection/QualitySelect.styles.js @@ -0,0 +1,43 @@ +import styled from 'styled-components'; + +export const QualitySelectContainer = styled.div` + display: flex; + flex-direction: column; + + padding-left: 15px; + margin-bottom: 10px; + + span.title { + font-weight: 500; + font-size: 1rem; + margin-bottom: 5px; + } +`; + +export const FormRow = styled.div` + display: flex; + flex-direction: row; + align-items: flex-end; +`; + +export const TitleColumn = styled.div` + display: flex; + flex-direction: column; + align-items: stretch; + + margin-right: 10px; + font-weight: 500; +`; + +export const FormColumn = styled.div` + display: flex; + flex-direction: column; + align-items: stretch; + max-width: 6em; + + margin-right: 10px; + + input { + font-family: inherit; + } +`; diff --git a/src/components/Sidebar/StatusBox.js b/src/components/Sidebar/StatusBox.js index 57c709e8e..fa9f53e79 100644 --- a/src/components/Sidebar/StatusBox.js +++ b/src/components/Sidebar/StatusBox.js @@ -1,6 +1,7 @@ import React from 'react'; import { observer } from 'mobx-react'; import { useStores } from '../../stores/connect'; +import { config } from '../../config'; import { DNA_OR_AA, @@ -31,6 +32,93 @@ const serializeAACoordinates = (coordinateRanges) => { const StatusBox = observer(() => { const { configStore, dataStore, UIStore } = useStores(); + let qualityFilters = ''; + const qualityFields = ['length', 'percent_ambiguous']; + const qualityFieldKeys = { + length: 'sequenceLengthRange', + percent_ambiguous: 'percentAmbiguousRange', + }; + const qualityFieldNames = { + length: 'Sequence Length', + percent_ambiguous: '% Ambiguous (N) Bases', + }; + // Only render this if we have the quality filters available + if (config['virus'] === 'sars2') { + const qualityFilterItems = []; + qualityFields.forEach((field) => { + const rng = configStore[qualityFieldKeys[field]]; + let suffix = ' bases'; + if (field === 'percent_ambiguous') { + suffix = '%'; + } + // If first value of the range undefined, then only maximum is set + if (rng[0] === null) { + qualityFilterItems.push( + + {qualityFieldNames[field]} ≤{' '} + + {rng[1]} + {suffix} + + + ); + } + // If second value of the range undefined, then only minimum is set + else if (rng[1] === null) { + qualityFilterItems.push( + + {qualityFieldNames[field]} ≥{' '} + + {rng[0]} + {suffix} + + + ); + } + // If both values of the range are defined, then both minimum and maximum are set + else { + qualityFilterItems.push( + + {qualityFieldNames[field]}:{' '} + + {rng[0]} + {suffix} + {' '} + –{' '} + + {rng[1]} + {suffix} + + + ); + } + }); + qualityFilters = <>{qualityFilterItems}; + } + + const selectedGroupFields = []; + Object.keys(configStore.selectedGroupFields).forEach((groupKey) => { + if (configStore.selectedGroupFields[groupKey].length === 0) { + return; + } + + const selectedGroupFieldItems = []; + configStore.selectedGroupFields[groupKey].forEach((group, i) => { + selectedGroupFieldItems.push( + {group} + ); + if (i < configStore.selectedGroupFields[groupKey].length - 1) { + selectedGroupFieldItems.push(','); + } + }); + + selectedGroupFields.push( + + Selected {groupKey}s: {selectedGroupFieldItems} + + ); + }); + let genomeSelection = ''; const residuesOrBases = configStore.dnaOrAa === DNA_OR_AA.DNA ? 'Bases' : 'Residues'; @@ -92,29 +180,6 @@ const StatusBox = observer(() => { ); } - const selectedGroupFields = []; - Object.keys(configStore.selectedGroupFields).forEach((groupKey) => { - if (configStore.selectedGroupFields[groupKey].length === 0) { - return; - } - - const selectedGroupFieldItems = []; - configStore.selectedGroupFields[groupKey].forEach((group, i) => { - selectedGroupFieldItems.push( - {group} - ); - if (i < configStore.selectedGroupFields[groupKey].length - 1) { - selectedGroupFieldItems.push(','); - } - }); - - selectedGroupFields.push( - - Selected {groupKey}s: {selectedGroupFieldItems} - - ); - }); - let selectedGroups = None; if (configStore.selectedGroups.length > 0) { if (configStore.groupKey === GROUP_MUTATION) { @@ -166,6 +231,7 @@ const StatusBox = observer(() => { Reference genome: {configStore.selectedReference} ( {getReferences()[configStore.selectedReference]['description']}). + {qualityFilters} {configStore.selectedLocationNodes.length} selected locations:{' '} diff --git a/src/constants/initialValues.js b/src/constants/initialValues.js index 6ced0cce2..decfea916 100644 --- a/src/constants/initialValues.js +++ b/src/constants/initialValues.js @@ -66,6 +66,9 @@ if (config['virus'] === 'sars2') { selectedMetadataFields: {}, ageRange: [null, null], + sequenceLengthRange: [29000, null], + percentAmbiguousRange: [null, 5], + // Location tab hoverLocation: null, focusedLocations: [], diff --git a/src/stores/configStore.js b/src/stores/configStore.js index d80e9afb3..34f8fb995 100644 --- a/src/stores/configStore.js +++ b/src/stores/configStore.js @@ -56,15 +56,23 @@ export class ConfigStore { @observable selectedMetadataFields = {}; @observable ageRange = []; + @observable sequenceLengthRange = [null, null]; + @observable percentAmbiguousRange = [null, null]; + @observable hoverLocation = null; @observable focusedLocations = []; constructor() {} init() { + // Set initial values this.initialValues = initialConfigStore; Object.keys(this.initialValues).forEach((key) => { + // Ignore fields that aren't defined in the initial values + if (!Object.prototype.hasOwnProperty.call(this.initialValues, key)) { + return; + } this[key] = this.initialValues[key]; }); } @@ -178,6 +186,15 @@ export class ConfigStore { urlParams.set(field, coordsToText(pending[field])); } else if (field === 'residueCoordinates') { urlParams.set(field, residueCoordsToText(pending[field])); + } else if ( + field === 'sequenceLengthRange' || + field === 'percentAmbiguousRange' + ) { + // Store ranged values, like sequence length and percent ambiguous + urlParams.set( + field, + pending[field].map((x) => (x === null ? '' : x.toString())).join(',') + ); } else { urlParams.set(field, String(pending[field])); } diff --git a/src/stores/dataStore.js b/src/stores/dataStore.js index 8f8f221ca..c1fa2d2dc 100644 --- a/src/stores/dataStore.js +++ b/src/stores/dataStore.js @@ -87,6 +87,10 @@ export class DataStore { end_date: toJS(rootStoreInstance.configStore.endDate), subm_start_date: toJS(rootStoreInstance.configStore.submStartDate), subm_end_date: toJS(rootStoreInstance.configStore.submEndDate), + sequence_length: toJS(rootStoreInstance.configStore.sequenceLengthRange), + percent_ambiguous: toJS( + rootStoreInstance.configStore.percentAmbiguousRange + ), }; fetch(hostname + '/data', { diff --git a/src/stores/urlMonitor.js b/src/stores/urlMonitor.js index 4766ec090..826787ca5 100644 --- a/src/stores/urlMonitor.js +++ b/src/stores/urlMonitor.js @@ -118,6 +118,18 @@ export class URLMonitor { if (primer !== undefined && primer !== null) this.pendingChanges.configStore[key].push(primer); }); + } else if ( + key === 'sequenceLengthRange' || + key === 'percentAmbiguousRange' + ) { + // Parse ranges + value = value + .split(',') + .map((x) => (x === '' ? null : parseFloat(x))); + if (key === 'sequenceLengthRange') { + value = value.map((x) => (x === null ? null : Math.round(x))); + } + this.pendingChanges.configStore[key] = value; } else { this.pendingChanges.configStore[key] = value; } diff --git a/src/utils/version.js b/src/utils/version.js index 70e4870c4..1df3c44bb 100644 --- a/src/utils/version.js +++ b/src/utils/version.js @@ -1 +1 @@ -export const version = '2.7.5-pango'; +export const version = '2.7.6-qual-rc1'; diff --git a/workflow_main/Snakefile b/workflow_main/Snakefile index 9dca62c80..52b41ddca 100644 --- a/workflow_main/Snakefile +++ b/workflow_main/Snakefile @@ -246,11 +246,12 @@ rule preprocess_sequences: shell: """ if [[ "{params.virus}" == "sars2" ]]; then - python3 scripts/preprocess_sars2_sequences.py \ - --input {input.fasta} \ - --nextstrain-exclusion-file {params.nextstrain_exclude} \ - --exclude-list {params.exclude_list} \ - --output {output.fasta} + #python3 scripts/preprocess_sars2_sequences.py \ + # --input {input.fasta} \ + # --nextstrain-exclusion-file {params.nextstrain_exclude} \ + # --exclude-list {params.exclude_list} \ + # --output {output.fasta} + cp {input.fasta} {output.fasta} elif [[ "{params.virus}" == "rsv" ]]; then python3 scripts/preprocess_rsv_sequences.py \ --input {input.fasta} \ diff --git a/workflow_main/scripts/build_full_dataframe.py b/workflow_main/scripts/build_full_dataframe.py index 7e1917fd0..4d5274fbf 100644 --- a/workflow_main/scripts/build_full_dataframe.py +++ b/workflow_main/scripts/build_full_dataframe.py @@ -7,7 +7,6 @@ def main(): - parser = argparse.ArgumentParser() parser.add_argument( diff --git a/workflow_main/scripts/collapse_to_isolate.py b/workflow_main/scripts/collapse_to_isolate.py index 81f4ae957..e1d2686c3 100755 --- a/workflow_main/scripts/collapse_to_isolate.py +++ b/workflow_main/scripts/collapse_to_isolate.py @@ -77,6 +77,12 @@ def main(): for col in args.group_cols + args.metadata_cols: column_aggs[col] = (col, "first") + # Sequence quality data + if "length" in sequence_df.columns: + column_aggs["length"] = ("length", "first") + if "percent_ambiguous" in sequence_df.columns: + column_aggs["percent_ambiguous"] = ("percent_ambiguous", "first") + isolate_df = sequence_df.groupby(["isolate_id", "reference"], as_index=False).agg( **column_aggs ) diff --git a/workflow_main/scripts/combine_all_data.py b/workflow_main/scripts/combine_all_data.py index 044fae963..46015ef21 100644 --- a/workflow_main/scripts/combine_all_data.py +++ b/workflow_main/scripts/combine_all_data.py @@ -22,10 +22,16 @@ def main(): parser.add_argument("--metadata", type=str, required=True, help="Metadata file") parser.add_argument( - "--manifest", type=str, required=True, help="Path to manifest CSV file", + "--manifest", + type=str, + required=True, + help="Path to manifest CSV file", ) parser.add_argument( - "--dna-mutation-dir", type=str, required=True, help="DNA mutation directory", + "--dna-mutation-dir", + type=str, + required=True, + help="DNA mutation directory", ) parser.add_argument( "--gene-aa-mutation-dir", @@ -147,7 +153,10 @@ def main(): ) # Join coverage data to main dataframe - coverage_dna = pd.read_csv(args.dna_coverage, dtype={"segment": str},) + coverage_dna = pd.read_csv( + args.dna_coverage, + dtype={"segment": str}, + ) # coverage_dna["range"] = coverage_dna[["start", "end"]].apply( # lambda x: tuple(x), axis=1 # ) @@ -217,6 +226,20 @@ def main(): .rename(columns={"range": "protein_aa_range"}) ) + # Some sequences won't have feature (gene, protein) coverage + # Fill these with empty lists + missing_gene_coverage_inds = df.loc[df["gene_aa_range"].isna()].index.values + if len(missing_gene_coverage_inds) > 0: + df.loc[missing_gene_coverage_inds, "gene_aa_range"] = df.loc[ + missing_gene_coverage_inds, "gene_aa_range" + ].apply(lambda x: []) + + missing_protein_coverage_inds = df.loc[df["protein_aa_range"].isna()].index.values + if len(missing_protein_coverage_inds) > 0: + df.loc[missing_protein_coverage_inds, "protein_aa_range"] = df.loc[ + missing_protein_coverage_inds, "protein_aa_range" + ].apply(lambda x: []) + # Remove rows with null dna_range # These are failed alignments with no mutation information df.drop(df.index[df["dna_range"].isna()], inplace=True) @@ -231,6 +254,12 @@ def main(): df.loc[:, col] = factor metadata_maps[col] = pd.Series(labels).to_dict() + # Special processing for sequence quality columns + if "length" in df.columns: + df.loc[:, "length"] = df["length"].fillna(0).astype(int) + if "percent_ambiguous" in df.columns: + df.loc[:, "percent_ambiguous"] = df["percent_ambiguous"].fillna(0).astype(float) + # Special processing for locations - leave missing data as -1 for col in ["region", "country", "division", "location"]: missing_inds = df[col] == "-1" diff --git a/workflow_main/scripts/sequence_manifest.py b/workflow_main/scripts/sequence_manifest.py index 0ae40bf3a..50a87268f 100644 --- a/workflow_main/scripts/sequence_manifest.py +++ b/workflow_main/scripts/sequence_manifest.py @@ -48,6 +48,10 @@ def extract_ids(fasta_file): if not line and i < (len(lines) - 1): continue + # If we're on the last line, but the entry is empty, then skip + if not line and i == (len(lines) - 1) and not cur_seq: + continue + # If not the name of an entry, add this line to the current sequence # (some FASTA files will have multiple lines per sequence) if line[0] != ">": @@ -57,7 +61,12 @@ def extract_ids(fasta_file): if line[0] == ">" or i == (len(lines) - 1): # Avoid capturing the first one and pushing an empty sequence if cur_entry: - out.append((cur_entry, file_name,)) + out.append( + ( + cur_entry, + file_name, + ) + ) # Clear the entry and sequence cur_entry = line[1:] @@ -72,8 +81,7 @@ def extract_ids(fasta_file): def main(): - """Get all sequence-reference pairs - """ + """Get all sequence-reference pairs""" parser = argparse.ArgumentParser() @@ -84,7 +92,10 @@ def main(): help="Path to processed FASTA file directory", ) parser.add_argument( - "--reference", type=str, required=True, help="Path to reference JSON file", + "--reference", + type=str, + required=True, + help="Path to reference JSON file", ) parser.add_argument( "--out", type=str, required=True, help="Output manifest CSV file" diff --git a/workflow_sars2_genbank_ingest/Snakefile b/workflow_sars2_genbank_ingest/Snakefile index 1696d8f73..87523e7df 100644 --- a/workflow_sars2_genbank_ingest/Snakefile +++ b/workflow_sars2_genbank_ingest/Snakefile @@ -8,11 +8,6 @@ from pathlib import Path import pandas as pd -# Import scripts -from scripts.chunk_data import chunk_data -from scripts.clean_metadata import clean_metadata -from scripts.combine_lineages import combine_lineages - configfile: "../config/config_sars2_genbank.yaml" # Get today's date in ISO format (YYYY-MM-DD) @@ -24,17 +19,18 @@ data_folder = os.path.join("..", config["data_folder"]) rule all: input: # Download latest data feed, process sequences - os.path.join( + download_status = os.path.join( data_folder, "status", "download_" + today_str + ".done" ), - os.path.join( + copy_status = os.path.join( data_folder, "status", "merge_sequences_" + today_str + ".done" ), # Cleaned metadata, with lineage assignments - os.path.join(data_folder, "metadata.csv") + metadata = os.path.join(data_folder, "metadata.csv") min_date = pd.to_datetime(config.get('min_date', '2019-12-01')) -max_date = pd.to_datetime(datetime.date.today().isoformat()) +# max_date = pd.to_datetime(datetime.date.today().isoformat()) +max_date = pd.to_datetime('2021-03-01') chunks = [d for d in pd.period_range(start=min_date, end=max_date, freq=config.get('dl_chunk_period', 'W'))] DL_CHUNKS = [i for i in range(len(chunks))] @@ -85,16 +81,19 @@ checkpoint chunk_data: input: feed = rules.combine_download_chunks.output.feed output: - fasta = directory(os.path.join(data_folder, "fasta_temp")), metadata_dirty = os.path.join(data_folder, "metadata_dirty.csv") params: + fasta = os.path.join(data_folder, "fasta_temp"), chunk_size = config["chunk_size"] - run: - chunk_data( - input.feed, output.fasta, output.metadata_dirty, - chunk_size=params.chunk_size, - processes=workflow.cores - ) + shell: + """ + python3 scripts/chunk_data.py \ + --data-feed {input.feed} \ + --out-fasta {params.fasta} \ + --out-metadata {output.metadata_dirty} \ + --chunk-size {params.chunk_size} \ + --processes {workflow.cores} + """ def get_num_seqs(fasta_gz): @@ -117,7 +116,7 @@ def get_changed_chunks(wildcards): """ # Only run to trigger DAG re-evaluation - checkpoint_output = checkpoints.chunk_data.get(**wildcards) + #checkpoint_output = checkpoints.chunk_data.get(**wildcards) chunks, = glob_wildcards(os.path.join(data_folder, "fasta_temp", "{i}.fa.gz")) # Keep track of which chunks have changed @@ -145,11 +144,11 @@ def get_changed_chunks(wildcards): changed_chunks.append(chunk) # Return a list of fasta_temp files that have changed, so that they can be copied - # over to fasta_raw by the below `copy_changed_files` rule + # over to fasta_raw by the below `copy_changed_files` rule) return expand(os.path.join(data_folder, "fasta_temp", "{i}.fa.gz"), i=changed_chunks) -checkpoint copy_changed_files: +rule copy_changed_files: """Using the `get_changed_chunks` function, only copy fasta files which have changed from the purgatory `fasta_temp` folder to the `fasta_raw` folder. By copying over the files, it will flag to snakemake that they (and only they - not the others) will need to be @@ -173,23 +172,6 @@ checkpoint copy_changed_files: """ -def get_chunks(wildcards): - """Get all copied chunks from copy_changed_files, as input for the - lineage assignments. While some of these assignments will be wasted on - sequences that will be filtered out downstream by the `preprocess_sequences` - rule in `workflow_main`, I think it's still better to do this during the - ingestion as the lineage/clade assignments are ingestion-specific - """ - - # Only do this to trigger the DAG recalculation - checkpoint_output = checkpoints.copy_changed_files.get(**wildcards) - - return expand( - os.path.join(data_folder, "lineages", "{chunk}.csv"), - chunk=glob_wildcards(os.path.join(data_folder, "fasta_raw", "{i}.fa.gz")).i - ) - - rule pangolin_lineages: """Assign a lineage to each sequence using pangolin """ @@ -199,23 +181,73 @@ rule pangolin_lineages: fasta = temp(os.path.join(data_folder, "lineages", "{chunk}.fa")), lineages = os.path.join(data_folder, "lineages", "{chunk}.csv") conda: "envs/pangolin.yaml" - threads: workflow.cores + params: + cores = int(workflow.cores / 10), + tempdir = os.getenv("PANGOLIN_TEMPDIR", "/var/tmp") + threads: workflow.cores / 10 shell: """ + pangolin --update + pangolin --update-data # Pangolin can only use un-gzipped fasta files gunzip -c {input.fasta} > {output.fasta} - pangolin --outfile {output.lineages} {output.fasta} -t {workflow.cores} # --analysis-mode fast + pangolin --outfile {output.lineages} {output.fasta} \ + --tempdir {params.tempdir} \ + -t {params.cores} # --analysis-mode fast """ rule combine_lineages: """Combine all lineage result chunks """ input: - lineages = get_chunks + lineages = expand( + os.path.join(data_folder, "lineages", "{chunk}.csv"), + chunk=glob_wildcards(os.path.join(data_folder, "fasta_raw", "{i}.fa.gz")).i + ), + status = rules.all.input.copy_status + params: + chunk_glob = os.path.join(data_folder, "lineages", "*.csv") output: lineages = os.path.join(data_folder, "lineages.csv") - run: - combine_lineages(input.lineages, output.lineages) + shell: + """ + echo {input.lineages} + awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.lineages} + """ + + +rule sequence_quality: + """Collect statistics about sequence quality + """ + input: + fasta = os.path.join(data_folder, "fasta_raw", "{chunk}.fa.gz") + output: + quality = os.path.join(data_folder, "quality", "{chunk}.csv") + shell: + """ + python3 scripts/sequence_quality.py \ + --input {input.fasta} \ + --output {output.quality} + """ + +rule combine_quality: + """Combine all quality result chunks + """ + input: + quality = expand( + os.path.join(data_folder, "quality", "{chunk}.csv"), + chunk=glob_wildcards(os.path.join(data_folder, "fasta_raw", "{i}.fa.gz")).i + ), + status = rules.all.input.copy_status + params: + chunk_glob = os.path.join(data_folder, "quality", "*.csv") + output: + quality = os.path.join(data_folder, "quality.csv") + shell: + """ + echo {input.quality} + awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.quality} + """ rule clean_metadata: @@ -223,8 +255,15 @@ rule clean_metadata: """ input: metadata_dirty = rules.chunk_data.output.metadata_dirty, - lineages = rules.combine_lineages.output.lineages + lineages = rules.combine_lineages.output.lineages, + quality = rules.combine_quality.output.quality output: metadata_clean = os.path.join(data_folder, "metadata.csv") - run: - clean_metadata(input.metadata_dirty, input.lineages, output.metadata_clean) + shell: + """ + python3 scripts/clean_metadata.py \ + --metadata-in {input.metadata_dirty} \ + --lineages {input.lineages} \ + --quality {input.quality} \ + --metadata-out {output.metadata_clean} + """ diff --git a/workflow_sars2_genbank_ingest/envs/pangolin.yaml b/workflow_sars2_genbank_ingest/envs/pangolin.yaml index 550f8705b..82044908d 100644 --- a/workflow_sars2_genbank_ingest/envs/pangolin.yaml +++ b/workflow_sars2_genbank_ingest/envs/pangolin.yaml @@ -1,4 +1,4 @@ -# MODIFIED FROM: https://github.com/cov-lineages/pangolin, v4.1.2 +# MODIFIED FROM: https://github.com/cov-lineages/pangolin, v4.3.1 name: pangolin channels: @@ -6,21 +6,17 @@ channels: - bioconda - defaults dependencies: - - biopython=1.74 - - minimap2>=2.16 - - pip=19.3.1 - - python=3.7 - - snakemake-minimal<=6.8.0 - - gofasta - - pysam==0.16.0.1 - - ucsc-fatovcf>=426 - - usher>=0.5.4 + - biopython>=1.74 + #- minimap2>=2.16 + - pip>=19.3.1 + - python>=3.7 + - snakemake-minimal=7.24.0 + #- gofasta + #- ucsc-fatovcf>=426 + #- usher>=0.5.4 - git-lfs - pip: - - pandas==1.0.1 - - git+https://github.com/cov-lineages/pangolin.git@v4.1.2 - - git+https://github.com/cov-lineages/pangoLEARN.git - - git+https://github.com/cov-lineages/scorpio.git - - git+https://github.com/cov-lineages/constellations.git - - git+https://github.com/cov-lineages/pango-designation.git - - git+https://github.com/cov-lineages/pangolin-data.git + - git+https://github.com/cov-lineages/pangolin.git@v4.3.1 + - git+https://github.com/cov-lineages/scorpio.git + - git+https://github.com/cov-lineages/constellations.git + - git+https://github.com/cov-lineages/pangolin-data.git diff --git a/workflow_sars2_genbank_ingest/scripts/chunk_data.py b/workflow_sars2_genbank_ingest/scripts/chunk_data.py index 9169c4217..4a22bb7c9 100644 --- a/workflow_sars2_genbank_ingest/scripts/chunk_data.py +++ b/workflow_sars2_genbank_ingest/scripts/chunk_data.py @@ -5,6 +5,7 @@ Author: Albert Chen - Vector Engineering Team (chena@broadinstitute.org) """ +import argparse import csv import datetime import gzip @@ -27,7 +28,7 @@ def write_sequences_day(fasta_out_path, seqs): fp_out.write(">" + seq[0] + "\n" + seq[1] + "\n") -def chunk_data(data_feed, out_fasta, out_metadata, chunk_size=100000, processes=1): +def main(): """Split up the data feed's individual objects into metadata and fasta files. Chunk the fasta files so that every day we only reprocess the subset of fasta files that have changed. The smaller the chunk size, the more efficient the updates, but the more files on the filesystem. On a 48-core workstation with 128 GB RAM, aligning 200 sequences takes about 10 minutes, and this is more acceptable than having to align 1000 sequences, which takes ~1 hour. We end up with hundreds of files, but the filesystem seems to be handling it well. @@ -48,7 +49,35 @@ def chunk_data(data_feed, out_fasta, out_metadata, chunk_size=100000, processes= ------- None """ - output_path = Path(out_fasta) + + parser = argparse.ArgumentParser() + parser.add_argument( + "--data-feed", type=str, required=True, help="Path to data feed CSV file" + ) + parser.add_argument( + "--out-fasta", type=str, required=True, help="Path to output fasta directory" + ) + parser.add_argument( + "--out-metadata", + type=str, + required=True, + help="Path to output metadata CSV file", + ) + parser.add_argument( + "--chunk-size", + type=int, + default=100000, + help="Number of records to hold in RAM before flushing to disk", + ) + parser.add_argument( + "--processes", + type=int, + default=1, + help="Number of processes to spawn when writing to disk", + ) + args = parser.parse_args() + + output_path = Path(args.out_fasta) # Make the output directory, if it hasn't been made yet # Snakemake won't make the directory itself, since it's a special @@ -69,7 +98,7 @@ def chunk_data(data_feed, out_fasta, out_metadata, chunk_size=100000, processes= metadata_df = [] def flush_chunk(fasta_by_subm_date): - with mp.get_context("spawn").Pool(processes=processes) as pool: + with mp.get_context("spawn").Pool(processes=args.processes) as pool: for date, seqs in fasta_by_subm_date.items(): # Open the output fasta file for this date chunk fasta_out_path = str(output_path / f"1_SARS-CoV-2_{date}.fa.gz") @@ -78,7 +107,7 @@ def flush_chunk(fasta_by_subm_date): pool.close() pool.join() - with open(data_feed, "r", newline="") as fp_in: + with open(args.data_feed, "r", newline="") as fp_in: # Open up the initial fasta file for the first chunk fasta_by_subm_date = defaultdict(list) @@ -86,9 +115,8 @@ def flush_chunk(fasta_by_subm_date): feed_reader = csv.DictReader(fp_in, delimiter=",", quotechar='"') for row in feed_reader: - # Flush results if chunk is full - if chunk_i == chunk_size: + if chunk_i == args.chunk_size: flush_chunk(fasta_by_subm_date) # Reset chunk counter chunk_i = 0 @@ -122,4 +150,8 @@ def flush_chunk(fasta_by_subm_date): # that I didn't want to implement manually (such as wrapping certain strings # in double quotes) metadata_df = pd.DataFrame(metadata_df) - metadata_df.to_csv(out_metadata, index=False) + metadata_df.to_csv(args.out_metadata, index=False) + + +if __name__ == "__main__": + main() diff --git a/workflow_sars2_genbank_ingest/scripts/clean_metadata.py b/workflow_sars2_genbank_ingest/scripts/clean_metadata.py index d00a7568b..f927f8cb6 100644 --- a/workflow_sars2_genbank_ingest/scripts/clean_metadata.py +++ b/workflow_sars2_genbank_ingest/scripts/clean_metadata.py @@ -5,11 +5,12 @@ Author: Albert Chen - Vector Engineering Team (chena@broadinstitute.org) """ +import argparse import datetime import pandas as pd -def clean_metadata(metadata_in, lineages_in, metadata_out): +def main(): """Clean metadata from GenBank Required columns: @@ -24,19 +25,42 @@ def clean_metadata(metadata_in, lineages_in, metadata_out): "country" "division" "location" + """ - Parameters - ---------- - metadata_in: str - lineages_in: str - metadata_out: str + parser = argparse.ArgumentParser() - Returns - ------- - None - """ + parser.add_argument( + "-i", + "--metadata-in", + type=str, + required=True, + help="Path to input metadata CSV file", + ) - df = pd.read_csv(metadata_in) + parser.add_argument( + "--lineages", + type=str, + required=True, + help="Path to lineages CSV file", + ) + parser.add_argument( + "--quality", + type=str, + required=True, + help="Path to quality CSV file", + ) + + parser.add_argument( + "-o", + "--metadata-out", + type=str, + required=True, + help="Path to output metadata CSV file", + ) + + args = parser.parse_args() + + df = pd.read_csv(args.metadata_in) # Fields: # genbank_accession, @@ -139,7 +163,7 @@ def parse_genbank_location(s): df.loc[:, col] = df[col].fillna("Unknown") # Load lineages and join to dataframe - lineages_df = pd.read_csv(lineages_in) + lineages_df = pd.read_csv(args.lineages) lineages_df = lineages_df.rename(columns={"taxon": "Accession ID"}).set_index( "Accession ID" ) @@ -148,8 +172,23 @@ def parse_genbank_location(s): df.loc[:, "lineage"] = df["lineage"].fillna("None") # Isolate ID = same as Accession ID - df["isolate_id"] = df.index.values + df["isolate_id"] = df.index.values # Segment = 1 df["segment"] = 1 - df.to_csv(metadata_out) + # Load quality and join to dataframe + quality_df = pd.read_csv(args.quality, index_col="Accession ID") + df = df.join(quality_df, how="left") + df["length"] = df["length"].fillna(0).astype(int) + # Calculate percent ambiguous, drop the num_ambiguous column + df["num_ambiguous"] = ((df["num_ambiguous"] / df["length"]) * 100).fillna(0) + df.rename(columns={"num_ambiguous": "percent_ambiguous"}, inplace=True) + + # Filter out entries without any sequence + df = df.loc[df["length"] > 0] + + df.to_csv(args.metadata_out) + + +if __name__ == "__main__": + main() diff --git a/workflow_sars2_genbank_ingest/scripts/combine_lineages.py b/workflow_sars2_genbank_ingest/scripts/combine_lineages.py deleted file mode 100644 index d19b720f6..000000000 --- a/workflow_sars2_genbank_ingest/scripts/combine_lineages.py +++ /dev/null @@ -1,37 +0,0 @@ -# coding: utf-8 - -"""Combine all lineage assignments into one file - -Author: Albert Chen - Vector Engineering Team (chena@broadinstitute.org) -""" - -import io -import pandas as pd - -from pathlib import Path - - -def combine_lineages(lineages, lineage_out): - # Dump all mutation chunks into a text buffer - df_io = io.StringIO() - for i, chunk in enumerate(lineages): - - # For some reason, snakemake likes to pass folders in - # just skip these - if not Path(chunk).is_file(): - continue - - with open(chunk, "r") as fp_in: - for j, line in enumerate(fp_in): - # Write the header of the first file - # Or write any line that's not the header - # (to avoid writing the header more than once) - if (i == 0 and j == 0) or j > 0: - df_io.write(line) - - # Read the buffer into a dataframe, then discard the buffer - df_io.seek(0) - df = pd.read_csv(df_io) - df_io.close() - - df.to_csv(lineage_out, index=False) diff --git a/workflow_sars2_genbank_ingest/scripts/sequence_quality.py b/workflow_sars2_genbank_ingest/scripts/sequence_quality.py new file mode 100644 index 000000000..6f6d1b2bf --- /dev/null +++ b/workflow_sars2_genbank_ingest/scripts/sequence_quality.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# coding: utf-8 + +"""Collect statistics on sequence quality + +Author: Albert Chen - Vector Engineering Team (chena@broadinstitute.org) +""" + +import argparse +import gzip +import re + + +def main(): + """Collect statistics on sequence quality""" + + parser = argparse.ArgumentParser() + + parser.add_argument("--input", type=str, required=True, help="Input FASTA file") + parser.add_argument( + "--output", + type=str, + required=True, + help="Output CSV file", + ) + + args = parser.parse_args() + + fp_in = gzip.open(args.input, "rt") + fp_out = open(args.output, "wt") + + fp_out.write("Accession ID,length,num_ambiguous\n") + + num_sequences = 0 + cur_entry = "" + cur_seq = "" + while True: + line = fp_in.readline() + + # Beginning of a new entry, or EOF = end of current entry + if not line or line[0] == ">": + if cur_entry: + num_ambiguous = 0 + for char in cur_seq: + if char == "N": + num_ambiguous += 1 + + fp_out.write(f"{cur_entry},{str(len(cur_seq))},{str(num_ambiguous)}\n") + num_sequences += 1 + + # If it's the end, then break out + if not line: + break + + # Reset sequence and name + cur_seq = "" + # Extract the name (up to the first whitespace) + # [1:] excludes the first '>' + # .split() breaks up the line into chunks separated by whitespace + # [0] gets the first chunk + # cur_entry = line[1:].split()[0] + # Nevermind, the fasta entries sometimes have spaces..... + # Just rstrip to remove the newline, that should work good enough + cur_entry = line[1:].rstrip() + + # Otherwise add sequence to the current entry + elif cur_entry: + cur_seq += re.sub(r"\s+", "", line).strip() + + fp_in.close() + fp_out.close() + + print("Processed {:,} sequences".format(num_sequences), flush=True) + + +if __name__ == "__main__": + main() diff --git a/workflow_sars2_gisaid_ingest/Snakefile b/workflow_sars2_gisaid_ingest/Snakefile index c0a6df5b6..5dd55ee49 100644 --- a/workflow_sars2_gisaid_ingest/Snakefile +++ b/workflow_sars2_gisaid_ingest/Snakefile @@ -70,12 +70,19 @@ rule pangolin_lineages: fasta = temp(os.path.join(data_folder, "lineages", "{chunk}.fa")), lineages = os.path.join(data_folder, "lineages", "{chunk}.csv") conda: "envs/pangolin.yaml" - threads: workflow.cores + params: + cores = int(workflow.cores / 10), + tempdir = os.getenv("PANGOLIN_TEMPDIR", "/var/tmp") + threads: workflow.cores / 10 shell: """ + pangolin --update + pangolin --update-data # Pangolin can only use un-gzipped fasta files gunzip -c {input.fasta} > {output.fasta} - pangolin --outfile {output.lineages} {output.fasta} -t {workflow.cores} # --analysis-mode fast + pangolin --outfile {output.lineages} {output.fasta} \ + --tempdir {params.tempdir} \ + -t {params.cores} # --analysis-mode fast """ @@ -99,6 +106,40 @@ checkpoint combine_lineages: """ +rule sequence_quality: + """Collect statistics about sequence quality + """ + input: + fasta = os.path.join(data_folder, "fasta_raw", "{chunk}.fa.gz") + output: + quality = os.path.join(data_folder, "quality", "{chunk}.csv") + shell: + """ + python3 scripts/sequence_quality.py \ + --input {input.fasta} \ + --output {output.quality} + """ + +checkpoint combine_quality: + """Combine all quality result chunks + """ + input: + quality = expand( + os.path.join(data_folder, "quality", "{chunk}.csv"), + chunk=glob_wildcards(os.path.join(data_folder, "fasta_raw", "{i}.fa.gz")).i + ), + status = rules.all.input.copy_status + params: + chunk_glob = os.path.join(data_folder, "quality", "*.csv") + output: + quality = os.path.join(data_folder, "quality.csv") + shell: + """ + echo {input.quality} + awk '(NR == 1) || (FNR > 1)' {params.chunk_glob} > {output.quality} + """ + + rule clean_metadata: """Clean up metadata from GISAID """ @@ -107,7 +148,8 @@ rule clean_metadata: location_corrections = os.path.join( static_data_folder, "location_corrections.csv" ), - lineages = rules.combine_lineages.output.lineages + lineages = rules.combine_lineages.output.lineages, + quality = rules.combine_quality.output.quality output: metadata_clean = os.path.join(data_folder, "metadata.csv") shell: @@ -116,5 +158,6 @@ rule clean_metadata: --metadata-in {input.metadata_dirty} \ --location-corrections {input.location_corrections} \ --lineages {input.lineages} \ + --quality {input.quality} \ --metadata-out {output.metadata_clean} """ diff --git a/workflow_sars2_gisaid_ingest/envs/pangolin.yaml b/workflow_sars2_gisaid_ingest/envs/pangolin.yaml index a4e76c65a..2e3829abe 100644 --- a/workflow_sars2_gisaid_ingest/envs/pangolin.yaml +++ b/workflow_sars2_gisaid_ingest/envs/pangolin.yaml @@ -9,7 +9,7 @@ dependencies: - biopython=1.74 - minimap2>=2.16 - pip=19.3.1 - - python=3.7 + - python>=3.7 - snakemake-minimal<=7.24.0 - gofasta - ucsc-fatovcf>=426 diff --git a/workflow_sars2_gisaid_ingest/scripts/clean_metadata.py b/workflow_sars2_gisaid_ingest/scripts/clean_metadata.py index 08b0290a3..f2428c5a1 100755 --- a/workflow_sars2_gisaid_ingest/scripts/clean_metadata.py +++ b/workflow_sars2_gisaid_ingest/scripts/clean_metadata.py @@ -22,8 +22,7 @@ def clean_name_metadata(df): def clean_host_metadata(df): - """Clean host metadata - """ + """Clean host metadata""" # print("Cleaning host metadata", end="", flush=True) df["host"] = df["covv_host"].astype(str).str.strip() # In the future - collapse common + taxonomy names? @@ -33,8 +32,7 @@ def clean_host_metadata(df): def clean_gender_metadata(df): - """Clean patient gender metadata - """ + """Clean patient gender metadata""" # print("Cleaning patient gender metadata...", end="", flush=True) @@ -91,9 +89,9 @@ def clean_age_metadata(df): For each age value we want to define an age range that the value corresponds to. This is necessary since the metadata is provided in - various different specificities. + various different specificities. - i.e., exact age (72.343), year (42), or age range (20-30) + i.e., exact age (72.343), year (42), or age range (20-30) Define a range [start, end), for each age This ranges can then be filtered over in a way that includes as much data @@ -277,7 +275,6 @@ def clean_age_metadata(df): def clean_patient_status_metadata(df): - # print("Cleaning patient status metadata...", end="", flush=True) # Strip whitespace @@ -358,8 +355,7 @@ def clean_patient_status_metadata(df): def clean_passage_metadata(df): - """Clean cell passage metadata - """ + """Clean cell passage metadata""" print("Cleaning cell passage metadata...", end="", flush=True) @@ -414,6 +410,24 @@ def clean_passage_metadata(df): "origina", "Human", "direct sequencing", + "e.g. Original", + "Original isolate isolate", + "Original swab", + "Originalo", + "joriginal", + "originale", + "Original sample in syrian hamster_P2", + "Original sample in syrian hamster_P1", + "Originjal", + "New variant", + "Priginal", + "Criblage sauvage", + "Original.", + "Oriignal", + "originall", + "isolate", + "Oiriginal", + "Nasal swab", ] # "Vero": ["Vero cells"], # "Vero P1": [ @@ -512,7 +526,6 @@ def clean_passage_metadata(df): def clean_specimen_metadata(df): - # print("Cleaning specimen metadata...", end="", flush=True) # Basic cleanup @@ -795,8 +808,7 @@ def clean_specimen_metadata(df): def clean_date_metadata(df): - """Clean the collection and submission date metadata - """ + """Clean the collection and submission date metadata""" df.loc[:, "collection_date"] = df["covv_collection_date"].astype(str).str.strip() df.loc[:, "submission_date"] = df["covv_subm_date"].astype(str).str.strip() @@ -821,24 +833,23 @@ def clean_date_metadata(df): def clean_lineage_metadata(df): - df["lineage"] = df["covv_lineage"].astype(str).str.strip() + df["lineage"] = df["covv_lineage"].fillna("Unassigned").astype(str).str.strip() # Filter out "None" lineages - remove_seqs = (df["lineage"] == "None") | (df["lineage"] == "nan") - df = df.loc[~remove_seqs, :] - print("Removed {} sequences without a lineage assignment".format(remove_seqs.sum())) + # remove_seqs = (df["lineage"] == "None") | (df["lineage"] == "nan") + # df = df.loc[~remove_seqs, :] + # print("Removed {} sequences without a lineage assignment".format(remove_seqs.sum())) return df def clean_clade_metadata(df): - df["clade"] = df["covv_clade"].astype(str).str.strip() + df["clade"] = df["covv_clade"].fillna("Unassigned").astype(str).str.strip() return df def clean_seq_tech_metadata(df): - """Clean "Sequencing Technology" values - """ + """Clean "Sequencing Technology" values""" # print("Cleaning sequencing technology metadata...", end="", flush=True) @@ -946,8 +957,7 @@ def clean_seq_tech_metadata(df): def clean_assembly_metadata(df): - """Clean "Assembly Method" column - """ + """Clean "Assembly Method" column""" print("Cleaning assembly method metadata...", end="", flush=True) @@ -1054,7 +1064,6 @@ def clean_submitting_lab_metadata(df): def main(): - parser = argparse.ArgumentParser() parser.add_argument( @@ -1074,7 +1083,16 @@ def main(): ) parser.add_argument( - "--lineages", type=str, required=True, help="Path to lineages CSV file", + "--lineages", + type=str, + required=True, + help="Path to lineages CSV file", + ) + parser.add_argument( + "--quality", + type=str, + required=True, + help="Path to quality CSV file", ) parser.add_argument( @@ -1152,10 +1170,47 @@ def main(): "Accession ID" ) df = df.rename(columns={"lineage": "gisaid_lineage"}).join( - lineages_df[["lineage"]], how="left" + lineages_df[ + [ + "lineage", + "conflict", + "ambiguity_score", + "scorpio_call", + "scorpio_support", + "scorpio_conflict", + "scorpio_notes", + # "version", + # "pangolin_version", + # "scorpio_version", + # "constellation_version", + "is_designated", + "qc_status", + "qc_notes", + "note", + ] + ].rename( + columns={ + "is_designated": "pangolin_is_designated", + "qc_status": "pangolin_qc_status", + "qc_notes": "pangolin_qc_notes", + "note": "pangolin_note", + } + ), + how="left", ) # Fill in missing values with GISAID lineages - df.loc[:, "lineage"] = df["lineage"].combine_first(df['gisaid_lineage']) + df.loc[:, "lineage"] = df["lineage"].combine_first(df["gisaid_lineage"]) + + # Load quality and join to dataframe + quality_df = pd.read_csv(args.quality, index_col="Accession ID") + df = df.join(quality_df, how="left") + df["length"] = df["length"].fillna(0).astype(int) + # Calculate percent ambiguous, drop the num_ambiguous column + df["num_ambiguous"] = ((df["num_ambiguous"] / df["length"]) * 100).fillna(0) + df.rename(columns={"num_ambiguous": "percent_ambiguous"}, inplace=True) + + # Filter out entries without any sequence + df = df.loc[df["length"] > 0] df.to_csv(args.metadata_out) diff --git a/workflow_sars2_gisaid_ingest/scripts/sequence_quality.py b/workflow_sars2_gisaid_ingest/scripts/sequence_quality.py new file mode 100644 index 000000000..6f6d1b2bf --- /dev/null +++ b/workflow_sars2_gisaid_ingest/scripts/sequence_quality.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# coding: utf-8 + +"""Collect statistics on sequence quality + +Author: Albert Chen - Vector Engineering Team (chena@broadinstitute.org) +""" + +import argparse +import gzip +import re + + +def main(): + """Collect statistics on sequence quality""" + + parser = argparse.ArgumentParser() + + parser.add_argument("--input", type=str, required=True, help="Input FASTA file") + parser.add_argument( + "--output", + type=str, + required=True, + help="Output CSV file", + ) + + args = parser.parse_args() + + fp_in = gzip.open(args.input, "rt") + fp_out = open(args.output, "wt") + + fp_out.write("Accession ID,length,num_ambiguous\n") + + num_sequences = 0 + cur_entry = "" + cur_seq = "" + while True: + line = fp_in.readline() + + # Beginning of a new entry, or EOF = end of current entry + if not line or line[0] == ">": + if cur_entry: + num_ambiguous = 0 + for char in cur_seq: + if char == "N": + num_ambiguous += 1 + + fp_out.write(f"{cur_entry},{str(len(cur_seq))},{str(num_ambiguous)}\n") + num_sequences += 1 + + # If it's the end, then break out + if not line: + break + + # Reset sequence and name + cur_seq = "" + # Extract the name (up to the first whitespace) + # [1:] excludes the first '>' + # .split() breaks up the line into chunks separated by whitespace + # [0] gets the first chunk + # cur_entry = line[1:].split()[0] + # Nevermind, the fasta entries sometimes have spaces..... + # Just rstrip to remove the newline, that should work good enough + cur_entry = line[1:].rstrip() + + # Otherwise add sequence to the current entry + elif cur_entry: + cur_seq += re.sub(r"\s+", "", line).strip() + + fp_in.close() + fp_out.close() + + print("Processed {:,} sequences".format(num_sequences), flush=True) + + +if __name__ == "__main__": + main()