Skip to content

Commit

Permalink
feat(stream): create cdc table reader and source data stream with retry (#19467)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Dec 3, 2024
1 parent e333787 commit 404998e
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 88 deletions.
22 changes: 5 additions & 17 deletions src/compute/tests/cdc_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,8 @@ use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, Field,
use risingwave_common::types::{Datum, JsonbVal};
use risingwave_common::util::epoch::{test_epoch, EpochExt};
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::cdc::external::mock_external_table::MockExternalTableReader;
use risingwave_connector::source::cdc::external::mysql::MySqlOffset;
use risingwave_connector::source::cdc::external::{
DebeziumOffset, DebeziumSourceOffset, ExternalTableReaderImpl, SchemaTableName,
CdcTableType, DebeziumOffset, DebeziumSourceOffset, ExternalTableConfig, SchemaTableName,
};
use risingwave_connector::source::cdc::DebeziumCdcSplit;
use risingwave_connector::source::SplitImpl;
Expand Down Expand Up @@ -160,19 +158,6 @@ async fn test_cdc_backfill() -> StreamResult<()> {
MockOffsetGenExecutor::new(source).boxed(),
);

let binlog_file = String::from("1.binlog");
// mock binlog watermarks for backfill
// initial low watermark: 1.binlog, pos=2 and expected behaviors:
// - ignore events before (1.binlog, pos=2);
// - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot
let binlog_watermarks = vec![
MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark
MySqlOffset::new(binlog_file.clone(), 4),
MySqlOffset::new(binlog_file.clone(), 6),
MySqlOffset::new(binlog_file.clone(), 8),
MySqlOffset::new(binlog_file.clone(), 10),
];

let table_name = SchemaTableName {
schema_name: "public".to_string(),
table_name: "mock_table".to_string(),
Expand All @@ -183,11 +168,14 @@ async fn test_cdc_backfill() -> StreamResult<()> {
]);
let table_pk_indices = vec![0];
let table_pk_order_types = vec![OrderType::ascending()];
let config = ExternalTableConfig::default();

let external_table = ExternalStorageTable::new(
TableId::new(1234),
table_name,
"mydb".to_string(),
ExternalTableReaderImpl::Mock(MockExternalTableReader::new(binlog_watermarks)),
config,
CdcTableType::Mock,
table_schema.clone(),
table_pk_order_types,
table_pk_indices.clone(),
Expand Down
14 changes: 13 additions & 1 deletion src/connector/src/source/cdc/external/mock_external_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,19 @@ pub struct MockExternalTableReader {
}

impl MockExternalTableReader {
pub fn new(binlog_watermarks: Vec<MySqlOffset>) -> Self {
pub fn new() -> Self {
let binlog_file = String::from("1.binlog");
// mock binlog watermarks for backfill
// initial low watermark: 1.binlog, pos=2 and expected behaviors:
// - ignore events before (1.binlog, pos=2);
// - apply events in the range of (1.binlog, pos=2, 1.binlog, pos=4) to the snapshot
let binlog_watermarks = vec![
MySqlOffset::new(binlog_file.clone(), 2), // binlog low watermark
MySqlOffset::new(binlog_file.clone(), 4),
MySqlOffset::new(binlog_file.clone(), 6),
MySqlOffset::new(binlog_file.clone(), 8),
MySqlOffset::new(binlog_file.clone(), 10),
];
Self {
binlog_watermarks,
snapshot_cnt: AtomicUsize::new(0),
Expand Down
6 changes: 4 additions & 2 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,10 @@ use crate::source::cdc::external::sql_server::{
use crate::source::cdc::CdcSourceType;
use crate::WithPropertiesExt;

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum CdcTableType {
Undefined,
Mock,
MySql,
Postgres,
SqlServer,
Expand Down Expand Up @@ -97,6 +98,7 @@ impl CdcTableType {
Self::SqlServer => Ok(ExternalTableReaderImpl::SqlServer(
SqlServerExternalTableReader::new(config, schema, pk_indices).await?,
)),
Self::Mock => Ok(ExternalTableReaderImpl::Mock(MockExternalTableReader::new())),
_ => bail!("invalid external table type: {:?}", *self),
}
}
Expand Down Expand Up @@ -214,7 +216,7 @@ pub enum ExternalTableReaderImpl {
Mock(MockExternalTableReader),
}

#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Default, Clone, Deserialize)]
pub struct ExternalTableConfig {
pub connector: String,

Expand Down
103 changes: 91 additions & 12 deletions src/stream/src/executor/backfill/cdc/cdc_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::pin::Pin;

use either::Either;
Expand All @@ -27,9 +28,11 @@ use risingwave_connector::parser::{
ByteStreamSourceParser, DebeziumParser, DebeziumProps, EncodingProperties, JsonProperties,
ProtocolProperties, SourceStreamChunkBuilder, SpecificParserConfig,
};
use risingwave_connector::source::cdc::external::CdcOffset;
use risingwave_connector::source::cdc::external::{CdcOffset, ExternalTableReaderImpl};
use risingwave_connector::source::{SourceColumnDesc, SourceContext};
use rw_futures_util::pausable;
use thiserror_ext::AsReport;
use tracing::Instrument;

use crate::executor::backfill::cdc::state::CdcBackfillState;
use crate::executor::backfill::cdc::upstream_table::external::ExternalStorageTable;
Expand All @@ -42,6 +45,7 @@ use crate::executor::backfill::utils::{
use crate::executor::backfill::CdcScanOptions;
use crate::executor::monitor::CdcBackfillMetrics;
use crate::executor::prelude::*;
use crate::executor::source::get_infinite_backoff_strategy;
use crate::executor::UpdateMutation;
use crate::task::CreateMviewProgressReporter;

Expand Down Expand Up @@ -140,7 +144,6 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
let upstream_table_name = self.external_table.qualified_table_name();
let schema_table_name = self.external_table.schema_table_name().clone();
let external_database_name = self.external_table.database_name().to_owned();
let upstream_table_reader = UpstreamTableReader::new(self.external_table);

let additional_columns = self
.output_columns
Expand Down Expand Up @@ -168,29 +171,85 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
// if not, we should bypass the backfill directly.
let mut state_impl = self.state_impl;

let mut upstream = transform_upstream(upstream, &self.output_columns)
.boxed()
.peekable();

state_impl.init_epoch(first_barrier_epoch).await?;

// restore backfill state
let state = state_impl.restore_state().await?;
current_pk_pos = state.current_pk_pos.clone();

let to_backfill = !self.options.disable_backfill && !state.is_finished;
let need_backfill = !self.options.disable_backfill && !state.is_finished;

// Keep track of rows from the snapshot.
let mut total_snapshot_row_count = state.row_count as u64;

// After init the state table and forward the initial barrier to downstream,
// we now try to create the table reader with retry.
// If backfill hasn't finished, we can ignore upstream cdc events before we create the table reader;
// If backfill is finished, we should forward the upstream cdc events to downstream.
let mut table_reader: Option<ExternalTableReaderImpl> = None;
let external_table = self.external_table.clone();
let mut future = Box::pin(async move {
let backoff = get_infinite_backoff_strategy();
tokio_retry::Retry::spawn(backoff, || async {
match external_table.create_table_reader().await {
Ok(reader) => Ok(reader),
Err(e) => {
tracing::warn!(error = %e.as_report(), "failed to create cdc table reader, retrying...");
Err(e)
}
}
})
.instrument(tracing::info_span!("create_cdc_table_reader_with_retry"))
.await
.expect("Retry create cdc table reader until success.")
});
loop {
if let Some(msg) =
build_reader_and_poll_upstream(&mut upstream, &mut table_reader, &mut future)
.await?
{
match msg {
Message::Barrier(barrier) => {
// commit state to bump the epoch of state table
state_impl.commit_state(barrier.epoch).await?;
yield Message::Barrier(barrier);
}
Message::Chunk(chunk) => {
if need_backfill {
// ignore chunk if we need backfill, since we can read the data from the snapshot
} else {
// forward the chunk to downstream
yield Message::Chunk(chunk);
}
}
Message::Watermark(_) => {
// ignore watermark
}
}
} else {
assert!(table_reader.is_some(), "table reader must created");
tracing::info!(
table_id,
upstream_table_name,
"table reader created successfully"
);
break;
}
}

let upstream_table_reader = UpstreamTableReader::new(
self.external_table.clone(),
table_reader.expect("table reader must created"),
);

let mut upstream = transform_upstream(upstream, &self.output_columns)
.boxed()
.peekable();
let mut last_binlog_offset: Option<CdcOffset> = state
.last_cdc_offset
.map_or(upstream_table_reader.current_cdc_offset().await?, Some);

let offset_parse_func = upstream_table_reader
.inner()
.table_reader()
.get_cdc_offset_parser();
let offset_parse_func = upstream_table_reader.reader.get_cdc_offset_parser();
let mut consumed_binlog_offset: Option<CdcOffset> = None;

tracing::info!(
Expand Down Expand Up @@ -227,7 +286,7 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
// finished.
//
// Once the backfill loop ends, we forward the upstream directly to the downstream.
if to_backfill {
if need_backfill {
// drive the upstream changelog first to ensure we can receive timely changelog event,
// otherwise the upstream changelog may be blocked by the snapshot read stream
let _ = Pin::new(&mut upstream).peek().await;
Expand Down Expand Up @@ -702,6 +761,26 @@ impl<S: StateStore> CdcBackfillExecutor<S> {
}
}

/// Drives the pending table-reader creation future while concurrently polling
/// upstream messages, so upstream CDC events are not blocked during the
/// (retried) reader creation.
///
/// Returns `Ok(None)` when the reader is already created or just finished
/// creating — the caller should then re-check `table_reader` and exit its loop.
/// Otherwise returns the next upstream message for the caller to handle.
async fn build_reader_and_poll_upstream(
    upstream: &mut BoxedMessageStream,
    table_reader: &mut Option<ExternalTableReaderImpl>,
    future: &mut Pin<Box<impl Future<Output = ExternalTableReaderImpl>>>,
) -> StreamExecutorResult<Option<Message>> {
    // Fast path: reader already created; nothing left to do here.
    if table_reader.is_some() {
        return Ok(None);
    }
    tokio::select! {
        // `biased` makes select poll the reader-creation future first on every
        // call, so finishing reader creation takes priority over upstream input.
        biased;
        reader = &mut *future => {
            // Creation finished: stash the reader and signal the caller via None.
            *table_reader = Some(reader);
            Ok(None)
        }
        msg = upstream.next() => {
            // Forward the next upstream message; a `None` (stream end) becomes Ok(None).
            msg.transpose()
        }
    }
}

#[try_stream(ok = Message, error = StreamExecutorError)]
pub async fn transform_upstream(upstream: BoxedMessageStream, output_columns: &[ColumnDesc]) {
let props = SpecificParserConfig {
Expand Down
36 changes: 30 additions & 6 deletions src/stream/src/executor/backfill/cdc/upstream_table/external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@

use risingwave_common::catalog::{Schema, TableId};
use risingwave_common::util::sort_util::OrderType;
use risingwave_connector::source::cdc::external::{ExternalTableReaderImpl, SchemaTableName};
use risingwave_connector::error::ConnectorResult;
use risingwave_connector::source::cdc::external::{
CdcOffset, CdcTableType, ExternalTableConfig, ExternalTableReader, ExternalTableReaderImpl,
SchemaTableName,
};

/// This struct represents an external table to be read during backfill
#[derive(Debug, Clone)]
pub struct ExternalStorageTable {
/// Id for this table.
table_id: TableId,
Expand All @@ -28,7 +33,9 @@ pub struct ExternalStorageTable {

database_name: String,

table_reader: ExternalTableReaderImpl,
config: ExternalTableConfig,

table_type: CdcTableType,

/// The schema of the output columns, i.e., this table VIEWED BY some executor like
/// `RowSeqScanExecutor`.
Expand All @@ -43,14 +50,16 @@ pub struct ExternalStorageTable {
}

impl ExternalStorageTable {
#[allow(clippy::too_many_arguments)]
pub fn new(
table_id: TableId,
SchemaTableName {
table_name,
schema_name,
}: SchemaTableName,
database_name: String,
table_reader: ExternalTableReaderImpl,
config: ExternalTableConfig,
table_type: CdcTableType,
schema: Schema,
pk_order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
Expand All @@ -60,7 +69,8 @@ impl ExternalStorageTable {
table_name,
schema_name,
database_name,
table_reader,
config,
table_type,
schema,
pk_order_types,
pk_indices,
Expand Down Expand Up @@ -90,8 +100,14 @@ impl ExternalStorageTable {
}
}

pub fn table_reader(&self) -> &ExternalTableReaderImpl {
&self.table_reader
/// Creates an external table reader for this table, dispatching on the stored
/// `CdcTableType` (MySQL / Postgres / SQL Server / Mock) with the stored
/// connector config, schema, and primary-key indices.
///
/// The clones are needed because the underlying
/// `CdcTableType::create_table_reader` takes its arguments by value.
/// NOTE(review): may fail (e.g. on connection errors) — callers are expected
/// to retry; see the backoff loop in `cdc_backfill.rs`.
pub async fn create_table_reader(&self) -> ConnectorResult<ExternalTableReaderImpl> {
    self.table_type
        .create_table_reader(
            self.config.clone(),
            self.schema.clone(),
            self.pk_indices.clone(),
        )
        .await
}

pub fn qualified_table_name(&self) -> String {
Expand All @@ -101,4 +117,12 @@ impl ExternalStorageTable {
/// Returns the name of the upstream database this external table belongs to.
pub fn database_name(&self) -> &str {
    self.database_name.as_str()
}

/// Fetches the current CDC offset (e.g. a binlog position) from the given
/// reader, wrapping it in `Some`. Errors from the reader are propagated.
///
/// NOTE(review): the reader is passed in rather than read from `self`, since
/// readers are now created lazily (with retry) outside this struct — confirm
/// against callers in `upstream_table/snapshot.rs` etc.
pub async fn current_cdc_offset(
    &self,
    table_reader: &ExternalTableReaderImpl,
) -> ConnectorResult<Option<CdcOffset>> {
    let binlog = table_reader.current_cdc_offset().await?;
    Ok(Some(binlog))
}
}
Loading

0 comments on commit 404998e

Please sign in to comment.