Skip to content

Commit

Permalink
Update database file path handling and server shutdown message (#1686)
Browse files Browse the repository at this point in the history
* Update database file path handling in base.py and version.py

* Update server.py to handle SIGINT signal in LangflowUvicornWorker

* Add shutdown message when shutting down Langflow in main.py

* Update datetime type for created_at and updated_at fields in apikey and variable tables

* Update package versions in pyproject.toml and poetry.lock files

* Update package versions in pyproject.toml and poetry.lock files

* Fix import error in base.py

* Refactor database file path handling in base.py

* Update unit test command in python_test.yml
  • Loading branch information
ogabrielluiz authored Apr 11, 2024
1 parent e38ab09 commit e73754b
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 330 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,4 @@ jobs:
poetry install
- name: Run unit tests
run: |
make tests
make tests args="-n auto"
268 changes: 135 additions & 133 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "langflow"
version = "1.0.0a17"
version = "1.0.0a18"
description = "A Python package with a built-in web application"
authors = ["Logspace <[email protected]>"]
maintainers = [
Expand Down
41 changes: 24 additions & 17 deletions src/backend/base/langflow/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@
import httpx
import typer
from dotenv import load_dotenv
from langflow.main import setup_app
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service
from langflow.services.utils import initialize_services
from langflow.utils.logger import configure, logger
from langflow.utils.util import update_settings
from multiprocess import Process, cpu_count # type: ignore
from packaging import version as pkg_version
from rich import box
Expand All @@ -23,6 +17,13 @@
from rich.panel import Panel
from rich.table import Table

from langflow.main import setup_app
from langflow.services.database.utils import session_getter
from langflow.services.deps import get_db_service
from langflow.services.utils import initialize_services
from langflow.utils.logger import configure, logger
from langflow.utils.util import update_settings

console = Console()

app = typer.Typer(no_args_is_help=True)
Expand Down Expand Up @@ -151,17 +152,21 @@ def run(
# Define an env variable to know if we are just testing the server
if "pytest" in sys.modules:
return

if platform.system() in ["Windows"]:
# Run using uvicorn on MacOS and Windows
# Windows doesn't support gunicorn
# MacOS requires an env variable to be set to use gunicorn
run_on_windows(host, port, log_level, options, app)
else:
# Run using gunicorn on Linux
run_on_mac_or_linux(host, port, log_level, options, app)
if open_browser:
click.launch(f"http://{host}:{port}")
try:
if platform.system() in ["Windows"]:
# Run using uvicorn on MacOS and Windows
# Windows doesn't support gunicorn
# MacOS requires an env variable to be set to use gunicorn
process = run_on_windows(host, port, log_level, options, app)
else:
# Run using gunicorn on Linux
process = run_on_mac_or_linux(host, port, log_level, options, app)
if open_browser:
click.launch(f"http://{host}:{port}")
if process:
process.join()
except KeyboardInterrupt:
pass


def wait_for_server_ready(host, port):
Expand All @@ -182,6 +187,7 @@ def run_on_mac_or_linux(host, port, log_level, options, app):
wait_for_server_ready(host, port)

print_banner(host, port)
return webapp_process


def run_on_windows(host, port, log_level, options, app):
Expand All @@ -190,6 +196,7 @@ def run_on_windows(host, port, log_level, options, app):
"""
print_banner(host, port)
run_langflow(host, port, log_level, options, app)
return None


def is_port_in_use(port, host="localhost"):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
"""Change datetime type
Revision ID: 79e675cb6752
Revises: e3bc869fa272
Create Date: 2024-04-11 19:23:10.697335
"""
from calendar import c
from typing import Sequence, Union

import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine.reflection import Inspector

# revision identifiers, used by Alembic.
revision: str = "79e675cb6752"
down_revision: Union[str, None] = "e3bc869fa272"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Convert naive ``TIMESTAMP`` columns to timezone-aware ``DateTime``.

    Applies to ``apikey.created_at`` and ``variable.created_at`` /
    ``variable.updated_at``, only when the table exists and the column still
    uses the legacy naive PostgreSQL ``TIMESTAMP`` type.
    """
    conn = op.get_bind()
    inspector = Inspector.from_engine(conn)  # type: ignore
    table_names = inspector.get_table_names()

    def _is_naive_timestamp(column) -> bool:
        # SQLAlchemy type objects do not implement value equality, so the
        # previous ``column["type"] == postgresql.TIMESTAMP()`` comparison was
        # always False and the migration silently did nothing. Detect the
        # legacy type with isinstance plus the ``timezone`` flag instead.
        return (
            column is not None
            and isinstance(column["type"], postgresql.TIMESTAMP)
            and not column["type"].timezone
        )

    # ### commands auto generated by Alembic - please adjust! ###
    if "apikey" in table_names:
        columns = inspector.get_columns("apikey")
        created_at_column = next((column for column in columns if column["name"] == "created_at"), None)
        if _is_naive_timestamp(created_at_column):
            with op.batch_alter_table("apikey", schema=None) as batch_op:
                batch_op.alter_column(
                    "created_at",
                    existing_type=postgresql.TIMESTAMP(),
                    type_=sa.DateTime(timezone=True),
                    existing_nullable=False,
                )
    if "variable" in table_names:
        columns = inspector.get_columns("variable")
        created_at_column = next((column for column in columns if column["name"] == "created_at"), None)
        updated_at_column = next((column for column in columns if column["name"] == "updated_at"), None)
        with op.batch_alter_table("variable", schema=None) as batch_op:
            if _is_naive_timestamp(created_at_column):
                batch_op.alter_column(
                    "created_at",
                    existing_type=postgresql.TIMESTAMP(),
                    type_=sa.DateTime(timezone=True),
                    existing_nullable=True,
                )
            if _is_naive_timestamp(updated_at_column):
                batch_op.alter_column(
                    "updated_at",
                    existing_type=postgresql.TIMESTAMP(),
                    type_=sa.DateTime(timezone=True),
                    existing_nullable=True,
                )

    # ### end Alembic commands ###


def downgrade() -> None:
    """Revert timezone-aware ``DateTime`` columns back to naive ``TIMESTAMP``.

    Mirror image of :func:`upgrade`: applies to ``variable.updated_at`` /
    ``variable.created_at`` and ``apikey.created_at``, only when the table
    exists and the column is currently timezone-aware.
    """
    conn = op.get_bind()
    inspector = Inspector.from_engine(conn)  # type: ignore
    table_names = inspector.get_table_names()

    def _is_aware_datetime(column) -> bool:
        # SQLAlchemy type objects do not implement value equality, so the
        # previous ``column["type"] == sa.DateTime(timezone=True)`` comparison
        # was always False and the downgrade silently did nothing. Detect the
        # timezone-aware type with isinstance plus the ``timezone`` flag.
        return (
            column is not None
            and isinstance(column["type"], sa.DateTime)
            and getattr(column["type"], "timezone", False)
        )

    # ### commands auto generated by Alembic - please adjust! ###
    if "variable" in table_names:
        columns = inspector.get_columns("variable")
        created_at_column = next((column for column in columns if column["name"] == "created_at"), None)
        updated_at_column = next((column for column in columns if column["name"] == "updated_at"), None)
        with op.batch_alter_table("variable", schema=None) as batch_op:
            if _is_aware_datetime(updated_at_column):
                batch_op.alter_column(
                    "updated_at",
                    existing_type=sa.DateTime(timezone=True),
                    type_=postgresql.TIMESTAMP(),
                    existing_nullable=True,
                )
            if _is_aware_datetime(created_at_column):
                batch_op.alter_column(
                    "created_at",
                    existing_type=sa.DateTime(timezone=True),
                    type_=postgresql.TIMESTAMP(),
                    existing_nullable=True,
                )

    if "apikey" in table_names:
        columns = inspector.get_columns("apikey")
        created_at_column = next((column for column in columns if column["name"] == "created_at"), None)
        if _is_aware_datetime(created_at_column):
            with op.batch_alter_table("apikey", schema=None) as batch_op:
                batch_op.alter_column(
                    "created_at",
                    existing_type=sa.DateTime(timezone=True),
                    type_=postgresql.TIMESTAMP(),
                    existing_nullable=False,
                )

    # ### end Alembic commands ###
3 changes: 3 additions & 0 deletions src/backend/base/langflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from loguru import logger
from rich import print as rprint

from langflow.api import router
from langflow.initial_setup.setup import create_or_update_starter_projects
Expand All @@ -28,6 +29,8 @@ async def lifespan(app: FastAPI):
LangfuseInstance.update()
create_or_update_starter_projects()
yield
# Shutdown message
rprint("[bold red]Shutting down Langflow...[/bold red]")
teardown_services()

return lifespan
Expand Down
17 changes: 17 additions & 0 deletions src/backend/base/langflow/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
import logging
import signal

from gunicorn import glogging # type: ignore
from gunicorn.app.base import BaseApplication # type: ignore
Expand All @@ -10,6 +12,21 @@
class LangflowUvicornWorker(UvicornWorker):
    """Uvicorn worker that handles SIGINT itself to avoid gunicorn's noisy exit log."""

    # Force the plain asyncio loop so add_signal_handler below is available.
    CONFIG_KWARGS = {"loop": "asyncio"}

    def _install_sigint_handler(self) -> None:
        """Install a SIGINT handler on workers so they exit quietly.

        Works around gunicorn/uvicorn signal interplay:
        - https://github.com/encode/uvicorn/issues/1116
        - https://github.com/benoitc/gunicorn/issues/2604
        """

        # Must run inside the worker's event loop (called from _serve below).
        loop = asyncio.get_running_loop()
        loop.add_signal_handler(signal.SIGINT, self.handle_exit, signal.SIGINT, None)

    async def _serve(self) -> None:
        # We do this to not log the "Worker (pid:XXXXX) was sent SIGINT"
        self._install_sigint_handler()
        await super()._serve()


class Logger(glogging.Logger):
"""Implements and overrides the gunicorn logging interface.
Expand Down
43 changes: 36 additions & 7 deletions src/backend/base/langflow/services/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,21 +151,50 @@ def set_database_url(cls, value, values):
# if there is a database in that location
if not values["CONFIG_DIR"]:
raise ValueError("CONFIG_DIR not set, please set it or provide a DATABASE_URL")

new_path = f"{values['CONFIG_DIR']}/langflow.db"
if Path("./langflow.db").exists():
from langflow.version import is_pre_release # type: ignore

pre_db_file_name = "langflow-pre.db"
db_file_name = "langflow.db"
new_pre_path = f"{values['CONFIG_DIR']}/{pre_db_file_name}"
new_path = f"{values['CONFIG_DIR']}/{db_file_name}"
final_path = None
if is_pre_release:
if Path(new_pre_path).exists():
final_path = new_pre_path
elif Path(new_path).exists():
# We need to copy the current db to the new location
logger.debug("Copying existing database to new location")
copy2(new_path, new_pre_path)
logger.debug(f"Copied existing database to {new_pre_path}")
elif Path(f"./{db_file_name}").exists():
logger.debug("Copying existing database to new location")
copy2(f"./{db_file_name}", new_pre_path)
logger.debug(f"Copied existing database to {new_pre_path}")
else:
logger.debug(f"Database already exists at {new_pre_path}, using it")
final_path = new_pre_path
else:
if Path(new_path).exists():
logger.debug(f"Database already exists at {new_path}, using it")
else:
final_path = new_path
elif Path("./{db_file_name}").exists():
try:
logger.debug("Copying existing database to new location")
copy2("./langflow.db", new_path)
copy2("./{db_file_name}", new_path)
logger.debug(f"Copied existing database to {new_path}")
except Exception:
logger.error("Failed to copy database, using default path")
new_path = "./langflow.db"
new_path = "./{db_file_name}"
else:
final_path = new_path

if final_path is None:
if is_pre_release:
final_path = new_pre_path
else:
final_path = new_path

value = f"sqlite:///{new_path}"
value = f"sqlite:///{final_path}"

return value

Expand Down
Loading

0 comments on commit e73754b

Please sign in to comment.