1
+ <?php
2
+ namespace Swoole \Async ;
3
+
4
+ class MySQL
5
+ {
6
+ /**
7
+ * max connections for mysql client
8
+ * @var int
9
+ */
10
+ protected $ pool_size ;
11
+
12
+ protected $ connection_num ;
13
+
14
+ /**
15
+ * idle connection
16
+ * @var array
17
+ */
18
+ protected $ idle_pool = array ();
19
+
20
+ /**
21
+ * work connetion
22
+ * @var array
23
+ */
24
+ protected $ work_pool = array ();
25
+
26
+ protected $ config ;
27
+
28
+ /**
29
+ * wait connection
30
+ * @var array
31
+ */
32
+ protected $ wait_queue = array ();
33
+
34
+ protected $ round_id ;
35
+
36
+ function __construct ($ config , $ pool_size = 100 )
37
+ {
38
+ if (empty ($ config ['host ' ]) or empty ($ config ['database ' ]) or empty ($ config ['user ' ]) or empty ($ config ['password ' ]))
39
+ {
40
+ throw new \Exception ("require host, database, user, password config. " );
41
+ }
42
+
43
+ if (!function_exists ('swoole_get_mysqli_sock ' ))
44
+ {
45
+ throw new \Exception ("require swoole_get_mysqli_sock function. " );
46
+ }
47
+
48
+ if (empty ($ config ['port ' ]))
49
+ {
50
+ $ config ['port ' ] = 3306 ;
51
+ }
52
+
53
+ $ this ->config = $ config ;
54
+ $ this ->pool_size = $ pool_size ;
55
+ }
56
+
57
+ /**
58
+ * create mysql connection
59
+ */
60
+ protected function createConnection ()
61
+ {
62
+ $ config = $ this ->config ;
63
+ $ db = new \mysqli ;
64
+ $ db ->connect ($ config ['host ' ], $ config ['user ' ], $ config ['password ' ], $ config ['database ' ], $ config ['port ' ]);
65
+ $ db_sock = swoole_get_mysqli_sock ($ db );
66
+ swoole_event_add ($ db_sock , array ($ this , 'onSQLReady ' ));
67
+ $ this ->idle_pool [] = array (
68
+ 'object ' => $ db ,
69
+ 'socket ' => $ db_sock ,
70
+ );
71
+ $ this ->connection_num ++;
72
+ }
73
+
74
+ function onSQLReady ($ db_sock )
75
+ {
76
+ $ task = $ this ->work_pool [$ db_sock ];
77
+
78
+ /**
79
+ * @var \mysqli
80
+ */
81
+ $ mysqli = $ task ['mysql ' ]['object ' ];
82
+ $ callback = $ task ['callback ' ];
83
+
84
+ if ($ result = $ mysqli ->reap_async_query ())
85
+ {
86
+ call_user_func ($ callback , $ mysqli , $ result );
87
+ if (is_object ($ result ))
88
+ {
89
+ mysqli_free_result ($ result );
90
+ }
91
+ }
92
+ else
93
+ {
94
+ echo "MySQLi Error: " . mysqli_error ($ mysqli )."\n" ;
95
+ }
96
+
97
+ //release mysqli object
98
+ $ this ->idle_pool [] = $ task ['mysql ' ];
99
+ unset($ this ->work_pool [$ db_sock ]);
100
+
101
+ //fetch a request from wait queue.
102
+ if (count ($ this ->wait_queue ) > 0 )
103
+ {
104
+ $ idle_n = count ($ this ->idle_pool );
105
+ for ($ i = 0 ; $ i < $ idle_n ; $ i ++)
106
+ {
107
+ $ new_task = array_shift ($ this ->wait_queue );
108
+ $ this ->doQuery ($ new_task ['sql ' ], $ new_task ['callback ' ]);
109
+ }
110
+ }
111
+ }
112
+
113
+ function query ($ sql , $ callback )
114
+ {
115
+ //no idle connection
116
+ if (count ($ this ->idle_pool ) == 0 )
117
+ {
118
+ if ($ this ->connection_num < $ this ->pool_size )
119
+ {
120
+ $ this ->createConnection ();
121
+ $ this ->doQuery ($ sql , $ callback );
122
+ }
123
+ else
124
+ {
125
+ $ this ->wait_queue [] = array (
126
+ 'sql ' => $ sql ,
127
+ 'callback ' => $ callback ,
128
+ );
129
+ }
130
+ }
131
+ else
132
+ {
133
+ $ this ->doQuery ($ sql , $ callback );
134
+ }
135
+ }
136
+
137
+ protected function doQuery ($ sql , $ callback )
138
+ {
139
+ //remove from idle pool
140
+ $ db = array_pop ($ this ->idle_pool );
141
+
142
+ /**
143
+ * @var \mysqli
144
+ */
145
+ $ mysqli = $ db ['object ' ];
146
+
147
+ for ($ i = 0 ; $ i < 2 ; $ i ++)
148
+ {
149
+ $ result = $ mysqli ->query ($ sql , MYSQLI_ASYNC );
150
+ if ($ result === false )
151
+ {
152
+ if ($ mysqli ->errno == 2013 or $ mysqli ->errno == 2006 )
153
+ {
154
+ $ mysqli ->close ();
155
+ $ r = $ mysqli ->connect ();
156
+ if ($ r === true )
157
+ {
158
+ continue ;
159
+ }
160
+ }
161
+ else
162
+ {
163
+ echo "server exception. \n" ;
164
+ $ this ->connection_num --;
165
+ $ this ->wait_queue [] = array (
166
+ 'sql ' => $ sql ,
167
+ 'callback ' => $ callback ,
168
+ );
169
+ }
170
+ }
171
+ break ;
172
+ }
173
+
174
+ $ task ['sql ' ] = $ sql ;
175
+ $ task ['callback ' ] = $ callback ;
176
+ $ task ['mysql ' ] = $ db ;
177
+
178
+ //join to work pool
179
+ $ this ->work_pool [$ db ['socket ' ]] = $ task ;
180
+ }
181
+ }
0 commit comments