Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consul): integrate into app, better debug output, ping support #164

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import SanityLoop from './sanity_loop';
import MetricsLoop from './metrics_loop';
import ScalingManager from './scaling_options_manager';
import RedisStore from './redis';
import ConsulStore from './consul';
import PrometheusClient from './prometheus';
import MetricsStore from './metrics_store';
import InstanceStore from './instance_store';
Expand Down Expand Up @@ -92,12 +93,13 @@ switch (config.MetricsStoreProvider) {
let instanceStore: InstanceStore;

switch (config.InstanceStoreProvider) {
// case 'consul':
// instanceStore = new ConsulClient({
// logger,
// endpoint: config.ConsulURL,
// });
// break;
case 'consul':
instanceStore = new ConsulStore({
host: config.ConsulHost,
port: config.ConsulPort,
secure: config.ConsulSecure,
});
break;
default:
// redis
instanceStore = new RedisStore({
Expand All @@ -113,17 +115,16 @@ switch (config.InstanceStoreProvider) {
break;
}

mapp.get('/health', (req: express.Request, res: express.Response) => {
mapp.get('/health', async (req: express.Request, res: express.Response) => {
logger.debug('Health check');
if (req.query['deep']) {
redisClient.ping((err, reply) => {
if (err) {
res.status(500).send('unhealthy');
} else {
logger.debug('Redis ping reply', { reply });
res.send('deeply healthy');
}
});
const reply = await instanceStore.ping(req.context);
if (!reply) {
res.status(500).send('unhealthy');
} else {
logger.debug('instance store ping reply', { reply });
res.send('deeply healthy');
}
} else {
res.send('healthy!');
}
Expand Down
6 changes: 6 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ if (result.error) {
const env = cleanEnv(process.env, {
PORT: num({ default: 3000 }),
LOG_LEVEL: str({ default: 'info' }),
CONSUL_HOST: str({ default: 'localhost' }),
CONSUL_PORT: num({ default: 8500 }),
CONSUL_SECURE: bool({ default: false }),
REDIS_HOST: str({ default: '127.0.0.1' }),
REDIS_PORT: num({ default: 6379 }),
REDIS_PASSWORD: str({ default: '' }),
Expand Down Expand Up @@ -108,6 +111,9 @@ groupList.forEach((group) => {
export default {
HTTPServerPort: env.PORT,
LogLevel: env.LOG_LEVEL,
ConsulHost: env.CONSUL_HOST,
ConsulPort: env.CONSUL_PORT,
ConsulSecure: env.CONSUL_SECURE,
RedisHost: env.REDIS_HOST,
RedisPort: env.REDIS_PORT,
RedisPassword: env.REDIS_PASSWORD,
Expand Down
20 changes: 16 additions & 4 deletions src/consul.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,13 @@ export default class ConsulStore implements InstanceStore {

async getAllInstanceGroups(ctx: Context): Promise<InstanceGroup[]> {
ctx.logger.debug('fetching consul k/v keys');
const res = await this.client.kv.get({ key: this.groupsPrefix, recurse: true });
ctx.logger.debug('received consul k/v keys', { res });
const key = this.groupsPrefix;
const res = await this.client.kv.get({ key, recurse: true });
if (!res) {
ctx.logger.debug('received consul k/v results', { key });
return [];
}
ctx.logger.debug('received consul k/v results', { key, res });
return Object.entries(res).map(([_k, v]) => <InstanceGroup>JSON.parse(v.Value));
}

Expand Down Expand Up @@ -336,7 +338,7 @@ export default class ConsulStore implements InstanceStore {
async fetch(ctx: Context, key: string): Promise<GetItem | undefined> {
ctx.logger.debug(`reading consul k/v key`, { key });
const v = await this.client.kv.get(key);
ctx.logger.debug(`received consul k/v item`, { v });
ctx.logger.debug(`received consul k/v item`, { key, v });
return v;
}

Expand Down Expand Up @@ -365,7 +367,7 @@ export default class ConsulStore implements InstanceStore {
// the value is considered expired if the timestamp is in the past
async checkValue(ctx: Context, key: string): Promise<boolean> {
try {
const res = this.fetchTTLValue(ctx, key);
const res = this.fetchTTLValue(ctx, this.valuesPrefix + key);
if (!res) {
return false;
}
Expand Down Expand Up @@ -395,4 +397,14 @@ export default class ConsulStore implements InstanceStore {
await this.client.kv.del(key);
return true;
}

async ping(ctx: Context): Promise<boolean | string> {
try {
await this.client.status.leader();
return true;
} catch (err) {
ctx.logger.error(`Failed to ping consul: ${err}`, { err });
return err;
}
}
}
3 changes: 3 additions & 0 deletions src/instance_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ export interface InstanceStore {

// sanity related
saveCloudInstances: { (ctx: Context, groupName: string, cloudInstances: CloudInstance[]): Promise<boolean> };

// health
ping: { (ctx: Context): Promise<boolean | string> };
}

export default InstanceStore;
13 changes: 13 additions & 0 deletions src/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -536,4 +536,17 @@ export default class RedisStore implements MetricsStore, InstanceStore {
);
return true;
}

async ping(ctx: Context): Promise<boolean | string> {
return await new Promise((resolve) => {
this.redisClient.ping((err, reply) => {
if (err) {
ctx.logger.error('Redis ping error', { err });
resolve(false);
} else {
resolve(reply);
}
});
});
}
}
Loading