Skip to content

Commit 8003c86

Browse files
committed
Add initial SQL support
Additionally, this commit introduces (a.k.a. mixes) a number of changes. 1. `TabletClient::scope` to namespace keys, and `KvClient` to unify kv operations. Resolves #16. 2. Use `Status::details` to carry `BatchError`. We probably should enhance it to a full-featured, gRPC-compatible solution. Resolves #17. Resolves #24.
1 parent 9336944 commit 8003c86

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+6535
-798
lines changed

Cargo.toml

+21-4
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@ documentation = "https://docs.rs/seamdb"
2727

2828
[dependencies]
2929
anyhow = { version = "1.0", features = ["backtrace"] }
30-
async-trait = "0.1.72"
30+
async-trait = "0.1.81"
3131
bytesize = "1.2.0"
3232
compact_str = "0.7.1"
33-
derivative = "2.2.0"
3433
hashlink = "0.8.3"
3534
rdkafka = "0.34.0"
3635
tokio = {version = "1.30.0", features = ["full"]}
@@ -55,20 +54,38 @@ bytemuck = { version = "1.13.1", features = ["derive"] }
5554
futures = "0.3.28"
5655
thiserror = "1.0.48"
5756
tokio-stream = "0.1.15"
58-
asyncs = "0.3.0"
57+
datafusion = "43.0.0"
58+
pgwire = "0.27.0"
59+
derive-where = "1.2.7"
60+
asyncs = { version = "0.3.0", features = ["tokio"] }
5961
async-io = "2.3.4"
62+
bytes = "1.7.2"
63+
pg_query = "5.1.1"
64+
lazy_static = "1.5.0"
65+
lazy-init = "0.5.1"
66+
enum_dispatch = "0.3.13"
67+
jiff = "0.1.15"
68+
clap = { version = "4.5.23", features = ["derive"] }
69+
tracing-appender = "0.2.3"
70+
tracing-subscriber = { version = "0.3.19", features = ["tracing-log", "env-filter", "std"] }
6071

6172
[dev-dependencies]
6273
assertor = "0.0.2"
6374
asyncs = { version = "0.3.0", features = ["test", "tokio"] }
64-
env_logger = "0.10.0"
75+
env_logger = "0.11.5"
6576
serial_test = "2.0.0"
6677
speculoos = "0.11.0"
6778
test-case = "3.1.0"
6879
test-log = "0.2.12"
6980
testcontainers = "0.14.0"
7081
tracing-test = "0.2.4"
7182

83+
[profile.dev]
84+
lto = "thin"
85+
86+
[profile.release]
87+
lto = "thin"
88+
7289
[workspace]
7390
members = ["src/protos/build"]
7491

Makefile

+5-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,11 @@ lint:
3535
cargo clippy --no-deps -- -D clippy::all
3636

3737
build:
38-
cargo build --tests
38+
cargo build --tests --bins
39+
release:
40+
cargo build --tests --bins --release
3941

4042
test:
4143
cargo test
44+
clean:
45+
cargo clean

src/bin/seamdbd.rs

+104
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright 2023 The SeamDB Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::net::SocketAddr;
16+
use std::time::Duration;
17+
18+
use anyhow::{anyhow, Result};
19+
use clap::Parser;
20+
use pgwire::tokio::process_socket;
21+
use seamdb::cluster::{ClusterEnv, EtcdClusterMetaDaemon, EtcdNodeRegistry, NodeId};
22+
use seamdb::endpoint::{Endpoint, Params, ServiceUri};
23+
use seamdb::log::{KafkaLogFactory, LogManager, MemoryLogFactory};
24+
use seamdb::protos::TableDescriptor;
25+
use seamdb::sql::postgres::PostgresqlHandlerFactory;
26+
use seamdb::tablet::{TabletClient, TabletNode};
27+
use tokio::net::{TcpListener, TcpStream};
28+
use tracing::{info, instrument};
29+
use tracing_subscriber::prelude::*;
30+
use tracing_subscriber::{fmt, EnvFilter};
31+
32+
async fn new_log_manager(uri: ServiceUri<'_>) -> Result<LogManager> {
33+
match uri.scheme() {
34+
"memory" => LogManager::new(MemoryLogFactory::new(), &MemoryLogFactory::ENDPOINT, &Params::default()).await,
35+
"kafka" => LogManager::new(KafkaLogFactory {}, &uri.endpoint(), uri.params()).await,
36+
scheme => Err(anyhow!("unsupported log schema: {}, supported: memory, kafka", scheme)),
37+
}
38+
}
39+
40+
#[instrument(skip_all, fields(addr = %addr))]
41+
async fn serve_connection(factory: PostgresqlHandlerFactory, stream: TcpStream, addr: SocketAddr) {
42+
match process_socket(stream, None, factory).await {
43+
Ok(_) => info!("connection terminated"),
44+
Err(err) => info!("connection terminated: {err}"),
45+
}
46+
}
47+
48+
#[derive(Parser, Debug)]
49+
#[command(version, about, long_about = None)]
50+
pub struct Args {
51+
/// Meta cluster uri to store cluster wide metadata, e.g. etcd://etcd-cluster/scope.
52+
#[arg(long = "cluster.uri")]
53+
cluster_uri: String,
54+
/// Cluster name.
55+
#[arg(long = "cluster.name", default_value = "seamdb")]
56+
cluster_name: String,
57+
/// Log cluster uri to store WAL logs, e.g. kafka://kafka-cluster.
58+
#[arg(long = "log.uri")]
59+
log_uri: String,
60+
/// Port to serve PostgreSQL compatible SQL statements.
61+
#[arg(long = "sql.postgresql.port", default_value_t = 5432)]
62+
pgsql_port: u16,
63+
}
64+
65+
#[tokio::main]
66+
async fn main() {
67+
let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());
68+
69+
tracing_subscriber::registry()
70+
.with(fmt::layer().with_writer(non_blocking).with_level(true).with_file(true).with_line_number(true))
71+
.with(EnvFilter::from_default_env())
72+
.init();
73+
74+
let args = Args::parse();
75+
let cluster_uri = ServiceUri::parse(&args.cluster_uri).unwrap();
76+
let log_uri = ServiceUri::parse(&args.log_uri).unwrap();
77+
78+
let node_id = NodeId::new_random();
79+
info!("Starting node {node_id}");
80+
81+
let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
82+
let address = format!("http://{}", listener.local_addr().unwrap());
83+
let endpoint = Endpoint::try_from(address.as_str()).unwrap();
84+
let (nodes, lease) =
85+
EtcdNodeRegistry::join(cluster_uri.clone(), node_id.clone(), Some(endpoint.to_owned())).await.unwrap();
86+
let log_manager = new_log_manager(log_uri).await.unwrap();
87+
let cluster_env = ClusterEnv::new(log_manager.into(), nodes).with_replicas(1);
88+
let mut cluster_meta_handle =
89+
EtcdClusterMetaDaemon::start(args.cluster_name, cluster_uri.clone(), cluster_env.clone()).await.unwrap();
90+
let descriptor_watcher = cluster_meta_handle.watch_descriptor(None).await.unwrap();
91+
let deployment_watcher = cluster_meta_handle.watch_deployment(None).await.unwrap();
92+
let cluster_env = cluster_env.with_descriptor(descriptor_watcher).with_deployment(deployment_watcher.monitor());
93+
let _node = TabletNode::start(node_id, listener, lease, cluster_env.clone());
94+
let client = TabletClient::new(cluster_env).scope(TableDescriptor::POSTGRESQL_DIALECT_PREFIX);
95+
tokio::time::sleep(Duration::from_secs(20)).await;
96+
97+
let factory = PostgresqlHandlerFactory::new(client);
98+
let listener = TcpListener::bind(format!("0.0.0.0:{}", args.pgsql_port)).await.unwrap();
99+
info!("Listening on {} ...", listener.local_addr().unwrap());
100+
loop {
101+
let (stream, addr) = listener.accept().await.unwrap();
102+
tokio::spawn(serve_connection(factory.clone(), stream, addr));
103+
}
104+
}

src/clock.rs

+17-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use std::ops::{Add, Sub};
1616
use std::sync::Arc;
1717
use std::time::{Duration, SystemTime};
1818

19+
use jiff::Timestamp as JiffTimestamp;
1920
use static_assertions::{assert_impl_all, assert_not_impl_any};
2021

2122
pub use crate::protos::Timestamp;
@@ -88,7 +89,15 @@ impl SystemTimeClock {
8889

8990
impl std::fmt::Display for Timestamp {
9091
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91-
write!(f, "{:?}", self)
92+
if let Some(sequence) = self.get_txn_sequence() {
93+
return write!(f, "txn-seq-{}", sequence);
94+
}
95+
let ts = JiffTimestamp::new(self.seconds as i64, self.nanoseconds as i32).unwrap();
96+
if self.logical == 0 {
97+
write!(f, "{}", ts)
98+
} else {
99+
write!(f, "{}-{}", ts, self.logical)
100+
}
92101
}
93102
}
94103

@@ -101,6 +110,13 @@ impl Timestamp {
101110
self.seconds == 0 && self.nanoseconds == 0 && self.logical == 0
102111
}
103112

113+
pub const fn get_txn_sequence(&self) -> Option<u32> {
114+
match self.seconds & 0x8000000000000000 != 0 {
115+
true => Some(self.seconds as u32),
116+
false => None,
117+
}
118+
}
119+
104120
pub const fn txn_sequence(sequence: u32) -> Self {
105121
Self { seconds: 0x8000000000000000 + sequence as u64, nanoseconds: 0, logical: 0 }
106122
}

src/cluster/etcd.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl EtcdLease {
4646
}
4747
}
4848

49-
pub(super) enum EtcdHelper {}
49+
/// Variant-less enum: it cannot be instantiated and serves as a namespace for associated
/// functions such as `connect`.
pub enum EtcdHelper {}
5050

5151
impl EtcdHelper {
5252
pub async fn connect(endpoint: Endpoint<'_>, params: &Params<'_>) -> Result<Client> {
@@ -129,6 +129,8 @@ pub mod tests {
129129
container: Container<'static, GenericImage>,
130130
}
131131

132+
// SAFETY: NOTE(review): no justification is recorded for this impl. Presumably the wrapped
// `Container` is not `Send` on its own and tests never touch it from two threads at once —
// confirm against the testcontainers version in use before relying on this.
unsafe impl Send for EtcdContainer {}
133+
132134
impl EtcdContainer {
133135
pub fn uri(&self) -> ServiceUri<'static> {
134136
let cluster = format!("etcd://127.0.0.1:{}", self.container.get_host_port_ipv4(2379));

src/cluster/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
mod env;
16-
mod etcd;
16+
pub mod etcd;
1717
mod meta;
1818
mod node;
1919

src/endpoint.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ pub struct Params<'a> {
443443
/// Owned version of [Params].
444444
pub type OwnedParams = Params<'static>;
445445

446-
impl<'a> Params<'a> {
446+
impl Params<'_> {
447447
fn new(map: LinkedHashMap<CompactString, CompactString>) -> Self {
448448
Self { map, _marker: std::marker::PhantomData }
449449
}

0 commit comments

Comments
 (0)