From 67d20d5808c26499612e441d3942a94ad06a85f7 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Fri, 20 Dec 2024 21:08:50 +0100 Subject: [PATCH 1/7] Add HTTP retry handling into task SDK api.client --- task_sdk/pyproject.toml | 1 + task_sdk/src/airflow/sdk/api/client.py | 25 ++++++ task_sdk/tests/api/test_client.py | 117 +++++++++++++++++++++++++ 3 files changed, 143 insertions(+) diff --git a/task_sdk/pyproject.toml b/task_sdk/pyproject.toml index aa0271c85fca7..a27f4cb7c91ed 100644 --- a/task_sdk/pyproject.toml +++ b/task_sdk/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "msgspec>=0.18.6", "psutil>=6.1.0", "structlog>=24.4.0", + "retryhttp>=1.2.0", ] classifiers = [ "Framework :: Apache Airflow", diff --git a/task_sdk/src/airflow/sdk/api/client.py b/task_sdk/src/airflow/sdk/api/client.py index 7488ef3e88a81..a24c6eaa91b10 100644 --- a/task_sdk/src/airflow/sdk/api/client.py +++ b/task_sdk/src/airflow/sdk/api/client.py @@ -17,6 +17,7 @@ from __future__ import annotations +import os import sys import uuid from http import HTTPStatus @@ -26,6 +27,8 @@ import msgspec import structlog from pydantic import BaseModel +from retryhttp import retry, wait_retry_after +from tenacity import wait_random_exponential from uuid6 import uuid7 from airflow.sdk import __version__ @@ -268,6 +271,14 @@ def noop_handler(request: httpx.Request) -> httpx.Response: return httpx.Response(200, json={"text": "Hello, world!"}) +# Config options for SDK how retries on HTTP requests should be handled +# Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 3:37 and fails after 5:07min +# As long as there is no other config facility in SDK we use ENV for the moment +API_RETRIES = int(os.getenv("AIRFLOW__WORKERS__API_RETRIES", 10)) +API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1)) +API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90)) + + class Client(httpx.Client): def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, **kwargs: Any): if (not base_url) ^ dry_run: @@ -289,6 +300,20 @@ def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, * **kwargs, ) + _default_wait = wait_random_exponential(min=API_RETRY_WAIT_MIN, max=API_RETRY_WAIT_MAX) + + @retry( + reraise=True, + max_attempt_number=API_RETRIES, + wait_server_errors=_default_wait, + wait_network_errors=_default_wait, + wait_timeouts=_default_wait, + wait_rate_limited=wait_retry_after(fallback=_default_wait), # No infinite timeout on HTTP 429 + ) + def request(self, *args, **kwargs): + """Implement a convenience for httpx.Client.request with a retry layer.""" + return super().request(*args, **kwargs) + # We "group" or "namespace" operations by what they operate on, rather than a flat namespace with all # methods on one object prefixed with the object type (`.task_instances.update` rather than # `task_instance_update` etc.) diff --git a/task_sdk/tests/api/test_client.py b/task_sdk/tests/api/test_client.py index 279502793ee23..7318c58d689bd 100644 --- a/task_sdk/tests/api/test_client.py +++ b/task_sdk/tests/api/test_client.py @@ -18,6 +18,7 @@ from __future__ import annotations import json +from unittest import mock import httpx import pytest @@ -82,6 +83,122 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert err.value.args == ("Not found",) assert err.value.detail is None + @mock.patch("time.sleep", return_value=None) + def test_retry_handling_unrecoverable_error(self, mock_sleep): + responses: list[httpx.Response] = [ + *[httpx.Response(500, text="Internal Server Error")] * 11, + httpx.Response(200, json={"detail": "Recovered from error - but will fail before"}), + httpx.Response(400, json={"detail": "Should not get here"}), + ] + + def mock_handle_request(request: httpx.Request) -> httpx.Response: + return responses.pop(0) + + client = Client( + base_url=None, + dry_run=True, + token="", + mounts={"'http://": httpx.MockTransport(mock_handle_request)}, + ) + + with pytest.raises(httpx.HTTPStatusError) as err: + client.get("http://error") + assert not isinstance(err.value, ServerResponseError) + assert len(responses) == 3 + assert mock_sleep.call_count == 9 + + @mock.patch("time.sleep", return_value=None) + def test_retry_handling_recovered(self, mock_sleep): + responses: list[httpx.Response] = [ + *[httpx.Response(500, text="Internal Server Error")] * 3, + httpx.Response(200, json={"detail": "Recovered from error"}), + httpx.Response(400, json={"detail": "Should not get here"}), + ] + + def mock_handle_request(request: httpx.Request) -> httpx.Response: + return responses.pop(0) + + client = Client( + base_url=None, + dry_run=True, + token="", + mounts={"'http://": httpx.MockTransport(mock_handle_request)}, + ) + + response = client.get("http://error") + assert response.status_code == 200 + assert len(responses) == 1 + assert mock_sleep.call_count == 3 + + @mock.patch("time.sleep", return_value=None) + def test_retry_handling_overload(self, mock_sleep): + responses: list[httpx.Response] = [ + httpx.Response(429, text="I am really busy atm, please back-off", headers={"Retry-After": "37"}), + httpx.Response(200, json={"detail": "Recovered from error"}), + httpx.Response(400, json={"detail": "Should not get here"}), + ] + + def mock_handle_request(request: httpx.Request) -> httpx.Response: + return responses.pop(0) + + client = Client( + base_url=None, + dry_run=True, + token="", + mounts={"'http://": httpx.MockTransport(mock_handle_request)}, + ) + + response = client.get("http://error") + assert response.status_code == 200 + assert len(responses) == 1 + assert mock_sleep.call_count == 1 + assert mock_sleep.call_args[0][0] == 37 + + @mock.patch("time.sleep", return_value=None) + def test_retry_handling_non_retry_error(self, mock_sleep): + responses: list[httpx.Response] = [ + httpx.Response(422, json={"detail": "Somehow this is a bad request"}), + httpx.Response(400, json={"detail": "Should not get here"}), + ] + + def mock_handle_request(request: httpx.Request) -> httpx.Response: + return responses.pop(0) + + client = Client( + base_url=None, + dry_run=True, + token="", + mounts={"'http://": httpx.MockTransport(mock_handle_request)}, + ) + + with pytest.raises(ServerResponseError) as err: + client.get("http://error") + assert len(responses) == 1 + assert mock_sleep.call_count == 0 + assert err.value.args == ("Somehow this is a bad request",) + + @mock.patch("time.sleep", return_value=None) + def test_retry_handling_ok(self, mock_sleep): + responses: list[httpx.Response] = [ + httpx.Response(200, json={"detail": "Recovered from error"}), + httpx.Response(400, json={"detail": "Should not get here"}), + ] + + def mock_handle_request(request: httpx.Request) -> httpx.Response: + return responses.pop(0) + + client = Client( + base_url=None, + dry_run=True, + token="", + mounts={"'http://": httpx.MockTransport(mock_handle_request)}, + ) + + response = client.get("http://error") + assert response.status_code == 200 + assert len(responses) == 1 + assert mock_sleep.call_count == 0 + def make_client(transport: httpx.MockTransport) -> Client: """Get a client with a custom transport""" From 454beadae9810766ac29e0eb2f179ee815a48031 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 21 Dec 2024 13:14:45 +0100 Subject: [PATCH 2/7] Add logging of call failures --- task_sdk/src/airflow/sdk/api/client.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/task_sdk/src/airflow/sdk/api/client.py b/task_sdk/src/airflow/sdk/api/client.py index a24c6eaa91b10..3ec8187120d61 100644 --- a/task_sdk/src/airflow/sdk/api/client.py +++ b/task_sdk/src/airflow/sdk/api/client.py @@ -17,6 +17,7 @@ from __future__ import annotations +import logging import os import sys import uuid @@ -28,7 +29,7 @@ import structlog from pydantic import BaseModel from retryhttp import retry, wait_retry_after -from tenacity import wait_random_exponential +from tenacity import before_log, wait_random_exponential from uuid6 import uuid7 from airflow.sdk import __version__ @@ -309,6 +310,7 @@ def __init__(self, *, base_url: str | None, dry_run: bool = False, token: str, * wait_network_errors=_default_wait, wait_timeouts=_default_wait, wait_rate_limited=wait_retry_after(fallback=_default_wait), # No infinite timeout on HTTP 429 + before_sleep=before_log(log, logging.WARNING), ) def request(self, *args, **kwargs): """Implement a convenience for httpx.Client.request with a retry layer.""" From ee2d69c84818b0452808831847c22c2b3dca4a33 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 21 Dec 2024 15:01:06 +0100 Subject: [PATCH 3/7] Prevent task sdk tests with LocalExecutor fail with retries --- tests/cli/commands/remote_commands/test_task_command.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/cli/commands/remote_commands/test_task_command.py b/tests/cli/commands/remote_commands/test_task_command.py index 66177c2d84ecd..843d6817cdcc5 100644 --- a/tests/cli/commands/remote_commands/test_task_command.py +++ b/tests/cli/commands/remote_commands/test_task_command.py @@ -496,6 +496,8 @@ def test_cli_run_no_local_no_raw_runs_executor(self, dag_maker): mock.patch( "airflow.executors.executor_loader.ExecutorLoader.get_default_executor" ) as get_default_mock, + mock.patch("airflow.executors.local_executor.SimpleQueue"), # Prevent a task being queued + mock.patch("airflow.executors.local_executor.LocalExecutor.end"), ): EmptyOperator(task_id="task1") EmptyOperator(task_id="task2", executor="foo_executor_alias") From 0f973641a1eb2c8521ed3d7e166d3bad2d512e0b Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 22 Dec 2024 22:28:47 +0100 Subject: [PATCH 4/7] Review feedback --- task_sdk/tests/api/test_client.py | 111 ++++++++---------------------- 1 file changed, 29 insertions(+), 82 deletions(-) diff --git a/task_sdk/tests/api/test_client.py b/task_sdk/tests/api/test_client.py index 7318c58d689bd..4644613741ec6 100644 --- a/task_sdk/tests/api/test_client.py +++ b/task_sdk/tests/api/test_client.py @@ -31,18 +31,28 @@ from airflow.utils.state import TerminalTIState -class TestClient: - def test_error_parsing(self): - def handle_request(request: httpx.Request) -> httpx.Response: - """ - A transport handle that always returns errors - """ +def make_client(transport: httpx.MockTransport) -> Client: + """Get a client with a custom transport""" + return Client(base_url="test://server", token="", transport=transport) - return httpx.Response(422, json={"detail": [{"loc": ["#0"], "msg": "err", "type": "required"}]}) - client = Client( - base_url=None, dry_run=True, token="", mounts={"'http://": httpx.MockTransport(handle_request)} - ) +def make_client_w_responses(responses: list[httpx.Response]) -> Client: + """Helper fixture to create a mock client with custom responses.""" + + def handle_request(request: httpx.Request) -> httpx.Response: + return responses.pop(0) + + return Client( + base_url=None, dry_run=True, token="", mounts={"'http://": httpx.MockTransport(handle_request)} + ) + + +class TestClient: + def test_error_parsing(self): + responses = [ + httpx.Response(422, json={"detail": [{"loc": ["#0"], "msg": "err", "type": "required"}]}) + ] + client = make_client_w_responses(responses) with pytest.raises(ServerResponseError) as err: client.get("http://error") @@ -54,29 +64,16 @@ def handle_request(request: httpx.Request) -> httpx.Response: ] def test_error_parsing_plain_text(self): - def handle_request(request: httpx.Request) -> httpx.Response: - """ - A transport handle that always returns errors - """ - - return httpx.Response(422, content=b"Internal Server Error") - - client = Client( - base_url=None, dry_run=True, token="", mounts={"'http://": httpx.MockTransport(handle_request)} - ) + responses = [httpx.Response(422, content=b"Internal Server Error")] + client = make_client_w_responses(responses) with pytest.raises(httpx.HTTPStatusError) as err: client.get("http://error") assert not isinstance(err.value, ServerResponseError) def test_error_parsing_other_json(self): - def handle_request(request: httpx.Request) -> httpx.Response: - # Some other json than an error body. - return httpx.Response(404, json={"detail": "Not found"}) - - client = Client( - base_url=None, dry_run=True, token="", mounts={"'http://": httpx.MockTransport(handle_request)} - ) + responses = [httpx.Response(404, json={"detail": "Not found"})] + client = make_client_w_responses(responses) with pytest.raises(ServerResponseError) as err: client.get("http://error") @@ -90,16 +87,7 @@ def test_retry_handling_unrecoverable_error(self, mock_sleep): httpx.Response(200, json={"detail": "Recovered from error - but will fail before"}), httpx.Response(400, json={"detail": "Should not get here"}), ] - - def mock_handle_request(request: httpx.Request) -> httpx.Response: - return responses.pop(0) - - client = Client( - base_url=None, - dry_run=True, - token="", - mounts={"'http://": httpx.MockTransport(mock_handle_request)}, - ) + client = make_client_w_responses(responses) with pytest.raises(httpx.HTTPStatusError) as err: client.get("http://error") @@ -114,16 +102,7 @@ def test_retry_handling_recovered(self, mock_sleep): httpx.Response(200, json={"detail": "Recovered from error"}), httpx.Response(400, json={"detail": "Should not get here"}), ] - - def mock_handle_request(request: httpx.Request) -> httpx.Response: - return responses.pop(0) - - client = Client( - base_url=None, - dry_run=True, - token="", - mounts={"'http://": httpx.MockTransport(mock_handle_request)}, - ) + client = make_client_w_responses(responses) response = client.get("http://error") assert response.status_code == 200 @@ -137,16 +116,7 @@ def test_retry_handling_overload(self, mock_sleep): httpx.Response(200, json={"detail": "Recovered from error"}), httpx.Response(400, json={"detail": "Should not get here"}), ] - - def mock_handle_request(request: httpx.Request) -> httpx.Response: - return responses.pop(0) - - client = Client( - base_url=None, - dry_run=True, - token="", - mounts={"'http://": httpx.MockTransport(mock_handle_request)}, - ) + client = make_client_w_responses(responses) response = client.get("http://error") assert response.status_code == 200 @@ -160,16 +130,7 @@ def test_retry_handling_non_retry_error(self, mock_sleep): httpx.Response(422, json={"detail": "Somehow this is a bad request"}), httpx.Response(400, json={"detail": "Should not get here"}), ] - - def mock_handle_request(request: httpx.Request) -> httpx.Response: - return responses.pop(0) - - client = Client( - base_url=None, - dry_run=True, - token="", - mounts={"'http://": httpx.MockTransport(mock_handle_request)}, - ) + client = make_client_w_responses(responses) with pytest.raises(ServerResponseError) as err: client.get("http://error") @@ -183,16 +144,7 @@ def test_retry_handling_ok(self, mock_sleep): httpx.Response(200, json={"detail": "Recovered from error"}), httpx.Response(400, json={"detail": "Should not get here"}), ] - - def mock_handle_request(request: httpx.Request) -> httpx.Response: - return responses.pop(0) - - client = Client( - base_url=None, - dry_run=True, - token="", - mounts={"'http://": httpx.MockTransport(mock_handle_request)}, - ) + client = make_client_w_responses(responses) response = client.get("http://error") assert response.status_code == 200 @@ -200,11 +152,6 @@ def mock_handle_request(request: httpx.Request) -> httpx.Response: assert mock_sleep.call_count == 0 -def make_client(transport: httpx.MockTransport) -> Client: - """Get a client with a custom transport""" - return Client(base_url="test://server", token="", transport=transport) - - class TestTaskInstanceOperations: """ Test that the TestVariableOperations class works as expected. While the operations are simple, it From 713e271e4bc2e6110f58328d1483bdd337145a92 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Mon, 23 Dec 2024 14:41:35 +0100 Subject: [PATCH 5/7] Review feedback, Adjust wording --- task_sdk/src/airflow/sdk/api/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task_sdk/src/airflow/sdk/api/client.py b/task_sdk/src/airflow/sdk/api/client.py index 3ec8187120d61..ebb9a4fd4f2df 100644 --- a/task_sdk/src/airflow/sdk/api/client.py +++ b/task_sdk/src/airflow/sdk/api/client.py @@ -274,7 +274,7 @@ def noop_handler(request: httpx.Request) -> httpx.Response: # Config options for SDK how retries on HTTP requests should be handled # Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 3:37 and fails after 5:07min -# As long as there is no other config facility in SDK we use ENV for the moment +# So far there is no other config facility in SDK we use ENV for the moment API_RETRIES = int(os.getenv("AIRFLOW__WORKERS__API_RETRIES", 10)) API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1)) API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90)) From 27a872fe90caf6fb50c584d4ce710b85a001f1fe Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Mon, 23 Dec 2024 14:43:35 +0100 Subject: [PATCH 6/7] Correct time parameters to float --- task_sdk/src/airflow/sdk/api/client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/task_sdk/src/airflow/sdk/api/client.py b/task_sdk/src/airflow/sdk/api/client.py index ebb9a4fd4f2df..ce74b9ef37bd1 100644 --- a/task_sdk/src/airflow/sdk/api/client.py +++ b/task_sdk/src/airflow/sdk/api/client.py @@ -276,8 +276,8 @@ def noop_handler(request: httpx.Request) -> httpx.Response: # Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 3:37 and fails after 5:07min # So far there is no other config facility in SDK we use ENV for the moment API_RETRIES = int(os.getenv("AIRFLOW__WORKERS__API_RETRIES", 10)) -API_RETRY_WAIT_MIN = int(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1)) -API_RETRY_WAIT_MAX = int(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90)) +API_RETRY_WAIT_MIN = float(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1.0)) +API_RETRY_WAIT_MAX = float(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90.0)) class Client(httpx.Client): From b1878bcd39419187e23ad3ecc3d56c672f56f819 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Thu, 26 Dec 2024 12:30:49 +0100 Subject: [PATCH 7/7] Review Feedback --- task_sdk/src/airflow/sdk/api/client.py | 1 + task_sdk/tests/api/test_client.py | 33 +++++++++++++++++++++++--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/task_sdk/src/airflow/sdk/api/client.py b/task_sdk/src/airflow/sdk/api/client.py index ce74b9ef37bd1..ee4144c7f543a 100644 --- a/task_sdk/src/airflow/sdk/api/client.py +++ b/task_sdk/src/airflow/sdk/api/client.py @@ -275,6 +275,7 @@ def noop_handler(request: httpx.Request) -> httpx.Response: # Config options for SDK how retries on HTTP requests should be handled # Note: Given defaults make attempts after 1, 3, 7, 15, 31seconds, 1:03, 2:07, 3:37 and fails after 5:07min # So far there is no other config facility in SDK we use ENV for the moment +# TODO: Consider these env variables while handling airflow confs in task sdk API_RETRIES = int(os.getenv("AIRFLOW__WORKERS__API_RETRIES", 10)) API_RETRY_WAIT_MIN = float(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MIN", 1.0)) API_RETRY_WAIT_MAX = float(os.getenv("AIRFLOW__WORKERS__API_RETRY_WAIT_MAX", 90.0)) diff --git a/task_sdk/tests/api/test_client.py b/task_sdk/tests/api/test_client.py index 4644613741ec6..c52feb9676670 100644 --- a/task_sdk/tests/api/test_client.py +++ b/task_sdk/tests/api/test_client.py @@ -159,7 +159,8 @@ class TestTaskInstanceOperations: response parsing. """ - def test_task_instance_start(self, make_ti_context): + @mock.patch("time.sleep", return_value=None) # To have retries not slowing down tests + def test_task_instance_start(self, mock_sleep, make_ti_context): # Simulate a successful response from the server that starts a task ti_id = uuid6.uuid7() start_date = "2024-10-31T12:00:00Z" @@ -169,7 +170,14 @@ def test_task_instance_start(self, make_ti_context): run_type="manual", ) + # ...including a validation that retry really works + call_count = 0 + def handle_request(request: httpx.Request) -> httpx.Response: + nonlocal call_count + call_count += 1 + if call_count < 4: + return httpx.Response(status_code=500, json={"detail": "Internal Server Error"}) if request.url.path == f"/task-instances/{ti_id}/run": actual_body = json.loads(request.read()) assert actual_body["pid"] == 100 @@ -184,6 +192,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: client = make_client(transport=httpx.MockTransport(handle_request)) resp = client.task_instances.start(ti_id, 100, start_date) assert resp == ti_context + assert call_count == 4 @pytest.mark.parametrize("state", [state for state in TerminalTIState]) def test_task_instance_finish(self, state): @@ -309,9 +318,17 @@ class TestVariableOperations: response parsing. """ - def test_variable_get_success(self): + @mock.patch("time.sleep", return_value=None) # To have retries not slowing down tests + def test_variable_get_success(self, mock_sleep): # Simulate a successful response from the server with a variable + # ...including a validation that retry really works + call_count = 0 + def handle_request(request: httpx.Request) -> httpx.Response: + nonlocal call_count + call_count += 1 + if call_count < 2: + return httpx.Response(status_code=500, json={"detail": "Internal Server Error"}) if request.url.path == "/variables/test_key": return httpx.Response( status_code=200, @@ -325,6 +342,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert isinstance(result, VariableResponse) assert result.key == "test_key" assert result.value == "test_value" + assert call_count == 2 def test_variable_not_found(self): # Simulate a 404 response from the server @@ -387,9 +405,17 @@ class TestXCOMOperations: pytest.param({"key": "test_key", "value": {"key2": "value2"}}, id="nested-dict-value"), ], ) - def test_xcom_get_success(self, value): + @mock.patch("time.sleep", return_value=None) # To have retries not slowing down tests + def test_xcom_get_success(self, mock_sleep, value): # Simulate a successful response from the server when getting an xcom + # ...including a validation that retry really works + call_count = 0 + def handle_request(request: httpx.Request) -> httpx.Response: + nonlocal call_count + call_count += 1 + if call_count < 3: + return httpx.Response(status_code=500, json={"detail": "Internal Server Error"}) if request.url.path == "/xcoms/dag_id/run_id/task_id/key": return httpx.Response( status_code=201, @@ -407,6 +433,7 @@ def handle_request(request: httpx.Request) -> httpx.Response: assert isinstance(result, XComResponse) assert result.key == "test_key" assert result.value == value + assert call_count == 3 def test_xcom_get_success_with_map_index(self): # Simulate a successful response from the server when getting an xcom with map_index passed