-
Notifications
You must be signed in to change notification settings - Fork 0
/
MRedisConnection.hpp
136 lines (92 loc) · 4.59 KB
/
MRedisConnection.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Copyright 2018 Stephan Menzel. Distributed under the Boost
// Software License, Version 1.0. (See accompanying file
// LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#pragma once
#include "MRedisConfig.hpp"
#include "MRedisConnection.hpp"
#include "RESP.hpp"
#include <boost/asio.hpp>
#include <boost/lockfree/queue.hpp>
#include <functional>
#include <string>
#include <deque>
namespace moose {
namespace mredis {
class AsyncClient;
class MRedisConnection {
public:
//! all timeouts in seconds
enum { MREDIS_CONNECT_TIMEOUT = 2 };
enum { MREDIS_READ_TIMEOUT = 5 }; // make that 10
enum { MREDIS_WRITE_TIMEOUT = 5 };
MRedisConnection(AsyncClient &n_parent);
MRedisConnection(const MRedisConnection &) = delete;
virtual ~MRedisConnection() noexcept;
MRedisConnection &operator=(const MRedisConnection &) = delete;
//! This blocks until connected or throws on error
void connect(const std::string &n_server, const boost::uint16_t n_port = 6379);
//! This doesn't block and sets promise upon done
void async_connect(const std::string &n_server, const boost::uint16_t n_port, std::shared_ptr<boost::promise<bool> > n_ret);
//! This doesn't block and sets promise upon done
void async_reconnect();
/*! @brief shut the connection down.
There may still be cancelled handlers on the io_service for it. Make sure you poll before
actually destroying the connections
@param n_reconnect if true, the connection will go to state Status::ShutdownReconnect
and attempt to re-connect at the next send()
*/
virtual void stop() noexcept;
/*! @brief send an unknown command that can be filled by the caller via n_prepare
*/
void send(std::function<void(std::ostream &n_os)> &&n_prepare, Callback &&n_callback) noexcept;
/*! @brief send an unknown command that can be filled by the caller via n_prepare
*/
promised_response_ptr send(std::function<void(std::ostream &n_os)> &&n_prepare) noexcept;
//! connection lifecycle
enum class Status {
Disconnected = 0,
Connecting,
Connected,
Pushing, // normal mode connection
Pubsub, // pubsub connection
ShuttingDownReconnect, // going down for reconnect with the intend of restarting
ShutdownReconnect, // down and ready to reconnect
ShuttingDown,
Shutdown // down and out
};
protected:
//! assume there are some and go send
void send_outstanding_requests() noexcept;
void read_response() noexcept;
//! when handling error conditions after async ops, use this to save some lines
//! @return true when error should cause closing of the connection
bool handle_error(const boost::system::error_code n_errc, const char *n_message) const;
//! yeah, this needs refactoring. I don't need two functions for that
void check_connect_deadline(const boost::system::error_code &n_error);
void check_read_deadline(const boost::system::error_code &n_error);
/*! @brief shut the connection down and try to reconnect until done.
must be called by the connection itself, in io_service's thread
*/
virtual void shutdown_reconnect() noexcept;
/*! output current status as string */
const char *status_string() const noexcept;
AsyncClient &m_parent;
std::string m_server_name;
boost::uint16_t m_server_port;
boost::asio::ip::tcp::socket m_socket; //!< Socket for the connection
boost::asio::streambuf m_send_streambuf; //!< use for writing
bool m_send_buffer_busy; //!< streambuf in use
boost::asio::steady_timer m_send_retry_timer; //!< when buffer is in use for sending, retry after a few micros
boost::asio::steady_timer m_send_timeout;
boost::asio::streambuf m_receive_streambuf; //!< use for reading
bool m_receive_buffer_busy; //!< streambuf in use
boost::asio::steady_timer m_receive_retry_timer; //!< when buffer is in use for receiving, retry after a few micros
boost::asio::steady_timer m_receive_timeout; //!< pipelining connection, we always have two concurrent timeouts
boost::asio::steady_timer m_connect_timeout; //!< we try to adopt a timeout concept but since we are in a
boost::mutex m_request_queue_lock;
std::deque<mrequest> m_requests_not_sent;
std::deque<Callback> m_outstanding; //!< callbacks that have not been resolved yet. We are waiting for an answer
Status m_status; //!< tell where we are in our workflow
};
}
}