Skip to content

Commit

Permalink
refactor: generalize log streaming from serve_utils._follow_logs into centralized `log_utils.follow_logs`
Browse files Browse the repository at this point in the history
  • Loading branch information
andylizf committed Nov 8, 2024
1 parent bb39707 commit 1f25cd3
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 72 deletions.
105 changes: 38 additions & 67 deletions sky/serve/serve_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,61 +598,48 @@ def _follow_replica_logs(
finish_stream: Callable[[], bool],
exit_if_stream_end: bool = False,
no_new_content_timeout: Optional[int] = None) -> Iterator[str]:
line = ''
log_file = None
no_new_content_cnt = 0

def cluster_is_up() -> bool:
"""Return True iff the replica's cluster record exists and its status is UP."""
cluster_record = global_user_state.get_cluster_from_name(cluster_name)
# No record at all means the cluster is gone (or was never created).
if cluster_record is None:
return False
return cluster_record['status'] == status_lib.ClusterStatus.UP

while True:
tmp = file.readline()
if tmp is not None and tmp != '':
no_new_content_cnt = 0
line += tmp
if '\n' in line or '\r' in line:
# Tailing detailed progress for user. All logs in skypilot is
# of format `To view detailed progress: tail -n100 -f *.log`.
x = re.match(_SKYPILOT_PROVISION_LOG_PATTERN, line)
if x is not None:
log_file = os.path.expanduser(x.group(1))
elif re.match(_SKYPILOT_LOG_PATTERN, line) is None:
# Not print other logs (file sync logs) since we lack
# utility to determine when these log files are finished
# writing.
# TODO(tian): Not skip these logs since there are small
# chance that error will happen in file sync. Need to find
# a better way to do this.
yield line
# Output next line first since it indicates the process is
# starting. For our launching logs, it's always:
# Launching on <cloud> <region> (<zone>)
if log_file is not None:
with open(log_file, 'r', newline='',
encoding='utf-8') as f:
# We still exit if more than 10 seconds without new
# content to avoid any internal bug that causes
# the launch failed and cluster status remains INIT.
for l in _follow_replica_logs(
f,
cluster_name,
finish_stream=cluster_is_up,
exit_if_stream_end=exit_if_stream_end,
no_new_content_timeout=10):
yield l
log_file = None
line = ''
else:
if exit_if_stream_end or finish_stream():
break
if no_new_content_timeout is not None:
if no_new_content_cnt >= no_new_content_timeout:
break
no_new_content_cnt += 1
time.sleep(1)
def handle_line(line: str) -> Iterator[str]:
"""Yield `line` and, for provision-log pointer lines, inline the referenced log.

NOTE(review): closes over `cluster_name`, `exit_if_stream_end` and
`cluster_is_up` from the enclosing `_follow_replica_logs` scope; intended
to be passed as the `line_handler` of `log_utils.follow_logs`.
"""
# Tail detailed progress for the user. All such SkyPilot logs are of
# the format `To view detailed progress: tail -n100 -f *.log`.
match_provision = re.match(_SKYPILOT_PROVISION_LOG_PATTERN, line)
if match_provision:
log_file = os.path.expanduser(match_provision.group(1))
# Output this line first since it indicates the provision process is
# starting; then recursively stream the referenced provision log.
yield line
with open(log_file, 'r', newline='', encoding='utf-8') as f:
# We still exit if more than 10 seconds pass without new
# content, to avoid any internal bug that causes the launch
# to fail while the cluster status remains INIT.
yield from _follow_replica_logs(
f,
cluster_name,
finish_stream=cluster_is_up,
exit_if_stream_end=exit_if_stream_end,
no_new_content_timeout=10)
elif not re.match(_SKYPILOT_LOG_PATTERN, line):
# Do not print other logs (e.g. file sync logs) since we lack a
# utility to determine when those log files are finished being
# written.
# TODO(tian): Do not skip these logs, since there is a small
# chance that an error will happen in file sync. Need to find
# a better way to do this.

# Output the next line first since it indicates the process is
# starting. For our launching logs, it's always:
# Launching on <cloud> <region> (<zone>)
yield line

return log_utils.follow_logs(file,
finish_stream=finish_stream,
exit_if_stream_end=exit_if_stream_end,
line_handler=handle_line,
no_new_content_timeout=no_new_content_timeout)


def stream_replica_logs(service_name: str, replica_id: int,
Expand Down Expand Up @@ -719,22 +706,6 @@ def _get_replica_status() -> serve_state.ReplicaStatus:
return ''


def _follow_logs(file: TextIO, *, finish_stream: Callable[[], bool],
exit_if_stream_end: bool) -> Iterator[str]:
line = ''
while True:
tmp = file.readline()
if tmp is not None and tmp != '':
line += tmp
if '\n' in line or '\r' in line:
yield line
line = ''
else:
if exit_if_stream_end or finish_stream():
break
time.sleep(1)


def stream_serve_process_logs(service_name: str, stream_controller: bool,
follow: bool) -> str:
msg = check_service_status_healthy(service_name)
Expand All @@ -753,9 +724,9 @@ def _service_is_terminal() -> bool:

with open(os.path.expanduser(log_file), 'r', newline='',
encoding='utf-8') as f:
for line in _follow_logs(f,
finish_stream=_service_is_terminal,
exit_if_stream_end=not follow):
for line in log_utils.follow_logs(f,
finish_stream=_service_is_terminal,
exit_if_stream_end=not follow):
print(line, end='', flush=True)
return ''

Expand Down
5 changes: 1 addition & 4 deletions sky/skylet/log_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,11 +315,8 @@ def run_bash_command_with_log(bash_command: str,
# Need this `-i` option to make sure `source ~/.bashrc` work.
inner_command = f'/bin/bash -i {script_path}'

subprocess_cmd: Union[str, List[str]]
subprocess_cmd = inner_command

return run_with_log(
subprocess_cmd,
inner_command,
log_path,
stream_logs=stream_logs,
with_ray=with_ray,
Expand Down
44 changes: 43 additions & 1 deletion sky/utils/log_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
"""Logging utils."""
import enum
import time
import types
from typing import List, Optional, Type
from typing import Callable, Iterator, List, Optional, TextIO, Type

import colorama
import pendulum
Expand Down Expand Up @@ -284,3 +285,44 @@ def readable_time_duration(start: Optional[float],
diff = diff.replace('hour', 'hr')

return diff


def follow_logs(file: TextIO,
                *,
                finish_stream: Callable[[], bool],
                exit_if_stream_end: bool = False,
                line_handler: Optional[Callable[[str], Iterator[str]]] = None,
                no_new_content_timeout: Optional[int] = None) -> Iterator[str]:
    """Follow and process logs line by line.

    Partial content is buffered until a line terminator (``\\n`` or ``\\r``)
    arrives; a trailing partial line left at stream end is not yielded.

    Args:
        file: Text file to read logs from.
        finish_stream: Callback to determine if streaming should stop.
        exit_if_stream_end: Whether to exit when reaching end of file.
        line_handler: Optional callback to process each line; its yielded
            values replace the raw line in the output.
        no_new_content_timeout: Seconds to wait before timing out on
            no new content.

    Yields:
        Lines of logs, or the handler's output if `line_handler` is given.
    """
    pending = ''
    idle_seconds = 0

    def emit(completed: str) -> Iterator[str]:
        # Route through the caller's handler when provided; otherwise
        # pass the raw line straight through.
        if line_handler:
            yield from line_handler(completed)
        else:
            yield completed

    while True:
        chunk = file.readline()
        if not chunk:
            # No new content: decide whether to stop or keep polling.
            if exit_if_stream_end or finish_stream():
                break
            if no_new_content_timeout is not None:
                if idle_seconds >= no_new_content_timeout:
                    break
                idle_seconds += 1
            time.sleep(1)
            continue
        idle_seconds = 0
        pending += chunk
        if '\n' in pending or '\r' in pending:
            yield from emit(pending)
            pending = ''

0 comments on commit 1f25cd3

Please sign in to comment.