From 2ab184792ea62d7bad9bbf35313c19b39d5a20bd Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Fri, 11 Jul 2014 16:39:40 -0400 Subject: [PATCH 01/16] rtbkit_integration_test: make use of a configurable number of agents --- rtbkit/core/banker/account.h | 4 +- rtbkit/examples/rtbkit_integration_test.cc | 89 +++++++++++++++------- rtbkit/testing/test_agent.h | 13 ++-- 3 files changed, 71 insertions(+), 35 deletions(-) diff --git a/rtbkit/core/banker/account.h b/rtbkit/core/banker/account.h index 33a65a218..25dad571b 100644 --- a/rtbkit/core/banker/account.h +++ b/rtbkit/core/banker/account.h @@ -1194,7 +1194,7 @@ struct Accounts { { auto it = accounts.find(account); if (it == accounts.end()) - throw ML::Exception("couldn't get account"); + throw ML::Exception("couldn't get account: " + account.toString()); return it->second; } @@ -1202,7 +1202,7 @@ struct Accounts { { auto it = accounts.find(account); if (it == accounts.end()) - throw ML::Exception("couldn't get account"); + throw ML::Exception("couldn't get account: " + account.toString()); return it->second; } diff --git a/rtbkit/examples/rtbkit_integration_test.cc b/rtbkit/examples/rtbkit_integration_test.cc index 412060a4d..52d6c28d3 100644 --- a/rtbkit/examples/rtbkit_integration_test.cc +++ b/rtbkit/examples/rtbkit_integration_test.cc @@ -39,6 +39,9 @@ using namespace Datacratic; using namespace RTBKIT; +#define NUM_ACCOUNTS 200 + + /******************************************************************************/ /* COMPONENTS */ /******************************************************************************/ @@ -58,7 +61,7 @@ struct Components SlaveBudgetController budgetController; AgentConfigurationService agentConfig; MonitorEndpoint monitor; - TestAgent agent; + vector > agents; FrequencyCapAugmentor augmentor1, augmentor2; // \todo Add a PAL event subscriber. @@ -79,7 +82,6 @@ struct Components masterBanker(proxies, "masterBanker"), agentConfig(proxies, "agentConfigurationService"), monitor(proxies, "monitor"), - agent(proxies, "agent1"), augmentor1(proxies, "fca1"), augmentor2(proxies, "fca2"), winStream("mockStream", proxies) @@ -95,7 +97,9 @@ struct Components budgetController.shutdown(); - agent.shutdown(); + for (shared_ptr & agent: agents) { + agent->shutdown(); + } augmentor1.shutdown(); augmentor2.shutdown(); agentConfig.shutdown(); @@ -142,6 +146,7 @@ struct Components // Setup a slave banker that we can use to manipulate and peak at the // budgets during the test. budgetController.setApplicationLayer(make_application_layer(proxies->config)); + // budgetController.setApplicationLayer(make_application_layer(bankerAddr)); budgetController.start(); // Each router contains a slave masterBanker which is periodically @@ -150,6 +155,7 @@ struct Components { auto res = std::make_shared(name); res->setApplicationLayer(make_application_layer(proxies->config)); + // res->setApplicationLayer(make_application_layer(bankerAddr)); res->start(); return res; }; @@ -205,9 +211,11 @@ struct Components // Our bidding agent which listens to the bid request stream from all // available routers and decide who gets to see your awesome pictures of // kittens. - agent.init(); - agent.start(); - agent.configure(); + for (const shared_ptr & agent: agents) { + agent->init(); + agent->start(); + agent->configure(); + } // Our augmentor which does frequency capping for our agent. augmentor1.init(); @@ -288,23 +296,21 @@ void allocateBudget( budgetController.setBudgetSync(account[0], budget); budgetController.topupTransferSync(account, USD(10)); - cerr << budgetController.getAccountSummarySync(account[0], -1) << endl; - - // Syncing is done periodically so we have to wait a bit before the router - // will have a budget available. Necessary because the bid request stream - // for this test isn't infinit. - cerr << "sleeping so that the slave accounts can sync up" << endl; - ML::sleep(2.1); + // cerr << budgetController.getAccountSummarySync(account[0], -1) << endl; +} +void testBudget(SlaveBudgetController& budgetController, + const AccountKey& account) +{ auto summary = budgetController.getAccountSummarySync(account[0], -1); cerr << summary << endl; ExcAssertEqual( - summary.subAccounts["testStrategy"].subAccounts["router1"].budget, + summary.subAccounts[account[1]].subAccounts["router1"].budget, USD(0.10)); ExcAssertEqual( - summary.subAccounts["testStrategy"].subAccounts["router2"].budget, + summary.subAccounts[account[1]].subAccounts["router2"].budget, USD(0.10)); } @@ -332,7 +338,7 @@ void dumpAccounts( */ int main(int argc, char ** argv) { - Watchdog watchdog(30.0); + Watchdog watchdog(200.0); // Controls the length of the test. enum { @@ -352,20 +358,47 @@ int main(int argc, char ** argv) if (false) proxies->logToCarbon("carbon.rtbkit.org", "stats"); + Components components(proxies); + + // Setup an initial budgeting for the test. + for (int i = 0; i < NUM_ACCOUNTS; i++) { + AccountKey key{"testCampaign" + to_string(i), + "testStrategy" + to_string(i)}; + auto agent = make_shared(proxies, + "testAgent" + to_string(i), + key); + components.agents.push_back(agent); + } + // Setups up the various component of the RTBKit stack. See Components::init // for more details. - Components components(proxies); components.init(); - // Some extra customization for our agent to make it extra special. See - // setupAgent for more details. - setupAgent(components.agent); - // Setup an initial budgeting for the test. - allocateBudget( - components.budgetController, - {"testCampaign", "testStrategy"}, - USD(1000)); + for (int i = 0; i < NUM_ACCOUNTS; i++) { + AccountKey key{"testCampaign" + to_string(i), + "testStrategy" + to_string(i)}; + allocateBudget(components.budgetController, key, USD(1000)); + } + + // Syncing is done periodically so we have to wait a bit before the router + // will have a budget available. Necessary because the bid request stream + // for this test isn't infinit. + cerr << "sleeping so that the slave accounts can sync up" << endl; + ML::sleep(2.1); + + for (int i = 0; i < NUM_ACCOUNTS; i++) { + AccountKey key{"testCampaign" + to_string(i), + "testStrategy" + to_string(i)}; + testBudget(components.budgetController, key); + } + + + for (const auto & agent: components.agents) { + // Some extra customization for our agent to make it extra special. + // See setupAgent for more details. + setupAgent(*agent); + } // Start up the exchange threads which should let bid requests flow through // our stack. @@ -380,11 +413,11 @@ int main(int argc, char ** argv) // Dump the budget stats while we wait for the test to finish. while (!exchange.isDone()) { - auto summary = components.budgetController.getAccountSummarySync( - {"testCampaign"}, -1); + auto summary = components.budgetController.getAccountSummarySync( + {"testCampaign0"}, -1); cerr << summary << endl; - dumpAccounts(components.budgetController, {"testCampaign"}, summary); + dumpAccounts(components.budgetController, {"testCampaign0"}, summary); ML::sleep(1.0); } diff --git a/rtbkit/testing/test_agent.h b/rtbkit/testing/test_agent.h index ba0460609..10f8d2174 100644 --- a/rtbkit/testing/test_agent.h +++ b/rtbkit/testing/test_agent.h @@ -7,27 +7,30 @@ #pragma once +#include "rtbkit/common/account_key.h" #include "rtbkit/plugins/bidding_agent/bidding_agent.h" #include "jml/arch/futex.h" namespace RTBKIT { -struct TestAgent : public RTBKIT::BiddingAgent { +struct TestAgent : public BiddingAgent { TestAgent(std::shared_ptr proxies, - const std::string & name = "testAgent") + const std::string & name = "testAgent", + const AccountKey & accountKey + = AccountKey({"testCampaign", "testStrategy"})) noexcept : RTBKIT::BiddingAgent(proxies, name) { - setDefaultConfig(); + setDefaultConfig(accountKey); setupCallbacks(); clear(); } RTBKIT::AgentConfig config; - void setDefaultConfig() + void setDefaultConfig(const AccountKey & accountKey) { RTBKIT::AgentConfig config; - config.account = {"testCampaign", "testStrategy"}; + config.account = accountKey; config.maxInFlight = 20000; config.creatives.push_back(RTBKIT::Creative::sampleLB); config.creatives.push_back(RTBKIT::Creative::sampleWS); From ec23bde49ccd41de200e3fed991c32e75c901d7d Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Mon, 14 Jul 2014 10:44:56 -0400 Subject: [PATCH 02/16] "toPopupTransfer" -> "topupTransfer" and reworked indentation --- rtbkit/core/banker/application_layer.cc | 151 ++++++++++++------------ rtbkit/core/banker/application_layer.h | 12 +- rtbkit/core/banker/slave_banker.cc | 2 +- 3 files changed, 80 insertions(+), 85 deletions(-) diff --git a/rtbkit/core/banker/application_layer.cc b/rtbkit/core/banker/application_layer.cc index e016149f9..6b62b244d 100644 --- a/rtbkit/core/banker/application_layer.cc +++ b/rtbkit/core/banker/application_layer.cc @@ -15,7 +15,7 @@ namespace RTBKIT { template std::shared_ptr makeCallback(std::string functionName, - std::function onDone) + std::function onDone) { //static_assert(std::is_default_constructible::value, // "Result is not default constructible"); @@ -23,8 +23,7 @@ makeCallback(std::string functionName, return std::make_shared( [=](const HttpRequest &, HttpClientError error, int statusCode, - std::string &&, std::string &&body) - { + std::string &&, std::string &&body) { JML_TRACE_EXCEPTIONS(false); if (!onDone) { return; @@ -38,19 +37,19 @@ makeCallback(std::string functionName, Result { }); } else { - decodeRestResponseJson(functionName, nullptr, statusCode, body, onDone); + decodeRestResponseJson(functionName, nullptr, statusCode, body, onDone); } }); } void ApplicationLayer:: -toPopupTransfer(const AccountKey &account, - AccountType accountType, - CurrencyPool amount, - const BudgetController::OnBudgetResult &onResult) +topupTransfer(const AccountKey &account, + AccountType accountType, + CurrencyPool amount, + const BudgetController::OnBudgetResult &onResult) { - toPopupTransfer(account.toString(), accountType, amount, onResult); + topupTransfer(account.toString(), accountType, amount, onResult); } void @@ -65,8 +64,7 @@ init(std::string bankerUri) // Since we send one HttpRequest per account when syncing, this is a good idea // to keep a fairly large queue size in order to avoid deadlocks - httpClient.reset(new HttpClient(bankerUri, 4 /* numParallel */, - 1024 /* queueSize */)); + httpClient.reset(new HttpClient(bankerUri, 4 /* numParallel */)); addSource("HttpLayer::httpClient", httpClient); } @@ -76,18 +74,18 @@ addAccount(const AccountKey &account, const BudgetController::OnBudgetResult &onResult) { httpClient->post("/v1/accounts", budgetResultCallback(onResult), - { }, - { { "accountName", account.toString() }, - { "accountType", "budget" }}); + { }, + { { "accountName", account.toString() }, + { "accountType", "budget" }}); } void HttpLayer:: -toPopupTransfer(const std::string &accountStr, - AccountType accountType, - CurrencyPool amount, - const BudgetController::OnBudgetResult &onResult) +topupTransfer(const std::string &accountStr, + AccountType accountType, + CurrencyPool amount, + const BudgetController::OnBudgetResult &onResult) { httpClient->put("/v1/accounts/" + accountStr + "/balance", budgetResultCallback(onResult), @@ -109,14 +107,14 @@ setBudget(const std::string &topLevelAccount, void HttpLayer:: getAccountSummary( - const AccountKey &account, - int depth, - std::function - onResult) + const AccountKey &account, + int depth, + std::function + onResult) { httpClient->get("/v1/accounts/" + account.toString() + "/summary", makeCallback( - "HttpLayer::getAccountSummary", + "HttpLayer::getAccountSummary", onResult), { { "depth", to_string(depth) } }); } @@ -128,8 +126,8 @@ getAccount(const AccountKey &account, { httpClient->get("/v1/accounts/" + account.toString(), makeCallback( - "HttpLayer::getAccount", - onResult)); + "HttpLayer::getAccount", + onResult)); } void @@ -141,8 +139,8 @@ addSpendAccount(const std::string &shadowStr, makeCallback("HttpLayer::addSpendAccount", onDone), { }, { - { "accountName", shadowStr }, - { "accountType", "spend" } + { "accountName", shadowStr }, + { "accountType", "spend" } }); } @@ -160,23 +158,22 @@ syncAccount(const ShadowAccount &account, const std::string &shadowStr, void HttpLayer:: request(std::string method, const std::string &resource, - const RestParams ¶ms, const std::string &content, OnRequestResult onResult) + const RestParams ¶ms, const std::string &content, OnRequestResult onResult) { std::transform(begin(method), end(method), begin(method), [](char c) { return ::tolower(c); }); auto onDone = std::make_shared( - [=](const HttpRequest &, - HttpClientError error, int statusCode, - std::string &&, std::string &&body) - { + [=](const HttpRequest &, + HttpClientError error, int statusCode, + std::string &&, std::string &&body) { if (error != HttpClientError::NONE) { std::ostringstream oss; oss << error; onResult(std::make_exception_ptr( - ML::Exception("HTTP Request failed with return code %s", oss.str().c_str())), - statusCode, ""); - } - else { + ML::Exception("HTTP Request failed with return code %s", oss.str().c_str())), + statusCode, ""); + } + else { onResult(nullptr, statusCode, body); } }); @@ -202,13 +199,12 @@ budgetResultCallback(const BudgetController::OnBudgetResult &onResult) return std::make_shared( [=](const HttpRequest &, HttpClientError error, int statusCode, - std::string &&, std::string &&body) - { + std::string &&, std::string &&body) { if (error != HttpClientError::NONE) { std::ostringstream oss; oss << error; onResult(std::make_exception_ptr( - ML::Exception("HTTP Request failed with return code %s", oss.str().c_str()))); + ML::Exception("HTTP Request failed with return code %s", oss.str().c_str()))); } else { onResult(nullptr); @@ -235,23 +231,23 @@ addAccount(const AccountKey &account, const BudgetController::OnBudgetResult &onResult) { proxy->push(budgetResultCallback(onResult), - "POST", "/v1/accounts", - { {"accountName", account.toString()}, - { "accountType", "budget" } }); + "POST", "/v1/accounts", + { {"accountName", account.toString()}, + { "accountType", "budget" } }); } void ZmqLayer:: -toPopupTransfer(const std::string &accountStr, - AccountType accountType, - CurrencyPool amount, - const BudgetController::OnBudgetResult &onResult) +topupTransfer(const std::string &accountStr, + AccountType accountType, + CurrencyPool amount, + const BudgetController::OnBudgetResult &onResult) { proxy->push(budgetResultCallback(onResult), - "PUT", "/v1/accounts/" + accountStr + "/balance", - { { "accountType", AccountTypeToString(accountType) } }, - amount.toJson().toString()); + "PUT", "/v1/accounts/" + accountStr + "/balance", + { { "accountType", AccountTypeToString(accountType) } }, + amount.toJson().toString()); } void @@ -261,23 +257,23 @@ setBudget(const std::string &topLevelAccount, const BudgetController::OnBudgetResult &onResult) { proxy->push(budgetResultCallback(onResult), - "PUT", "/v1/accounts/" + topLevelAccount + "/budget", - { /* {"amount", amount.toString()}*/ }, - amount.toJson().toString()); + "PUT", "/v1/accounts/" + topLevelAccount + "/budget", + { /* {"amount", amount.toString()}*/ }, + amount.toJson().toString()); } void ZmqLayer:: getAccountSummary( - const AccountKey &account, - int depth, - std::function - onResult) + const AccountKey &account, + int depth, + std::function + onResult) { proxy->push(makeRestResponseJsonDecoder("ZmqLayer::getAccountSummary", onResult), - "GET", "/v1/accounts/" + account.toString() + "/summary", - { {"depth", to_string(depth)} }, - ""); + "GET", "/v1/accounts/" + account.toString() + "/summary", + { {"depth", to_string(depth)} }, + ""); } void @@ -295,31 +291,31 @@ addSpendAccount(const std::string &shadowStr, std::function onDone) { proxy->push(makeRestResponseJsonDecoder("ZmqLayer::addSpendAccount", onDone), - "POST", - "/v1/accounts", - { - { "accountName", shadowStr }, - { "accountType", "spend" } - }, - ""); + "POST", + "/v1/accounts", + { + { "accountName", shadowStr }, + { "accountType", "spend" } + }, + ""); } void ZmqLayer:: syncAccount(const ShadowAccount &account, const std::string &shadowStr, - std::function onDone) + std::function onDone) { proxy->push(makeRestResponseJsonDecoder("ZmqLayer::syncAccount", onDone), - "PUT", - "/v1/accounts/" + shadowStr + "/shadow", - {}, - account.toJson().toString()); + "PUT", + "/v1/accounts/" + shadowStr + "/shadow", + {}, + account.toJson().toString()); } void ZmqLayer:: request(std::string method, const std::string &resource, - const RestParams ¶ms, const std::string &content, OnRequestResult onResult) + const RestParams ¶ms, const std::string &content, OnRequestResult onResult) { proxy->push(onResult, method, resource, params, content); } @@ -329,12 +325,11 @@ RestProxy::OnDone ZmqLayer:: budgetResultCallback(const BudgetController::OnBudgetResult &onResult) { - return [=] (std::exception_ptr ptr, int resultCode, string body) - { - //cerr << "got budget result callback with resultCode " - // << resultCode << " body " << body << endl; - onResult(ptr); - }; + return [=] (std::exception_ptr ptr, int resultCode, string body) { + //cerr << "got budget result callback with resultCode " + // << resultCode << " body " << body << endl; + onResult(ptr); + }; } } // namespace RTBKIT diff --git a/rtbkit/core/banker/application_layer.h b/rtbkit/core/banker/application_layer.h index 4c84b7870..9d87a435a 100644 --- a/rtbkit/core/banker/application_layer.h +++ b/rtbkit/core/banker/application_layer.h @@ -35,13 +35,13 @@ struct ApplicationLayer : public MessageLoop { const AccountKey &account, const BudgetController::OnBudgetResult &onResult) = 0; - virtual void toPopupTransfer( + virtual void topupTransfer( const AccountKey &account, AccountType accountType, CurrencyPool amount, const BudgetController::OnBudgetResult &onResult); - virtual void toPopupTransfer( + virtual void topupTransfer( const std::string &accountStr, AccountType accountType, CurrencyPool amount, @@ -90,8 +90,8 @@ struct HttpLayer : public ApplicationLayer { const AccountKey &account, const BudgetController::OnBudgetResult &onResult); - using ApplicationLayer::toPopupTransfer; - void toPopupTransfer( + using ApplicationLayer::topupTransfer; + void topupTransfer( const std::string &accountStr, AccountType accountType, CurrencyPool amount, @@ -143,8 +143,8 @@ struct ZmqLayer : public ApplicationLayer { const AccountKey &account, const BudgetController::OnBudgetResult &onResult); - using ApplicationLayer::toPopupTransfer; - void toPopupTransfer( + using ApplicationLayer::topupTransfer; + void topupTransfer( const std::string &accountStr, AccountType accountType, CurrencyPool amount, diff --git a/rtbkit/core/banker/slave_banker.cc b/rtbkit/core/banker/slave_banker.cc index 08f19ca1a..b0c96e0f9 100644 --- a/rtbkit/core/banker/slave_banker.cc +++ b/rtbkit/core/banker/slave_banker.cc @@ -40,7 +40,7 @@ topupTransfer(const AccountKey & account, CurrencyPool amount, const OnBudgetResult & onResult) { - applicationLayer->toPopupTransfer(account, AT_BUDGET, amount, onResult); + applicationLayer->topupTransfer(account, AT_BUDGET, amount, onResult); } void From 3e38a99e500e482644d5da0fae0b9615a61cc1ee Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Mon, 14 Jul 2014 11:06:08 -0400 Subject: [PATCH 03/16] rtbkit_integration_test: cleanup of the code handling multiple agents/accounts --- rtbkit/examples/rtbkit_integration_test.cc | 255 ++++++++++----------- 1 file changed, 123 insertions(+), 132 deletions(-) diff --git a/rtbkit/examples/rtbkit_integration_test.cc b/rtbkit/examples/rtbkit_integration_test.cc index 52d6c28d3..c545ca56b 100644 --- a/rtbkit/examples/rtbkit_integration_test.cc +++ b/rtbkit/examples/rtbkit_integration_test.cc @@ -39,7 +39,109 @@ using namespace Datacratic; using namespace RTBKIT; -#define NUM_ACCOUNTS 200 +const size_t numAccounts(200); /* number of accounts and agents */ + + +/******************************************************************************/ +/* SETUP */ +/******************************************************************************/ + +void setupAgent(TestAgent& agent) +{ + // Since we're writting a simple test, this allows us to omit callbacks in + // the bidding agent class. + agent.strictMode(false); + + + // Indicate to the router that we want our bid requests to be augmented + // with our frequency cap augmentor example. + { + AugmentationConfig augConfig; + + // Name of the requested augmentor. + augConfig.name = "frequency-cap-ex"; + + // If the augmentor was unable to augment our bid request then it + // should be filtered before it makes it to our agent. + augConfig.required = true; + + // Config parameter sent used by the augmentor to determine which + // tag to set. + augConfig.config = Json::Value(42); + + // Instruct to router to filter out all bid requests who have not + // been tagged by our frequency cap augmentor. + augConfig.filters.include.push_back("pass-frequency-cap-ex"); + + agent.config.addAugmentation(augConfig); + } + + // Notify the world about our config change. + agent.doConfig(agent.config); + + // This lambda implements our incredibly sophisticated bidding strategy. + agent.onBidRequest = [&] ( + double timestamp, + const Id & id, + std::shared_ptr br, + Bids bids, + double timeLeftMs, + const Json::Value & augmentations, + const WinCostModel & wcm) + { + ExcAssertGreater(bids.size(), 0); + + Bid& bid = bids[0]; + ExcAssertGreater(bid.availableCreatives.size(), 0); + + bid.bid(bid.availableCreatives[0], USD_CPM(1)); + + agent.doBid(id, bids, Json::Value(), wcm); + ML::atomic_inc(agent.numBidRequests); + }; +} + + +/** Transfer a given budget to each router for a given account. */ +void allocateBudget( + SlaveBudgetController& budgetController, + const AccountKey& account, + Amount budget) +{ + budgetController.addAccountSync(account); + budgetController.setBudgetSync(account[0], budget); + budgetController.topupTransferSync(account, USD(10)); + + // cerr << budgetController.getAccountSummarySync(account[0], -1) << endl; +} + +void testBudget(SlaveBudgetController& budgetController, + const AccountKey& account) +{ + auto summary = budgetController.getAccountSummarySync(account[0], -1); + cerr << summary << endl; + + ExcAssertEqual( + summary.subAccounts[account[1]].subAccounts["router1"].budget, + USD(0.10)); + + ExcAssertEqual( + summary.subAccounts[account[1]].subAccounts["router2"].budget, + USD(0.10)); +} + +/** Some debugging output for the banker. */ +void dumpAccounts( + SlaveBudgetController& budgetController, + const AccountKey & name, const AccountSummary & a) +{ + cerr << name << ": " << endl; + cerr << budgetController.getAccountSync(name) << endl; + + for (auto & sub: a.subAccounts) { + dumpAccounts(budgetController, name.childKey(sub.first), sub.second); + } +}; /******************************************************************************/ @@ -114,7 +216,7 @@ struct Components cerr << "done shutdown" << endl; } - void init() + void init(size_t numAgents) { const string agentUri = "tcp://127.0.0.1:1234"; @@ -211,10 +313,24 @@ struct Components // Our bidding agent which listens to the bid request stream from all // available routers and decide who gets to see your awesome pictures of // kittens. - for (const shared_ptr & agent: agents) { + for (size_t i = 0; i < numAgents; i++) { + AccountKey key{"testCampaign" + to_string(i), + "testStrategy" + to_string(i)}; + auto agent = make_shared(proxies, + "testAgent" + to_string(i), + key); + agents.push_back(agent); + agent->init(); agent->start(); agent->configure(); + + // Some extra customization for our agent to make it extra + // special. See setupAgent for more details. + setupAgent(*agent); + + // Setup an initial budgeting for the test. + allocateBudget(budgetController, key, USD(1000)); } // Our augmentor which does frequency capping for our agent. @@ -226,108 +342,6 @@ struct Components }; -/******************************************************************************/ -/* SETUP */ -/******************************************************************************/ - -void setupAgent(TestAgent& agent) -{ - // Since we're writting a simple test, this allows us to omit callbacks in - // the bidding agent class. - agent.strictMode(false); - - - // Indicate to the router that we want our bid requests to be augmented - // with our frequency cap augmentor example. - { - AugmentationConfig augConfig; - - // Name of the requested augmentor. - augConfig.name = "frequency-cap-ex"; - - // If the augmentor was unable to augment our bid request then it - // should be filtered before it makes it to our agent. - augConfig.required = true; - - // Config parameter sent used by the augmentor to determine which - // tag to set. - augConfig.config = Json::Value(42); - - // Instruct to router to filter out all bid requests who have not - // been tagged by our frequency cap augmentor. - augConfig.filters.include.push_back("pass-frequency-cap-ex"); - - agent.config.addAugmentation(augConfig); - } - - // Notify the world about our config change. - agent.doConfig(agent.config); - - // This lambda implements our incredibly sophisticated bidding strategy. - agent.onBidRequest = [&] ( - double timestamp, - const Id & id, - std::shared_ptr br, - Bids bids, - double timeLeftMs, - const Json::Value & augmentations, - const WinCostModel & wcm) - { - ExcAssertGreater(bids.size(), 0); - - Bid& bid = bids[0]; - ExcAssertGreater(bid.availableCreatives.size(), 0); - - bid.bid(bid.availableCreatives[0], USD_CPM(1)); - - agent.doBid(id, bids, Json::Value(), wcm); - ML::atomic_inc(agent.numBidRequests); - }; -} - - -/** Transfer a given budget to each router for a given account. */ -void allocateBudget( - SlaveBudgetController& budgetController, - const AccountKey& account, - Amount budget) -{ - budgetController.addAccountSync(account); - budgetController.setBudgetSync(account[0], budget); - budgetController.topupTransferSync(account, USD(10)); - - // cerr << budgetController.getAccountSummarySync(account[0], -1) << endl; -} - -void testBudget(SlaveBudgetController& budgetController, - const AccountKey& account) -{ - auto summary = budgetController.getAccountSummarySync(account[0], -1); - cerr << summary << endl; - - ExcAssertEqual( - summary.subAccounts[account[1]].subAccounts["router1"].budget, - USD(0.10)); - - ExcAssertEqual( - summary.subAccounts[account[1]].subAccounts["router2"].budget, - USD(0.10)); -} - -/** Some debugging output for the banker. */ -void dumpAccounts( - SlaveBudgetController& budgetController, - const AccountKey & name, const AccountSummary & a) -{ - cerr << name << ": " << endl; - cerr << budgetController.getAccountSync(name) << endl; - - for (auto & sub: a.subAccounts) { - dumpAccounts(budgetController, name.childKey(sub.first), sub.second); - } -}; - - /******************************************************************************/ /* MAIN */ /******************************************************************************/ @@ -360,26 +374,9 @@ int main(int argc, char ** argv) Components components(proxies); - // Setup an initial budgeting for the test. - for (int i = 0; i < NUM_ACCOUNTS; i++) { - AccountKey key{"testCampaign" + to_string(i), - "testStrategy" + to_string(i)}; - auto agent = make_shared(proxies, - "testAgent" + to_string(i), - key); - components.agents.push_back(agent); - } - - // Setups up the various component of the RTBKit stack. See Components::init - // for more details. - components.init(); - - // Setup an initial budgeting for the test. - for (int i = 0; i < NUM_ACCOUNTS; i++) { - AccountKey key{"testCampaign" + to_string(i), - "testStrategy" + to_string(i)}; - allocateBudget(components.budgetController, key, USD(1000)); - } + // Setups up the various component of the RTBKit stack. See + // Components::init for more details. + components.init(numAccounts); // Syncing is done periodically so we have to wait a bit before the router // will have a budget available. Necessary because the bid request stream @@ -387,19 +384,13 @@ int main(int argc, char ** argv) cerr << "sleeping so that the slave accounts can sync up" << endl; ML::sleep(2.1); - for (int i = 0; i < NUM_ACCOUNTS; i++) { + for (int i = 0; i < numAccounts; i++) { AccountKey key{"testCampaign" + to_string(i), "testStrategy" + to_string(i)}; testBudget(components.budgetController, key); } - for (const auto & agent: components.agents) { - // Some extra customization for our agent to make it extra special. - // See setupAgent for more details. - setupAgent(*agent); - } - // Start up the exchange threads which should let bid requests flow through // our stack. MockExchange exchange(proxies, "mock-exchange"); From 679bf807086687ed5807fb15f0914440cd7db3c4 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Mon, 14 Jul 2014 16:21:50 -0400 Subject: [PATCH 04/16] rtbkit_integration_test: improved support for HTTP application layer --- rtbkit/examples/rtbkit_integration_test.cc | 71 +++++++++++++++++----- 1 file changed, 55 insertions(+), 16 deletions(-) diff --git a/rtbkit/examples/rtbkit_integration_test.cc b/rtbkit/examples/rtbkit_integration_test.cc index c545ca56b..7044c905c 100644 --- a/rtbkit/examples/rtbkit_integration_test.cc +++ b/rtbkit/examples/rtbkit_integration_test.cc @@ -41,6 +41,7 @@ using namespace RTBKIT; const size_t numAccounts(200); /* number of accounts and agents */ +#define ZMQ_APP_LAYER 0 /******************************************************************************/ /* SETUP */ @@ -180,7 +181,7 @@ struct Components : proxies(proxies), router1(proxies, "router1"), router2(proxies, "router2"), - postAuctionLoop(proxies, "pas1"), + postAuctionLoop(proxies, "pal1"), masterBanker(proxies, "masterBanker"), agentConfig(proxies, "agentConfigurationService"), monitor(proxies, "monitor"), @@ -245,27 +246,39 @@ struct Components auto bankerAddr = masterBanker.bindTcp().second; masterBanker.start(); + cerr << "bankerAddr: " + bankerAddr + "\n"; + ML::sleep(5); + // Setup a slave banker that we can use to manipulate and peak at the // budgets during the test. +#if 0 budgetController.setApplicationLayer(make_application_layer(proxies->config)); - // budgetController.setApplicationLayer(make_application_layer(bankerAddr)); +#else + auto appLayer = make_application_layer("http://127.0.0.1:15500"); + budgetController.setApplicationLayer(appLayer); +#endif + budgetController.start(); // Each router contains a slave masterBanker which is periodically // synced with the master banker. - auto makeSlaveBanker = [=] (const std::string & name) - { - auto res = std::make_shared(name); - res->setApplicationLayer(make_application_layer(proxies->config)); - // res->setApplicationLayer(make_application_layer(bankerAddr)); - res->start(); - return res; - }; + auto makeSlaveBanker = [=] (const std::string & name) { + auto res = std::make_shared(name); +#if 0 + auto appLayer = make_application_layer(proxies->config); +#else + cerr << "bankerAddr: " + bankerAddr + "\n"; + auto appLayer = make_application_layer("http://127.0.0.1:15500"); +#endif + res->setApplicationLayer(appLayer); + res->start(); + return res; + }; // Setup a post auction loop (PAL) which handles all exchange events // that don't need to be processed in real-time (wins, loss, etc). postAuctionLoop.init(8); - postAuctionLoop.setBanker(makeSlaveBanker("pas1")); + postAuctionLoop.setBanker(makeSlaveBanker("pal1")); postAuctionLoop.bindTcp(); postAuctionLoop.start(); @@ -381,8 +394,17 @@ int main(int argc, char ** argv) // Syncing is done periodically so we have to wait a bit before the router // will have a budget available. Necessary because the bid request stream // for this test isn't infinit. - cerr << "sleeping so that the slave accounts can sync up" << endl; - ML::sleep(2.1); + auto ensureBudgetSync = [&] (const shared_ptr & banker) { + auto slave = (SlaveBanker *) banker.get(); + while (slave->getNumReauthorized() == 0) { + ML::sleep(0.5); + } + }; + ensureBudgetSync(components.postAuctionLoop.getBanker()); + ensureBudgetSync(components.router1.getBanker()); + ensureBudgetSync(components.router2.getBanker()); + + ML::sleep(5); for (int i = 0; i < numAccounts; i++) { AccountKey key{"testCampaign" + to_string(i), @@ -402,16 +424,33 @@ int main(int argc, char ** argv) exchange.add(new MockBidSource(bids, nBidRequestsPerThread), new MockWinSource(wins), new MockEventSource(events)); } - // Dump the budget stats while we wait for the test to finish. + // Dump the budget stats while we wait for the test to finish. Only the + // first one is fetched to avoid flooding the console with unreadable + // data. while (!exchange.isDone()) { - auto summary = components.budgetController.getAccountSummarySync( - {"testCampaign0"}, -1); + auto summary = components.budgetController.getAccountSummarySync( + {"testCampaign0"}, -1); cerr << summary << endl; dumpAccounts(components.budgetController, {"testCampaign0"}, summary); ML::sleep(1.0); + + auto doCheckBanker = [&] (const string & label, + const shared_ptr & banker) { + auto slave = (SlaveBanker *) banker.get(); + cerr << ("banker rqs : " + label + " " + + to_string(slave->getNumReauthorized()) + + " last delay: " + to_string(slave->getLastReauthorizeDelay()) + + "\n"); + }; + + doCheckBanker("pal", components.postAuctionLoop.getBanker()); + doCheckBanker("router1", components.router1.getBanker()); + doCheckBanker("router2", components.router2.getBanker()); } + cerr << "SHUTDOWN\n"; + exit(0); // Test is done; clean up time. components.shutdown(); From 7de24313fae076f3fca0d3aa09e31ccba9412e77 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Mon, 14 Jul 2014 16:22:14 -0400 Subject: [PATCH 05/16] slave banker: added some metrics for the account sync --- rtbkit/core/banker/slave_banker.cc | 43 +++++++++++++++++++++++------- rtbkit/core/banker/slave_banker.h | 29 +++++++++++++++++++- 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/rtbkit/core/banker/slave_banker.cc b/rtbkit/core/banker/slave_banker.cc index b0c96e0f9..3663d60ff 100644 --- a/rtbkit/core/banker/slave_banker.cc +++ b/rtbkit/core/banker/slave_banker.cc @@ -97,7 +97,7 @@ getAccount(const AccountKey & accountKey, SlaveBanker:: SlaveBanker(const std::string & accountSuffix, CurrencyPool spendRate) - : createdAccounts(128) + : createdAccounts(128), reauthorizing(false), numReauthorized(0) { init(accountSuffix, spendRate); } @@ -410,17 +410,22 @@ void SlaveBanker:: reauthorizeBudget(uint64_t numTimeoutsExpired) { + cerr << "reauthorizeBudget....\n"; + + + if (numTimeoutsExpired > 1) { cerr << "warning: slave banker missed " << numTimeoutsExpired << " timeouts" << endl; } //std::unique_lock guard(lock); - if (reauthorizeBudgetSent != Date()) { + if (reauthorizing) { cerr << "warning: reauthorize budget still in progress" << endl; + return; } - int numDone = 0; + accountsLeft = 0; // For each of our accounts, we report back what has been spent // and re-up to our desired float @@ -428,13 +433,14 @@ reauthorizeBudget(uint64_t numTimeoutsExpired) const ShadowAccount & account) { Json::Value payload = spendRate.toJson(); - ++numDone; auto onDone = std::bind(&SlaveBanker::onReauthorizeBudgetMessage, this, key, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3); + accountsLeft++; + // Finally, send it out applicationLayer->request( "POST", "/v1/accounts/" + getShadowAccountStr(key) + "/balance", @@ -442,11 +448,12 @@ reauthorizeBudget(uint64_t numTimeoutsExpired) payload.toString(), onDone); }; - accounts.forEachInitializedAccount(onAccount); - - if (numDone != 0) - reauthorizeBudgetSent = Date::now(); + + if (accountsLeft > 0) { + reauthorizing = true; + reauthorizeDate = Date::now(); + } } void @@ -456,8 +463,7 @@ onReauthorizeBudgetMessage(const AccountKey & accountKey, int responseCode, const std::string & payload) { - //cerr << "finished reauthorize budget" << endl; - + cerr << "finished reauthorize budget: responseCode : " + to_string(responseCode) + "\n"; if (exc) { cerr << "reauthorize budget got exception" << payload << endl; cerr << "accountKey = " << accountKey << endl; @@ -465,10 +471,27 @@ onReauthorizeBudgetMessage(const AccountKey & accountKey, return; } else if (responseCode == 200) { + cerr << "reauth account: " + accountKey.toString() + "\n"; Account masterAccount = Account::fromJson(Json::parse(payload)); accounts.syncFromMaster(accountKey, masterAccount); } reauthorizeBudgetSent = Date(); + accountsLeft--; + if (accountsLeft == 0) { + lastReauthorizeDelay = Date::now() - reauthorizeDate; + numReauthorized++; + reauthorizing = false; + } +} + +void +SlaveBanker:: +waitReauthorized() + const +{ + while (reauthorizing) { + ML::sleep(0.2); + } } MonitorIndicator diff --git a/rtbkit/core/banker/slave_banker.h b/rtbkit/core/banker/slave_banker.h index 76c2bbd5f..bdca9a64b 100644 --- a/rtbkit/core/banker/slave_banker.h +++ b/rtbkit/core/banker/slave_banker.h @@ -8,6 +8,7 @@ #ifndef __banker__slave_banker_h__ #define __banker__slave_banker_h__ +#include #include "banker.h" #include "application_layer.h" #include "soa/service/zmq_endpoint.h" @@ -93,7 +94,6 @@ struct SlaveBudgetController what has been committed so far. */ struct SlaveBanker : public Banker, public MessageLoop { - static const CurrencyPool DefaultSpendRate; SlaveBanker(); @@ -205,6 +205,25 @@ struct SlaveBanker : public Banker, public MessageLoop { addSource("SlaveBanker::ApplicationLayer", *layer); } + bool isReauthorizing() const + { + return reauthorizing; + } + + void waitReauthorized() const; + + size_t getNumReauthorized() + const + { + return numReauthorized; + } + + double getLastReauthorizeDelay() + const + { + return lastReauthorizeDelay; + } + /* Logging */ virtual void logBidEvents(const Datacratic::EventRecorder & eventRecorder) { @@ -266,6 +285,14 @@ struct SlaveBanker : public Banker, public MessageLoop { { return account.childKey(accountSuffix).toString(); } + + std::atomic shutdown_; + + std::atomic reauthorizing; + Date reauthorizeDate; + double lastReauthorizeDelay; + size_t numReauthorized; + size_t accountsLeft; }; } // naemspace RTBKIT From e6686aec8f6fcb8ad456805fc947b3db80866d92 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Tue, 15 Jul 2014 15:15:22 -0400 Subject: [PATCH 06/16] slave_banker: removed comments --- rtbkit/core/banker/slave_banker.cc | 6 ------ 1 file changed, 6 deletions(-) diff --git a/rtbkit/core/banker/slave_banker.cc b/rtbkit/core/banker/slave_banker.cc index 3663d60ff..5af6e0ab3 100644 --- a/rtbkit/core/banker/slave_banker.cc +++ b/rtbkit/core/banker/slave_banker.cc @@ -410,10 +410,6 @@ void SlaveBanker:: reauthorizeBudget(uint64_t numTimeoutsExpired) { - cerr << "reauthorizeBudget....\n"; - - - if (numTimeoutsExpired > 1) { cerr << "warning: slave banker missed " << numTimeoutsExpired << " timeouts" << endl; @@ -463,7 +459,6 @@ onReauthorizeBudgetMessage(const AccountKey & accountKey, int responseCode, const std::string & payload) { - cerr << "finished reauthorize budget: responseCode : " + to_string(responseCode) + "\n"; if (exc) { cerr << "reauthorize budget got exception" << payload << endl; cerr << "accountKey = " << accountKey << endl; @@ -471,7 +466,6 @@ onReauthorizeBudgetMessage(const AccountKey & accountKey, return; } else if (responseCode == 200) { - cerr << "reauth account: " + accountKey.toString() + "\n"; Account masterAccount = Account::fromJson(Json::parse(payload)); accounts.syncFromMaster(accountKey, masterAccount); } From a51e955b00f95992f5418fd17abf371ddf34cf3c Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Tue, 15 Jul 2014 15:33:46 -0400 Subject: [PATCH 07/16] rtbkit_integration_test: use static_pointer_cast when casting a Banker to a SlaveBanker --- rtbkit/examples/rtbkit_integration_test.cc | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/rtbkit/examples/rtbkit_integration_test.cc b/rtbkit/examples/rtbkit_integration_test.cc index 7044c905c..7bf3a0156 100644 --- a/rtbkit/examples/rtbkit_integration_test.cc +++ b/rtbkit/examples/rtbkit_integration_test.cc @@ -5,6 +5,8 @@ Overall integration test for the router stack. */ +#include + #include "augmentor_ex.h" #include "rtbkit/core/router/router.h" @@ -395,7 +397,7 @@ int main(int argc, char ** argv) // will have a budget available. Necessary because the bid request stream // for this test isn't infinit. auto ensureBudgetSync = [&] (const shared_ptr & banker) { - auto slave = (SlaveBanker *) banker.get(); + shared_ptr slave = static_pointer_cast(banker); while (slave->getNumReauthorized() == 0) { ML::sleep(0.5); } @@ -437,7 +439,7 @@ int main(int argc, char ** argv) auto doCheckBanker = [&] (const string & label, const shared_ptr & banker) { - auto slave = (SlaveBanker *) banker.get(); + shared_ptr slave = static_pointer_cast(banker); cerr << ("banker rqs : " + label + " " + to_string(slave->getNumReauthorized()) + " last delay: " + to_string(slave->getLastReauthorizeDelay()) From d22f03c8d4fc580e50615b5efde4256e18d215e8 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Tue, 15 Jul 2014 15:59:07 -0400 Subject: [PATCH 08/16] rtbkit_integration_test: use ZMQ_APP_LAYER, since it fits the first scope of the test and enables it to pass --- rtbkit/examples/rtbkit_integration_test.cc | 32 ++++++++++++---------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/rtbkit/examples/rtbkit_integration_test.cc b/rtbkit/examples/rtbkit_integration_test.cc index 7bf3a0156..87712f034 100644 --- a/rtbkit/examples/rtbkit_integration_test.cc +++ b/rtbkit/examples/rtbkit_integration_test.cc @@ -43,7 +43,7 @@ using namespace RTBKIT; const size_t numAccounts(200); /* number of accounts and agents */ -#define ZMQ_APP_LAYER 0 +#define ZMQ_APP_LAYER 1 /******************************************************************************/ /* SETUP */ @@ -118,19 +118,22 @@ void allocateBudget( // cerr << budgetController.getAccountSummarySync(account[0], -1) << endl; } -void testBudget(SlaveBudgetController& budgetController, - const AccountKey& account) +void testBudget(SlaveBudgetController & budgetController, + const AccountKey & account) { auto summary = budgetController.getAccountSummarySync(account[0], -1); - cerr << summary << endl; - ExcAssertEqual( - summary.subAccounts[account[1]].subAccounts["router1"].budget, - USD(0.10)); + if (summary.subAccounts[account[1]].subAccounts["router1"].budget + != USD(0.10)) { + cerr << summary << endl; + throw ML::Exception("USD(0.10) not available in budget of router1"); + } - ExcAssertEqual( - summary.subAccounts[account[1]].subAccounts["router2"].budget, - USD(0.10)); + if (summary.subAccounts[account[1]].subAccounts["router2"].budget + != USD(0.10)) { + cerr << summary << endl; + throw ML::Exception("USD(0.10) not available in budget of router2"); + } } /** Some debugging output for the banker. */ @@ -253,7 +256,7 @@ struct Components // Setup a slave banker that we can use to manipulate and peak at the // budgets during the test. -#if 0 +#if ZMQ_APP_LAYER budgetController.setApplicationLayer(make_application_layer(proxies->config)); #else auto appLayer = make_application_layer("http://127.0.0.1:15500"); @@ -266,7 +269,7 @@ struct Components // synced with the master banker. auto makeSlaveBanker = [=] (const std::string & name) { auto res = std::make_shared(name); -#if 0 +#if ZMQ_APP_LAYER auto appLayer = make_application_layer(proxies->config); #else cerr << "bankerAddr: " + bankerAddr + "\n"; @@ -406,14 +409,15 @@ int main(int argc, char ** argv) ensureBudgetSync(components.router1.getBanker()); ensureBudgetSync(components.router2.getBanker()); - ML::sleep(5); + ML::sleep(2.1); + cerr << "testing budgets\n"; for (int i = 0; i < numAccounts; i++) { AccountKey key{"testCampaign" + to_string(i), "testStrategy" + to_string(i)}; testBudget(components.budgetController, key); } - + cerr << "budgets tested\n"; // Start up the exchange threads which should let bid requests flow through // our stack. From e343c8f8088b8893d7e87bba4a3f5770c4adfe35 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Tue, 15 Jul 2014 16:55:16 -0400 Subject: [PATCH 09/16] rtbkit_integration_test: reduced number of accounts to 50, to lower pressure on our test machine --- rtbkit/examples/rtbkit_integration_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rtbkit/examples/rtbkit_integration_test.cc b/rtbkit/examples/rtbkit_integration_test.cc index 87712f034..7fa8049f4 100644 --- a/rtbkit/examples/rtbkit_integration_test.cc +++ b/rtbkit/examples/rtbkit_integration_test.cc @@ -41,7 +41,7 @@ using namespace Datacratic; using namespace RTBKIT; -const size_t numAccounts(200); /* number of accounts and agents */ +const size_t numAccounts(50); /* number of accounts and agents */ #define ZMQ_APP_LAYER 1 From ad18161a6f95f9a560631adaaaaa41344f4294a8 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Tue, 15 Jul 2014 16:57:03 -0400 Subject: [PATCH 10/16] rtbkit_integration_test: benchmarking code --- rtbkit/examples/examples.mk | 2 +- rtbkit/examples/rtbkit_integration_test.cc | 35 +++++++++++++++++----- 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/rtbkit/examples/examples.mk b/rtbkit/examples/examples.mk index ff6982e8d..4337c5d7f 100644 --- a/rtbkit/examples/examples.mk +++ b/rtbkit/examples/examples.mk @@ -19,7 +19,7 @@ $(eval $(call program,adserver_endpoint,standard_adserver data_logger rtb_router $(eval $(call program,integration_endpoints,exchange standard_adserver data_logger rtb_router bidding_agent boost_program_options services)) RTBKIT_INTEGRATION_TEST_LINK := \ - rtb_router bidding_agent integration_test_utils monitor monitor_service augmentor_ex adserver_connector mock_bid_request mock_adserver + rtb_router bidding_agent integration_test_utils monitor monitor_service augmentor_ex adserver_connector mock_bid_request mock_adserver recoset_utils $(eval $(call test,rtbkit_integration_test,$(RTBKIT_INTEGRATION_TEST_LINK),boost)) diff --git a/rtbkit/examples/rtbkit_integration_test.cc b/rtbkit/examples/rtbkit_integration_test.cc index 7fa8049f4..e105f2adf 100644 --- a/rtbkit/examples/rtbkit_integration_test.cc +++ b/rtbkit/examples/rtbkit_integration_test.cc @@ -9,6 +9,7 @@ #include "augmentor_ex.h" +#include "platform/utils/benchmarks.h" #include "rtbkit/core/router/router.h" #include "rtbkit/core/post_auction/post_auction_service.h" #include "rtbkit/core/agent_configuration/agent_configuration_service.h" @@ -41,9 +42,7 @@ using namespace Datacratic; using namespace RTBKIT; -const size_t numAccounts(50); /* number of accounts and agents */ - -#define ZMQ_APP_LAYER 1 +#define ZMQ_APP_LAYER 0 /******************************************************************************/ /* SETUP */ @@ -222,7 +221,7 @@ struct Components cerr << "done shutdown" << endl; } - void init(size_t numAgents) + void init(Benchmarks & bm, size_t numAgents) { const string agentUri = "tcp://127.0.0.1:1234"; @@ -331,24 +330,35 @@ struct Components // Our bidding agent which listens to the bid request stream from all // available routers and decide who gets to see your awesome pictures of // kittens. + for (size_t i = 0; i < numAgents; i++) { + Benchmark agentBm(bm, "agent-setup"); + AccountKey key{"testCampaign" + to_string(i), "testStrategy" + to_string(i)}; auto agent = make_shared(proxies, "testAgent" + to_string(i), key); agents.push_back(agent); + + shared_ptr shortBm; + shortBm.reset(new Benchmark(bm, "agent-init")); agent->init(); + shortBm.reset(new Benchmark(bm, "agent-start")); agent->start(); + shortBm.reset(new Benchmark(bm, "agent-configure")); agent->configure(); // Some extra customization for our agent to make it extra // special. See setupAgent for more details. + shortBm.reset(new Benchmark(bm, "agent-setupAgent")); setupAgent(*agent); + shortBm.reset(new Benchmark(bm, "agent-budget")); // Setup an initial budgeting for the test. allocateBudget(budgetController, key, USD(1000)); + shortBm.reset(); } // Our augmentor which does frequency capping for our agent. @@ -375,7 +385,8 @@ int main(int argc, char ** argv) // Controls the length of the test. enum { nExchangeThreads = 10, - nBidRequestsPerThread = 100 + nBidRequestsPerThread = 100, + nAccounts = 50 }; auto proxies = std::make_shared(); @@ -389,12 +400,16 @@ int main(int argc, char ** argv) // we don't, ServiceProxies will just default to using a local equivalent. if (false) proxies->logToCarbon("carbon.rtbkit.org", "stats"); + Benchmarks bm; + shared_ptr componentsBm(new Benchmark(bm, {"components"})); Components components(proxies); // Setups up the various component of the RTBKit stack. See // Components::init for more details. - components.init(numAccounts); + components.init(bm, nAccounts); + + componentsBm.reset(); // Syncing is done periodically so we have to wait a bit before the router // will have a budget available. Necessary because the bid request stream @@ -411,8 +426,11 @@ int main(int argc, char ** argv) ML::sleep(2.1); + bm.dumpTotals(); + cerr << "testing budgets\n"; - for (int i = 0; i < numAccounts; i++) { + for (int i = 0; i < nAccounts; i++) { + Benchmark budgetBm(bm, {"budget-tests"}); AccountKey key{"testCampaign" + to_string(i), "testStrategy" + to_string(i)}; testBudget(components.budgetController, key); @@ -456,6 +474,9 @@ int main(int argc, char ** argv) } cerr << "SHUTDOWN\n"; + + bm.dumpTotals(); + exit(0); // Test is done; clean up time. components.shutdown(); From 66bfa1bb3401f08f04828552e21ba8d13e5b895f Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Thu, 17 Jul 2014 14:45:00 -0400 Subject: [PATCH 11/16] let the "router_runner" and "post_auction_runner" depend on the bidder interface plugins, rather than librtb.so, as the latter form seems to confuse the make system --- rtbkit/plugins/bidder_interface/agents_bidder_interface.cc | 2 +- rtbkit/plugins/bidder_interface/bidder_interface.mk | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/rtbkit/plugins/bidder_interface/agents_bidder_interface.cc b/rtbkit/plugins/bidder_interface/agents_bidder_interface.cc index 38ee81884..c275ed072 100644 --- a/rtbkit/plugins/bidder_interface/agents_bidder_interface.cc +++ b/rtbkit/plugins/bidder_interface/agents_bidder_interface.cc @@ -3,9 +3,9 @@ Copyright (c) 2011 Datacratic. All rights reserved. */ -#include "agents_bidder_interface.h" #include "rtbkit/common/messages.h" #include "rtbkit/core/router/router.h" +#include "agents_bidder_interface.h" using namespace Datacratic; using namespace RTBKIT; diff --git a/rtbkit/plugins/bidder_interface/bidder_interface.mk b/rtbkit/plugins/bidder_interface/bidder_interface.mk index a1cb17f2a..abb47fe50 100644 --- a/rtbkit/plugins/bidder_interface/bidder_interface.mk +++ b/rtbkit/plugins/bidder_interface/bidder_interface.mk @@ -1,5 +1,4 @@ -$(eval $(call library,agents_bidder,agents_bidder_interface.cc,)) +$(eval $(call library,agents_bidder,agents_bidder_interface.cc,rtb_router)) $(eval $(call library,http_bidder,http_bidder_interface.cc,openrtb_bid_request)) -# the code loading the plugins above is part of librtb (bidder_interface.cc) -$(LIB)/librtb.so: $(LIB)/libagents_bidder.so $(LIB)/libhttp_bidder.so +$(BIN)/router_runner $(BIN)/post_auction_runner: $(LIB)/libagents_bidder.so $(LIB)/libhttp_bidder.so From 5faaa22320336c58d590cba59aad001faf9fc6ed Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Thu, 17 Jul 2014 16:24:48 -0400 Subject: [PATCH 12/16] added "bidder_interface_plugins" as a phony target --- rtbkit/plugins/bidder_interface/bidder_interface.mk | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/rtbkit/plugins/bidder_interface/bidder_interface.mk b/rtbkit/plugins/bidder_interface/bidder_interface.mk index abb47fe50..816b7a3f1 100644 --- a/rtbkit/plugins/bidder_interface/bidder_interface.mk +++ b/rtbkit/plugins/bidder_interface/bidder_interface.mk @@ -1,4 +1,8 @@ $(eval $(call library,agents_bidder,agents_bidder_interface.cc,rtb_router)) $(eval $(call library,http_bidder,http_bidder_interface.cc,openrtb_bid_request)) -$(BIN)/router_runner $(BIN)/post_auction_runner: $(LIB)/libagents_bidder.so $(LIB)/libhttp_bidder.so + +bidder_interface_plugins: $(LIB)/libagents_bidder.so $(LIB)/libhttp_bidder.so + + +.PHONY: bidder_interface_plugins From d5423cc342ae67e6bc622d35b92e2a2c7bc3f67b Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Fri, 18 Jul 2014 12:12:49 -0400 Subject: [PATCH 13/16] slave banker: added a final "onDone" mechanism method to reauthorizeBudget, to enable benchmarking --- rtbkit/core/banker/application_layer.cc | 6 +- rtbkit/core/banker/application_layer.h | 21 +++--- rtbkit/core/banker/slave_banker.cc | 81 ++++++++++++---------- rtbkit/core/banker/slave_banker.h | 36 +++++----- rtbkit/examples/rtbkit_integration_test.cc | 1 - 5 files changed, 78 insertions(+), 67 deletions(-) diff --git a/rtbkit/core/banker/application_layer.cc b/rtbkit/core/banker/application_layer.cc index 6b62b244d..7f87d224f 100644 --- a/rtbkit/core/banker/application_layer.cc +++ b/rtbkit/core/banker/application_layer.cc @@ -158,7 +158,8 @@ syncAccount(const ShadowAccount &account, const std::string &shadowStr, void HttpLayer:: request(std::string method, const std::string &resource, - const RestParams ¶ms, const std::string &content, OnRequestResult onResult) + const RestParams ¶ms, const std::string &content, + const OnRequestResult & onResult) { std::transform(begin(method), end(method), begin(method), [](char c) { return ::tolower(c); }); @@ -315,7 +316,8 @@ syncAccount(const ShadowAccount &account, const std::string &shadowStr, void ZmqLayer:: request(std::string method, const std::string &resource, - const RestParams ¶ms, const std::string &content, OnRequestResult onResult) + const RestParams ¶ms, const std::string &content, + const OnRequestResult & onResult) { proxy->push(onResult, method, resource, params, content); } diff --git a/rtbkit/core/banker/application_layer.h b/rtbkit/core/banker/application_layer.h index 9d87a435a..0e1f99e57 100644 --- a/rtbkit/core/banker/application_layer.h +++ b/rtbkit/core/banker/application_layer.h @@ -71,11 +71,10 @@ struct ApplicationLayer : public MessageLoop { Account &&)> onDone) = 0; /* CUSTOM */ - virtual void request( - std::string method, const std::string &resource, - const RestParams ¶ms, - const std::string &content, - OnRequestResult onResult) = 0; + virtual void request(std::string method, const std::string &resource, + const RestParams ¶ms, + const std::string &content, + const OnRequestResult & onResult) = 0; }; /*****************************************************************************/ @@ -120,9 +119,9 @@ struct HttpLayer : public ApplicationLayer { Account &&)> onDone); void request(std::string method, const std::string &resource, - const RestParams ¶ms, - const std::string &content, - OnRequestResult onResult); + const RestParams ¶ms, + const std::string &content, + const OnRequestResult & onResult); private: std::shared_ptr httpClient; @@ -172,9 +171,9 @@ struct ZmqLayer : public ApplicationLayer { std::function onDone); void request(std::string method, const std::string &resource, - const RestParams ¶ms, - const std::string &content, - OnRequestResult onResult); + const RestParams ¶ms, + const std::string &content, + const OnRequestResult & onResult); private: std::shared_ptr proxy; diff --git a/rtbkit/core/banker/slave_banker.cc b/rtbkit/core/banker/slave_banker.cc index 5af6e0ab3..3b79a0077 100644 --- a/rtbkit/core/banker/slave_banker.cc +++ b/rtbkit/core/banker/slave_banker.cc @@ -159,7 +159,7 @@ init(const std::string & accountSuffix, CurrencyPool spendRate) std::placeholders::_1), true /* single threaded */); addPeriodic("SlaveBanker::reauthorizeBudget", 1.0, - std::bind(&SlaveBanker::reauthorizeBudget, + std::bind(&SlaveBanker::reauthorizeBudgetPeriodic, this, std::placeholders::_1), true /* single threaded */); @@ -408,56 +408,74 @@ reportSpend(uint64_t numTimeoutsExpired) void SlaveBanker:: -reauthorizeBudget(uint64_t numTimeoutsExpired) +reauthorizeBudgetPeriodic(uint64_t numTimeoutsExpired) { if (numTimeoutsExpired > 1) { cerr << "warning: slave banker missed " << numTimeoutsExpired << " timeouts" << endl; } - //std::unique_lock guard(lock); if (reauthorizing) { - cerr << "warning: reauthorize budget still in progress" << endl; + cerr << "warning: reauthorize budget still in progress (skipping)\n"; return; } - accountsLeft = 0; + auto onReauthorizedDone = [&] () { + reauthorizing = false; + numReauthorized++; + }; + reauthorizing = true; + reauthorizeBudget(onReauthorizedDone); +} - // For each of our accounts, we report back what has been spent - // and re-up to our desired float - auto onAccount = [&] (const AccountKey & key, - const ShadowAccount & account) - { - Json::Value payload = spendRate.toJson(); - auto onDone = std::bind(&SlaveBanker::onReauthorizeBudgetMessage, this, - key, - std::placeholders::_1, std::placeholders::_2, - std::placeholders::_3); +void +SlaveBanker:: +reauthorizeBudget(const OnReauthorizeBudgetDone & onDone) +{ + //std::unique_lock guard(lock); - accountsLeft++; + // For each of our accounts, we report back what has been spent + // and re-up to our desired float - // Finally, send it out - applicationLayer->request( - "POST", "/v1/accounts/" + getShadowAccountStr(key) + "/balance", - { { "accountType", "spend" } }, - payload.toString(), - onDone); + auto reauthorizeOp = make_shared(); + reauthorizeOp->start = Date::now(); + reauthorizeOp->numAccounts = 0; + reauthorizeOp->onDone = onDone; + + auto onAccount + = [&] (const AccountKey & key, const ShadowAccount & account) { + reauthorizeOp->numAccounts++; + reauthorizeOp->pending++; + + // Finally, send it out + Json::Value payload = spendRate.toJson(); + auto onAccountDone = [=] (exception_ptr exc, int responseCode, + const string & payload) { + this->onReauthorizeBudgetMessage(key, exc, responseCode, payload); + reauthorizeOp->pending--; + if (reauthorizeOp->pending == 0) { + Date now = Date::now(); + reauthorizeOp->onDone(); + } }; + + applicationLayer->request("POST", "/v1/accounts/" + getShadowAccountStr(key) + "/balance", + { { "accountType", "spend" } }, + payload.toString(), + onAccountDone); + }; accounts.forEachInitializedAccount(onAccount); - if (accountsLeft > 0) { + if (reauthorizeOp->numAccounts > 0) { reauthorizing = true; - reauthorizeDate = Date::now(); } } void SlaveBanker:: -onReauthorizeBudgetMessage(const AccountKey & accountKey, - std::exception_ptr exc, - int responseCode, - const std::string & payload) +onReauthorizeBudgetMessage(const AccountKey & accountKey, exception_ptr exc, + int responseCode, const string & payload) { if (exc) { cerr << "reauthorize budget got exception" << payload << endl; @@ -469,13 +487,6 @@ onReauthorizeBudgetMessage(const AccountKey & accountKey, Account masterAccount = Account::fromJson(Json::parse(payload)); accounts.syncFromMaster(accountKey, masterAccount); } - reauthorizeBudgetSent = Date(); - accountsLeft--; - if (accountsLeft == 0) { - lastReauthorizeDelay = Date::now() - reauthorizeDate; - numReauthorized++; - reauthorizing = false; - } } void diff --git a/rtbkit/core/banker/slave_banker.h b/rtbkit/core/banker/slave_banker.h index bdca9a64b..aa262a8ca 100644 --- a/rtbkit/core/banker/slave_banker.h +++ b/rtbkit/core/banker/slave_banker.h @@ -19,7 +19,6 @@ namespace RTBKIT { - /*****************************************************************************/ /* SLAVE BUDGET CONTROLLER */ /*****************************************************************************/ @@ -81,6 +80,7 @@ struct SlaveBudgetController budgetResultCallback(const SlaveBudgetController::OnBudgetResult & onResult); private: std::shared_ptr applicationLayer; + //std::shared_ptr httpClient; }; @@ -212,18 +212,11 @@ struct SlaveBanker : public Banker, public MessageLoop { void waitReauthorized() const; - size_t getNumReauthorized() - const + int getNumReauthorized() const { return numReauthorized; } - double getLastReauthorizeDelay() - const - { - return lastReauthorizeDelay; - } - /* Logging */ virtual void logBidEvents(const Datacratic::EventRecorder & eventRecorder) { @@ -246,16 +239,26 @@ struct SlaveBanker : public Banker, public MessageLoop { mutable Lock syncLock; Datacratic::Date lastSync; - /** Periodically we report spend to the banker.*/ void reportSpend(uint64_t numTimeoutsExpired); Date reportSpendSent; /** Periodically we ask the banker to re-authorize our budget. */ - void reauthorizeBudget(uint64_t numTimeoutsExpired); - Date reauthorizeBudgetSent; - CurrencyPool spendRate; + void reauthorizeBudgetPeriodic(uint64_t numTimeoutsExpired); + + typedef std::function OnReauthorizeBudgetDone; + void reauthorizeBudget(const OnReauthorizeBudgetDone & onDone = nullptr); + + struct ReauthorizeOp { + Date start; + int numAccounts; + std::atomic pending; + + OnReauthorizeBudgetDone onDone; + }; + + CurrencyPool spendRate; /// Called when we get an account status back from the master banker /// after a synchrnonization @@ -288,11 +291,8 @@ struct SlaveBanker : public Banker, public MessageLoop { std::atomic shutdown_; - std::atomic reauthorizing; - Date reauthorizeDate; - double lastReauthorizeDelay; - size_t numReauthorized; - size_t accountsLeft; + bool reauthorizing; + int numReauthorized; }; } // naemspace RTBKIT diff --git a/rtbkit/examples/rtbkit_integration_test.cc b/rtbkit/examples/rtbkit_integration_test.cc index c37040b7c..027aeca14 100644 --- a/rtbkit/examples/rtbkit_integration_test.cc +++ b/rtbkit/examples/rtbkit_integration_test.cc @@ -478,7 +478,6 @@ int main(int argc, char ** argv) shared_ptr slave = static_pointer_cast(banker); cerr << ("banker rqs : " + label + " " + to_string(slave->getNumReauthorized()) - + " last delay: " + to_string(slave->getLastReauthorizeDelay()) + "\n"); }; From c783c1c4645420ba1d178369b5eb8d1e5ae23881 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Mon, 21 Jul 2014 10:55:48 -0400 Subject: [PATCH 14/16] use Benchmark classes from soa --- rtbkit/examples/rtbkit_integration_test.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rtbkit/examples/rtbkit_integration_test.cc b/rtbkit/examples/rtbkit_integration_test.cc index 027aeca14..151af2ded 100644 --- a/rtbkit/examples/rtbkit_integration_test.cc +++ b/rtbkit/examples/rtbkit_integration_test.cc @@ -7,14 +7,13 @@ #include -#include "platform/utils/benchmarks.h" - // #include "jml/utils/rng.h" // #include "jml/utils/pair_utils.h" // #include "jml/utils/environment.h" #include "jml/arch/timers.h" #include "jml/utils/testing/watchdog.h" #include "soa/service/testing/redis_temporary_server.h" +#include "soa/utils/benchmarks.h" #include "rtbkit/core/router/router.h" #include "rtbkit/core/post_auction/post_auction_service.h" #include "rtbkit/core/agent_configuration/agent_configuration_service.h" From 1959dd4546c05386e7b0005bf83f0fd8616430f6 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Mon, 21 Jul 2014 10:57:30 -0400 Subject: [PATCH 15/16] slave banker: invoke "onDone" only when set --- rtbkit/core/banker/slave_banker.cc | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/rtbkit/core/banker/slave_banker.cc b/rtbkit/core/banker/slave_banker.cc index 3b79a0077..8c6194de7 100644 --- a/rtbkit/core/banker/slave_banker.cc +++ b/rtbkit/core/banker/slave_banker.cc @@ -441,6 +441,7 @@ reauthorizeBudget(const OnReauthorizeBudgetDone & onDone) auto reauthorizeOp = make_shared(); reauthorizeOp->start = Date::now(); reauthorizeOp->numAccounts = 0; + reauthorizeOp->pending = 0; reauthorizeOp->onDone = onDone; auto onAccount @@ -456,7 +457,9 @@ reauthorizeBudget(const OnReauthorizeBudgetDone & onDone) reauthorizeOp->pending--; if (reauthorizeOp->pending == 0) { Date now = Date::now(); - reauthorizeOp->onDone(); + if (reauthorizeOp->onDone) { + reauthorizeOp->onDone(); + } } }; @@ -466,9 +469,10 @@ reauthorizeBudget(const OnReauthorizeBudgetDone & onDone) onAccountDone); }; accounts.forEachInitializedAccount(onAccount); - - if (reauthorizeOp->numAccounts > 0) { - reauthorizing = true; + if (reauthorizeOp->numAccounts == 0) { + if (reauthorizeOp->onDone) { + reauthorizeOp->onDone(); + } } } From b2c67a416ab1e751bb44e74b304ca54c9f2469c7 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Mon, 21 Jul 2014 10:59:52 -0400 Subject: [PATCH 16/16] new benchmark program for http ops on MasterBanker --- rtbkit/core/banker/testing/banker_bench.cc | 258 +++++++++++++++++++ rtbkit/core/banker/testing/banker_testing.mk | 2 + 2 files changed, 260 insertions(+) create mode 100644 rtbkit/core/banker/testing/banker_bench.cc diff --git a/rtbkit/core/banker/testing/banker_bench.cc b/rtbkit/core/banker/testing/banker_bench.cc new file mode 100644 index 000000000..2223578c0 --- /dev/null +++ b/rtbkit/core/banker/testing/banker_bench.cc @@ -0,0 +1,258 @@ +/* banker_bench.cc + Wolfgang Sourdeau, 21 July 2014 + Copyright (c) 2014 Datacratic. All rights reserved. + + Benchmark "setBudget" by invoking using the HTTP services from the + MasterBanker from different clients and different transports. +*/ + +#include + +#include "jml/arch/timers.h" +#include "soa/service/message_loop.h" +#include "soa/service/rest_request_router.h" +#include "soa/service/http_client.h" +#include "soa/service/http_rest_proxy.h" +#include "soa/utils/benchmarks.h" + +#include "rtbkit/core/banker/master_banker.h" +#include "rtbkit/core/banker/slave_banker.h" + + +using namespace std; +using namespace Datacratic; +using namespace RTBKIT; + + +/* add an account */ +void addAccountSync(HttpClient & client, const AccountKey & account) +{ + int done(false), status(0); + + auto onResponse = [&] (const HttpRequest & rq, + HttpClientError error, + int newStatus, + string && headers, + string && body) { + status = newStatus; + done = true; + ML::futex_wake(done); + }; + auto cbs = make_shared(onResponse); + + string body("{" + " \"adjustmentLineItems\" : {}," + " \"adjustmentsIn\" : {}," + " \"adjustmentsOut\" : {}," + " \"allocatedIn\" : {}," + " \"allocatedOut\" : {}," + " \"budgetDecreases\" : {}," + " \"budgetIncreases\" : {}," + " \"commitmentsMade\" : {}," + " \"commitmentsRetired\" : {}," + " \"lineItems\" : {}," + " \"md\" : {" + " \"objectType\" : \"Account\"," + " \"version\" : 1" + " }," + " \"recycledIn\" : {}," + " \"recycledOut\" : {}," + " \"spent\" : {}," + " \"type\" : \"budget\"" + "}"); + HttpRequest::Content content(body, "application/json"); + + RestParams params{{"accountName", account.toString()}, + {"accountType", "budget"}}; + client.post("/v1/accounts", cbs, content, params); + while (!done) { + int oldDone(done); + ML::futex_wait(done, oldDone); + } + ExcAssert(status == 200); +} + +/* sets a budget of 100c MicroUSD, using the HttpClient class */ +void setBudgetSyncHC(HttpClient & client, const AccountKey & key, bool isPut) +{ + int done(false), status(0); + + auto onResponse = [&] (const HttpRequest & rq, + HttpClientError error, + int newStatus, + string && headers, + string && body) { + status = newStatus; + done = true; + ML::futex_wake(done); + }; + auto cbs = make_shared(onResponse); + + HttpRequest::Content content(string("{\"USD/1M\":1000000000}"), + "application/json"); + if (isPut) + client.put("/v1/accounts/" + key[0] + "/budget", cbs, content); + else + client.post("/v1/accounts/" + key[0] + "/budget", cbs, content); + while (!done) { + int oldDone(done); + ML::futex_wait(done, oldDone); + } + ExcAssert(status == 200); +} + +/* sets a budget of 100c MicroUSD, using the HttpClient class */ +void setBudgetSyncRP(HttpRestProxy & client, const AccountKey & key, + bool isPut) +{ + HttpRestProxy::Response response; + + HttpRestProxy::Content content(string("{\"USD/1M\":1000000000}"), + "application/json"); + if (isPut) + response = client.put(string("/v1/accounts/") + key[0] + "/budget", + content); + else + response = client.post(string("/v1/accounts/") + key[0] + "/budget", + content); + ExcAssert(response.code() == 200); +} + +/* sets a budget of X MicroUSD, using the RestProxy class */ +void setBudgetSyncZMQ(RestProxy & client, const AccountKey & key) +{ + int done(false), status(0); + + auto onDone = [&] (std::exception_ptr, + int newStatus, const std::string & body) { + status = newStatus; + done = true; + ML::futex_wake(done); + }; + + client.push(onDone, + "PUT", "/v1/accounts/" + key[0] + "/budget", + {}, string("{\"USD/1M\":1000000000}")); + while (!done) { + int oldDone(done); + ML::futex_wait(done, oldDone); + } + ExcAssert(status == 200); +} + +int main(int argc, char ** argv) +{ + size_t nAccounts(1); + { + char * envAccs = ::getenv("NUM_ACCOUNTS"); + if (envAccs) { + nAccounts = stoi(envAccs); + } + } + + cerr << ("Running benchmarks with num accounts = " + + to_string(nAccounts) + "\n"); + + Benchmarks bms; + shared_ptr bm; + + auto proxies = std::make_shared(); + + MessageLoop httpLoop; + httpLoop.start(); + + MasterBanker masterBanker(proxies, "masterBanker"); + + // Setup a master banker used to keep the canonical budget of the + // various bidding agent accounts. The data contained in this service is + // periodically persisted to redis. + masterBanker.init(std::make_shared()); + masterBanker.bindTcp(); + masterBanker.start(); + + ML::sleep(1); + + /* operations using async http client */ + { + auto client = make_shared("http://localhost:15000"); + httpLoop.addSource("httpClient", client); + client->waitConnectionState(AsyncEventSource::CONNECTED); + + for (int i = 0; i < nAccounts; i++) { + AccountKey key{"testCampaign" + to_string(i), + "testStrategy" + to_string(i)}; + addAccountSync(*client, key); + } + + cerr << "hc set budget put using HttpClient\n"; + bm.reset(new Benchmark(bms, "HttpClient-set-budget-put")); + for (int i = 0; i < nAccounts; i++) { + AccountKey key{"testCampaign" + to_string(i), + "testStrategy" + to_string(i)}; + setBudgetSyncHC(*client, key, true); + } + bm.reset(); + + cerr << "hc set budget post using HttpClient\n"; + bm.reset(new Benchmark(bms, "HttpClient-set-budget-post")); + for (int i = 0; i < nAccounts; i++) { + AccountKey key{"testCampaign" + to_string(i), + "testStrategy" + to_string(i)}; + setBudgetSyncHC(*client, key, false); + } + bm.reset(); + + httpLoop.removeSource(client.get()); + client->waitConnectionState(AsyncEventSource::DISCONNECTED); + } + + { + /* operations using http rest proxy */ + cerr << "rp set budget put using HttpRestProxy\n"; + bm.reset(new Benchmark(bms, "HttpRestProxy-set-budget-put")); + HttpRestProxy restProxy("http://localhost:15000"); + for (int i = 0; i < nAccounts; i++) { + AccountKey key{"testCampaign" + to_string(i), + "testStrategy" + to_string(i)}; + setBudgetSyncRP(restProxy, key, true); + } + bm.reset(); + + cerr << "hc set budget post using HttpRestProxy\n"; + bm.reset(new Benchmark(bms, "HttpRestProxy-set-budget-post")); + for (int i = 0; i < nAccounts; i++) { + AccountKey key{"testCampaign" + to_string(i), + "testStrategy" + to_string(i)}; + setBudgetSyncRP(restProxy, key, false); + } + bm.reset(); + } + + /* operations using zmq "rest" client */ + { + auto client = make_shared(); + httpLoop.addSource("zmqClient", client); + client->waitConnectionState(AsyncEventSource::CONNECTED); + client->init(proxies->config, "masterBanker"); + + cerr << "zmq set budget put using RestProxy\n"; + bm.reset(new Benchmark(bms, "RestProxy-set-budget-put")); + for (int i = 0; i < nAccounts; i++) { + AccountKey key{"testCampaign" + to_string(i), + "testStrategy" + to_string(i)}; + setBudgetSyncZMQ(*client, key); + } + bm.reset(); + + httpLoop.removeSource(client.get()); + client->waitConnectionState(AsyncEventSource::DISCONNECTED); + } + + bms.dumpTotals(); + + exit(0); + + // Test is done; clean up time. + // budgetController.shutdown(); + masterBanker.shutdown(); +} diff --git a/rtbkit/core/banker/testing/banker_testing.mk b/rtbkit/core/banker/testing/banker_testing.mk index de2214c2a..12bd27141 100644 --- a/rtbkit/core/banker/testing/banker_testing.mk +++ b/rtbkit/core/banker/testing/banker_testing.mk @@ -32,4 +32,6 @@ $(eval $(call test,banker_account_test,banker,boost)) $(eval $(call test,banker_behaviour_test,banker banker_temporary_server,boost manual)) $(eval $(call test,redis_persistence_test,banker,boost)) +$(eval $(call program,banker_bench,banker test_utils)) + banker_tests: master_banker_test slave_banker_test banker_account_test banker_behaviour_test redis_persistence_test