Skip to content

Commit

Permalink
Merge pull request #67 from samply/feature/switchToSSE
Browse files Browse the repository at this point in the history
Switch to Server Sent Events based Querying
  • Loading branch information
torbrenner authored Mar 20, 2024
2 parents d3a002d + 5d3cdea commit 3bc4167
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 113 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ TODO
# Deployment Files
.env
docker-compose.override.yml
*.priv.pem
9 changes: 3 additions & 6 deletions docker-compose.dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ services:
- "default"

spot:
# NOTE: This will be replaced by https://github.com/samply/spot soon.
image: docker.verbis.dkfz.de/ccp-private/central-spot
image: samply/rustyspot:main
ports:
- 8080:8080
environment:
BEAM_SECRET: "${LOCAL_BEAM_SECRET}"
BEAM_URL: http://beam-proxy:8081
BEAM_PROXY_ID: ${LOCAL_BEAM_ID}
BEAM_BROKER_ID: ${BROKER_HOST}
BEAM_APP_ID: "focus"
BEAM_PROXY_URL: http://beam-proxy:8081
BEAM_APP_ID: "focus.${LOCAL_BEAM_ID}.${BROKER_HOST}"
depends_on:
- "beam-proxy"
profiles:
Expand Down
66 changes: 30 additions & 36 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,36 @@ services:
- /var/run/docker.sock:/var/run/docker.sock:ro

traefik-forward-auth:
image: thomseddon/traefik-forward-auth:2
image: quay.io/oauth2-proxy/oauth2-proxy:latest
environment:
- http_proxy=${http_proxy}
- https_proxy=${https_proxy}
- DEFAULT_PROVIDER=oidc
# TODO: https://login.bbmri-eric.eu/oidc/
- PROVIDERS_OIDC_ISSUER_URL=https://git.verbis.dkfz.de
- PROVIDERS_OIDC_CLIENT_ID=${OAUTH_CLIENT_ID}
- PROVIDERS_OIDC_CLIENT_SECRET=${OAUTH_CLIENT_SECRET}
- SECRET=${AUTHENTICATION_SECRET}
- COOKIE_DOMAIN=${GUI_HOST}
- OAUTH2_PROXY_PROVIDER=oidc
- OAUTH2_PROXY_SKIP_PROVIDER_BUTTON=true
- OAUTH2_PROXY_OIDC_ISSUER_URL=${OAUTH_ISSUER_URL}
- OAUTH2_PROXY_CLIENT_ID=${OAUTH_CLIENT_ID}
- OAUTH2_PROXY_CLIENT_SECRET=${OAUTH_CLIENT_SECRET}
- OAUTH2_PROXY_COOKIE_SECRET=${AUTHENTICATION_SECRET}
- OAUTH2_PROXY_COOKIE_DOMAINS=.${GUI_HOST}
- OAUTH2_PROXY_HTTP_ADDRESS=:4180
- OAUTH2_PROXY_REVERSE_PROXY=true
- OAUTH2_PROXY_WHITELIST_DOMAINS=.${GUI_HOST}
- OAUTH2_PROXY_UPSTREAMS=static://202
- OAUTH2_PROXY_EMAIL_DOMAINS=*
- OAUTH2_PROXY_ALLOWED_GROUPS=${ALLOWED_GROUPS}
# For some reason, login.verbis.dkfz.de does not have a "groups" scope but this comes automatically through a
# scope called microprofile-jwt. Remove the following line once we have a "groups" scope.
- OAUTH2_PROXY_SCOPE=openid profile email
labels:
- "traefik.enable=true"
- "traefik.http.middlewares.traefik-forward-auth.forwardauth.address=http://traefik-forward-auth:4181"
- "traefik.http.middlewares.traefik-forward-auth.forwardauth.address=http://traefik-forward-auth:4180"
- "traefik.http.middlewares.traefik-forward-auth.forwardauth.authResponseHeaders=X-Forwarded-User"
- "traefik.http.services.traefik-forward-auth.loadbalancer.server.port=4181"
- "traefik.http.services.traefik-forward-auth.loadbalancer.server.port=4180"
- "traefik.http.routers.oauth2.rule=Host(`${GUI_HOST}`) && PathPrefix(`/oauth2/`)"
- "traefik.http.routers.oauth2.tls=true"

lens-web-components:
image: lens-web-components
image: samply/lens:main
build: .
labels:
- "traefik.enable=true"
Expand All @@ -46,14 +57,12 @@ services:
- "traefik.http.routers.lens.middlewares=traefik-forward-auth"

spot:
image: docker.verbis.dkfz.de/ccp-private/central-spot
image: samply/rustyspot:main
environment:
BEAM_SECRET: "${LOCAL_BEAM_SECRET}"
BEAM_URL: http://beam-proxy:8081
BEAM_PROXY_ID: ${LOCAL_BEAM_ID}
BEAM_BROKER_ID: ${BROKER_HOST}
# TODO: Implement Switch between spot and focus
BEAM_APP_ID: "spot"
BEAM_PROXY_URL: http://beam-proxy:8081
BEAM_APP_ID: "focus.${LOCAL_BEAM_ID}.${BROKER_HOST}"
CORS_ORIGIN: "https://${GUI_HOST}"
depends_on:
- "beam-proxy"
labels:
Expand All @@ -63,38 +72,23 @@ services:
- "traefik.http.middlewares.corsheaders.headers.accesscontrolalloworiginlist=https://${GUI_HOST}"
- "traefik.http.middlewares.corsheaders.headers.accesscontrolallowcredentials=true"
- "traefik.http.middlewares.corsheaders.headers.accesscontrolmaxage=-1"
- "traefik.http.routers.spot.rule=Host(`backend.${GUI_HOST}`)"
- "traefik.http.routers.spot.rule=Host(`backend.${GUI_HOST}`) && PathPrefix(`/prod`)"
- "traefik.http.middlewares.stripprefix_spot_prod.stripprefix.prefixes=/prod"
- "traefik.http.routers.spot.tls=true"
- "traefik.http.routers.spot.middlewares=corsheaders,traefik-forward-auth"
- "traefik.http.routers.spot.middlewares=corsheaders,traefik-forward-auth,stripprefix_spot_prod"

beam-proxy:
image: docker.verbis.dkfz.de/cache/samply/beam-proxy:develop
environment:
BROKER_URL: https://${BROKER_HOST}
PROXY_ID: ${LOCAL_BEAM_ID}.${BROKER_HOST}
# TODO: Same for focus here
APP_spot_KEY: ${LOCAL_BEAM_SECRET}
APP_focus_KEY: ${LOCAL_BEAM_SECRET}
PRIVKEY_FILE: /run/secrets/proxy.pem
ALL_PROXY: ${http_proxy}
secrets:
- proxy.pem
- root.crt.pem

## Only use this for test purposes
blaze:
image: samply/blaze:develop
ports:
- "8082:8080"
profiles: ["development"]

test-data-loader:
image: samply/test-data-loader
environment:
FHIR_STORE_URL: "http://blaze:8080/fhir"
PATIENT_COUNT: "2000"
command: sh -c "sleep 60 && /app/run.sh"
profiles: ["development"]

secrets:
proxy.pem:
# TODO: Key in BBMRI was directly stored in lens directory!
Expand Down
6 changes: 4 additions & 2 deletions example.env
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
# Docker Compose will now respect the values set in this file for variable substitution

# Applications DNS Name, decide wether for prod or test
GUI_HOST="data.dktk.dkfz.de|demo.lens.samply.de";
GUI_HOST="data.dktk.dkfz.de|demo.lens.samply.de"

# Beam Configuration
# Read more about samply.beam at https://github.com/samply/beam
BROKER_HOST="broker.ccp-it.dktk.dkfz.de "
BROKER_HOST="broker.ccp-it.dktk.dkfz.de"
LOCAL_BEAM_ID="your-proxy-id"
LOCAL_BEAM_SECRET="insert-a-random-passphrase-here"

Expand All @@ -17,3 +17,5 @@ OAUTH_ISSUER_URL="the-discovery-adress-of-your-oauth-provider"
OAUTH_CLIENT_ID="your-oauth-client-id"
OAUTH_CLIENT_SECRET="your-oauth-client-id"
AUTHENTICATION_SECRET="insert-a-random-passphrase-here"

ALLOWED_GROUPS="SPACE SEPARATED LIST OF GROUPS"
115 changes: 46 additions & 69 deletions packages/lib/src/classes/spot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import { responseStore } from "../stores/response";
import type { ResponseStore } from "../types/backend";

import type { Site, SiteData, Status } from "../types/response";
import type { SiteData, Status } from "../types/response";

type BeamResult = {
body: string;
Expand All @@ -16,18 +16,17 @@ type BeamResult = {
to: string[];
};

/**
* Implements requests to multiple targets through the middleware spot (see: https://github.com/samply/spot).
* The responses are received via Server Sent Events
*/
export class Spot {
private storeCache!: ResponseStore;
private currentTask!: string;

constructor(
private url: URL,
private sites: Array<string>,
) {
responseStore.subscribe(
(store: ResponseStore) => (this.storeCache = store),
);
}
) {}

/**
* sends the query to beam and updates the store with the results
Expand All @@ -36,13 +35,21 @@ export class Spot {
*/
async send(query: string, controller?: AbortController): Promise<void> {
try {
this.currentTask = crypto.randomUUID();
const beamTaskResponse = await fetch(
`${this.url}tasks?sites=${this.sites.toString()}`,
`${this.url}beam?sites=${this.sites.toString()}`,
{
method: "POST",
headers: {
"Content-Type": "application/json",
},
credentials: import.meta.env.PROD ? "include" : "omit",
body: query,
signal: controller?.signal,
body: JSON.stringify({
id: this.currentTask,
sites: this.sites,
query: query,
}),
signal: controller.signal,
},
);
if (!beamTaskResponse.ok) {
Expand All @@ -52,71 +59,41 @@ export class Spot {
);
throw new Error(`Unable to create new beam task.`);
}
this.currentTask = (await beamTaskResponse.json()).id;

let responseCount: number = 0;

do {
const beamResponses: Response = await fetch(
`${this.url}tasks/${this.currentTask}?wait_count=${responseCount + 1}`,
{
credentials: import.meta.env.PROD ? "include" : "omit",
signal: controller?.signal,
},
);

if (!beamResponses.ok) {
const error: string = await beamResponses.text();
console.debug(
`Received ${beamResponses.status} with message ${error}`,
);
throw new Error(
`Error then retrieving responses from Beam. Abborting requests ...`,
);
}

const beamResponseData: Array<BeamResult> =
await beamResponses.json();
console.info(`Created new Beam Task with id ${this.currentTask}`);

const changes = new Map<string, Site>();
beamResponseData.forEach((response: BeamResult) => {
if (response.task !== this.currentTask) return;
const site: string = response.from.split(".")[1];
const status: Status = response.status;
const body: SiteData =
status === "succeeded"
? JSON.parse(atob(response.body))
: null;

// if the site is already in the store and the status is claimed, don't update the store
if (this.storeCache.get(site)?.status === status) return;
const eventSource = new EventSource(
`${this.url.toString()}beam/${this.currentTask}?wait_count=${this.sites.length}`,
);
eventSource.addEventListener("new_result", (message) => {
const response: BeamResult = JSON.parse(message.data);
if (response.task !== this.currentTask) return;
const site: string = response.from.split(".")[1];
const status: Status = response.status;
const body: SiteData =
status === "succeeded"
? JSON.parse(atob(response.body))
: null;

changes.set(site, { status: status, data: body });
responseStore.update((store: ResponseStore): ResponseStore => {
store.set(site, { status: status, data: body });
return store;
});
if (changes.size > 0) {
responseStore.update(
(store: ResponseStore): ResponseStore => {
changes.forEach((value, key) => {
store.set(key, value);
});
return store;
},
);
}
});

responseCount = beamResponseData.length;
const realResponseCount = beamResponseData.filter(
(response) => response.status !== "claimed",
).length;
// read error events from beam
eventSource.addEventListener("error", (message) => {
console.error(`Beam returned error ${message}`);
eventSource.close();
});

if (
(beamResponses.status !== 200 &&
beamResponses.status !== 206) ||
realResponseCount === this.sites.length
) {
break;
}
} while (true);
// event source in javascript throws an error then the event source is closed by backend
eventSource.onerror = () => {
console.info(
`Querying results from sites for task ${this.currentTask} finished.`,
);
eventSource.close();
};
} catch (err) {
if (err instanceof Error && err.name === "AbortError") {
console.log(`Aborting request ${this.currentTask}`);
Expand Down

0 comments on commit 3bc4167

Please sign in to comment.