diff --git a/examples/configurations/dataset-store/prod_nexus_datasets.yml b/examples/configurations/dataset-store/prod_nexus_datasets.yml new file mode 100644 index 000000000..7b9a49473 --- /dev/null +++ b/examples/configurations/dataset-store/prod_nexus_datasets.yml @@ -0,0 +1,98 @@ +Model: + name: RdfModel + origin: store + source: BlueBrainNexus + context: + iri: "https://bbp.neuroshapes.org" + bucket: "neurosciencegraph/datamodels" +Store: + name: BlueBrainNexus + endpoint: https://bbp.epfl.ch/nexus/v1 + model: + name: RdfModel + searchendpoints: + sparql: + endpoint: "https://bluebrain.github.io/nexus/vocabulary/defaultSparqlIndex" + elastic: + endpoint: "https://bbp.epfl.ch/neurosciencegraph/data/views/aggreg-es/dataset" + mapping: "https://bbp.epfl.ch/neurosciencegraph/data/views/es/dataset" + default_str_keyword_field: "keyword" + vocabulary: + metadata: + iri: "https://bluebrain.github.io/nexus/contexts/metadata.json" + local_iri: "https://bluebrainnexus.io/contexts/metadata.json" + namespace: "https://bluebrain.github.io/nexus/vocabulary/" + deprecated_property: "https://bluebrain.github.io/nexus/vocabulary/deprecated" + project_property: "https://bluebrain.github.io/nexus/vocabulary/project" + max_connection: 50 + versioned_id_template: "{x.id}?rev={x._store_metadata._rev}" + file_resource_mapping: https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/configurations/nexus-store/file-to-resource-mapping.hjson + +Resolvers: + ontology: + - resolver: OntologyResolver + origin: store + source: BlueBrainNexus + targets: + - identifier: terms + bucket: neurosciencegraph/datamodels + searchendpoints: + sparql: + endpoint: "https://bluebrain.github.io/nexus/vocabulary/defaultSparqlIndex" + result_resource_mapping: https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/configurations/nexus-resolver/term-to-resource-mapping.hjson + agent: + - resolver: AgentResolver + origin: store + source: BlueBrainNexus + targets: + - identifier: agents + bucket: bbp/agents + searchendpoints: + sparql: + endpoint: "https://bluebrain.github.io/nexus/vocabulary/defaultSparqlIndex" + result_resource_mapping: https://raw.githubusercontent.com/BlueBrain/nexus-forge/master/examples/configurations/nexus-resolver/agent-to-resource-mapping.hjson + +Formatters: + identifier: https://bbp.epfl.ch/neurosciencegraph/data/{}/{} + identifier_neuroelectro: http://neuroelectro.org/api/1/{}/{} + identifier_neurolex: http://neurolex.org/wiki/{} + +Datasets: + UniProt: + origin: store + source: SPARQLStore + searchendpoints: + sparql: + endpoint: "https://sparql.uniprot.org/sparql" + model: + name: RdfModel + origin: directory + source: /Users/cgonzale/Documents/code/nexus-forge/examples/database-sources/UniProt + context: + iri: /Users/cgonzale/Documents/code/nexus-forge/examples/database-sources/UniProt/jsonld_context.json + NeuroElectro: + origin: store + source: BlueBrainNexus + bucket: bbp/neuroelectro + model: + name: RdfModel + origin: directory + source: /Users/cgonzale/Documents/code/nexus-forge/examples/database-sources/NeuroElectro + context: + iri: /Users/cgonzale/Documents/code/nexus-forge/examples/models/neuroshapes_context.json + NeuroMorpho: + origin: web_service + source: http://neuromorpho.org/api/neuron + health: http://neuromorpho.org/api/health + files_download: + endpoint: https://neuromorpho.org/dableFiles + Accept: "*/*" + max_connection: 50 + content_type: application/json;charset=UTF-8 + accept: "*/*" + model: + name: RdfModel + origin: directory + source: 
/Users/cgonzale/Documents/code/nexus-forge/examples/database-sources/Neuromorpho
+    context:
+      iri: /Users/cgonzale/Documents/code/nexus-forge/examples/database-sources/Neuromorpho/jsonld_context.json
\ No newline at end of file
diff --git a/examples/mappings/NeuroMorpho/mappings/DictionaryMapping/NeuronMorphology.hjson b/examples/mappings/NeuroMorpho/mappings/DictionaryMapping/NeuronMorphology.hjson
index 6ff77840c..2d1918375 100644
--- a/examples/mappings/NeuroMorpho/mappings/DictionaryMapping/NeuronMorphology.hjson
+++ b/examples/mappings/NeuroMorpho/mappings/DictionaryMapping/NeuronMorphology.hjson
@@ -4,7 +4,6 @@
         NeuronMorphology
         Entity
     ]
-    id: forge.format('identifier', 'neuronmorphologies/neuromorpho', f'{x.bbpID}')
     brainLocation: {
         type: BrainLocation
         brainRegion: forge.resolve(x.brain_region[0], scope='ontology', strategy='EXACT_CASE_INSENSITIVE_MATCH')
diff --git a/examples/notebooks/getting-started/17 - Database-sources.ipynb b/examples/notebooks/getting-started/17 - Database-sources.ipynb
new file mode 100644
index 000000000..904376583
--- /dev/null
+++ b/examples/notebooks/getting-started/17 - Database-sources.ipynb
@@ -0,0 +1,1163 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Querying external database sources of interest\n",
+    "\n",
+    "* Enable users to integrate data from external databases of interest within the BBP KG\n",
+    "* While using the Nexus Forge interface and BMO vocabulary as much as possible\n",
+    "* While benefiting from out-of-the-box (meta)data transformations that make results ready for BBP internal pipelines and applications\n",
+    "* Demo with NeuroMorpho, NeuroElectro and UniProt"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {
+    "ExecuteTime": {
+     "end_time": "2019-09-23T18:50:20.068658Z",
+     "start_time": "2019-09-23T18:50:19.054054Z"
+    }
+   },
+   "outputs": [],
+   "source": [
+    "import os\n",
+    "import uuid\n",
+    "import json\n",
+    "\n",
+    "from kgforge.core import KnowledgeGraphForge\n",
+    "from kgforge.specializations.resources import Dataset"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# import getpass\n",
+    "# TOKEN = getpass.getpass()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "endpoint = \"https://staging.nise.bbp.epfl.ch/nexus/v1\"\n",
+    "BUCKET = \"bbp/atlas\"\n",
+    "forge = KnowledgeGraphForge(\"../../configurations/dataset-store/prod_nexus_datasets.yml\",\n",
+    "                            # endpoint=endpoint,\n",
+    "                            bucket=BUCKET, debug=True)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# List of Data sources"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "Available Database sources:\n",
+      "UniProt\n",
+      "NeuroElectro\n",
+      "NeuroMorpho\n"
+     ]
+    }
+   ],
+   "source": [
+    "forge.dataset_sources(pretty=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 5,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "sources = forge.dataset_sources()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 6,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "\n",
+    "data = {\n",
+    "    'origin': 'store',\n",
+    "    'source': 'DemoStore',\n",
+    "    'model': { \n",
+    "        'name': 'DemoModel',\n",
+    "        'origin': 'directory',\n",
+    "        'source': \"../../../tests/data/demo-model/\" \n",
+    "    }\n",
+    "}\n"
+   ]
+  },
+  {
+   "cell_type": "code",
"execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "from kgforge.specializations.databases import StoreDatabase\n", + "ds = StoreDatabase(forge, name=\"DemoDB\", **data)" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "forge.add_dataset_source(ds)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Available Database sources:\n", + "UniProt\n", + "NeuroElectro\n", + "NeuroMorpho\n", + "DemoDB\n" + ] + } + ], + "source": [ + "forge.dataset_sources(pretty=True)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Data source metadata" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "neuroelectro = sources['NeuroElectro']" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Get data mappings (hold transformations logic) per data type\n", + "\n", + "* Data mappings are used to transform results obtained from the external data sources so that they are ready for consumption by BBP tools\n", + "* Perform automatic ontology linking" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Managed mappings for the data source per entity type and mapping type:\n", + " - ElectrophysiologicalFeatureAnnotation:\n", + " * DictionaryMapping\n", + " - ParameterAnnotation:\n", + " * DictionaryMapping\n", + " - ParameterBody:\n", + " * DictionaryMapping\n", + " - ScholarlyArticle:\n", + " * DictionaryMapping\n", + " - SeriesBody:\n", + " * DictionaryMapping\n" + ] + } + ], + "source": [ + "forge.mappings(source=\"NeuroElectro\")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Managed mappings for the data source per entity type and mapping type:\n", + " - Gene:\n", + " * DictionaryMapping\n", + " - Protein:\n", + " * DictionaryMapping\n" + ] + } + ], + "source": [ + "forge.mappings('UniProt')" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "from kgforge.specializations.mappings import DictionaryMapping\n", + "mapping = forge.mapping(entity=\"ScholarlyArticle\", source=\"NeuroElectro\")" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " id: forge.format(\"identifier\", \"scholarlyarticles\", x.id)\n", + " type:\n", + " [\n", + " Entity\n", + " ScholarlyArticle\n", + " ]\n", + " abstract: x.abstract\n", + " author: x.authors_shaped\n", + " datePublished: x.date_issued\n", + " identifier: x.identifiers\n", + " isPartOf:\n", + " {\n", + " type: Periodical\n", + " issn: x.issn\n", + " name: x.journal\n", + " publisher: x.publisher\n", + " }\n", + " name: f\"article_{x.id}\"\n", + " sameAs: x.full_text_link\n", + " title: x.title\n", + " url: x.full_text_link\n", + "}\n" + ] + } + ], + "source": [ + "print(mapping)" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Available Database sources:\n", + "UniProt\n" + ] + } + ], + "source": [ + "forge.dataset_sources(type_='Gene', pretty=True)" + ] + }, + { + 
"cell_type": "markdown", + "metadata": {}, + "source": [ + "# Search and Access data from data source\n", + "\n", + "* Mapping are automatically applied to search results\n", + "* takes a mn for now => working on making it faster " + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Submitted query:\n", + " PREFIX bmc: \n", + " PREFIX bmo: \n", + " PREFIX commonshapes: \n", + " PREFIX datashapes: \n", + " PREFIX dc: \n", + " PREFIX dcat: \n", + " PREFIX dcterms: \n", + " PREFIX mba: \n", + " PREFIX nsg: \n", + " PREFIX nxv: \n", + " PREFIX oa: \n", + " PREFIX obo: \n", + " PREFIX owl: \n", + " PREFIX prov: \n", + " PREFIX rdf: \n", + " PREFIX rdfs: \n", + " PREFIX schema: \n", + " PREFIX sh: \n", + " PREFIX shsh: \n", + " PREFIX skos: \n", + " PREFIX vann: \n", + " PREFIX void: \n", + " PREFIX xml: \n", + " PREFIX xsd: \n", + " PREFIX : \n", + " SELECT ?id ?_constrainedBy ?_createdAt ?_createdBy ?_deprecated ?_incoming ?_outgoing ?_project ?_rev ?_schemaProject ?_self ?_updatedAt ?_updatedBy WHERE { Graph ?g {?id rdf:type schema:ScholarlyArticle;\n", + " ?_constrainedBy;\n", + " ?_createdAt;\n", + " ?_createdBy;\n", + " ?_deprecated;\n", + " ?_incoming;\n", + " ?_outgoing;\n", + " ?_project;\n", + " ?_rev;\n", + " ?_schemaProject;\n", + " ?_self;\n", + " ?_updatedAt;\n", + " ?_updatedBy . \n", + " Filter (?_deprecated = 'false'^^xsd:boolean)\n", + " Filter (?_project = )}} LIMIT 2\n", + "\n" + ] + } + ], + "source": [ + "filters = {\"type\":\"ScholarlyArticle\"}\n", + "#map=True, use_cache=True, # download=True\n", + "resources = forge.search(filters, dataset_source=\"NeuroElectro\", limit=2, debug=True) \n", + "# Add function for checking datsource health => reqsuire health url from db\n" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "2" + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(resources)" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " context: https://bbp.neuroshapes.org\n", + " id: https://bbp.epfl.ch/neurosciencegraph/data/scholarlyarticles/35177\n", + " type:\n", + " [\n", + " Entity\n", + " ScholarlyArticle\n", + " ]\n", + " abstract: On the one hand, neuronal activity can cause changes in pH; on the other hand, changes in pH can modulate neuronal activity. Consequently, the pH of the brain is regulated at various levels. Here we show that steady-state pH and acid extrusion were diminished in cultured hippocampal neurons of mice with a targeted disruption of the Na(+)-driven Cl(-)/HCO(3)(-) exchanger Slc4a8. Because Slc4a8 was found to predominantly localize to presynaptic nerve endings, we hypothesize that Slc4a8 is a key regulator of presynaptic pH. Supporting this hypothesis, spontaneous glutamate release in the CA1 pyramidal layer was reduced but could be rescued by increasing the intracellular pH. The reduced excitability in vitro correlated with an increased seizure threshold in vivo. 
Together with the altered kinetics of stimulated synaptic vesicle release, these data suggest that Slc4a8 modulates glutamate release in a pH-dependent manner.\n", + " author:\n", + " [\n", + " {\n", + " type: Person\n", + " familyName: Sinning\n", + " givenName: Anne\n", + " }\n", + " {\n", + " type: Person\n", + " familyName: Liebmann\n", + " givenName: Lutz\n", + " }\n", + " {\n", + " type: Person\n", + " familyName: Kougioumtzes\n", + " givenName: Alexandra\n", + " }\n", + " {\n", + " type: Person\n", + " familyName: Westermann\n", + " givenName: Martin\n", + " }\n", + " {\n", + " type: Person\n", + " familyName: Bruehl\n", + " givenName: Claus\n", + " }\n", + " {\n", + " type: Person\n", + " familyName: Hübner\n", + " givenName: Christian A\n", + " }\n", + " ]\n", + " datePublished: 2011-5-18\n", + " identifier:\n", + " [\n", + " {\n", + " propertyID: PMID\n", + " value: 21593314\n", + " }\n", + " {\n", + " propertyID: doi\n", + " value: 10.1523/JNEUROSCI.0269-11.2011\n", + " }\n", + " ]\n", + " isPartOf:\n", + " {\n", + " type: Periodical\n", + " issn: 0270-6474\n", + " name: The Journal of neuroscience : the official journal of the Society for Neuroscience\n", + " publisher: Society for Neuroscience\n", + " }\n", + " name: article_35177\n", + " sameAs: http://www.jneurosci.org/content/31/20/7300.long\n", + " title: Synaptic glutamate release is modulated by the Na+ -driven Cl-/HCO₃⁻ exchanger Slc4a8.\n", + " url: http://www.jneurosci.org/content/31/20/7300.long\n", + "}\n" + ] + } + ], + "source": [ + "print(resources[0])" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [], + "source": [ + "uquery = \"\"\"\n", + "PREFIX up: \n", + "SELECT ?protein\n", + "WHERE {\n", + " ?protein a up:Protein ;\n", + " up:reviewed true.\n", + "}\n", + "\"\"\"" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Submitted query:\n", + " \n", + " PREFIX up: \n", + " SELECT ?protein\n", + " WHERE {\n", + " ?protein a up:Protein ;\n", + " up:reviewed true.\n", + " }\n", + " LIMIT 10\n", + "\n" + ] + } + ], + "source": [ + "uresources = forge.sparql(query=uquery, dataset_source='UniProt', limit=10, debug=True)" + ] + }, + { + "cell_type": "code", + "execution_count": 21, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "10" + ] + }, + "execution_count": 21, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "len(uresources)" + ] + }, + { + "cell_type": "code", + "execution_count": 22, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Resource(_last_action=None, _validated=False, _synchronized=False, _store_metadata=None, _inner_sync=False, protein='http://purl.uniprot.org/uniprot/A0B137')" + ] + }, + "execution_count": 22, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "uresources[0]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Use Filters to search" + ] + }, + { + "cell_type": "code", + "execution_count": 23, + "metadata": {}, + "outputs": [], + "source": [ + "from kgforge.core.wrappings.paths import Filter, FilterOperator" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Submitted query:\n", + " PREFIX up: \n", + " PREFIX owl: \n", + " PREFIX owl2xml: \n", + " PREFIX swrlb: \n", + " PREFIX protege: \n", + " PREFIX 
swrl: \n",
+      "   PREFIX xsd: \n",
+      "   PREFIX skos: \n",
+      "   PREFIX rdfs: \n",
+      "   PREFIX dc11: \n",
+      "   PREFIX rdf: \n",
+      "   PREFIX foaf: \n",
+      "   SELECT ?id WHERE {?id rdf:type up:Protein;\n",
+      "    up:reviewed ?v1 . \n",
+      "    FILTER(?v1 = 'true'^^xsd:boolean)\n",
+      "   } LIMIT 10\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "\n",
+    "proteins = forge.search({'type': 'Protein', 'up:reviewed': True}, dataset_source='UniProt', limit=10, debug=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 25,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "Resource(_last_action=None, _validated=False, _synchronized=False, _store_metadata=None, id='http://purl.uniprot.org/uniprot/A0B137', _inner_sync=False)"
+      ]
+     },
+     "execution_count": 25,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "proteins[0]"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Map resources"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 26,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "uniprot = sources['UniProt']"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 27,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "complete_query = \"\"\"\n",
+    "PREFIX up: \n",
+    "SELECT ?id ?gene ?label ?subject ?gene_label\n",
+    "WHERE {\n",
+    "    ?id a up:Protein ;\n",
+    "            up:reviewed true ;\n",
+    "            up:encodedBy ?gene ;\n",
+    "            up:recommendedName / up:fullName ?label ;\n",
+    "            up:organism / up:scientificName ?subject .\n",
+    "    ?gene skos:prefLabel ?gene_label . \n",
+    "}\n",
+    "\"\"\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 28,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "raw_proteins = uniprot.sparql(complete_query)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 29,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "new_resource = uniprot.map(raw_proteins[0], 'Protein')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 30,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[\n",
+      "    {\n",
+      "        \"@context\": \"https://bbp.neuroshapes.org\",\n",
+      "        \"@id\": \"https://bbp.epfl.ch/neurosciencegraph/data/proteins/P0DJN9\",\n",
+      "        \"@type\": [\n",
+      "            \"Entity\",\n",
+      "            \"Protein\"\n",
+      "        ],\n",
+      "        \"encodedBy\": {\n",
+      "            \"@id\": \"http://purl.uniprot.org/uniprot/P0DJN9#gene-MD5A00DD99270221B359AB0AE338E423668\",\n",
+      "            \"label\": \"acsF\"\n",
+      "        },\n",
+      "        \"identifier\": {\n",
+      "            \"propertyID\": \"UniProtKB\",\n",
+      "            \"value\": \"P0DJN9\"\n",
+      "        },\n",
+      "        \"name\": \"Protein P0DJN9 from UniProtKB\",\n",
+      "        \"label\": \"Aerobic magnesium-protoporphyrin IX monomethyl ester [oxidative] cyclase\",\n",
+      "        \"subject\": {\n",
+      "            \"label\": \"Rubrivivax gelatinosus\"\n",
+      "        }\n",
+      "    }\n",
+      "]\n"
+     ]
+    }
+   ],
+   "source": [
+    "\n",
+    "print(json.dumps(forge.as_jsonld(new_resource), indent=4))"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### The same result can be obtained from a dictionary and a DictionaryMapping instance"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 31,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "[\n",
+      "    {\n",
+      "        \"@context\": \"https://bbp.neuroshapes.org\",\n",
+      "        \"@id\": \"https://bbp.epfl.ch/neurosciencegraph/data/proteins/P0DJN9\",\n",
+      "        \"@type\": [\n",
+      "            \"Entity\",\n",
+      "            \"Protein\"\n",
+      "        ],\n",
+      "        \"encodedBy\": {\n",
+      "            \"@id\": 
\"http://purl.uniprot.org/uniprot/P0DJN9#gene-MD5A00DD99270221B359AB0AE338E423668\",\n", + " \"label\": \"acsF\"\n", + " },\n", + " \"identifier\": {\n", + " \"propertyID\": \"UniProtKB\",\n", + " \"value\": \"P0DJN9\"\n", + " },\n", + " \"name\": \"Protein P0DJN9 from UniProtKB\",\n", + " \"label\": \"Aerobic magnesium-protoporphyrin IX monomethyl ester [oxidative] cyclase\",\n", + " \"subject\": {\n", + " \"label\": \"Rubrivivax gelatinosus\"\n", + " }\n", + " }\n", + "]\n" + ] + } + ], + "source": [ + "dict_resource = forge.as_json(raw_proteins[0])\n", + "mapping = DictionaryMapping.load(\"../../database-sources/UniProt/mappings/DictionaryMapping/Protein.hjson\")\n", + "print(json.dumps(forge.as_jsonld(uniprot.map(dict_resource, mapping)), indent=4))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Query the NeuroMorpho WebService" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "metadata": {}, + "outputs": [], + "source": [ + "neuromorpho = sources['NeuroMorpho']" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "metadata": {}, + "outputs": [], + "source": [ + "nmo_filters = {\"species\": \"rat,mouse,human\", \"response_loc\": [\"_embedded\", \"neuronResources\"]}" + ] + }, + { + "cell_type": "code", + "execution_count": 56, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/opt/miniconda3/envs/kgforge/lib/python3.7/site-packages/urllib3/connectionpool.py:1052: InsecureRequestWarning: Unverified HTTPS request is being made to host 'neuromorpho.org'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings\n", + " InsecureRequestWarning,\n" + ] + } + ], + "source": [ + "nmo_resources = forge.search(nmo_filters, dataset_source='NeuroMorpho', size=3, searchendpoint='select_query', q=\"species:mouse\")" + ] + }, + { + "cell_type": "code", + "execution_count": 50, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "Resource(_last_action=None, _validated=False, _synchronized=False, _store_metadata=None, id='http://purl.obolibrary.org/obo/UBERON_0002301', type='Class', label='Neocortical Layer', _inner_sync=False, prefLabel='Neocortical Layer', subClassOf='bmo:BrainLayer')" + ] + }, + "execution_count": 50, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "forge.resolve(nmo_resources[0].brain_region[0], scope='ontology', strategy='BEST_MATCH')" + ] + }, + { + "cell_type": "code", + "execution_count": 51, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['neocortex', 'occipital', 'layer 6']" + ] + }, + "execution_count": 51, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "nmo_resources[0].brain_region" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Format date" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "metadata": {}, + "outputs": [], + "source": [ + "for resource in nmo_resources:\n", + " resource.bbpID = str(uuid.uuid4())\n", + " date_ints = [int(p) for p in resource.deposition_date.split('-')]\n", + " date_str = datetime(*date_ints)\n", + " resource.date_formatted = date_str.strftime(\"%Y-%m-%dT%H:%M:%S\")" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "metadata": {}, + "outputs": [], + "source": [ + 
"new_morphologies = neuromorpho.map(nmo_resources, 'NeuronMorphology')" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "metadata": {}, + "outputs": [], + "source": [ + "format_file = neuromorpho.service.files_download['endpoint'] + \"/{}/CNG version/{}.CNG.swc\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Attach file as distribution with morphologies" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "metadata": {}, + "outputs": [], + "source": [ + "import morphio" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/opt/miniconda3/envs/kgforge/lib/python3.7/site-packages/urllib3/connectionpool.py:1052: InsecureRequestWarning: Unverified HTTPS request is being made to host 'neuromorpho.org'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings\n", + " InsecureRequestWarning,\n", + "/opt/miniconda3/envs/kgforge/lib/python3.7/site-packages/urllib3/connectionpool.py:1052: InsecureRequestWarning: Unverified HTTPS request is being made to host 'neuromorpho.org'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings\n", + " InsecureRequestWarning,\n", + "/opt/miniconda3/envs/kgforge/lib/python3.7/site-packages/urllib3/connectionpool.py:1052: InsecureRequestWarning: Unverified HTTPS request is being made to host 'neuromorpho.org'. Adding certificate verification is strongly advised. See: https://urllib3.readthedocs.io/en/1.26.x/advanced-usage.html#ssl-warnings\n", + " InsecureRequestWarning,\n" + ] + } + ], + "source": [ + "for morphology in new_morphologies:\n", + " url = format_file.format(morphology.archive.lower(), morphology.name)\n", + " base_name = ''.join(url.split('.')[:-2])\n", + " file_path = f\"./downloaded/{base_name.split('/')[-1]}\"\n", + " swc = file_path + '.swc'\n", + " neuromorpho.download(url, swc)\n", + " neuromorpho.attach_file(morphology, swc, content_type='application/swc')\n", + " # Generate other morphology files\n", + " m = morphio.mut.Morphology(swc)\n", + " for extension in ['h5', 'asc']:\n", + " path = f\"{file_path}.{extension}\"\n", + " m.write(path)\n", + " neuromorpho.attach_file(morphology, path, content_type=f'application/{extension}')\n", + " m.write(file_path+'.asc')" + ] + }, + { + "cell_type": "code", + "execution_count": 44, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " _register_one\n", + " False\n", + " RegistrationError: 400 Client Error: Bad Request for url: https://bbp.epfl.ch/nexus/v1/resources/bbp/atlas/datashapes%3Aneuronmorphology\n" + ] + } + ], + "source": [ + "forge.register(new_morphologies[0], schema_id=\"datashapes:neuronmorphology\")" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "metadata": {}, + "outputs": [], + "source": [ + "# forge.validate(new_morphologies[0], type_=\"NeuronMorphology\")" + ] + }, + { + "cell_type": "code", + "execution_count": 45, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{\n", + " id: https://bbp.epfl.ch/neurosciencegraph/data/neuronmorphologies/neuromorpho/0b869f02-85b3-4c49-9cb3-5d6f12bbde28\n", + " type:\n", + " [\n", + " Dataset\n", + " NeuronMorphology\n", + " ]\n", + " archive: Scanziani\n", + " brainLocation:\n", + " {\n", + " type: BrainLocation\n", + " }\n", 
+ " contribution:\n", + " {\n", + " type: Contribution\n", + " agent:\n", + " {\n", + " id: https://ror.org/02jqj7156\n", + " type: Organization\n", + " label: George Mason University\n", + " }\n", + " }\n", + " dateCreated:\n", + " {\n", + " type: xsd:dateTime\n", + " @value: 2012-07-01T00:00:00\n", + " }\n", + " description: Morphology of TypeA-10 obtained from NeuroMorpho API.\n", + " distribution:\n", + " [\n", + " {\n", + " type: DataDownload\n", + " atLocation:\n", + " {\n", + " type: Location\n", + " location: file:///gpfs/bbp.cscs.ch/data/project/proj39/nexus/bbp/atlas/0/4/8/5/1/9/5/c/TypeA-10.swc\n", + " store:\n", + " {\n", + " id: https://bbp.epfl.ch/neurosciencegraph/data/2f120131-0bbc-4a7e-b84b-52f7f00eec9e\n", + " type: RemoteDiskStorage\n", + " _rev: 1\n", + " }\n", + " }\n", + " contentSize:\n", + " {\n", + " unitCode: bytes\n", + " value: 80865\n", + " }\n", + " contentUrl: https://bbp.epfl.ch/nexus/v1/files/bbp/atlas/85d5c8d9-788d-4777-b62f-825b2ff12a51\n", + " digest:\n", + " {\n", + " algorithm: SHA-256\n", + " value: d0e898900e129fe41173eb0fe36bd84bd87a2100acb303e0ee96bf584eccfae0\n", + " }\n", + " encodingFormat: application/swc\n", + " name: TypeA-10.swc\n", + " }\n", + " {\n", + " type: DataDownload\n", + " atLocation:\n", + " {\n", + " type: Location\n", + " location: file:///gpfs/bbp.cscs.ch/data/project/proj39/nexus/bbp/atlas/c/f/7/d/4/6/8/7/TypeA-10.h5\n", + " store:\n", + " {\n", + " id: https://bbp.epfl.ch/neurosciencegraph/data/2f120131-0bbc-4a7e-b84b-52f7f00eec9e\n", + " type: RemoteDiskStorage\n", + " _rev: 1\n", + " }\n", + " }\n", + " contentSize:\n", + " {\n", + " unitCode: bytes\n", + " value: 45952\n", + " }\n", + " contentUrl: https://bbp.epfl.ch/nexus/v1/files/bbp/atlas/03edd654-0c39-4069-9b24-c7e3d68cd3ad\n", + " digest:\n", + " {\n", + " algorithm: SHA-256\n", + " value: 6cd5a3786e5502ce9443616089924ca8d22718a765d8e8202a5619f34b4afc9e\n", + " }\n", + " encodingFormat: application/h5\n", + " name: TypeA-10.h5\n", + " }\n", + " {\n", + " type: DataDownload\n", + " atLocation:\n", + " {\n", + " type: Location\n", + " location: file:///gpfs/bbp.cscs.ch/data/project/proj39/nexus/bbp/atlas/a/0/4/e/6/9/d/9/TypeA-10.asc\n", + " store:\n", + " {\n", + " id: https://bbp.epfl.ch/neurosciencegraph/data/2f120131-0bbc-4a7e-b84b-52f7f00eec9e\n", + " type: RemoteDiskStorage\n", + " _rev: 1\n", + " }\n", + " }\n", + " contentSize:\n", + " {\n", + " unitCode: bytes\n", + " value: 147432\n", + " }\n", + " contentUrl: https://bbp.epfl.ch/nexus/v1/files/bbp/atlas/00a51e52-b9f5-47e6-b61f-c640820d01bf\n", + " digest:\n", + " {\n", + " algorithm: SHA-256\n", + " value: a2469aceb35eb4569f9b2826ff099fb362f25bd5501ba468b269e8d1a1a3d93c\n", + " }\n", + " encodingFormat: application/asc\n", + " name: TypeA-10.asc\n", + " }\n", + " ]\n", + " generation:\n", + " {\n", + " type: Generation\n", + " activity:\n", + " {\n", + " type: nsg:NeuronMorphologyReconstruction\n", + " }\n", + " }\n", + " identifier:\n", + " {\n", + " propertyID: NeuroMorhoID\n", + " value: 10047\n", + " }\n", + " license:\n", + " {\n", + " id: https://neuromorpho.org\n", + " type: License\n", + " }\n", + " name: TypeA-10\n", + " objectOfStudy:\n", + " {\n", + " id: http://bbp.epfl.ch/neurosciencegraph/taxonomies/objectsofstudy/singlecells\n", + " type: ObjectOfStudy\n", + " label: Single Cell\n", + " }\n", + " stain: biocytin\n", + " subject:\n", + " {\n", + " type: Subject\n", + " species:\n", + " {\n", + " id: http://purl.obolibrary.org/obo/NCBITaxon_10088\n", + " type: Class\n", + " label: Mouse\n", + " 
subClassOf: obo:NCBITaxon_9989\n", + " }\n", + " }\n", + " version: 8.4.7\n", + "}\n" + ] + } + ], + "source": [ + "print(new_morphologies[0])" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3.7.13 ('kgforge')", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.7.13" + }, + "vscode": { + "interpreter": { + "hash": "9ac393a5ddd595f2c78ea58b15bf8d269850a4413729cbea5c5fae9013762763" + } + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/kgforge/core/archetypes/dataset_store.py b/kgforge/core/archetypes/dataset_store.py index bf5fed035..49f3cc2e5 100644 --- a/kgforge/core/archetypes/dataset_store.py +++ b/kgforge/core/archetypes/dataset_store.py @@ -15,6 +15,7 @@ from abc import abstractmethod from typing import Optional, Union, List, Type, Dict +from kgforge.core.archetypes.model import Model from kgforge.core.resource import Resource from kgforge.core.archetypes.read_only_store import ReadOnlyStore from kgforge.core.archetypes.resolver import Resolver @@ -26,7 +27,10 @@ class DatasetStore(ReadOnlyStore): - """A class to link to external databases, query and search directly on datasets. """ + """A class to link to external databases, query and search directly on datasets.""" + + def __init__(self, model: Optional[Model] = None) -> None: + super().__init__(model) @property @abstractmethod @@ -34,10 +38,12 @@ def mapper(self) -> Type[Mapper]: """Mapper class to map a Resource to the store metadata format.""" ... - def map(self, resources: Union[List[Union[Resource, str]], Union[Resource, str]], - type_: Optional[Union[str, Mapping]] = None, - ) -> Optional[Union[Resource, List[Resource]]]: - """ Use mappings to transform resources from and to the store model + def map( + self, + resources: Union[List[Union[Resource, str]], Union[Resource, str]], + type_: Optional[Union[str, Mapping]] = None, + ) -> Optional[Union[Resource, List[Resource]]]: + """Use mappings to transform resources from and to the store model :param resources: data to be transformed :param type_: type (schema) of the data @@ -45,13 +51,17 @@ def map(self, resources: Union[List[Union[Resource, str]], Union[Resource, str]] mappings = self.model.mappings(self.model.source, False) mapper = self.mapper() mapped_resources = [] - resources = (resources if isinstance(resources, list) else [resources]) + resources = resources if isinstance(resources, list) else [resources] for resource in resources: if isinstance(resource, Resource): - resource_dict = as_json(resource, expanded=False, store_metadata=False, - model_context=self.model_context(), - metadata_context=None, - context_resolver=self.model.resolve_context) + resource_dict = as_json( + resource, + expanded=False, + store_metadata=False, + model_context=self.model_context(), + metadata_context=None, + context_resolver=self.model.resolve_context, + ) else: resource_dict = resource resource = from_json(resource_dict, None) @@ -64,7 +74,9 @@ def map(self, resources: Union[List[Union[Resource, str]], Union[Resource, str]] mapped_resources.append(mapper.map(resource_dict, type_)) elif type_ in mappings: # type_ is the entity here - mapping_class: Type[Mapping] = import_class(mappings[type_][0], "mappings") + mapping_class: Type[Mapping] = import_class( + mappings[type_][0], "mappings" + ) mapping = 
self.model.mapping(type_, self.model.source, mapping_class) mapped_resources.append(mapper.map(resource_dict, mapping)) else: @@ -84,10 +96,9 @@ def search( """ Search within the database. """ - filters = list(filters) unmapped_resources = self._search(filters, resolvers, **params) - if not params.pop('map', True): + if not params.pop("map", True): return unmapped_resources # Try to find the type of the resources within the filters resource_type = type_from_filters(filters) @@ -95,21 +106,26 @@ def search( @abstractmethod def _search( - self, filters: List[Union[Dict, Filter]], resolvers: Optional[List[Resolver]] = None, - **params - ) -> Optional[List[Resource]]: - ... + self, + filters: List[Union[Dict, Filter]], + resolvers: Optional[List[Resolver]] = None, + **params + ) -> Optional[List[Resource]]: ... def sparql( - self, query: str, debug: bool = False, limit: Optional[int] = None, - offset: Optional[int] = None, **params + self, + query: str, + debug: bool = False, + limit: Optional[int] = None, + offset: Optional[int] = None, + **params ) -> Optional[Union[List[Resource], Resource]]: """ Use SPARQL within the database. """ unmapped_resources = super().sparql(query, debug, limit, offset, **params) - if not params.pop('map', True): + if not params.pop("map", True): return unmapped_resources return self.map(unmapped_resources) @@ -122,10 +138,10 @@ def type_from_filters(filters: List[Union[Filter, Dict]]) -> Optional[str]: for f in filters: if isinstance(f, dict): - if 'type' in f: - return f['type'] + if "type" in f: + return f["type"] elif isinstance(f, Filter): - if 'type' in f.path and f.operator == "__eq__": + if "type" in f.path and f.operator == "__eq__": return f.value else: raise ValueError("Invalid filter type: Can only be a Dict or Filter") diff --git a/kgforge/core/archetypes/read_only_store.py b/kgforge/core/archetypes/read_only_store.py index 3f0560a10..f18b4320a 100644 --- a/kgforge/core/archetypes/read_only_store.py +++ b/kgforge/core/archetypes/read_only_store.py @@ -63,7 +63,7 @@ def _context_to_dict(context: Context): def get_context_prefix_vocab(self) -> Tuple[Optional[Dict], Optional[Dict], Optional[str]]: return ( - ReadOnlyStore._context_to_dict(self.model_context().document), + ReadOnlyStore._context_to_dict(self.model_context()), self.model_context().prefixes, self.model_context().vocab ) @@ -259,7 +259,6 @@ def elastic( def _initialize_service( self, endpoint: Optional[str], - bucket: Optional[str], token: Optional[str], searchendpoints: Optional[Dict] = None, **store_config, diff --git a/kgforge/core/archetypes/store.py b/kgforge/core/archetypes/store.py index e656e8689..9a7cb5c2e 100644 --- a/kgforge/core/archetypes/store.py +++ b/kgforge/core/archetypes/store.py @@ -14,13 +14,14 @@ import json from abc import abstractmethod from pathlib import Path -from typing import Any, Dict, List, Optional, Union, Type, Match +from typing import Any, Dict, List, Optional, Union, Type from kgforge.core.archetypes.read_only_store import ReadOnlyStore, DEFAULT_LIMIT, DEFAULT_OFFSET from kgforge.core.archetypes.model import Model from kgforge.core.commons import Context from kgforge.core.resource import Resource from kgforge.core.archetypes.mapping import Mapping +from kgforge.core.archetypes.resolver import Resolver from kgforge.core.archetypes.mapper import Mapper from kgforge.core.commons.attributes import repr_class from kgforge.core.commons.es_query_builder import ESQueryBuilder @@ -66,7 +67,7 @@ def __init__( if file_resource_mapping else None self.service: Any = 
self._initialize_service( - self.endpoint, self.bucket, self.token, searchendpoints, **store_config + self.endpoint, self.token, searchendpoints, **store_config ) def __repr__(self) -> str: @@ -247,6 +248,8 @@ def _deprecate_one(self, resource: Resource) -> None: # POLICY Resource _store_metadata should be set using wrappers.dict.wrap_dict(). ... + # Querying + def elastic( self, query: str, debug: bool, limit: int = DEFAULT_LIMIT, offset: int = DEFAULT_OFFSET, **params diff --git a/kgforge/core/commons/execution.py b/kgforge/core/commons/execution.py index e7e7a9d4a..addd07eda 100644 --- a/kgforge/core/commons/execution.py +++ b/kgforge/core/commons/execution.py @@ -15,12 +15,16 @@ import inspect import traceback from functools import wraps -from typing import Any, Callable, List, Optional, Tuple, Union, Type +from typing import Any, Callable, List, Optional, Tuple, Union, Type, Dict import requests from kgforge.core.resource import Resource -from kgforge.core.commons.actions import (Action, Actions, collect_lazy_actions, - execute_lazy_actions) +from kgforge.core.commons.actions import ( + Action, + Actions, + collect_lazy_actions, + execute_lazy_actions, +) from kgforge.core.commons.exceptions import NotSupportedError, RunException @@ -58,14 +62,20 @@ def wrapper(*args, **kwargs): if type(self).__name__ == class_name: forge = self else: - forge = next(x for x in self.__dict__.values() if type(x).__name__ == class_name) + forge = next( + x for x in self.__dict__.values() if type(x).__name__ == class_name + ) debug = forge._debug try: return fun(*args, **kwargs) except Exception as e: stack = traceback.extract_stack() - it = (x for x in stack if x.name == "wrapper" and x.filename == stack[-1].filename) + it = ( + x + for x in stack + if x.name == "wrapper" and x.filename == stack[-1].filename + ) next(it) try: next(it) @@ -77,8 +87,7 @@ def wrapper(*args, **kwargs): if called_once and not debug: tb = e.__traceback__ fs = traceback.extract_tb(tb)[-1] - print(f" {fs.name}" - f"\n {type(e).__name__}: {e}\n") + print(f" {fs.name}" f"\n {type(e).__name__}: {e}\n") return None raise @@ -87,8 +96,13 @@ def wrapper(*args, **kwargs): # @functools.singledispatchmethod is introduced in Python 3.8. -def dispatch(data: Union[Resource, List[Resource]], fun_many: Callable, - fun_one: Callable, *args, **params) -> Any: +def dispatch( + data: Union[Resource, List[Resource]], + fun_many: Callable, + fun_one: Callable, + *args, + **params, +) -> Any: # POLICY The method calling this function should be decorated with execution.catch(). 
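    # Dispatch policy: a list is passed to fun_many only when every element is
    # a Resource; a single Resource is handled by fun_one.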
if isinstance(data, List) and all(isinstance(x, Resource) for x in data): return fun_many(data, *args, **params) @@ -99,9 +113,10 @@ def dispatch(data: Union[Resource, List[Resource]], fun_many: Callable, def catch_http_error( - r: requests.Response, error_to_throw: Type[BaseException], - error_message_formatter: Callable, - to_catch: Type[BaseException] + r: requests.Response, + error_to_throw: Type[BaseException], + error_message_formatter: Callable, + to_catch: Type[BaseException], ): try: r.raise_for_status() @@ -110,30 +125,48 @@ def catch_http_error( def run( - fun_one: Callable, - fun_many: Optional[Callable], - data: Union[Resource, List[Resource]], - exception: Type[RunException], - id_required: bool = False, - required_synchronized: Optional[bool] = None, - execute_actions: bool = False, - monitored_status: Optional[str] = None, - catch_exceptions: bool = True, - **kwargs + fun_one: Callable, + fun_many: Optional[Callable], + data: Union[Resource, List[Resource]], + exception: Type[RunException], + id_required: bool = False, + required_synchronized: Optional[bool] = None, + execute_actions: bool = False, + monitored_status: Optional[str] = None, + catch_exceptions: bool = True, + **kwargs, ) -> None: # POLICY Should be called for operations on resources where recovering from errors is needed. if isinstance(data, List) and all(isinstance(x, Resource) for x in data): if fun_many is None: - _run_many(fun_one, data, exception, id_required, required_synchronized, - execute_actions, monitored_status, catch_exceptions, **kwargs) + _run_many( + fun_one, + data, + exception, + id_required, + required_synchronized, + execute_actions, + monitored_status, + catch_exceptions, + **kwargs, + ) else: fun_many(data, **kwargs) actions = Actions.from_resources(data) print(actions) elif isinstance(data, Resource): - _run_one(fun_one, data, exception, id_required, required_synchronized, execute_actions, - monitored_status, catch_exceptions, **kwargs) + _run_one( + fun_one, + data, + exception, + id_required, + required_synchronized, + execute_actions, + monitored_status, + catch_exceptions, + **kwargs, + ) action = data._last_action print(action) else: @@ -146,15 +179,15 @@ def _run_many(fun: Callable, resources: List[Resource], *args, **kwargs) -> None def _run_one( - fun: Callable, - resource: Resource, - exception: Type[RunException], - id_required: bool, - required_synchronized: Optional[bool], - execute_actions: bool, - monitored_status: Optional[str], - catch_exceptions: bool, - **kwargs + fun: Callable, + resource: Resource, + exception: Type[RunException], + id_required: bool, + required_synchronized: Optional[bool], + execute_actions: bool, + monitored_status: Optional[str], + catch_exceptions: bool, + **kwargs, ) -> None: try: @@ -162,7 +195,10 @@ def _run_one( raise exception("resource should have an id") synchronized = resource._synchronized - if required_synchronized is not None and synchronized is not required_synchronized: + if ( + required_synchronized is not None + and synchronized is not required_synchronized + ): be_or_not_be = "be" if required_synchronized is True else "not be" raise exception(f"resource should {be_or_not_be} synchronized") @@ -170,7 +206,9 @@ def _run_one( if execute_actions: execute_lazy_actions(resource, lazy_actions) elif lazy_actions: - raise exception("resource has lazy actions which need to be executed before") + raise exception( + "resource has lazy actions which need to be executed before" + ) result = fun(resource, **kwargs) except Exception as e: diff 
--git a/kgforge/core/commons/parser.py b/kgforge/core/commons/parser.py index 0ab99d41c..abd69e5ae 100644 --- a/kgforge/core/commons/parser.py +++ b/kgforge/core/commons/parser.py @@ -14,6 +14,7 @@ from typing import Any import datetime +import json def _parse_type(value: Any, parse_str: bool = False): @@ -42,3 +43,19 @@ def _parse_type(value: Any, parse_str: bool = False): return _type, value except Exception: return _type, value + + +def _process_types(values): + """Assign correct data type to values from a request response""" + if values['type'] == 'literal' and 'datatype' in values and values['datatype'] == \ + 'http://www.w3.org/2001/XMLSchema#boolean': + + return json.loads(str(values["value"]).lower()) + + elif values['type'] == 'literal' and 'datatype' in values and values['datatype'] == \ + 'http://www.w3.org/2001/XMLSchema#integer': + + return int(values["value"]) + + else: + return values["value"] diff --git a/kgforge/core/commons/sparql_query_builder.py b/kgforge/core/commons/sparql_query_builder.py index 3e52705af..f195a0861 100644 --- a/kgforge/core/commons/sparql_query_builder.py +++ b/kgforge/core/commons/sparql_query_builder.py @@ -28,7 +28,7 @@ from kgforge.core.archetypes.resolver import Resolver from kgforge.core.commons.context import Context from kgforge.core.commons.files import is_valid_url -from kgforge.core.commons.parser import _parse_type +from kgforge.core.commons.parser import _parse_type, _process_types from kgforge.core.commons.query_builder import QueryBuilder from kgforge.core.wrappings.paths import Filter @@ -169,6 +169,7 @@ def build( value = format_type[value_type]( parsed_value if parsed_value else f.value ) + if value_type is CategoryDataType.LITERAL and f.operator not in ["__eq__", "__ne__"]: raise NotImplementedError( "supported operators are '==' and '!=' when filtering with a str." diff --git a/kgforge/core/forge.py b/kgforge/core/forge.py index 22fbbdb5e..a7096bb15 100644 --- a/kgforge/core/forge.py +++ b/kgforge/core/forge.py @@ -24,6 +24,7 @@ from kgforge.core.archetypes.mapping import Mapping from kgforge.core.archetypes.model import Model from kgforge.core.archetypes.resolver import Resolver +from kgforge.core.archetypes.dataset_store import DatasetStore from kgforge.core.archetypes.mapper import Mapper from kgforge.core.archetypes.store import Store from kgforge.core.commons.files import load_yaml_from_file @@ -185,6 +186,13 @@ def __init__(self, configuration: Union[str, Dict], **kwargs) -> None: "": , ..., }, + "Datasets":{ + "": { + "origin" : , + "source": + ..., + } + } } In the configuration, Class name could be provided in three formats: @@ -259,6 +267,12 @@ def __init__(self, configuration: Union[str, Dict], **kwargs) -> None: # Formatters. 
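        # Formatter strings from the configuration are applied with str.format
        # through forge.format(), e.g. forge.format('identifier', 'proteins', 'P0DJN9').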
self._formatters: Optional[Dict[str, str]] = config.pop("Formatters", None)
+
+        # Datasets
+        dataset_config = config.pop("Datasets", None)
+        self._dataset_sources: Optional[Dict[str, DatasetStore]] = self.create_datasets(
+            dataset_config, store_config
+        )
+
     @staticmethod
     def set_environment_variables():
         # Set environment variable for pyshacl
@@ -562,7 +576,11 @@ def mappings(
         :param pretty: a boolean
         :return: Optional[Dict[str, List[str]]]
         """
-        return self._model.mappings(source, pretty)
+        if self._dataset_sources and source in self._dataset_sources:
+            ds = self._dataset_sources[source]
+            return ds.model.mappings(ds.model.source, pretty)
+        return self._model.mappings(source, pretty)
 
     @catch
     def mapping(self, entity: str, source: str, type: Type[Mapping] = None) -> Mapping:
@@ -576,7 +594,11 @@ def mapping(self, entity: str, source: str, type: Type[Mapping] = None) -> Mapping:
         """
         if type is None:
             type = self._store.mapping
-        return self._model.mapping(entity, source, type)
+        if self._dataset_sources and source in self._dataset_sources:
+            ds = self._dataset_sources[source]
+            return ds.model.mapping(entity, ds.model.source, type)
+        return self._model.mapping(entity, source, type)
 
     @catch
     def map(
@@ -629,7 +651,7 @@ def retrieve(
         id: Union[str, List[str]],
         version: Optional[Union[int, str, List[Union[str, int]]]] = None,
         cross_bucket: bool = False,
-        **params
+        **params,
     ) -> Union[Optional[Resource], List[Optional[Resource]]]:
         """
         Retrieve a resource by its identifier from the configured store and possibly at a given version.
@@ -669,7 +691,16 @@ def search(self, *filters: Union[Dict, Filter], **params) -> List[Resource]:
         resolvers = (
             list(self._resolvers.values()) if self._resolvers is not None else None
         )
-        return self._store.search(resolvers=resolvers, *filters, **params)
+
+        dataset = params.pop("source", None)
+        if dataset:
+            if self._dataset_sources and dataset in self._dataset_sources:
+                return self._dataset_sources[dataset].search(
+                    resolvers, *filters, **params
+                )
+            raise AttributeError("Selected database was not declared within forge.")
+        return self._store.search(resolvers, *filters, **params)
 
     @catch
     def sparql(
@@ -690,6 +721,14 @@ def sparql(
         :param params: a dictionary of parameters. Supported params are: rewrite (whether to rewrite the sparql query or run it as is)
         :return: List[Resource]
         """
+        dataset = params.pop("source", None)
+        if dataset:
+            if self._dataset_sources and dataset in self._dataset_sources:
+                return self._dataset_sources[dataset].sparql(
+                    query, debug, limit, offset, **params
+                )
+            raise AttributeError("Selected database was not declared within forge.")
         return self._store.sparql(query, debug, limit, offset, **params)
 
     @catch
@@ -699,6 +738,7 @@ def elastic(
         debug: bool = False,
         limit: Optional[int] = None,
         offset: Optional[int] = None,
+        source: Optional[str] = None,
         **params,
     ) -> Union[List[Resource], Resource, List[Dict], Dict]:
         """
         :param offset: how many results to skip from the first one
+        :param source: the name of a configured dataset source to query instead of the store
         :return: List[Resource]
         """
+        if source:
+            if self._dataset_sources and source in self._dataset_sources:
+                return self._dataset_sources[source].elastic(
+                    query, debug, limit, offset, **params
+                )
+            raise AttributeError("Selected database was not declared within forge.")
         return self._store.elastic(query, debug, limit, offset, **params)
 
     @catch
@@ -721,6 +768,7 @@ def download(
         overwrite: bool = False,
         cross_bucket: bool = False,
         content_type: str = None,
+        source: Optional[str] = None,
     ) -> None:
         """
         Download files attached to a resource or a list of resources. 
@@ -733,6 +781,13 @@ def download(
         :param content_type: the content_type of the files to download
+        :param source: the name of a configured dataset source to download from
         """
         # path: DirPath.
+        if source:
+            if self._dataset_sources and source in self._dataset_sources:
+                return self._dataset_sources[source].download(
+                    data, follow, path, overwrite, cross_bucket, content_type
+                )
+            raise AttributeError("Selected database was not declared within forge.")
         self._store.download(data, follow, path, overwrite, cross_bucket, content_type)
 
     # Storing User Interface.
@@ -978,6 +1033,75 @@ def get_model_context(self):
         """Expose the context used in the model."""
         return self._model.context()
 
+    @catch
+    def dataset_sources(
+        self, type_: Optional[Union[str, List[str]]] = None, pretty: bool = False
+    ) -> Optional[Dict[str, DatasetStore]]:
+        """
+        Print (pretty=True) or return (pretty=False) the configured dataset sources,
+        optionally filtered by the entity type(s) they manage mappings for.
+
+        :param type_: an entity type or a list of entity types to filter on
+        :param pretty: a boolean
+        :return: Optional[Dict[str, DatasetStore]]
+        """
+        if type_ is None:
+            sources = self._dataset_sources
+        else:
+            sources = {}
+            types = type_ if isinstance(type_, list) else [type_]
+            for type in types:
+                for ds, dstypes in self._dataset_sources.items():
+                    try:
+                        if type in dstypes.types():
+                            sources[ds] = dstypes
+                    except ValueError:
+                        # skipping sources without mappings
+                        continue
+            if not sources:
+                print("No Database sources were found for the given type(s)")
+        if pretty:
+            print(*["Available Database sources:", *sources], sep="\n")
+        else:
+            return sources
+
+    def add_dataset_source(self, dataset: DatasetStore) -> None:
+        """
+        Add a dataset source (a DatasetStore instance) to the forge session.
+        """
+        self._dataset_sources[dataset.name] = dataset
+
+    def create_datasets(
+        self,
+        all_config: Optional[Dict[str, Dict[str, str]]],
+        store_config: Optional[Dict[str, Dict[str, str]]],
+    ) -> Dict[str, DatasetStore]:
+        ds = {}
+        if not all_config:
+            return ds
+        for name, config in all_config.items():
+            origin = config["origin"]
+            source = config["source"]
+            # Reuse the complete configuration of the store when Nexus is the source
+            if source == store_config["name"] == "BlueBrainNexus":
+                store_copy = deepcopy(store_config)
+                with_defaults(
+                    config, store_copy, "source", "name", list(store_copy.keys())
+                )
+            try:
+                # 'store' origins name a Store class (e.g. SPARQLStore, BlueBrainNexus);
+                # 'web_service' origins are handled by the new WebServiceStore
+                class_name = "WebServiceStore" if origin == "web_service" else source
+                dataset = import_class(class_name, "stores")
+                ds[name] = dataset(**config)
+            except Exception:
+                raise NotImplementedError(
+                    f"Dataset from {origin} is not yet implemented."
+ ) + return ds + def prepare_resolvers( config: Dict, store_config: Dict diff --git a/kgforge/specializations/stores/__init__.py b/kgforge/specializations/stores/__init__.py index ec3d912fa..0cdf37557 100644 --- a/kgforge/specializations/stores/__init__.py +++ b/kgforge/specializations/stores/__init__.py @@ -14,3 +14,5 @@ from .bluebrain_nexus import BlueBrainNexus from .demo_store import DemoStore +from .sparql_store import SPARQLStore +from .web_service_store import WebServiceStore diff --git a/kgforge/specializations/stores/bluebrain_nexus.py b/kgforge/specializations/stores/bluebrain_nexus.py index 8ece889a8..cf1fe60dc 100644 --- a/kgforge/specializations/stores/bluebrain_nexus.py +++ b/kgforge/specializations/stores/bluebrain_nexus.py @@ -164,7 +164,9 @@ def register_callback(task: Task): def _register_one(self, resource: Resource, schema_id: str) -> None: method, url, resource, exception_, headers, params, payload = ( - prepare_methods.prepare_create(service=self.service, resource=resource, schema_id=schema_id) + prepare_methods.prepare_create( + service=self.service, resource=resource, schema_id=schema_id + ) ) response = requests.request( method=method, @@ -344,7 +346,6 @@ async def _retrieve_id( async def _merge_metadata_with_source_data( self, session, _self, data_not_source_with_metadata, query_params ): - async with session.request( method=hdrs.METH_GET, url=f"{_self}/source", @@ -760,7 +761,9 @@ def _update_many(self, resources: List[Resource], schema_id: str) -> None: def _update_one(self, resource: Resource, schema_id: str) -> None: method, url, resource, exception_, headers, params, payload = ( - prepare_methods.prepare_update(service=self.service, resource=resource, schema_id=schema_id) + prepare_methods.prepare_update( + service=self.service, resource=resource, schema_id=schema_id + ) ) response = requests.request( @@ -1311,7 +1314,9 @@ def _initialize_service( ) @staticmethod - def rewrite_uri_static(endpoint: str, bucket: str, uri: str, context: Context, **kwargs) -> str: + def rewrite_uri_static( + endpoint: str, bucket: str, uri: str, context: Context, **kwargs + ) -> str: is_file = kwargs.get("is_file", True) encoding = kwargs.get("encoding", None) @@ -1362,7 +1367,9 @@ def rewrite_uri_static(endpoint: str, bucket: str, uri: str, context: Context, * return uri def rewrite_uri(self, uri: str, context: Context, **kwargs) -> str: - return BlueBrainNexus.rewrite_uri_static(self.endpoint, self.bucket, uri, context, **kwargs) + return BlueBrainNexus.rewrite_uri_static( + self.endpoint, self.bucket, uri, context, **kwargs + ) def _freeze_many(self, resources: List[Resource]) -> None: raise not_supported() diff --git a/kgforge/specializations/stores/demo_store.py b/kgforge/specializations/stores/demo_store.py index 74d562c22..188f2fad7 100644 --- a/kgforge/specializations/stores/demo_store.py +++ b/kgforge/specializations/stores/demo_store.py @@ -169,8 +169,8 @@ def _elastic( # Utils. 
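    # Note: `bucket` is dropped from the signature below to match the updated
    # ReadOnlyStore._initialize_service(endpoint, token, searchendpoints).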
def _initialize_service( - self, endpoint: Optional[str], bucket: Optional[str], - token: Optional[str], searchendpoints: Optional[Dict] = None, **store_config, + self, endpoint: Optional[str], token: Optional[str], + searchendpoints: Optional[Dict] = None, **store_config, ): return StoreLibrary() diff --git a/kgforge/specializations/stores/nexus/service.py b/kgforge/specializations/stores/nexus/service.py index 311d0f848..2dba8521a 100644 --- a/kgforge/specializations/stores/nexus/service.py +++ b/kgforge/specializations/stores/nexus/service.py @@ -47,6 +47,7 @@ from kgforge.core.wrappings.dict import wrap_dict from kgforge.specializations.stores.nexus.http_helpers import views_fetch + from kgforge.core.conversions.rdf import _from_jsonld_one, _remove_ld_keys, recursive_resolve from kgforge.core.wrappings.dict import wrap_dict @@ -71,24 +72,24 @@ class Service: NEXUS_CONTENT_LENGTH_HEADER = "x-nxs-file-content-length" def __init__( - self, - endpoint: str, - org: str, - prj: str, - token: str, - model_context: Context, - max_connection: int, - searchendpoints: Dict, - store_context: str, - store_local_context: str, - namespace: str, - project_property: str, - deprecated_property: bool, - content_type: str, - accept: str, - files_upload_config: Dict, - files_download_config: Dict, - **params, + self, + endpoint: str, + org: str, + prj: str, + token: str, + model_context: Context, + max_connection: int, + searchendpoints: Dict, + store_context: str, + store_local_context: str, + namespace: str, + project_property: str, + deprecated_property: bool, + content_type: str, + accept: str, + files_upload_config: Dict, + files_download_config: Dict, + **params, ): self.endpoint = endpoint self.organisation = org @@ -102,8 +103,18 @@ def __init__( self.namespace = namespace self.project_property = project_property self.store_metadata_keys = [ - "_constrainedBy", "_createdAt", "_createdBy", "_deprecated", "_incoming", "_outgoing", - "_project", "_rev", "_schemaProject", "_self", "_updatedAt", "_updatedBy" + "_constrainedBy", + "_createdAt", + "_createdBy", + "_deprecated", + "_incoming", + "_outgoing", + "_project", + "_rev", + "_schemaProject", + "_self", + "_updatedAt", + "_updatedBy", ] self.deprecated_property = deprecated_property @@ -117,24 +128,30 @@ def __init__( elastic_config = searchendpoints.get("elastic", None) if searchendpoints else None self.headers_sparql = { - "Content-Type": sparql_config["Content-Type"] - if sparql_config and "Content-Type" in sparql_config - else "text/plain", - "Accept": sparql_config["Accept"] - if sparql_config and "Accept" in sparql_config - else "application/sparql-results+json", + "Content-Type": ( + sparql_config["Content-Type"] + if sparql_config and "Content-Type" in sparql_config + else "text/plain" + ), + "Accept": ( + sparql_config["Accept"] + if sparql_config and "Accept" in sparql_config + else "application/sparql-results+json" + ), } self.headers_elastic = { - "Content-Type": elastic_config["Content-Type"] - if elastic_config and "Content-Type" in elastic_config - else "application/json", - "Accept": elastic_config["Accept"] - if elastic_config and "Accept" in elastic_config - else "application/json", - } - self.headers_upload = { - "Accept": files_upload_config.pop("Accept"), + "Content-Type": ( + elastic_config["Content-Type"] + if elastic_config and "Content-Type" in elastic_config + else "application/json" + ), + "Accept": ( + elastic_config["Accept"] + if elastic_config and "Accept" in elastic_config + else "application/json" + ), } + 
self.headers_upload = {"Accept": files_upload_config.pop("Accept")} self.headers_download = {"Accept": files_download_config.pop("Accept")} self.token = token @@ -206,7 +223,7 @@ def __init__( except RuntimeError: pass - @staticmethod + @staticmetho def make_endpoint(endpoint: str, endpoint_type: str, organisation: str, project: str): return "/".join( (endpoint, endpoint_type, quote_plus(organisation), quote_plus(project)) @@ -273,8 +290,8 @@ def make_query_endpoint_self(self, view: str, endpoint_type: str): def get_project_context(self) -> Dict: project_data = kgforge.specializations.stores.nexus.http_helpers.project_fetch(endpoint=self.endpoint, token=self.token, org_label=self.organisation, project_label=self.project) context = {"@base": project_data["base"], "@vocab": project_data["vocab"]} - for mapping in project_data['apiMappings']: - context[mapping['prefix']] = mapping['namespace'] + for mapping in project_data["apiMappings"]: + context[mapping["prefix"]] = mapping["namespace"] return context def resolve_context(self, iri: str, local_only: Optional[bool] = False) -> Dict: @@ -287,7 +304,7 @@ def resolve_context(self, iri: str, local_only: Optional[bool] = False) -> Dict: url = Service.add_schema_and_id_to_endpoint( endpoint=self.url_resolver, - schema_id=None, + schema_id=None resource_id=context_to_resolve ) @@ -299,15 +316,19 @@ def resolve_context(self, iri: str, local_only: Optional[bool] = False) -> Dict: try: context = Context(context_to_resolve) except URLError as exc2: - raise ValueError(f"{context_to_resolve} is not resolvable") from exc2 + raise ValueError( + f"{context_to_resolve} is not resolvable" + ) from exc2 document = context.document["@context"] else: raise ValueError(f"{context_to_resolve} is not resolvable") from exc else: # Make sure context is not deprecated - if '_deprecated' in resource and resource['_deprecated']: - raise ConfigurationError(f"Context {context_to_resolve} exists but was deprecated") + if "_deprecated" in resource and resource["_deprecated"]: + raise ConfigurationError( + f"Context {context_to_resolve} exists but was deprecated" + ) document = json.loads(json.dumps(resource["@context"])) if isinstance(document, list): if self.store_context in document: @@ -391,13 +412,13 @@ def callback(task: Task): return callback def verify( - self, - resources: List[Resource], - function_name, - exception: Type[Exception], - id_required: bool, - required_synchronized: bool, - execute_actions: bool, + self, + resources: List[Resource], + function_name, + exception: Type[Exception], + id_required: bool, + required_synchronized: bool, + execute_actions: bool, ) -> List[Resource]: valid = [] for resource in resources: @@ -428,12 +449,15 @@ def verify( return valid def to_resource( - self, payload: Dict, sync_metadata: bool = True, **kwargs + self, payload: Dict, sync_metadata: bool = True, **kwargs ) -> Resource: # Use JSONLD context defined in Model if no context is retrieved from payload # Todo: BlueBrainNexus store is not indexing in ES the JSONLD context, user provided context can be changed to Model defined one data_context = deepcopy( - payload.get("@context", self.model_context.iri if self.model_context else None)) + payload.get( + "@context", self.model_context.iri if self.model_context else None + ) + ) if not isinstance(data_context, list): data_context = [data_context] if self.store_context in data_context: @@ -450,9 +474,9 @@ def to_resource( data[k] = v if ( - self.model_context - and data_context is not None - and data_context == 
self.model_context.iri + self.model_context + and data_context is not None + and data_context == self.model_context.iri ): resolved_ctx = self.model_context.document["@context"] elif data_context is not None: @@ -473,7 +497,7 @@ def to_resource( if len(metadata) > 0 and sync_metadata: metadata.update(kwargs) self.sync_metadata(resource, metadata) - if not hasattr(resource, "id") and kwargs and 'id' in kwargs.keys(): + if not hasattr(resource, "id") and kwargs and "id" in kwargs.keys(): resource.id = kwargs.get("id") return resource diff --git a/kgforge/specializations/stores/sparql_store.py b/kgforge/specializations/stores/sparql_store.py index d582bd19a..8d7d9f3bb 100644 --- a/kgforge/specializations/stores/sparql_store.py +++ b/kgforge/specializations/stores/sparql_store.py @@ -44,6 +44,7 @@ def __init__( searchendpoints: Optional[Dict] = None, **store_config, ) -> None: + super().__init__(model) self.endpoint = endpoint self.file_resource_mapping = file_resource_mapping @@ -51,6 +52,9 @@ def __init__( self.service = self._initialize_service( endpoint, searchendpoints, **store_config ) + self.service = self._initialize_service( + endpoint, searchendpoints, **store_config + ) @property def mapper(self) -> Optional[Type[Mapper]]: diff --git a/kgforge/specializations/stores/web_service/__init__.py b/kgforge/specializations/stores/web_service/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kgforge/specializations/stores/web_service/webservice.py b/kgforge/specializations/stores/web_service/webservice.py new file mode 100644 index 000000000..25a591c66 --- /dev/null +++ b/kgforge/specializations/stores/web_service/webservice.py @@ -0,0 +1,109 @@ +# +# Blue Brain Nexus Forge is free software: you can redistribute it and/or modify +# it under the terms of the GNU Lesser General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Blue Brain Nexus Forge is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser +# General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with Blue Brain Nexus Forge. If not, see . 
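For reference, a minimal sketch of how the fixed `SPARQLStore` constructor is exercised, in the spirit of the fixtures in `tests/specializations/stores/test_sparql.py`; the model directory and context paths below are placeholders, not files shipped with this change:

    from kgforge.specializations.models.rdf_model import RdfModel
    from kgforge.specializations.stores.sparql_store import SPARQLStore

    # Placeholder model: any local RDF model directory with a JSON-LD context works.
    model = RdfModel(
        origin="directory",
        source="path/to/model",
        context={"iri": "path/to/model/jsonld_context.json"},
    )
    store = SPARQLStore(
        model=model,
        searchendpoints={"sparql": {"endpoint": "http://dbpedia.org/sparql"}},
    )
    # With super().__init__(model) in place, store.model is set by the base class,
    # and _initialize_service now runs exactly once (the duplicated call was dropped).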
diff --git a/kgforge/specializations/stores/web_service/__init__.py b/kgforge/specializations/stores/web_service/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/kgforge/specializations/stores/web_service/webservice.py b/kgforge/specializations/stores/web_service/webservice.py
new file mode 100644
index 000000000..25a591c66
--- /dev/null
+++ b/kgforge/specializations/stores/web_service/webservice.py
@@ -0,0 +1,109 @@
+#
+# Blue Brain Nexus Forge is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Blue Brain Nexus Forge is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+# General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Blue Brain Nexus Forge. If not, see .
+from typing import Dict, List, Optional, Union
+import copy
+import requests
+
+from kgforge.core.resource import Resource
+from kgforge.core.commons.parser import _process_types
+from kgforge.core.commons.exceptions import ConfigurationError, QueryingError
+
+
+class WebService:
+
+    def __init__(
+        self,
+        endpoint: str,
+        content_type: str,
+        accept: str = "*/*",
+        response_location: Optional[Union[str, List[str]]] = None,
+        files_download: Optional[Dict] = None,
+        searchendpoints: Optional[Dict] = None,
+        **params,
+    ):
+        """A Web service"""
+        self.endpoint = endpoint
+        self.content_type = content_type
+        self.accept = accept
+        self.context_cache: Dict = {}
+        self.response_location = response_location
+        self.files_download = files_download
+        self.searchendpoints = searchendpoints
+        if self.searchendpoints:
+            if not isinstance(self.searchendpoints, dict):
+                raise ConfigurationError("searchendpoints must be a dict")
+            for name, endpoint_config in self.searchendpoints.items():
+                if not isinstance(endpoint_config, dict):
+                    raise ConfigurationError("endpoint configuration must be a dict")
+                if 'endpoint' not in endpoint_config:
+                    raise ConfigurationError(
+                        f"Missing 'endpoint' in the '{name}' searchendpoints entry"
+                    )
+        self.max_connection = params.pop('max_connection', None)
+        self.params = copy.deepcopy(params)
+
+        self.headers = {"Content-Type": content_type, "Accept": accept}
+        if files_download:
+            if 'Content-Type' not in files_download:
+                raise ConfigurationError("Files download configuration misses the `Content-Type` value")
+            if 'Accept' not in files_download:
+                raise ConfigurationError("Files download configuration misses the `Accept` value")
+            file_content_type = files_download['Content-Type']
+            file_accept = files_download['Accept']
+        else:
+            file_content_type = file_accept = "text/plain"
+        self.headers_download = {
+            "Content-Type": file_content_type,
+            "Accept": file_accept
+        }
+
+    @staticmethod
+    def resources_from_request(url: str,
+                               headers: Dict,
+                               response_location: Optional[Union[str, List[str]]],
+                               **request_params) -> List[Resource]:
+        """Perform a HTTP request.
+
+        :param url: The URL to request
+        :param headers: The headers to be passed to the request
+        :param response_location: The nested location of the relevant metadata in the
+            response. Example: NeuroMorpho uses response["_embedded"]["neuronResources"],
+            which should be given as: response_location = ["_embedded", "neuronResources"]
+        :param request_params: Any other parameter for the request
+        """
+        try:
+            response = requests.get(url, params=request_params,
+                                    headers=headers, verify=False)
+            response.raise_for_status()
+        except Exception as e:
+            raise QueryingError(e) from e
+        else:
+            data = response.json()
+            if response_location:
+                # Get the resources directly from a location in the response
+                if isinstance(response_location, str):
+                    results = data[response_location]
+                elif isinstance(response_location, (list, tuple)):
+                    for inner in response_location:
+                        data = data[inner]
+                    results = data
+                else:
+                    raise QueryingError(
+                        "response_location must be a string or a list of strings"
+                    )
+                return [Resource(**result) for result in results]
+            else:
+                # Standard response format
+                results = data["results"]["bindings"]
+                return WebService.build_resources_from_results(results)
+
+    @staticmethod
+    def build_resources_from_results(results):
+        return [
+            Resource(**{k: _process_types(v) for k, v in x.items()})
+            for x in results
+        ]
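As a usage note, `resources_from_request` walks `response_location` into the JSON payload before wrapping each hit in a `Resource`, and any extra keyword argument is forwarded verbatim as a query parameter. A hedged call sketch; the NeuroMorpho URL mirrors the docstring example and the `size` parameter is an assumption about that API:

    from kgforge.specializations.stores.web_service.webservice import WebService

    # Fetch a few NeuroMorpho neurons; hits live under data["_embedded"]["neuronResources"].
    resources = WebService.resources_from_request(
        url="http://neuromorpho.org/api/neuron",
        headers={"Content-Type": "application/json", "Accept": "*/*"},
        response_location=["_embedded", "neuronResources"],
        size=5,  # assumption: forwarded as-is to requests.get(params=...)
    )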
diff --git a/kgforge/specializations/stores/web_service_store.py b/kgforge/specializations/stores/web_service_store.py
new file mode 100644
index 000000000..94a672dfd
--- /dev/null
+++ b/kgforge/specializations/stores/web_service_store.py
@@ -0,0 +1,210 @@
+#
+# Blue Brain Nexus Forge is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Blue Brain Nexus Forge is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+# General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Blue Brain Nexus Forge. If not, see .
+from typing import Dict, List, Optional, Union, Type, Tuple
+import copy
+import asyncio
+import requests
+from aiohttp import ClientSession
+from requests.exceptions import SSLError
+
+
+from kgforge.core.resource import Resource
+from kgforge.core.commons.context import Context
+from kgforge.core.archetypes.model import Model
+from kgforge.core.archetypes.mapper import Mapper
+from kgforge.core.archetypes.resolver import Resolver
+from kgforge.core.wrappings.dict import DictWrapper
+from kgforge.core.archetypes.dataset_store import DatasetStore
+from kgforge.specializations.mappers.dictionaries import DictionaryMapper
+from kgforge.core.commons.exceptions import ConfigurationError, DownloadingError
+from kgforge.core.commons.execution import not_supported, catch_http_error, error_message
+
+
+from kgforge.core.wrappings.paths import Filter
+from kgforge.specializations.stores.web_service.webservice import WebService
+
+
+class WebServiceStore(DatasetStore):
+    """A high-level class to retrieve and create Datasets from a Web Service."""
+
+    def __init__(
+        self,
+        model: Model,
+        endpoint: str,
+        request_params: dict,
+        token: Optional[str] = None,
+        searchendpoints: Optional[Dict] = None,
+        health_endpoint: Optional[str] = None,
+        **params,
+    ):
+        super().__init__(model)
+        self.health_endpoint = health_endpoint
+        params.update({"request_params": request_params})
+        self.service = self._initialize_service(endpoint=endpoint,
+                                                token=token,
+                                                searchendpoints=searchendpoints,
+                                                **params)
+
+    @property
+    def mapper(self) -> Optional[Type[Mapper]]:
+        return DictionaryMapper
+
+    def download(self, urls: Union[str, List[str]],
+                 paths: Union[str, List[str]], overwrite: bool = False) -> None:
+        """Download one or many files to the given local path(s)."""
+        if isinstance(urls, list):
+            # Check consistency between urls and paths
+            if not isinstance(paths, list):
+                raise TypeError("Given multiple urls, paths should also be a list.")
+            if len(paths) != len(urls):
+                raise ValueError(
+                    f"Mismatch between urls ({len(urls)}) and paths ({len(paths)}): "
+                    "they must have the same length."
+                )
+            self._download_many(urls, paths)
+        else:
+            self._download_one(urls, paths)
+
+    def _download_many(self, urls: List[str],
+                       paths: List[str]) -> None:
+        async def _bulk():
+            loop = asyncio.get_running_loop()
+            semaphore = asyncio.Semaphore(self.service.max_connection)
+            async with ClientSession(headers=self.service.headers_download) as session:
+                tasks = (
+                    _create_task(x, y, loop, semaphore, session)
+                    for x, y in zip(urls, paths)
+                )
+                return await asyncio.gather(*tasks)
+
+        def _create_task(url, path, loop, semaphore, session):
+            return loop.create_task(
+                _download(url, path, semaphore, session)
+            )
+
+        async def _download(url, path, semaphore, session):
+            async with semaphore:
+                params_download = copy.deepcopy(self.service.params.get('download', {}))
+                async with session.get(url, params=params_download) as response:
+                    catch_http_error(
+                        response, DownloadingError,
+                        error_message_formatter=lambda e:
+                        f"Downloading url {url} failed: {error_message(e)}"
+                    )
+                    with open(path, "wb") as f:
+                        data = await response.read()
+                        f.write(data)
+
+        return asyncio.run(_bulk())
+
+    def _download_one(self, url: str, path: str) -> None:
+        params_download = copy.deepcopy(self.service.params.get('download', {}))
+        response = requests.get(
+            url=url,
+            headers=self.service.headers_download,
+            params=params_download,
+            verify=False
+        )
+        catch_http_error(
+            response, DownloadingError,
+            error_message_formatter=lambda e: f"Downloading failed: {error_message(e)}"
+        )
+        with open(path, "wb") as f:
+            for chunk in response.iter_content(chunk_size=4096):
+                f.write(chunk)
+
+    def _prepare_download_one(self, url: str, store_metadata: Optional[DictWrapper],
+                              cross_bucket: bool) -> Tuple[str, str]:
+        raise not_supported()
+
+    def retrieve(
+        self, id: str, version: Optional[Union[int, str]], cross_bucket: bool, **params
+    ) -> Resource:
+        raise not_supported()
+
+    def _retrieve_filename(self, id: str) -> str:
+        raise not_supported()
+
+    def _search(self, resolvers: Optional[List[Resolver]],
+                filters: List[Union[Dict, Filter]],
+                **params
+                ) -> Optional[List[Resource]]:
+        # resolvers are not used, just passed to match the archetype method signature
+        if not all(isinstance(flr, dict) for flr in filters):
+            raise NotImplementedError(
+                "Currently only dictionaries are supported as filters"
+            )
+        searchendpoint = params.pop('searchendpoint', None)
+        if searchendpoint:
+            if self.service.searchendpoints is None:
+                raise ConfigurationError("No searchendpoints were given "
+                                         "in the initial configuration.")
+            try:
+                endpoint = self.service.searchendpoints[searchendpoint]['endpoint']
+            except KeyError as exc:
+                raise ConfigurationError(
+                    f"The {searchendpoint} searchendpoint was not given "
+                    "in the initial configuration."
+                ) from exc
+        else:
+            endpoint = self.service.endpoint
+        # Merge every filter dictionary into the request parameters
+        for flr in filters:
+            params.update(flr)
+        return self.service.resources_from_request(endpoint, self.service.headers,
+                                                   self.service.response_location, **params)
+
+    def _sparql(self, query: str) -> Optional[Union[List[Resource], Resource]]:
+        raise not_supported()
+
+    def elastic(
+        self, query: str, debug: bool, limit: int = None, offset: int = None, **params
+    ) -> Optional[Union[List[Resource], Resource]]:
+        raise not_supported()
+
+    def rewrite_uri(self, uri: str, context: Context, **kwargs) -> str:
+        raise not_supported()
+
+    def health(self) -> Dict:
+        if not self.health_endpoint:
+            raise ConfigurationError("Health information not found with given configuration. "
+                                     "Define health in configuration arguments or set _health.")
+        try:
+            response = requests.get(self.health_endpoint)
+        except SSLError:
+            response = requests.get(self.health_endpoint, verify=False)
+        except Exception as error:
+            raise ConfigurationError(
+                f"Health check failed: {error_message(error)}"
+            ) from error
+        catch_http_error(
+            response, requests.HTTPError,
+            error_message_formatter=lambda e:
+            f"Health check failed: {error_message(e)}"
+        )
+        return response.json()
+
+    def _initialize_service(self, endpoint: str,
+                            token: Optional[str],
+                            searchendpoints: Optional[Dict],
+                            **params
+                            ) -> WebService:
+        request_params = params.pop("request_params")
+        # split the parameters before initializing the service
+        content_type = request_params.get("content_type", None)
+        if not content_type:
+            raise ConfigurationError("Content type not specified in request_params: "
+                                     f"{request_params}")
+        accept = request_params.get("accept", "*/*")
+        response_location = request_params.get("response_location", None)
+        files_download = request_params.get("files_download", None)
+        return WebService(endpoint, content_type,
+                          accept, response_location,
+                          files_download, searchendpoints, **params)
diff --git a/tests/specializations/stores/test_sparql.py b/tests/specializations/stores/test_sparql.py
index 1059f1c02..0426ab8d9 100644
--- a/tests/specializations/stores/test_sparql.py
+++ b/tests/specializations/stores/test_sparql.py
@@ -12,11 +12,13 @@
 # You should have received a copy of the GNU Lesser General Public License
 # along with Blue Brain Nexus Forge. If not, see .
+from typing import Any
 import pytest

 from utils import full_path_relative_to_root
 from kgforge.specializations.models.rdf_model import RdfModel
 from kgforge.specializations.stores.sparql_store import SPARQLStore
+from kgforge.core.commons.exceptions import NotSupportedError

 SEARCH_ENDPOINT = {"sparql": {"endpoint": "http://dbpedia.org/sparql"}}
@@ -58,3 +60,8 @@ def test_config(sparql_store, rdf_model):
 def test_search_params(sparql_store):
     with pytest.raises(AttributeError):
         sparql_store.search(resolvers=[None], filters=[None])
+
+
+def test_elastic_not_supported(sparql_store: Any):
+    with pytest.raises(NotSupportedError):
+        sparql_store.elastic(query=None, debug=False)
diff --git a/tests/specializations/stores/test_web_service.py b/tests/specializations/stores/test_web_service.py
new file mode 100644
index 000000000..2c6aeebaf
--- /dev/null
+++ b/tests/specializations/stores/test_web_service.py
@@ -0,0 +1,112 @@
+#
+# Blue Brain Nexus Forge is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Lesser General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# Blue Brain Nexus Forge is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
+# General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with Blue Brain Nexus Forge. If not, see .
+
+from typing import Any
+import pytest
+
+from utils import full_path_relative_to_root
+from kgforge.core.commons.exceptions import ConfigurationError, NotSupportedError
+from kgforge.specializations.models.rdf_model import RdfModel
+from kgforge.specializations.stores.web_service_store import WebServiceStore
+
+ENDPOINT = "http://neuromorpho.org/api/neuron"
+
+
+@pytest.fixture
+def rdf_model():
+    model = RdfModel(
+        origin='directory',
+        source=full_path_relative_to_root("examples/mappings/NeuroMorpho"),
+        context={
+            'iri': full_path_relative_to_root("examples/mappings/NeuroMorpho/jsonld_context.json")
+        }
+    )
+    return model
+
+
+def test_config_searchendpoints(rdf_model: RdfModel):
+    with pytest.raises(ConfigurationError):
+        WebServiceStore(
+            model=rdf_model,
+            endpoint=ENDPOINT,
+            request_params={
+                "content_type": "application/json",
+                "accept": "*/*"},
+            searchendpoints={"elastic": None}
+        )
+
+
+def test_config_file_downloads_content_type(rdf_model: RdfModel):
+    with pytest.raises(ConfigurationError):
+        WebServiceStore(
+            model=rdf_model,
+            endpoint=ENDPOINT,
+            request_params={
+                "content_type": "application/json",
+                "accept": "*/*",
+                "files_download": {"Accept": "application/json"}},
+        )
+
+
+def test_config_file_downloads_accept(rdf_model: RdfModel):
+    with pytest.raises(ConfigurationError):
+        WebServiceStore(
+            model=rdf_model,
+            endpoint=ENDPOINT,
+            request_params={
+                "content_type": "application/json",
+                "accept": "*/*",
+                "files_download": {"Content-Type": "application/json"}},
+        )
+
+
+@pytest.fixture
+def web_service_store(rdf_model: RdfModel):
+    return WebServiceStore(
+        model=rdf_model,
+        endpoint=ENDPOINT,
+        request_params={
+            "content_type": "application/json",
+            "accept": "*/*",
+            "files_download": {"Content-Type": "application/json",
+                               "Accept": "*/*"}
+        },
+        health_endpoint="https://mynotreal.com/health"
+    )
+
+
+def test_config(web_service_store: Any, rdf_model: RdfModel):
+    assert web_service_store.model == rdf_model
+    assert web_service_store.service.endpoint
+    assert web_service_store.model.context() == rdf_model.context()
+
+
+def test_health_not_valid(web_service_store):
+    with pytest.raises(ConfigurationError):
+        web_service_store.health()
+
+
+def test_sparql_not_supported(web_service_store: Any):
+    with pytest.raises(NotSupportedError):
+        web_service_store.sparql(query="SELECT * WHERE { ?s ?p ?o }")
+
+
+def test_elastic_not_supported(web_service_store: Any):
+    with pytest.raises(NotSupportedError):
+        web_service_store.elastic(query=None, debug=False)
+
+
+def test_retrieve_not_supported(web_service_store: Any):
+    with pytest.raises(NotSupportedError):
+        web_service_store.retrieve(id=None, version=None, cross_bucket=False)
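A further check in the same style could pin down the list-versus-string guard in `download`; this is a hypothetical addition reusing the `web_service_store` fixture above, not part of the change set:

    def test_download_urls_paths_mismatch(web_service_store: Any):
        # urls is a list but paths is a single string: the guard raises TypeError
        with pytest.raises(TypeError):
            web_service_store.download(urls=["u1", "u2"], paths="only/one/path")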