You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
I'm trying to implement a very simple REST API with just one endpoint using shelf_plus, running multiple instances in separate isolates. I want to integrate it with Prometheus and Grafana to display a graph of requests per second and/or total requests per day. I made this implementation using the stream_isolate and prometheus_client packages, and I would like to know if there is a simpler and cleaner way to do this.
import'dart:async';
import'dart:convert';
import'dart:isolate';
import'package:eloquent/eloquent.dart';
import'package:stack_trace/stack_trace.dart';
import'package:new_sali_backend/src/db/db_layer.dart';
import'package:new_sali_backend/src/modules/protocolo/repositories/processo_repository.dart';
import'package:new_sali_core/src/utils/core_utils.dart';
import'package:new_sali_core/src/models/status_message.dart';
import'package:shelf_plus/shelf_plus.dart';
import'package:prometheus_client/prometheus_client.dart';
import'package:prometheus_client/runtime_metrics.dart'as runtime_metrics;
import'package:prometheus_client_shelf/shelf_metrics.dart'as shelf_metrics;
import'package:prometheus_client/format.dart'as format;
import'shelf_cors_headers_base.dart';
import'stream_isolate.dart';
/// Headers attached to every JSON response built by this server.
const Map<String, String> defaultHeaders = {
  'Content-Type': 'application/json;charset=utf-8',
};
/// Builds a JSON error [Response].
///
/// The body is a JSON object with the keys `is_error` (always `true`),
/// `status_code`, `message`, `exception` and `stackTrace`. [exception] and
/// [stackTrace] are converted with `toString()` and encoded as `null` when
/// they are not supplied. [statusCode] is used both as the HTTP status and as
/// the `status_code` field, and defaults to 400.
Response responseError(
  String message, {
  dynamic exception,
  dynamic stackTrace,
  int statusCode = 400,
}) {
  final body = jsonEncode({
    'is_error': true,
    'status_code': statusCode,
    'message': message,
    'exception': exception?.toString(),
    'stackTrace': stackTrace?.toString(),
  });
  return Response(statusCode, body: body, headers: defaultHeaders);
}
/// Path prefix shared by the versioned API routes.
const basePath = '/api/v1';
/// Handles to the spawned worker isolates.
///
/// Each element is a single-entry map from the isolate's index to its
/// bidirectional stream handle.
final List<Map<int, BidirectionalStreamIsolate>> streamIsolates = [];
/// Entry point: spawns the worker isolates and relays their messages.
///
/// Every spawned isolate runs [isolateMain]. Each message a worker emits is
/// handed to [receiveAndPass], which forwards it to all workers.
Future<void> main(List<String> arguments) async {
  // Register default runtime metrics
  runtime_metrics.register();

  // NOTE(review): the loop bound is `numberOfIsolates - 1`, so two workers are
  // spawned for a value of 3 — presumably the main isolate is counted as the
  // third. Confirm this is intended, since the main isolate serves no HTTP.
  const numberOfIsolates = 3;
  for (var i = 0; i < numberOfIsolates - 1; i++) {
    final streamIsolate = await StreamIsolate.spawnBidirectional(
      isolateMain,
      debugName: i.toString(),
      argument: i,
    );
    streamIsolates.add({i: streamIsolate});
    streamIsolate.stream.listen((event) => receiveAndPass(event, i));
  }
}
/// Receives a message from one isolate and sends it to all isolates.
///
/// [idx] is the index of the isolate that produced [event]. It is not used
/// here, so the originating isolate receives its own message back as well.
void receiveAndPass(event, int idx) {
  for (final item in streamIsolates) {
    // Every list entry is a single-entry map, so `values.first` is the handle.
    item.values.first.send(event);
  }
}
// Load-test helper:
// xargs -I % -P 8 curl "http://192.168.66.123:3161/api/v1/protocolo/processos/public/site/2023/10" < <(printf '%s\n' {1..400})

/// Entry point of a worker isolate.
///
/// Creates a per-isolate [CollectorRegistry] with an `http_requests_total`
/// counter, increments that counter once for every message arriving on [inc]
/// (the messages relayed by the main isolate), and starts a shared-port HTTP
/// server on `0.0.0.0:3161`.
///
/// [id] is the argument passed at spawn time and is forwarded to [init].
/// Returns the stream on which this isolate sends messages to its spawner.
Stream isolateMain(Stream inc, id) {
  final streamController = StreamController.broadcast();
  final reg = CollectorRegistry();
  final httpRequestsTotal = Counter(
    name: 'http_requests_total',
    help: 'Total number of http api requests',
  )..register(reg);

  // listen msg from main
  inc.listen((msg) {
    httpRequestsTotal.inc();
  });

  // The server runs for the lifetime of the isolate; its future is
  // intentionally not awaited because the stream must be returned right away.
  unawaited(shelfRun(
    init([id, streamController, reg]),
    defaultShared: true,
    defaultBindAddress: '0.0.0.0',
    defaultBindPort: 3161,
  ));

  return streamController.stream;
}
/// Builds the handler factory passed to `shelfRun`.
///
/// [args] is a positional list:
///   * `args[0]` — the isolate id (currently unused here),
///   * `args[1]` — the [StreamController] used to send messages to the main
///     isolate,
///   * `args[2]` — the [CollectorRegistry] exposed on `/metrics`.
///
/// The returned function creates a router with the metrics, CORS,
/// request-counting and logging middleware installed, then registers the
/// routes.
Handler Function() init(List args) {
  final streamController = args[1] as StreamController;
  final reg = args[2] as CollectorRegistry;

  return () {
    final app = Router().plus;
    app.use(shelf_metrics.register(reg));
    app.use(corsHeaders());
    app.use((innerHandler) {
      return (request) async {
        // Every time http_request is called, increase the counter by one
        final resp = await innerHandler(request);
        // Requests to the metrics endpoint itself are not counted.
        if (!request.url.path.contains('metrics')) {
          //send msg to main
          streamController.add('+1');
        }
        return resp;
      };
    });
    app.use(logRequestsCustom());
    routes(app, reg);
    return app;
  };
}
/// Registers the HTTP routes on [app].
///
/// * `GET /metrics` — writes the metric samples collected from [reg] in the
///   Prometheus text format.
/// * `GET <basePath>/protocolo/processos/public/site/<ano>/<codigo>` — looks
///   up a processo by code and year and returns it as JSON. A non-numeric
///   `codigo` yields an error response; any exception during the lookup is
///   printed and answered with a generic error response.
void routes(RouterPlus app, CollectorRegistry reg) {
  // Register a handler to expose the metrics in the Prometheus text format
  app.get('/metrics', () {
    return (Request request) async {
      final buffer = StringBuffer();
      final metrics = await reg.collectMetricFamilySamples();
      format.write004(buffer, metrics);
      return Response.ok(
        buffer.toString(),
        headers: {'Content-Type': format.contentType},
      );
    };
  });

  app.get('$basePath/protocolo/processos/public/site/<ano>/<codigo>',
      (Request request, String ano, String codigo) async {
    // Validate before opening a connection so that nothing needs releasing
    // on the rejection path.
    final codProcesso = int.tryParse(codigo);
    if (codProcesso == null) {
      return responseError('codProcesso invalido');
    }

    Connection? conn;
    try {
      conn = await DBLayer().connect();
      final procRepo = ProcessoRepository(conn);
      final proc = await procRepo.getProcessoByCodigoPublic(codProcesso, ano);
      return Response.ok(
        jsonEncode(proc, toEncodable: SaliCoreUtils.customJsonEncode),
        headers: defaultHeaders,
      );
    } catch (e, s) {
      print('public_backend@getProcessoByCodigoPublic $e $s');
      // NOTE(review): this uses responseError's default status (400) for a
      // server-side failure — confirm whether clients rely on that.
      return responseError(StatusMessage.ERROR_GENERIC);
    } finally {
      // Release the connection exactly once, on success and on failure.
      await conn?.disconnect();
    }
  });
}
/// Middleware that logs every request after the inner handler completes.
///
/// Successful requests are reported through [logger] with `isError == false`
/// and the line built by [_message]. Failures are reported with
/// `isError == true` and the text built by [_errorMessage], then rethrown so
/// the server still sees them. A [HijackException] is passed through without
/// being logged. When [logger] is omitted, [_defaultLogger] is used.
Middleware logRequestsCustom({
  void Function(String message, bool isError)? logger,
}) =>
    (innerHandler) {
      final theLogger = logger ?? _defaultLogger;

      return (request) async {
        final startTime = DateTime.now();
        final watch = Stopwatch()..start();

        final Response response;
        try {
          response = await innerHandler(request);
        } on HijackException {
          // Hijacking is control flow, not a failure: propagate untouched.
          rethrow;
        } catch (error, stackTrace) {
          final msg = _errorMessage(startTime, request.requestedUri,
              request.method, watch.elapsed, error, stackTrace);
          theLogger(msg, true);
          rethrow;
        }

        // Logged outside the try block so that a failing logger is not
        // reported as a handler error.
        final msg = _message(startTime, response.statusCode,
            request.requestedUri, request.method, watch.elapsed);
        theLogger(msg, false);
        return response;
      };
    };
/// Returns [query] prefixed with `?`, or an empty string when [query] is
/// empty.
String _formatQuery(String query) {
  return query.isEmpty ? '' : '?$query';
}
/// Formats the log line for a completed request.
///
/// The line contains the ISO-8601 [requestTime], [elapsedTime] left-padded to
/// 15 characters, [method] right-padded to 7 characters, [statusCode] in
/// brackets, the path and query of [requestedUri], and the debug name of the
/// current isolate.
String _message(DateTime requestTime, int statusCode, Uri requestedUri,
    String method, Duration elapsedTime) {
  return '${requestTime.toIso8601String()} '
      '${elapsedTime.toString().padLeft(15)} '
      '${method.padRight(7)} [$statusCode] ' // 7 - longest standard HTTP method
      '${requestedUri.path}${_formatQuery(requestedUri.query)}'
      ' isolate: ${Isolate.current.debugName}';
}
/// Formats the log text for a request whose handler threw [error].
///
/// The first line holds [requestTime], [elapsedTime], [method] and the path of
/// [requestedUri], separated by tabs, followed by the query. The error and a
/// stack chain follow on separate lines. When [stack] is given, the chain is
/// built from it with core and `shelf` frames folded and made terse;
/// otherwise the current stack chain is used.
String _errorMessage(DateTime requestTime, Uri requestedUri, String method,
    Duration elapsedTime, Object error, StackTrace? stack) {
  final chain = stack == null
      ? Chain.current()
      : Chain.forTrace(stack)
          .foldFrames((frame) => frame.isCore || frame.package == 'shelf')
          .terse;

  final msg = '$requestTime\t$elapsedTime\t$method\t${requestedUri.path}'
      '${_formatQuery(requestedUri.query)}\n$error';

  return '$msg\n$chain';
}
/// Prints [msg] to standard output, prefixed with `[ERROR] ` when [isError]
/// is true.
void _defaultLogger(String msg, bool isError) {
  print(isError ? '[ERROR] $msg' : msg);
}
The text was updated successfully, but these errors were encountered:
I'm trying to implement a very simple REST API with just one endpoint using shelf_plus, running multiple instances in separate isolates. I want to integrate it with Prometheus and Grafana to display a graph of requests per second and/or total requests per day. I made this implementation using the stream_isolate and prometheus_client packages, and I would like to know if there is a simpler and cleaner way to do this.
The text was updated successfully, but these errors were encountered: