diff --git a/python-markdown-extension/pyproject.toml b/python-markdown-extension/pyproject.toml
index 453a20e5..39561749 100644
--- a/python-markdown-extension/pyproject.toml
+++ b/python-markdown-extension/pyproject.toml
@@ -1,6 +1,6 @@
 [project]
 name = "python_markdown_document_offsets_injection_extension"
-version = "0.0.2"
+version = "0.1.0"
 description = "A Python-Markdown compiler plugin that put markdown words offset to the output HTML."
 authors = [{ name = "HikariLan", email = "hikarilan@minecraft.kim" }]
 license = { text = "Apache-2.0" }
diff --git a/python-markdown-extension/src/python_markdown_document_offsets_injection_extension/extension.py b/python-markdown-extension/src/python_markdown_document_offsets_injection_extension/extension.py
index 1d6152a7..95dbec74 100644
--- a/python-markdown-extension/src/python_markdown_document_offsets_injection_extension/extension.py
+++ b/python-markdown-extension/src/python_markdown_document_offsets_injection_extension/extension.py
@@ -1,26 +1,29 @@
-import re
 from markdown import Extension, Markdown
 from markdown.preprocessors import Preprocessor
 from markdown.blockprocessors import BlockProcessor
 from markdown.blockparser import BlockParser
+from difflib import SequenceMatcher
 import xml.etree.ElementTree as etree
+import logging
 
-MARK_PREVENT_RECURSION: str = "\t\t\t\r\r\rMARK_PREVENT_RECURSION\r\r\r\t\t\t"
-
-MARK_CONTINUE: str = "\t\t\t\r\r\rMARK_CONTINUE\r\r\r\t\t\t"
-
-# @see: markdown.util.HTML_PLACEHOLDER_RE
-# PYTHON_MARKDOWN_HTML_PLACEHOLDER_RE: re.Pattern[str] = re.compile(
-#     "\u0002wzxhzdk:%s\u0003" % r"([0-9]+)"
-# )
+logging.basicConfig(format="%(levelname)s - %(message)s")
+logger = logging.getLogger("document-offsets-injection")
 
 
 class MainExtension(Extension):
+    def __init__(self, **kwargs):
+        self.config = {
+            "debug": [False, "Debug mode"],
+        }
+        super(MainExtension, self).__init__(**kwargs)
+
     def extendMarkdown(self, md: Markdown):
         meta: dict = {
+            "document": "",
             "document_offsets": [],
-            "used_document_offsets": {},
-            "last_parent": None,
+            "preprocessed_document": "",
+            "preprocessed_document_restore_opcodes": [],
+            "debug_enabled": self.getConfig("debug"),
         }
         md.preprocessors.register(
             CalculateDocumentOffsetPreprocessor(md, meta), "capture_document", 1000
@@ -43,21 +46,16 @@ def __init__(self, md: Markdown, meta: dict):
         self.meta = meta
 
     def run(self, lines: list[str]) -> list[str]:
+        self.meta["document"] = lines
+
         offset: int = 0
         for line in lines:
-            # Skip empty lines
-            if len(line) == 0:
-                store: tuple[str, int, int] = (line, offset, offset + 1)
-                self.meta["document_offsets"].append(store)
-                self.meta["used_document_offsets"][store] = False
-                offset += 1
-                continue
             # store the line and offset
             store: tuple[str, int, int] = (line, offset, offset + len(line))
             self.meta["document_offsets"].append(store)
-            self.meta["used_document_offsets"][store] = False
             # plus 1 is for the newline character (\n), use the CRLF file is unknown behavior
             offset += len(line) + 1
+
         return lines
 
 
@@ -71,180 +69,156 @@ def __init__(self, md: Markdown, meta: dict):
         self.meta = meta
 
     def run(self, lines: list[str]) -> list[str]:
-        document_offsets: list[tuple[str, int, int]] = self.meta["document_offsets"]
-
-        # Index into document_offsets just past the last successful match (exclusive)
-        last_success_match_end: int = 0
-        num_lines: int = 0
-        num_document_offsets: int = 0
-        while num_document_offsets < len(document_offsets) and num_lines < len(lines):
-            line = lines[num_lines]
-            document_offset: tuple[str, int, int] = document_offsets[
-                num_document_offsets
-            ]
-
-            # If it is an exact match
-            if document_offset[0] == line:
-                # Match this line
-                self.match(line, num_document_offsets, num_document_offsets + 1)
-                # If the offsets since the last successful match are not contiguous, match the current line to that non-contiguous range of original document offsets
-                if num_document_offsets > last_success_match_end and num_lines > 0:
-                    self.match(
-                        lines[num_lines - 1],
-                        last_success_match_end,
-                        num_document_offsets,
-                    )
-                last_success_match_end = num_document_offsets + 1
-                num_lines += 1
-                num_document_offsets += 1
-            # If there is no exact match, look up this line's position in the original document offsets
-            else:
-                remain: list[str] = [
-                    line for line, _, _ in document_offsets[num_document_offsets:]
-                ]
-                # If such a line exists
-                if line in remain:
-                    # Find the first matching position and match this line to it
-                    idx = remain.index(line) + num_document_offsets
-                    self.match(line, idx, idx + 1)
-                    # If the offsets since the last successful match are not contiguous, match the current line to that non-contiguous range of original document offsets
-                    if idx > last_success_match_end and num_lines > 0:
-                        self.match(lines[num_lines - 1], last_success_match_end, idx)
-                    last_success_match_end = idx + 1
-                    num_lines += 1
-                    num_document_offsets = idx + 1
-                # If no matching position is found, continue with the next line
-                else:
-                    num_lines += 1
-
-        # If all lines are matched but some original document offsets remain, match the remaining offsets
-        if last_success_match_end < len(document_offsets):
-            self.match(
-                lines[num_lines - 1], last_success_match_end, len(document_offsets)
+        self.meta["preprocessed_document"] = lines
+
+        a = self.meta["preprocessed_document"]
+        b = self.meta["document"]
+
+        s = SequenceMatcher(lambda x: len(x) == 0, a, b)
+
+        for tag, i1, i2, j1, j2 in s.get_opcodes():
+            self.meta["preprocessed_document_restore_opcodes"].append(
+                (tag, i1, i2, j1, j2)
             )
 
         return lines
 
-    def match(
-        self,
-        matched_line: str,
-        num_document_offsets_start: int,
-        num_document_offsets_end: int,
-    ):
-        """
-        Assign a single matched line to a range of original document offset entries, over the index range [num_document_offsets_start, num_document_offsets_end)
-        """
-        document_offsets: list[tuple[str, int, int]] = self.meta["document_offsets"]
-        used_document_offsets: dict[tuple[str, int, int], bool] = self.meta[
-            "used_document_offsets"
-        ]
-        for i in range(num_document_offsets_start, num_document_offsets_end):
-            document_offset = document_offsets[i]
-            # For the first matched offset entry, store the matched line; otherwise store MARK_CONTINUE
-            if i == num_document_offsets_start:
-                document_offsets[i] = (
-                    matched_line,
-                    document_offset[1],
-                    document_offset[2],
-                )
-            else:
-                document_offsets[i] = (
-                    MARK_CONTINUE,
-                    document_offset[1],
-                    document_offset[2],
-                )
-            del used_document_offsets[document_offset]
-            used_document_offsets[document_offsets[i]] = False
-
 
 class OffsetsInjectionBlockProcessor(BlockProcessor):
     """
     A block processor to mark the words in the document and inject the offset of the block to the HTML element
     """
 
+    last_processed_line_idx: int = -1
+
     def __init__(self, parser: BlockParser, meta: dict):
         super(OffsetsInjectionBlockProcessor, self).__init__(parser)
         self.meta = meta
 
     def test(self, _, block) -> bool:
-        # Test if there is any line in the block
-        for line in [line for (line, _, _) in self.meta["document_offsets"]]:
+        for i in range(0, len(self.meta["preprocessed_document"])):
+            line: str = self.meta["preprocessed_document"][i]
+            if len(line) == 0:
+                continue
             if line in block:
-                return True
+                # If this line is in the block and its index is greater than the last processed index, start processing from this block
+                return i > self.last_processed_line_idx
+        # If no qualifying line is ever found, do not process this block at all
         return False
 
-    def run(self, parent: etree.Element, blocks: list[str]) -> bool:
-        """
-        Inject offsets from the document into the HTML elements, so that later processing can use these offsets to locate positions in the original document. The current algorithm is:
-        1. Find the first block in the document that contains text
-        2. Find the block's position in the document; this walks every line of the document to collect all lines contained in the block, and uses their start and end positions to determine the block's position
-        3. Inject the block's start and end positions into the HTML element; the block is first parsed recursively, and the start and end positions are then injected into the last generated HTML element
-        Because the block is captured by this processor again during the recursive parsing, we append a MARK_PREVENT_RECURSION marker to the end of the block to avoid infinite recursion; when this processor captures the block again, it skips it and removes the marker.
-        """
-
+    def run(self, parent: etree.Element, blocks: list[str]):
         block: str = blocks[0]
-        # If the first block is handled, remove the marker and return, so that other block processors can process it
-        if MARK_PREVENT_RECURSION in blocks[0]:
-            blocks[0] = blocks[0].replace(MARK_PREVENT_RECURSION, "")
-            return False
-
-        start: int | None = None
-        end: int | None = None
-        used: dict[tuple[str, int, int], bool] = {}
-        # Search for the block fragment in the document_offsets
-        for store in self.meta["document_offsets"]:
-            # Skip empty lines
-            if len(store[0]) == 0:
-                continue
-            # If already used, skip
-            if self.meta["used_document_offsets"][store]:
-                continue
-            (line, offset, end_offset) = store
-            # If a MARK_CONTINUE marker is found, treat the lines before the marker as contiguous
-            if line == MARK_CONTINUE:
-                end = end_offset
-                used[store] = True
+        start: int = -1
+        end: int = -1
+
+        for i in range(
+            self.last_processed_line_idx + 1, len(self.meta["preprocessed_document"])
+        ):
+            line: str = self.meta["preprocessed_document"][i]
+            if len(line) == 0:
                 continue
-            # If found one
             if line in block:
-                # If the line already scanned (usually some lines with same content in different place), skip
-                if line in [line for (line, _, _) in used.keys()]:
-                    continue
-                # If none yet set, set the start offset
-                if start is None:
-                    start = offset
-                    end = end_offset
-                # Or, continuing searching for the end offset until the end of the block
-                else:
-                    end = end_offset
-                # Mark the fragment as used
-                used[store] = True
-            # If end is not found but new line not in block, reset the search and restart from the next line
-            elif end is None:
-                start = None
-                # Clear the used list
-                used = {}
-                continue
-            # If both start and end are both set and no continuously block found, break the loop
-            else:
+                if start == -1:
+                    start = i
+                end = i + 1
+                self.last_processed_line_idx = i
+            elif start != -1:
                 break
-        # If both start and end are found, store the result
-        if start is not None and end is not None:
-            blocks.pop(0)
-            self.meta["used_document_offsets"].update(used)
-            # append MARK_PREVENT_RECURSION to tail of the block to prevent recursion, we don't use a handled
-            # flaglist because we don't know if there's some same block in the document
-            self.parser.parseBlocks(parent, [block + MARK_PREVENT_RECURSION])
-            # fix offset index out of range
-            if len(parent) == 0:
-                return True
-            # fix multi blocks in same parents
-            if self.meta["last_parent"] == parent[-1]:
-                parent[-1].set("data-original-document-end", str(end))
-                return True
-            parent[-1].set("data-original-document-start", str(start))
-            parent[-1].set("data-original-document-end", str(end))
-            self.meta["last_parent"] = parent[-1]
-            return True
-        return False
+
+        if start == -1:
+            return
+
+        previous_len = len(parent)
+        self.parser.parseBlocks(parent, [block])
+        parsed_len = len(parent)
+
+        blocks.pop(0)
+
+        restored_start = -1
+        restored_end = -1
+        restored_accurate = (False, False)
+
+        # Look for deleted lines
+        for _, i1, i2, j1, j2 in [
+            r
+            for r in self.meta["preprocessed_document_restore_opcodes"]
+            if r[0] == "delete"
+        ]:
+            # If the deleted lines fall between start and end, update the start and end line numbers
+            if i1 <= start < i2:
+                start = i2
+            if i1 < end <= i2:
+                end = i1
+        # If this leaves start greater than or equal to end, there is no matching line in the original document, so return
+        if start >= end:
+            return
+
+        for tag, i1, i2, j1, j2 in self.meta["preprocessed_document_restore_opcodes"]:
+            # Inserted lines (present in the original document but not in the preprocessed document) are meaningless, skip them
+            if tag == "insert":
+                continue
+            # Fuzzy-match replaced lines
+            if tag == "replace":
+                if i1 <= start < i2:
+                    restored_start = j1
+                if i1 < end <= i2:
+                    restored_end = j2
+
+        # Match equal lines
+        for _, i1, i2, j1, j2 in [
+            r
+            for r in self.meta["preprocessed_document_restore_opcodes"]
+            if r[0] == "equal"
+        ]:
+            if i1 <= start < i2:
+                restored_start = j1 + start - i1
+                restored_accurate = (True, restored_accurate[1])
+            if i1 < end <= i2:
+                restored_end = j1 + end - i1
+                restored_accurate = (restored_accurate[0], True)
+
+        if restored_start == -1 or restored_end == -1:
+            if self.meta["debug_enabled"]:
+                logger.error(
+                    "Failed to restore the document offsets for the block {}-{}, restored {}-{}".format(
+                        start, end, restored_start, restored_end
+                    )
+                )
+            return
+
+        offset_start = self.meta["document_offsets"][restored_start][1]
+        offset_end = self.meta["document_offsets"][restored_end - 1][2]
+
+        if previous_len == parsed_len and len(parent) > 0:
+            child = parent[-1]
+            if child.get("data-original-document-start") is None:
+                child.set("data-original-document-start", str(offset_start))
+                if self.meta["debug_enabled"]:
+                    logger.warning(
+                        "Trying to patch a block without original document start, patching to current block offset in {}-{}".format(
+                            offset_start, offset_end
+                        )
+                    )
+            child.set("data-original-document-end", str(offset_end))
+            if self.meta["debug_enabled"]:
+                child.set(
+                    "data-original-document",
+                    child.get("data-original-document", "")
+                    + "\n".join(self.meta["document"])[offset_start:offset_end],
+                )
+            child.set("data-offset-accurate-end", str(restored_accurate[1]).lower())
+
+        for i in range(parsed_len - previous_len):
+            child = parent[-1 - i]
+            child.set("data-original-document-start", str(offset_start))
+            child.set("data-original-document-end", str(offset_end))
+            if self.meta["debug_enabled"]:
+                child.set(
+                    "data-original-document",
+                    "\n".join(self.meta["document"])[offset_start:offset_end],
+                )
+            child.set(
+                "data-offset-accurate-start", str(restored_accurate[0]).lower()
+            )
+            child.set("data-offset-accurate-end", str(restored_accurate[1]).lower())
diff --git a/python-markdown-extension/test/__main__.py b/python-markdown-extension/test/__main__.py
index 93f2eb00..21f8619a 100644
--- a/python-markdown-extension/test/__main__.py
+++ b/python-markdown-extension/test/__main__.py
@@ -79,6 +79,9 @@ def __init__(self, case, test_case: unittest.TestCase):
                 "pymdownx.tabbed",
             ],
             extension_configs={
+                "document-offsets-injection": {
+                    "debug": True,
+                },
                 "toc": {
                     "permalink": "",
                     "slugify": uslugify,
@@ -115,6 +118,7 @@ def __init__(self, case, test_case: unittest.TestCase):
         self.test_case = test_case
 
     def test(self):
+        print(self.result)
         tester = ParserTester(self.case, self.test_case)
         tester.feed(self.result)
         tester.check_integrity()
@@ -135,6 +139,12 @@ def handle_starttag(self, tag, attrs):
         start = None
         end = None
         for attr in attrs:
+            if (
+                attr[0] == "data-offset-accurate-start"
+                or attr[0] == "data-offset-accurate-end"
+            ):
+                if not bool(attr[1]):
+                    break
             if attr[0] == "data-original-document-start":
                 start = int(attr[1])
             if attr[0] == "data-original-document-end":
@@ -275,7 +285,7 @@ def test_oi_wiki_index(self):
             },
             {
                 "tag": "p",
-                "offset": (780, 1101),  # FIXME: Correct one is (780, 1101)
+                "offset": (780, 844),
             },
         ],
     }
diff --git a/python-markdown-extension/test/cli.py b/python-markdown-extension/test/cli.py
index f1fc1643..407d92e3 100644
--- a/python-markdown-extension/test/cli.py
+++ b/python-markdown-extension/test/cli.py
@@ -40,6 +40,9 @@
         "pymdownx.tabbed",
     ],
     extension_configs={
+        "document-offsets-injection": {
+            "debug": True,
+        },
         "toc": {
             "permalink": "",
"slugify": uslugify,