Skip to content

Commit

Permalink
chore: refactor fixtures in test_universal_registry
Browse files Browse the repository at this point in the history
Signed-off-by: tokoko <[email protected]>
  • Loading branch information
tokoko committed Sep 6, 2024
1 parent 96344b2 commit e168b71
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 120 deletions.
33 changes: 33 additions & 0 deletions sdk/python/tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import pytest
from testcontainers.keycloak import KeycloakContainer
from testcontainers.minio import MinioContainer
from testcontainers.mysql import MySqlContainer
from testcontainers.postgres import PostgresContainer

from tests.utils.auth_permissions_util import setup_permissions_on_keycloak

Expand All @@ -14,3 +17,33 @@ def start_keycloak_server():
with KeycloakContainer("quay.io/keycloak/keycloak:24.0.1") as keycloak_container:
setup_permissions_on_keycloak(keycloak_container.get_client())
yield keycloak_container.get_url()


@pytest.fixture(scope="session")
def mysql_server():
container = MySqlContainer("mysql:latest")
container.start()

yield container

container.stop()


@pytest.fixture(scope="session")
def postgres_server():
container = PostgresContainer()
container.start()

yield container

container.stop()


@pytest.fixture(scope="session")
def minio_server():
container = MinioContainer()
container.start()

yield container

container.stop()
234 changes: 114 additions & 120 deletions sdk/python/tests/integration/registration/test_universal_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# limitations under the License.
import logging
import os
import random
import string
import time
from datetime import timedelta, timezone
from tempfile import mkstemp
Expand All @@ -22,10 +24,8 @@
import pandas as pd
import pytest
from pytest_lazyfixture import lazy_fixture
from testcontainers.core.container import DockerContainer
from testcontainers.core.waiting_utils import wait_for_logs
from testcontainers.minio import MinioContainer
from testcontainers.mysql import MySqlContainer
from testcontainers.postgres import PostgresContainer

from feast import FeatureService, FileSource, RequestSource
from feast.data_format import AvroFormat, ParquetFormat
Expand Down Expand Up @@ -93,158 +93,136 @@ def s3_registry() -> Registry:


@pytest.fixture(scope="function")
def minio_registry() -> Registry:
bucket_name = "test-bucket"
def minio_registry(minio_server):
bucket_name = "".join(random.choices(string.ascii_lowercase, k=10))

container = MinioContainer()
container.start()
client = container.get_client()
client = minio_server.get_client()
client.make_bucket(bucket_name)

container_host = container.get_container_host_ip()
exposed_port = container.get_exposed_port(container.port)
container_host = minio_server.get_container_host_ip()
exposed_port = minio_server.get_exposed_port(minio_server.port)

registry_config = RegistryConfig(
path=f"s3://{bucket_name}/registry.db", cache_ttl_seconds=600
)

mock_environ = {
"FEAST_S3_ENDPOINT_URL": f"http://{container_host}:{exposed_port}",
"AWS_ACCESS_KEY_ID": container.access_key,
"AWS_SECRET_ACCESS_KEY": container.secret_key,
"AWS_ACCESS_KEY_ID": minio_server.access_key,
"AWS_SECRET_ACCESS_KEY": minio_server.secret_key,
"AWS_SESSION_TOKEN": "",
}

with mock.patch.dict(os.environ, mock_environ):
yield Registry("project", registry_config, None)

container.stop()


POSTGRES_USER = "test"
POSTGRES_PASSWORD = "test"
POSTGRES_DB = "test"

logger = logging.getLogger(__name__)


@pytest.fixture(scope="function")
def pg_registry():
container = (
DockerContainer("postgres:latest")
.with_exposed_ports(5432)
.with_env("POSTGRES_USER", POSTGRES_USER)
.with_env("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
.with_env("POSTGRES_DB", POSTGRES_DB)
)

container.start()

registry_config = _given_registry_config_for_pg_sql(container)

yield SqlRegistry(registry_config, "project", None)
def pg_registry(postgres_server):
db_name = "".join(random.choices(string.ascii_lowercase, k=10))

container.stop()
_create_pg_database(postgres_server, db_name)

container_port = postgres_server.get_exposed_port(5432)
container_host = postgres_server.get_container_host_ip()

@pytest.fixture(scope="function")
def pg_registry_async():
container = (
DockerContainer("postgres:latest")
.with_exposed_ports(5432)
.with_env("POSTGRES_USER", POSTGRES_USER)
.with_env("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
.with_env("POSTGRES_DB", POSTGRES_DB)
registry_config = SqlRegistryConfig(
registry_type="sql",
cache_ttl_seconds=2,
cache_mode="sync",
# The `path` must include `+psycopg` in order for `sqlalchemy.create_engine()`
# to understand that we are using psycopg3.
path=f"postgresql+psycopg://{postgres_server.username}:{postgres_server.password}@{container_host}:{container_port}/{db_name}",
sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True},
thread_pool_executor_worker_count=0,
purge_feast_metadata=False,
)

container.start()

registry_config = _given_registry_config_for_pg_sql(container, 2, "thread", 3)

yield SqlRegistry(registry_config, "project", None)

container.stop()

@pytest.fixture(scope="function")
def pg_registry_async(postgres_server):
db_name = "".join(random.choices(string.ascii_lowercase, k=10))

_create_pg_database(postgres_server, db_name)

def _given_registry_config_for_pg_sql(
container,
cache_ttl_seconds=2,
cache_mode="sync",
thread_pool_executor_worker_count=0,
purge_feast_metadata=False,
):
log_string_to_wait_for = "database system is ready to accept connections"
waited = wait_for_logs(
container=container,
predicate=log_string_to_wait_for,
timeout=30,
interval=10,
)
logger.info("Waited for %s seconds until postgres container was up", waited)
container_port = container.get_exposed_port(5432)
container_host = container.get_container_host_ip()
container_port = postgres_server.get_exposed_port(5432)
container_host = postgres_server.get_container_host_ip()

return SqlRegistryConfig(
registry_config = SqlRegistryConfig(
registry_type="sql",
cache_ttl_seconds=cache_ttl_seconds,
cache_mode=cache_mode,
cache_ttl_seconds=2,
cache_mode="thread",
# The `path` must include `+psycopg` in order for `sqlalchemy.create_engine()`
# to understand that we are using psycopg3.
path=f"postgresql+psycopg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@{container_host}:{container_port}/{POSTGRES_DB}",
path=f"postgresql+psycopg://{postgres_server.username}:{postgres_server.password}@{container_host}:{container_port}/{db_name}",
sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True},
thread_pool_executor_worker_count=thread_pool_executor_worker_count,
purge_feast_metadata=purge_feast_metadata,
thread_pool_executor_worker_count=3,
purge_feast_metadata=False,
)

yield SqlRegistry(registry_config, "project", None)

@pytest.fixture(scope="function")
def mysql_registry():
container = MySqlContainer("mysql:latest")
container.start()

registry_config = _given_registry_config_for_mysql(container)
def _create_mysql_database(container: MySqlContainer, database: str):
container.exec(
f"mysql -uroot -p{container.root_password} -e 'CREATE DATABASE {database}; GRANT ALL PRIVILEGES ON {database}.* TO {container.username};'"
)

yield SqlRegistry(registry_config, "project", None)

container.stop()
def _create_pg_database(container: PostgresContainer, database: str):
container.exec(f"psql -U {container.username} -c 'CREATE DATABASE {database}'")


@pytest.fixture(scope="function")
def mysql_registry_async():
container = MySqlContainer("mysql:latest")
container.start()
def mysql_registry(mysql_server):
db_name = "".join(random.choices(string.ascii_lowercase, k=10))

_create_mysql_database(mysql_server, db_name)

registry_config = _given_registry_config_for_mysql(container, 2, "thread", 3)
connection_url = (
"/".join(mysql_server.get_connection_url().split("/")[:-1]) + f"/{db_name}"
)

registry_config = SqlRegistryConfig(
registry_type="sql",
path=connection_url,
cache_ttl_seconds=2,
cache_mode="sync",
sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True},
thread_pool_executor_worker_count=0,
purge_feast_metadata=False,
)

yield SqlRegistry(registry_config, "project", None)

container.stop()

@pytest.fixture(scope="function")
def mysql_registry_async(mysql_server):
db_name = "".join(random.choices(string.ascii_lowercase, k=10))

def _given_registry_config_for_mysql(
container,
cache_ttl_seconds=2,
cache_mode="sync",
thread_pool_executor_worker_count=0,
purge_feast_metadata=False,
):
import sqlalchemy
_create_mysql_database(mysql_server, db_name)

engine = sqlalchemy.create_engine(
container.get_connection_url(), pool_pre_ping=True
connection_url = (
"/".join(mysql_server.get_connection_url().split("/")[:-1]) + f"/{db_name}"
)
engine.connect()

return SqlRegistryConfig(
registry_config = SqlRegistryConfig(
registry_type="sql",
path=container.get_connection_url(),
cache_ttl_seconds=cache_ttl_seconds,
cache_mode=cache_mode,
path=connection_url,
cache_ttl_seconds=2,
cache_mode="thread",
sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True},
thread_pool_executor_worker_count=thread_pool_executor_worker_count,
purge_feast_metadata=purge_feast_metadata,
thread_pool_executor_worker_count=3,
purge_feast_metadata=False,
)

yield SqlRegistry(registry_config, "project", None)


@pytest.fixture(scope="session")
def sqlite_registry():
Expand Down Expand Up @@ -339,11 +317,11 @@ def mock_remote_registry():
async_sql_fixtures = [
pytest.param(
lazy_fixture("pg_registry_async"),
marks=pytest.mark.xdist_group(name="pg_registry_async"),
marks=pytest.mark.xdist_group(name="pg_registry"),
),
pytest.param(
lazy_fixture("mysql_registry_async"),
marks=pytest.mark.xdist_group(name="mysql_registry_async"),
marks=pytest.mark.xdist_group(name="mysql_registry"),
),
]

Expand Down Expand Up @@ -1609,45 +1587,61 @@ def local_registry_purge_feast_metadata() -> Registry:


@pytest.fixture(scope="function")
def pg_registry_purge_feast_metadata():
container = (
DockerContainer("postgres:latest")
.with_exposed_ports(5432)
.with_env("POSTGRES_USER", POSTGRES_USER)
.with_env("POSTGRES_PASSWORD", POSTGRES_PASSWORD)
.with_env("POSTGRES_DB", POSTGRES_DB)
)
def pg_registry_purge_feast_metadata(postgres_server):
db_name = "".join(random.choices(string.ascii_lowercase, k=10))

container.start()
_create_pg_database(postgres_server, db_name)

registry_config = _given_registry_config_for_pg_sql(container, 2, "thread", 3, True)
container_port = postgres_server.get_exposed_port(5432)
container_host = postgres_server.get_container_host_ip()

yield SqlRegistry(registry_config, "project", None)
registry_config = SqlRegistryConfig(
registry_type="sql",
cache_ttl_seconds=2,
cache_mode="thread",
# The `path` must include `+psycopg` in order for `sqlalchemy.create_engine()`
# to understand that we are using psycopg3.
path=f"postgresql+psycopg://{postgres_server.username}:{postgres_server.password}@{container_host}:{container_port}/{db_name}",
sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True},
thread_pool_executor_worker_count=3,
purge_feast_metadata=True,
)

container.stop()
yield SqlRegistry(registry_config, "project", None)


@pytest.fixture(scope="function")
def mysql_registry_purge_feast_metadata():
container = MySqlContainer("mysql:latest")
container.start()
def mysql_registry_purge_feast_metadata(mysql_server):
db_name = "".join(random.choices(string.ascii_lowercase, k=10))

registry_config = _given_registry_config_for_mysql(container, 2, "thread", 3, True)
_create_mysql_database(mysql_server, db_name)

yield SqlRegistry(registry_config, "project", None)
connection_url = (
"/".join(mysql_server.get_connection_url().split("/")[:-1]) + f"/{db_name}"
)

container.stop()
registry_config = SqlRegistryConfig(
registry_type="sql",
path=connection_url,
cache_ttl_seconds=2,
cache_mode="thread",
sqlalchemy_config_kwargs={"echo": False, "pool_pre_ping": True},
thread_pool_executor_worker_count=3,
purge_feast_metadata=True,
)

yield SqlRegistry(registry_config, "project", None)


purge_feast_metadata_fixtures = [
lazy_fixture("local_registry_purge_feast_metadata"),
pytest.param(
lazy_fixture("pg_registry_purge_feast_metadata"),
marks=pytest.mark.xdist_group(name="pg_registry_purge_feast_metadata"),
marks=pytest.mark.xdist_group(name="pg_registry"),
),
pytest.param(
lazy_fixture("mysql_registry_purge_feast_metadata"),
marks=pytest.mark.xdist_group(name="mysql_registry_purge_feast_metadata"),
marks=pytest.mark.xdist_group(name="mysql_registry"),
),
]

Expand Down

0 comments on commit e168b71

Please sign in to comment.