-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathExport.class.php
127 lines (100 loc) · 3.35 KB
/
Export.class.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
<?php
use Elasticsearch\ClientBuilder;
/**
 * Exports documents from an Elasticsearch index into a CSV stream using the
 * scroll API.
 *
 * Usage visible from this class: populate the public properties, then call
 * connect(), queryParam() and fetchDataWriteCSV() in that order.
 */
class Export
{
    /** @var mixed Host value placed into the hosts array given to the client builder. */
    public $host;

    /** @var string|null Sent as the "index" search parameter. */
    public $index;

    /** @var string|null Optional; sent as the "type" search parameter when non-empty. */
    public $type;

    /** @var string Comma-separated field names: used as the "_source" filter and as the leading CSV columns. */
    public $fields;

    /** @var string|array|null Search body, as a JSON string (or an already-decoded array). */
    public $query;

    /** @var int Scroll keep-alive in seconds (sent as "<stm>s"). */
    public $stm = 30;

    /** @var int Sent as the "size" search parameter. */
    public $size = 100;

    /** @var string|null Log file path; when empty, log lines are echoed instead. */
    public $logfile;

    /** @var resource|null Open, writable stream handle that rows are written to with fputcsv(). */
    public $csvFileObject;

    /** @var object|null Elasticsearch client built by connect(). */
    private $es;

    /** @var array Currently unused by this class. */
    private $header = array();

    /** @var array|null Search parameters assembled by queryParam(). */
    private $param;

    /** @var array Rows buffered by processRecords() until writeFile() flushes them. */
    private $recordsToWrite = array();

    /** @var array Map of requested field name => "" used as the column template for every row. */
    private $fieldsArray = array();

    /** @var int|null Currently unused by this class. */
    private $totalRecords;

    /** @var int Number of hits passed through processRecords(). */
    private $recordsProcessed = 0;

    /**
     * Builds the Elasticsearch client for $this->host and stores it.
     *
     * Failures are echoed and logged, not rethrown; in that case the client
     * stays unset.
     */
    public function connect()
    {
        try {
            $hosts = [
                'host' => $this->host,
            ];
            $this->es = ClientBuilder::create()
                ->setHosts($hosts)
                ->build();
            $this->log("Connected successfully ...");
        } catch (Exception $e) {
            echo "Error while connecting to Elasticsearch ..";
            $this->log($e);
        }
    }

    /**
     * Assembles the search parameters and the CSV column template from the
     * public properties (fields, stm, size, index, type, query).
     *
     * Field names are trimmed and empty names dropped, so "a, b" selects "b"
     * rather than " b". When no field names remain, no "_source" filter is sent.
     * A query that is not valid JSON is logged and left out of the request.
     */
    public function queryParam()
    {
        try {
            $select = array_values(array_filter(
                array_map('trim', explode(',', (string) $this->fields)),
                'strlen'
            ));
            // Duplicate names collapse to one column, in first-seen order.
            $this->fieldsArray = array_fill_keys($select, "");

            $paramBuild = [
                "scroll" => $this->stm . "s",
                "size" => $this->size,
                "index" => $this->index,
            ];
            if (!empty($select)) {
                $paramBuild["_source"] = $select;
            }
            if (!empty($this->type)) {
                $paramBuild['type'] = $this->type;
            }
            if (!empty($this->query)) {
                if (is_array($this->query)) {
                    $paramBuild['body'] = $this->query;
                } else {
                    $body = json_decode($this->query, true);
                    if (json_last_error() !== JSON_ERROR_NONE) {
                        $this->log("Invalid JSON in query, ignoring it: " . json_last_error_msg());
                    } elseif (is_array($body)) {
                        $paramBuild['body'] = $body;
                    }
                }
            }
            $this->param = $paramBuild;
        } catch (Exception $e) {
            $this->log($e);
        }
    }

    /**
     * Runs the search, follows the scroll until a page has no hits, and writes
     * every page to the CSV stream.
     *
     * @param int $id  Slice id for a sliced scroll.
     * @param int $max Total number of slices. A slice is only requested when this
     *                 is greater than 1, because Elasticsearch rejects a slice
     *                 whose max is 1 or less.
     *
     * Exceptions are logged, not rethrown. Terminates the script when no CSV
     * stream has been set.
     */
    public function fetchDataWriteCSV($id = 0, $max = 1)
    {
        if (!$this->csvFileObject) {
            $this->log("Check with your csvfile.");
            exit;
        }
        if (!$this->es) {
            $this->log("Not connected to Elasticsearch; call connect() first.");
            return;
        }

        $scrollId = null;
        try {
            // Work on a per-call copy so a slice from one call never leaks into the next.
            $params = is_array($this->param) ? $this->param : [];
            if ((int) $max > 1) {
                $params['body']['slice'] = ['id' => (int) $id, 'max' => (int) $max];
            }

            $response = $this->es->search($params);
            while (true) {
                if (isset($response['_scroll_id'])) {
                    $scrollId = $response['_scroll_id'];
                }
                if (empty($response['hits']['hits'])) {
                    break;
                }
                $this->processRecords($response);
                $this->writeFile();
                $response = $this->es->scroll([
                    "scroll_id" => $scrollId,
                    "scroll" => $this->stm . "s",
                ]);
            }
        } catch (Exception $e) {
            $this->log($e);
        }

        $this->releaseScroll($scrollId);
    }

    /**
     * Best-effort release of a scroll context so it does not linger on the
     * server until its keep-alive expires. Failures are only logged.
     *
     * @param string|null $scrollId Last scroll id seen, or null when none was returned.
     */
    private function releaseScroll($scrollId)
    {
        if (empty($scrollId)) {
            return;
        }
        try {
            $this->es->clearScroll(["scroll_id" => $scrollId]);
        } catch (Exception $e) {
            $this->log($e);
        }
    }

    /**
     * Buffers every hit of a search/scroll response as a row: the hit's
     * "_source" with the hit's "_id" stored under the "_id" key.
     *
     * @param array $response Response array containing hits under ['hits']['hits'].
     */
    public function processRecords($response)
    {
        if (empty($response['hits']['hits'])) {
            return;
        }
        foreach ($response['hits']['hits'] as $hit) {
            $row = (isset($hit['_source']) && is_array($hit['_source'])) ? $hit['_source'] : [];
            $row['_id'] = isset($hit['_id']) ? $hit['_id'] : '';
            $this->recordsToWrite[] = $row;
            $this->recordsProcessed++;
        }
    }

    /**
     * Writes all buffered rows to the CSV stream and empties the buffer.
     *
     * Each row starts with the requested fields in their requested order (blank
     * when the document lacks them); any other keys of the record follow.
     * Array and object values are written as JSON text instead of being passed
     * to fputcsv() as-is.
     */
    public function writeFile()
    {
        if (!empty($this->recordsToWrite)) {
            $template = is_array($this->fieldsArray) ? $this->fieldsArray : [];
            foreach ($this->recordsToWrite as $rec) {
                $row = array_replace($template, $rec);
                foreach ($row as $column => $value) {
                    if (is_array($value) || is_object($value)) {
                        $row[$column] = json_encode($value, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES);
                    }
                }
                // Separator, enclosure and escape are spelled out with fputcsv()'s historical defaults.
                fputcsv($this->csvFileObject, $row, ',', '"', '\\');
            }
        }
        $this->recordsToWrite = array();
    }

    /**
     * Appends a timestamped line to the log file, or echoes it when no log
     * file is configured. Empty messages are ignored.
     *
     * @param mixed $log Message; anything that can be interpolated into a string.
     */
    public function log($log)
    {
        if (empty($log)) {
            return;
        }
        $line = date("y-m-d H:i:s") . " $log \n";
        if (!empty($this->logfile)) {
            file_put_contents($this->logfile, $line, FILE_APPEND | LOCK_EX);
        } else {
            echo $line;
        }
    }
}
?>