Move log_str_to_parsed_log_stream to test utils
jason810496 committed Dec 25, 2024
1 parent 57c1042 commit f9e354c
Showing 2 changed files with 49 additions and 27 deletions.
39 changes: 12 additions & 27 deletions tests/utils/test_log_handlers.py
@@ -20,7 +20,6 @@
import logging
import logging.config
import os
from contextlib import suppress
from http import HTTPStatus
from importlib import reload
from itertools import chain
@@ -49,7 +48,6 @@
_fetch_logs_from_service,
_get_parsed_log_stream,
_interleave_logs,
_parse_timestamp,
)
from airflow.utils.log.logging_mixin import set_context
from airflow.utils.net import get_hostname
@@ -59,6 +57,7 @@
from airflow.utils.types import DagRunType

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.file_task_handler import log_str_to_parsed_log_stream
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS

if AIRFLOW_V_3_0_PLUS:
@@ -71,20 +70,6 @@
FILE_TASK_HANDLER = "task"


def _log_sample_to_parsed_log_stream(log_sample: str):
lines = log_sample.splitlines()
timestamp = None
next_timestamp = None
for idx, line in enumerate(lines):
if line:
with suppress(Exception):
# next_timestamp unchanged if line can't be parsed
next_timestamp = _parse_timestamp(line)
if next_timestamp:
timestamp = next_timestamp
yield timestamp, idx, line


class TestFileTaskLogHandler:
def _assert_is_log_stream_type(self, log_stream):
assert isinstance(log_stream, chain) or isinstance(log_stream, GeneratorType)
@@ -382,7 +367,7 @@ def test__read_when_local(self, mock_read_local, create_task_instance):
# messages, parsed_log_streams, log_size
mock_read_local.return_value = (
["the messages"],
[_log_sample_to_parsed_log_stream("the log")],
[log_str_to_parsed_log_stream("the log")],
len("the log"),
)
local_log_file_read = create_task_instance(
@@ -493,7 +478,7 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
# new implementation returns: messages, parsed_log_streams, log_size
fth._read_remote_logs.return_value = (
["found remote logs"],
[_log_sample_to_parsed_log_stream("remote\nlog\ncontent")],
[log_str_to_parsed_log_stream("remote\nlog\ncontent")],
16,
)
else:
@@ -503,13 +488,13 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
fth._read_from_local = mock.Mock()
fth._read_from_local.return_value = (
["found local logs"],
[_log_sample_to_parsed_log_stream("local\nlog\ncontent")],
[log_str_to_parsed_log_stream("local\nlog\ncontent")],
16,
)
fth._read_from_logs_server = mock.Mock()
fth._read_from_logs_server.return_value = (
["this message"],
[_log_sample_to_parsed_log_stream("this\nlog\ncontent")],
[log_str_to_parsed_log_stream("this\nlog\ncontent")],
16,
)

@@ -688,7 +673,7 @@ def test_interleave_interleaves():
"[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",
]
)
log_sample1_stream = _log_sample_to_parsed_log_stream(log_sample1)
log_sample1_stream = log_str_to_parsed_log_stream(log_sample1)
log_sample2 = "\n".join(
[
"[2022-11-16T00:05:54.295-0800] {taskinstance.py:1278} INFO - Executing <Task(TimeDeltaSensorAsync): wait> on 2022-11-16 08:05:52.324532+00:00",
@@ -699,7 +684,7 @@
"[2022-11-16T00:05:54.309-0800] {standard_task_runner.py:83} INFO - Job 33648: Subtask wait",
]
)
log_sample2_stream = _log_sample_to_parsed_log_stream(log_sample2)
log_sample2_stream = log_str_to_parsed_log_stream(log_sample2)
log_sample3 = "\n".join(
[
"[2022-11-16T00:05:54.457-0800] {task_command.py:376} INFO - Running <TaskInstance: simple_async_timedelta.wait manual__2022-11-16T08:05:52.324532+00:00 [running]> on host daniels-mbp-2.lan",
@@ -712,7 +697,7 @@
"[2022-11-16T00:05:54.604-0800] {taskinstance.py:1360} INFO - Pausing task as DEFERRED. dag_id=simple_async_timedelta, task_id=wait, execution_date=20221116T080552, start_date=20221116T080554",
]
)
log_sample3_stream = _log_sample_to_parsed_log_stream(log_sample3)
log_sample3_stream = log_str_to_parsed_log_stream(log_sample3)
expected = "\n".join(
[
"[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",
@@ -810,9 +795,9 @@ def test_interleave_logs_correct_ordering():
"""

interleave_stream = _interleave_logs(
_log_sample_to_parsed_log_stream(sample_with_dupe),
_log_sample_to_parsed_log_stream(""),
_log_sample_to_parsed_log_stream(sample_with_dupe),
log_str_to_parsed_log_stream(sample_with_dupe),
log_str_to_parsed_log_stream(""),
log_str_to_parsed_log_stream(sample_with_dupe),
)
interleave_str = "\n".join(line for line in interleave_stream)
assert interleave_str == sample_with_dupe
@@ -831,7 +816,7 @@ def test_interleave_logs_correct_dedupe():
test"""

interleave_stream = _interleave_logs(
_log_sample_to_parsed_log_stream(sample_without_dupe),
log_str_to_parsed_log_stream(sample_without_dupe),
)
assert "\n".join(line for line in interleave_stream) == "test,\ntest"

37 changes: 37 additions & 0 deletions tests_common/test_utils/file_task_handler.py
@@ -0,0 +1,37 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from contextlib import suppress

from airflow.utils.log.file_task_handler import (
_parse_timestamp,
)


def log_str_to_parsed_log_stream(log_sample: str):
lines = log_sample.splitlines()
timestamp = None
next_timestamp = None
for idx, line in enumerate(lines):
if line:
with suppress(Exception):
# next_timestamp unchanged if line can't be parsed
next_timestamp = _parse_timestamp(line)
if next_timestamp:
timestamp = next_timestamp
yield timestamp, idx, line
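
For reference, a minimal usage sketch of the relocated helper (a hypothetical snippet, assumed to run inside the Airflow source tree so that tests_common is importable; the sample log line is taken from the tests above):

from tests_common.test_utils.file_task_handler import log_str_to_parsed_log_stream

sample = "\n".join(
    [
        "[2022-11-16T00:05:54.278-0800] {taskinstance.py:1258} INFO - Starting attempt 1 of 1",
        "continuation output without its own timestamp",
    ]
)

# Each item is a (timestamp, line_index, raw_line) tuple; a line that cannot be
# parsed inherits the most recently parsed timestamp, as in the generator above.
for timestamp, idx, line in log_str_to_parsed_log_stream(sample):
    print(timestamp, idx, line)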
