Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): support interrupted by ctrl+C #8

Merged
merged 5 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions browser/src/store/scanning_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ type Event = {
} | {
readonly kind: "startHandingFile";
readonly path: string;
readonly operation: FileOperation;
}| {
readonly kind: "completeHandingFile";
readonly path: string;
readonly operation: FileOperation;
} | {
readonly kind: "completeParsePdfPage";
readonly index: number;
Expand All @@ -35,7 +37,7 @@ export type ScanningStore$ = {
readonly phase: ReadonlyVal<ScanningPhase>;
readonly scanCount: ReadonlyVal<number>;
readonly handlingFile: ReadonlyVal<HandingFile | null>;
readonly completedFiles: ReadonlyVal<readonly string[]>;
readonly completedFiles: ReadonlyVal<readonly File[]>;
readonly error: ReadonlyVal<string | null>;
readonly isInterrupting: ReadonlyVal<boolean>;
readonly isInterrupted: ReadonlyVal<boolean>;
Expand All @@ -49,8 +51,14 @@ export enum ScanningPhase {
Completed,
}

export type HandingFile = {
/**
 * A file path paired with the operation associated with it. This is the
 * element type of `completedFiles` and the base shape that `HandingFile`
 * extends.
 *
 * NOTE(review): inside any module that imports it, this name shadows the DOM
 * global `File` type — confirm that is intended.
 */
export type File = {
/** Path of the file, copied from the `path` field of the incoming event. */
readonly path: string;
/** Operation associated with this file. */
readonly operation: FileOperation;
};

/**
 * Closed set of operation labels carried by file events and stored on `File`.
 * The scanner page renders them as "录入" (create), "更新" (update) and
 * "删除" (remove).
 */
export type FileOperation = "create" | "update" | "remove";

export type HandingFile = File & {
readonly handlePdfPage?: {
readonly index: number;
readonly total: number;
Expand All @@ -68,7 +76,7 @@ export class ScanningStore {
readonly #phase$: Val<ScanningPhase> = val(ScanningPhase.Ready);
readonly #scanCount$: Val<number> = val(0);
readonly #handlingFile$: Val<HandingFile | null> = val<HandingFile | null>(null);
readonly #completedFiles$: Val<readonly string[]> = val<readonly string[]>([]);
readonly #completedFiles$: Val<readonly File[]> = val<readonly File[]>([]);
readonly #error$: Val<string | null> = val<string | null>(null);
readonly #isInterrupting$: Val<boolean> = val(false);
readonly #isInterrupted$: Val<boolean> = val(false);
Expand Down Expand Up @@ -138,14 +146,20 @@ export class ScanningStore {
break;
}
case "startHandingFile": {
this.#handlingFile$.set({ path: event.path });
this.#handlingFile$.set({
path: event.path,
operation: event.operation,
});
break;
}
case "completeHandingFile": {
this.#handlingFile$.set(null);
this.#completedFiles$.set([
...this.#completedFiles$.value,
event.path,
{
path: event.path,
operation: event.operation,
},
]);
break;
}
Expand Down
1 change: 0 additions & 1 deletion browser/src/views/App.module.less
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ body * {
align-items: stretch;

> div {
padding: 0 24px;
flex-grow: 0;
flex-shrink: 1;
flex-basis: 1024px;
Expand Down
1 change: 0 additions & 1 deletion browser/src/views/ScannerPage.module.less
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
.root {
margin-bottom: 68px;
overflow-y: scroll;
}

Expand Down
29 changes: 24 additions & 5 deletions browser/src/views/ScannerPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Skeleton, Result, Steps, List, Button, Divider, Progress, Typography }
import { ScanOutlined, ProfileTwoTone, SyncOutlined, FilePdfTwoTone, PauseOutlined } from "@ant-design/icons";
import { val } from "value-enhancer";
import { useVal } from "use-value-enhancer";
import { ScannerStore, ScanningStore, ScanningPhase } from "../store";
import { ScannerStore, ScanningStore, ScanningPhase, FileOperation } from "../store";
import { Sources } from "./Sources";

const { Title, Paragraph } = Typography;
Expand Down Expand Up @@ -149,15 +149,15 @@ const ScanningPanel: React.FC<ScanningPanelProps> = ({ store }) => {
for (const file of completedFiles) {
records.push({
icon: <FilePdfTwoTone />,
title: "录入 PDF 文件",
content: file,
title: `${textWithOperation(file.operation)} PDF 文件`,
content: file.path,
loading: false,
});
}
if (handlingFile) {
records.push({
icon: <FilePdfTwoTone />,
title: "录入 PDF 文件",
title: `${textWithOperation(handlingFile.operation)} PDF 文件`,
content: handlingFile.path,
loading: true,
});
Expand Down Expand Up @@ -242,4 +242,23 @@ const ProgressBar: React.FC<ProgressBarProps> = ({ name, error, pdfPage }) => {
status={status} />
</div>
);
};
};

/**
 * Maps a file operation to the Chinese verb used as the prefix of the
 * "… PDF 文件" record titles.
 *
 * @param operation - Operation label of the file being displayed.
 * @returns "录入" for "create", "更新" for "update", "删除" for "remove".
 * A value outside the union (only reachable with unchecked runtime data)
 * yields an empty string.
 */
function textWithOperation(operation: FileOperation): string {
  switch (operation) {
    case "create":
      return "录入";
    case "update":
      return "更新";
    case "remove":
      return "删除";
    default: {
      // Compile-time exhaustiveness guard: adding a member to FileOperation
      // without a matching case above makes this assignment fail to type-check.
      const unhandled: never = operation;
      void unhandled;
      // Unchecked runtime values keep the previous empty-prefix fallback.
      return "";
    }
  }
}
3 changes: 0 additions & 3 deletions index_package/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,6 @@ def query(self, text: str, results_limit: Optional[int]) -> QueryResult:
def page_content(self, pdf_hash: str, page_index: int) -> str:
return self._get_service_in_thread().page_content(pdf_hash, page_index)

def freeze_database(self):
pass # TODO: 因为强制退出导致数据结构损坏,此处需要冻结数据库并重新开始

def scan_job(self, max_workers: int = 1, progress_event_listener: Optional[ProgressEventListener] = None) -> ServiceScanJob:
if progress_event_listener is None:
progress_event_listener = lambda _: None
Expand Down
3 changes: 2 additions & 1 deletion server/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ def launch():
app_dir = os.path.abspath(app_dir)
sources = Sources(os.path.join(app_dir, "app.sqlite3"))
service = ServiceRef(
app=app,
workspace_path=app_dir,
embedding_model="shibing624/text2vec-base-chinese",
sources=sources,
embedding_model="shibing624/text2vec-base-chinese",
)
routes(app, service)
app.run(host="0.0.0.0", port=port)
Expand Down
43 changes: 31 additions & 12 deletions server/progress_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
CompleteHandleFileEvent,
PDFFileProgressEvent,
PDFFileStep,
HandleFileOperation,
)


Expand All @@ -27,9 +28,15 @@ class InterruptionStatus(IntEnum):
@dataclass
class HandingFile:
path: str
operation: HandleFileOperation
pdf_handing: tuple[int, int] | None = None
pdf_indexing: tuple[int, int] | None = None

@dataclass
class File:
path: str
operation: HandleFileOperation

class ProgressEvents:
def __init__(self):
self._phase: ProgressPhase = ProgressPhase.READY
Expand All @@ -38,7 +45,7 @@ def __init__(self):
self._handing_file: HandingFile | None = None
self._error: str | None = None
self._interruption_status: InterruptionStatus = InterruptionStatus.No
self._completed_files: list[str] = []
self._completed_files: list[File] = []
self._fetcher_lock: Lock = Lock()
self._fetcher_queues: list[Queue[dict]] = []

Expand Down Expand Up @@ -69,10 +76,11 @@ def _init_events(self) -> list[dict]:
"kind": "scanCompleted",
"count": self._updated_files,
})
for path in self._completed_files:
for file in self._completed_files:
events.append({
"kind": "completeHandingFile",
"path": path,
"path": file.path,
"operation": file.operation.value,
})
if self._phase == ProgressPhase.COMPLETED:
events.append({ "kind": "completed" })
Expand All @@ -82,6 +90,7 @@ def _init_events(self) -> list[dict]:
events.append({
"kind": "startHandingFile",
"path": self._handing_file.path,
"operation": self._handing_file.operation.value,
})
if self._handing_file.pdf_handing is not None:
index, total = self._handing_file.pdf_handing
Expand Down Expand Up @@ -114,7 +123,7 @@ def receive_event(self, event: ProgressEvent):
if isinstance(event, ScanCompletedEvent):
self._on_scan_completed(event.updated_files)
elif isinstance(event, StartHandleFileEvent):
self._on_start_handle_file(event.path)
self._on_start_handle_file(event.path, event.operation)
elif isinstance(event, CompleteHandleFileEvent):
self._on_complete_handle_file(event.path)
elif isinstance(event, PDFFileProgressEvent):
Expand All @@ -133,25 +142,35 @@ def _on_scan_completed(self, updated_files: int):
"count": updated_files,
})

def _on_start_handle_file(self, path: str):
def _on_start_handle_file(self, path: str, operation: HandleFileOperation):
with self._status_lock:
self._handing_file = HandingFile(path=path)

self._handing_file = HandingFile(
path=path,
operation=operation,
)
self._emit_event({
"kind": "startHandingFile",
"path": path,
"operation": operation.value,
})

def _on_complete_handle_file(self, path: str):
file: File | None = None
with self._status_lock:
self._completed_files.append(path)
if self._handing_file is not None and self._handing_file.path == path:
file = File(
path=path,
operation=self._handing_file.operation,
)
self._completed_files.append(file)
self._handing_file = None

self._emit_event({
"kind": "completeHandingFile",
"path": path,
})
if file is not None:
self._emit_event({
"kind": "completeHandingFile",
"path": file.path,
"operation": file.operation.value,
})

def _on_pdf_parse_progress(self, page_index: int, total_pages: int):
with self._status_lock:
Expand Down
1 change: 1 addition & 0 deletions server/routes.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys

from .service import ServiceRef
from flask import (
Expand Down
19 changes: 17 additions & 2 deletions server/service.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
from threading import Thread, Lock, Event
from typing import Generator
from json import dumps
from flask import Flask
from index_package import Service, ServiceScanJob
from .sources import Sources
from .progress_events import ProgressEvents
from .signal_handler import SignalHandler


class ServiceRef:
def __init__(self, workspace_path: str, embedding_model: str, sources: Sources):
def __init__(self,
app: Flask,
sources: Sources,
workspace_path: str,
embedding_model: str,
):
self._app: Flask = app
self._sources: Sources = sources
self._workspace_path: str = workspace_path
self._embedding_model: str = embedding_model
self._sources: Sources = sources
self._lock: Lock = Lock()
self._service: Service | None = None
self._is_scanning: bool = False
self._scan_job: ServiceScanJob | None = None
self._scan_job_event: Event | None = None
self._progress_events: ProgressEvents = ProgressEvents()
self._signal_handler = SignalHandler()

@property
def ref(self) -> Service:
Expand Down Expand Up @@ -70,6 +79,11 @@ def _scan(self):
self._scan_job_event.set()
self._scan_job_event = None

success_bind = self._signal_handler.bind_scan_job(scan_job)
if not success_bind:
self._progress_events.set_interrupted()
return

try:
try:
completed = scan_job.start({
Expand All @@ -89,6 +103,7 @@ def _scan(self):
self._progress_events.set_interrupted()

finally:
self._signal_handler.unbind_scan_job()
with self._lock:
self._is_scanning = False
self._scan_job = None
Expand Down
69 changes: 69 additions & 0 deletions server/signal_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import signal
import time
import sys
import threading

from typing import Optional
from index_package import ServiceScanJob

class SignalHandler:
    """Routes Ctrl+C (SIGINT) to the scan job that is currently bound, if any.

    Creating an instance installs ``_on_sigint`` as the process SIGINT handler.
    ``signal.signal`` may only be called from the main thread, so the instance
    must be constructed there.

    Behaviour of successive Ctrl+C presses:

    * no scan job bound and no interrupt in progress: exit with status 130;
    * scan job bound, first press: ask the job to interrupt, block until
      ``unbind_scan_job`` is called, then exit with status 0;
    * later presses within 12 seconds of the first: print a notice only;
    * later presses after 12 seconds: exit with status 1 (forced stop).

    NOTE(review): ``_lock`` is not reentrant and the handler runs on the main
    thread; if the handler interrupts main-thread code that is holding the
    lock it would block. Confirm the public methods are only called from
    worker threads.
    """

    def __init__(self):
        # Job that receives interrupt() on the first Ctrl+C; None when idle.
        self._scan_job: Optional["ServiceScanJob"] = None
        # time.monotonic() reading taken at the first Ctrl+C; None until then.
        self._first_interrupted_at: Optional[float] = None
        self._lock: threading.Lock = threading.Lock()
        # Set by unbind_scan_job() to release the handler waiting in _on_sigint.
        self._scan_unbidden_event: Optional[threading.Event] = None
        signal.signal(signal.SIGINT, self._on_sigint)

    @property
    def is_interrupting(self) -> bool:
        """True once the first Ctrl+C has been routed to a scan job."""
        with self._lock:
            return self._first_interrupted_at is not None

    def bind_scan_job(self, scan_job: "ServiceScanJob") -> bool:
        """Register ``scan_job`` as the target of the next Ctrl+C.

        Returns:
            True when the job was bound; False when an interrupt is already in
            progress, in which case the caller should not start the job.

        Raises:
            RuntimeError: another job is still bound.
        """
        with self._lock:
            if self._scan_job is not None:
                raise RuntimeError("SignalHandler already watching a scan job")
            if self._first_interrupted_at is not None:
                return False
            self._scan_job = scan_job
            return True

    def unbind_scan_job(self):
        """Forget the bound job and wake a handler waiting for it to stop."""
        with self._lock:
            self._scan_job = None
            if self._scan_unbidden_event is not None:
                self._scan_unbidden_event.set()
                self._scan_unbidden_event = None

    def _on_sigint(self, sig, frame):
        """SIGINT handler; see the class docstring for the state machine."""
        limit_seconds = 12.0
        event: Optional[threading.Event] = None

        # Read the state and register the wake-up event in ONE critical
        # section. If these were separate, unbind_scan_job() could run in
        # between, and the event would be registered after the only set()
        # call, leaving wait() below blocked forever.
        with self._lock:
            scan_job = self._scan_job
            first_interrupted_at = self._first_interrupted_at
            if scan_job is not None and first_interrupted_at is None:
                event = threading.Event()
                # Monotonic clock: only elapsed time matters, and wall-clock
                # adjustments must not shorten or stretch the grace period.
                self._first_interrupted_at = time.monotonic()
                self._scan_unbidden_event = event

        if event is not None:
            print("\nInterrupting...")
            # Called outside the lock so the job may call unbind_scan_job().
            scan_job.interrupt()
            event.wait()
            sys.exit(0)

        elif first_interrupted_at is None:
            print("\nExiting...")
            sys.exit(130)

        else:
            duration_seconds = time.monotonic() - first_interrupted_at
            if duration_seconds <= limit_seconds:
                str_seconds = "{:.2f}".format(limit_seconds - duration_seconds)
                print(f"\nForce stopping... (press again to force stop after {str_seconds}s)")
            else:
                print("\nForce stopping...")
                print("It may corrupt the data structure of the database")
                # TODO: a forced exit can corrupt the data structures; the
                # database should be frozen here and rebuilt from scratch.
                sys.exit(1)