Skip to content

Commit

Permalink
Rust postgres client
Browse files Browse the repository at this point in the history
  • Loading branch information
mmastrac committed Sep 12, 2024
1 parent 1082945 commit a763e48
Show file tree
Hide file tree
Showing 64 changed files with 12,055 additions and 3,079 deletions.
89 changes: 89 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ resolver = "2"
[workspace.dependencies]
pyo3 = { version = "0.22.2", features = ["extension-module", "serde"] }
tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "time", "sync", "net", "io-util"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"

[profile.release]
debug = true
Expand Down
39 changes: 39 additions & 0 deletions connect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from edb.server.pgcon.rust_transport import create_postgres_connection
import asyncio
import time

class MyProtocol(asyncio.Protocol):
def __init__(self):
self.closed = asyncio.Future()

def connection_made(self, transport):
print(f"Connection made: {transport.connection}")
self.transport = transport

def data_received(self, data):
print(f"Received: {data}")
self.transport.close()

def connection_lost(self, exc):
print(f"Connection lost: {exc}")
self.closed.set_result(None)

async def main():
now = time.perf_counter_ns()
transport, protocol = await create_postgres_connection(
"postgres://user:password@localhost/postgres",
lambda: MyProtocol(),
state_change_callback=lambda state: print(f"Connection: {state.name}"))
print(f"Connection time: {(time.perf_counter_ns() - now) // 1000}µs")
print(f"Connected: {transport}")
print(f"Peer: {transport.get_extra_info('peername')}")
print(f"Cipher: {transport.get_extra_info('cipher')}")

# Send a simple query message
query = b'SELECT version();\0'
message = b'Q' + (len(query) + 4).to_bytes(4, 'big') + query
transport.write(message)

await protocol.closed

asyncio.run(main())
31 changes: 25 additions & 6 deletions edb/server/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,22 +118,29 @@ class PGConnectionProxy:
def __init__(
self,
cluster: pgcluster.BaseCluster,
source_description: str,
dbname: Optional[str] = None,
log_listener: Optional[Callable[[str, str], None]] = None,
):
self._conn: Optional[pgcon.PGConnection] = None
self._cluster = cluster
self._dbname = dbname
self._log_listener = log_listener or _pg_log_listener
self._source_description = source_description

async def connect(self) -> None:
if self._conn is not None:
self._conn.terminate()

if self._dbname:
self._conn = await self._cluster.connect(database=self._dbname)
self._conn = await self._cluster.connect(
source_description=self._source_description,
database=self._dbname
)
else:
self._conn = await self._cluster.connect()
self._conn = await self._cluster.connect(
source_description=self._source_description
)

if self._log_listener is not None:
self._conn.add_log_listener(self._log_listener)
Expand Down Expand Up @@ -2310,7 +2317,11 @@ async def _check_catalog_compatibility(
)
)

conn = PGConnectionProxy(ctx.cluster, sys_db.decode("utf-8"))
conn = PGConnectionProxy(
ctx.cluster,
source_description="_check_catalog_compatibility",
dbname=sys_db.decode("utf-8")
)

try:
instancedata = await _get_instance_data(conn)
Expand Down Expand Up @@ -2473,7 +2484,11 @@ async def _bootstrap(
else:
new_template_db_id = uuidgen.uuid1mc()
tpl_db = cluster.get_db_name(edbdef.EDGEDB_TEMPLATE_DB)
conn = PGConnectionProxy(cluster, tpl_db)
conn = PGConnectionProxy(
cluster,
source_description="_bootstrap",
dbname=tpl_db
)

tpl_ctx = dataclasses.replace(ctx, conn=conn)
else:
Expand Down Expand Up @@ -2596,7 +2611,8 @@ async def _bootstrap(

sys_conn = PGConnectionProxy(
cluster,
cluster.get_db_name(edbdef.EDGEDB_SYSTEM_DB),
source_description="_bootstrap",
dbname=cluster.get_db_name(edbdef.EDGEDB_SYSTEM_DB),
)

try:
Expand Down Expand Up @@ -2667,7 +2683,10 @@ async def ensure_bootstrapped(
Returns True if bootstrap happened and False if the instance was already
bootstrapped, along with the bootstrap compiler state.
"""
pgconn = PGConnectionProxy(cluster)
pgconn = PGConnectionProxy(
cluster,
source_description="ensure_bootstrapped"
)
ctx = BootstrapContext(cluster=cluster, conn=pgconn, args=args)

try:
Expand Down
26 changes: 12 additions & 14 deletions edb/server/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from edb.server import args as edgedb_args
from edb.server import defines as edgedb_defines
from edb.server import pgconnparams

from . import pgcluster

Expand Down Expand Up @@ -118,7 +119,7 @@ def __init__(
self._runstate_dir = runstate_dir
self._edgedb_cmd.extend(['--runstate-dir', str(runstate_dir)])
self._pg_cluster: Optional[pgcluster.BaseCluster] = None
self._pg_connect_args: Dict[str, Any] = {}
self._pg_connect_args: pgconnparams.CreateParamsKwargs = {}
self._daemon_process: Optional[subprocess.Popen[str]] = None
self._port = port
self._effective_port = None
Expand Down Expand Up @@ -146,7 +147,7 @@ async def get_status(self) -> str:
conn = None
try:
conn = await pg_cluster.connect(
timeout=5,
source_description=f"{self.__class__}.get_status",
**self._pg_connect_args,
)

Expand Down Expand Up @@ -314,37 +315,34 @@ async def test() -> None:
started = time.monotonic()
await test()
left -= (time.monotonic() - started)

if self._admin_query("SELECT ();", f"{max(1, int(left))}s"):
if res := self._admin_query("SELECT ();", f"{max(1, int(left))}s"):
raise ClusterError(
f'could not connect to edgedb-server '
f'within {timeout} seconds') from None
f'within {timeout} seconds (exit code = {res})') from None

def _admin_query(
self,
query: str,
wait_until_available: str = "0s",
) -> int:
return subprocess.call(
[
args = [
"edgedb",
"--host",
"query",
"--unix-path",
str(os.path.abspath(self._runstate_dir)),
"--port",
str(self._effective_port),
"--admin",
"--user",
edgedb_defines.EDGEDB_SUPERUSER,
"--database",
"--branch",
edgedb_defines.EDGEDB_SUPERUSER_DB,
"--wait-until-available",
wait_until_available,
"-c",
query,
],
stdout=subprocess.DEVNULL,
stderr=subprocess.STDOUT,
)
]
res = subprocess.call(args=args)
return res

async def set_test_config(self) -> None:
self._admin_query(f'''
Expand Down
Loading

0 comments on commit a763e48

Please sign in to comment.