"""Common utilities."""
import logging
import logging.handlers
import os
import platform
import sys
import warnings
from pathlib import Path
from uuid import uuid4
from huggingface_hub import CommitScheduler
import requests
import json
# Build local storage whose contents are scheduled for upload to the hub.
# This is the robust way of pushing results; see https://huggingface.co/spaces/Wauplin/space_to_dataset_saver
JSON_DATASET_DIR = Path("results_dataset_to_upload")
JSON_DATASET_DIR.mkdir(parents=True, exist_ok=True)
# Each instance of this space spawns a unique file per result type and appends
# to it for the life of the space; the file is pushed to a dataset every so
# often. Writes are append-only, so no previous data is overwritten.
JSON_DATASET_PATH = JSON_DATASET_DIR / f"NAME_TO_REPLACE-{uuid4()}.jsonl"
if os.getenv("HF_TOKEN"):
scheduler = CommitScheduler(
repo_id="mteb/arena-results",
repo_type="dataset",
folder_path=JSON_DATASET_DIR,
path_in_repo="data",
every=5,
token=os.environ["HF_TOKEN"]
)
else:
scheduler = None
print("No HF_TOKEN found, results will not be uploaded to the hub.")
# save_log_str_on_log_server (used by APIHandler below) and the LOG_SERVER_ADDR /
# SAVE_LOG constants (used by build_logger) are only needed on the
# add_remote_handler=True path; they are presumably provided by the package's
# utils module:
# from .utils import save_log_str_on_log_server
handler = None
visited_loggers = set()
LOGDIR = os.getenv("LOGDIR", "./MTEB-Arena-logs/vote_log")
class APIHandler(logging.Handler):
    """Custom logging handler that sends logs to an API."""

    def __init__(self, apiUrl, log_path, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.apiUrl = apiUrl
        self.log_path = log_path

    def emit(self, record):
        log_entry = self.format(record)
        try:
            save_log_str_on_log_server(log_entry, self.log_path)
        except requests.RequestException as e:
            print(f"Error sending log to API: {e}", file=sys.stderr)
def build_logger(logger_name, logger_filename, add_remote_handler=False):
    global handler

    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )

    # Set the format of root handlers
    if not logging.getLogger().handlers:
        if sys.version_info >= (3, 9):
            # The encoding argument to basicConfig is only available from
            # Python 3.9; forcing UTF-8 matters mainly on Windows.
            logging.basicConfig(level=logging.INFO, encoding="utf-8")
        else:
            if platform.system() == "Windows":
                warnings.warn(
                    "If you are running on Windows, "
                    "we recommend you use Python >= 3.9 for UTF-8 encoding."
                )
            logging.basicConfig(level=logging.INFO)
    logging.getLogger().handlers[0].setFormatter(formatter)

    # Redirect stdout and stderr to loggers
    stdout_logger = logging.getLogger("stdout")
    stdout_logger.setLevel(logging.INFO)
    sl = StreamToLogger(stdout_logger, logging.INFO)
    sys.stdout = sl

    stderr_logger = logging.getLogger("stderr")
    stderr_logger.setLevel(logging.ERROR)
    sl = StreamToLogger(stderr_logger, logging.ERROR)
    sys.stderr = sl

    # Get logger
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.INFO)

    if add_remote_handler:
        # Add an APIHandler to mirror logs to the remote log server.
        # LOG_SERVER_ADDR and SAVE_LOG must be defined (see the note above).
        api_url = f"{LOG_SERVER_ADDR}/{SAVE_LOG}"
        remote_logger_filename = str(Path(logger_filename).stem + "_remote.log")
        api_handler = APIHandler(apiUrl=api_url, log_path=f"{LOGDIR}/{remote_logger_filename}")
        api_handler.setFormatter(formatter)

        logger.addHandler(api_handler)
        stdout_logger.addHandler(api_handler)
        stderr_logger.addHandler(api_handler)

    # If LOGDIR is empty, don't write logs to a local file
    if LOGDIR != "":
        os.makedirs(LOGDIR, exist_ok=True)
        filename = os.path.join(LOGDIR, logger_filename)
        handler = logging.handlers.TimedRotatingFileHandler(
            filename, when="D", utc=True, encoding="utf-8"
        )
        handler.setFormatter(formatter)

        for l in [stdout_logger, stderr_logger, logger]:
            if l in visited_loggers:
                continue
            visited_loggers.add(l)
            l.addHandler(handler)

    return logger
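# Minimal usage sketch (hypothetical names): logs go to LOGDIR/<logger_filename>
# with daily UTC rotation, and anything printed to stdout/stderr is captured too.
#
#     logger = build_logger("gradio_web_server", "gradio_web_server.log")
#     logger.info("server started")
#     print("this line is captured by the 'stdout' logger as well")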
class StreamToLogger(object):
    """
    Fake file-like stream object that redirects writes to a logger instance.
    """

    def __init__(self, logger, log_level=logging.INFO):
        self.terminal = sys.stdout
        self.logger = logger
        self.log_level = log_level
        self.linebuf = ""

    def __getattr__(self, attr):
        return getattr(self.terminal, attr)

    def write(self, buf):
        temp_linebuf = self.linebuf + buf
        self.linebuf = ""
        for line in temp_linebuf.splitlines(True):
            # From the io.TextIOWrapper docs:
            #   On output, if newline is None, any '\n' characters written
            #   are translated to the system default line separator.
            # By default sys.stdout.write() expects '\n' newlines and then
            # translates them, so this is still cross-platform.
            if line[-1] == "\n":
                encoded_message = line.encode("utf-8", "ignore").decode("utf-8")
                self.logger.log(self.log_level, encoded_message.rstrip())
            else:
                self.linebuf += line

    def flush(self):
        if self.linebuf != "":
            encoded_message = self.linebuf.encode("utf-8", "ignore").decode("utf-8")
            self.logger.log(self.log_level, encoded_message.rstrip())
        self.linebuf = ""
def store_data_in_hub(message: str, message_type: str):
    if scheduler:
        with scheduler.lock:
            file_to_upload = Path(str(JSON_DATASET_PATH).replace("NAME_TO_REPLACE", message_type))
            with file_to_upload.open("a", encoding="utf-8") as f:
                json.dump(message, f, ensure_ascii=False)
                f.write("\n")