Refactor extension #11

Merged 4 commits on Jul 9, 2024
python-markdown-extension/pyproject.toml (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
[project]
name = "python_markdown_document_offsets_injection_extension"
version = "0.0.2"
version = "0.1.0"
description = "A Python-Markdown compiler plugin that puts markdown word offsets into the output HTML."
authors = [{ name = "HikariLan", email = "[email protected]" }]
license = { text = "Apache-2.0" }
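For orientation, here is a minimal usage sketch of the extension this PR refactors. It is illustrative only: the import path and the input text are assumptions, not taken from this diff; Python-Markdown accepts extension instances directly, and the debug flag maps to the config entry registered in MainExtension below.

# Illustrative usage sketch; the import path is an assumption, not part of this PR.
import markdown
from python_markdown_document_offsets_injection_extension import MainExtension

html = markdown.markdown(
    "# Title\n\nSome paragraph.",
    extensions=[MainExtension(debug=True)],
)
# The generated elements are expected to carry data-original-document-start /
# data-original-document-end attributes pointing back into the source text.
print(html)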
@@ -1,26 +1,29 @@
import re
from markdown import Extension, Markdown
from markdown.preprocessors import Preprocessor
from markdown.blockprocessors import BlockProcessor
from markdown.blockparser import BlockParser
from difflib import SequenceMatcher
import xml.etree.ElementTree as etree
import logging

MARK_PREVENT_RECURSION: str = "\t\t\t\r\r\rMARK_PREVENT_RECURSION\r\r\r\t\t\t"

MARK_CONTINUE: str = "\t\t\t\r\r\rMARK_CONTINUE\r\r\r\t\t\t"

# @see: markdown.util.HTML_PLACEHOLDER_RE
# PYTHON_MARKDOWN_HTML_PLACEHOLDER_RE: re.Pattern[str] = re.compile(
# "\u0002wzxhzdk:%s\u0003" % r"([0-9]+)"
# )
logging.basicConfig(format="%(levelname)s - %(message)s")
logger = logging.getLogger("document-offsets-injection")


class MainExtension(Extension):
def __init__(self, **kwargs):
self.config = {
"debug": [False, "Debug mode"],
}
super(MainExtension, self).__init__(**kwargs)

def extendMarkdown(self, md: Markdown):
meta: dict = {
"document": "",
"document_offsets": [],
"used_document_offsets": {},
"last_parent": None,
"preprocessed_document": "",
"preprocessed_document_restore_opcodes": [],
"debug_enabled": self.getConfig("debug"),
}
md.preprocessors.register(
CalculateDocumentOffsetPreprocessor(md, meta), "capture_document", 1000
@@ -43,21 +46,16 @@ def __init__(self, md: Markdown, meta: dict):
self.meta = meta

def run(self, lines: list[str]) -> list[str]:
self.meta["document"] = lines

offset: int = 0
for line in lines:
# Skip empty lines
if len(line) == 0:
store: tuple[str, int, int] = (line, offset, offset + 1)
self.meta["document_offsets"].append(store)
self.meta["used_document_offsets"][store] = False
offset += 1
continue
# store the line and offset
store: tuple[str, int, int] = (line, offset, offset + len(line))
self.meta["document_offsets"].append(store)
self.meta["used_document_offsets"][store] = False
            # plus 1 accounts for the newline character (\n); behavior with CRLF files is undefined
offset += len(line) + 1

return lines
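To make the offset bookkeeping above concrete, here is a small illustrative sketch (not part of the diff) of the (line, start, end) table that CalculateDocumentOffsetPreprocessor builds, using the same "+1 per newline" rule:

# Illustrative only: reproduces the bookkeeping of the preprocessor above.
lines = ["# Title", "", "Some text"]
offset = 0
table: list[tuple[str, int, int]] = []
for line in lines:
    end = offset + (1 if len(line) == 0 else len(line))
    table.append((line, offset, end))
    offset += len(line) + 1  # +1 for the trailing "\n"
# table == [("# Title", 0, 7), ("", 8, 9), ("Some text", 9, 18)]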


@@ -71,180 +69,156 @@ def __init__(self, md: Markdown, meta: dict):
self.meta = meta

def run(self, lines: list[str]) -> list[str]:
document_offsets: list[tuple[str, int, int]] = self.meta["document_offsets"]

        # End index (exclusive) of the last successful match in the document offsets list
last_success_match_end: int = 0
num_lines: int = 0
num_document_offsets: int = 0
while num_document_offsets < len(document_offsets) and num_lines < len(lines):
line = lines[num_lines]
document_offset: tuple[str, int, int] = document_offsets[
num_document_offsets
]

            # Exact match
if document_offset[0] == line:
                # Match this line
self.match(line, num_document_offsets, num_document_offsets + 1)
                # If the last successful match left a gap in the document offsets, match that gap to the previous line
if num_document_offsets > last_success_match_end and num_lines > 0:
self.match(
lines[num_lines - 1],
last_success_match_end,
num_document_offsets,
)
last_success_match_end = num_document_offsets + 1
num_lines += 1
num_document_offsets += 1
            # No exact match: look up this line's position in the document offsets list
else:
remain: list[str] = [
line for line, _, _ in document_offsets[num_document_offsets:]
]
                # If such a line exists
if line in remain:
                    # Find the first matching position and match this line to it
idx = remain.index(line) + num_document_offsets
self.match(line, idx, idx + 1)
                    # If the last successful match left a gap in the document offsets, match that gap to the previous line
if idx > last_success_match_end and num_lines > 0:
self.match(lines[num_lines - 1], last_success_match_end, idx)
last_success_match_end = idx + 1
num_lines += 1
num_document_offsets = idx + 1
                # If no matching position is found, continue with the next line
else:
num_lines += 1

        # If the lines are exhausted but document offsets remain, match the remaining offsets to the last line
if last_success_match_end < len(document_offsets):
self.match(
                lines[num_lines - 1], last_success_match_end, len(document_offsets)
            )

        self.meta["preprocessed_document"] = lines

a = self.meta["preprocessed_document"]
b = self.meta["document"]

s = SequenceMatcher(lambda x: len(x) == 0, a, b)

for tag, i1, i2, j1, j2 in s.get_opcodes():
self.meta["preprocessed_document_restore_opcodes"].append(
(tag, i1, i2, j1, j2)
)

return lines

def match(
self,
matched_line: str,
num_document_offsets_start: int,
num_document_offsets_end: int,
):
"""
        Assign a single matched line to multiple original document offsets, over the index range [num_document_offsets_start, num_document_offsets_end).
"""
document_offsets: list[tuple[str, int, int]] = self.meta["document_offsets"]
used_document_offsets: dict[tuple[str, int, int], bool] = self.meta[
"used_document_offsets"
]
for i in range(num_document_offsets_start, num_document_offsets_end):
document_offset = document_offsets[i]
            # The first matched document offset is set to the matched line; the others are set to MARK_CONTINUE
if i == num_document_offsets_start:
document_offsets[i] = (
matched_line,
document_offset[1],
document_offset[2],
)
else:
document_offsets[i] = (
MARK_CONTINUE,
document_offset[1],
document_offset[2],
)
del used_document_offsets[document_offset]
used_document_offsets[document_offsets[i]] = False


class OffsetsInjectionBlockProcessor(BlockProcessor):
"""
A block processor to mark the words in the document and inject the offset of the block to the HTML element
"""

last_processed_line_idx: int = -1

def __init__(self, parser: BlockParser, meta: dict):
super(OffsetsInjectionBlockProcessor, self).__init__(parser)
self.meta = meta

def test(self, _, block) -> bool:
# Test if there is any line in the block
for line in [line for (line, _, _) in self.meta["document_offsets"]]:
for i in range(0, len(self.meta["preprocessed_document"])):
line: str = self.meta["preprocessed_document"][i]
if len(line) == 0:
continue
if line in block:
return True
                # If this line is in the block and its index is greater than the last processed index, start processing from this block
return i > self.last_processed_line_idx
        # If no qualifying line is ever found, do not process the block at all
return False

def run(self, parent: etree.Element, blocks: list[str]) -> bool:
"""
        Inject offsets from the document into the HTML elements so that later processing can locate positions in the original document. The current algorithm:
        1. Find the first block in the document that contains text.
        2. Locate that block in the document by walking every line to collect the lines contained in the block; their start and end positions determine the block's position in the document.
        3. Inject the block's start and end offsets into the HTML element: the block is first parsed recursively, and the offsets are then written to the last generated HTML element.
        Because the block is captured by this processor again while it is parsed recursively, MARK_PREVENT_RECURSION is appended to the end of the block; when the processor sees that marker it skips the block and removes the marker.
"""

def run(self, parent: etree.Element, blocks: list[str]):
block: str = blocks[0]

# If the first block is handled, remove the marker and return, so that other block processors can process it
if MARK_PREVENT_RECURSION in blocks[0]:
blocks[0] = blocks[0].replace(MARK_PREVENT_RECURSION, "")
return False

start: int | None = None
end: int | None = None
used: dict[tuple[str, int, int], bool] = {}
# Search for the block fragment in the document_offsets
for store in self.meta["document_offsets"]:
# Skip empty lines
if len(store[0]) == 0:
continue
# If already used, skip
if self.meta["used_document_offsets"][store]:
continue
(line, offset, end_offset) = store
            # A MARK_CONTINUE marker means the lines before it are treated as contiguous
if line == MARK_CONTINUE:
end = end_offset
used[store] = True
start: int = -1
end: int = -1

for i in range(
self.last_processed_line_idx + 1, len(self.meta["preprocessed_document"])
):
line: str = self.meta["preprocessed_document"][i]
if len(line) == 0:
continue
# If found one
if line in block:
                # If the line was already scanned (usually identical lines in different places), skip it
if line in [line for (line, _, _) in used.keys()]:
continue
# If none yet set, set the start offset
if start is None:
start = offset
end = end_offset
                # Otherwise, keep searching for the end offset until the end of the block
else:
end = end_offset
# Mark the fragment as used
used[store] = True
            # If the end is not found and the new line is not in the block, reset the search and restart from the next line
elif end is None:
start = None
# Clear the used list
used = {}
continue
            # If start and end are both set and no further contiguous line is found, break the loop
else:
if start == -1:
start = i
end = i + 1
self.last_processed_line_idx = i
elif start != -1:
break
# If both start and end are found, store the result
if start is not None and end is not None:
blocks.pop(0)
self.meta["used_document_offsets"].update(used)
            # Append MARK_PREVENT_RECURSION to the tail of the block to prevent recursion; we don't use a
            # handled-flag list because identical blocks may appear elsewhere in the document
self.parser.parseBlocks(parent, [block + MARK_PREVENT_RECURSION])
            # Guard against an offset index out of range
if len(parent) == 0:
return True
            # Handle multiple blocks under the same parent element
if self.meta["last_parent"] == parent[-1]:
parent[-1].set("data-original-document-end", str(end))
return True
parent[-1].set("data-original-document-start", str(start))
parent[-1].set("data-original-document-end", str(end))
self.meta["last_parent"] = parent[-1]
return True
return False

if start == -1:
return

previous_len = len(parent)
self.parser.parseBlocks(parent, [block])
parsed_len = len(parent)

blocks.pop(0)

restored_start = -1
restored_end = -1
restored_accurate = (False, False)

        # Handle deleted lines
for _, i1, i2, j1, j2 in [
r
for r in self.meta["preprocessed_document_restore_opcodes"]
if r[0] == "delete"
]:
            # If the deleted lines fall between start and end, update the start and end line numbers
if i1 <= start < i2:
start = i2
if i1 < end <= i2:
end = i1
        # If this makes start >= end, there is no matching line in the original document; return
if start >= end:
return

for tag, i1, i2, j1, j2 in self.meta["preprocessed_document_restore_opcodes"]:
            # Inserted lines (present in the original document but not in the preprocessed one) are meaningless here; skip them
if tag == "insert":
continue
            # Fuzzy-match replaced lines
if tag == "replace":
if i1 <= start < i2:
restored_start = j1
if i1 < end <= i2:
restored_end = j2

        # Match equal lines
for _, i1, i2, j1, j2 in [
r
for r in self.meta["preprocessed_document_restore_opcodes"]
if r[0] == "equal"
]:
if i1 <= start < i2:
restored_start = j1 + start - i1
restored_accurate = (True, restored_accurate[1])
if i1 < end <= i2:
restored_end = j1 + end - i1
restored_accurate = (restored_accurate[0], True)

if restored_start == -1 or restored_end == -1:
if self.meta["debug_enabled"]:
logger.error(
"Failed to restore the document offsets for the block {}-{}, restored {}-{}".format(
start, end, restored_start, restored_end
)
)
return

offset_start = self.meta["document_offsets"][restored_start][1]
offset_end = self.meta["document_offsets"][restored_end - 1][2]

if previous_len == parsed_len and len(parent) > 0:
child = parent[-1]
if child.get("data-original-document-start") is None:
child.set("data-original-document-start", str(offset_start))
if self.meta["debug_enabled"]:
logger.warning(
"Trying to patch a block without original document start, patching to current block offset in {}-{}".format(
offset_start, offset_end
)
)
child.set("data-original-document-end", str(offset_end))
if self.meta["debug_enabled"]:
child.set(
"data-original-document",
child.get("data-original-document", "")
+ "\n".join(self.meta["document"])[offset_start:offset_end],
)
child.set("data-offset-accurate-end", str(restored_accurate[1]).lower())

for i in range(parsed_len - previous_len):
child = parent[-1 - i]
child.set("data-original-document-start", str(offset_start))
child.set("data-original-document-end", str(offset_end))
if self.meta["debug_enabled"]:
child.set(
"data-original-document",
"\n".join(self.meta["document"])[offset_start:offset_end],
)
child.set(
"data-offset-accurate-start", str(restored_accurate[0]).lower()
)
child.set("data-offset-accurate-end", str(restored_accurate[1]).lower())
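The central idea of the refactor is mapping lines of the preprocessed document back to the original document with difflib opcodes. The sketch below is illustrative (made-up input, not from this PR) and shows the (tag, i1, i2, j1, j2) tuples that end up in preprocessed_document_restore_opcodes, using the same SequenceMatcher call as the preprocessor:

# Illustrative only: a tiny made-up document pair.
from difflib import SequenceMatcher

preprocessed = ["# Title", "para one modified", "para two"]  # a
original = ["# Title", "para one", "para two"]               # b

s = SequenceMatcher(lambda x: len(x) == 0, preprocessed, original)
print(s.get_opcodes())
# [('equal', 0, 1, 0, 1), ('replace', 1, 2, 1, 2), ('equal', 2, 3, 2, 3)]
# "equal" spans restore exact line indices, "replace" spans are restored to the
# whole original range, and "insert" spans are skipped by the block processor.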