Skip to content

better integration with prometheus and grafana #37

Open
@insinfo

Description

@insinfo

I'm trying to implement a very simple rest API with just one endpoint using shelf_plus initializing multiple instances and I want to be able to integrate with Prometheus and Gratana to display a graph of requests per second and/or total requests per day, I made this implementation using the stream_isolate package and prometheus_client , I would like to know if there is a simpler and cleaner way to do this

import 'dart:async';
import 'dart:convert';
import 'dart:isolate';
import 'package:eloquent/eloquent.dart';
import 'package:stack_trace/stack_trace.dart';
import 'package:new_sali_backend/src/db/db_layer.dart';
import 'package:new_sali_backend/src/modules/protocolo/repositories/processo_repository.dart';
import 'package:new_sali_core/src/utils/core_utils.dart';
import 'package:new_sali_core/src/models/status_message.dart';
import 'package:shelf_plus/shelf_plus.dart';
import 'package:prometheus_client/prometheus_client.dart';
import 'package:prometheus_client/runtime_metrics.dart' as runtime_metrics;
import 'package:prometheus_client_shelf/shelf_metrics.dart' as shelf_metrics;
import 'package:prometheus_client/format.dart' as format;
import 'shelf_cors_headers_base.dart';
import 'stream_isolate.dart';

const defaultHeaders = {'Content-Type': 'application/json;charset=utf-8'};

Response responseError(String message,
    {dynamic exception, dynamic stackTrace, int statusCode = 400}) {
  final v = jsonEncode({
    'is_error': true,
    'status_code': statusCode,
    'message': message,
    'exception': exception?.toString(),
    'stackTrace': stackTrace?.toString()
  });
  return Response(statusCode, body: v, headers: defaultHeaders);
}

final basePath = '/api/v1';
final streamIsolates = <Map<int, BidirectionalStreamIsolate>>[];
void main(List<String> arguments) async {
  // Register default runtime metrics
  runtime_metrics.register();

  const numberOfIsolates = 3;

  for (var i = 0; i < numberOfIsolates - 1; i++) {
    final streamIsolate = await StreamIsolate.spawnBidirectional(isolateMain,
        debugName: i.toString(), argument: i);
    streamIsolates.add({i: streamIsolate});
    streamIsolate.stream.listen((event) => receiveAndPass(event, i));
  }
}

/// receive msg from isolate and send to all isolates
void receiveAndPass(event, int idx) {
  streamIsolates.forEach((item) {
    item.values.first.send(event);
  });
}

//xargs -I % -P 8 curl "http:/192.168.66.123:3161/api/v1/protocolo/processos/public/site/2023/10" < <(printf '%s\n' {1..400})
Stream isolateMain(Stream inc, id) {
  final streamController = StreamController.broadcast();

  final reg = CollectorRegistry();
  final http_requests_total = Counter(
      name: 'http_requests_total', help: 'Total number of http api requests');
  http_requests_total.register(reg);
  // listen msg from main
  inc.listen((msg) {
    http_requests_total.inc();
  });

  shelfRun(init([id, streamController, reg]),
      defaultShared: true,
      defaultBindAddress: '0.0.0.0',
      defaultBindPort: 3161);

  return streamController.stream;
}

Handler Function() init(List args) {
  var id = args[0] as int;
  var streamController = args[1] as StreamController;
  var reg = args[2] as CollectorRegistry;

  return () {
    final app = Router().plus;  
    app.use(shelf_metrics.register(reg));
    app.use(corsHeaders());
    app.use((innerHandler) {
      return (request) async {
        // Every time http_request is called, increase the counter by one
        final resp = await innerHandler(request);
        if (!request.url.path.contains('metrics')) {
          //send msg to main
          streamController.add('+1');
        }
        return resp;
      };
    });
    app.use(logRequestsCustom());

    routes(app, reg);
    return app;
  };
}

void routes(RouterPlus app, CollectorRegistry reg) {
  // Register a handler to expose the metrics in the Prometheus text format
  app.get('/metrics', () {
    return (request) async {
      final buffer = StringBuffer();
      final metrics = await reg.collectMetricFamilySamples();
      format.write004(buffer, metrics);
      return Response.ok(
        buffer.toString(),
        headers: {'Content-Type': format.contentType},
      );
    };
  });

  app.get('$basePath/protocolo/processos/public/site/<ano>/<codigo>',
      (Request request, String ano, String codigo) async {

    Connection? conn;
    try {
      final codProcesso = int.tryParse(codigo);
      if (codProcesso == null) {
        return responseError('codProcesso invalido');
      }
      final anoExercicio = ano;
      conn = await DBLayer().connect();
      final procRepo = ProcessoRepository(conn);
      final proc =
          await procRepo.getProcessoByCodigoPublic(codProcesso, anoExercicio);
      await conn.disconnect();
      return Response.ok(
        jsonEncode(proc, toEncodable: SaliCoreUtils.customJsonEncode),
        headers: defaultHeaders,
      );
    } catch (e, s) {
      await conn?.disconnect();
      print('public_backend@getProcessoByCodigoPublic $e $s');
      return responseError(StatusMessage.ERROR_GENERIC);
    }
  });
}

Middleware logRequestsCustom(
        {void Function(String message, bool isError)? logger}) =>
    (innerHandler) {
      final theLogger = logger ?? _defaultLogger;
      return (request) {
        var startTime = DateTime.now();
        var watch = Stopwatch()..start();
        return Future.sync(() => innerHandler(request)).then((response) {
          var msg = _message(startTime, response.statusCode,
              request.requestedUri, request.method, watch.elapsed);
          theLogger(msg, false);
          return response;
        }, onError: (Object error, StackTrace stackTrace) {
          if (error is HijackException) throw error;
          var msg = _errorMessage(startTime, request.requestedUri,
              request.method, watch.elapsed, error, stackTrace);
          theLogger(msg, true);
          // ignore: only_throw_errors
          throw error;
        });
      };
    };

String _formatQuery(String query) {
  return query == '' ? '' : '?$query';
}

String _message(DateTime requestTime, int statusCode, Uri requestedUri,
    String method, Duration elapsedTime) {
  return '${requestTime.toIso8601String()} '
      '${elapsedTime.toString().padLeft(15)} '
      '${method.padRight(7)} [$statusCode] ' // 7 - longest standard HTTP method
      '${requestedUri.path}${_formatQuery(requestedUri.query)}'
      '  isolate: ${Isolate.current.debugName}';
}

String _errorMessage(DateTime requestTime, Uri requestedUri, String method,
    Duration elapsedTime, Object error, StackTrace? stack) {
  var chain = Chain.current();
  if (stack != null) {
    chain = Chain.forTrace(stack)
        .foldFrames((frame) => frame.isCore || frame.package == 'shelf')
        .terse;
  }

  var msg = '$requestTime\t$elapsedTime\t$method\t${requestedUri.path}'
      '${_formatQuery(requestedUri.query)}\n$error';

  return '$msg\n$chain';
}

void _defaultLogger(String msg, bool isError) {
  if (isError) {
    print('[ERROR] $msg');
  } else {
    print(msg);
  }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions