From 25eada7af0a1d798df8e6ebbd9f3956718e1d0c3 Mon Sep 17 00:00:00 2001 From: gaodunqiao Date: Mon, 18 Sep 2017 18:39:51 +0800 Subject: [PATCH] Add keepalive timeout, zeppelin operation timeout configuration; Upgrade libzp to 0.2.5 --- conf/zgw-example.conf | 4 +++- src/zgw_config.cc | 8 ++++++-- src/zgw_config.h | 4 +++- src/zgw_server.cc | 9 ++++++--- src/zgwstore/zgw_store.cc | 6 ++++-- src/zgwstore/zgw_store.h | 5 +++-- third/zeppelin-client | 2 +- 7 files changed, 26 insertions(+), 12 deletions(-) diff --git a/conf/zgw-example.conf b/conf/zgw-example.conf index d9d1cb5..c31fad1 100644 --- a/conf/zgw-example.conf +++ b/conf/zgw-example.conf @@ -1,6 +1,7 @@ # zeppelin-gateway config file -zp_meta_addr: 127.0.0.1:9221 +zp_meta_addr: 127.0.0.1:9221/127.0.0.1:9222/127.0.0.1:9223 +zp_optimeout_ms: 5000 redis_ip_port: 127.0.0.1:19221 redis_passwd: passwd @@ -9,6 +10,7 @@ server_port: 8099 admin_port: 8199 worker_num: 4 max_clients: 8000 +keepalive_timeout: 30 enable_gc: no #yes or no diff --git a/src/zgw_config.cc b/src/zgw_config.cc index 4b8b8c8..7b4f519 100644 --- a/src/zgw_config.cc +++ b/src/zgw_config.cc @@ -8,11 +8,13 @@ static const std::string kZgwTableName = "__zgw20_data_table"; ZgwConfig::ZgwConfig(std::string path) - : redis_ip_port("127.0.0.1"), - zp_table_name(kZgwTableName), + : zp_table_name(kZgwTableName), + zp_optimeout_ms(5000), // 5 seconds + redis_ip_port("127.0.0.1"), redis_passwd("_"), server_ip("0.0.0.0"), server_port(8099), + keepalive_timeout(30), admin_port(8199), daemonize(false), minloglevel(0), @@ -39,6 +41,7 @@ int ZgwConfig::LoadConf() { std::string zp_meta_addr; b_conf->GetConfStr("zp_meta_addr", &zp_meta_addr); slash::StringSplit(zp_meta_addr, '/', zp_meta_ip_ports); + b_conf->GetConfInt("zp_optimeout_ms", &zp_optimeout_ms); b_conf->GetConfStr("redis_ip_port", &redis_ip_port); b_conf->GetConfStr("redis_passwd", &redis_passwd); // b_conf->GetConfStr("zp_table_name", &zp_table_name); @@ -46,6 +49,7 @@ int ZgwConfig::LoadConf() { // Server info b_conf->GetConfStr("server_ip", &server_ip); b_conf->GetConfInt("server_port", &server_port); + b_conf->GetConfInt("keepalive_timeout", &keepalive_timeout); b_conf->GetConfInt("admin_port", &admin_port); b_conf->GetConfBool("daemonize", &daemonize); b_conf->GetConfInt("minloglevel", &minloglevel); diff --git a/src/zgw_config.h b/src/zgw_config.h index c107a82..bf1ae16 100644 --- a/src/zgw_config.h +++ b/src/zgw_config.h @@ -15,12 +15,14 @@ struct ZgwConfig { slash::BaseConf *b_conf; std::vector zp_meta_ip_ports; - std::string redis_ip_port; std::string zp_table_name; + int zp_optimeout_ms; + std::string redis_ip_port; std::string redis_passwd; std::string server_ip; int server_port; + int keepalive_timeout; int admin_port; bool daemonize; int minloglevel; diff --git a/src/zgw_server.cc b/src/zgw_server.cc index 74db977..f810d80 100644 --- a/src/zgw_server.cc +++ b/src/zgw_server.cc @@ -28,8 +28,9 @@ bool ZgwServer::ZgwServerHandle::AccessHandle(std::string& ip) const { int ZgwServer::ZgwServerHandle::CreateWorkerSpecificData(void** data) const { zgwstore::ZgwStore* store; Status s = zgwstore::ZgwStore::Open(g_zgw_conf->zp_meta_ip_ports, - g_zgw_conf->redis_ip_port, g_zgw_conf->zp_table_name, + g_zgw_conf->zp_optimeout_ms, + g_zgw_conf->redis_ip_port, LockName(), kZgwRedisLockTTL, g_zgw_conf->redis_passwd, &store); @@ -59,8 +60,9 @@ ZgwServer::ZgwServer() zgw_dispatch_thread_ = pink::NewDispatchThread(g_zgw_conf->server_ip, g_zgw_conf->server_port, worker_num_, &conn_factory_, - 0, 1000, &server_handle_); + 2000, 1000, &server_handle_); zgw_dispatch_thread_->set_thread_name("DispatchThread"); + zgw_dispatch_thread_->set_keepalive_timeout(g_zgw_conf->keepalive_timeout); zgw_admin_thread_ = pink::NewHolyThread(g_zgw_conf->server_ip, g_zgw_conf->admin_port, @@ -122,8 +124,9 @@ Status ZgwServer::Start() { // Open new store ptr for gc thread if (g_zgw_conf->enable_gc) { s = zgwstore::ZgwStore::Open(g_zgw_conf->zp_meta_ip_ports, - g_zgw_conf->redis_ip_port, g_zgw_conf->zp_table_name, + g_zgw_conf->zp_optimeout_ms, + g_zgw_conf->redis_ip_port, LockName(), kZgwRedisLockTTL, g_zgw_conf->redis_passwd, &store_for_gc_); diff --git a/src/zgwstore/zgw_store.cc b/src/zgwstore/zgw_store.cc index e1b0d9c..fab9595 100644 --- a/src/zgwstore/zgw_store.cc +++ b/src/zgwstore/zgw_store.cc @@ -32,8 +32,9 @@ ZgwStore::~ZgwStore() { } } -Status ZgwStore::Open(const std::vector& zp_addrs, - const std::string& redis_addr, const std::string& zp_table, +Status ZgwStore::Open( + const std::vector& zp_addrs, const std::string& zp_table, + int zp_op_timeout_ms, const std::string& redis_addr, const std::string& lock_name, const int32_t lock_ttl, const std::string& redis_passwd, ZgwStore** store) { @@ -55,6 +56,7 @@ Status ZgwStore::Open(const std::vector& zp_addrs, } zp_option.meta_addr.push_back(libzp::Node(t_ip, t_port)); } + zp_option.op_timeout = zp_op_timeout_ms; libzp::Cluster* zp_cli = new libzp::Cluster(zp_option); s = zp_cli->Connect(); if (!s.ok()) { diff --git a/src/zgwstore/zgw_store.h b/src/zgwstore/zgw_store.h index c608517..900d840 100644 --- a/src/zgwstore/zgw_store.h +++ b/src/zgwstore/zgw_store.h @@ -16,8 +16,9 @@ class ZgwStore { ZgwStore(const std::string& zp_table, const std::string& lock_name, const int32_t lock_ttl, const std::string& redis_passwd); ~ZgwStore(); - static Status Open(const std::vector& zp_addrs, - const std::string& redis_addr, const std::string& zp_table, + static Status Open( + const std::vector& zp_addrs, const std::string& zp_table, + int zp_op_timeout_ms, const std::string& redis_addr, const std::string& lock_name, const int32_t lock_ttl, const std::string& redis_passwd, ZgwStore** store); void set_redis_ip(const std::string& redis_ip) { diff --git a/third/zeppelin-client b/third/zeppelin-client index 38e9a10..52d3e5b 160000 --- a/third/zeppelin-client +++ b/third/zeppelin-client @@ -1 +1 @@ -Subproject commit 38e9a10fcdbdbe2f7615c2bdfe0703e6d4d87987 +Subproject commit 52d3e5b0925ac063043be3b167e550b55e53b331