Separate prov assets; improve fanout; speed up pipeline a bit
C-Loftus committed Feb 5, 2025
1 parent 35559d1 commit c2e01a2
Showing 4 changed files with 47 additions and 47 deletions.
18 changes: 0 additions & 18 deletions Docker/Docker-compose.yaml
```diff
@@ -16,24 +16,6 @@ services:
     networks:
       - dagster_network
 
-  # Creates buckets for MinIO
-  createbuckets:
-    image: minio/mc
-    depends_on:
-      - minio
-    entrypoint: >
-      /bin/sh -c "
-      sleep 10;
-      /usr/bin/mc alias set myminio http://minio:9000 minio_access_key minio_secret_key;
-      /usr/bin/mc mb myminio/gleanerbucket;
-      /usr/bin/mc anonymous set public myminio/gleanerbucket;
-      sleep infinity;
-      "
-    networks:
-      - dagster_network
-    profiles:
-      - localInfra
-
   # GraphDB service for storage
   graphdb:
     image: khaller/graphdb-free
```
3 changes: 3 additions & 0 deletions userCode/lib/classes.py
```diff
@@ -38,6 +38,9 @@ def __init__(self):
             secret_key=GLEANER_MINIO_SECRET_KEY,
         )
 
+        if not self.client.bucket_exists(GLEANER_MINIO_BUCKET):
+            self.client.make_bucket(GLEANER_MINIO_BUCKET)
+
     def load(self, data: Any, remote_path: str):
         """Load arbitrary data into s3 bucket"""
         f = io.BytesIO()
```
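With the `createbuckets` sidecar removed from the compose file, bucket bootstrap now happens in-process when the S3 wrapper is constructed. A minimal sketch of that idempotent pattern with the `minio` Python client; the endpoint, credentials, and bucket name are assumptions carried over from the removed compose service, not the repo's actual config:

```python
# Sketch of the idempotent bucket bootstrap now done in userCode/lib/classes.py.
# Endpoint, credentials, and bucket name are assumed values taken from the
# removed createbuckets service, not read from the repo's real settings.
from minio import Minio

client = Minio(
    "minio:9000",
    access_key="minio_access_key",
    secret_key="minio_secret_key",
    secure=False,
)

# make_bucket raises if the bucket already exists, so guard with bucket_exists
if not client.bucket_exists("gleanerbucket"):
    client.make_bucket("gleanerbucket")
```

Note the old sidecar also ran `mc anonymous set public`; that step has no visible equivalent in this diff.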
69 changes: 42 additions & 27 deletions userCode/main.py
```diff
@@ -30,6 +30,7 @@
 import docker
 import dagster_slack
 import docker.errors
+from threading import Thread
 import requests
 import yaml
```

```diff
@@ -195,9 +196,9 @@ def gleaner_config(context: AssetExecutionContext):
     sources = []
     names: set[str] = set()
 
-    assert (
-        len(Lines) > 0
-    ), f"No sitemaps found in sitemap index {REMOTE_GLEANER_SITEMAP}"
+    assert len(Lines) > 0, (
+        f"No sitemaps found in sitemap index {REMOTE_GLEANER_SITEMAP}"
+    )
 
     for line in Lines:
         basename = REMOTE_GLEANER_SITEMAP.removesuffix(".xml")
```
```diff
@@ -293,16 +294,21 @@ async def main(urls):
     )
 
 
-@asset(deps=[gleaner_config, nabu_config, rclone_config])
+@asset()
 def docker_client_environment():
     """Set up dagster by pulling both the gleaner and nabu images and moving the config files into docker configs"""
     get_dagster_logger().info("Initializing docker client and pulling images: ")
     client = docker.DockerClient()
 
     get_dagster_logger().info(f"Pulling {GLEANER_IMAGE} and {NABU_IMAGE}")
     # check if the docker socket is available
-    client.images.pull(GLEANER_IMAGE)
-    client.images.pull(NABU_IMAGE)
+    # Use threading since pull is not async and we want to do both at the same time
+    gleaner_thread = Thread(target=client.images.pull, args=(GLEANER_IMAGE,))
+    nabu_thread = Thread(target=client.images.pull, args=(NABU_IMAGE,))
+    gleaner_thread.start()
+    nabu_thread.start()
+    gleaner_thread.join()
+    nabu_thread.join()
 
 
 @asset_check(asset=docker_client_environment)
```
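Since docker-py's `images.pull` is a blocking call, the commit runs the two pulls on separate threads. One caveat of bare `Thread` objects is that an exception raised inside `pull` dies with the worker thread; a sketch of the same concurrency using `concurrent.futures`, which re-raises pull failures in the caller (the image tags are placeholders):

```python
# Sketch: concurrent image pulls via ThreadPoolExecutor. Unlike bare Threads,
# future.result() re-raises any docker.errors.APIError from a failed pull.
from concurrent.futures import ThreadPoolExecutor

import docker

client = docker.DockerClient()
images = ["example/gleaner:latest", "example/nabu:latest"]  # placeholder tags

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(client.images.pull, image) for image in images]
    for future in futures:
        future.result()  # propagate pull failures instead of swallowing them
```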
```diff
@@ -327,20 +333,22 @@ def can_contact_headless():
     )
 
 
-@asset(partitions_def=sources_partitions_def, deps=[docker_client_environment])
+@asset(
+    partitions_def=sources_partitions_def,
+    deps=[docker_client_environment, gleaner_config, nabu_config],
+)
 def gleaner(context: OpExecutionContext):
     """Get the jsonld for each site in the gleaner config"""
     source = context.partition_key
-    ARGS = ["--cfg", "gleanerconfig.yaml", "--source", source, "--rude", "--setup"]
+    ARGS = ["--cfg", "gleanerconfig.yaml", "--source", source, "--rude"]
 
-    returned_value = run_scheduler_docker_image(
+    run_scheduler_docker_image(
         source,
         GLEANER_IMAGE,
         ARGS,
         "gleaner",
         volumeMapping=["/tmp/geoconnex/gleanerconfig.yaml:/app/gleanerconfig.yaml"],
     )
-    get_dagster_logger().info(f"Gleaner returned value: '{returned_value}'")
 
 
 @asset(partitions_def=sources_partitions_def, deps=[gleaner])
```
```diff
@@ -515,7 +523,7 @@ def nabu_orgs(context: OpExecutionContext):
 
 @asset(
     partitions_def=sources_partitions_def,
-    deps=[nabu_orgs, nabu_prov_object, nabu_prune],
+    deps=[nabu_orgs, nabu_prune],
 )
 def finished_individual_crawl(context: OpExecutionContext):
     """Dummy asset signifying the geoconnex crawl is completed once the orgs and prov nq files are in the graphdb and the graph is synced with the s3 bucket"""
```
```diff
@@ -538,7 +546,10 @@ def export_graph_as_nquads(context: OpExecutionContext) -> Optional[str]:
         f"{base_url}/repositories/{GLEANERIO_DATAGRAPH_ENDPOINT}/statements?infer=false"
     )
 
-    # Send the POST request to export the data
+    get_dagster_logger().info(
+        f"Exporting graphdb to nquads; fetching data from {endpoint}"
+    )
+    # Download the nq export
     response = requests.get(
         endpoint,
         headers={
```
```diff
@@ -553,8 +564,9 @@ def export_graph_as_nquads(context: OpExecutionContext) -> Optional[str]:
             f.write(response.content)
         get_dagster_logger().info("Export of graphdb to nquads successful")
     else:
-        get_dagster_logger().error(f"Response: {response.text}")
-        raise RuntimeError(f"Export failed, status code: {response.status_code}")
+        raise RuntimeError(
+            f"Export failed, status code: {response.status_code} with response {response.text}"
+        )
 
     s3_client = S3()
     filename = f"backups/nquads_{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}"
```
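The export reads every statement straight from the repository's REST endpoint. A sketch of the request, assuming GraphDB's RDF4J-style API and an n-quads `Accept` header; the base URL, repository id, timeout, and output path are placeholders:

```python
# Sketch of the n-quads export, assuming GraphDB's RDF4J-style REST API.
# base_url, repository id, timeout, and output filename are placeholder values.
import requests

base_url = "http://localhost:7200"
endpoint = f"{base_url}/repositories/iow/statements?infer=false"

response = requests.get(
    endpoint,
    headers={"Accept": "application/n-quads"},  # serialize all named graphs
    timeout=600,
)
if response.status_code != 200:
    raise RuntimeError(
        f"Export failed, status code: {response.status_code} with response {response.text}"
    )

with open("export.nq", "wb") as f:
    f.write(response.content)
```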
Expand Down Expand Up @@ -610,23 +622,26 @@ def nquads_to_renci(
def crawl_entire_graph_schedule(context: ScheduleEvaluationContext):
get_dagster_logger().info("Schedule triggered.")

get_dagster_logger().info("Deleting old partition status before new crawl")
filter_partitions(context.instance, "sources_partitions_def", keys_to_keep=set())

result = materialize([gleaner_config], instance=context.instance)
if not result.success:
raise Exception(f"Failed to materialize gleaner_config!: {result}")

partition_keys = context.instance.get_dynamic_partitions("sources_partitions_def")
get_dagster_logger().info(f"Partition keys: {partition_keys}")

if partition_keys:
for partition_key in partition_keys:
yield RunRequest(
job_name="harvest_source",
run_key="havest_weekly",
partition_key=partition_key,
tags={"run_type": "harvest_weekly"},
)
else:
RuntimeError("No partition keys found.")
get_dagster_logger().info(f"Found partition keys: {partition_keys}")

if not partition_keys:
raise Exception("No partition keys found after materializing gleaner_config!")

for partition_key in partition_keys:
yield RunRequest(
job_name="harvest_source",
run_key="havest_weekly",
partition_key=partition_key,
tags={"run_type": "harvest_weekly"},
)


# expose all the code needed for our dagster repo
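The reworked schedule clears stale dynamic partitions, re-materializes `gleaner_config` to register fresh ones, then fans out one `RunRequest` per partition and fails loudly when none exist. A condensed sketch of that fanout pattern; the cron string is a placeholder, and the sketch derives a unique `run_key` per partition since Dagster uses `run_key` to deduplicate launched runs, whereas the diff reuses a single literal key:

```python
# Condensed sketch of the schedule-driven fanout over dynamic partitions.
# The cron string and per-partition run_key are assumptions, not the repo's code.
from dagster import RunRequest, ScheduleEvaluationContext, schedule


@schedule(cron_schedule="0 0 * * 0", job_name="harvest_source")
def fanout_schedule(context: ScheduleEvaluationContext):
    # Dynamic partition keys registered earlier by materializing gleaner_config
    partition_keys = context.instance.get_dynamic_partitions("sources_partitions_def")
    if not partition_keys:
        raise Exception("No partition keys found after materializing gleaner_config!")
    for partition_key in partition_keys:
        yield RunRequest(
            job_name="harvest_source",
            partition_key=partition_key,
            run_key=f"harvest_weekly_{partition_key}",  # unique per partition
            tags={"run_type": "harvest_weekly"},
        )
```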
4 changes: 2 additions & 2 deletions userCode/templates/nabuconfig.yaml.j2
```diff
@@ -18,8 +18,8 @@ sparql:
   authenticate: false
   username: ""
   password: ""
-  repository: iow
+  repository: {{ GLEANERIO_DATAGRAPH_ENDPOINT }}
 prefixes:
-  summoned/providera
+  - summoned/providera
   - prov/providera
   - orgs
```
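The hard-coded `iow` repository becomes a Jinja2 expression, so the same template serves whatever repository the deployment points at. A sketch of rendering the template, assuming `jinja2` and an environment variable as the source of the value:

```python
# Sketch: render nabuconfig.yaml.j2 with the repository name injected.
# Sourcing it from an environment variable is an assumption for illustration.
import os

from jinja2 import Environment, FileSystemLoader

env = Environment(loader=FileSystemLoader("userCode/templates"))
template = env.get_template("nabuconfig.yaml.j2")
config_yaml = template.render(
    GLEANERIO_DATAGRAPH_ENDPOINT=os.environ.get("GLEANERIO_DATAGRAPH_ENDPOINT", "iow")
)
print(config_yaml)
```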
