-
Notifications
You must be signed in to change notification settings - Fork 16
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[IMP] start extraction of chunk processing \o/ #80
base: 14.0
Are you sure you want to change the base?
Changes from all commits
f35935c
791bfcb
1c67416
7ebe578
0b6bab2
6ad30d5
9f70400
9d3a54f
9354a87
e957067
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from . import models | ||
from . import components |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
# Copyright 2021 Akretion (https://www.akretion.com). | ||
# @author Sébastien BEAU <[email protected]> | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). | ||
|
||
|
||
# Odoo module manifest for the chunk processing addon.
{
    "name": "Chunk Processing",
    "summary": "Base module for processing chunk",
    "version": "14.0.1.0.0",
    "category": "Uncategorized",
    "website": "https://github.com/shopinvader/pattern-import-export",
    "author": "Akretion",
    "license": "AGPL-3",
    "application": False,
    "installable": True,
    "external_dependencies": {
        "python": [],
        "bin": [],
    },
    "depends": [
        "queue_job",
        "component",
        "web_refresher",
    ],
    "data": [
        "views/chunk_item_view.xml",
        "views/chunk_group_view.xml",
        "views/templates.xml",
    ],
    "demo": [],
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
from . import processor | ||
from . import processor_xml | ||
from . import processor_json | ||
from . import processor_txt | ||
from . import splitter | ||
from . import splitter_json | ||
from . import splitter_xml | ||
from . import splitter_txt |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
# Copyright 2021 Akretion (https://www.akretion.com). | ||
# @author Sébastien BEAU <[email protected]> | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). | ||
|
||
|
||
from odoo.addons.component.core import AbstractComponent | ||
|
||
|
||
class ChunkProcessor(AbstractComponent):
    """Abstract component that processes the items held by one chunk.

    Concrete processors must provide ``_parse_data`` (how the chunk payload
    is turned into items) and ``_import_item`` (what to do with one item).
    """

    _name = "chunk.processor"
    _collection = "chunk.item"

    def _parse_data(self):
        """Return an iterable of the items contained in the chunk.

        Must be implemented by the format-specific processors.
        """
        raise NotImplementedError

    def _import_item(self, item):
        """Import one item and return an iterable of ids.

        ``run`` calls this hook with each parsed item and extends the
        ``ids`` list of its result with the returned value.
        """
        raise NotImplementedError

    def _prepare_error_message(self, idx, item, error):
        """Build the message dict recorded when importing ``item`` failed.

        :param idx: position of the item, used for both row bounds
        :param item: the item that failed (unused here, kept for overrides)
        :param error: the exception that was raised
        """
        return {
            "rows": {"from": idx, "to": idx},
            "type": type(error).__name__,
            "message": str(error),
        }

    def run(self):
        """Import every item of the chunk and collect ids and errors.

        Each item is imported inside its own savepoint. When an item raises,
        the error is recorded in ``messages`` and processing continues,
        unless the ``chunk_raise_if_exception`` context key is set, in which
        case the exception is re-raised.

        :return: dict with the keys ``ids`` and ``messages``
        """
        res = {"ids": [], "messages": []}
        for idx, item in enumerate(self._parse_data()):
            try:
                with self.env.cr.savepoint():
                    res["ids"] += self._import_item(item)
            except Exception as e:
                if self.env.context.get("chunk_raise_if_exception"):
                    raise
                res["messages"].append(self._prepare_error_message(idx, item, e))
        return res
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# Copyright 2021 Akretion (https://www.akretion.com). | ||
# @author Sébastien BEAU <[email protected]> | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). | ||
|
||
import base64 | ||
import json | ||
|
||
from odoo.addons.component.core import AbstractComponent | ||
|
||
|
||
class ChunkProcessorJson(AbstractComponent):
    """Chunk processor base for chunks whose payload is base64-encoded JSON."""

    _name = "chunk.importer.json"
    _inherit = "chunk.processor"
    _collection = "chunk.item"

    def _parse_data(self):
        """Decode the chunk's base64 ``data`` and return the parsed JSON."""
        payload = base64.b64decode(self.collection.data)
        return json.loads(payload)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
# Copyright 2021 Akretion (https://www.akretion.com). | ||
# @author Sébastien BEAU <[email protected]> | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). | ||
|
||
import base64 | ||
|
||
from odoo.addons.component.core import AbstractComponent | ||
|
||
|
||
class ChunkProcessorTxt(AbstractComponent):
    """Chunk processor base for chunks whose payload is base64-encoded text."""

    _name = "chunk.importer.txt"
    _inherit = "chunk.processor"
    _collection = "chunk.item"
    # Byte separator used to cut the decoded payload into items.
    _end_of_line = b"\n"

    def _parse_data(self):
        """Decode the chunk's base64 ``data`` and split it on ``_end_of_line``."""
        raw = base64.b64decode(self.collection.data)
        return raw.split(self._end_of_line)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# Copyright 2021 Akretion (https://www.akretion.com). | ||
# @author Sébastien BEAU <[email protected]> | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). | ||
|
||
import base64 | ||
|
||
from lxml import objectify | ||
|
||
from odoo.addons.component.core import AbstractComponent | ||
|
||
|
||
class ChunkProcessorXml(AbstractComponent):
    """Chunk processor base for chunks whose payload is base64-encoded XML."""

    _name = "chunk.importer.xml"
    _inherit = "chunk.processor"
    _collection = "chunk.item"

    def _parse_data(self):
        """Decode the chunk's base64 ``data`` and iterate the root's children."""
        decoded = base64.b64decode(self.collection.data)
        root = objectify.fromstring(decoded)
        return root.iterchildren()
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
# Copyright 2021 Akretion (https://www.akretion.com). | ||
# @author Sébastien BEAU <[email protected]> | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). | ||
|
||
import base64 | ||
|
||
from odoo.addons.component.core import AbstractComponent | ||
|
||
|
||
class ChunkSplitter(AbstractComponent):
    """Abstract component that splits the data of a chunk group into chunks.

    Format-specific splitters implement ``_parse_data`` and
    ``_convert_items_to_data``.
    """

    _name = "chunk.splitter"
    _collection = "chunk.group"

    def _parse_data(self, data):
        """Yield ``(idx, item)`` pairs read from ``data``."""
        raise NotImplementedError

    def _convert_items_to_data(self, items):
        """Serialize a list of ``(idx, item)`` pairs into bytes.

        The result is base64-encoded by ``_prepare_chunk``.
        """
        raise NotImplementedError

    def _prepare_chunk(self, start_idx, stop_idx, items):
        """Return the values used to create the ``chunk.item`` record."""
        return {
            "start_idx": start_idx,
            "stop_idx": stop_idx,
            "data": base64.b64encode(self._convert_items_to_data(items)),
            "nbr_item": len(items),
            "state": "pending",
            "group_id": self.collection.id,
        }

    def _should_create_chunk(self, items, next_item):
        """Tell whether the buffered ``items`` must be flushed into a chunk.

        Called before ``next_item`` is appended to ``items``. Customise this
        code if you want to add some additional item after reaching the
        limit.
        """
        # ``>=`` so that a chunk holds at most ``chunk_size`` items; an empty
        # buffer is never flushed, even when ``chunk_size`` is zero or less.
        return bool(items) and len(items) >= self.collection.chunk_size

    def _create_chunk(self, start_idx, stop_idx, data):
        """Create one chunk and enqueue its processing when required.

        :param start_idx: index of the first item of the chunk
        :param stop_idx: index of the last item of the chunk
        :param data: list of ``(idx, item)`` pairs stored in the chunk
        :return: the created ``chunk.item`` record
        """
        vals = self._prepare_chunk(start_idx, stop_idx, data)
        chunk = self.env["chunk.item"].create(vals)
        # we enqueue the chunk in case of multi process or if it's the
        # first chunk
        if self.collection.process_multi or len(self.collection.item_ids) == 1:
            chunk.with_delay(priority=self.collection.job_priority).run()
        return chunk

    def run(self, data):
        """Split ``data`` into chunks attached to the current group.

        Items are buffered until ``_should_create_chunk`` asks for a flush;
        the remaining items form the last chunk. Nothing is created when
        ``data`` yields no item.
        """
        items = []
        start_idx = 1
        previous_idx = None
        for idx, item in self._parse_data(data):
            if self._should_create_chunk(items, item):
                self._create_chunk(start_idx, previous_idx, items)
                items = []
                start_idx = idx
            items.append((idx, item))
            previous_idx = idx
        if items:
            # ``previous_idx`` is the index of the last buffered item.
            self._create_chunk(start_idx, previous_idx, items)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
# Copyright 2021 Akretion (https://www.akretion.com). | ||
# @author Sébastien BEAU <[email protected]> | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). | ||
|
||
import json | ||
|
||
from odoo.addons.component.core import Component | ||
|
||
|
||
class ChunkSplitterJson(Component):
    """Splitter registered under the ``json`` usage."""

    _inherit = "chunk.splitter"
    _name = "chunk.splitter.json"
    _usage = "json"

    def _convert_items_to_data(self, items):
        """Dump ``items`` as indented JSON and encode the text as UTF-8."""
        serialized = json.dumps(items, indent=2)
        return serialized.encode("utf-8")

    def _parse_data(self, data):
        """Yield ``(idx, item)`` pairs, numbered from 1, from UTF-8 JSON bytes."""
        parsed = json.loads(data.decode("utf-8"))
        yield from enumerate(parsed, start=1)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
# Copyright 2021 Akretion (https://www.akretion.com). | ||
# @author Sébastien BEAU <[email protected]> | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). | ||
|
||
from odoo.addons.component.core import Component | ||
|
||
|
||
class ChunkSplitterTxt(Component):
    """Splitter registered under the ``txt`` usage."""

    _inherit = "chunk.splitter"
    _name = "chunk.splitter.txt"
    _usage = "txt"
    # Byte separator used both to cut the input and to rebuild a chunk.
    _end_of_line = b"\n"

    def _parse_data(self, data):
        """Yield ``(idx, line)`` for every non-empty line, numbered from 1.

        Empty lines are skipped but still counted in the numbering.
        """
        lines = data.split(self._end_of_line)
        for position, line in enumerate(lines, start=1):
            if not line:
                continue
            yield position, line

    def _convert_items_to_data(self, items):
        """Join the second element of every entry with ``_end_of_line``."""
        return self._end_of_line.join(entry[1] for entry in items)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
# Copyright 2021 Akretion (https://www.akretion.com). | ||
# @author Sébastien BEAU <[email protected]> | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). | ||
|
||
from lxml import etree | ||
|
||
from odoo.addons.component.core import Component | ||
|
||
|
||
class ChunkSplitterXml(Component):
    """Splitter registered under the ``xml`` usage."""

    _inherit = "chunk.splitter"
    _name = "chunk.splitter.xml"
    _usage = "xml"

    def _parse_data(self, data):
        """Yield ``(idx, node)`` pairs, numbered from 1.

        The nodes are the result of evaluating the group's
        ``xml_split_xpath`` on the parsed document.
        """
        root = etree.fromstring(data)
        nodes = root.xpath(self.collection.xml_split_xpath)
        yield from enumerate(nodes, start=1)

    def _convert_items_to_data(self, items):
        """Wrap the second element of every entry in a ``data`` root element."""
        wrapper = etree.Element("data")
        for entry in items:
            wrapper.append(entry[1])
        return etree.tostring(wrapper)
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
from . import chunk_item | ||
from . import chunk_group |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
# Copyright 2021 Akretion (https://www.akretion.com). | ||
# @author Sébastien BEAU <[email protected]> | ||
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). | ||
|
||
|
||
from odoo import _, api, fields, models | ||
|
||
|
||
class ChunkGroup(models.Model):
    """Group of chunks produced by splitting one set of data.

    Creating a group enqueues a job that splits its data into ``chunk.item``
    records through the splitter component matching ``data_format``.
    """

    _inherit = "collection.base"
    _name = "chunk.group"

    item_ids = fields.One2many("chunk.item", "group_id", "Item")
    process_multi = fields.Boolean()
    # Priority passed to ``with_delay`` when enqueuing jobs for this group.
    job_priority = fields.Integer(default=20)
    chunk_size = fields.Integer(default=500, help="Define the size of the chunk")
    progress = fields.Float(compute="_compute_stat")
    date_done = fields.Datetime()
    # Used as the component ``usage`` to look up the splitter.
    data_format = fields.Selection(
        [
            ("json", "Json"),
            ("xml", "XML"),
            ("txt", "Txt"),
        ]
    )
    xml_split_xpath = fields.Char()
    state = fields.Selection(
        [("pending", "Pending"), ("failed", "Failed"), ("done", "Done")],
        default="pending",
    )
    info = fields.Char()
    nbr_error = fields.Integer(compute="_compute_stat")
    nbr_success = fields.Integer(compute="_compute_stat")
    apply_on_model = fields.Char()
    usage = fields.Char()

    @api.depends("item_ids.nbr_error", "item_ids.nbr_success")
    def _compute_stat(self):
        """Compute the error/success counters and the progress percentage.

        The progress is the share of handled items (errors plus successes)
        among all the items of the chunks, or 0 when there is no item.
        """
        for record in self:
            record.nbr_error = sum(record.mapped("item_ids.nbr_error"))
            record.nbr_success = sum(record.mapped("item_ids.nbr_success"))
            todo = sum(record.mapped("item_ids.nbr_item"))
            if todo:
                record.progress = (record.nbr_error + record.nbr_success) * 100.0 / todo
            else:
                record.progress = 0

    def _get_data(self):
        """Return the raw data to split; to be implemented by inheriting code."""
        raise NotImplementedError

    def split_in_chunk(self):
        """Split Group into Chunk

        On failure the group is marked as ``failed`` and the error is stored
        in ``info``, unless the ``chunk_raise_if_exception`` context key is
        set, in which case the exception is re-raised.
        """
        # purge chunk in case of retrying a job
        self.item_ids.unlink()
        try:
            data = self._get_data()
            with self.work_on(self._name) as work:
                splitter = work.component(usage=self.data_format)
                splitter.run(data)
        except Exception as e:
            if self._context.get("chunk_raise_if_exception"):
                raise
            else:
                self.state = "failed"
                self.info = _("Failed to create the chunk: %s") % e
        return True

    def set_done(self):
        """Set the final state of each group and stamp ``date_done``.

        A group with at least one error is ``failed``, otherwise ``done``.
        """
        for record in self:
            if record.nbr_error:
                record.state = "failed"
            else:
                record.state = "done"
            record.date_done = fields.Datetime.now()

    @api.model_create_multi
    def create(self, vals_list):
        """Create the groups and enqueue the split job of each of them."""
        records = super().create(vals_list)
        for record in records:
            # Read the priority from the created record: ``self`` is the
            # recordset ``create`` was called on (normally empty), not the
            # new group, so it does not carry the configured priority.
            record.with_delay(priority=record.job_priority).split_in_chunk()
        return records
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code comment gives no new information; it is equivalent to reading the next line.