Skip to content

Commit

Permalink
fix(dashboard): show creating job edges in bp graph (#19066)
Browse files Browse the repository at this point in the history
  • Loading branch information
kwannoel authored Oct 23, 2024
1 parent fd975e0 commit e6f830b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 45 deletions.
2 changes: 1 addition & 1 deletion src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl StreamManagerService for StreamServiceImpl {
let dependencies = self
.metadata_manager
.catalog_controller
.list_object_dependencies()
.list_created_object_dependencies()
.await?;

Ok(Response::new(ListObjectDependenciesResponse {
Expand Down
111 changes: 68 additions & 43 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -672,25 +672,42 @@ impl CatalogController {
Ok(tables)
}

pub async fn list_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
let inner = self.inner.read().await;
pub async fn list_all_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
self.list_object_dependencies(true).await
}

let dependencies: Vec<(ObjectId, ObjectId)> = ObjectDependency::find()
.select_only()
.columns([
object_dependency::Column::Oid,
object_dependency::Column::UsedBy,
])
.join(
JoinType::InnerJoin,
object_dependency::Relation::Object1.def(),
)
.join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
.filter(streaming_job::Column::JobStatus.eq(JobStatus::Created))
.into_tuple()
.all(&inner.db)
.await?;
pub async fn list_created_object_dependencies(&self) -> MetaResult<Vec<PbObjectDependencies>> {
self.list_object_dependencies(false).await
}

async fn list_object_dependencies(
&self,
include_creating: bool,
) -> MetaResult<Vec<PbObjectDependencies>> {
let inner = self.inner.read().await;

let dependencies: Vec<(ObjectId, ObjectId)> = {
let filter = if include_creating {
Expr::value(true)
} else {
streaming_job::Column::JobStatus.eq(JobStatus::Created)
};
ObjectDependency::find()
.select_only()
.columns([
object_dependency::Column::Oid,
object_dependency::Column::UsedBy,
])
.join(
JoinType::InnerJoin,
object_dependency::Relation::Object1.def(),
)
.join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
.filter(filter)
.into_tuple()
.all(&inner.db)
.await?
};
let mut obj_dependencies = dependencies
.into_iter()
.map(|(oid, used_by)| PbObjectDependencies {
Expand Down Expand Up @@ -721,43 +738,51 @@ impl CatalogController {
}
}));

let sink_dependencies: Vec<(SinkId, TableId)> = Sink::find()
.select_only()
.columns([sink::Column::SinkId, sink::Column::TargetTable])
.join(JoinType::InnerJoin, sink::Relation::Object.def())
.join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
.filter(
let sink_dependencies: Vec<(SinkId, TableId)> = {
let filter = if include_creating {
sink::Column::TargetTable.is_not_null()
} else {
streaming_job::Column::JobStatus
.eq(JobStatus::Created)
.and(sink::Column::TargetTable.is_not_null()),
)
.into_tuple()
.all(&inner.db)
.await?;

.and(sink::Column::TargetTable.is_not_null())
};
Sink::find()
.select_only()
.columns([sink::Column::SinkId, sink::Column::TargetTable])
.join(JoinType::InnerJoin, sink::Relation::Object.def())
.join(JoinType::InnerJoin, object::Relation::StreamingJob.def())
.filter(filter)
.into_tuple()
.all(&inner.db)
.await?
};
obj_dependencies.extend(sink_dependencies.into_iter().map(|(sink_id, table_id)| {
PbObjectDependencies {
object_id: table_id as _,
referenced_object_id: sink_id as _,
}
}));

let subscription_dependencies: Vec<(SubscriptionId, TableId)> = Subscription::find()
.select_only()
.columns([
subscription::Column::SubscriptionId,
subscription::Column::DependentTableId,
])
.join(JoinType::InnerJoin, subscription::Relation::Object.def())
.filter(
let subscription_dependencies: Vec<(SubscriptionId, TableId)> = {
let filter = if include_creating {
subscription::Column::DependentTableId.is_not_null()
} else {
subscription::Column::SubscriptionState
.eq(Into::<i32>::into(SubscriptionState::Created))
.and(subscription::Column::DependentTableId.is_not_null()),
)
.into_tuple()
.all(&inner.db)
.await?;

.and(subscription::Column::DependentTableId.is_not_null())
};
Subscription::find()
.select_only()
.columns([
subscription::Column::SubscriptionId,
subscription::Column::DependentTableId,
])
.join(JoinType::InnerJoin, subscription::Relation::Object.def())
.filter(filter)
.into_tuple()
.all(&inner.db)
.await?
};
obj_dependencies.extend(subscription_dependencies.into_iter().map(
|(subscription_id, table_id)| PbObjectDependencies {
object_id: subscription_id as _,
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/dashboard/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ pub(super) mod handlers {
let object_dependencies = srv
.metadata_manager
.catalog_controller
.list_object_dependencies()
.list_all_object_dependencies()
.await
.map_err(err)?;

Expand Down

0 comments on commit e6f830b

Please sign in to comment.