Skip to content
This repository was archived by the owner on Apr 6, 2019. It is now read-only.

Commit 486bcd9

Browse files
committed
prevent possible callback interleaving issue and unlock mutex in io_service when calling callbacks
1 parent cd503b0 commit 486bcd9

File tree

4 files changed

+38
-18
lines changed

4 files changed

+38
-18
lines changed

includes/cpp_redis/network/io_service.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,12 @@ class io_service {
7575
private:
7676
//! select fds sets handling (init, rd/wr handling)
7777
int init_sets(fd_set* rd_set, fd_set* wr_set);
78-
void read_fd(int fd);
79-
void write_fd(int fd);
8078
void process_sets(fd_set* rd_set, fd_set* wr_set);
8179

80+
typedef std::function<void()> callback_t;
81+
callback_t read_fd(int fd);
82+
callback_t write_fd(int fd);
83+
8284
private:
8385
//! whether the worker should terminate or not
8486
std::atomic_bool m_should_stop;

includes/cpp_redis/redis_client.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class redis_client {
5555

5656
//! thread safety
5757
std::mutex m_callbacks_mutex;
58+
std::mutex m_send_mutex;
5859
};
5960

6061
} //! cpp_redis

sources/network/io_service.cpp

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,13 +59,13 @@ io_service::init_sets(fd_set* rd_set, fd_set* wr_set) {
5959
return max_fd;
6060
}
6161

62-
void
62+
io_service::callback_t
6363
io_service::read_fd(int fd) {
6464
std::lock_guard<std::recursive_mutex> lock(m_fds_mutex);
6565

6666
auto fd_it = m_fds.find(fd);
6767
if (fd_it == m_fds.end())
68-
return ;
68+
return nullptr;
6969

7070
auto& buffer = *fd_it->second.read_buffer;
7171
int original_buffer_size = buffer.size();
@@ -78,30 +78,35 @@ io_service::read_fd(int fd) {
7878
buffer.resize(original_buffer_size);
7979
fd_it->second.disconnection_handler(*this);
8080
m_fds.erase(fd_it);
81+
82+
return nullptr;
8183
}
8284
else {
8385
buffer.resize(original_buffer_size + nb_bytes_read);
84-
fd_it->second.read_callback(nb_bytes_read);
86+
87+
return std::bind(fd_it->second.read_callback, nb_bytes_read);
8588
}
8689
}
8790

88-
void
91+
io_service::callback_t
8992
io_service::write_fd(int fd) {
9093
std::lock_guard<std::recursive_mutex> lock(m_fds_mutex);
9194

9295
auto fd_it = m_fds.find(fd);
9396
if (fd_it == m_fds.end())
94-
return ;
97+
return nullptr;
9598

9699
int nb_bytes_written = send(fd_it->first, fd_it->second.write_buffer.data(), fd_it->second.write_size, 0);
97100
fd_it->second.async_write = false;
98101

99102
if (nb_bytes_written <= 0) {
100103
fd_it->second.disconnection_handler(*this);
101104
m_fds.erase(fd_it);
105+
106+
return nullptr;
102107
}
103108
else
104-
fd_it->second.write_callback(nb_bytes_written);
109+
return std::bind(fd_it->second.write_callback, nb_bytes_written);
105110
}
106111

107112
void
@@ -123,8 +128,16 @@ io_service::process_sets(fd_set* rd_set, fd_set* wr_set) {
123128
}
124129
}
125130

126-
for (int fd : fds_to_read) { read_fd(fd); }
127-
for (int fd : fds_to_write) { write_fd(fd); }
131+
for (int fd : fds_to_read) {
132+
auto callback = read_fd(fd);
133+
if (callback)
134+
callback();
135+
}
136+
for (int fd : fds_to_write) {
137+
auto callback = write_fd(fd);
138+
if (callback)
139+
callback();
140+
}
128141

129142
if (FD_ISSET(m_notif_pipe_fds[0], rd_set)) {
130143
char buf[1024];

sources/redis_client.cpp

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@ redis_client::is_connected(void) {
3131

3232
redis_client&
3333
redis_client::send(const std::vector<std::string>& redis_cmd, const reply_callback_t& callback) {
34-
m_client.send(redis_cmd);
34+
std::lock_guard<std::mutex> lock_callback(m_callbacks_mutex);
3535

36-
std::lock_guard<std::mutex> lock(m_callbacks_mutex);
36+
m_client.send(redis_cmd);
3737
m_callbacks.push(callback);
3838

3939
return *this;
@@ -49,15 +49,19 @@ redis_client::commit(void) {
4949

5050
void
5151
redis_client::connection_receive_handler(network::redis_connection&, reply& reply) {
52-
std::lock_guard<std::mutex> lock(m_callbacks_mutex);
52+
reply_callback_t callback;
5353

54-
if (not m_callbacks.size())
55-
return ;
54+
{
55+
std::lock_guard<std::mutex> lock(m_callbacks_mutex);
5656

57-
if (m_callbacks.front())
58-
m_callbacks.front()(reply);
57+
if (m_callbacks.size()) {
58+
callback = m_callbacks.front();
59+
m_callbacks.pop();
60+
}
61+
}
5962

60-
m_callbacks.pop();
63+
if (callback)
64+
callback(reply);
6165
}
6266

6367
void

0 commit comments

Comments
 (0)