Skip to content

Commit

Permalink
Merge pull request #61 from semiotic-ai/k8s_indexer_service_discovery
Browse files Browse the repository at this point in the history
k8s indexer-service auto-discovery
  • Loading branch information
aasseman authored Apr 4, 2023
2 parents db30582 + 2d6d346 commit 34b151b
Show file tree
Hide file tree
Showing 12 changed files with 1,579 additions and 1,167 deletions.
3 changes: 2 additions & 1 deletion .gcloudignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
#!include:.gitignore
#!include:.gitignore
.git
33 changes: 22 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ Configuration:
usage: autoagora [-h] [--log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}] [--json-logs JSON_LOGS]
--postgres-host POSTGRES_HOST [--postgres-port POSTGRES_PORT]
[--postgres-database POSTGRES_DATABASE] --postgres-username POSTGRES_USERNAME
--postgres-password POSTGRES_PASSWORD --indexer-agent-mgmt-endpoint
INDEXER_AGENT_MGMT_ENDPOINT --indexer-service-metrics-endpoint
INDEXER_SERVICE_METRICS_ENDPOINT
--postgres-password POSTGRES_PASSWORD
[--postgres-max-connections POSTGRES_MAX_CONNECTIONS]
--indexer-agent-mgmt-endpoint INDEXER_AGENT_MGMT_ENDPOINT
(--indexer-service-metrics-endpoint INDEXER_SERVICE_METRICS_ENDPOINT | --indexer-service-metrics-k8s-service INDEXER_SERVICE_METRICS_K8S_SERVICE)
[--qps-observation-duration QPS_OBSERVATION_DURATION] [--relative-query-costs]
[--relative-query-costs-exclude-subgraphs RELATIVE_QUERY_COSTS_EXCLUDE_SUBGRAPHS]
[--relative-query-costs-refresh-interval RELATIVE_QUERY_COSTS_REFRESH_INTERVAL]
Expand All @@ -66,10 +67,6 @@ optional arguments:
--indexer-agent-mgmt-endpoint INDEXER_AGENT_MGMT_ENDPOINT
URL to the indexer-agent management GraphQL endpoint. [env var:
INDEXER_AGENT_MGMT_ENDPOINT] (default: None)
--indexer-service-metrics-endpoint INDEXER_SERVICE_METRICS_ENDPOINT
HTTP endpoint for the indexer-service metrics. Can be a comma-separated
for multiple endpoints. [env var: INDEXER_SERVICE_METRICS_ENDPOINT]
(default: None)
--qps-observation-duration QPS_OBSERVATION_DURATION
Duration of the measurement period of the query-per-second after a price
multiplier update. [env var: QPS_OBSERVATION_DURATION] (default: 60)
Expand All @@ -85,23 +82,37 @@ Database settings:
Port of the postgres instance to be used by AutoAgora. [env var:
POSTGRES_PORT] (default: 5432)
--postgres-database POSTGRES_DATABASE
Name of the database to be used by AutoAgora. [env var: POSTGRES_DATABASE]
(default: autoagora)
Name of the database to be used by AutoAgora. [env var:
POSTGRES_DATABASE] (default: autoagora)
--postgres-username POSTGRES_USERNAME
Username for the database to be used by AutoAgora. [env var:
POSTGRES_USERNAME] (default: None)
--postgres-password POSTGRES_PASSWORD
Password for the database to be used by AutoAgora. [env var:
POSTGRES_PASSWORD] (default: None)
--postgres-max-connections POSTGRES_MAX_CONNECTIONS
Maximum postgres connections (internal pool). [env var:
POSTGRES_MAX_CONNECTIONS] (default: 1)
Indexer-service metrics endpoint. Exactly one argument required:
--indexer-service-metrics-endpoint INDEXER_SERVICE_METRICS_ENDPOINT
HTTP endpoint for the indexer-service metrics. Can be a comma-separated
list for multiple endpoints. [env var: INDEXER_SERVICE_METRICS_ENDPOINT]
(default: None)
--indexer-service-metrics-k8s-service INDEXER_SERVICE_METRICS_K8S_SERVICE
Kubernetes service name for the indexer-service and pod port serving its
metrics. Will watch the service's endpoint IPs continuously for changes.
Format: <scheme>://<service_name>:<pod_metrics_port>/<path>. [env var:
INDEXER_SERVICE_METRICS_K8S_SERVICE] (default: None)
Relative query costs generator settings:
--relative-query-costs
(EXPERIMENTAL) Enables the relative query cost generator. Otherwise only
builds a default query pricing model with automated market price
discovery. [env var: RELATIVE_QUERY_COSTS] (default: False)
--relative-query-costs-exclude-subgraphs RELATIVE_QUERY_COSTS_EXCLUDE_SUBGRAPHS
Comma delimited list of subgraphs (ipfs hash) to exclude from the relative
query costs model generator. [env var:
Comma delimited list of subgraphs (ipfs hash) to exclude from the
relative query costs model generator. [env var:
RELATIVE_QUERY_COSTS_EXCLUDE_SUBGRAPHS] (default: None)
--relative-query-costs-refresh-interval RELATIVE_QUERY_COSTS_REFRESH_INTERVAL
(Seconds) Interval between rebuilds of the relative query costs models.
Expand Down
21 changes: 19 additions & 2 deletions autoagora/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,29 @@ def init_config(argv: Optional[Sequence[str]] = None):
#
# Query volume metrics
#
argparser.add_argument(

indexer_service_metrics_endpoint_group = argparser.add_argument_group(
"Indexer-service metrics endpoint. Exactly one argument required"
)
indexer_service_metrics_endpoint_exclusive_group = (
indexer_service_metrics_endpoint_group.add_mutually_exclusive_group(
required=True
)
)
indexer_service_metrics_endpoint_exclusive_group.add_argument(
"--indexer-service-metrics-endpoint",
env_var="INDEXER_SERVICE_METRICS_ENDPOINT",
required=True,
help="HTTP endpoint for the indexer-service metrics. Can be a comma-separated for multiple endpoints.",
)
indexer_service_metrics_endpoint_exclusive_group.add_argument(
"--indexer-service-metrics-k8s-service",
env_var="INDEXER_SERVICE_METRICS_K8S_SERVICE",
help="""
Kubernetes service name for the indexer-service and pod port serving its
metrics. Will watch the service's endpoint IPs continuously for changes.
Format: <scheme>://<service_name>:<pod_metrics_port>/<path>.
""",
)

#
# Price multiplier (Absolute price)
Expand Down
93 changes: 93 additions & 0 deletions autoagora/k8s_service_watcher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Copyright 2022-, Semiotic AI, Inc.
# SPDX-License-Identifier: Apache-2.0

import asyncio as aio
import logging

from kubernetes import client, config, watch
from kubernetes.client.api_client import ApiClient
from kubernetes.client.rest import ApiException

from autoagora.misc import async_exit_on_exception


class K8SServiceEndpointsWatcher:
    """Tracks the endpoint IPs backing a Kubernetes service.

    The current list is exposed as the `endpoint_ips` attribute and is replaced
    wholesale every time the Kubernetes API reports a change on the service's
    `Endpoints` object.
    """

    def __init__(self, service_name: str) -> None:
        """Maintains an automatically, asynchronously updated list of endpoints backing
        a kubernetes service in the current namespace.

        This is supposed to be run from within a Kubernetes pod. The pod will need a
        role that grants it:

        ```
        rules:
        - apiGroups: [""]
          resources: ["endpoints"]
          verbs: ["watch"]
        ```

        The watch loop is scheduled with `asyncio.ensure_future` as soon as the
        object is constructed.

        Args:
            service_name (str): Kubernetes service name.

        Raises:
            FileNotFoundError: couldn't find
                `/var/run/secrets/kubernetes.io/serviceaccount/namespace`, which is
                expected when running within a Kubernetes pod container.
        """
        # IPs currently backing the service. Empty until the first watch event.
        self.endpoint_ips = []
        self._service_name = service_name

        try:
            with open(
                "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r"
            ) as f:
                self._namespace = f.read().strip()
        except FileNotFoundError:
            logging.exception("Probably not running in Kubernetes.")
            raise

        # Starts the async _loop immediately
        self._future = aio.ensure_future(self._watch_loop())

    @async_exit_on_exception()
    async def _watch_loop(self) -> None:
        """Restarts the k8s watch on expiration.

        A `410 Gone` from the API server only means the watch expired, so it is
        logged and the watch is started again. Any other `ApiException` (or any
        other error) propagates to the `async_exit_on_exception` decorator.
        """
        while True:
            try:
                await self._watch()
            except ApiException as api_exc:
                if api_exc.status == watch.watch.HTTP_STATUS_GONE:
                    logging.debug("k8s_service_watcher 410 timeout.")
                else:
                    raise
            logging.debug("k8s_service_watcher restarted")

    async def _watch(self) -> None:
        """Watches for changes in k8s service endpoints.

        Returns when the event stream ends; raises whatever the Kubernetes
        client raises (notably `ApiException`) otherwise.
        """
        config.load_incluster_config()

        api = ApiClient()
        v1 = client.CoreV1Api(api)
        w = watch.Watch()
        event_stream = w.stream(
            v1.list_namespaced_endpoints,
            namespace=self._namespace,
            field_selector=f"metadata.name={self._service_name}",
        )

        loop = aio.get_running_loop()

        # `next` blocks on the HTTP stream, so it runs in the default executor.
        # The `None` default makes an exhausted stream end this loop cleanly:
        # a StopIteration raised in the executor cannot be propagated through
        # an asyncio Future.
        while event := await loop.run_in_executor(None, next, event_stream, None):
            if event["type"] == "DELETED":  # type: ignore
                # A DELETED event carries the last known state of the object;
                # those addresses are no longer valid.
                self.endpoint_ips = []
            else:
                result = event["object"]  # type: ignore

                # `subsets` and `addresses` are optional in the API model and
                # are None when the service has no (ready) endpoints.
                self.endpoint_ips = [
                    address.ip
                    for subset in (result.subsets or [])  # type: ignore
                    for address in (subset.addresses or [])  # type: ignore
                ]

            logging.debug(
                "Got endpoint IPs for service %s: %s",
                self._service_name,
                self.endpoint_ips,
            )
17 changes: 16 additions & 1 deletion autoagora/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
from autoagora.indexer_utils import get_allocated_subgraphs, set_cost_model
from autoagora.model_builder import model_update_loop
from autoagora.price_multiplier import price_bandit_loop
from autoagora.query_metrics import (
K8SServiceWatcherMetricsEndpoints,
StaticMetricsEndpoints,
)

init_config()

Expand All @@ -36,6 +40,7 @@ async def allocated_subgraph_watcher():
(args.relative_query_costs_exclude_subgraphs or "").split(",")
)

# Initialize connection pool to PG database
try:
pgpool = await asyncpg.create_pool(
host=args.postgres_host,
Expand All @@ -53,6 +58,16 @@ async def allocated_subgraph_watcher():
)
raise

# Initialize indexer-service metrics endpoints
if args.indexer_service_metrics_endpoint: # static list
metrics_endpoints = StaticMetricsEndpoints(
args.indexer_service_metrics_endpoint
)
else: # auto from k8s
metrics_endpoints = K8SServiceWatcherMetricsEndpoints(
args.indexer_service_metrics_k8s_service
)

while True:
try:
allocated_subgraphs = (await get_allocated_subgraphs()) - excluded_subgraphs
Expand Down Expand Up @@ -85,7 +100,7 @@ async def allocated_subgraph_watcher():

# Launch the price multiplier update loop for the new subgraph
update_loops[new_subgraph].bandit = aio.ensure_future(
price_bandit_loop(new_subgraph, pgpool)
price_bandit_loop(new_subgraph, pgpool, metrics_endpoints)
)
logging.info(
"Added price multiplier update loop for subgraph %s", new_subgraph
Expand Down
26 changes: 26 additions & 0 deletions autoagora/misc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import functools
import logging
import sys


def async_exit_on_exception(exit_code: int = -1):
    """Returns decorator that logs any exception and exits the program immediately.

    The goal of this function is to easily trigger an immediate program abort from any
    asynchronous function.

    Only `Exception` subclasses trigger the abort. `asyncio.CancelledError`,
    `KeyboardInterrupt` and `SystemExit` derive from `BaseException` and are left
    to propagate, so cancelling a decorated task does not kill the program.

    Args:
        exit_code (int, optional): Self explanatory. Defaults to -1.

    Returns:
        A decorator for coroutine functions. The decorated coroutine returns
        whatever the wrapped one returns.
    """

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await func(*args, **kwargs)
            except Exception:
                logging.exception("exit_on_exception triggered")
                # sys.exit rather than the `exit` builtin: the latter is
                # injected by the `site` module and is not always available.
                sys.exit(exit_code)

        return wrapper

    return decorator
9 changes: 6 additions & 3 deletions autoagora/price_multiplier.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from autoagora.config import args
from autoagora.price_save_state_db import PriceSaveStateDB
from autoagora.query_metrics import MetricsEndpoints
from autoagora.subgraph_wrapper import SubgraphWrapper

reward_gauge = Gauge(
Expand All @@ -34,7 +35,9 @@
)


async def price_bandit_loop(subgraph: str, pgpool: asyncpg.Pool):
async def price_bandit_loop(
subgraph: str, pgpool: asyncpg.Pool, metrics_endpoints: MetricsEndpoints
):
try:
# Instantiate environment.
environment = SubgraphWrapper(subgraph)
Expand Down Expand Up @@ -85,7 +88,7 @@ async def price_bandit_loop(subgraph: str, pgpool: asyncpg.Pool):

# Update the save state
# NOTE: `bid_scale` is specific to "scaled_gaussian" agent action type
logging.debug("Price bandit %s - Saving state to DB.")
logging.debug("Price bandit %s - Saving state to DB.", subgraph)
await save_state_db.save_state(
subgraph=subgraph,
mean=bandit.bid_scale(bandit.mean().item()),
Expand All @@ -106,7 +109,7 @@ async def price_bandit_loop(subgraph: str, pgpool: asyncpg.Pool):
# 3. Get the reward.
# Get queries per second.
queries_per_second = await environment.queries_per_second(
args.qps_observation_duration
metrics_endpoints, args.qps_observation_duration
)
logging.debug(
"Price bandit %s - Queries per second: %s", subgraph, queries_per_second
Expand Down
Loading

0 comments on commit 34b151b

Please sign in to comment.