[IMP] start extraction of chunk processing \o/ #80

Open · wants to merge 10 commits into base: 14.0
2 changes: 2 additions & 0 deletions chunk_processing/__init__.py
@@ -0,0 +1,2 @@
from . import models
from . import components
31 changes: 31 additions & 0 deletions chunk_processing/__manifest__.py
@@ -0,0 +1,31 @@
# Copyright 2021 Akretion (https://www.akretion.com).
# @author Sébastien BEAU <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).


{
"name": "Chunk Processing",
"summary": "Base module for processing chunk",
"version": "14.0.1.0.0",
"category": "Uncategorized",
"website": "https://github.com/shopinvader/pattern-import-export",
"author": " Akretion",
"license": "AGPL-3",
"application": False,
"installable": True,
"external_dependencies": {
"python": [],
"bin": [],
},
"depends": [
"queue_job",
"component",
"web_refresher",
],
"data": [
"views/chunk_item_view.xml",
"views/chunk_group_view.xml",
"views/templates.xml",
],
"demo": [],
}
8 changes: 8 additions & 0 deletions chunk_processing/components/__init__.py
@@ -0,0 +1,8 @@
from . import processor
from . import processor_xml
from . import processor_json
from . import processor_txt
from . import splitter
from . import splitter_json
from . import splitter_xml
from . import splitter_txt
33 changes: 33 additions & 0 deletions chunk_processing/components/processor.py
@@ -0,0 +1,33 @@
# Copyright 2021 Akretion (https://www.akretion.com).
# @author Sébastien BEAU <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).


from odoo.addons.component.core import AbstractComponent


class ChunkProcessor(AbstractComponent):
_name = "chunk.processor"
_collection = "chunk.item"

    def _import_item(self, item):
raise NotImplementedError

def _prepare_error_message(self, idx, item, error):
return {
"rows": {"from": idx, "to": idx},
"type": type(error).__name__,
"message": str(error),
}

def run(self):
res = {"ids": [], "messages": []}
for idx, item in enumerate(self._parse_data()):
try:
with self.env.cr.savepoint():
res["ids"] += self._import_item(item)
except Exception as e:
if self.env.context.get("chunk_raise_if_exception"):
raise
res["messages"].append(self._prepare_error_message(idx, item, e))
return res
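
A concrete processor only needs to combine one of the parser mixins below with an _import_item implementation. The following is a hypothetical sketch (the component name, usage key and res.partner mapping are illustrative, not part of this PR) of what a depending module could register:

# Hypothetical example, not part of this PR: a concrete processor that reuses
# the JSON parsing mixin and imports each parsed item as a res.partner record.
from odoo.addons.component.core import Component


class PartnerChunkProcessor(Component):
    _name = "chunk.importer.json.partner"  # illustrative name
    _inherit = "chunk.importer.json"
    _usage = "json_partner_import"  # illustrative usage key

    def _import_item(self, item):
        # chunk.processor.run() accumulates the returned ids in res["ids"] and
        # wraps this call in a savepoint, so a failing item only appends an
        # error message instead of aborting the whole chunk.
        partner = self.env["res.partner"].create({"name": item["name"]})
        return partner.ids
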
17 changes: 17 additions & 0 deletions chunk_processing/components/processor_json.py
@@ -0,0 +1,17 @@
# Copyright 2021 Akretion (https://www.akretion.com).
# @author Sébastien BEAU <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import base64
import json

from odoo.addons.component.core import AbstractComponent


class ChunkProcessorJson(AbstractComponent):
_name = "chunk.importer.json"
_inherit = "chunk.processor"
_collection = "chunk.item"

def _parse_data(self):
return json.loads(base64.b64decode(self.collection.data))
17 changes: 17 additions & 0 deletions chunk_processing/components/processor_txt.py
@@ -0,0 +1,17 @@
# Copyright 2021 Akretion (https://www.akretion.com).
# @author Sébastien BEAU <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import base64

from odoo.addons.component.core import AbstractComponent


class ChunkProcessorTxt(AbstractComponent):
_name = "chunk.importer.txt"
_inherit = "chunk.processor"
_collection = "chunk.item"
_end_of_line = b"\n"

def _parse_data(self):
return base64.b64decode(self.collection.data).split(self._end_of_line)
20 changes: 20 additions & 0 deletions chunk_processing/components/processor_xml.py
@@ -0,0 +1,20 @@
# Copyright 2021 Akretion (https://www.akretion.com).
# @author Sébastien BEAU <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import base64

from lxml import objectify

from odoo.addons.component.core import AbstractComponent


class ChunkProcessorXml(AbstractComponent):
_name = "chunk.importer.xml"
_inherit = "chunk.processor"
_collection = "chunk.item"

def _parse_data(self):
return objectify.fromstring(
base64.b64decode(self.collection.data)
).iterchildren()
55 changes: 55 additions & 0 deletions chunk_processing/components/splitter.py
@@ -0,0 +1,55 @@
# Copyright 2021 Akretion (https://www.akretion.com).
# @author Sébastien BEAU <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import base64

from odoo.addons.component.core import AbstractComponent


class ChunkSplitter(AbstractComponent):
_name = "chunk.splitter"
_collection = "chunk.group"

def _parse_data(self, data):
raise NotImplementedError

def _convert_items_to_data(self, items):
raise NotImplementedError

def _prepare_chunk(self, start_idx, stop_idx, items):
return {
"start_idx": start_idx,
"stop_idx": stop_idx,
"data": base64.b64encode(self._convert_items_to_data(items)),
"nbr_item": len(items),
"state": "pending",
"group_id": self.collection.id,
}

def _should_create_chunk(self, items, next_item):
"""Customise this code if you want to add some additionnal
item after reaching the limit"""
return len(items) > self.collection.chunk_size

    def _create_chunk(self, start_idx, stop_idx, items):
        vals = self._prepare_chunk(start_idx, stop_idx, items)
chunk = self.env["chunk.item"].create(vals)
        # we enqueue the chunk in case of multi-process mode or if it's the first chunk
[Contributor review comment] this comment gives no new info; it is equivalent to reading the next line
if self.collection.process_multi or len(self.collection.item_ids) == 1:
chunk.with_delay(priority=self.collection.job_priority).run()
return chunk

def run(self, data):
items = []
start_idx = 1
previous_idx = None
for idx, item in self._parse_data(data):
if self._should_create_chunk(items, item):
self._create_chunk(start_idx, previous_idx, items)
items = []
start_idx = idx
items.append((idx, item))
previous_idx = idx
if items:
self._create_chunk(start_idx, idx, items)
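
To make the flow concrete, here is a hedged usage sketch (run from an Odoo shell or a test; the field values are illustrative) showing how a group splits a small text payload through the txt splitter defined below:

# Hypothetical usage sketch, not part of this PR: split a small text payload
# into chunk.item records via the txt splitter. In normal use create() enqueues
# split_in_chunk(), which fetches the payload through _get_data(); here the
# splitter component is exercised directly.
group = env["chunk.group"].create(
    {
        "data_format": "txt",
        "chunk_size": 2,  # illustrative, the default is 500
        "apply_on_model": "res.partner",  # illustrative
        "usage": "my_import",  # illustrative
    }
)
with group.work_on("chunk.group") as work:
    splitter = work.component(usage="txt")
    splitter.run(b"l1\nl2\nl3\nl4\nl5\nl6\nl7")
# run() flushes a chunk only once len(items) exceeds chunk_size, so this call
# creates three chunk.item records covering lines 1-3, 4-6 and 7.
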
21 changes: 21 additions & 0 deletions chunk_processing/components/splitter_json.py
@@ -0,0 +1,21 @@
# Copyright 2021 Akretion (https://www.akretion.com).
# @author Sébastien BEAU <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import json

from odoo.addons.component.core import Component


class ChunkSplitterJson(Component):
_inherit = "chunk.splitter"
_name = "chunk.splitter.json"
_usage = "json"

    def _convert_items_to_data(self, items):
        # serialize only the payload, not the index, like the txt and xml splitters
        return json.dumps([item for _idx, item in items], indent=2).encode("utf-8")

def _parse_data(self, data):
items = json.loads(data.decode("utf-8"))
for idx, item in enumerate(items):
yield idx + 1, item
20 changes: 20 additions & 0 deletions chunk_processing/components/splitter_txt.py
@@ -0,0 +1,20 @@
# Copyright 2021 Akretion (https://www.akretion.com).
# @author Sébastien BEAU <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

from odoo.addons.component.core import Component


class ChunkSplitterTxt(Component):
_inherit = "chunk.splitter"
_name = "chunk.splitter.txt"
_usage = "txt"
_end_of_line = b"\n"

def _parse_data(self, data):
for idx, item in enumerate(data.split(self._end_of_line)):
if item:
yield idx + 1, item

def _convert_items_to_data(self, items):
return self._end_of_line.join([x[1] for x in items])
25 changes: 25 additions & 0 deletions chunk_processing/components/splitter_xml.py
@@ -0,0 +1,25 @@
# Copyright 2021 Akretion (https://www.akretion.com).
# @author Sébastien BEAU <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

from lxml import etree

from odoo.addons.component.core import Component


class ChunkSplitterXml(Component):
_inherit = "chunk.splitter"
_name = "chunk.splitter.xml"
_usage = "xml"

def _parse_data(self, data):
tree = etree.fromstring(data)
items = tree.xpath(self.collection.xml_split_xpath)
for idx, item in enumerate(items):
yield idx + 1, item

def _convert_items_to_data(self, items):
data = etree.Element("data")
for item in items:
data.append(item[1])
return etree.tostring(data)
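
As a rough illustration (the payload and xpath are made up, not taken from this PR), this is what the xpath-based splitting and the re-wrapping under a data root amount to:

# Illustrative only, not part of this PR: mimic what the xml splitter does for
# a chunk.group configured with xml_split_xpath = "//record".
from lxml import etree

payload = (
    b"<export>"
    b"<record><name>A</name></record>"
    b"<record><name>B</name></record>"
    b"<record><name>C</name></record>"
    b"</export>"
)
tree = etree.fromstring(payload)
items = tree.xpath("//record")  # what _parse_data() enumerates
chunk = etree.Element("data")  # what _convert_items_to_data() builds per chunk
for element in items[:2]:  # pretend the first chunk holds two items
    chunk.append(element)
print(etree.tostring(chunk))
# b'<data><record><name>A</name></record><record><name>B</name></record></data>'
# chunk.importer.xml._parse_data() later iterates exactly these children.
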
2 changes: 2 additions & 0 deletions chunk_processing/models/__init__.py
@@ -0,0 +1,2 @@
from . import chunk_item
from . import chunk_group
81 changes: 81 additions & 0 deletions chunk_processing/models/chunk_group.py
@@ -0,0 +1,81 @@
# Copyright 2021 Akretion (https://www.akretion.com).
# @author Sébastien BEAU <[email protected]>
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).


from odoo import _, api, fields, models


class ChunkGroup(models.Model):
_inherit = "collection.base"
_name = "chunk.group"
[Contributor review comment] I think the word "batch" is more standard and expressive for this kind of purpose than "group"

    item_ids = fields.One2many("chunk.item", "group_id", "Items")
process_multi = fields.Boolean()
job_priority = fields.Integer(default=20)
    chunk_size = fields.Integer(default=500, help="Define the size of each chunk")
progress = fields.Float(compute="_compute_stat")
date_done = fields.Datetime()
data_format = fields.Selection(
[
("json", "Json"),
("xml", "XML"),
("txt", "Txt"),
]
)
xml_split_xpath = fields.Char()
state = fields.Selection(
[("pending", "Pending"), ("failed", "Failed"), ("done", "Done")],
default="pending",
)
info = fields.Char()
nbr_error = fields.Integer(compute="_compute_stat")
nbr_success = fields.Integer(compute="_compute_stat")
apply_on_model = fields.Char()
usage = fields.Char()

@api.depends("item_ids.nbr_error", "item_ids.nbr_success")
def _compute_stat(self):
[Contributor review comment] nitpicking: stats
for record in self:
record.nbr_error = sum(record.mapped("item_ids.nbr_error"))
record.nbr_success = sum(record.mapped("item_ids.nbr_success"))
todo = sum(record.mapped("item_ids.nbr_item"))
if todo:
record.progress = (record.nbr_error + record.nbr_success) * 100.0 / todo
else:
record.progress = 0

def _get_data(self):
raise NotImplementedError

def split_in_chunk(self):
"""Split Group into Chunk"""
# purge chunk in case of retring a job
[Contributor review comment] retrying
self.item_ids.unlink()
try:
data = self._get_data()
with self.work_on(self._name) as work:
splitter = work.component(usage=self.data_format)
splitter.run(data)
except Exception as e:
if self._context.get("chunk_raise_if_exception"):
raise
else:
self.state = "failed"
self.info = _("Failed to create the chunk: %s") % e
return True

def set_done(self):
for record in self:
if record.nbr_error:
record.state = "failed"
else:
record.state = "done"
record.date_done = fields.Datetime.now()

@api.model_create_multi
def create(self, vals_list):
records = super().create(vals_list)
for record in records:
            record.with_delay(priority=record.job_priority).split_in_chunk()
return records
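
_get_data() is left abstract, so a depending module has to provide the payload. Below is a minimal hedged sketch, assuming the consumer stores the raw file on the group in a Binary field (the field name is illustrative, not part of this PR):

# Hypothetical consumer sketch, not part of this PR.
import base64

from odoo import fields, models


class ChunkGroup(models.Model):
    _inherit = "chunk.group"

    raw_file = fields.Binary()  # illustrative field holding the uploaded payload

    def _get_data(self):
        # split_in_chunk() hands the return value to the splitter component,
        # which expects raw bytes, so the base64 Binary value is decoded here.
        self.ensure_one()
        return base64.b64decode(self.raw_file)

With that in place, the create() override above enqueues split_in_chunk() through queue_job; the job purges any existing items, picks the splitter matching data_format and creates the chunk.item records.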