Skip to content

Commit

Permalink
feat: internal media messages are now stored to inbox directly (#292)
Browse files Browse the repository at this point in the history
[AB#43722]
  • Loading branch information
AxTrusov authored Apr 25, 2024
1 parent c620b37 commit e9798a3
Show file tree
Hide file tree
Showing 76 changed files with 491 additions and 546 deletions.
4 changes: 2 additions & 2 deletions libs/media-messages/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
"build:compile": "tsc"
},
"devDependencies": {
"@axinom/mosaic-cli": "0.33.0-rc.1",
"@axinom/mosaic-message-bus-abstractions": "0.14.0-rc.1",
"@axinom/mosaic-cli": "0.33.0-rc.3",
"@axinom/mosaic-message-bus-abstractions": "0.14.0-rc.3",
"@types/glob": "^7.2.0",
"concurrently": "^5.3.0",
"rimraf": "^3.0.2",
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
"yarn:de-dupe": "npx yarn-deduplicate yarn.lock"
},
"devDependencies": {
"@axinom/mosaic-cli": "0.33.0-rc.1",
"@axinom/mosaic-cli": "0.33.0-rc.3",
"@dbeining/react-atom": "^4.1.21",
"@jest/globals": "^29.5.0",
"@libre/atom": "^1.3.3",
Expand Down
16 changes: 8 additions & 8 deletions services/catalog/service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@
"util:define-func-migration": "yarn mosaic generate-define-func-migration"
},
"dependencies": {
"@axinom/mosaic-db-common": "0.37.0-rc.1",
"@axinom/mosaic-graphql-common": "0.13.0-rc.1",
"@axinom/mosaic-id-guard": "0.32.0-rc.1",
"@axinom/mosaic-messages": "0.43.0-rc.1",
"@axinom/mosaic-message-bus": "0.27.0-rc.1",
"@axinom/mosaic-message-bus-abstractions": "0.14.0-rc.1",
"@axinom/mosaic-service-common": "0.49.0-rc.1",
"@axinom/mosaic-transactional-inbox-outbox": "0.9.0-rc.1",
"@axinom/mosaic-db-common": "0.37.0-rc.3",
"@axinom/mosaic-graphql-common": "0.13.0-rc.3",
"@axinom/mosaic-id-guard": "0.32.0-rc.3",
"@axinom/mosaic-messages": "0.43.0-rc.3",
"@axinom/mosaic-message-bus": "0.27.0-rc.3",
"@axinom/mosaic-message-bus-abstractions": "0.14.0-rc.3",
"@axinom/mosaic-service-common": "0.49.0-rc.3",
"@axinom/mosaic-transactional-inbox-outbox": "0.9.0-rc.3",
"@graphile-contrib/pg-simplify-inflector": "^6.1.0",
"add": "^2.0.6",
"amqplib": "^0.6.0",
Expand Down
16 changes: 8 additions & 8 deletions services/entitlement/service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@
"util:define-func-migration": "yarn mosaic generate-define-func-migration"
},
"dependencies": {
"@axinom/mosaic-db-common": "0.37.0-rc.1",
"@axinom/mosaic-graphql-common": "0.13.0-rc.1",
"@axinom/mosaic-id-guard": "0.32.0-rc.1",
"@axinom/mosaic-id-utils": "0.15.17-rc.1",
"@axinom/mosaic-message-bus": "0.27.0-rc.1",
"@axinom/mosaic-messages": "0.43.0-rc.1",
"@axinom/mosaic-service-common": "0.49.0-rc.1",
"@axinom/mosaic-transactional-inbox-outbox": "0.9.0-rc.1",
"@axinom/mosaic-db-common": "0.37.0-rc.3",
"@axinom/mosaic-graphql-common": "0.13.0-rc.3",
"@axinom/mosaic-id-guard": "0.32.0-rc.3",
"@axinom/mosaic-id-utils": "0.15.17-rc.3",
"@axinom/mosaic-message-bus": "0.27.0-rc.3",
"@axinom/mosaic-messages": "0.43.0-rc.3",
"@axinom/mosaic-service-common": "0.49.0-rc.3",
"@axinom/mosaic-transactional-inbox-outbox": "0.9.0-rc.3",
"ajv": "^7.2.4",
"ajv-formats": "^1.6.1",
"@graphile-contrib/pg-simplify-inflector": "^6.1.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ export const registerMessaging = async (
config,
inboxConfig,
inboxLogger,
logMapper,
shutdownActions,
);

Expand Down Expand Up @@ -123,7 +122,6 @@ const registerRabbitMqMessaging = async (
config: Config,
inboxConfig: PollingListenerConfig,
inboxLogger: Logger,
logMapper: TransactionalLogMapper,
shutdownActions: ShutdownActionsMiddleware,
): Promise<Broker> => {
const storeInboxMessage = setupInboxStorage(inboxConfig, inboxLogger, config);
Expand Down
18 changes: 9 additions & 9 deletions services/media/service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@
"util:define-func-migration": "yarn mosaic generate-define-func-migration"
},
"dependencies": {
"@axinom/mosaic-db-common": "0.37.0-rc.1",
"@axinom/mosaic-graphql-common": "0.13.0-rc.1",
"@axinom/mosaic-id-guard": "0.32.0-rc.1",
"@axinom/mosaic-id-link-be": "0.21.0-rc.1",
"@axinom/mosaic-message-bus": "0.27.0-rc.1",
"@axinom/mosaic-message-bus-abstractions": "0.14.0-rc.1",
"@axinom/mosaic-messages": "0.43.0-rc.1",
"@axinom/mosaic-service-common": "0.49.0-rc.1",
"@axinom/mosaic-transactional-inbox-outbox": "0.9.0-rc.1",
"@axinom/mosaic-db-common": "0.37.0-rc.3",
"@axinom/mosaic-graphql-common": "0.13.0-rc.3",
"@axinom/mosaic-id-guard": "0.32.0-rc.3",
"@axinom/mosaic-id-link-be": "0.21.0-rc.3",
"@axinom/mosaic-message-bus": "0.27.0-rc.3",
"@axinom/mosaic-message-bus-abstractions": "0.14.0-rc.3",
"@axinom/mosaic-messages": "0.43.0-rc.3",
"@axinom/mosaic-service-common": "0.49.0-rc.3",
"@axinom/mosaic-transactional-inbox-outbox": "0.9.0-rc.3",
"@faker-js/faker": "^7.6.0",
"@graphile-contrib/pg-simplify-inflector": "^6.1.0",
"ajv": "^7.2.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
} from '../../../tests/test-utils';
import { DeleteEntityHandler } from './delete-entity-handler';

describe('Start Ingest Item Handler', () => {
describe('Delete Entity Handler', () => {
let ctx: ITestContext;
let user: AuthenticatedManagementSubject;
let handler: DeleteEntityHandler;
Expand Down
90 changes: 9 additions & 81 deletions services/media/service/src/generated/messaging/rascal-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,20 @@
}
}
},
"media-service:inbox": {
"ax-video-service:inbox": {
"options": {
"arguments": {
"x-dead-letter-exchange": "dead_letter",
"x-dead-letter-routing-key": "media-service.dead_letter",
"x-dead-letter-routing-key": "ax-video-service.dead_letter",
"x-queue-type": "quorum"
}
}
},
"ax-video-service:inbox": {
"media-service:inbox": {
"options": {
"arguments": {
"x-dead-letter-exchange": "dead_letter",
"x-dead-letter-routing-key": "ax-video-service.dead_letter",
"x-dead-letter-routing-key": "media-service.dead_letter",
"x-queue-type": "quorum"
}
}
Expand Down Expand Up @@ -96,41 +96,16 @@
"bindingKey": "delay.30sec",
"destination": "delay:30sec"
},
"retry:media-service:inbox": {
"source": "retry",
"bindingKey": "media-service:inbox.#",
"destination": "media-service:inbox"
},
"media-service.ingest.start": {
"source": "command",
"bindingKey": "media-service.ingest.start",
"destination": "media-service:inbox"
},
"media-service.ingest.start_item": {
"source": "command",
"bindingKey": "media-service.ingest.start_item",
"destination": "media-service:inbox"
},
"media-service.ingest.update_metadata": {
"source": "command",
"bindingKey": "media-service.ingest.update_metadata",
"destination": "media-service:inbox"
},
"media-service.ingest.check_finish_item": {
"source": "command",
"bindingKey": "media-service.ingest.check_finish_item",
"destination": "media-service:inbox"
},
"media-service.ingest.check_finish_document": {
"source": "command",
"bindingKey": "media-service.ingest.check_finish_document",
"destination": "media-service:inbox"
},
"retry:ax-video-service:inbox": {
"source": "retry",
"bindingKey": "ax-video-service:inbox.#",
"destination": "ax-video-service:inbox"
},
"retry:media-service:inbox": {
"source": "retry",
"bindingKey": "media-service:inbox.#",
"destination": "media-service:inbox"
},
"ax-video-service.*.*.video.ensure_exists_already_existed": {
"source": "event",
"bindingKey": "ax-video-service.*.*.video.ensure_exists_already_existed",
Expand Down Expand Up @@ -191,21 +166,6 @@
"bindingKey": "ax-localization-service.*.*.entity.localize_failed",
"destination": "media-service:inbox"
},
"media-service.entity.publish_entity": {
"source": "command",
"bindingKey": "media-service.entity.publish_entity",
"destination": "media-service:inbox"
},
"media-service.entity.unpublish_entity": {
"source": "command",
"bindingKey": "media-service.entity.unpublish_entity",
"destination": "media-service:inbox"
},
"media-service.entity.delete": {
"source": "command",
"bindingKey": "media-service.entity.delete",
"destination": "media-service:inbox"
},
"ax-image-service.*.*.image_types.declared": {
"source": "event",
"bindingKey": "ax-image-service.*.*.image_types.declared",
Expand Down Expand Up @@ -264,26 +224,6 @@
]
}
},
"StartIngest": {
"exchange": "command",
"routingKey": "media-service.ingest.start"
},
"StartIngestItem": {
"exchange": "command",
"routingKey": "media-service.ingest.start_item"
},
"UpdateMetadata": {
"exchange": "command",
"routingKey": "media-service.ingest.update_metadata"
},
"CheckFinishIngestItem": {
"exchange": "command",
"routingKey": "media-service.ingest.check_finish_item"
},
"CheckFinishIngestDocument": {
"exchange": "command",
"routingKey": "media-service.ingest.check_finish_document"
},
"EnsureVideoExists": {
"exchange": "command",
"routingKey": "ax-video-service.*.*.video.ensure_exists"
Expand All @@ -296,14 +236,6 @@
"exchange": "command",
"routingKey": "ax-localization-service.*.*.entity.localize"
},
"PublishEntity": {
"exchange": "command",
"routingKey": "media-service.entity.publish_entity"
},
"UnpublishEntity": {
"exchange": "command",
"routingKey": "media-service.entity.unpublish_entity"
},
"MoviePublished": {
"exchange": "event",
"routingKey": "media-service.movie.published"
Expand Down Expand Up @@ -360,10 +292,6 @@
"exchange": "event",
"routingKey": "media-service.collection.unpublished"
},
"DeleteEntity": {
"exchange": "command",
"routingKey": "media-service.entity.delete"
},
"DeclareImageTypes": {
"exchange": "command",
"routingKey": "ax-image-service.*.*.image_types.declare"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,14 @@ export interface BulkOperationResult {
const defaultResolverBodyBuilder =
(messagingSettings: MessagingSettings): BulkResolverBodyBuilder =>
async (ids, filter, context, input, token) => {
const { storeOutboxMessage, pgClient } =
const { storeInboxMessage, pgClient } =
getValidatedExtendedContext(context);

if (ids.length > 0) {
const { input: additionalInput } = input;

for (const id of ids) {
await storeOutboxMessage(
await storeInboxMessage(
id.toString(),
messagingSettings,
{
Expand All @@ -173,7 +173,7 @@ const defaultResolverBodyBuilder =
input: additionalInput,
},
pgClient,
{ envelopeOverrides: { auth_token: token } },
{ metadata: { authToken: token } },
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { OwnerPgPool } from '@axinom/mosaic-db-common';
import { ManagementAuthenticationContext } from '@axinom/mosaic-id-guard';
import { assertObjectHasProperties } from '@axinom/mosaic-service-common';
import { StoreOutboxMessage } from '@axinom/mosaic-transactional-inbox-outbox';
import {
StoreInboxMessage,
StoreOutboxMessage,
} from '@axinom/mosaic-transactional-inbox-outbox';
import { WebSocket } from 'graphql-ws';
import { Client } from 'pg';
import {
Expand All @@ -16,6 +19,7 @@ export interface ExtendedGraphQLContext
config: Config;
mutationAtomicityContext?: MutationAtomicityContext;
storeOutboxMessage: StoreOutboxMessage;
storeInboxMessage: StoreInboxMessage;
ownerPool: OwnerPgPool;
jwtToken?: string;
pgRole?: string; // set from PostGraphile
Expand All @@ -31,6 +35,7 @@ export function getValidatedExtendedContext(
[
'config',
'storeOutboxMessage',
'storeInboxMessage',
'ownerPool',
'jwtToken',
'pgRole',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ export const MediaBulkPluginFactory = (
tableName,
graphQLAdditionalInput,
}: BulkIdParameters): Promise<void> => {
const { jwtToken, config, storeOutboxMessage, pgClient } =
const { jwtToken, config, storeInboxMessage, pgClient } =
getValidatedExtendedContext(graphQLContext);

if (entityIds.length > 0) {
const token = await getLongLivedToken(jwtToken, config);
for (const id of entityIds) {
await storeOutboxMessage(
await storeInboxMessage(
id.toString(),
messagingSettings,
{
Expand All @@ -39,7 +39,7 @@ export const MediaBulkPluginFactory = (
input: graphQLAdditionalInput,
},
pgClient,
{ envelopeOverrides: { auth_token: token } },
{ metadata: { authToken: token } },
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import {
getHttpServer,
getWebsocketMiddlewares,
} from '@axinom/mosaic-service-common';
import { StoreOutboxMessage } from '@axinom/mosaic-transactional-inbox-outbox';
import {
StoreInboxMessage,
StoreOutboxMessage,
} from '@axinom/mosaic-transactional-inbox-outbox';
import { altairExpress } from 'altair-express-middleware';
import { Express } from 'express';
import { enhanceHttpServerWithSubscriptions, postgraphile } from 'postgraphile';
Expand All @@ -19,12 +22,14 @@ export const setupPostGraphile = async (
config: Config,
authConfig: AuthenticationConfig,
storeOutboxMessage: StoreOutboxMessage,
storeInboxMessage: StoreInboxMessage,
): Promise<void> => {
const websocketMiddlewares = getWebsocketMiddlewares(app);
const options = buildPostgraphileOptions(
config,
ownerPool,
storeOutboxMessage,
storeInboxMessage,
websocketMiddlewares,
authConfig,
);
Expand Down
7 changes: 6 additions & 1 deletion services/media/service/src/graphql/postgraphile-options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import {
logGraphQlError,
MosaicErrors,
} from '@axinom/mosaic-service-common';
import { StoreOutboxMessage } from '@axinom/mosaic-transactional-inbox-outbox';
import {
StoreInboxMessage,
StoreOutboxMessage,
} from '@axinom/mosaic-transactional-inbox-outbox';
import PgSimplifyInflectorPlugin from '@graphile-contrib/pg-simplify-inflector';
import { Request, Response } from 'express';
import { Middleware, PostGraphileOptions } from 'postgraphile';
Expand All @@ -47,6 +50,7 @@ export const buildPostgraphileOptions = (
config: Config,
ownerPool: OwnerPgPool,
storeOutboxMessage: StoreOutboxMessage,
storeInboxMessage: StoreInboxMessage,
websocketMiddlewares: Middleware<Request, Response>[] = [],
authConfig?: AuthenticationConfig,
): PostGraphileOptions<Request, Response> => {
Expand Down Expand Up @@ -124,6 +128,7 @@ export const buildPostgraphileOptions = (
subject,
ownerPool,
storeOutboxMessage,
storeInboxMessage,
jwtToken: extendedRequest?.token,
authErrorInfo,
mutationAtomicityContext: getMutationAtomicityContext(req, true),
Expand Down
Loading

0 comments on commit e9798a3

Please sign in to comment.