Skip to content

Commit

Permalink
Database Server and Database State Server metrics (#16)
Browse files Browse the repository at this point in the history
  • Loading branch information
ksmit799 authored Oct 11, 2023
1 parent a8b612f commit 93575da
Show file tree
Hide file tree
Showing 8 changed files with 358 additions and 28 deletions.
238 changes: 224 additions & 14 deletions src/database/database_server.cpp

Large diffs are not rendered by default.

28 changes: 25 additions & 3 deletions src/database/database_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

#include <mongocxx/client.hpp>
#include <mongocxx/instance.hpp>
#include <prometheus/counter.h>
#include <prometheus/histogram.h>
#include <uvw/timer.h>

#include "../messagedirector/channel_subscriber.h"
#include "../net/datagram_iterator.h"
Expand All @@ -24,21 +27,34 @@ class DatabaseServer : public ChannelSubscriber {
void HandleCreateDone(const uint64_t &channel, const uint32_t &context,
const uint32_t &doId);

void HandleDelete(DatagramIterator &dgi, const uint64_t &sender);
void HandleDelete(DatagramIterator &dgi);

void HandleGetAll(DatagramIterator &dgi, const uint64_t &sender);
void HandleGetField(DatagramIterator &dgi, const uint64_t &sender,
const bool &multiple);

void HandleSetField(DatagramIterator &dgi, const uint64_t &sender,
const bool &multiple);
void HandleSetField(DatagramIterator &dgi, const bool &multiple);
void HandleSetFieldEquals(DatagramIterator &dgi, const uint64_t &sender,
const bool &multiple);

void HandleContextFailure(const MessageTypes &type, const uint64_t &channel,
const uint32_t &context);

void InitMetrics();
void InitFreeChannelsMetric();

enum OperationType {
CREATE_OBJECT,
DELETE_OBJECT,
GET_OBJECT,
GET_OBJECT_FIELDS,
SET_OBJECT_FIELDS,
UPDATE_OBJECT_FIELDS,
};

void ReportCompleted(const OperationType &type,
const uvw::timer_handle::time &startTime);
void ReportFailed(const OperationType &type);

uint32_t _minDoId;
uint32_t _maxDoId;
Expand All @@ -49,6 +65,12 @@ class DatabaseServer : public ChannelSubscriber {
mongocxx::uri _uri;
mongocxx::client _conn;
mongocxx::database _db;

prometheus::Gauge *_freeChannelsGauge = nullptr;

std::unordered_map<OperationType, prometheus::Counter *> _opsCompleted;
std::unordered_map<OperationType, prometheus::Counter *> _opsFailed;
std::unordered_map<OperationType, prometheus::Histogram *> _opsCompletionTime;
};

} // namespace Ardos
Expand Down
11 changes: 8 additions & 3 deletions src/database/database_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,19 +331,24 @@ void DatabaseUtils::BsonToField(const DCSubatomicType &fieldType,
dg.AddBlob(arrDg.GetData(), arrDg.Size());
break;
}
case ST_uint32uint8array:
case ST_uint32uint8array: {
if (value.type() != bsoncxx::type::k_array) {
throw ConversionException("Expected array");
}
dg.AddUint16(value.get_array().value.length());
for (size_t i = 0; i < value.get_array().value.length();) {

auto arr = value.get_array().value;
auto arrLength = std::distance(arr.begin(), arr.end());

dg.AddUint16(arrLength);
for (size_t i = 0; i < arrLength;) {
dg.AddUint32(BsonToNumber<uint32_t>(
value.get_array().value[i].get_value(), divisor));
dg.AddUint8(BsonToNumber<uint8_t>(
value.get_array().value[i + 1].get_value(), divisor));
i += 2;
}
break;
}
case ST_char:
if (value.type() != bsoncxx::type::k_string &&
value.get_string().value.length() != 1) {
Expand Down
68 changes: 68 additions & 0 deletions src/stateserver/database_state_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "../util/config.h"
#include "../util/logger.h"
#include "../util/metrics.h"
#include "loading_object.h"

namespace Ardos {
Expand All @@ -30,18 +31,37 @@ DatabaseStateServer::DatabaseStateServer() : ChannelSubscriber() {

// Start listening to DoId's in our listening range.
SubscribeRange(min, max);

// Initialize metrics.
InitMetrics();
}

void DatabaseStateServer::ReceiveObject(DistributedObject *distObj) {
_distObjs[distObj->GetDoId()] = distObj;

if (_objectsGauge) {
_objectsGauge->Increment();
}

if (_objectsSize) {
_objectsSize->Observe((double)distObj->Size());
}
}

void DatabaseStateServer::RemoveDistributedObject(const uint32_t &doId) {
_distObjs.erase(doId);

if (_objectsGauge) {
_objectsGauge->Decrement();
}
}

void DatabaseStateServer::DiscardLoader(const uint32_t &doId) {
_loadObjs.erase(doId);

if (_loadingGauge) {
_loadingGauge->Decrement();
}
}

void DatabaseStateServer::HandleDatagram(const std::shared_ptr<Datagram> &dg) {
Expand Down Expand Up @@ -349,6 +369,54 @@ void DatabaseStateServer::HandleGetActivated(DatagramIterator &dgi,
PublishDatagram(dg);
}

void DatabaseStateServer::InitMetrics() {
// Make sure we want to collect metrics on this cluster.
if (!Metrics::Instance()->WantMetrics()) {
return;
}

auto registry = Metrics::Instance()->GetRegistry();

auto &objectsBuilder = prometheus::BuildGauge()
.Name("dbss_objects_size")
.Help("Number of loaded distributed objects")
.Register(*registry);

auto &loadingBuilder = prometheus::BuildGauge()
.Name("dbss_loading_size")
.Help("Number of objects currently loading")
.Register(*registry);

auto &activateTimeBuilder =
prometheus::BuildHistogram()
.Name("dbss_activate_time")
.Help("Time taken for an object to load/activate")
.Register(*registry);

auto &objectsSizeBuilder =
prometheus::BuildHistogram()
.Name("dbss_objects_bytes_size")
.Help("Byte-size of loaded distributed objects")
.Register(*registry);

_objectsGauge = &objectsBuilder.Add({});
_loadingGauge = &loadingBuilder.Add({});

_activateTime = &activateTimeBuilder.Add(
{}, prometheus::Histogram::BucketBoundaries{
0, 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000});
_objectsSize = &objectsSizeBuilder.Add(
{}, prometheus::Histogram::BucketBoundaries{0, 4, 16, 64, 256, 1024, 4096,
16384, 65536});
}

void DatabaseStateServer::ReportActivateTime(
const uvw::timer_handle::time &startTime) {
if (_activateTime) {
_activateTime->Observe((double)(g_loop->now() - startTime).count());
}
}

bool UnpackDBFields(DatagramIterator &dgi, DCClass *dclass, FieldMap &required,
FieldMap &ram) {
// Unload RAM and REQUIRED fields from database response.
Expand Down
12 changes: 12 additions & 0 deletions src/stateserver/database_state_server.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef ARDOS_DATABASE_STATE_SERVER_H
#define ARDOS_DATABASE_STATE_SERVER_H

#include <uvw/timer.h>

#include "../messagedirector/channel_subscriber.h"
#include "../net/datagram_iterator.h"
#include "../util/globals.h"
Expand Down Expand Up @@ -41,6 +43,10 @@ class DatabaseStateServer : public StateServerImplementation,

void HandleGetActivated(DatagramIterator &dgi, const uint64_t &sender);

void InitMetrics();

void ReportActivateTime(const uvw::timer_handle::time &startTime);

uint64_t _dbChannel;

std::unordered_map<uint32_t, DistributedObject *> _distObjs;
Expand All @@ -52,6 +58,12 @@ class DatabaseStateServer : public StateServerImplementation,
std::unordered_map<uint32_t, std::shared_ptr<Datagram>> _contextDatagrams;

uint32_t _nextContext = 0;

prometheus::Gauge *_objectsGauge = nullptr;
prometheus::Gauge *_loadingGauge = nullptr;

prometheus::Histogram *_objectsSize = nullptr;
prometheus::Histogram *_activateTime = nullptr;
};

} // namespace Ardos
Expand Down
16 changes: 13 additions & 3 deletions src/stateserver/loading_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,13 @@ LoadingObject::LoadingObject(DatabaseStateServer *stateServer,
const std::unordered_set<uint32_t> &contexts)
: ChannelSubscriber(), _stateServer(stateServer), _doId(doId),
_parentId(parentId), _zoneId(zoneId),
_context(stateServer->_nextContext++), _validContexts(contexts) {
_context(stateServer->_nextContext++), _validContexts(contexts),
_startTime(g_loop->now()) {
SubscribeChannel(doId);

if (_stateServer->_loadingGauge) {
_stateServer->_loadingGauge->Increment();
}
}

LoadingObject::LoadingObject(DatabaseStateServer *stateServer,
Expand All @@ -24,7 +29,7 @@ LoadingObject::LoadingObject(DatabaseStateServer *stateServer,
: ChannelSubscriber(), _stateServer(stateServer), _doId(doId),
_parentId(parentId), _zoneId(zoneId),
_context(stateServer->_nextContext++), _validContexts(contexts),
_dclass(dclass) {
_dclass(dclass), _startTime(g_loop->now()) {
SubscribeChannel(doId);

// Unpack the RAM fields we received in the generate message.
Expand All @@ -48,10 +53,14 @@ LoadingObject::LoadingObject(DatabaseStateServer *stateServer,
_doId, field->get_name()));
}
}

if (_stateServer->_loadingGauge) {
_stateServer->_loadingGauge->Increment();
}
}

void LoadingObject::Start() {
if (!_validContexts.size()) {
if (_validContexts.empty()) {
// Fetch our stored fields from the database.
auto dg = std::make_shared<Datagram>(_stateServer->_dbChannel, _doId,
DBSERVER_OBJECT_GET_ALL);
Expand Down Expand Up @@ -173,6 +182,7 @@ void LoadingObject::HandleDatagram(const std::shared_ptr<Datagram> &dg) {
}

void LoadingObject::Finalize() {
_stateServer->ReportActivateTime(_startTime);
_stateServer->DiscardLoader(_doId);
ForwardDatagrams();
ChannelSubscriber::Shutdown();
Expand Down
2 changes: 2 additions & 0 deletions src/stateserver/loading_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ class LoadingObject : public ChannelSubscriber {
FieldMap _ramFields;

std::vector<std::shared_ptr<Datagram>> _datagramQueue;

uvw::timer_handle::time _startTime;
};

} // namespace Ardos
Expand Down
11 changes: 6 additions & 5 deletions src/stateserver/state_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,14 @@ void StateServer::InitMetrics() {

auto &objectsBuilder = prometheus::BuildGauge()
.Name("ss_objects_size")
.Help("Number of distributed objects")
.Help("Number of loaded distributed objects")
.Register(*registry);

auto &objectsSizeBuilder = prometheus::BuildHistogram()
.Name("ss_objects_bytes_size")
.Help("Bytes size of distributed objects")
.Register(*registry);
auto &objectsSizeBuilder =
prometheus::BuildHistogram()
.Name("ss_objects_bytes_size")
.Help("Byte-size of loaded distributed objects")
.Register(*registry);

_objectsGauge = &objectsBuilder.Add({});
_objectsSizeHistogram = &objectsSizeBuilder.Add(
Expand Down

0 comments on commit 93575da

Please sign in to comment.