|
| 1 | +from typing import Dict, Iterable, Tuple |
| 2 | +from enum import Enum, auto |
| 3 | +import threading |
| 4 | +from selfie_lib.TypedPath import TypedPath |
| 5 | +from selfie_lib.Slice import Slice |
| 6 | + |
| 7 | + |
| 8 | +# Placeholder implementations for CallStack, SnapshotFileLayout, and FS |
| 9 | +class CallStack: |
| 10 | + pass |
| 11 | + |
| 12 | + |
| 13 | +class SnapshotFileLayout: |
| 14 | + def sourcePathForCall(self, location) -> "TypedPath": |
| 15 | + # Placeholder return or raise NotImplementedError |
| 16 | + raise NotImplementedError("sourcePathForCall is not implemented") |
| 17 | + |
| 18 | + |
| 19 | +class WritableComment(Enum): |
| 20 | + NO_COMMENT = auto() |
| 21 | + ONCE = auto() |
| 22 | + FOREVER = auto() |
| 23 | + |
| 24 | + @property |
| 25 | + def writable(self) -> bool: |
| 26 | + return self != WritableComment.NO_COMMENT |
| 27 | + |
| 28 | + |
| 29 | +class CommentTracker: |
| 30 | + def __init__(self): |
| 31 | + self.cache: Dict[TypedPath, WritableComment] = {} |
| 32 | + self.lock = threading.Lock() |
| 33 | + |
| 34 | + def pathsWithOnce(self) -> Iterable[TypedPath]: |
| 35 | + with self.lock: |
| 36 | + return [ |
| 37 | + path |
| 38 | + for path, comment in self.cache.items() |
| 39 | + if comment == WritableComment.ONCE |
| 40 | + ] |
| 41 | + |
| 42 | + def hasWritableComment(self, call: CallStack, layout: SnapshotFileLayout) -> bool: |
| 43 | + path = layout.sourcePathForCall(call) |
| 44 | + with self.lock: |
| 45 | + if path in self.cache: |
| 46 | + comment = self.cache[path] |
| 47 | + if comment.writable: |
| 48 | + return True |
| 49 | + else: |
| 50 | + return False |
| 51 | + else: |
| 52 | + new_comment, _ = self.__commentAndLine(path) |
| 53 | + self.cache[path] = new_comment |
| 54 | + return new_comment.writable |
| 55 | + |
| 56 | + @staticmethod |
| 57 | + def commentString(typedPath: TypedPath) -> Tuple[str, int]: |
| 58 | + comment, line = CommentTracker.__commentAndLine(typedPath) |
| 59 | + if comment == WritableComment.NO_COMMENT: |
| 60 | + raise ValueError("No writable comment found") |
| 61 | + elif comment == WritableComment.ONCE: |
| 62 | + return ("//selfieonce", line) |
| 63 | + elif comment == WritableComment.FOREVER: |
| 64 | + return ("//SELFIEWRITE", line) |
| 65 | + else: |
| 66 | + raise ValueError("Invalid comment type") |
| 67 | + |
| 68 | + @staticmethod |
| 69 | + def __commentAndLine(typedPath: TypedPath) -> Tuple[WritableComment, int]: |
| 70 | + with open(typedPath.absolute_path, "r") as file: |
| 71 | + content = Slice(file.read()) |
| 72 | + for comment_str in [ |
| 73 | + "//selfieonce", |
| 74 | + "// selfieonce", |
| 75 | + "//SELFIEWRITE", |
| 76 | + "// SELFIEWRITE", |
| 77 | + ]: |
| 78 | + index = content.indexOf(comment_str) |
| 79 | + if index != -1: |
| 80 | + lineNumber = content.baseLineAtOffset(index) |
| 81 | + comment = ( |
| 82 | + WritableComment.ONCE |
| 83 | + if "once" in comment_str |
| 84 | + else WritableComment.FOREVER |
| 85 | + ) |
| 86 | + return (comment, lineNumber) |
| 87 | + return (WritableComment.NO_COMMENT, -1) |
0 commit comments