1
1
<?php
2
2
namespace MySQLReplication \BinLog ;
3
3
4
+ use MySQLReplication \BinLog \Exception \BinLogException ;
4
5
use MySQLReplication \Config \Config ;
5
6
use MySQLReplication \Repository \MySQLRepository ;
6
7
use MySQLReplication \Definitions \ConstCapabilityFlags ;
7
8
use MySQLReplication \Definitions \ConstCommand ;
8
- use MySQLReplication \Exception \BinLogException ;
9
9
use MySQLReplication \Gtid \GtidCollection ;
10
10
11
11
/**
@@ -24,7 +24,7 @@ class BinLogConnect
24
24
/**
25
25
* @var MySQLRepository
26
26
*/
27
- private $ DBHelper ;
27
+ private $ mySQLRepository ;
28
28
/**
29
29
* @var Config
30
30
*/
@@ -37,25 +37,33 @@ class BinLogConnect
37
37
* @var GtidCollection
38
38
*/
39
39
private $ gtidCollection ;
40
+ /**
41
+ * http://dev.mysql.com/doc/internals/en/auth-phase-fast-path.html 00 FE
42
+ * @var array
43
+ */
44
+ private $ packageOkHeader = [0 , 254 ];
40
45
41
46
/**
42
47
* @param Config $config
43
- * @param MySQLRepository $DBHelper
48
+ * @param MySQLRepository $mySQLRepository
44
49
* @param BinLogAuth $packAuth
45
50
* @param GtidCollection $gtidCollection
46
51
*/
47
52
public function __construct (
48
53
Config $ config ,
49
- MySQLRepository $ DBHelper ,
54
+ MySQLRepository $ mySQLRepository ,
50
55
BinLogAuth $ packAuth ,
51
56
GtidCollection $ gtidCollection
52
57
) {
53
- $ this ->DBHelper = $ DBHelper ;
58
+ $ this ->mySQLRepository = $ mySQLRepository ;
54
59
$ this ->config = $ config ;
55
60
$ this ->packAuth = $ packAuth ;
56
61
$ this ->gtidCollection = $ gtidCollection ;
57
62
}
58
63
64
+ /**
65
+ *
66
+ */
59
67
public function __destruct ()
60
68
{
61
69
if (true === $ this ->isConnected ())
@@ -81,20 +89,25 @@ public function getCheckSum()
81
89
return $ this ->checkSum ;
82
90
}
83
91
92
+
84
93
/**
85
94
* @throws BinLogException
86
- * @return self
87
95
*/
88
96
public function connectToStream ()
89
97
{
98
+ if (false === filter_var ($ this ->config ->getIp (), FILTER_VALIDATE_IP ))
99
+ {
100
+ throw new BinLogException ('Given parameter " ' . $ this ->config ->getIp () . '" is not a valid IP ' );
101
+ }
102
+
90
103
if (false === ($ this ->socket = socket_create (AF_INET , SOCK_STREAM , SOL_TCP )))
91
104
{
92
105
throw new BinLogException ('Unable to create a socket: ' . socket_strerror (socket_last_error ()), socket_last_error ());
93
106
}
94
107
socket_set_block ($ this ->socket );
95
108
socket_set_option ($ this ->socket , SOL_SOCKET , SO_KEEPALIVE , 1 );
96
109
97
- if (false === socket_connect ($ this ->socket , $ this ->config ->getHost (), $ this ->config ->getPort ()))
110
+ if (false === socket_connect ($ this ->socket , $ this ->config ->getIp (), $ this ->config ->getPort ()))
98
111
{
99
112
throw new BinLogException (socket_strerror (socket_last_error ()), socket_last_error ());
100
113
}
@@ -104,6 +117,9 @@ public function connectToStream()
104
117
$ this ->getBinlogStream ();
105
118
}
106
119
120
+ /**
121
+ *
122
+ */
107
123
private function serverInfo ()
108
124
{
109
125
BinLogServerInfo::parsePackage ($ this ->getPacket (false ));
@@ -126,11 +142,37 @@ public function getPacket($checkForOkByte = true)
126
142
$ result = $ this ->readFromSocket ($ dataLength );
127
143
if (true === $ checkForOkByte )
128
144
{
129
- $ this ->packAuth -> isWriteSuccessful ($ result );
145
+ $ this ->isWriteSuccessful ($ result );
130
146
}
131
147
return $ result ;
132
148
}
133
149
150
+
151
+ /**
152
+ * @param string $packet
153
+ * @return array
154
+ * @throws BinLogException
155
+ */
156
+ public function isWriteSuccessful ($ packet )
157
+ {
158
+ $ head = ord ($ packet [0 ]);
159
+ if (in_array ($ head , $ this ->packageOkHeader ))
160
+ {
161
+ return ['status ' => true , 'code ' => 0 , 'msg ' => '' ];
162
+ }
163
+ else
164
+ {
165
+ $ error_code = unpack ('v ' , $ packet [1 ] . $ packet [2 ])[1 ];
166
+ $ error_msg = '' ;
167
+ for ($ i = 9 ; $ i < strlen ($ packet ); $ i ++)
168
+ {
169
+ $ error_msg .= $ packet [$ i ];
170
+ }
171
+
172
+ throw new BinLogException ($ error_msg , $ error_code );
173
+ }
174
+ }
175
+
134
176
/**
135
177
* @param $length
136
178
* @return string
@@ -143,43 +185,25 @@ private function readFromSocket($length)
143
185
throw new BinLogException ('read 5 bytes from mysql server has gone away ' );
144
186
}
145
187
146
- try
147
- {
148
- $ bytes_read = 0 ;
149
- $ body = '' ;
150
- while ($ bytes_read < $ length )
151
- {
152
- $ resp = socket_read ($ this ->socket , $ length - $ bytes_read );
153
- if ($ resp === false )
154
- {
155
- throw new BinLogException (socket_strerror (socket_last_error ()), socket_last_error ());
156
- }
157
-
158
- // server kill connection or server gone away
159
- if (strlen ($ resp ) === 0 )
160
- {
161
- throw new BinLogException ('read less ' . ($ length - strlen ($ body )));
162
- }
163
- $ body .= $ resp ;
164
- $ bytes_read += strlen ($ resp );
165
- }
166
- if (strlen ($ body ) < $ length )
167
- {
168
- throw new BinLogException ('read less ' . ($ length - strlen ($ body )));
169
- }
170
- return $ body ;
171
- } catch (\Exception $ e )
188
+ if ($ length === socket_recv ($ this ->socket , $ buf , $ length , MSG_WAITALL ))
172
189
{
173
- throw new BinLogException ( var_export ( $ e , true )) ;
190
+ return $ buf ;
174
191
}
192
+
193
+ throw new BinLogException (socket_strerror (socket_last_error ()), socket_last_error ());
175
194
}
176
195
177
196
/**
178
197
* @throws BinLogException
179
198
*/
180
199
private function auth ()
181
200
{
182
- $ data = $ this ->packAuth ->createAuthenticationPacket (ConstCapabilityFlags::getCapabilities (), $ this ->config ->getUser (), $ this ->config ->getPassword (), BinLogServerInfo::getSalt ());
201
+ $ data = $ this ->packAuth ->createAuthenticationBinary (
202
+ ConstCapabilityFlags::getCapabilities (),
203
+ $ this ->config ->getUser (),
204
+ $ this ->config ->getPassword (),
205
+ BinLogServerInfo::getSalt ()
206
+ );
183
207
184
208
$ this ->writeToSocket ($ data );
185
209
$ this ->getPacket ();
@@ -202,7 +226,7 @@ private function writeToSocket($data)
202
226
*/
203
227
private function getBinlogStream ()
204
228
{
205
- $ this ->checkSum = $ this ->DBHelper ->isCheckSum ();
229
+ $ this ->checkSum = $ this ->mySQLRepository ->isCheckSum ();
206
230
if (true === $ this ->checkSum )
207
231
{
208
232
$ this ->execute ('SET @master_binlog_checksum=@@global.binlog_checksum ' );
@@ -215,7 +239,7 @@ private function getBinlogStream()
215
239
216
240
if ('' === $ binFilePos || '' === $ binFileName )
217
241
{
218
- $ master = $ this ->DBHelper ->getMasterStatus ();
242
+ $ master = $ this ->mySQLRepository ->getMasterStatus ();
219
243
$ binFilePos = $ master ['Position ' ];
220
244
$ binFileName = $ master ['File ' ];
221
245
}
@@ -228,7 +252,7 @@ private function getBinlogStream()
228
252
}
229
253
else
230
254
{
231
- $ prelude = pack ('l ' , 26 + $ this ->gtidCollection ->getEncodedPacketLength ()) . chr (ConstCommand::COM_BINLOG_DUMP_GTID );
255
+ $ prelude = pack ('l ' , 26 + $ this ->gtidCollection ->getEncodedLength ()) . chr (ConstCommand::COM_BINLOG_DUMP_GTID );
232
256
$ prelude .= pack ('S ' , 0 );
233
257
$ prelude .= pack ('I ' , $ this ->config ->getSlaveId ());
234
258
$ prelude .= pack ('I ' , 3 );
@@ -237,8 +261,8 @@ private function getBinlogStream()
237
261
$ prelude .= chr (0 );
238
262
$ prelude .= pack ('Q ' , 4 );
239
263
240
- $ prelude .= pack ('I ' , $ this ->gtidCollection ->getEncodedPacketLength ());
241
- $ prelude .= $ this ->gtidCollection ->getEncodedPacket ();
264
+ $ prelude .= pack ('I ' , $ this ->gtidCollection ->getEncodedLength ());
265
+ $ prelude .= $ this ->gtidCollection ->getEncoded ();
242
266
}
243
267
244
268
$ this ->writeToSocket ($ prelude );
0 commit comments