Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow servers to start with no background services #16693

Merged
merged 2 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 22 additions & 9 deletions src/prefect/cli/server.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Command line interface for working with Prefect
Command line interface for working with the Prefect API and server.
"""

import os
Expand Down Expand Up @@ -212,6 +212,9 @@ def start(
),
late_runs: bool = SettingsOption(PREFECT_API_SERVICES_LATE_RUNS_ENABLED),
ui: bool = SettingsOption(PREFECT_UI_ENABLED),
no_services: bool = typer.Option(
False, "--no-services", help="Only run the webserver API and UI"
),
background: bool = typer.Option(
False, "--background", "-b", help="Run the server in the background"
),
Expand All @@ -234,6 +237,9 @@ def start(
"PREFECT_SERVER_LOGGING_LEVEL": log_level,
}

if no_services:
server_settings["PREFECT_SERVER_ANALYTICS_ENABLED"] = "False"

pid_file = Path(PREFECT_HOME.value() / PID_FILE)
# check if port is already in use
try:
Expand Down Expand Up @@ -265,9 +271,13 @@ def start(
app.console.print("\n")

if background:
_run_in_background(pid_file, server_settings, host, port, keep_alive_timeout)
_run_in_background(
pid_file, server_settings, host, port, keep_alive_timeout, no_services
)
else:
_run_in_foreground(pid_file, server_settings, host, port, keep_alive_timeout)
_run_in_foreground(
pid_file, server_settings, host, port, keep_alive_timeout, no_services
)


def _run_in_background(
Expand All @@ -276,6 +286,7 @@ def _run_in_background(
host: str,
port: int,
keep_alive_timeout: int,
no_services: bool,
) -> None:
command = [
sys.executable,
Expand All @@ -293,13 +304,14 @@ def _run_in_background(
str(keep_alive_timeout),
]
logger.debug("Opening server process with command: %s", shlex.join(command))

env = {**os.environ, **server_settings, "PREFECT__SERVER_FINAL": "1"}
if no_services:
env["PREFECT__SERVER_WEBSERVER_ONLY"] = "1"

process = subprocess.Popen(
command,
env={
**os.environ,
**server_settings,
"PREFECT__SERVER_FINAL": "1",
},
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
Expand All @@ -319,14 +331,15 @@ def _run_in_foreground(
host: str,
port: int,
keep_alive_timeout: int,
no_services: bool,
) -> None:
from prefect.server.api.server import create_app

with temporary_settings(
{getattr(prefect.settings, k): v for k, v in server_settings.items()}
):
uvicorn.run(
app=create_app(final=True),
app=create_app(final=True, webserver_only=no_services),
app_dir=str(prefect.__module_path__.parent),
host=host,
port=port,
Expand Down
18 changes: 13 additions & 5 deletions src/prefect/server/api/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,7 @@ async def wrapper(*args: Any, **kwargs: Any) -> None:
def create_app(
settings: Optional[prefect.settings.Settings] = None,
ephemeral: bool = False,
webserver_only: bool = False,
final: bool = False,
ignore_cache: bool = False,
) -> FastAPI:
Expand All @@ -549,14 +550,17 @@ def create_app(
from the context.
ephemeral: If set, the application will be treated as ephemeral. The UI
and services will be disabled.
webserver_only: If set, the webserver and UI will be available but all background
services will be disabled.
final: whether this will be the last instance of the Prefect server to be
created in this process, so that additional optimizations may be applied
ignore_cache: If set, a new application will be created even if the settings
match. Otherwise, an application is returned from the cache.
"""
settings = settings or prefect.settings.get_current_settings()
cache_key = (settings.hash_key(), ephemeral)
cache_key = (settings.hash_key(), ephemeral, webserver_only)
ephemeral = ephemeral or bool(os.getenv("PREFECT__SERVER_EPHEMERAL"))
webserver_only = webserver_only or bool(os.getenv("PREFECT__SERVER_WEBSERVER_ONLY"))
final = final or bool(os.getenv("PREFECT__SERVER_FINAL"))

from prefect.logging.configuration import setup_logging
Expand Down Expand Up @@ -596,9 +600,7 @@ async def start_services():

service_instances: list[Any] = []

if prefect.settings.PREFECT_SERVER_ANALYTICS_ENABLED.value():
service_instances.append(services.telemetry.Telemetry())

# these services are for events and are not implemented as loop services right now
if prefect.settings.PREFECT_API_SERVICES_TASK_RUN_RECORDER_ENABLED:
service_instances.append(TaskRunRecorder())

Expand All @@ -608,8 +610,14 @@ async def start_services():
if prefect.settings.PREFECT_API_EVENTS_STREAM_OUT_ENABLED:
service_instances.append(stream.Distributor())

if (
not webserver_only
and prefect.settings.PREFECT_SERVER_ANALYTICS_ENABLED.value()
):
service_instances.append(services.telemetry.Telemetry())

# don't run services in ephemeral mode
if not ephemeral:
if not ephemeral and not webserver_only:
if prefect.settings.PREFECT_API_SERVICES_SCHEDULER_ENABLED.value():
service_instances.append(services.scheduler.Scheduler())
service_instances.append(
Expand Down
Loading