Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch to Server Sent Events based Querying #67

Merged
merged 3 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ TODO
# Deployment Files
.env
docker-compose.override.yml
*.priv.pem
9 changes: 3 additions & 6 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ services:
- "default"

spot:
# NOTE: This will be replaced by https://github.com/samply/spot soon.
image: docker.verbis.dkfz.de/ccp-private/central-spot
image: samply/rustyspot:main
ports:
- 8080:8080
environment:
BEAM_SECRET: "${LOCAL_BEAM_SECRET}"
BEAM_URL: http://beam-proxy:8081
BEAM_PROXY_ID: ${LOCAL_BEAM_ID}
BEAM_BROKER_ID: ${BROKER_HOST}
BEAM_APP_ID: "focus"
BEAM_PROXY_URL: http://beam-proxy:8081
BEAM_APP_ID: "focus.${LOCAL_BEAM_ID}.${BROKER_HOST}"
depends_on:
- "beam-proxy"
profiles:
Expand Down
66 changes: 30 additions & 36 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,36 @@ services:
- /var/run/docker.sock:/var/run/docker.sock:ro

traefik-forward-auth:
image: thomseddon/traefik-forward-auth:2
image: quay.io/oauth2-proxy/oauth2-proxy:latest
environment:
- http_proxy=${http_proxy}
- https_proxy=${https_proxy}
- DEFAULT_PROVIDER=oidc
# TODO: https://login.bbmri-eric.eu/oidc/
- PROVIDERS_OIDC_ISSUER_URL=https://git.verbis.dkfz.de
- PROVIDERS_OIDC_CLIENT_ID=${OAUTH_CLIENT_ID}
- PROVIDERS_OIDC_CLIENT_SECRET=${OAUTH_CLIENT_SECRET}
- SECRET=${AUTHENTICATION_SECRET}
- COOKIE_DOMAIN=${GUI_HOST}
- OAUTH2_PROXY_PROVIDER=oidc
- OAUTH2_PROXY_SKIP_PROVIDER_BUTTON=true
- OAUTH2_PROXY_OIDC_ISSUER_URL=${OAUTH_ISSUER_URL}
- OAUTH2_PROXY_CLIENT_ID=${OAUTH_CLIENT_ID}
- OAUTH2_PROXY_CLIENT_SECRET=${OAUTH_CLIENT_SECRET}
- OAUTH2_PROXY_COOKIE_SECRET=${AUTHENTICATION_SECRET}
- OAUTH2_PROXY_COOKIE_DOMAINS=.${GUI_HOST}
- OAUTH2_PROXY_HTTP_ADDRESS=:4180
- OAUTH2_PROXY_REVERSE_PROXY=true
- OAUTH2_PROXY_WHITELIST_DOMAINS=.${GUI_HOST}
- OAUTH2_PROXY_UPSTREAMS=static://202
- OAUTH2_PROXY_EMAIL_DOMAINS=*
- OAUTH2_PROXY_ALLOWED_GROUPS=${ALLOWED_GROUPS}
# For some reason, login.verbis.dkfz.de does not have a "groups" scope but this comes automatically through a
# scope called microprofile-jwt. Remove the following line once we have a "groups" scope.
- OAUTH2_PROXY_SCOPE=openid profile email
labels:
- "traefik.enable=true"
- "traefik.http.middlewares.traefik-forward-auth.forwardauth.address=http://traefik-forward-auth:4181"
- "traefik.http.middlewares.traefik-forward-auth.forwardauth.address=http://traefik-forward-auth:4180"
- "traefik.http.middlewares.traefik-forward-auth.forwardauth.authResponseHeaders=X-Forwarded-User"
- "traefik.http.services.traefik-forward-auth.loadbalancer.server.port=4181"
- "traefik.http.services.traefik-forward-auth.loadbalancer.server.port=4180"
- "traefik.http.routers.oauth2.rule=Host(`${GUI_HOST}`) && PathPrefix(`/oauth2/`)"
- "traefik.http.routers.oauth2.tls=true"

lens-web-components:
image: lens-web-components
image: samply/lens:main
build: .
labels:
- "traefik.enable=true"
Expand All @@ -46,14 +57,12 @@ services:
- "traefik.http.routers.lens.middlewares=traefik-forward-auth"

spot:
image: docker.verbis.dkfz.de/ccp-private/central-spot
image: samply/rustyspot:main
environment:
BEAM_SECRET: "${LOCAL_BEAM_SECRET}"
BEAM_URL: http://beam-proxy:8081
BEAM_PROXY_ID: ${LOCAL_BEAM_ID}
BEAM_BROKER_ID: ${BROKER_HOST}
# TODO: Implement Switch between spot and focus
BEAM_APP_ID: "spot"
BEAM_PROXY_URL: http://beam-proxy:8081
BEAM_APP_ID: "focus.${LOCAL_BEAM_ID}.${BROKER_HOST}"
CORS_ORIGIN: "https://${GUI_HOST}"
depends_on:
- "beam-proxy"
labels:
Expand All @@ -63,38 +72,23 @@ services:
- "traefik.http.middlewares.corsheaders.headers.accesscontrolalloworiginlist=https://${GUI_HOST}"
- "traefik.http.middlewares.corsheaders.headers.accesscontrolallowcredentials=true"
- "traefik.http.middlewares.corsheaders.headers.accesscontrolmaxage=-1"
- "traefik.http.routers.spot.rule=Host(`backend.${GUI_HOST}`)"
- "traefik.http.routers.spot.rule=Host(`backend.${GUI_HOST}`) && PathPrefix(`/prod`)"
- "traefik.http.middlewares.stripprefix_spot_prod.stripprefix.prefixes=/prod"
- "traefik.http.routers.spot.tls=true"
- "traefik.http.routers.spot.middlewares=corsheaders,traefik-forward-auth"
- "traefik.http.routers.spot.middlewares=corsheaders,traefik-forward-auth,stripprefix_spot_prod"

beam-proxy:
image: docker.verbis.dkfz.de/cache/samply/beam-proxy:develop
environment:
BROKER_URL: https://${BROKER_HOST}
PROXY_ID: ${LOCAL_BEAM_ID}.${BROKER_HOST}
# TODO: Same for focus here
APP_spot_KEY: ${LOCAL_BEAM_SECRET}
APP_focus_KEY: ${LOCAL_BEAM_SECRET}
PRIVKEY_FILE: /run/secrets/proxy.pem
ALL_PROXY: ${http_proxy}
secrets:
- proxy.pem
- root.crt.pem

## Only use this for test purposes
blaze:
image: samply/blaze:develop
ports:
- "8082:8080"
profiles: ["development"]

test-data-loader:
image: samply/test-data-loader
environment:
FHIR_STORE_URL: "http://blaze:8080/fhir"
PATIENT_COUNT: "2000"
command: sh -c "sleep 60 && /app/run.sh"
profiles: ["development"]

secrets:
proxy.pem:
# TODO: Key in BBMRI was directly stored in lens directory!
Expand Down
6 changes: 4 additions & 2 deletions example.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
# Docker Compose will now respect the values set in this file for variable substitution

# Applications DNS Name, decide wether for prod or test
GUI_HOST="data.dktk.dkfz.de|demo.lens.samply.de";
GUI_HOST="data.dktk.dkfz.de|demo.lens.samply.de"

# Beam Configuration
# Read more about samply.beam at https://github.com/samply/beam
BROKER_HOST="broker.ccp-it.dktk.dkfz.de "
BROKER_HOST="broker.ccp-it.dktk.dkfz.de"
LOCAL_BEAM_ID="your-proxy-id"
LOCAL_BEAM_SECRET="insert-a-random-passphrase-here"

Expand All @@ -17,3 +17,5 @@ OAUTH_ISSUER_URL="the-discovery-adress-of-your-oauth-provider"
OAUTH_CLIENT_ID="your-oauth-client-id"
OAUTH_CLIENT_SECRET="your-oauth-client-id"
AUTHENTICATION_SECRET="insert-a-random-passphrase-here"

ALLOWED_GROUPS="SPACE SEPARATED LIST OF GROUPS"
115 changes: 46 additions & 69 deletions packages/lib/src/classes/spot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import { responseStore } from "../stores/response";
import type { ResponseStore } from "../types/backend";

import type { Site, SiteData, Status } from "../types/response";
import type { SiteData, Status } from "../types/response";

type BeamResult = {
body: string;
Expand All @@ -16,18 +16,17 @@ type BeamResult = {
to: string[];
};

/**
* Implements requests to multiple targets through the middleware spot (see: https://github.com/samply/spot).
* The responses are received via Server Sent Events
*/
export class Spot {
private storeCache!: ResponseStore;
private currentTask!: string;

constructor(
private url: URL,
private sites: Array<string>,
) {
responseStore.subscribe(
(store: ResponseStore) => (this.storeCache = store),
);
}
) {}

/**
* sends the query to beam and updates the store with the results
Expand All @@ -36,13 +35,21 @@ export class Spot {
*/
async send(query: string, controller?: AbortController): Promise<void> {
try {
this.currentTask = crypto.randomUUID();
const beamTaskResponse = await fetch(
`${this.url}tasks?sites=${this.sites.toString()}`,
`${this.url}beam?sites=${this.sites.toString()}`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
credentials: import.meta.env.PROD ? "include" : "omit",
body: query,
signal: controller?.signal,
body: JSON.stringify({
id: this.currentTask,
sites: this.sites,
query: query,
}),
signal: controller.signal,
},
);
if (!beamTaskResponse.ok) {
Expand All @@ -52,71 +59,41 @@ export class Spot {
);
throw new Error(`Unable to create new beam task.`);
}
this.currentTask = (await beamTaskResponse.json()).id;

let responseCount: number = 0;

do {
const beamResponses: Response = await fetch(
`${this.url}tasks/${this.currentTask}?wait_count=${responseCount + 1}`,
{
credentials: import.meta.env.PROD ? "include" : "omit",
signal: controller?.signal,
},
);

if (!beamResponses.ok) {
const error: string = await beamResponses.text();
console.debug(
`Received ${beamResponses.status} with message ${error}`,
);
throw new Error(
`Error then retrieving responses from Beam. Abborting requests ...`,
);
}

const beamResponseData: Array<BeamResult> =
await beamResponses.json();
console.info(`Created new Beam Task with id ${this.currentTask}`);

const changes = new Map<string, Site>();
beamResponseData.forEach((response: BeamResult) => {
if (response.task !== this.currentTask) return;
const site: string = response.from.split(".")[1];
const status: Status = response.status;
const body: SiteData =
status === "succeeded"
? JSON.parse(atob(response.body))
: null;

// if the site is already in the store and the status is claimed, don't update the store
if (this.storeCache.get(site)?.status === status) return;
const eventSource = new EventSource(
`${this.url.toString()}beam/${this.currentTask}?wait_count=${this.sites.length}`,
);
eventSource.addEventListener("new_result", (message) => {
const response: BeamResult = JSON.parse(message.data);
if (response.task !== this.currentTask) return;
const site: string = response.from.split(".")[1];
const status: Status = response.status;
const body: SiteData =
status === "succeeded"
? JSON.parse(atob(response.body))
: null;

changes.set(site, { status: status, data: body });
responseStore.update((store: ResponseStore): ResponseStore => {
store.set(site, { status: status, data: body });
return store;
});
if (changes.size > 0) {
responseStore.update(
(store: ResponseStore): ResponseStore => {
changes.forEach((value, key) => {
store.set(key, value);
});
return store;
},
);
}
});

responseCount = beamResponseData.length;
const realResponseCount = beamResponseData.filter(
(response) => response.status !== "claimed",
).length;
// read error events from beam
eventSource.addEventListener("error", (message) => {
console.error(`Beam returned error ${message}`);
eventSource.close();
});

if (
(beamResponses.status !== 200 &&
beamResponses.status !== 206) ||
realResponseCount === this.sites.length
) {
break;
}
} while (true);
// event source in javascript throws an error then the event source is closed by backend
eventSource.onerror = () => {
patrickskowronekdkfz marked this conversation as resolved.
Show resolved Hide resolved
console.info(
`Querying results from sites for task ${this.currentTask} finished.`,
);
eventSource.close();
};
} catch (err) {
if (err instanceof Error && err.name === "AbortError") {
console.log(`Aborting request ${this.currentTask}`);
Expand Down
Loading