Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify API, add new MysqlClient and remove Factory and ConnectionInterface #186

Merged
merged 2 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
260 changes: 73 additions & 187 deletions README.md

Large diffs are not rendered by default.

8 changes: 2 additions & 6 deletions examples/01-query.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,12 @@
// $ php examples/01-query.php
// $ MYSQL_URI=test:test@localhost/test php examples/01-query.php "SELECT * FROM book"

use React\MySQL\Factory;
use React\MySQL\QueryResult;

require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$connection = $factory->createLazyConnection(getenv('MYSQL_URI') ?: 'test:test@localhost/test');
$mysql = new React\MySQL\MysqlClient(getenv('MYSQL_URI') ?: 'test:test@localhost/test');

$query = isset($argv[1]) ? $argv[1] : 'select * from book';
$connection->query($query)->then(function (QueryResult $command) {
$mysql->query($query)->then(function (React\MySQL\QueryResult $command) {
if (isset($command->resultRows)) {
// this is a response to a SELECT etc. with some rows (0+)
print_r($command->resultFields);
Expand Down
7 changes: 2 additions & 5 deletions examples/02-query-stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,12 @@
// $ php examples/02-query-stream.php "SHOW VARIABLES"
// $ MYSQL_URI=test:test@localhost/test php examples/02-query-stream.php "SELECT * FROM book"

use React\MySQL\Factory;

require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$connection = $factory->createLazyConnection(getenv('MYSQL_URI') ?: 'test:test@localhost/test');
$mysql = new React\MySQL\MysqlClient(getenv('MYSQL_URI') ?: 'test:test@localhost/test');

$query = isset($argv[1]) ? $argv[1] : 'select * from book';
$stream = $connection->queryStream($query);
$stream = $mysql->queryStream($query);

$stream->on('data', function ($row) {
echo json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL;
Expand Down
126 changes: 56 additions & 70 deletions examples/11-interactive.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,87 +3,73 @@
// $ php examples/11-interactive.php
// $ MYSQL_URI=test:test@localhost/test php examples/11-interactive.php

use React\MySQL\ConnectionInterface;
use React\MySQL\QueryResult;
use React\MySQL\Factory;
use React\Stream\ReadableResourceStream;

require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$uri = getenv('MYSQL_URI') ?: 'test:test@localhost/test';
$mysql = new React\MySQL\MysqlClient(getenv('MYSQL_URI') ?: 'test:test@localhost/test');

// open a STDIN stream to read keyboard input (not supported on Windows)
$stdin = new ReadableResourceStream(STDIN);
$stdin->pause();

//create a mysql connection for executing queries
$factory->createConnection($uri)->then(function (ConnectionInterface $connection) use ($stdin) {
echo 'Connection success.' . PHP_EOL;
$stdin->resume();
$stdin = new React\Stream\ReadableResourceStream(STDIN);

$stdin->on('data', function ($line) use ($connection) {
$query = trim($line);
$stdin->on('data', function ($line) use ($mysql) {
$query = trim($line);

if ($query === '') {
// skip empty commands
return;
}
if ($query === 'exit') {
// exit command should close the connection
echo 'bye.' . PHP_EOL;
$connection->quit();
return;
}
if ($query === '') {
// skip empty commands
return;
}
if ($query === 'exit') {
// exit command should close the connection
echo 'bye.' . PHP_EOL;
$mysql->quit();
return;
}

$time = microtime(true);
$connection->query($query)->then(function (QueryResult $command) use ($time) {
if (isset($command->resultRows)) {
// this is a response to a SELECT etc. with some rows (0+)
echo implode("\t", array_column($command->resultFields, 'name')) . PHP_EOL;
foreach ($command->resultRows as $row) {
echo implode("\t", $row) . PHP_EOL;
}

printf(
'%d row%s in set (%.03f sec)%s',
count($command->resultRows),
count($command->resultRows) === 1 ? '' : 's',
microtime(true) - $time,
PHP_EOL
);
} else {
// this is an OK message in response to an UPDATE etc.
// the insertId will only be set if this is
if ($command->insertId !== 0) {
var_dump('last insert ID', $command->insertId);
}
$time = microtime(true);
$mysql->query($query)->then(function (React\MySQL\QueryResult $command) use ($time) {
if (isset($command->resultRows)) {
// this is a response to a SELECT etc. with some rows (0+)
echo implode("\t", array_column($command->resultFields, 'name')) . PHP_EOL;
foreach ($command->resultRows as $row) {
echo implode("\t", $row) . PHP_EOL;
}

printf(
'Query OK, %d row%s affected (%.03f sec)%s',
$command->affectedRows,
$command->affectedRows === 1 ? '' : 's',
microtime(true) - $time,
PHP_EOL
);
printf(
'%d row%s in set (%.03f sec)%s',
count($command->resultRows),
count($command->resultRows) === 1 ? '' : 's',
microtime(true) - $time,
PHP_EOL
);
} else {
// this is an OK message in response to an UPDATE etc.
// the insertId will only be set if this is
if ($command->insertId !== 0) {
var_dump('last insert ID', $command->insertId);
}
}, function (Exception $error) {
// the query was not executed successfully
echo 'Error: ' . $error->getMessage() . PHP_EOL;
});
});

// close connection when STDIN closes (EOF or CTRL+D)
$stdin->on('close', function () use ($connection) {
$connection->quit();
printf(
'Query OK, %d row%s affected (%.03f sec)%s',
$command->affectedRows,
$command->affectedRows === 1 ? '' : 's',
microtime(true) - $time,
PHP_EOL
);
}
}, function (Exception $error) {
// the query was not executed successfully
echo 'Error: ' . $error->getMessage() . PHP_EOL;
});
});

// close STDIN (stop reading) when connection closes
$connection->on('close', function () use ($stdin) {
$stdin->close();
echo 'Disconnected.' . PHP_EOL;
});
}, function (Exception $e) use ($stdin) {
echo 'Connection error: ' . $e->getMessage() . PHP_EOL;
// close connection when STDIN closes (EOF or CTRL+D)
$stdin->on('close', function () use ($mysql) {
$mysql->quit();
});

// close STDIN (stop reading) when connection closes
$mysql->on('close', function () use ($stdin) {
$stdin->close();
echo 'Disconnected.' . PHP_EOL;
});

echo '# Entering interactive mode ready, hit CTRL-D to quit' . PHP_EOL;
72 changes: 36 additions & 36 deletions examples/12-slow-stream.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,21 @@
// $ MYSQL_URI=test:test@localhost/test php examples/12-slow-stream.php "SELECT * FROM book"

use React\EventLoop\Loop;
use React\MySQL\ConnectionInterface;
use React\MySQL\Factory;

require __DIR__ . '/../vendor/autoload.php';

$factory = new Factory();
$uri = getenv('MYSQL_URI') ?: 'test:test@localhost/test';
$mysql = new React\MySQL\MysqlClient(getenv('MYSQL_URI') ?: 'test:test@localhost/test');

$query = isset($argv[1]) ? $argv[1] : 'select * from book';
$stream = $mysql->queryStream($query);

//create a mysql connection for executing query
$factory->createConnection($uri)->then(function (ConnectionInterface $connection) use ($query) {
// The protocol parser reads rather large chunked from the underlying connection
$ref = new ReflectionProperty($mysql, 'connecting');
$ref->setAccessible(true);
$promise = $ref->getValue($mysql);
assert($promise instanceof React\Promise\PromiseInterface);

$promise->then(function (React\MySQL\Io\Connection $connection) {
// The protocol parser reads rather large chunks from the underlying connection
// and as such can yield multiple (dozens to hundreds) rows from a single data
// chunk. We try to artificially limit the stream chunk size here to try to
// only ever read a single row so we can demonstrate throttling this stream.
Expand All @@ -28,11 +30,13 @@
$ref = new ReflectionProperty($connection, 'stream');
$ref->setAccessible(true);
$conn = $ref->getValue($connection);
assert($conn instanceof React\Socket\ConnectionInterface);

// access private "input" (instanceof React\Stream\DuplexStreamInterface)
$ref = new ReflectionProperty($conn, 'input');
$ref->setAccessible(true);
$stream = $ref->getValue($conn);
assert($stream instanceof React\Stream\DuplexStreamInterface);

// reduce private bufferSize to just a few bytes to slow things down
$ref = new ReflectionProperty($stream, 'bufferSize');
Expand All @@ -41,38 +45,34 @@
} catch (Exception $e) {
echo 'Warning: Unable to reduce buffer size: ' . $e->getMessage() . PHP_EOL;
}
});

$stream = $connection->queryStream($query);

$throttle = null;
$stream->on('data', function ($row) use (&$throttle, $stream) {
echo json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL;

// simple throttle mechanism: explicitly pause the result stream and
// resume it again after some time.
if ($throttle === null) {
$throttle = Loop::addTimer(1.0, function () use ($stream, &$throttle) {
$throttle = null;
$stream->resume();
});
$stream->pause();
}
});

$stream->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

$stream->on('close', function () use (&$throttle) {
echo 'CLOSED' . PHP_EOL;
$throttle = null;
$stream->on('data', function ($row) use (&$throttle, $stream) {
echo json_encode($row, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES) . PHP_EOL;

if ($throttle) {
Loop::cancelTimer($throttle);
// simple throttle mechanism: explicitly pause the result stream and
// resume it again after some time.
if ($throttle === null) {
$throttle = Loop::addTimer(1.0, function () use ($stream, &$throttle) {
$throttle = null;
}
});
$stream->resume();
});
$stream->pause();
}
});

$connection->quit();
}, function (Exception $e) {
$stream->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

$stream->on('close', function () use (&$throttle) {
echo 'CLOSED' . PHP_EOL;

if ($throttle) {
Loop::cancelTimer($throttle);
$throttle = null;
}
});

$mysql->quit();
Loading