Skip to content

Commit

Permalink
feat: message handler v0 (#19)
Browse files Browse the repository at this point in the history
  • Loading branch information
aleortega authored Jan 26, 2024
1 parent 2cc614e commit 3b24c67
Show file tree
Hide file tree
Showing 11 changed files with 157 additions and 32 deletions.
3 changes: 2 additions & 1 deletion consumer-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"@well-known-components/http-server": "^2.0.0",
"@well-known-components/interfaces": "^1.4.2",
"@well-known-components/logger": "^3.1.3",
"@well-known-components/metrics": "^2.0.1"
"@well-known-components/metrics": "^2.0.1",
"@well-known-components/pushable-channel": "^1.0.3"
},
"devDependencies": {
"@dcl/eslint-config": "^1.1.13",
Expand Down
28 changes: 28 additions & 0 deletions consumer-server/src/adapters/memory-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { AsyncQueue } from '@well-known-components/pushable-channel'
import { AppComponents, QueueService } from '../types'

export function createMemoryQueueAdapter({ logs }: Pick<AppComponents, 'logs'>): QueueService {
const logger = logs.getLogger('memory-queue')
const queue = new AsyncQueue((action) => void 0)

logger.info('Initializing memory queue adapter')

async function send(message: any) {
await queue.enqueue(message)
}

async function receiveSingleMessage() {
const message = (await queue.next()).value
return message ? [message] : []
}

async function deleteMessage() {
// noop
}

return {
send,
receiveSingleMessage,
deleteMessage
}
}
7 changes: 2 additions & 5 deletions consumer-server/src/adapters/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,9 @@ import {
SendMessageCommand
} from '@aws-sdk/client-sqs'

import { AppComponents, QueueMessage, QueueService } from '../types'
import { QueueMessage, QueueService } from '../types'

export async function createSqsAdapter({
config
}: Pick<AppComponents, | 'config'>): Promise<QueueService> {
const endpoint = await config.getString('QUEUE_URL')
export async function createSqsAdapter(endpoint: string): Promise<QueueService> {
const client = new SQSClient({ endpoint })

async function send(message: QueueMessage): Promise<void> {
Expand Down
26 changes: 19 additions & 7 deletions consumer-server/src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,18 @@ import { metricDeclarations } from './metrics'
import { createSqsAdapter } from './adapters/sqs'
import { createMessagesConsumerComponent } from './logic/message-consumer'
import { buildLicense } from './utils/license-builder'
import { createMemoryQueueAdapter } from './adapters/memory-queue'
import { createLodGeneratorComponent } from './logic/lod-generator'
import { createMessageHandlerComponent } from './logic/message-handler'

export async function initComponents(): Promise<AppComponents> {
const config = await createDotEnvConfigComponent({ path: ['.env.default', '.env'] }, {
HTTP_SERVER_PORT: '3000',
HTTP_SERVER_HOST: '0.0.0.0'
})
const config = await createDotEnvConfigComponent(
{ path: ['.env.default', '.env'] },
{
HTTP_SERVER_PORT: '3000',
HTTP_SERVER_HOST: '0.0.0.0'
}
)

const metrics = await createMetricsComponent(metricDeclarations, { config })
const logs = await createLogComponent({ metrics })
Expand All @@ -22,8 +28,12 @@ export async function initComponents(): Promise<AppComponents> {

await instrumentHttpServerWithMetrics({ metrics, server, config })

const queue = await createSqsAdapter({ config })
const messageConsumer = await createMessagesConsumerComponent({ logs, queue })
const sqsEndpoint = await config.getString('QUEUE_URL')
const queue = !sqsEndpoint ? createMemoryQueueAdapter({ logs }) : await createSqsAdapter(sqsEndpoint)
const lodGenerator = createLodGeneratorComponent()
const messageHandler = createMessageHandlerComponent({ logs, lodGenerator })

const messageConsumer = await createMessagesConsumerComponent({ logs, queue, messageHandler })

await buildLicense({ config, logs })

Expand All @@ -34,6 +44,8 @@ export async function initComponents(): Promise<AppComponents> {
metrics,
statusChecks,
queue,
messageConsumer
messageConsumer,
lodGenerator,
messageHandler
}
}
38 changes: 38 additions & 0 deletions consumer-server/src/logic/lod-generator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { exec } from 'child_process'
import path from 'path'
import os from 'os'
import fs from 'fs'

import { LodGeneratorService } from '../types'

export function createLodGeneratorComponent(): LodGeneratorService {
const projectRoot = path.resolve(__dirname, '..', '..', '..') // project root according to Dockerfile bundling
const lodGeneratorProgram = path.join(projectRoot, 'api', 'DCL_PiXYZ.exe') // path to the lod generator program
const sceneLodEntitiesManifestBuilder = path.join(projectRoot, 'scene-lod') // path to the scene lod entities manifest builder

async function generate(entityId: string, basePointer: string): Promise<string[]> {
const outputPath = path.join(os.tmpdir(), entityId)

if (!fs.existsSync(outputPath)) {
fs.mkdirSync(outputPath, { recursive: true });
}

const commandToExecute = `${lodGeneratorProgram} "coords" "${basePointer}" ${sceneLodEntitiesManifestBuilder} "${outputPath}"`
const files: string[] = await new Promise((resolve, reject) => {
exec(commandToExecute, (_error, _stdout, _stderr) => {
const generatedFiles = fs.readdirSync(outputPath)
// if files exists return otherwise reject
if (generatedFiles.length > 0) {
resolve(generatedFiles)
} else {
reject()
}
})
})

fs.rmdirSync(outputPath)
return files
}

return { generate }
}
6 changes: 4 additions & 2 deletions consumer-server/src/logic/message-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import { AppComponents, QueueWorker } from '../types'

export async function createMessagesConsumerComponent({
logs,
queue
}: Pick<AppComponents, 'logs' | 'queue'>): Promise<QueueWorker> {
queue,
messageHandler
}: Pick<AppComponents, 'logs' | 'queue' | 'messageHandler'>): Promise<QueueWorker> {
const logger = logs.getLogger('messages-consumer')

async function start() {
Expand All @@ -19,6 +20,7 @@ export async function createMessagesConsumerComponent({
id: MessageId!,
message: parsedMessage.Message
})
await messageHandler.handle(parsedMessage)
} catch (error: any) {
logger.error('Failed while handling message from queue', {
id: MessageId!,
Expand Down
27 changes: 27 additions & 0 deletions consumer-server/src/logic/message-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { AppComponents, MessageHandler } from '../types'

export function createMessageHandlerComponent({
logs,
lodGenerator
}: Pick<AppComponents, 'logs' | 'lodGenerator'>): MessageHandler {
const logger = logs.getLogger('message-handler')

async function handle(message: { Message: string }): Promise<void> {
const parsedMessage = JSON.parse(message.Message)

if (parsedMessage.entityType !== 'scene') {
return
}

const { base, entityId } = parsedMessage

try {
const result = await lodGenerator.generate(entityId, base)
logger.info('LODs correctly generated', { files: result.join(', '), entityId })
} catch (error: any) {
logger.error('Failed while generating LODs', { error: error.message, entityId })
}
}

return { handle }
}
10 changes: 10 additions & 0 deletions consumer-server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export type BaseComponents = {
metrics: IMetricsComponent<keyof typeof metricDeclarations>
queue: QueueService
messageConsumer: QueueWorker
lodGenerator: LodGeneratorService
messageHandler: MessageHandler
}

// components used in runtime
Expand Down Expand Up @@ -64,3 +66,11 @@ export type AwsConfig = {
}

export type QueueWorker = IBaseComponent

export type LodGeneratorService = {
generate(entityId: string, basePointer: string): Promise<string[]>
}

export type MessageHandler = {
handle(message: { Message: string }): Promise<void>
}
32 changes: 16 additions & 16 deletions consumer-server/src/utils/license-builder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,19 @@ import fs from 'fs'
import { AppComponents } from '../types'

export async function buildLicense({ config, logs }: Pick<AppComponents, 'config' | 'logs'>): Promise<void> {
const logger = logs.getLogger('license-builder')
const licenseKey = await config.getString('LODS_GENERATOR_LICENSE') || '' // this is a SSM parameter pulled from AWS
const projectRoot = path.resolve(__dirname, '..', '..', '..')
try {
const licenseKeyPath = path.resolve(projectRoot, 'pixyzsdk-29022024.lic')
const licenseKeyFile = fs.readFileSync(licenseKeyPath, 'utf8')
const replacedLicenseKey = licenseKeyFile.replace('{LICENSE_KEY}', licenseKey)
fs.writeFileSync(licenseKeyPath, replacedLicenseKey, 'utf8')
logger.info('PiXYZ license built correctly')
} catch (err: any) {
logger.error('Could not build PiXYZ license', { err: (err.message).replace(licenseKey, '****') || '' })
throw err
}
}
const logger = logs.getLogger('license-builder')
const licenseKey = (await config.getString('LODS_GENERATOR_LICENSE')) || '' // this is a SSM parameter pulled from AWS
const projectRoot = path.resolve(__dirname, '..', '..', '..')

try {
const licenseKeyPath = path.resolve(projectRoot, 'pixyzsdk-29022024.lic')
const licenseKeyFile = fs.readFileSync(licenseKeyPath, 'utf8')
const replacedLicenseKey = licenseKeyFile.replace('{LICENSE_KEY}', licenseKey)

fs.writeFileSync(licenseKeyPath, replacedLicenseKey, 'utf8')
logger.info('PiXYZ license built correctly')
} catch (err: any) {
logger.error('Could not build PiXYZ license', { err: err.message.replace(licenseKey, '****') || '' })
throw err
}
}
7 changes: 7 additions & 0 deletions consumer-server/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,13 @@
dependencies:
prom-client "^14.1.0"

"@well-known-components/pushable-channel@^1.0.3":
version "1.0.3"
resolved "https://registry.yarnpkg.com/@well-known-components/pushable-channel/-/pushable-channel-1.0.3.tgz#b8a803c483bb52c03339a98813dd370857129059"
integrity sha512-8ibswJXQx7YfmUgzXp02xsIBTw6zrVXgNybV8asvEr1vE/0m/xmZi41+NwTcZmLCNRzsE/i+aRUzNO+oyq/g2g==
dependencies:
mitt "^3.0.0"

"@well-known-components/test-helpers@^1.5.5":
version "1.5.5"
resolved "https://registry.yarnpkg.com/@well-known-components/test-helpers/-/test-helpers-1.5.5.tgz#73472e599a3e867889b03b226ea86c7aba1f4b6c"
Expand Down
5 changes: 4 additions & 1 deletion scene-lod-entities-manifest-builder/src/components.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import { BaseComponents } from './types'

// Initialize all the components of the app
export async function initComponents(): Promise<BaseComponents> {
const config = await createDotEnvConfigComponent({ path: ['.env.default', '.env'] })
const config = await createDotEnvConfigComponent({ path: ['.env.default', '.env'] }, {
HTTP_SERVER_PORT: '3001',
HTTP_SERVER_HOST: '0.0.0.0'
})
const fetch = createFetchComponent()

return {
Expand Down

0 comments on commit 3b24c67

Please sign in to comment.