diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 00000000..13d18df7 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,30 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Attach to Process", + "type": "node", + "request": "attach", + "port": 9229, + "restart": true, + "timeout": 10000, + "skipFiles": ["/**"] + }, + { + "name": "Run crawler dev", + "type": "node", + "request": "launch", + "runtimeExecutable": "pnpm", + "runtimeArgs": ["dev"], + "cwd": "${workspaceFolder}/services/cron-jobs/crawler", + "console": "integratedTerminal", + "internalConsoleOptions": "neverOpen" + } + ], + "compounds": [ + { + "name": "Debug crawler dev", + "configurations": ["Run crawler dev", "Attach to Process"] + } + ] +} diff --git a/services/cron-jobs/crawler/package.json b/services/cron-jobs/crawler/package.json index 968983ef..abd81947 100644 --- a/services/cron-jobs/crawler/package.json +++ b/services/cron-jobs/crawler/package.json @@ -2,16 +2,16 @@ "name": "crawler", "version": "1.1.4", "description": "Kubernetes cron-job to collect data from various sources for bundestag.io", - "main": "index.ts", + "main": "build/main.js", "repository": "https://github.com/demokratie-live/democracy-development/", "author": "Manuel Ruck, Ulf Gebhardt, Robert Schäfer", "license": "MIT", "scripts": { - "dev": "tsx --env-file .env --env-file .env.local --watch src/import-procedures/index.ts", - "garden:dev": "tsx --watch src/import-procedures/index.ts", + "dev": "tsx --env-file .env --env-file .env.local --watch src/main.ts", + "garden:dev": "tsx --watch src/main.ts", "build": "tsup-node", "lint": "eslint .", - "start": "node ./build/index.js" + "start": "node ./build/main.js" }, "dependencies": { "@democracy-deutschland/bt-dip-sdk": "1.3.0-alpha.0", diff --git a/services/cron-jobs/crawler/src/axios.ts b/services/cron-jobs/crawler/src/axios.ts index a9c11fda..402beb56 100644 --- a/services/cron-jobs/crawler/src/axios.ts +++ b/services/cron-jobs/crawler/src/axios.ts @@ -1,37 +1,43 @@ -import axios from 'axios'; +import axios, { AxiosInstance, AxiosResponse, InternalAxiosRequestConfig } from 'axios'; const MAX_REQUESTS_COUNT = 1; const INTERVAL_MS = 1000; let PENDING_REQUESTS = 0; -// create new axios instance -const api = axios.create({}); -/** - * Axios Request Interceptor - */ -api.interceptors.request.use(function (config) { - return new Promise((resolve) => { - const interval = setInterval(() => { - if (PENDING_REQUESTS < MAX_REQUESTS_COUNT) { - PENDING_REQUESTS++; - clearInterval(interval); - resolve(config); - } - }, INTERVAL_MS); +function setupRequestInterceptor(apiInstance: AxiosInstance): void { + apiInstance.interceptors.request.use(function (config: InternalAxiosRequestConfig) { + return new Promise((resolve) => { + const interval = setInterval(() => { + if (PENDING_REQUESTS < MAX_REQUESTS_COUNT) { + PENDING_REQUESTS++; + clearInterval(interval); + resolve(config); + } + }, INTERVAL_MS); + }); }); -}); -/** - * Axios Response Interceptor - */ -api.interceptors.response.use( - function (response) { - PENDING_REQUESTS = Math.max(0, PENDING_REQUESTS - 1); - return Promise.resolve(response); - }, - function (error) { - PENDING_REQUESTS = Math.max(0, PENDING_REQUESTS - 1); - return Promise.reject(error); - }, -); +} + +function setupResponseInterceptor(apiInstance: AxiosInstance): void { + apiInstance.interceptors.response.use( + function (response: AxiosResponse) { + PENDING_REQUESTS = Math.max(0, PENDING_REQUESTS - 1); + return Promise.resolve(response); + }, + function (error: unknown) { + PENDING_REQUESTS = Math.max(0, PENDING_REQUESTS - 1); + return Promise.reject(error); + }, + ); +} + +const createAxiosInstance = (): AxiosInstance => { + const api = axios.create({}); + setupRequestInterceptor(api); + setupResponseInterceptor(api); + return api; +}; + +const api = createAxiosInstance(); export default api; diff --git a/services/cron-jobs/crawler/src/config.ts b/services/cron-jobs/crawler/src/config.ts index 520e017b..aebbb50c 100644 --- a/services/cron-jobs/crawler/src/config.ts +++ b/services/cron-jobs/crawler/src/config.ts @@ -1,27 +1,39 @@ -const { - DB_URL = 'mongodb://localhost:27017/bundestagio', - IMPORT_PROCEDURES_START_CURSOR = '*', - IMPORT_PROCEDURES_FILTER_BEFORE = new Date().toISOString().slice(0, 10), - IMPORT_PROCEDURES_FILTER_AFTER = new Date(Number(new Date()) - 1000 * 60 * 60 * 24 * 7 * 4) - .toISOString() - .slice(0, 10), -} = process.env; +const parseEnvVariables = () => { + const { + DB_URL = 'mongodb://localhost:27017/bundestagio', + IMPORT_PROCEDURES_START_CURSOR = '*', + IMPORT_PROCEDURES_FILTER_BEFORE = new Date().toISOString().slice(0, 10), + IMPORT_PROCEDURES_FILTER_AFTER = new Date(Number(new Date()) - 1000 * 60 * 60 * 24 * 7 * 4) + .toISOString() + .slice(0, 10), + } = process.env; -let { IMPORT_PROCEDURES_CHUNK_SIZE = 100, IMPORT_PROCEDURES_CHUNK_ROUNDS = 5 } = process.env; + let { IMPORT_PROCEDURES_CHUNK_SIZE = 100, IMPORT_PROCEDURES_CHUNK_ROUNDS = 5 } = process.env; -IMPORT_PROCEDURES_CHUNK_SIZE = Number(IMPORT_PROCEDURES_CHUNK_SIZE); -IMPORT_PROCEDURES_CHUNK_ROUNDS = Number(IMPORT_PROCEDURES_CHUNK_ROUNDS); -const IMPORT_PROCEDURES_FILTER_TYPES = process.env.IMPORT_PROCEDURES_FILTER_TYPES - ? process.env.IMPORT_PROCEDURES_FILTER_TYPES.split(',') - : undefined; + IMPORT_PROCEDURES_CHUNK_SIZE = Number(IMPORT_PROCEDURES_CHUNK_SIZE); + IMPORT_PROCEDURES_CHUNK_ROUNDS = Number(IMPORT_PROCEDURES_CHUNK_ROUNDS); + const IMPORT_PROCEDURES_FILTER_TYPES = process.env.IMPORT_PROCEDURES_FILTER_TYPES + ? process.env.IMPORT_PROCEDURES_FILTER_TYPES.split(',') + : undefined; -export const CONFIG = { - DIP_API_KEY: process.env.DIP_API_KEY || '', - DB_URL, - IMPORT_PROCEDURES_CHUNK_SIZE, - IMPORT_PROCEDURES_CHUNK_ROUNDS, - IMPORT_PROCEDURES_FILTER_BEFORE, - IMPORT_PROCEDURES_FILTER_AFTER, - IMPORT_PROCEDURES_FILTER_TYPES, - IMPORT_PROCEDURES_START_CURSOR, -} as const; + return { + DB_URL, + IMPORT_PROCEDURES_START_CURSOR, + IMPORT_PROCEDURES_FILTER_BEFORE, + IMPORT_PROCEDURES_FILTER_AFTER, + IMPORT_PROCEDURES_CHUNK_SIZE, + IMPORT_PROCEDURES_CHUNK_ROUNDS, + IMPORT_PROCEDURES_FILTER_TYPES, + }; +}; + +function getConfig() { + const envVariables = parseEnvVariables(); + + return { + DIP_API_KEY: process.env.DIP_API_KEY || '', + ...envVariables, + } as const; +} + +export const CONFIG = getConfig(); diff --git a/services/cron-jobs/crawler/src/cronJob.ts b/services/cron-jobs/crawler/src/cronJob.ts new file mode 100644 index 00000000..fd5551ef --- /dev/null +++ b/services/cron-jobs/crawler/src/cronJob.ts @@ -0,0 +1,25 @@ +import { getCron, setCronStart, setCronSuccess, ICronJob } from '@democracy-deutschland/bundestagio-common'; +import { Logger } from './logger'; +import { executeImportProcedures } from './importProceduresExecutor'; +import { CONFIG } from './config'; + +const CRON_JOB_NAME = 'import-procedures'; + +/** + * Handles the cron job execution. + * @param config - The configuration object. + * @param logger - The logger instance. + */ +export const handleCronJob = async (config: typeof CONFIG, logger: Logger): Promise => { + try { + logger.log('Handling cron job...'); + const cronjob: ICronJob = await getCron({ name: CRON_JOB_NAME }); + await setCronStart({ name: CRON_JOB_NAME }); + await executeImportProcedures(cronjob, config, logger); + await setCronSuccess({ name: CRON_JOB_NAME, successStartDate: cronjob.lastStartDate || new Date() }); + logger.log('Cron job handled successfully.'); + } catch (error) { + logger.error('Failed to handle cron job: ' + (error instanceof Error ? error.message : error)); + process.exit(1); + } +}; diff --git a/services/cron-jobs/crawler/src/database.ts b/services/cron-jobs/crawler/src/database.ts new file mode 100644 index 00000000..30fac2e7 --- /dev/null +++ b/services/cron-jobs/crawler/src/database.ts @@ -0,0 +1,13 @@ +import { mongoConnect } from '@democracy-deutschland/bundestagio-common'; +import { Logger } from './logger'; + +export const connectToDatabase = async (dbUrl: string, logger: Logger): Promise => { + try { + logger.log('Connecting to the database...'); + await mongoConnect(dbUrl); + logger.log('Database connection successful.'); + } catch (error) { + logger.error('Failed to connect to the database: ' + (error instanceof Error ? error.message : error)); + process.exit(1); + } +}; diff --git a/services/cron-jobs/crawler/src/import-procedures/import-procedures.ts b/services/cron-jobs/crawler/src/import-procedures/import-procedures.ts index 130f9693..79d82a5d 100644 --- a/services/cron-jobs/crawler/src/import-procedures/import-procedures.ts +++ b/services/cron-jobs/crawler/src/import-procedures/import-procedures.ts @@ -14,52 +14,167 @@ import axios from '../axios'; import { IProcedure } from '@democracy-deutschland/bundestagio-common/dist/models/Procedure/schema'; import { IDocument } from '@democracy-deutschland/bundestagio-common/dist/models/Procedure/Procedure/Document'; import { IProcessFlow } from '@democracy-deutschland/bundestagio-common/dist/models/Procedure/Procedure/ProcessFlow'; -import { germanDateFormat } from './utils'; +import { formatGermanDate } from './utils'; -const config = new Configuration({ - apiKey: `ApiKey ${CONFIG.DIP_API_KEY}`, // Replace #YOUR_API_KEY# with your api key -}); -const vorgangApi = new VorgngeApi(config, undefined, axios); -const vorgangspositionenApi = new VorgangspositionenApi(config, undefined, axios); +const createConfiguration = (apiKey: string) => { + return new Configuration({ + apiKey: `ApiKey ${apiKey}`, + }); +}; -export default async function importProcedures(config: typeof CONFIG): Promise { - const { - IMPORT_PROCEDURES_START_CURSOR, - IMPORT_PROCEDURES_CHUNK_SIZE, - IMPORT_PROCEDURES_CHUNK_ROUNDS, - IMPORT_PROCEDURES_FILTER_BEFORE, - IMPORT_PROCEDURES_FILTER_AFTER, - IMPORT_PROCEDURES_FILTER_TYPES, - } = config; +const config = createConfiguration(CONFIG.DIP_API_KEY); - const variables = { - cursor: IMPORT_PROCEDURES_START_CURSOR, - filter: { - after: IMPORT_PROCEDURES_FILTER_AFTER, - before: IMPORT_PROCEDURES_FILTER_BEFORE, - types: IMPORT_PROCEDURES_FILTER_TYPES, - }, - limit: IMPORT_PROCEDURES_CHUNK_SIZE, - }; +const createApiInstances = (config: Configuration) => { + const vorgangApi = new VorgngeApi(config, undefined, axios); + const vorgangspositionenApi = new VorgangspositionenApi(config, undefined, axios); + return { vorgangApi, vorgangspositionenApi }; +}; + +const { vorgangApi, vorgangspositionenApi } = createApiInstances(config); + +const logImportDetails = ( + rounds: number, + size: number, + filter: { after: string; before: string; types?: string[] }, +) => { log(` -------------------------------------- - Importing ${IMPORT_PROCEDURES_CHUNK_ROUNDS}*${IMPORT_PROCEDURES_CHUNK_SIZE} procedures. - Between ${variables.filter.after} and ${variables.filter.before}. - Filter: ${ - variables.filter.types && variables.filter.types.length > 0 - ? `[types: ${variables.filter.types.join(', ')}]` - : 'none' - } + Importing ${rounds}*${size} procedures. + Between ${filter.after} and ${filter.before}. + Filter: ${filter.types && filter.types.length > 0 ? `[types: ${filter.types.join(', ')}]` : 'none'} -------------------------------------- `); +}; + +const createHistory = (vorgangspositionen: Vorgangsposition[]): IProcessFlow[] => { + return vorgangspositionen.map((d) => { + const getFindSpot = (d: Vorgangsposition) => { + const { fundstelle } = d; + if (!fundstelle) return; + const { herausgeber, dokumentart, dokumentnummer } = fundstelle; + const datum = formatGermanDate(new Date(fundstelle.datum)); + const result = `${datum} - ${herausgeber}-${dokumentart} ${dokumentnummer}`; + const { anfangsseite, endseite, anfangsquadrant, endquadrant } = fundstelle; + if (![anfangsseite, endseite, anfangsquadrant, endquadrant].every(Boolean)) return result; + return `${result}, S. ${anfangsseite}${anfangsquadrant} - ${endseite}${endquadrant}`; + }; + + const getDecision = (d: Vorgangsposition) => { + const { beschlussfassung } = d; + if (!beschlussfassung) return []; + return beschlussfassung.map((b) => ({ + page: b.seite, + tenor: b.beschlusstenor, + document: b.dokumentnummer, + type: b.abstimmungsart, + comment: b.abstimm_ergebnis_bemerkung, + foundation: b.grundlage, + majority: b.mehrheit, + })); + }; + + return { + assignment: d.fundstelle.herausgeber, + initiator: + d.fundstelle.urheber.length > 0 + ? `${d.vorgangsposition}, Urheber : ${d.fundstelle.urheber.join(', ')}` + : d.vorgangsposition, + findSpot: getFindSpot(d), + findSpotUrl: d.fundstelle.pdf_url, + decision: getDecision(d), + abstract: d.abstract, + date: new Date(d.fundstelle.datum), + }; + }); +}; + +const createImportantDocuments = (vorgangspositionen: Vorgangsposition[]): IDocument[] => { + return vorgangspositionen + .filter((v) => v.fundstelle?.dokumentart === 'Drucksache') + .map((d) => ({ + editor: d.fundstelle.herausgeber, + number: d.fundstelle.dokumentnummer, + type: d.fundstelle.drucksachetyp!, + url: d.fundstelle.pdf_url!, + })); +}; + +const createLegalValidity = (inkrafttreten: { datum: string; erlaeuterung?: string }[]): string[] => { + return inkrafttreten.map((i) => `${i.datum}${i.erlaeuterung ? ` (${i.erlaeuterung})` : ''}`); +}; + +const createProcedure = (node: Vorgang, vorgangspositionen: Vorgangsposition[]): Partial => { + const importantDocuments = createImportantDocuments(vorgangspositionen); + + const legalValidity = createLegalValidity(node.inkrafttreten || []); + + const history = createHistory(vorgangspositionen); + + return { + ...node, + procedureId: node.id, + type: node.vorgangstyp, + tags: node.deskriptor?.map((d) => d.name) || [], + title: node.titel, + currentStatus: node.beratungsstand, + period: node.wahlperiode, + subjectGroups: node.sachgebiet, + importantDocuments, + gestOrderNumber: node.gesta, + legalValidity, + history, + }; +}; + +const fetchVorgangspositionen = async (vorgangspositionenApi: VorgangspositionenApi, nodeId: number) => { + return vorgangspositionenApi + .getVorgangspositionList({ + fVorgang: nodeId, + }) + .then((res) => res.data.documents); +}; + +const processProcedures = async (edges: { node: Vorgang }[], vorgangspositionenApi: VorgangspositionenApi) => { + let procedures: Partial[] = []; - if (variables.limit % 50 !== 0) + for (const edge of edges) { + log(`${edge.node.aktualisiert} ${edge.node.id} - ${edge.node.titel}`); + const { node } = edge; + const vorgangspositionen = await fetchVorgangspositionen(vorgangspositionenApi, Number(node.id)); + const procedure = createProcedure(node, vorgangspositionen); + procedures = [...procedures, procedure]; + } + + await ProcedureModel.bulkWrite( + procedures.map((node) => ({ + updateOne: { + filter: { procedureId: node.procedureId }, + update: { $set: node }, + upsert: true, + }, + })), + ); +}; + +const validateVariables = (variables: { limit: number }) => { + if (variables.limit % 50 !== 0) { throw new Error( - 'DIP has a fixed page size of 50. Make sure your limt is a multiple of 50 to avoid inconsistencies with cursor based pagination.', + 'DIP has a fixed page size of 50. Make sure your limit is a multiple of 50 to avoid inconsistencies with cursor based pagination.', ); + } +}; + +const logRoundDetails = (round: number, cursor: string) => { + log(`Round ${round} - Cursor ${cursor}`); +}; - for (const round of Array.from(Array(IMPORT_PROCEDURES_CHUNK_ROUNDS).keys())) { - log(`Round ${round} - Cursor ${variables.cursor}`); +const handleImportRounds = async ( + rounds: number, + variables: { cursor: string; limit: number; filter: ReturnType }, + vorgangspositionenApi: VorgangspositionenApi, +) => { + for (const round of Array.from(Array(rounds).keys())) { + logRoundDetails(round, variables.cursor); const { edges, @@ -74,101 +189,56 @@ export default async function importProcedures(config: typeof CONFIG): Promise[] = []; - - for (const edge of edges) { - log(`${edge.node.aktualisiert} ${edge.node.id} - ${edge.node.titel}`); - const { node } = edge; - const vorgangspositionen = await vorgangspositionenApi - .getVorgangspositionList({ - fVorgang: Number(node.id), - }) - .then((res) => res.data.documents); - - const importantDocuments = vorgangspositionen - .filter((v) => v.fundstelle?.dokumentart === 'Drucksache') - .map((d) => ({ - editor: d.fundstelle.herausgeber, - number: d.fundstelle.dokumentnummer, - type: d.fundstelle.drucksachetyp!, - url: d.fundstelle.pdf_url!, - })); - - const legalValidity = - node.inkrafttreten?.map((i) => `${i.datum}${i.erlaeuterung ? ` (${i.erlaeuterung})` : ''}`) || []; - - const history = vorgangspositionen.map((d) => { - const getFindSpot = (d: Vorgangsposition) => { - const { fundstelle } = d; - if (!fundstelle) return; - const { herausgeber, dokumentart, dokumentnummer } = fundstelle; - const datum = germanDateFormat.format(new Date(fundstelle.datum)); - const result = `${datum} - ${herausgeber}-${dokumentart} ${dokumentnummer}`; - const { anfangsseite, endseite, anfangsquadrant, endquadrant } = fundstelle; - if (![anfangsseite, endseite, anfangsquadrant, endquadrant].every(Boolean)) return result; - return `${result}, S. ${anfangsseite}${anfangsquadrant} - ${endseite}${endquadrant}`; - }; - - const getDecision = (d: Vorgangsposition) => { - const { beschlussfassung } = d; - if (!beschlussfassung) return []; - return beschlussfassung.map((b) => ({ - page: b.seite, - tenor: b.beschlusstenor, - document: b.dokumentnummer, - type: b.abstimmungsart, - comment: b.abstimm_ergebnis_bemerkung, - foundation: b.grundlage, - majority: b.mehrheit, - })); - }; - - return { - assignment: d.fundstelle.herausgeber, - initiator: - d.fundstelle.urheber.length > 0 - ? `${d.vorgangsposition}, Urheber : ${d.fundstelle.urheber.join(', ')}` - : d.vorgangsposition, - findSpot: getFindSpot(d), - findSpotUrl: d.fundstelle.pdf_url, - decision: getDecision(d), - abstract: d.abstract, - date: new Date(d.fundstelle.datum), - }; - }); - - const procedure: Partial = { - ...node, - procedureId: node.id, - type: node.vorgangstyp, - tags: node.deskriptor?.map((d) => d.name) || [], - title: node.titel, - currentStatus: node.beratungsstand, - period: node.wahlperiode, - subjectGroups: node.sachgebiet, - importantDocuments, - gestOrderNumber: node.gesta, - legalValidity, - history, - }; - - procedures = [...procedures, procedure]; - } + await processProcedures(edges, vorgangspositionenApi); - await ProcedureModel.bulkWrite( - procedures.map((node) => ({ - updateOne: { - filter: { procedureId: node.procedureId }, - update: { $set: node }, - upsert: true, - }, - })), - ); if (!hasNextPage) { break; } variables.cursor = endCursor; } +}; + +const createProcedureFilter = (after: string, before: string, types?: string[]) => ({ + after, + before, + types, +}); + +const createVariables = ( + startCursor: string, + chunkSize: number, + filterAfter: string, + filterBefore: string, + filterTypes?: string[], +) => ({ + cursor: startCursor, + filter: createProcedureFilter(filterAfter, filterBefore, filterTypes), + limit: chunkSize, +}); + +export default async function importProcedures(config: typeof CONFIG): Promise { + const { + IMPORT_PROCEDURES_START_CURSOR, + IMPORT_PROCEDURES_CHUNK_SIZE, + IMPORT_PROCEDURES_CHUNK_ROUNDS, + IMPORT_PROCEDURES_FILTER_BEFORE, + IMPORT_PROCEDURES_FILTER_AFTER, + IMPORT_PROCEDURES_FILTER_TYPES, + } = config; + + const variables = createVariables( + IMPORT_PROCEDURES_START_CURSOR, + IMPORT_PROCEDURES_CHUNK_SIZE, + IMPORT_PROCEDURES_FILTER_AFTER, + IMPORT_PROCEDURES_FILTER_BEFORE, + IMPORT_PROCEDURES_FILTER_TYPES, + ); + + logImportDetails(IMPORT_PROCEDURES_CHUNK_ROUNDS, IMPORT_PROCEDURES_CHUNK_SIZE, variables.filter); + + validateVariables(variables); + + await handleImportRounds(IMPORT_PROCEDURES_CHUNK_ROUNDS, variables, vorgangspositionenApi); } export type ProcedureFilter = { @@ -205,7 +275,6 @@ const getVorgaenge = async (args: ProceduresArgs) => { fAktualisiertEnd: args.filter?.end?.toISOString(), fVorgangstyp: args.filter?.types, }); - // const res = await this.get(`/api/v1/vorgang`, { ...filter, cursor }); totalCount = Number(data.numFound); documents = documents.concat(data.documents!); hasNextPage = cursor !== data.cursor; diff --git a/services/cron-jobs/crawler/src/import-procedures/index.ts b/services/cron-jobs/crawler/src/import-procedures/index.ts index 93c04c11..7cc87af9 100644 --- a/services/cron-jobs/crawler/src/import-procedures/index.ts +++ b/services/cron-jobs/crawler/src/import-procedures/index.ts @@ -1,17 +1,32 @@ -import { getCron, mongoConnect, setCronStart, setCronSuccess } from '@democracy-deutschland/bundestagio-common'; import { CONFIG } from '../config'; -import importProcedures from './import-procedures'; +import { Logger } from '../logger'; +import { connectToDatabase } from '../database'; +import { handleCronJob } from '../cronJob'; -(async () => { - await mongoConnect(CONFIG.DB_URL); - const cronjob = await getCron({ name: 'import-procedures' }); - console.log('cronjob', cronjob.lastSuccessStartDate); - await setCronStart({ name: 'import-procedures' }); - await importProcedures({ - ...CONFIG, - IMPORT_PROCEDURES_FILTER_AFTER: - cronjob?.lastSuccessStartDate?.toISOString() || CONFIG.IMPORT_PROCEDURES_FILTER_AFTER, - }); - await setCronSuccess({ name: 'import-procedures', successStartDate: cronjob.lastStartDate || new Date() }); - process.exit(0); -})(); +/** + * Runs the import procedures by connecting to the database, handling the cron job, and exiting the process. + * @param config - The configuration object. + * @param logger - The logger instance. + */ +const logImportStart = (logger: Logger) => { + logger.log('Starting import procedures...'); +}; + +const logImportEnd = (logger: Logger) => { + logger.log('Import procedures completed.'); +}; + +const connectAndRunImport = async (config: typeof CONFIG, logger: Logger) => { + await connectToDatabase(config.DB_URL, logger); + await handleCronJob(config, logger); +}; + +const runImport = async (config: typeof CONFIG, logger: Logger) => { + logImportStart(logger); + await connectAndRunImport(config, logger); + logImportEnd(logger); +}; + +export const runImportProcedures = async (config: typeof CONFIG, logger: Logger): Promise => { + await runImport(config, logger); +}; diff --git a/services/cron-jobs/crawler/src/import-procedures/utils.ts b/services/cron-jobs/crawler/src/import-procedures/utils.ts index fa94f65e..9013f01b 100644 --- a/services/cron-jobs/crawler/src/import-procedures/utils.ts +++ b/services/cron-jobs/crawler/src/import-procedures/utils.ts @@ -1,5 +1,13 @@ -export const germanDateFormat = new Intl.DateTimeFormat('de-DE', { - year: 'numeric', - month: '2-digit', - day: '2-digit', -}); +const createDateFormatter = (locale: string) => { + const formatter = new Intl.DateTimeFormat(locale, { + year: 'numeric', + month: '2-digit', + day: '2-digit', + }); + + return (date: Date) => formatter.format(date); +}; + +const formatGermanDate = createDateFormatter('de-DE'); + +export { createDateFormatter, formatGermanDate }; diff --git a/services/cron-jobs/crawler/src/importProceduresExecutor.ts b/services/cron-jobs/crawler/src/importProceduresExecutor.ts new file mode 100644 index 00000000..0c63083b --- /dev/null +++ b/services/cron-jobs/crawler/src/importProceduresExecutor.ts @@ -0,0 +1,46 @@ +import { Logger } from './logger'; +import { CONFIG } from './config'; +import { ICronJob } from '@democracy-deutschland/bundestagio-common'; +import importProcedures from './import-procedures/import-procedures'; +import axios from 'axios'; + +const handleImportError = (error: unknown, logger: Logger) => { + if (axios.isAxiosError(error)) { + const statusCode = error.response?.status; + let errorMessage = `Failed to execute import procedures: ${error.message}`; + if (statusCode) { + errorMessage += ` (HTTP status: ${statusCode})`; + if (statusCode === 401) { + errorMessage += ' - Unauthorized. Please check your API key.'; + } + } + logger.error(errorMessage); + } else { + logger.error('Failed to execute import procedures: ' + (error instanceof Error ? error.message : error)); + } + process.exit(1); +}; + +/** + * Executes the import procedures. + * @param cronjob - The cron job details. + * @param config - The configuration object. + * @param logger - The logger instance. + */ +export const executeImportProcedures = async ( + cronjob: ICronJob, + config: typeof CONFIG, + logger: Logger, +): Promise => { + try { + logger.log('Executing import procedures...'); + await importProcedures({ + ...config, + IMPORT_PROCEDURES_FILTER_AFTER: + cronjob?.lastSuccessStartDate?.toISOString() || config.IMPORT_PROCEDURES_FILTER_AFTER, + }); + logger.log('Import procedures executed successfully.'); + } catch (error) { + handleImportError(error, logger); + } +}; diff --git a/services/cron-jobs/crawler/src/logger.ts b/services/cron-jobs/crawler/src/logger.ts new file mode 100644 index 00000000..1e8fa188 --- /dev/null +++ b/services/cron-jobs/crawler/src/logger.ts @@ -0,0 +1,13 @@ +export interface Logger { + log: (message: string) => void; + error: (message: string) => void; +} + +export const logger: Logger = { + log: (message: string): void => { + console.log(message); + }, + error: (message: string): void => { + console.error(message); + }, +}; diff --git a/services/cron-jobs/crawler/src/main.ts b/services/cron-jobs/crawler/src/main.ts new file mode 100644 index 00000000..421938f2 --- /dev/null +++ b/services/cron-jobs/crawler/src/main.ts @@ -0,0 +1,20 @@ +import { CONFIG } from './config'; +import { logger } from './logger'; +import { runImportProcedures } from './import-procedures'; + +/** + * Main function to run the import procedures. + */ +const main = async (): Promise => { + try { + await runImportProcedures(CONFIG, logger); + } catch (error) { + logger.error('An unexpected error occurred: ' + (error instanceof Error ? error.message : error)); + process.exit(1); + } +}; + +(async () => { + await main(); + process.exit(0); +})(); diff --git a/services/cron-jobs/crawler/tsup.config.ts b/services/cron-jobs/crawler/tsup.config.ts index a0dc3ea8..46c069d5 100644 --- a/services/cron-jobs/crawler/tsup.config.ts +++ b/services/cron-jobs/crawler/tsup.config.ts @@ -3,5 +3,5 @@ import { tsupConfig } from 'tsup-config'; export default defineConfig({ ...tsupConfig, - entry: ['./src/import-procedures'], + entry: ['./src/main.ts'], });