-
Notifications
You must be signed in to change notification settings - Fork 37
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Minor] Class attributes being private prevents code from being easily adapted #39
Comments
Hi! Change how index name is generatedWhat would you need for example? We have a PREFIX option, but that's all at the moment. I'm completely ok adding an option to change the date format, but using a completely non date-related name is not desirable. For example the method Was that your plan/need? Change to
|
I discovered just after posting my message the CONFIG_INDEX_PREFIX which is indeed what I needed. I just wanted to add more info before the index name. I will explain again what I have done now since my implementation is in a more advanced stage. I extend two classes (which are copied from src code with protected attributes and overridden in composer autoload):
CustomClientuse JoliCode\Elastically\Client;
...
class CustomClient extends Client
{
const CONFIG_BULK_BYTES_SIZE = 'elastically_bulk_bytes_size'; //New conf
public function __construct($config = [], ?callable $callback = null, ?LoggerInterface $logger = null)
{
parent::__construct($config, $callback, $logger);
}
public function getIndexer(): Indexer //Use a custom indexer -> OOP
{
if (!$this->indexer) {
$this->indexer = new CustomIndexer($this, $this->getSerializer(), $this->getConfigValue(self::CONFIG_BULK_SIZE, 100), $this->getConfigValue(self::CONFIG_BULK_BYTES_SIZE, (int) (ini_get('post_max_size')) * 1.0e+6));
}
return $this->indexer;
}
} As you see, I extend the class in order to add a new conf value and to override CustomIndexeruse JoliCode\Elastically\Indexer;
...
class LabCollectorIndexer extends Indexer
{
protected $bulkBytesMaxSize; //New attribute
public function __construct(Client $client, SerializerInterface $serializer, int $bulkMaxSize = 100, int $bulkBytesMaxSize) //Extended constructor
{
parent::__construct($client, $serializer, $bulkMaxSize ?? 100, $serializer);
$this->bulkBytesMaxSize = $bulkBytesMaxSize;
}
public function scheduleIndex($index, Document $document, string $pipelineID = null) //add string $pipelineID = null to all methods which might call flush method
{
$document->setIndex($index instanceof Index ? $index->getName() : $index);
if (is_object($document->getData())) {
$context = $this->client->getSerializerContext(get_class($document->getData()));
$document->setData($this->serializer->serialize($document->getData(), 'json', $context));
}
$docsize = $this->getBytesSize($document) * 2; // After adding a document $currentBulk size increase 2 times the document size
if ($docsize > $this->bulkBytesMaxSize) {
//Document too heavy
//TODO return exception
return;
}
$queuesize = $this->getQueueBytesSize();
$response = null;
if ($this->getQueueSize() + 1 >= $this->bulkMaxSize || $queuesize + $docsize >= $this->bulkBytesMaxSize) {
//Queue too long or heavy in case we add the document -> send the bulk
try {
$response = $this->flush($pipelineID); //store response
} catch (ResponseException $e) {
$response = $e->getResponseSet(); //Get response even in case of exception (ie in my case, tka exception with encrypted files)
}
}
$this->getCurrentBulk($pipelineID)->addDocument($document, Bulk\Action::OP_TYPE_INDEX);
return $response; //Return the bulk response or null
}
// Note: insert modifications of scheduleDelete, scheduleUpdate and scheduleCreate which are very similar (but I want to reduce this issue post size)
public function getQueueBytesSize() //New method
{
if (null === $this->currentBulk) {
return 0;
}
return $this->getBytesSize($this->currentBulk->getActions());
}
public function getBytesSize($arr): int //New method
{
//TODO might need improvements
$tot = 0;
if (is_array($arr)) {
foreach ($arr as $a) {
$tot += $this->getBytesSize($a);
}
}
if (is_string($arr)) {
$tot += strlen($arr);
}
if (is_int($arr)) {
$tot += PHP_INT_SIZE;
}
if (is_object($arr)) {
$tot += strlen(serialize($arr));
}
return $tot;
}
protected function getCurrentBulk(string $pipelineID = null): Bulk //Override to add pipeline
{
if (!($this->currentBulk instanceof Bulk)) {
$this->currentBulk = new Bulk($this->client);
if (isset($pipelineID)) {
$this->currentBulk->setRequestParam('pipeline', $pipelineID);
}
}
return $this->currentBulk;
}
} Implementation of bulk pipelines and max byte size. |
As I read my code again, getCurrentBulk override in CustomIndexer might be useless since I also can replace the line $this->getCurrentBulk($pipelineID)->addDocument($document, Bulk\Action::OP_TYPE_INDEX); with the lines $bulk = $this->getCurrentBulk($pipelineID);
if (isset($pipelineID)) {
$bulk ->setRequestParam('pipeline', $pipelineID); //Note: It will reset the value at each schedule* call
}
$bulk->addDocument($document, Bulk\Action::OP_TYPE_INDEX); |
Thanks for sharing. Just found this setter on the Elastica Document One possible implementation we could do is adding the pipeline as a Indexer option, like you did for bulkBytesMaxSize (that way you create one indexer per pipeline you want to use). ping #31 for reference. |
Yeah, it was my first try to add a pipeline (before adding to CustomIndexer) and you remind me that I still have this line /**
* Index a FileSystem file.
*
* @param string $attachId The file id in DB
* @param string $hash An hash of the file (ie: filename or content, depends on what we want to be identified as unique), will be used as Elasticsearch index ID
* @param string $contents The file content (will be encoded in base64)
* @param string $filepath The direct path to the file
* @param string $dbtable The file id in DB
* @param Indexer $indexer The Indexer instance
* @param Index $index The Index instance
*
* @return null|ResponseSet
*/
function indexFSFile(string $attachId, string $hash, string $contents, string $filepath, string $dbtable, Indexer $indexer, Index $index)
{
$b64 = base64_encode($contents);
$path_parts = pathinfo($filepath);
$dto = MyIndexedFile::createFSdoc($attachId, $b64, $dbtable, $path_parts['basename'], $path_parts['dirname'], $filepath);
$doc = new Document($hash, $dto);
$doc->setPipeline('fileattachment'); //TODO useless because it's a bulk request
return $indexer->scheduleIndex($index, $doc, 'fileattachment');
}
This is how I create the pipeline (at index creation) using only Elastica code (apart the $client which is an instance of CustomClient, which extends Elastically\Client which extends etc...) //Add pipeline for ingest-attachement
$pipeline = new Pipeline($client);
$pipeline->setId('fileattachment')->setDescription('Extract attachment information');
//Create attachment processor pipeline -> set the field where are file binaries and infinite indexed chars
$attachproc = new Attachment('contentBinary');
$attachproc->setIndexedChars(-1);
//Create remove processor pipeline -> remove the file binaries after contentBinary is indexed (reduce weight)
$removeprc = new Remove('contentBinary');
//Add processors in the correct order to the pipeline
$pipeline->addProcessor($attachproc);
$pipeline->addProcessor($removeprc);
//Create the pipeline
$response = $pipeline->create(); And this |
Hi,
I began to use elastically and implement it.
But due to some specific use, I need to extend some classes and overload some methods.
For example, overload the
IndexBuilder.createIndex($indexName)
method in order to change how index name is generated. But it's impossible to access private attributes like$this->configurationDirectory
.Another example is to add request param to the bulk request generated in
Indexer.getCurrentBulk()
like set a pipeline$this->currentBulk->setRequestParam('pipeline', $pipelineID);
(this part is pretty similar to #31)Note: Another solution could be having more getters&setters for class attributes
The text was updated successfully, but these errors were encountered: