Skip to content

Commit

Permalink
implement script to update Rome and appellations from API ROME 4
Browse files Browse the repository at this point in the history
implement script to update all rome and appellations from API ROME 4
  • Loading branch information
JeromeBu committed Oct 7, 2024
1 parent 07499a1 commit 462f33a
Show file tree
Hide file tree
Showing 7 changed files with 325 additions and 3 deletions.
1 change: 1 addition & 0 deletions back/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"trigger-suggest-edit-form-establishment-every-6-months": "ts-node src/scripts/triggerSuggestEditFormEstablishmentEvery6Months.ts",
"trigger-update-establishments-from-sirene": "ts-node src/scripts/triggerUpdateEstablishmentsFromSireneApiScript.ts",
"trigger-update-all-establishments-scores": "ts-node src/scripts/triggerUpdateAllEstablishmentsScores.ts",
"trigger-update-rome-data": "ts-node src/scripts/triggerUpdateRomeData.ts",
"typecheck": "tsc --noEmit",
"kysely-codegen": "kysely-codegen --dialect postgres",
"update-agencies-from-PE-referential": "ts-node src/scripts/updateAllPEAgenciesFromPeAgencyReferential.ts",
Expand Down
4 changes: 4 additions & 0 deletions back/scalingo/cron.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@
"command": "01 23 * * * pnpm back run trigger-update-all-establishments-scores",
"size": "M"
},
{
"command": "41 22 1 * * pnpm back run trigger-update-rome-data",
"size": "M"
},
{
"command": "27 23 * * * pnpm back run mark-establishments-as-searchable-when-max-contacts-allows",
"size": "M"
Expand Down
7 changes: 6 additions & 1 deletion back/src/config/pg/kysely/kyselyUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,16 @@ export const executeKyselyRawSqlQuery = <T extends QueryResultRow>(
values?: any[],
) => transaction.executeQuery<T>(CompiledQuery.raw(sqlQuery, values));

export const makeKyselyDb = (pool: Pool): KyselyDb => {
/** Optional settings accepted by `makeKyselyDb`. */
type KyselyOptions = {
// When true, the Kysely `log` callback returns before handling any event,
// so query errors are not reported by the logger created in `makeKyselyDb`.
skipErrorLog?: boolean;
};

export const makeKyselyDb = (pool: Pool, options?: KyselyOptions): KyselyDb => {
const logger = createLogger(__filename);
return new Kysely<Database>({
dialect: new PostgresDialect({ pool }),
log(event): void {
if (options?.skipErrorLog) return;
if (event.level === "error") {
const error: any = event.error;
const params = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import axios from "axios";
import { expectToEqual } from "shared";
import { createAxiosSharedClient } from "shared-routes/axios";
import { AppConfig } from "../../../../config/bootstrap/appConfig";
import { createPeAxiosSharedClient } from "../../../../config/helpers/createAxiosSharedClients";
import { HttpPoleEmploiGateway } from "../../../convention/adapters/pole-emploi-gateway/HttpPoleEmploiGateway";
import { PoleEmploiGetAccessTokenResponse } from "../../../convention/ports/PoleEmploiGateway";
import { InMemoryCachingGateway } from "../../../core/caching-gateway/adapters/InMemoryCachingGateway";
import { noRetries } from "../../../core/retry-strategy/ports/RetryStrategy";
import { RealTimeGateway } from "../../../core/time-gateway/adapters/RealTimeGateway";
import { HttpRome4Gateway, makeRome4Routes } from "./HttpRome4Gateway";

// Integration test: builds the gateway from environment configuration and
// calls the ROME 4 API through it (no stubbed HTTP client is involved).
describe("HttpRome4Gateway", () => {
  const appConfig = AppConfig.createFromEnv();

  // In-memory cache handed to the access-token gateway.
  const accessTokenCache =
    new InMemoryCachingGateway<PoleEmploiGetAccessTokenResponse>(
      new RealTimeGateway(),
      "expires_in",
    );

  const accessTokenGateway = new HttpPoleEmploiGateway(
    createPeAxiosSharedClient(appConfig),
    accessTokenCache,
    appConfig.peApiUrl,
    appConfig.poleEmploiAccessTokenConfig,
    noRetries,
  );

  const rome4HttpClient = createAxiosSharedClient(
    makeRome4Routes(appConfig.peApiUrl),
    axios,
  );

  const httpRome4Gateway = new HttpRome4Gateway(
    rome4HttpClient,
    accessTokenGateway,
    appConfig.poleEmploiClientId,
  );

  it("fetches the updated list of romes", async () => {
    const romes = await httpRome4Gateway.getAllRomes();
    const [firstRome] = romes;

    // The referential size is only checked to stay within a plausible range.
    expect(romes.length).toBeGreaterThan(600);
    expect(romes.length).toBeLessThan(700);
    expectToEqual(firstRome, {
      romeCode: "A1101",
      romeLabel: "Conducteur / Conductrice d'engins agricoles",
    });
  });

  it("fetches the updated list of appellations", async () => {
    const appellations = await httpRome4Gateway.getAllAppellations();
    const [firstAppellation] = appellations;

    expect(appellations.length).toBeGreaterThan(11_500);
    expect(appellations.length).toBeLessThan(15_000);
    expectToEqual(firstAppellation, {
      appellationCode: "10605",
      appellationLabel: "Agent / Agente de service expédition marchandises",
      appellationLabelShort:
        "Agent / Agente de service expédition marchandises",
      romeCode: "N1103",
    });
  });
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import {
AbsoluteUrl,
AppellationDto,
RomeDto,
withAuthorizationHeaders,
} from "shared";
import { HttpClient, defineRoute, defineRoutes } from "shared-routes";
import { z } from "zod";
import { PoleEmploiGateway } from "../../../convention/ports/PoleEmploiGateway";

/**
 * An `AppellationDto` extended with the code of the ROME it is attached to
 * and with its short label. This is the element type returned by
 * `Rome4Gateway.getAllAppellations`.
 */
export type AppellationWithShortLabel = AppellationDto & {
romeCode: string;
appellationLabelShort: string;
};

/** Port giving access to the full lists of ROMEs and appellations. */
interface Rome4Gateway {
// Resolves with every ROME as a `RomeDto` (code and label).
getAllRomes(): Promise<RomeDto[]>;
// Resolves with every appellation, including its ROME code and short label.
getAllAppellations(): Promise<AppellationWithShortLabel[]>;
}

type Rome4Routes = ReturnType<typeof makeRome4Routes>;

/**
 * Builds the shared-routes definitions of the two ROME 4 endpoints used by
 * `HttpRome4Gateway`. Both are GET routes rooted at `peApiUrl`, both require
 * authorization headers, and both declare a single validated 200 response.
 *
 * @param peApiUrl Base URL prepended to both endpoint paths.
 */
export const makeRome4Routes = (peApiUrl: AbsoluteUrl) => {
  // Expected 200 payload of the "metier" endpoint.
  const romesResponseSchema = z.array(
    z.object({ code: z.string(), libelle: z.string() }),
  );

  // Expected 200 payload of the "appellation" endpoint.
  const appellationsResponseSchema = z.array(
    z.object({
      code: z.string(),
      libelle: z.string(),
      libelleCourt: z.string(),
      metier: z.object({
        code: z.string(),
      }),
    }),
  );

  return defineRoutes({
    getRomes: defineRoute({
      method: "get",
      url: `${peApiUrl}/partenaire/rome-metiers/v1/metiers/metier?champs=code,libelle`,
      ...withAuthorizationHeaders,
      responses: {
        200: romesResponseSchema,
      },
    }),
    getAppellations: defineRoute({
      method: "get",
      url: `${peApiUrl}/partenaire/rome-metiers/v1/metiers/appellation?champs=code,libelle,libelleCourt,metier(code)`,
      ...withAuthorizationHeaders,
      responses: {
        200: appellationsResponseSchema,
      },
    }),
  });
};

/**
 * HTTP implementation of `Rome4Gateway`.
 *
 * Before each request, an access token is requested from the injected
 * `PoleEmploiGateway` and sent as a `Bearer` authorization header.
 */
export class HttpRome4Gateway implements Rome4Gateway {
  #httpClient: HttpClient<Rome4Routes>;
  #poleEmploiGateway: PoleEmploiGateway;
  #poleEmploiClientId: string;

  /**
   * @param httpClient Client bound to the routes built by `makeRome4Routes`.
   * @param poleEmploiGateway Gateway used to obtain access tokens.
   * @param poleEmploiClientId Client id interpolated into the requested scope.
   */
  constructor(
    httpClient: HttpClient<Rome4Routes>,
    poleEmploiGateway: PoleEmploiGateway,
    poleEmploiClientId: string,
  ) {
    this.#httpClient = httpClient;
    this.#poleEmploiGateway = poleEmploiGateway;
    this.#poleEmploiClientId = poleEmploiClientId;
  }

  /** Requests an access token for the ROME scope and wraps it in request headers. */
  async #makeAuthorizationHeaders(): Promise<{ authorization: string }> {
    const scope = `application_${this.#poleEmploiClientId} api_rome-metiersv1 nomenclatureRome`;
    const { access_token } =
      await this.#poleEmploiGateway.getAccessToken(scope);
    return { authorization: `Bearer ${access_token}` };
  }

  /** Fetches every ROME and maps `code` / `libelle` to `romeCode` / `romeLabel`. */
  public async getAllRomes(): Promise<RomeDto[]> {
    const headers = await this.#makeAuthorizationHeaders();
    const { body } = await this.#httpClient.getRomes({ headers });

    return body.map((metier) => ({
      romeLabel: metier.libelle,
      romeCode: metier.code,
    }));
  }

  /**
   * Fetches every appellation and flattens each item: the nested
   * `metier.code` becomes `romeCode`, `libelleCourt` becomes
   * `appellationLabelShort`.
   */
  public async getAllAppellations(): Promise<AppellationWithShortLabel[]> {
    const headers = await this.#makeAuthorizationHeaders();
    const { body } = await this.#httpClient.getAppellations({ headers });

    return body.map((appellation) => ({
      romeCode: appellation.metier.code,
      appellationCode: appellation.code,
      appellationLabel: appellation.libelle,
      appellationLabelShort: appellation.libelleCourt,
    }));
  }
}
6 changes: 4 additions & 2 deletions back/src/scripts/handleEndOfScriptNotification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ const onScriptError =
({ start, context, logger, name }: ScriptContextParams) =>
(error: any): Promise<void> => {
const durationInSeconds = calculateDurationInSecondsFrom(start);
const reportTitle = `❌ Failure at ${new Date().toISOString()} - ${context}`;
const reportTitle = `❌ Failure at ${new Date().toISOString()} - ${context} - ${
error.message
}`;

logger.error({ error, durationInSeconds, message: reportTitle });
logger.error({ durationInSeconds, message: reportTitle, error });
const report = [
reportTitle,
`Script [${name}]`,
Expand Down
152 changes: 152 additions & 0 deletions back/src/scripts/triggerUpdateRomeData.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import axios from "axios";
import { sql } from "kysely";
import { Pool } from "pg";
import { map, splitEvery } from "ramda";
import { pipeWithValue, removeDiacritics, sleep } from "shared";
import { createAxiosSharedClient } from "shared-routes/axios";
import { AppConfig } from "../config/bootstrap/appConfig";
import { createPeAxiosSharedClient } from "../config/helpers/createAxiosSharedClients";
import { KyselyDb, makeKyselyDb } from "../config/pg/kysely/kyselyUtils";
import {
AppellationWithShortLabel,
HttpRome4Gateway,
makeRome4Routes,
} from "../domains/agency/adapters/pe-agencies-referential/HttpRome4Gateway";
import { HttpPoleEmploiGateway } from "../domains/convention/adapters/pole-emploi-gateway/HttpPoleEmploiGateway";
import { PoleEmploiGetAccessTokenResponse } from "../domains/convention/ports/PoleEmploiGateway";
import { InMemoryCachingGateway } from "../domains/core/caching-gateway/adapters/InMemoryCachingGateway";
import { noRetries } from "../domains/core/retry-strategy/ports/RetryStrategy";
import { RealTimeGateway } from "../domains/core/time-gateway/adapters/RealTimeGateway";
import { createLogger } from "../utils/logger";
import { handleEndOfScriptNotification } from "./handleEndOfScriptNotification";

// Module-level logger (named after this file) and application configuration
// read from the environment; both are used by `main` and/or passed to
// `handleEndOfScriptNotification` at the bottom of this script.
const logger = createLogger(__filename);
const config = AppConfig.createFromEnv();

/**
 * Passes `str` through `removeDiacritics`, then deletes every "(", ")", "["
 * and "]" character from the result.
 */
const removeAccentsAndParenthesis = (str: string) => {
  const withoutDiacritics = removeDiacritics(str);
  return withoutDiacritics.replace(/[()[\]]/g, "");
};

/**
 * Refreshes the `public_romes_data` and `public_appellations_data` tables
 * from the ROME 4 API.
 *
 * Steps:
 *  1. fetch all ROMEs and upsert them (conflict key: `code_rome`);
 *  2. wait one second;
 *  3. fetch all appellations and upsert them in chunks of 5000 inside a
 *     single transaction (conflict key: `ogr_appellation`).
 *
 * The Postgres pool is always closed before the returned promise settles,
 * whether the update succeeded or failed.
 *
 * @returns The number of ROMEs and appellations received from the API.
 */
const main = async () => {
  const pool = new Pool({
    connectionString: config.pgImmersionDbUrl,
  });

  try {
    const db = makeKyselyDb(pool, { skipErrorLog: true });

    const franceTravailGateway = new HttpPoleEmploiGateway(
      createPeAxiosSharedClient(config),
      new InMemoryCachingGateway<PoleEmploiGetAccessTokenResponse>(
        new RealTimeGateway(),
        "expires_in",
      ),
      config.peApiUrl,
      config.poleEmploiAccessTokenConfig,
      noRetries,
    );

    const httpRome4Gateway = new HttpRome4Gateway(
      createAxiosSharedClient(makeRome4Routes(config.peApiUrl), axios),
      franceTravailGateway,
      config.poleEmploiClientId,
    );

    // The API is called before the transaction is opened so that no database
    // connection is held while waiting on the network.
    const romes = await httpRome4Gateway.getAllRomes();
    await db.transaction().execute(async (trx) => {
      await trx
        .insertInto("public_romes_data")
        .values(
          romes.map(({ romeCode, romeLabel }) => ({
            code_rome: romeCode,
            libelle_rome: romeLabel,
            libelle_rome_tsvector: sql`to_tsvector('french', ${romeLabel} )`,
          })),
        )
        .onConflict((oc) =>
          oc.column("code_rome").doUpdateSet(({ ref }) => ({
            libelle_rome: ref("excluded.libelle_rome"),
            libelle_rome_tsvector: sql`to_tsvector
            ('french', ${ref("excluded.libelle_rome")})`,
          })),
        )
        .execute();
    });

    // NOTE(review): presumably spaces out the two API calls (rate limiting) — confirm.
    await sleep(1000);

    const appellations = await httpRome4Gateway.getAllAppellations();
    await db.transaction().execute(async (trx) => {
      // NOTE(review): the chunks are dispatched concurrently on the same
      // transaction; if one fails, siblings may still be pending when the
      // rollback is issued — consider inserting them sequentially.
      await Promise.all(
        pipeWithValue(
          appellations,
          splitEvery(5000),
          map(insertAppellations(trx)),
        ),
      );
    });

    return {
      numberOfRomes: romes.length,
      numberOfAppellations: appellations.length,
    };
  } finally {
    // Awaited and placed in `finally` so the pool is released on failure too
    // and the promise returned by `end()` is not left floating.
    await pool.end();
  }
};

/**
 * Curried upsert of appellations into `public_appellations_data`.
 *
 * Given a Kysely handle (a plain db or a transaction), returns a function
 * that inserts one batch of appellations. A row whose `ogr_appellation`
 * already exists has all its other columns overwritten with the incoming
 * values.
 */
const insertAppellations =
  (db: KyselyDb) => (appellations: AppellationWithShortLabel[]) => {
    return db
      .insertInto("public_appellations_data")
      .values(
        appellations.map((appellation) => {
          // Search-friendly label: no diacritics, parentheses or square brackets.
          const searchableLabel = removeAccentsAndParenthesis(
            appellation.appellationLabel,
          );

          return {
            ogr_appellation: parseInt(appellation.appellationCode, 10),
            code_rome: appellation.romeCode,
            libelle_appellation_long: appellation.appellationLabel,
            libelle_appellation_long_without_special_char: searchableLabel,
            libelle_appellation_long_tsvector: sql`to_tsvector('french', ${searchableLabel} )`,
            libelle_appellation_court: appellation.appellationLabelShort,
          };
        }),
      )
      .onConflict((conflict) =>
        conflict.column("ogr_appellation").doUpdateSet((eb) => ({
          code_rome: eb.ref("excluded.code_rome"),
          libelle_appellation_long: eb.ref("excluded.libelle_appellation_long"),
          libelle_appellation_long_without_special_char: eb.ref(
            "excluded.libelle_appellation_long_without_special_char",
          ),
          libelle_appellation_long_tsvector: eb.ref(
            "excluded.libelle_appellation_long_tsvector",
          ),
          libelle_appellation_court: eb.ref(
            "excluded.libelle_appellation_court",
          ),
        })),
      )
      .execute();
  };

// Script entry point: hands `main` to the end-of-script notification helper,
// together with the formatter that turns its result into the success report.
handleEndOfScriptNotification(
  "update-rome-data-from-france-travail-ROME-4-api",
  config,
  main,
  (result) => {
    const reportLines = [
      "Updated successfully rome and appellations data from ROME-4 API",
      `Number of romes: ${result.numberOfRomes}`,
      `Number of appellations: ${result.numberOfAppellations}`,
    ];
    return reportLines.join("\n");
  },
  logger,
);

0 comments on commit 462f33a

Please sign in to comment.