Skip to content

Commit

Permalink
feat: switch to using create table procedure (#1861)
Browse files Browse the repository at this point in the history
* feat: switch to using create table procedure

* fix: add missing table_id and fix uncaught error

* refactor: remove unused code and metrics

* chore: apply suggestions from CR

* chore: remove unused attributes

* feat: add info log and metrics

* fix: fix conflicts
  • Loading branch information
WenyXu authored Jul 10, 2023
1 parent 0018188 commit b31fad5
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 131 deletions.
9 changes: 8 additions & 1 deletion src/frontend/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ use store_api::storage::RegionNumber;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Execute the operation timeout, source: {}", source))]
Timeout {
location: Location,
source: tokio::time::error::Elapsed,
},

#[snafu(display("Failed to handle heartbeat response, source: {}", source))]
HandleHeartbeatResponse {
location: Location,
Expand Down Expand Up @@ -633,7 +639,8 @@ impl ErrorExt for Error {
| Error::FindRegionRoute { .. }
| Error::BuildDfLogicalPlan { .. }
| Error::BuildTableMeta { .. }
| Error::VectorToGrpcColumn { .. } => StatusCode::Internal,
| Error::VectorToGrpcColumn { .. }
| Error::Timeout { .. } => StatusCode::Internal,

Error::IncompleteGrpcResult { .. }
| Error::ContextValueNotFound { .. }
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ impl Instance {
.enable_router()
.enable_store()
.enable_heartbeat()
.enable_ddl()
.channel_manager(channel_manager)
.build();
meta_client
Expand Down
129 changes: 26 additions & 103 deletions src/frontend/src/instance/distributed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub(crate) mod inserter;

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use api::helper::ColumnDataTypeWrapper;
use api::v1::ddl_request::Expr as DdlExpr;
Expand All @@ -33,14 +34,14 @@ use client::Database;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_catalog::format_full_table_name;
use common_error::prelude::BoxedError;
use common_meta::rpc::ddl::{DdlTask, SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::router::{
CreateRequest as MetaCreateRequest, DeleteRequest as MetaDeleteRequest,
Partition as MetaPartition, RouteRequest, RouteResponse,
DeleteRequest as MetaDeleteRequest, Partition as MetaPartition, RouteRequest,
};
use common_meta::rpc::store::CompareAndPutRequest;
use common_meta::table_name::TableName;
use common_query::Output;
use common_telemetry::{debug, info, warn};
use common_telemetry::{debug, info};
use datanode::instance::sql::table_idents_to_full_name;
use datanode::sql::SqlHandler;
use datatypes::prelude::ConcreteDataType;
Expand All @@ -63,6 +64,7 @@ use table::metadata::{RawTableInfo, RawTableMeta, TableIdent, TableType};
use table::requests::TableOptions;
use table::table::AlterContext;
use table::TableRef;
use tokio::time::timeout;

use crate::catalog::FrontendCatalogManager;
use crate::error::{
Expand Down Expand Up @@ -98,17 +100,6 @@ impl DistInstance {
}
}

async fn find_table(&self, table_name: &TableName) -> Result<Option<TableRef>> {
self.catalog_manager
.table(
&table_name.catalog_name,
&table_name.schema_name,
&table_name.table_name,
)
.await
.context(CatalogSnafu)
}

pub async fn create_table(
&self,
create_table: &mut CreateTableExpr,
Expand All @@ -121,56 +112,14 @@ impl DistInstance {
&create_table.table_name,
);

if let Some(table) = self.find_table(&table_name).await? {
return if create_table.create_if_not_exists {
Ok(table)
} else {
TableAlreadyExistSnafu {
table: table_name.to_string(),
}
.fail()
};
}

let mut table_info = create_table_info(create_table)?;

let response = self
.create_table_in_meta(create_table, partitions, &table_info)
.await;
let response = match response {
Ok(response) => response,
Err(e) => {
return if let Some(table) = self.find_table(&table_name).await? {
warn!("Table '{table_name}' is created concurrently by other Frontend nodes!");
Ok(table)
} else {
Err(e)
}
}
};
let resp = self
.create_table_procedure(create_table, partitions, table_info.clone())
.await?;

let table_routes = response.table_routes;
ensure!(
table_routes.len() == 1,
error::CreateTableRouteSnafu {
table_name: create_table.table_name.to_string()
}
);
let table_route = table_routes.first().unwrap();
info!(
"Creating distributed table {table_name} with table routes: {}",
serde_json::to_string_pretty(table_route)
.unwrap_or_else(|_| format!("{table_route:#?}"))
);
let region_routes = &table_route.region_routes;
ensure!(
!region_routes.is_empty(),
error::FindRegionRouteSnafu {
table_name: create_table.table_name.to_string()
}
);

let table_id = table_route.table.id as u32;
let table_id = resp.table_id;
info!("Successfully created distributed table '{table_name}' with table id {table_id}");
table_info.ident.table_id = table_id;
let table_info = Arc::new(table_info.try_into().context(error::CreateTableInfoSnafu)?);

Expand Down Expand Up @@ -199,26 +148,6 @@ impl DistInstance {
}
);

for datanode in table_route.find_leaders() {
let client = self.datanode_clients.get_client(&datanode).await;
let client = Database::new(&table_name.catalog_name, &table_name.schema_name, client);

let regions = table_route.find_leader_regions(&datanode);
let mut create_expr_for_region = create_table.clone();
create_expr_for_region.region_numbers = regions;

debug!(
"Creating table {:?} on Datanode {:?} with regions {:?}",
create_table, datanode, create_expr_for_region.region_numbers,
);

let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE_IN_DATANODE);
let _ = client
.create(create_expr_for_region)
.await
.context(RequestDatanodeSnafu)?;
}

// Since the table information created on meta does not go through KvBackend, so we
// manually invalidate the cache here.
//
Expand Down Expand Up @@ -554,33 +483,27 @@ impl DistInstance {
Ok(Output::AffectedRows(0))
}

async fn create_table_in_meta(
async fn create_table_procedure(
&self,
create_table: &CreateTableExpr,
partitions: Option<Partitions>,
table_info: &RawTableInfo,
) -> Result<RouteResponse> {
let _timer = common_telemetry::timer!(crate::metrics::DIST_CREATE_TABLE_IN_META);
let mut catalog_name = create_table.catalog_name.clone();
if catalog_name.is_empty() {
catalog_name = DEFAULT_CATALOG_NAME.to_string();
}
let mut schema_name = create_table.schema_name.clone();
if schema_name.is_empty() {
schema_name = DEFAULT_SCHEMA_NAME.to_string();
}
let table_name = TableName::new(catalog_name, schema_name, create_table.table_name.clone());

table_info: RawTableInfo,
) -> Result<SubmitDdlTaskResponse> {
let partitions = parse_partitions(create_table, partitions)?;
let request = MetaCreateRequest {
table_name,
partitions,
table_info,
let partitions = partitions.into_iter().map(Into::into).collect();

let request = SubmitDdlTaskRequest {
task: DdlTask::new_create_table(create_table.clone(), partitions, table_info),
};
self.meta_client
.create_route(request)
.await
.context(RequestMetaSnafu)

timeout(
// TODO(weny): makes timeout configurable.
Duration::from_secs(10),
self.meta_client.submit_ddl_task(request),
)
.await
.context(error::TimeoutSnafu)?
.context(error::RequestMetaSnafu)
}

async fn handle_dist_insert(
Expand Down
2 changes: 0 additions & 2 deletions src/frontend/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ pub(crate) const METRIC_RUN_SCRIPT_ELAPSED: &str = "frontend.run_script_elapsed"
/// frontend metrics
/// Metrics for creating table in dist mode.
pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table";
pub const DIST_CREATE_TABLE_IN_META: &str = "frontend.dist.create_table.update_meta";
pub const DIST_CREATE_TABLE_IN_DATANODE: &str = "frontend.dist.create_table.invoke_datanode";
pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows";

/// The samples count of Prometheus remote write.
Expand Down
29 changes: 27 additions & 2 deletions src/meta-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ mod store;

use api::v1::meta::Role;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::rpc::ddl::{SubmitDdlTaskRequest, SubmitDdlTaskResponse};
use common_meta::rpc::lock::{LockRequest, LockResponse, UnlockRequest};
use common_meta::rpc::router::{CreateRequest, DeleteRequest, RouteRequest, RouteResponse};
use common_meta::rpc::store::{
Expand Down Expand Up @@ -185,11 +186,14 @@ impl MetaClient {
client.start(urls.clone()).await?;
info!("Store client started");
}

if let Some(client) = &mut self.lock {
client.start(urls).await?;
client.start(urls.clone()).await?;
info!("Lock client started");
}
if let Some(client) = &mut self.ddl {
client.start(urls).await?;
info!("Ddl client started");
}

Ok(())
}
Expand Down Expand Up @@ -348,6 +352,20 @@ impl MetaClient {
Ok(())
}

pub async fn submit_ddl_task(
&self,
req: SubmitDdlTaskRequest,
) -> Result<SubmitDdlTaskResponse> {
let res = self
.ddl_client()?
.submit_ddl_task(req.try_into().context(error::ConvertMetaRequestSnafu)?)
.await?
.try_into()
.context(error::ConvertMetaResponseSnafu)?;

Ok(res)
}

#[inline]
pub fn heartbeat_client(&self) -> Result<HeartbeatClient> {
self.heartbeat.clone().context(error::NotStartedSnafu {
Expand Down Expand Up @@ -376,6 +394,13 @@ impl MetaClient {
})
}

#[inline]
pub fn ddl_client(&self) -> Result<DdlClient> {
self.ddl
.clone()
.context(error::NotStartedSnafu { name: "ddl_client" })
}

#[inline]
pub fn channel_config(&self) -> &ChannelConfig {
self.channel_manager.config()
Expand Down
4 changes: 0 additions & 4 deletions src/meta-client/src/client/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,10 @@ use crate::error;
use crate::error::Result;

#[derive(Clone, Debug)]
// TODO(weny): removes this in following PRs.
#[allow(unused)]
pub struct Client {
inner: Arc<RwLock<Inner>>,
}

// TODO(weny): removes this in following PRs.
#[allow(dead_code)]
impl Client {
pub fn new(id: Id, role: Role, channel_manager: ChannelManager) -> Self {
let inner = Arc::new(RwLock::new(Inner {
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::sync::Arc;

use api::v1::meta::cluster_server::ClusterServer;
use api::v1::meta::ddl_task_server::DdlTaskServer;
use api::v1::meta::heartbeat_server::HeartbeatServer;
use api::v1::meta::lock_server::LockServer;
use api::v1::meta::router_server::RouterServer;
Expand Down Expand Up @@ -147,6 +148,7 @@ pub fn router(meta_srv: MetaSrv) -> Router {
.add_service(StoreServer::new(meta_srv.clone()))
.add_service(ClusterServer::new(meta_srv.clone()))
.add_service(LockServer::new(meta_srv.clone()))
.add_service(DdlTaskServer::new(meta_srv.clone()))
.add_service(admin::make_admin_service(meta_srv))
}

Expand Down
2 changes: 0 additions & 2 deletions src/meta-srv/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ pub struct DdlManager {
server_addr: String,
}

// TODO(weny): removes in following PRs.
#[allow(unused)]
#[derive(Clone)]
pub(crate) struct DdlContext {
pub(crate) kv_store: KvStoreRef,
Expand Down
10 changes: 9 additions & 1 deletion src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ pub struct MetaSrvBuilder {
meta_peer_client: Option<MetaPeerClientRef>,
lock: Option<DistLockRef>,
metadata_service: Option<MetadataServiceRef>,
datanode_clients: Option<Arc<DatanodeClients>>,
}

impl MetaSrvBuilder {
Expand All @@ -67,6 +68,7 @@ impl MetaSrvBuilder {
options: None,
lock: None,
metadata_service: None,
datanode_clients: None,
}
}

Expand Down Expand Up @@ -115,6 +117,11 @@ impl MetaSrvBuilder {
self
}

pub fn datanode_clients(mut self, clients: Arc<DatanodeClients>) -> Self {
self.datanode_clients = Some(clients);
self
}

pub async fn build(self) -> Result<MetaSrv> {
let started = Arc::new(AtomicBool::new(false));

Expand All @@ -128,6 +135,7 @@ impl MetaSrvBuilder {
handler_group,
lock,
metadata_service,
datanode_clients,
} = self;

let options = options.unwrap_or_default();
Expand Down Expand Up @@ -162,7 +170,7 @@ impl MetaSrvBuilder {
let ddl_manager = Arc::new(DdlManager::new(
procedure_manager.clone(),
kv_store.clone(),
Arc::new(DatanodeClients::default()),
datanode_clients.unwrap_or_else(|| Arc::new(DatanodeClients::default())),
mailbox.clone(),
options.server_addr.clone(),
));
Expand Down
5 changes: 5 additions & 0 deletions src/meta-srv/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,8 @@ pub(crate) const METRIC_META_KV_REQUEST: &str = "meta.kv_request";
pub(crate) const METRIC_META_ROUTE_REQUEST: &str = "meta.route_request";
pub(crate) const METRIC_META_HEARTBEAT_CONNECTION_NUM: &str = "meta.heartbeat_connection_num";
pub(crate) const METRIC_META_HANDLER_EXECUTE: &str = "meta.handler_execute";

pub(crate) const METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_META: &str =
"meta.procedure.create_table.create_meta";
pub(crate) const METRIC_META_CREATE_TABLE_PROCEDURE_CREATE_TABLE: &str =
"meta.procedure.create_table.create_table";
Loading

0 comments on commit b31fad5

Please sign in to comment.