// SMASyncRedis.cpp
#include "SMASyncRedis.h"
#include "extension.h"
static inline redisReply *CopyReply(redisReply *src)
{
redisReply *reply = (redisReply *)calloc(1, sizeof(redisReply));
reply->type = src->type;
reply->integer = src->integer;
switch (reply->type) {
case REDIS_REPLY_INTEGER:
break;
case REDIS_REPLY_ARRAY:
if (src->element != nullptr) {
reply->elements = src->elements;
reply->element = (redisReply **)calloc(src->elements, sizeof(redisReply *));
for (size_t i = 0; i < src->elements; i++) {
if (src->element[i] != nullptr) {
reply->element[i] = CopyReply(src->element[i]);
}
}
}
break;
case REDIS_REPLY_ERROR:
case REDIS_REPLY_STATUS:
case REDIS_REPLY_STRING:
if (src->str != nullptr) {
reply->str = (char *)calloc(src->len + 1, sizeof(char));
memcpy(reply->str, src->str, src->len);
}
}
return reply;
}
static void _cmdcb(void *data)
{
CBData *d = (CBData *)data;
IPluginFunction *cb = d->callback;
if (cb->IsRunnable()) {
HandleSecurity sec(d->identity, myself->GetIdentity());
void *obj;
int err = handlesys->ReadHandle(d->handle, g_AsyncRedisCtxType, &sec, &obj);
if (err != HandleError_None || obj == nullptr) {
delete d;
return;
}
RedisReply *reply = new RedisReply();
reply->reply = d->reply;
HandleError error = HandleError_None;
Handle_t qh = handlesys->CreateHandle(g_RedisReplyType, reply, d->identity, myself->GetIdentity(), &error);
if (!qh || error != HandleError_None) {
delete data;
return;
}
cb->PushCell(d->handle);
cb->PushCell(qh);
cb->PushCell(d->data);
cb->Execute(nullptr);
if (qh != BAD_HANDLE) {
handlesys->FreeHandle(qh, &sec);
}
}
delete data;
}
void SMASyncRedis::CmdCallback(void *r, void *privdata)
{
if (privdata) {
CBData *data = (CBData *)privdata;
data->reply = CopyReply((redisReply *)r);
g_HiredisExt.RequestFrame(_cmdcb, privdata);
}
}
static void _connectcb(void *data)
{
CBData *d = (CBData *)data;
IPluginFunction *cb = d->callback;
if (cb && cb->IsRunnable()) {
HandleSecurity sec;
sec.pOwner = NULL;
sec.pIdentity = myself->GetIdentity();
void *obj;
int err = handlesys->ReadHandle(d->handle, g_AsyncRedisCtxType, &sec, &obj);
if (err != HandleError_None || obj == nullptr) {
delete d;
return;
}
cb->PushCell(d->handle);
cb->PushCell(d->status);
cb->PushCell(d->data);
cb->Execute(nullptr);
}
delete d;
}