General speedups for the pipeline; Fix functionality for export (#90)
- the prov graph and data graph are separate and thus don't need a dependency between them in the scheduler pipeline
- the final export only exports the data graph, not the `prov` graph, so we don't need to wait on `prov` for exports
- the schedule clears the partition status when it is run, so the export won't be run again until everything completes
- fairly sure this gives the behavior we want; namely, graphs are only generated when the schedule requests a new crawl, not when we just have a one-off asset completion
- get rid of a compose container that made the buckets and just put this behavior directly in Python (a standalone sketch follows this list)
- we can download the gleaner and nabu images in parallel with threading
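
A minimal standalone sketch of the bucket bootstrap now handled in Python; the endpoint, credentials, and bucket name below are taken from the removed compose block and are illustrative rather than the repo's actual config constants:

    from minio import Minio

    # create the MinIO bucket on startup if it does not already exist
    client = Minio(
        "minio:9000",
        access_key="minio_access_key",
        secret_key="minio_secret_key",
        secure=False,
    )
    if not client.bucket_exists("gleanerbucket"):
        client.make_bucket("gleanerbucket")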

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Benjamin Webb <[email protected]>
3 people authored Feb 5, 2025
1 parent c4168db commit 4fb73de
Showing 4 changed files with 58 additions and 58 deletions.
18 changes: 0 additions & 18 deletions Docker/Docker-compose.yaml
@@ -19,24 +19,6 @@ services:
networks:
- dagster_network

# Creates buckets for MinIO
createbuckets:
image: minio/mc
depends_on:
- minio
entrypoint: >
/bin/sh -c "
sleep 10;
/usr/bin/mc alias set myminio http://minio:9000 minio_access_key minio_secret_key;
/usr/bin/mc mb myminio/gleanerbucket;
/usr/bin/mc anonymous set public myminio/gleanerbucket;
sleep infinity;
"
networks:
- dagster_network
profiles:
- localInfra

# GraphDB service for storage
graphdb:
image: khaller/graphdb-free
3 changes: 3 additions & 0 deletions userCode/lib/classes.py
@@ -41,6 +41,9 @@ def __init__(self):
secret_key=GLEANER_MINIO_SECRET_KEY,
)

if not self.client.bucket_exists(GLEANER_MINIO_BUCKET):
self.client.make_bucket(GLEANER_MINIO_BUCKET)

def load(self, data: Any, remote_path: str):
"""Load arbitrary data into s3 bucket"""
f = io.BytesIO()
91 changes: 53 additions & 38 deletions userCode/main.py
@@ -7,9 +7,9 @@
import platform
import shutil
import subprocess
from typing import Optional, Tuple
from typing import Optional
import zipfile
from aiohttp import ClientSession, ClientTimeout
from aiohttp import ClientSession
from bs4 import BeautifulSoup
from dagster import (
AssetCheckResult,
@@ -33,6 +33,7 @@
import docker
import dagster_slack
import docker.errors
from threading import Thread
import requests
import yaml

@@ -265,20 +266,20 @@ def gleaner_links_are_valid():
config = s3_client.read("configs/gleanerconfig.yaml")
yaml_config = yaml.safe_load(config)

dead_links: list[dict[str, Tuple[int, str]]] = []
dead_links: list[dict[str, int]] = []

async def validate_url(url: str):
# Geoconnex links generally take at absolute max 8 seconds if it is very large sitemap
# If it is above 12 seconds that is a good signal that something is wrong
async with ClientSession(timeout=ClientTimeout(total=12)) as session:
resp = await session.get(url)

if resp.status != 200:
content = await resp.text()
get_dagster_logger().error(
f"URL {url} returned status code {resp.status} with content: {content}"
)
dead_links.append({url: (resp.status, content)})
async with ClientSession() as session:
# only request the headers of each geoconnex sitemap
# no reason to download all the content
async with session.head(url) as response:
if response.status == 200:
get_dagster_logger().debug(f"URL {url} exists.")
else:
get_dagster_logger().debug(
f"URL {url} returned status code {response.status}."
)
dead_links.append({url: response.status})

async def main(urls):
tasks = [validate_url(url) for url in urls]
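
For reference, a self-contained sketch of the HEAD-based check the new validate_url performs; the helper names and the example URL are illustrative, not the repo's:

    import asyncio
    from aiohttp import ClientSession

    async def head_status(session: ClientSession, url: str) -> tuple[str, int]:
        # HEAD fetches only headers, so even a very large sitemap is not downloaded
        async with session.head(url) as response:
            return url, response.status

    async def check_links(urls: list[str]) -> list[tuple[str, int]]:
        async with ClientSession() as session:
            return list(await asyncio.gather(*(head_status(session, u) for u in urls)))

    # dead links are any URLs that did not answer 200
    statuses = asyncio.run(check_links(["https://example.org/sitemap.xml"]))
    dead = [(url, status) for url, status in statuses if status != 200]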
@@ -296,16 +297,21 @@ async def main(urls):
)


@asset(deps=[gleaner_config, nabu_config, rclone_config])
@asset()
def docker_client_environment():
"""Set up dagster by pulling both the gleaner and nabu images and moving the config files into docker configs"""
get_dagster_logger().info("Initializing docker client and pulling images: ")
client = docker.DockerClient()

get_dagster_logger().info(f"Pulling {GLEANER_IMAGE} and {NABU_IMAGE}")
# check if the docker socket is available
client.images.pull(GLEANER_IMAGE)
client.images.pull(NABU_IMAGE)

# Use threading since pull is not async and we want to do both at the same time
gleaner_thread = Thread(target=client.images.pull, args=(GLEANER_IMAGE,))
nabu_thread = Thread(target=client.images.pull, args=(NABU_IMAGE,))
gleaner_thread.start()
nabu_thread.start()
gleaner_thread.join()
nabu_thread.join()


@asset_check(asset=docker_client_environment)
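
An equivalent way to run both pulls concurrently, shown only as a variant and not what this commit uses (the commit uses threading.Thread as above); concurrent.futures has the side benefit that an exception raised inside a pull is re-raised by result():

    from concurrent.futures import ThreadPoolExecutor
    import docker

    client = docker.DockerClient()
    # GLEANER_IMAGE and NABU_IMAGE are the same constants used in the asset above
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(client.images.pull, image) for image in (GLEANER_IMAGE, NABU_IMAGE)]
        for future in futures:
            future.result()  # re-raises any pull failure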
@@ -330,20 +336,22 @@ def can_contact_headless():
)


@asset(partitions_def=sources_partitions_def, deps=[docker_client_environment])
@asset(
partitions_def=sources_partitions_def,
deps=[docker_client_environment, gleaner_config, nabu_config],
)
def gleaner(context: OpExecutionContext):
"""Get the jsonld for each site in the gleaner config"""
source = context.partition_key
ARGS = ["--cfg", "gleanerconfig.yaml", "--source", source, "--rude", "--setup"]
ARGS = ["--cfg", "gleanerconfig.yaml", "--source", source, "--rude"]

returned_value = run_scheduler_docker_image(
run_scheduler_docker_image(
source,
GLEANER_IMAGE,
ARGS,
"gleaner",
volumeMapping=["/tmp/geoconnex/gleanerconfig.yaml:/app/gleanerconfig.yaml"],
)
get_dagster_logger().info(f"Gleaner returned value: '{returned_value}'")


@asset(partitions_def=sources_partitions_def, deps=[gleaner])
@@ -518,7 +526,7 @@ def nabu_orgs(context: OpExecutionContext):

@asset(
partitions_def=sources_partitions_def,
deps=[nabu_orgs, nabu_prov_object, nabu_prune],
deps=[nabu_orgs, nabu_prune],
)
def finished_individual_crawl(context: OpExecutionContext):
"""Dummy asset signifying the geoconnex crawl is completed once the orgs and prov nq files are in the graphdb and the graph is synced with the s3 bucket"""
@@ -541,7 +549,10 @@ def export_graph_as_nquads(context: OpExecutionContext) -> Optional[str]:
f"{base_url}/repositories/{GLEANERIO_DATAGRAPH_ENDPOINT}/statements?infer=false"
)

# Send the POST request to export the data
get_dagster_logger().info(
f"Exporting graphdb to nquads; fetching data from {endpoint}"
)
# Download the nq export
response = requests.get(
endpoint,
headers={
@@ -556,8 +567,9 @@
f.write(response.content)
get_dagster_logger().info("Export of graphdb to nquads successful")
else:
get_dagster_logger().error(f"Response: {response.text}")
raise RuntimeError(f"Export failed, status code: {response.status_code}")
raise RuntimeError(
f"Export failed, status code: {response.status_code} with response {response.text}"
)

s3_client = S3()
filename = f"backups/nquads_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}"
@@ -613,23 +625,26 @@ def nquads_to_renci(
def crawl_entire_graph_schedule(context: ScheduleEvaluationContext):
get_dagster_logger().info("Schedule triggered.")

get_dagster_logger().info("Deleting old partition status before new crawl")
filter_partitions(context.instance, "sources_partitions_def", keys_to_keep=set())

result = materialize([gleaner_config], instance=context.instance)
if not result.success:
raise Exception(f"Failed to materialize gleaner_config!: {result}")

partition_keys = context.instance.get_dynamic_partitions("sources_partitions_def")
get_dagster_logger().info(f"Partition keys: {partition_keys}")

if partition_keys:
for partition_key in partition_keys:
yield RunRequest(
job_name="harvest_source",
run_key="havest_weekly",
partition_key=partition_key,
tags={"run_type": "harvest_weekly"},
)
else:
RuntimeError("No partition keys found.")
get_dagster_logger().info(f"Found partition keys: {partition_keys}")

if not partition_keys:
raise Exception("No partition keys found after materializing gleaner_config!")

for partition_key in partition_keys:
yield RunRequest(
job_name="harvest_source",
run_key="havest_weekly",
partition_key=partition_key,
tags={"run_type": "harvest_weekly"},
)


# expose all the code needed for our dagster repo
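
A plausible sketch of what filter_partitions does in the schedule above; the helper lives in the repo's lib code and is not part of this diff, so this is an assumption based on Dagster's dynamic partition API:

    from dagster import DagsterInstance

    def filter_partitions(
        instance: DagsterInstance, partitions_def_name: str, keys_to_keep: set[str]
    ) -> None:
        # drop every dynamic partition key that is not explicitly kept
        for key in instance.get_dynamic_partitions(partitions_def_name):
            if key not in keys_to_keep:
                instance.delete_dynamic_partition(partitions_def_name, key)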
4 changes: 2 additions & 2 deletions userCode/templates/nabuconfig.yaml.j2
@@ -23,8 +23,8 @@ sparql:
authenticate: false
username: ""
password: ""
repository: iow
repository: {{ GLEANERIO_DATAGRAPH_ENDPOINT }}
prefixes:
summoned/providera
- summoned/providera
- prov/providera
- orgs
