Skip to content

Commit

Permalink
Merge pull request #407 from TogetherCrew/add-temporal
Browse files Browse the repository at this point in the history
Add temporal
  • Loading branch information
Behzad-rabiei authored Nov 18, 2024
2 parents 2ef8651 + 9b1e192 commit e58277a
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 29 deletions.
209 changes: 208 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"@discordjs/rest": "^1.7.0",
"@notionhq/client": "^2.2.3",
"@sentry/node": "^7.50.0",
"@temporalio/client": "^1.11.3",
"@togethercrew.dev/db": "^3.0.72",
"@togethercrew.dev/tc-messagebroker": "^0.0.50",
"@types/express-session": "^1.17.7",
Expand Down
8 changes: 5 additions & 3 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ const envVarsSchema = Joi.object()
REDIS_HOST: Joi.string().required().description('Redis host'),
REDIS_PORT: Joi.string().required().description('Redis port'),
REDIS_PASSWORD: Joi.string().required().description('Reids password').allow(''),
DISCOURSE_EXTRACTION_URL: Joi.string().required().description('Discourse extraction url'),
OCI_BACKEND_URL: Joi.string().required().description('Oci Backend url'),
TEMPORAL_URI: Joi.string().required().description('Temporal address'),
TEMPORAL_QUEUE_HEAVY: Joi.string().required().description('Queue for heavy workflows'),
})
.unknown();

Expand Down Expand Up @@ -156,8 +157,9 @@ export default {
session: {
secret: envVars.SESSION_SECRET,
},
discourse: {
extractionURL: envVars.DISCOURSE_EXTRACTION_URL,
temporal: {
uri: envVars.TEMPORAL_URI,
heavyQueue: envVars.TEMPORAL_QUEUE_HEAVY,
},
ociBackendURL: envVars.OCI_BACKEND_URL,
};
33 changes: 10 additions & 23 deletions src/services/discourse/core.service.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import parentLogger from '../../config/logger';
import { IAuthAndPlatform } from '../../interfaces';
import categoryService from './category.service';
import { ApiError, pick, sort } from '../../utils';
import { Types } from 'mongoose';
import config from '../../config';
import { ApiError, pick } from '../../utils';
import temporalDiscourse from '../temporal/discourse.service';
const logger = parentLogger.child({ module: 'DiscourseCoreService' });

async function getPropertyHandler(req: IAuthAndPlatform) {
Expand All @@ -19,31 +18,19 @@ async function getPropertyHandler(req: IAuthAndPlatform) {
* @param {String} platformId
* @returns {Promise<Void>}
*/
async function runDiscourseExtraction(platformId: string): Promise<void> {
async function createDiscourseSchedule(platformId: string, endpoint: string): Promise<string> {
try {
const data = {
platform_id: platformId,
};
logger.debug(data);
const response = await fetch(config.discourse.extractionURL, {
method: 'POST',
body: JSON.stringify(data),
headers: { 'Content-Type': 'application/json' },
});
if (response.ok) {
logger.debug(await response.json());
return;
} else {
const errorResponse = await response.text();
logger.error({ error: errorResponse });
}
const schedule = await temporalDiscourse.createSchedule(platformId, endpoint);
logger.info(`Started schedule '${schedule.scheduleId}'`);
await schedule.trigger();
return schedule.scheduleId;
} catch (error) {
logger.error(error, 'Failed to run discourse extraction discourse');
throw new ApiError(590, 'Failed to run discourse extraction discourse');
logger.error(error, 'Failed to create discourse schedule');
throw new ApiError(590, 'Failed to create discourse schedule');
}
}

export default {
getPropertyHandler,
runDiscourseExtraction,
createDiscourseSchedule,
};
9 changes: 7 additions & 2 deletions src/services/platform.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,15 @@ const getPlatformById = async (id: Types.ObjectId): Promise<HydratedDocument<IPl
* @param {HydratedDocument<IPlatform>} platform
* @returns {Promise<Void>}
*/
const callExtractionApp = (platform: HydratedDocument<IPlatform>): void => {
const callExtractionApp = async (platform: HydratedDocument<IPlatform>): Promise<void> => {
switch (platform.name) {
case PlatformNames.Discourse: {
discourseService.coreService.runDiscourseExtraction(platform.id as string);
const scheduleId = await discourseService.coreService.createDiscourseSchedule(
platform.id as string,
platform.metadata?.id as string,
);
platform.set('metadata.scheduleId', scheduleId);
await platform.save();
return;
}
default: {
Expand Down
Loading

0 comments on commit e58277a

Please sign in to comment.