diff --git a/Cargo.lock b/Cargo.lock index 70aab8897b1..28cc099a643 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -280,6 +280,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "clap" version = "4.5.16" @@ -339,7 +345,7 @@ dependencies = [ ] [[package]] -name = "conn-pool" +name = "conn_pool" version = "0.1.0" dependencies = [ "anyhow", @@ -351,6 +357,7 @@ dependencies = [ "lru", "pretty_assertions", "pyo3", + "pyo3_util", "rand", "rstest", "scopeguard", @@ -363,7 +370,6 @@ dependencies = [ "thiserror", "tokio", "tracing", - "tracing-subscriber", ] [[package]] @@ -977,12 +983,12 @@ dependencies = [ "eventsource-stream", "futures", "pyo3", + "pyo3_util", "reqwest", "rstest", "scopeguard", "tokio", "tracing", - "tracing-subscriber", ] [[package]] @@ -1341,6 +1347,18 @@ dependencies = [ "tempfile", ] +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -1741,6 +1759,19 @@ dependencies = [ "syn", ] +[[package]] +name = "pyo3_util" +version = "0.1.0" +dependencies = [ + "nix", + "pyo3", + "scopeguard", + "thiserror", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "quote" version = "1.0.37" @@ -1967,6 +1998,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "rust_native" +version = "0.1.0" +dependencies = [ + "conn_pool", + "http 0.1.0", + "pgrust", + "pyo3", + "pyo3_util", + "tokio", +] + [[package]] name = "rustc-demangle" version = "0.1.24" diff --git a/Cargo.toml b/Cargo.toml index d5bb17fc60d..d9750b71ba2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,17 +4,23 @@ members = [ "edb/edgeql-parser/edgeql-parser-derive", "edb/edgeql-parser/edgeql-parser-python", "edb/graphql-rewrite", - "edb/server/conn_pool", - "edb/server/pgrust", - "edb/server/http", + "edb/server/_rust_native", + "rust/conn_pool", + "rust/pgrust", + "rust/http", + "rust/pyo3_util" ] resolver = "2" [workspace.dependencies] -pyo3 = { version = "0.23.1", features = ["extension-module", "serde"] } +pyo3 = { version = "0.23.1", features = ["extension-module", "serde", "macros"] } tokio = { version = "1", features = ["rt", "rt-multi-thread", "macros", "time", "sync", "net", "io-util"] } tracing = "0.1.40" -tracing-subscriber = "0.3.18" +tracing-subscriber = { version = "0.3.18", features = ["registry", "env-filter"] } +conn_pool = { path = "rust/conn_pool" } +pgrust = { path = "rust/pgrust" } +http = { path = "rust/http" } +pyo3_util = { path = "rust/pyo3_util" } [profile.release] debug = true diff --git a/edb/graphql-rewrite/src/py_entry.rs b/edb/graphql-rewrite/src/py_entry.rs index 0910c2fd596..d819625c35b 100644 --- a/edb/graphql-rewrite/src/py_entry.rs +++ b/edb/graphql-rewrite/src/py_entry.rs @@ -1,5 +1,5 @@ use pyo3::prelude::*; -use pyo3::types::{PyDict, PyInt, PyList, PyString, PyTuple, PyType}; +use pyo3::types::{PyDict, PyInt, PyList, PyString, PyType}; use edb_graphql_parser::position::Pos; diff --git a/edb/server/_rust_native/Cargo.toml b/edb/server/_rust_native/Cargo.toml new file mode 100644 index 00000000000..08b6927a797 --- /dev/null +++ b/edb/server/_rust_native/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "rust_native" +version = "0.1.0" +license = "MIT/Apache-2.0" +authors = ["MagicStack Inc. "] +edition = "2021" + +[lint] +workspace = true + +[features] +python_extension = ["pyo3/extension-module", "pyo3/serde"] + +[dependencies] +pyo3 = { workspace = true } +tokio.workspace = true +pyo3_util.workspace = true +conn_pool = { workspace = true, features = [ "python_extension" ] } +pgrust = { workspace = true, features = [ "python_extension" ] } +http = { workspace = true, features = [ "python_extension" ] } + +[lib] +crate-type = ["lib", "cdylib"] +path = "src/lib.rs" diff --git a/edb/server/_rust_native/src/lib.rs b/edb/server/_rust_native/src/lib.rs new file mode 100644 index 00000000000..931aabc2bae --- /dev/null +++ b/edb/server/_rust_native/src/lib.rs @@ -0,0 +1,39 @@ +use pyo3::{ + pymodule, + types::{PyAnyMethods, PyModule, PyModuleMethods}, + Bound, PyResult, Python, +}; +use pyo3_util::logging::{get_python_logger_level, initialize_logging_in_thread}; + +const MODULE_PREFIX: &str = "edb.server._rust_native"; + +fn add_child_module( + py: Python, + parent: &Bound, + name: &str, + init_fn: fn(Python, &Bound) -> PyResult<()>, +) -> PyResult<()> { + let full_name = format!("{}.{}", MODULE_PREFIX, name); + let child_module = PyModule::new(py, &full_name)?; + init_fn(py, &child_module)?; + parent.add(name, &child_module)?; + + // Add the child module to the sys.modules dictionary so it can be imported + // by name. + let sys_modules = py.import("sys")?.getattr("modules")?; + sys_modules.set_item(full_name, child_module)?; + Ok(()) +} + +#[pymodule] +fn _rust_native(py: Python, m: &Bound) -> PyResult<()> { + // Initialize any logging in this thread to route to "edb.server" + let level = get_python_logger_level(py, "edb.server")?; + initialize_logging_in_thread("edb.server", level); + + add_child_module(py, m, "_conn_pool", conn_pool::python::_conn_pool)?; + add_child_module(py, m, "_pg_rust", pgrust::python::_pg_rust)?; + add_child_module(py, m, "_http", http::python::_http)?; + + Ok(()) +} diff --git a/edb/server/connpool/pool2.py b/edb/server/connpool/pool2.py index f5b12bdfad5..1b9f749f86d 100644 --- a/edb/server/connpool/pool2.py +++ b/edb/server/connpool/pool2.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -import edb.server._conn_pool +import edb.server._rust_native._conn_pool as _rust import asyncio import time import typing @@ -26,8 +26,6 @@ from .config import logger from edb.server import rust_async_channel -guard = edb.server._conn_pool.LoggingGuard() - # Connections must be hashable because we use them to reverse-lookup # an internal ID. C = typing.TypeVar("C", bound=typing.Hashable) @@ -86,7 +84,7 @@ def __call__(self, stats: Snapshot) -> None: class Pool(typing.Generic[C]): - _pool: edb.server._conn_pool.ConnPool + _pool: _rust.ConnPool _next_conn_id: int _failed_connects: int _failed_disconnects: int @@ -118,12 +116,11 @@ def __init__( logger = config.logger logger.info( - f'Creating a connection pool with \ - max_capacity={max_capacity}' + f'Creating a connection pool with max_capacity={max_capacity}' ) self._connect = connect self._disconnect = disconnect - self._pool = edb.server._conn_pool.ConnPool( + self._pool = _rust.ConnPool( max_capacity, min_idle_time_before_gc, config.STATS_COLLECT_INTERVAL ) self._max_capacity = max_capacity @@ -360,11 +357,11 @@ def _build_snapshot(self, *, now: float) -> Snapshot: v = stats['value'] block_snapshot = BlockSnapshot( dbname=dbname, - nconns=v[edb.server._conn_pool.METRIC_ACTIVE], - nwaiters_avg=v[edb.server._conn_pool.METRIC_WAITING], - npending=v[edb.server._conn_pool.METRIC_CONNECTING] - + v[edb.server._conn_pool.METRIC_RECONNECTING], - nwaiters=v[edb.server._conn_pool.METRIC_WAITING], + nconns=v[_rust.METRIC_ACTIVE], + nwaiters_avg=v[_rust.METRIC_WAITING], + npending=v[_rust.METRIC_CONNECTING] + + v[_rust.METRIC_RECONNECTING], + nwaiters=v[_rust.METRIC_WAITING], quota=stats['target'], ) blocks.append(block_snapshot) diff --git a/edb/server/edbrust/Cargo.toml b/edb/server/edbrust/Cargo.toml deleted file mode 100644 index c5bbd5a77ce..00000000000 --- a/edb/server/edbrust/Cargo.toml +++ /dev/null @@ -1,40 +0,0 @@ -[package] -name = "edbrust" -version = "0.1.0" -license = "MIT/Apache-2.0" -authors = ["MagicStack Inc. "] -edition = "2021" - -[lint] -workspace = true - -[features] -python_extension = ["pyo3/extension-module", "pyo3/serde"] - -[dependencies] -pyo3 = { workspace = true, optional = true } -tokio.workspace = true - -[dependencies.derive_more] -version = "1.0.0-beta.6" -features = ["full"] - -[dev-dependencies] -pretty_assertions = "1.2.0" -test-log = { version = "0", features = ["trace"] } -anyhow = "1" -rstest = "0" -statrs = "0" -lru = "0" -byteorder = "1.5" -clap = "4" -clap_derive = "4" -hex-literal = "0.4" - -[dev-dependencies.tokio] -version = "1" -features = ["test-util"] - -[lib] -crate-type = ["lib", "cdylib"] -path = "src/lib.rs" diff --git a/edb/server/edbrust/edbrust-util/src/lib.rs b/edb/server/edbrust/edbrust-util/src/lib.rs deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/edb/server/edbrust/src/lib.rs b/edb/server/edbrust/src/lib.rs deleted file mode 100644 index e69de29bb2d..00000000000 diff --git a/edb/server/http.py b/edb/server/http.py index ae37b148af2..ed6ef329dba 100644 --- a/edb/server/http.py +++ b/edb/server/http.py @@ -37,7 +37,7 @@ import time from http import HTTPStatus as HTTPStatus -from edb.server._http import Http +from edb.server._rust_native._http import Http from . import rust_async_channel logger = logging.getLogger("edb.server") diff --git a/edb/server/pgcon/rust_transport.py b/edb/server/pgcon/rust_transport.py index f3ae6b81eb4..cc37d724b10 100644 --- a/edb/server/pgcon/rust_transport.py +++ b/edb/server/pgcon/rust_transport.py @@ -37,7 +37,7 @@ from enum import Enum, auto from dataclasses import dataclass -from edb.server import _pg_rust as pgrust +from edb.server._rust_native import _pg_rust as pgrust from edb.server.pgconnparams import ( ConnectionParams, SSLMode, diff --git a/edb/server/pgconnparams.py b/edb/server/pgconnparams.py index 4bd1214ecfd..cd5b7af7ddd 100644 --- a/edb/server/pgconnparams.py +++ b/edb/server/pgconnparams.py @@ -22,7 +22,7 @@ import platform import warnings -from edb.server._pg_rust import PyConnectionParams +from edb.server._rust_native._pg_rust import PyConnectionParams _system = platform.uname().system if _system == 'Windows': diff --git a/edb/server/conn_pool/Cargo.toml b/rust/conn_pool/Cargo.toml similarity index 94% rename from edb/server/conn_pool/Cargo.toml rename to rust/conn_pool/Cargo.toml index 10a5966eae5..450b3ca707f 100644 --- a/edb/server/conn_pool/Cargo.toml +++ b/rust/conn_pool/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "conn-pool" +name = "conn_pool" version = "0.1.0" license = "MIT/Apache-2.0" authors = ["MagicStack Inc. "] @@ -15,13 +15,13 @@ optimizer = ["genetic_algorithm", "lru", "rand", "statrs", "anyhow", "tokio/test [dependencies] pyo3 = { workspace = true, optional = true } tokio.workspace = true +pyo3_util.workspace = true +tracing.workspace = true futures = "0" scopeguard = "1" itertools = "0" thiserror = "1" -tracing = "0" -tracing-subscriber = "0" strum = { version = "0.26", features = ["derive"] } consume_on_drop = "0" smart-default = "0" diff --git a/edb/server/conn_pool/README.md b/rust/conn_pool/README.md similarity index 100% rename from edb/server/conn_pool/README.md rename to rust/conn_pool/README.md diff --git a/edb/server/conn_pool/src/algo.rs b/rust/conn_pool/src/algo.rs similarity index 100% rename from edb/server/conn_pool/src/algo.rs rename to rust/conn_pool/src/algo.rs diff --git a/edb/server/conn_pool/src/bin/optimizer.rs b/rust/conn_pool/src/bin/optimizer.rs similarity index 100% rename from edb/server/conn_pool/src/bin/optimizer.rs rename to rust/conn_pool/src/bin/optimizer.rs diff --git a/edb/server/conn_pool/src/block.rs b/rust/conn_pool/src/block.rs similarity index 100% rename from edb/server/conn_pool/src/block.rs rename to rust/conn_pool/src/block.rs diff --git a/edb/server/conn_pool/src/conn.rs b/rust/conn_pool/src/conn.rs similarity index 100% rename from edb/server/conn_pool/src/conn.rs rename to rust/conn_pool/src/conn.rs diff --git a/edb/server/conn_pool/src/drain.rs b/rust/conn_pool/src/drain.rs similarity index 100% rename from edb/server/conn_pool/src/drain.rs rename to rust/conn_pool/src/drain.rs diff --git a/edb/server/conn_pool/src/lib.rs b/rust/conn_pool/src/lib.rs similarity index 97% rename from edb/server/conn_pool/src/lib.rs rename to rust/conn_pool/src/lib.rs index 7ec8d3ed51f..a18e0ed67cf 100644 --- a/edb/server/conn_pool/src/lib.rs +++ b/rust/conn_pool/src/lib.rs @@ -25,4 +25,4 @@ pub use pool::{Pool, PoolConfig, PoolHandle}; pub mod test; #[cfg(feature = "python_extension")] -mod python; +pub mod python; diff --git a/edb/server/conn_pool/src/metrics.rs b/rust/conn_pool/src/metrics.rs similarity index 100% rename from edb/server/conn_pool/src/metrics.rs rename to rust/conn_pool/src/metrics.rs diff --git a/edb/server/conn_pool/src/pool.rs b/rust/conn_pool/src/pool.rs similarity index 100% rename from edb/server/conn_pool/src/pool.rs rename to rust/conn_pool/src/pool.rs diff --git a/edb/server/conn_pool/src/python.rs b/rust/conn_pool/src/python.rs similarity index 67% rename from edb/server/conn_pool/src/python.rs rename to rust/conn_pool/src/python.rs index 2d05b73e25f..159f0c166e6 100644 --- a/edb/server/conn_pool/src/python.rs +++ b/rust/conn_pool/src/python.rs @@ -7,6 +7,7 @@ use crate::{ use derive_more::{Add, AddAssign}; use futures::future::poll_fn; use pyo3::{exceptions::PyException, prelude::*, types::PyByteArray}; +use pyo3_util::logging::{get_python_logger_level, initialize_logging_in_thread}; use serde_pickle::SerOptions; use std::{ cell::{Cell, RefCell}, @@ -20,8 +21,7 @@ use std::{ }; use strum::IntoEnumIterator; use tokio::{io::AsyncWrite, task::LocalSet}; -use tracing::{error, info, subscriber::DefaultGuard, trace}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tracing::{error, info, trace}; pyo3::create_exception!(_conn_pool, InternalError, PyException); @@ -50,7 +50,7 @@ impl RustToPythonMessage { Failed(conn, error) => (5, conn, error.0).into_pyobject(py), Metrics(metrics) => { // This is not really fast but it should not be happening very often - (6, PyByteArray::new(py, &metrics)).into_pyobject(py) + (6, PyByteArray::new(py, metrics)).into_pyobject(py) } } .map(|e| e.into()) @@ -79,9 +79,9 @@ type PythonConnId = u64; #[derive(Debug, Default, Clone, Copy, Add, AddAssign, PartialEq, Eq, Hash, PartialOrd, Ord)] struct ConnHandleId(u64); -impl Into> for ConnHandleId { - fn into(self) -> Box<(dyn derive_more::Error + std::marker::Send + Sync + 'static)> { - Box::new(ConnError::Underlying(format!("{self:?}"))) +impl From for Box<(dyn derive_more::Error + std::marker::Send + Sync + 'static)> { + fn from(val: ConnHandleId) -> Self { + Box::new(ConnError::Underlying(format!("{val:?}"))) } } @@ -127,7 +127,7 @@ impl RpcPipe { self.write(msg) .await .map_err(|_| ConnError::Underlying(conn_id))?; - if let Ok(_) = rx.await { + if rx.await.is_ok() { Err(ConnError::Underlying(conn_id)) } else { Ok(ok) @@ -293,46 +293,55 @@ impl ConnPool { /// Create the connection pool and automatically boot a tokio runtime on a /// new thread. When this [`ConnPool`] is GC'd, the thread will be torn down. #[new] - fn new(max_capacity: usize, min_idle_time_before_gc: f64, stats_interval: f64) -> Self { + fn new( + py: Python, + max_capacity: usize, + min_idle_time_before_gc: f64, + stats_interval: f64, + ) -> PyResult { + let level = get_python_logger_level(py, "edb.server.conn_pool")?; let min_idle_time_before_gc = min_idle_time_before_gc as usize; - info!("ConnPool::new(max_capacity={max_capacity}, min_idle_time_before_gc={min_idle_time_before_gc})"); - let (txrp, rxrp) = std::sync::mpsc::channel(); - let (txpr, rxpr) = tokio::sync::mpsc::unbounded_channel(); - let (txfd, rxfd) = std::sync::mpsc::channel(); - - thread::spawn(move || { - info!("Rust-side ConnPool thread booted"); - let rt = tokio::runtime::Builder::new_current_thread() - .enable_time() - .enable_io() - .build() - .unwrap(); - let _guard = rt.enter(); - let (txn, rxn) = tokio::net::unix::pipe::pipe().unwrap(); - let fd = rxn.into_nonblocking_fd().unwrap().into_raw_fd() as u64; - txfd.send(fd).unwrap(); - let local = LocalSet::new(); - - let rpc_pipe = RpcPipe { - python_to_rust: rxpr.into(), - rust_to_python: txrp, - rust_to_python_notify: txn.into(), - next_id: Default::default(), - handles: Default::default(), - async_ops: Default::default(), - }; - - let config = PoolConfig::suggested_default_for(max_capacity) - .with_min_idle_time_for_gc(Duration::from_secs(min_idle_time_before_gc as _)); - local.block_on(&rt, run_and_block(config, rpc_pipe, stats_interval)); + let new = py.allow_threads(|| { + let (txrp, rxrp) = std::sync::mpsc::channel(); + let (txpr, rxpr) = tokio::sync::mpsc::unbounded_channel(); + let (txfd, rxfd) = std::sync::mpsc::channel(); + thread::spawn(move || { + initialize_logging_in_thread("edb.server.conn_pool", level); + info!("ConnPool::new(max_capacity={max_capacity}, min_idle_time_before_gc={min_idle_time_before_gc})"); + info!("Rust-side ConnPool thread booted"); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_time() + .enable_io() + .build() + .unwrap(); + let _guard = rt.enter(); + let (txn, rxn) = tokio::net::unix::pipe::pipe().unwrap(); + let fd = rxn.into_nonblocking_fd().unwrap().into_raw_fd() as u64; + txfd.send(fd).unwrap(); + let local = LocalSet::new(); + + let rpc_pipe = RpcPipe { + python_to_rust: rxpr.into(), + rust_to_python: txrp, + rust_to_python_notify: txn.into(), + next_id: Default::default(), + handles: Default::default(), + async_ops: Default::default(), + }; + + let config = PoolConfig::suggested_default_for(max_capacity) + .with_min_idle_time_for_gc(Duration::from_secs(min_idle_time_before_gc as _)); + local.block_on(&rt, run_and_block(config, rpc_pipe, stats_interval)); + }); + + let notify_fd = rxfd.recv().unwrap(); + ConnPool { + python_to_rust: txpr, + rust_to_python: Mutex::new(rxrp), + notify_fd, + } }); - - let notify_fd = rxfd.recv().unwrap(); - ConnPool { - python_to_rust: txpr, - rust_to_python: Mutex::new(rxrp), - notify_fd, - } + Ok(new) } #[getter] @@ -407,102 +416,9 @@ impl ConnPool { } } -/// Ensure that logging does not outlive the Python runtime. -#[pyclass] -struct LoggingGuard { - #[allow(unused)] - guard: DefaultGuard, -} - -#[pymethods] -impl LoggingGuard { - #[new] - fn init_logging(py: Python) -> PyResult { - let logging = py.import("logging")?; - let logger = logging.getattr("getLogger")?.call(("edb.server",), None)?; - let level = logger - .getattr("getEffectiveLevel")? - .call((), None)? - .extract::()?; - - struct PythonSubscriber { - logger: Py, - } - - impl tracing_subscriber::Layer for PythonSubscriber { - fn on_event( - &self, - event: &tracing::Event, - _ctx: tracing_subscriber::layer::Context, - ) { - let mut message = format!("[{}] ", event.metadata().target()); - #[derive(Default)] - struct Visitor(String); - impl tracing::field::Visit for Visitor { - fn record_debug( - &mut self, - field: &tracing::field::Field, - value: &dyn std::fmt::Debug, - ) { - if field.name() == "message" { - self.0 += &format!("{value:?} "); - } else { - self.0 += &format!("{}={:?} ", field.name(), value) - } - } - } - - let mut visitor = Visitor::default(); - event.record(&mut visitor); - message += &visitor.0; - - Python::with_gil(|py| { - let Ok(log) = (match *event.metadata().level() { - tracing::Level::TRACE => self.logger.getattr(py, "debug"), - tracing::Level::DEBUG => self.logger.getattr(py, "warning"), - tracing::Level::INFO => self.logger.getattr(py, "info"), - tracing::Level::WARN => self.logger.getattr(py, "warning"), - tracing::Level::ERROR => self.logger.getattr(py, "error"), - }) else { - return; - }; - // This may fail - _ = log.call1(py, (message,)); - }); - } - } - - let level = if level < 10 { - tracing_subscriber::filter::LevelFilter::TRACE - } else if level <= 10 { - tracing_subscriber::filter::LevelFilter::DEBUG - } else if level <= 20 { - tracing_subscriber::filter::LevelFilter::INFO - } else if level <= 30 { - tracing_subscriber::filter::LevelFilter::WARN - } else if level <= 40 { - tracing_subscriber::filter::LevelFilter::ERROR - } else { - tracing_subscriber::filter::LevelFilter::OFF - }; - - let subscriber = PythonSubscriber { - logger: logger.into(), - }; - let guard = tracing_subscriber::registry() - .with(level) - .with(subscriber) - .set_default(); - - tracing::info!("ConnPool initialized (level = {level})"); - Ok(LoggingGuard { guard }) - } -} - #[pymodule] -fn _conn_pool(py: Python, m: &Bound) -> PyResult<()> { +pub fn _conn_pool(py: Python, m: &Bound) -> PyResult<()> { m.add_class::()?; - m.add_class::()?; m.add("InternalError", py.get_type::())?; // Add each metric variant as a constant diff --git a/edb/server/conn_pool/src/test/mod.rs b/rust/conn_pool/src/test/mod.rs similarity index 100% rename from edb/server/conn_pool/src/test/mod.rs rename to rust/conn_pool/src/test/mod.rs diff --git a/edb/server/conn_pool/src/test/spec.rs b/rust/conn_pool/src/test/spec.rs similarity index 100% rename from edb/server/conn_pool/src/test/spec.rs rename to rust/conn_pool/src/test/spec.rs diff --git a/edb/server/conn_pool/src/waitqueue.rs b/rust/conn_pool/src/waitqueue.rs similarity index 100% rename from edb/server/conn_pool/src/waitqueue.rs rename to rust/conn_pool/src/waitqueue.rs diff --git a/edb/server/http/Cargo.toml b/rust/http/Cargo.toml similarity index 92% rename from edb/server/http/Cargo.toml rename to rust/http/Cargo.toml index 0557767b227..bba08eabf02 100644 --- a/edb/server/http/Cargo.toml +++ b/rust/http/Cargo.toml @@ -14,8 +14,9 @@ python_extension = ["pyo3/extension-module"] [dependencies] pyo3 = { workspace = true, optional = true } tokio.workspace = true -tracing = "0" -tracing-subscriber = "0" +pyo3_util.workspace = true +tracing.workspace = true + reqwest = { version = "0.12", features = ["gzip", "deflate", "stream"] } scopeguard = "1" eventsource-stream = "0.2.3" diff --git a/edb/server/http/src/lib.rs b/rust/http/src/lib.rs similarity index 69% rename from edb/server/http/src/lib.rs rename to rust/http/src/lib.rs index 83610f1acca..71429ec8ff9 100644 --- a/edb/server/http/src/lib.rs +++ b/rust/http/src/lib.rs @@ -1,2 +1,2 @@ #[cfg(feature = "python_extension")] -mod python; +pub mod python; diff --git a/edb/server/http/src/python.rs b/rust/http/src/python.rs similarity index 98% rename from edb/server/http/src/python.rs rename to rust/http/src/python.rs index 83e113afd76..03832f2b00b 100644 --- a/edb/server/http/src/python.rs +++ b/rust/http/src/python.rs @@ -1,6 +1,7 @@ use eventsource_stream::Eventsource; use futures::{future::poll_fn, TryStreamExt}; use pyo3::{exceptions::PyException, prelude::*, types::PyByteArray}; +use pyo3_util::logging::{get_python_logger_level, initialize_logging_in_thread}; use reqwest::Method; use scopeguard::{defer, guard, ScopeGuard}; use std::{ @@ -163,6 +164,7 @@ async fn request_bytes( Ok((status, body, headers)) } +#[allow(clippy::too_many_arguments)] async fn request_sse( client: reqwest::Client, id: PythonConnId, @@ -317,6 +319,7 @@ impl PermitManager { self.assert_valid(); } + #[allow(clippy::comparison_chain)] fn update_limit(&self, new_limit: usize) { let mut counts = self.counts.lock().unwrap(); let old_capacity = counts.capacity; @@ -431,7 +434,7 @@ async fn run_and_block(capacity: usize, rpc_pipe: RpcPipe) { _ => (None, None), }; let task = tokio::task::spawn_local(execute( - id.clone(), + id, backpressure.clone(), tasks.clone(), rpc, @@ -449,6 +452,7 @@ async fn run_and_block(capacity: usize, rpc_pipe: RpcPipe) { } } +#[allow(clippy::too_many_arguments)] async fn execute( id: Option, backpressure: Option>, @@ -550,7 +554,9 @@ impl Http { /// Create the HTTP pool and automatically boot a tokio runtime on a /// new thread. When this class is GC'd, the thread will be torn down. #[new] - fn new(max_capacity: usize) -> Self { + fn new(py: Python, max_capacity: usize) -> PyResult { + let level = get_python_logger_level(py, "edgedb.server.http")?; + info!("Http::new(max_capacity={max_capacity})"); let (txrp, rxrp) = std::sync::mpsc::channel(); let (txpr, rxpr) = tokio::sync::mpsc::unbounded_channel(); @@ -559,6 +565,7 @@ impl Http { thread::Builder::new() .name("edgedb-http".to_string()) .spawn(move || { + initialize_logging_in_thread("edgedb.server.http", level); defer!(info!("Rust-side Http thread exiting")); info!("Rust-side Http thread booted"); let rt = tokio::runtime::Builder::new_current_thread() @@ -583,11 +590,11 @@ impl Http { .expect("Failed to create HTTP thread"); let notify_fd = rxfd.recv().unwrap(); - Http { + Ok(Http { python_to_rust: txpr, rust_to_python: Mutex::new(rxrp), notify_fd, - } + }) } #[getter] @@ -664,7 +671,7 @@ impl Http { } #[pymodule] -fn _http(py: Python, m: &Bound) -> PyResult<()> { +pub fn _http(py: Python, m: &Bound) -> PyResult<()> { m.add_class::()?; m.add("InternalError", py.get_type::())?; diff --git a/edb/server/pgrust/Cargo.toml b/rust/pgrust/Cargo.toml similarity index 98% rename from edb/server/pgrust/Cargo.toml rename to rust/pgrust/Cargo.toml index af621fb73fc..c23310dd1ae 100644 --- a/edb/server/pgrust/Cargo.toml +++ b/rust/pgrust/Cargo.toml @@ -15,13 +15,12 @@ optimizer = [] [dependencies] pyo3.workspace = true tokio.workspace = true +tracing.workspace = true futures = "0" scopeguard = "1" itertools = "0" thiserror = "1" -tracing = "0" -tracing-subscriber = "0" strum = { version = "0.26", features = ["derive"] } consume_on_drop = "0" smart-default = "0" @@ -63,6 +62,7 @@ hex-literal = "0.4" tempfile = "3" socket2 = "0.5.7" libc = "0.2.158" +tracing-subscriber = "0" [dev-dependencies.tokio] version = "1" diff --git a/edb/server/pgrust/examples/connect.rs b/rust/pgrust/examples/connect.rs similarity index 100% rename from edb/server/pgrust/examples/connect.rs rename to rust/pgrust/examples/connect.rs diff --git a/edb/server/pgrust/examples/dsn.rs b/rust/pgrust/examples/dsn.rs similarity index 100% rename from edb/server/pgrust/examples/dsn.rs rename to rust/pgrust/examples/dsn.rs diff --git a/edb/server/pgrust/src/auth/md5.rs b/rust/pgrust/src/auth/md5.rs similarity index 100% rename from edb/server/pgrust/src/auth/md5.rs rename to rust/pgrust/src/auth/md5.rs diff --git a/edb/server/pgrust/src/auth/mod.rs b/rust/pgrust/src/auth/mod.rs similarity index 100% rename from edb/server/pgrust/src/auth/mod.rs rename to rust/pgrust/src/auth/mod.rs diff --git a/edb/server/pgrust/src/auth/scram.rs b/rust/pgrust/src/auth/scram.rs similarity index 100% rename from edb/server/pgrust/src/auth/scram.rs rename to rust/pgrust/src/auth/scram.rs diff --git a/edb/server/pgrust/src/auth/stringprep.rs b/rust/pgrust/src/auth/stringprep.rs similarity index 100% rename from edb/server/pgrust/src/auth/stringprep.rs rename to rust/pgrust/src/auth/stringprep.rs diff --git a/edb/server/pgrust/src/auth/stringprep_table.rs b/rust/pgrust/src/auth/stringprep_table.rs similarity index 100% rename from edb/server/pgrust/src/auth/stringprep_table.rs rename to rust/pgrust/src/auth/stringprep_table.rs diff --git a/edb/server/pgrust/src/auth/stringprep_table_prep.py b/rust/pgrust/src/auth/stringprep_table_prep.py similarity index 100% rename from edb/server/pgrust/src/auth/stringprep_table_prep.py rename to rust/pgrust/src/auth/stringprep_table_prep.py diff --git a/edb/server/pgrust/src/connection/conn.rs b/rust/pgrust/src/connection/conn.rs similarity index 100% rename from edb/server/pgrust/src/connection/conn.rs rename to rust/pgrust/src/connection/conn.rs diff --git a/edb/server/pgrust/src/connection/dsn/host.rs b/rust/pgrust/src/connection/dsn/host.rs similarity index 100% rename from edb/server/pgrust/src/connection/dsn/host.rs rename to rust/pgrust/src/connection/dsn/host.rs diff --git a/edb/server/pgrust/src/connection/dsn/mod.rs b/rust/pgrust/src/connection/dsn/mod.rs similarity index 100% rename from edb/server/pgrust/src/connection/dsn/mod.rs rename to rust/pgrust/src/connection/dsn/mod.rs diff --git a/edb/server/pgrust/src/connection/dsn/params.rs b/rust/pgrust/src/connection/dsn/params.rs similarity index 100% rename from edb/server/pgrust/src/connection/dsn/params.rs rename to rust/pgrust/src/connection/dsn/params.rs diff --git a/edb/server/pgrust/src/connection/dsn/passfile.rs b/rust/pgrust/src/connection/dsn/passfile.rs similarity index 100% rename from edb/server/pgrust/src/connection/dsn/passfile.rs rename to rust/pgrust/src/connection/dsn/passfile.rs diff --git a/edb/server/pgrust/src/connection/dsn/raw_params.rs b/rust/pgrust/src/connection/dsn/raw_params.rs similarity index 100% rename from edb/server/pgrust/src/connection/dsn/raw_params.rs rename to rust/pgrust/src/connection/dsn/raw_params.rs diff --git a/edb/server/pgrust/src/connection/dsn/url.rs b/rust/pgrust/src/connection/dsn/url.rs similarity index 100% rename from edb/server/pgrust/src/connection/dsn/url.rs rename to rust/pgrust/src/connection/dsn/url.rs diff --git a/edb/server/pgrust/src/connection/mod.rs b/rust/pgrust/src/connection/mod.rs similarity index 100% rename from edb/server/pgrust/src/connection/mod.rs rename to rust/pgrust/src/connection/mod.rs diff --git a/edb/server/pgrust/src/connection/openssl.rs b/rust/pgrust/src/connection/openssl.rs similarity index 97% rename from edb/server/pgrust/src/connection/openssl.rs rename to rust/pgrust/src/connection/openssl.rs index de575001706..4d01fe1288a 100644 --- a/edb/server/pgrust/src/connection/openssl.rs +++ b/rust/pgrust/src/connection/openssl.rs @@ -110,7 +110,7 @@ mod tests { #[test] fn create_ssl() { - let cert_path = Path::new("../../../tests/certs").canonicalize().unwrap(); + let cert_path = Path::new("../../tests/certs").canonicalize().unwrap(); let ssl = SslContextBuilder::new(SslMethod::tls()).unwrap(); let ssl = create_ssl_client_context( diff --git a/edb/server/pgrust/src/connection/raw_conn.rs b/rust/pgrust/src/connection/raw_conn.rs similarity index 100% rename from edb/server/pgrust/src/connection/raw_conn.rs rename to rust/pgrust/src/connection/raw_conn.rs diff --git a/edb/server/pgrust/src/connection/stream.rs b/rust/pgrust/src/connection/stream.rs similarity index 100% rename from edb/server/pgrust/src/connection/stream.rs rename to rust/pgrust/src/connection/stream.rs diff --git a/edb/server/pgrust/src/connection/tokio.rs b/rust/pgrust/src/connection/tokio.rs similarity index 100% rename from edb/server/pgrust/src/connection/tokio.rs rename to rust/pgrust/src/connection/tokio.rs diff --git a/edb/server/pgrust/src/errors/mod.rs b/rust/pgrust/src/errors/mod.rs similarity index 100% rename from edb/server/pgrust/src/errors/mod.rs rename to rust/pgrust/src/errors/mod.rs diff --git a/edb/server/pgrust/src/handshake/client_state_machine.rs b/rust/pgrust/src/handshake/client_state_machine.rs similarity index 100% rename from edb/server/pgrust/src/handshake/client_state_machine.rs rename to rust/pgrust/src/handshake/client_state_machine.rs diff --git a/edb/server/pgrust/src/handshake/mod.rs b/rust/pgrust/src/handshake/mod.rs similarity index 100% rename from edb/server/pgrust/src/handshake/mod.rs rename to rust/pgrust/src/handshake/mod.rs diff --git a/edb/server/pgrust/src/handshake/server_state_machine.rs b/rust/pgrust/src/handshake/server_state_machine.rs similarity index 100% rename from edb/server/pgrust/src/handshake/server_state_machine.rs rename to rust/pgrust/src/handshake/server_state_machine.rs diff --git a/edb/server/pgrust/src/lib.rs b/rust/pgrust/src/lib.rs similarity index 100% rename from edb/server/pgrust/src/lib.rs rename to rust/pgrust/src/lib.rs diff --git a/edb/server/pgrust/src/protocol/arrays.rs b/rust/pgrust/src/protocol/arrays.rs similarity index 100% rename from edb/server/pgrust/src/protocol/arrays.rs rename to rust/pgrust/src/protocol/arrays.rs diff --git a/edb/server/pgrust/src/protocol/buffer.rs b/rust/pgrust/src/protocol/buffer.rs similarity index 100% rename from edb/server/pgrust/src/protocol/buffer.rs rename to rust/pgrust/src/protocol/buffer.rs diff --git a/edb/server/pgrust/src/protocol/datatypes.rs b/rust/pgrust/src/protocol/datatypes.rs similarity index 100% rename from edb/server/pgrust/src/protocol/datatypes.rs rename to rust/pgrust/src/protocol/datatypes.rs diff --git a/edb/server/pgrust/src/protocol/definition.rs b/rust/pgrust/src/protocol/definition.rs similarity index 100% rename from edb/server/pgrust/src/protocol/definition.rs rename to rust/pgrust/src/protocol/definition.rs diff --git a/edb/server/pgrust/src/protocol/gen.rs b/rust/pgrust/src/protocol/gen.rs similarity index 100% rename from edb/server/pgrust/src/protocol/gen.rs rename to rust/pgrust/src/protocol/gen.rs diff --git a/edb/server/pgrust/src/protocol/message_group.rs b/rust/pgrust/src/protocol/message_group.rs similarity index 100% rename from edb/server/pgrust/src/protocol/message_group.rs rename to rust/pgrust/src/protocol/message_group.rs diff --git a/edb/server/pgrust/src/protocol/mod.rs b/rust/pgrust/src/protocol/mod.rs similarity index 100% rename from edb/server/pgrust/src/protocol/mod.rs rename to rust/pgrust/src/protocol/mod.rs diff --git a/edb/server/pgrust/src/protocol/writer.rs b/rust/pgrust/src/protocol/writer.rs similarity index 100% rename from edb/server/pgrust/src/protocol/writer.rs rename to rust/pgrust/src/protocol/writer.rs diff --git a/edb/server/pgrust/src/python.rs b/rust/pgrust/src/python.rs similarity index 99% rename from edb/server/pgrust/src/python.rs rename to rust/pgrust/src/python.rs index 2931a212b23..749fe61fd04 100644 --- a/edb/server/pgrust/src/python.rs +++ b/rust/pgrust/src/python.rs @@ -90,6 +90,7 @@ impl PyConnectionParams { } #[getter] + #[allow(clippy::type_complexity)] pub fn host_candidates( &self, py: Python, @@ -354,7 +355,7 @@ impl PyConnectionState { let buffer = PyBuffer::::get(data)?; if self.inner.read_ssl_response() { // SSL responses are always one character - let response = [buffer.as_slice(py).unwrap().get(0).unwrap().get()]; + let response = [buffer.as_slice(py).unwrap().first().unwrap().get()]; let response = SSLResponse::new(&response)?; self.inner .drive(ConnectionDrive::SslResponse(response), &mut self.update)?; diff --git a/edb/server/pgrust/tests/edgedb_test_cases.rs b/rust/pgrust/tests/edgedb_test_cases.rs similarity index 100% rename from edb/server/pgrust/tests/edgedb_test_cases.rs rename to rust/pgrust/tests/edgedb_test_cases.rs diff --git a/edb/server/pgrust/tests/hardcore_host_tests_cases.rs b/rust/pgrust/tests/hardcore_host_tests_cases.rs similarity index 100% rename from edb/server/pgrust/tests/hardcore_host_tests_cases.rs rename to rust/pgrust/tests/hardcore_host_tests_cases.rs diff --git a/edb/server/pgrust/tests/libpq_test_cases.rs b/rust/pgrust/tests/libpq_test_cases.rs similarity index 100% rename from edb/server/pgrust/tests/libpq_test_cases.rs rename to rust/pgrust/tests/libpq_test_cases.rs diff --git a/edb/server/pgrust/tests/real_postgres.rs b/rust/pgrust/tests/real_postgres.rs similarity index 100% rename from edb/server/pgrust/tests/real_postgres.rs rename to rust/pgrust/tests/real_postgres.rs diff --git a/edb/server/pgrust/tests/test_util/dsn_libpq.rs b/rust/pgrust/tests/test_util/dsn_libpq.rs similarity index 100% rename from edb/server/pgrust/tests/test_util/dsn_libpq.rs rename to rust/pgrust/tests/test_util/dsn_libpq.rs diff --git a/edb/server/pgrust/tests/test_util/mod.rs b/rust/pgrust/tests/test_util/mod.rs similarity index 100% rename from edb/server/pgrust/tests/test_util/mod.rs rename to rust/pgrust/tests/test_util/mod.rs diff --git a/edb/server/edbrust/edbrust-util/Cargo.toml b/rust/pyo3_util/Cargo.toml similarity index 57% rename from edb/server/edbrust/edbrust-util/Cargo.toml rename to rust/pyo3_util/Cargo.toml index 6c6866a603d..ea097ebbeb6 100644 --- a/edb/server/edbrust/edbrust-util/Cargo.toml +++ b/rust/pyo3_util/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "edbrust-rust" +name = "pyo3_util" version = "0.1.0" license = "MIT/Apache-2.0" authors = ["MagicStack Inc. "] @@ -8,12 +8,15 @@ edition = "2021" [lint] workspace = true -[features] -python_extension = ["pyo3/extension-module", "pyo3/serde"] - [dependencies] -pyo3 = { workspace = true, optional = true } +pyo3.workspace = true tokio.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +nix = { version = "0.29", features = ["fs"] } + +scopeguard = "1" +thiserror = "1" [lib] crate-type = ["lib", "cdylib"] diff --git a/rust/pyo3_util/src/lib.rs b/rust/pyo3_util/src/lib.rs new file mode 100644 index 00000000000..31348d2fdac --- /dev/null +++ b/rust/pyo3_util/src/lib.rs @@ -0,0 +1 @@ +pub mod logging; diff --git a/rust/pyo3_util/src/logging.rs b/rust/pyo3_util/src/logging.rs new file mode 100644 index 00000000000..6c6564f8193 --- /dev/null +++ b/rust/pyo3_util/src/logging.rs @@ -0,0 +1,244 @@ +use std::cell::RefCell; +use std::os::fd::IntoRawFd; +use std::sync::{Mutex, OnceLock}; +use std::time::Duration; + +use pyo3::{ + prelude::*, + types::{PyAnyMethods, PyDict}, + PyResult, Python, +}; +use scopeguard::defer; +use tracing::{subscriber::DefaultGuard, Dispatch, Level}; +use tracing_subscriber::{filter::LevelFilter, layer::SubscriberExt}; + +/// A useful tool for debugging logging. +#[macro_export] +macro_rules! debug_log_method { + ($method_name:expr, $($arg:tt)*) => { + if is_debug_enabled() { + debug_log!($($arg)*); + defer! { + debug_log!("{} exited", $method_name); + } + } + }; +} + +/// A simple debug logging macro that prints to stderr if debug logging is enabled. +#[macro_export] +macro_rules! debug_log { + ($($arg:tt)*) => { + if is_debug_enabled() { + eprint!("LOGGING [{}]: ", std::process::id()); + eprintln!($($arg)*); + } + }; +} + +/// Initializes logging for the current thread. This function should be called +/// at the start of any new thread that needs to use logging. +/// +/// Important: logging from threads involves a write to a socket and taking the GIL, +/// any may have performance impacts when logging is enabled. Disabled logging is +/// nearly free, however. +pub fn initialize_logging_in_thread(python_package: &'static str, level: LevelFilter) { + debug_log_method!( + "initialize_logging_in_thread", + "Initializing logging in thread {python_package:?}" + ); + thread_local! { + static GUARD: RefCell> = const { RefCell::new(None) }; + } + GUARD.with(|g| { + debug_log_method!("initialize_logging_in_thread", "Initializing logger bridge"); + let unix_socket = get_logging_socket(); + + debug_log!("Got logging socket"); + + let logger_bridge = LoggerBridge { + unix_socket, + buffer: Mutex::new([0; 65536]), + python_package, + }; + + let dispatch = Dispatch::new( + tracing_subscriber::registry() + .with(level) + .with(logger_bridge), + ); + *g.borrow_mut() = Some(tracing::dispatcher::set_default(&dispatch)); + }); +} + +static EDGEDB_RUST_PYTHON_LOGGER_DEBUG: OnceLock = OnceLock::new(); + +fn is_debug_enabled() -> bool { + *EDGEDB_RUST_PYTHON_LOGGER_DEBUG.get_or_init(|| { + std::env::var("EDGEDB_RUST_PYTHON_LOGGER_DEBUG") + .map(|v| v == "1") + .unwrap_or(false) + }) +} + +/// The Python script that spawns the log reader thread. This runs within a blocking I/O thread +/// to avoid interaction with the asyncio event loop. +const THREAD_SCRIPT: &std::ffi::CStr = cr#" +def spawn_log_reader(fd): + import socket + import threading + + def _log_reader(sock): + import socket + import struct + import logging + + log_cache = {} + + while True: + try: + # Receive entire datagram at once + data, _ = sock.recvfrom(65536) # Max UDP datagram size + if not data: + break + + # Parse level (first 4 bytes) in big-endian + level = struct.unpack('>I', data[:4])[0] + + # Parse first string length and data in big-endian + str1_len = struct.unpack('>I', data[4:8])[0] + logger_name = data[8:8+str1_len].decode('utf-8') + + # Parse second string length and data in big-endian + str2_start = 8 + str1_len + str2_len = struct.unpack('>I', data[str2_start:str2_start+4])[0] + msg = data[str2_start+4:str2_start+4+str2_len].decode('utf-8') + + logger = log_cache.get(logger_name, None); + if logger is None: + log_cache[logger_name] = logger = logging.getLogger(logger_name) + logger.log(level, msg) + + except socket.error: + break + sock.close() + + sock = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_DGRAM) + thread = threading.Thread(target=_log_reader, args=(sock,), daemon=True, name="Python/Rust logging bridge") + thread.start() + +try: + spawn_log_reader(fd) +except Exception as e: + import traceback + traceback.print_exc() + raise +"#; + +fn python_to_rust_level(level: i32) -> LevelFilter { + match level { + ..10 => LevelFilter::TRACE, + 10 => LevelFilter::DEBUG, + 11..=20 => LevelFilter::INFO, + 21..=30 => LevelFilter::WARN, + 31..=40 => LevelFilter::ERROR, + _ => LevelFilter::OFF, + } +} + +/// Call this from the thread that is running the Python interpreter. +pub fn get_python_logger_level(py: Python, python_package: &'static str) -> PyResult { + let logging = py.import("logging")?; + let logger = logging.call_method("getLogger", (python_package,), None)?; + let level = logger.call_method("getEffectiveLevel", (), None)?; + debug_log!("Python logger '{python_package}' level = {level:?}"); + Ok(python_to_rust_level(level.extract::()?)) +} + +struct LoggerBridge { + unix_socket: std::os::unix::net::UnixDatagram, + buffer: Mutex<[u8; 65536]>, + python_package: &'static str, +} + +impl tracing_subscriber::Layer for LoggerBridge { + fn on_event( + &self, + event: &tracing::Event<'_>, + _ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + debug_log_method!("on_event", "LoggerBridge on_event called: {event:?}"); + let mut message = format!("[{}] ", event.metadata().target()); + #[derive(Default)] + struct Visitor(String); + impl tracing::field::Visit for Visitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() == "message" { + self.0 += &format!("{value:?} "); + } else { + self.0 += &format!("{}={:?} ", field.name(), value) + } + } + } + + let mut visitor = Visitor::default(); + event.record(&mut visitor); + message += &visitor.0; + + // Clamp the length of message to 4kB + let message = &message[..message.len().min(4096)]; + + let level = match *event.metadata().level() { + Level::TRACE => 5, // NOTSET + Level::DEBUG => 10, // DEBUG + Level::INFO => 20, // INFO + Level::WARN => 30, // WARNING + Level::ERROR => 40, // ERROR + }; + + let logger = self.python_package; + let mut lock = self.buffer.lock().unwrap(); + + // Write the level, logger, and message to the buffer + let buf = lock.as_mut_slice(); + buf[..4].copy_from_slice(&(level as u32).to_be_bytes()); + buf[4..8].copy_from_slice(&(logger.len() as u32).to_be_bytes()); + buf[8..8 + logger.len()].copy_from_slice(logger.as_bytes()); + let str2_start = 8 + logger.len(); + buf[str2_start..str2_start + 4].copy_from_slice(&(message.len() as u32).to_be_bytes()); + buf[str2_start + 4..str2_start + 4 + message.len()].copy_from_slice(message.as_bytes()); + + let total_len = str2_start + 4 + message.len(); + + _ = self.unix_socket.send(&buf[..total_len]); + } +} + +static LOGGING_WRITER: OnceLock = OnceLock::new(); + +fn get_logging_socket() -> std::os::unix::net::UnixDatagram { + debug_log_method!("get_logging_socket", "Getting logging socket"); + let tx = LOGGING_WRITER + .get_or_init(|| { + debug_log_method!("get_logging_socket", "Creating logging socket"); + let (tx, rx) = + std::os::unix::net::UnixDatagram::pair().expect("Failed to create logging socket"); + let rx = rx.into_raw_fd(); + Python::with_gil(|py| { + debug_log_method!("get_logging_socket", "Running thread script"); + let locals = PyDict::new(py); + locals + .set_item("fd", rx) + .expect("Failed to set fd in locals"); + py.run(THREAD_SCRIPT, None, Some(&locals)) + .expect("Failed to run thread script"); + }); + tx + }) + .try_clone() + .expect("Failed to clone logging socket"); + + tx.set_write_timeout(Some(Duration::from_secs(10))) + .expect("Failed to set write timeout"); + tx +} diff --git a/setup.py b/setup.py index a3353112526..a69e044329a 100644 --- a/setup.py +++ b/setup.py @@ -1182,20 +1182,8 @@ def _version(): binding=setuptools_rust.Binding.PyO3, ), setuptools_rust.RustExtension( - "edb.server._conn_pool", - path="edb/server/conn_pool/Cargo.toml", - features=["python_extension"], - binding=setuptools_rust.Binding.PyO3, - ), - setuptools_rust.RustExtension( - "edb.server._pg_rust", - path="edb/server/pgrust/Cargo.toml", - features=["python_extension"], - binding=setuptools_rust.Binding.PyO3, - ), - setuptools_rust.RustExtension( - "edb.server._http", - path="edb/server/http/Cargo.toml", + "edb.server._rust_native", + path="edb/server/_rust_native/Cargo.toml", features=["python_extension"], binding=setuptools_rust.Binding.PyO3, ),