Skip to content

Commit

Permalink
v2.0.0 ClickHouse cluster support
Browse files Browse the repository at this point in the history
  • Loading branch information
glushkovds committed Jul 26, 2024
1 parent 884b21f commit 9a13f23
Show file tree
Hide file tree
Showing 20 changed files with 632 additions and 66 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,13 @@
## 2.0.0 [2024-07-26]

### Features
1. ClickHouse cluster support

### Breaking changes
1. The minimum required PHP version is 8.0
2. Removed deprecated method BaseModel::insert, use BaseModel::insertBulk instead
3. Method BaseModel::prepareAndInsert is marked as deprecated, use BaseModel::prepareAndInsertBulk instead

## 1.19.0 [2023-09-28]

### Features
Expand Down
89 changes: 84 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ Adapter to Laravel and Lumen of the most popular libraries:

## Features

No dependency, only Curl (support php >=7.1 )
No dependency, only Curl (support php >=8.0 )

More: https://github.com/smi2/phpClickHouse#features

## Prerequisites

- PHP 7.1, 8.0
- PHP 8.0
- Laravel/Lumen 7+
- Clickhouse server

Expand Down Expand Up @@ -87,7 +87,6 @@ More about `$db` see here: https://github.com/smi2/phpClickHouse/blob/master/REA
```php
<?php


namespace App\Models\Clickhouse;

use PhpClickHouseLaravel\BaseModel;
Expand All @@ -105,7 +104,6 @@ class MyTable extends BaseModel
```php
<?php


class CreateMyTable extends \PhpClickHouseLaravel\Migration
{
/**
Expand Down Expand Up @@ -354,7 +352,6 @@ MyTable::insertAssoc([[1, 'str', new InsertArray(['a','b'])]]);
```php
<?php


namespace App\Models\Clickhouse;

use PhpClickHouseLaravel\BaseModel;
Expand Down Expand Up @@ -386,3 +383,85 @@ return new class extends \PhpClickHouseLaravel\Migration
}
};
```

### Cluster mode

**Important!**
* Each ClickHouse node must have one database name and login and password.
* For reading and writing, the connection is made to the first available node.
* Migrations executes on all nodes. If one of the nodes is unavailable, the migration will throw an exception.

Your config/database.php should look like:
```php
'clickhouse' => [
'driver' => 'clickhouse',
'cluster' => [
[
'host' => 'clickhouse01',
'port' => '8123',
],
[
'host' => 'clickhouse02',
'port' => '8123',
],
],
'database' => env('CLICKHOUSE_DATABASE','default'),
'username' => env('CLICKHOUSE_USERNAME','default'),
'password' => env('CLICKHOUSE_PASSWORD',''),
'timeout_connect' => env('CLICKHOUSE_TIMEOUT_CONNECT',2),
'timeout_query' => env('CLICKHOUSE_TIMEOUT_QUERY',2),
'https' => (bool)env('CLICKHOUSE_HTTPS', null),
'retries' => env('CLICKHOUSE_RETRIES', 0),
'settings' => [ // optional
'max_partitions_per_insert_block' => 300,
],
],
```

Migration is:

```php
<?php

return new class extends \PhpClickHouseLaravel\Migration
{
/**
* Run the migrations.
*
* @return void
*/
public function up()
{
static::write('
CREATE TABLE my_table (
id UInt32,
created_at DateTime,
field_one String,
field_two Int32
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/default.my_table', '{replica}')
ORDER BY (id)
');
}

/**
* Reverse the migrations.
*
* @return void
*/
public function down()
{
static::write('DROP TABLE my_table');
}
};
```

You can get the host of the current node and switch the active connection to the next node:
```php
$row = new MyTable();
echo $row->getThisClient()->getConnectHost();
// will print 'clickhouse01'
$row->resolveConnection()->getCluster()->slideNode();
echo $row->getThisClient()->getConnectHost();
// will print 'clickhouse02'
```
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
}
],
"require": {
"php": ">=7.1.0|>=8.0",
"php": ">=8.0",
"smi2/phpclickhouse": "^1.4.2",
"the-tinderbox/clickhouse-builder": "^6.0",
"illuminate/support": ">=7",
Expand Down
32 changes: 18 additions & 14 deletions docker-compose.test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,29 @@ services:
php:
build: tests/docker
depends_on:
- clickhouse
- clickhouse2
- clickhouse01
- clickhouse02
- zookeeper
volumes:
- ./:/src

clickhouse:
clickhouse01:
image: yandex/clickhouse-server
ulimits:
nproc: 65535
nofile:
soft: 262144
hard: 262144
ports:
- "18123:8123"
depends_on:
- zookeeper
volumes:
- ./tests/docker/clickhouse01:/etc/clickhouse-server

clickhouse2:
clickhouse02:
image: yandex/clickhouse-server
ulimits:
nproc: 65535
nofile:
soft: 262144
hard: 262144
ports:
- "18124:8123"
depends_on:
- zookeeper
volumes:
- ./tests/docker/clickhouse02:/etc/clickhouse-server

zookeeper:
image: zookeeper:3.7
36 changes: 18 additions & 18 deletions src/BaseModel.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,12 @@ class BaseModel
* @var string
*/
protected $connection = Connection::DEFAULT_NAME;

/**
* Determine if an attribute or relation exists on the model.
* The __isset magic method is triggered by calling isset() or empty() on inaccessible properties.
*
* @param string $key The name of the attribute or relation.
* @param string $key The name of the attribute or relation.
* @return bool True if the attribute or relation exists, false otherwise.
*/
public function __isset($key)
Expand All @@ -97,7 +97,7 @@ public function __isset($key)

return false;
}

/**
* Get the table associated with the model.
*
Expand Down Expand Up @@ -190,18 +190,6 @@ public function save(array $options = []): bool
return $this->exists;
}

/**
* Bulk insert into Clickhouse database
* @param array[] $rows
* @return Statement
* @deprecated use insertBulk
*/
public static function insert(array $rows): Statement
{
$instance = new static();
return $instance->getThisClient()->insert($instance->getTableForInserts(), $rows);
}

/**
* Bulk insert into Clickhouse database
* @param array[] $rows
Expand Down Expand Up @@ -232,6 +220,21 @@ public static function insertBulk(array $rows, array $columns = []): Statement
* @param array $columns
* @return Statement
*/
public static function prepareAndInsertBulk(array $rows, array $columns = []): Statement
{
return static::insertBulk(
array_map('static::prepareFromRequest', $rows, $columns),
$columns
);
}

/**
* Prepare each row by calling static::prepareFromRequest to bulk insert into database
* @param array[] $rows
* @param array $columns
* @return Statement
* @deprecated use prepareAndInsertBulk
*/
public static function prepareAndInsert(array $rows, array $columns = []): Statement
{
$rows = array_map('static::prepareFromRequest', $rows, $columns);
Expand Down Expand Up @@ -319,9 +322,6 @@ public static function select($select = ['*']): Builder
return $instance->newQuery()->select($select)->from($instance->getTable());
}

/**
* @return Builder
*/
protected function newQuery(): Builder
{
return new Builder($this->getThisClient());
Expand Down
94 changes: 94 additions & 0 deletions src/Cluster.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<?php

namespace PhpClickHouseLaravel;

use ClickHouseDB\Client;
use ClickHouseDB\Exception\TransportException;
use ClickHouseDB\Statement;

class Cluster
{
/**
* @var Client[]
*/
protected array $nodes;
protected int $activeNodeIndex;

public function __construct(
protected array $nodeConfigs
) {
foreach ($this->nodeConfigs as $index => $nodeConfig) {
try {
$this->nodes[$index] = static::createClient($nodeConfig);
$this->nodes[$index]->ping(true);
$this->activeNodeIndex = $index;
break;
} catch (TransportException $e) {
}
}
if (!isset($this->activeNodeIndex)) {
throw $e ?? new TransportException('No nodes are available');
}
}

public function write(string $sql, array $bindings = [], bool $exception = true): ?Statement
{
foreach ($this->nodeConfigs as $index => $config) {
if (empty($this->nodes[$index])) {
$this->nodes[$index] = static::createClient($config);
}
$statement = $this->nodes[$index]->write($sql, $bindings, $exception);
}
return $statement ?? null;
}

public function getActiveNode(): Client
{
return $this->nodes[$this->activeNodeIndex];
}

/**
* Switch active node to the next available node
* @return void
*/
public function slideNode(): void
{
$configCount = count($this->nodeConfigs);
if ($configCount < 2) {
return;
}
for ($i = 0; $i < $configCount; $i++) {
$nextIndex = $this->activeNodeIndex + 1;
if ($configCount == $nextIndex) {
$nextIndex = 0;
}
try {
$this->nodes[$nextIndex] ??= static::createClient($this->nodeConfigs[$nextIndex]);
$this->nodes[$nextIndex]->ping(true);
$this->activeNodeIndex = $nextIndex;
break;
} catch (TransportException) {
}
}
}

protected static function createClient(array $config): Client
{
$client = new Client($config);
$client->database($config['database']);
$client->setTimeout((int)$config['timeout_query']);
$client->setConnectTimeOut((int)$config['timeout_connect']);
if ($configSettings =& $config['settings']) {
$settings = $client->settings();
foreach ($configSettings as $sName => $sValue) {
$settings->set($sName, $sValue);
}
}
if ($retries = (int)($config['retries'] ?? null)) {
$curler = new CurlerRollingWithRetries();
$curler->setRetries($retries);
$client->transport()->setDirtyCurler($curler);
}
return $client;
}
}
Loading

0 comments on commit 9a13f23

Please sign in to comment.