Skip to content

Commit

Permalink
Config & safer connection (#4)
Browse files Browse the repository at this point in the history
* more logging

* workflow changed

* timeout operator changed

* reverted back build-deploy-k8s-sandbox-azure.yml

* reverted back build-deploy-k8s-sandbox-azure.yml

* reverted back build-deploy-k8s-sandbox-azure.yml
  • Loading branch information
hero101 authored and valentinyanakiev committed May 28, 2024
1 parent e535318 commit 88d0272
Showing 1 changed file with 51 additions and 12 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { WINSTON_MODULE_NEST_PROVIDER } from 'nest-winston';
import { Inject, Injectable, LoggerService } from '@nestjs/common';
import { firstValueFrom, timeout } from 'rxjs';
import { firstValueFrom, map, timeInterval, timeout } from 'rxjs';
import {
ClientProxy,
ClientProxyFactory,
Expand All @@ -18,6 +18,7 @@ import { InfoOutputData } from './outputs';
import { WhiteboardIntegrationEventPattern } from './event.pattern.enum';
import { ConfigService } from '@nestjs/config';
import { ConfigType } from '../../config';
import { RmqOptions } from '@nestjs/microservices/interfaces/microservice-configuration.interface';

@Injectable()
export class WhiteboardIntegrationAdapterService {
Expand All @@ -44,11 +45,21 @@ export class WhiteboardIntegrationAdapterService {
);

if (!this.client) {
console.error(
this.logger.error(
`${WhiteboardIntegrationAdapterService.name} not initialized`,
);
return;
}

this.client
.connect()
.then(() => {
this.logger.verbose?.(
'Client proxy successfully connected to RabbitMQ',
);
})
.catch(this.logger.error);

this.timeoutMs = this.configService.get(
'settings.application.queue_response_timeout',
{ infer: true },
Expand Down Expand Up @@ -114,9 +125,19 @@ export class WhiteboardIntegrationAdapterService {
throw new Error(`Connection was not established. Send failed.`);
}

const result$ = this.client
.send<TResult, TInput>(pattern, data)
.pipe(timeout({ first: this.timeoutMs }));
const result$ = this.client.send<TResult, TInput>(pattern, data).pipe(
timeInterval(),
map((x) => {
this.logger.debug?.({
method: `sendWithResponse response took ${x.interval}`,
pattern,
data,
value: x.value,
});
return x.value;
}),
timeout({ each: this.timeoutMs }),
);

return firstValueFrom(result$).catch((err) => {
this.logger.error(
Expand Down Expand Up @@ -147,6 +168,12 @@ export class WhiteboardIntegrationAdapterService {
throw new Error(`Connection was not established. Send failed.`);
}

this.logger.debug?.({
method: 'sendWithoutResponse',
pattern,
data,
});

this.client.emit<void, TInput>(pattern, data);
};
}
Expand All @@ -165,15 +192,27 @@ const authQueueClientProxyFactory = (
const { host, port, user, password, heartbeat: _heartbeat, queue } = config;
const heartbeat =
process.env.NODE_ENV === 'production' ? _heartbeat : _heartbeat * 3;
const connectionString = `amqp://${user}:${password}@${host}:${port}?heartbeat=${heartbeat}`;
logger.verbose?.({ ...config, heartbeat, password: undefined });
try {
const options = {
urls: [connectionString],
queue,
queueOptions: { durable: true },
noAck: true,
const options: RmqOptions = {
transport: Transport.RMQ,
options: {
urls: [
{
protocol: 'amqp',
hostname: host,
username: user,
password,
port,
heartbeat,
},
],
queue,
queueOptions: { durable: true },
noAck: true,
},
};
return ClientProxyFactory.create({ transport: Transport.RMQ, options });
return ClientProxyFactory.create(options);
} catch (err) {
logger.error(`Could not connect to RabbitMQ: ${err}`);
return undefined;
Expand Down

0 comments on commit 88d0272

Please sign in to comment.