diff --git a/rocketmq-client/src/consumer/store.rs b/rocketmq-client/src/consumer/store.rs index 665178e3..defb6c8a 100644 --- a/rocketmq-client/src/consumer/store.rs +++ b/rocketmq-client/src/consumer/store.rs @@ -17,6 +17,7 @@ mod controllable_offset; pub(crate) mod local_file_offset_store; +mod offset_serialize_wrapper; pub(crate) mod offset_store; pub(crate) mod read_offset_type; pub(crate) mod remote_broker_offset_store; diff --git a/rocketmq-client/src/consumer/store/controllable_offset.rs b/rocketmq-client/src/consumer/store/controllable_offset.rs index 85a8a499..ef8a1baf 100644 --- a/rocketmq-client/src/consumer/store/controllable_offset.rs +++ b/rocketmq-client/src/consumer/store/controllable_offset.rs @@ -32,6 +32,13 @@ impl ControllableOffset { } } + pub fn new_atomic(value: AtomicI64) -> Self { + Self { + value: Arc::new(value), + allow_to_update: Arc::new(AtomicBool::new(true)), + } + } + pub fn update(&self, target: i64, increase_only: bool) { if self.allow_to_update.load(Ordering::SeqCst) { self.value diff --git a/rocketmq-client/src/consumer/store/local_file_offset_store.rs b/rocketmq-client/src/consumer/store/local_file_offset_store.rs index 8c3c2d4d..f562bd24 100644 --- a/rocketmq-client/src/consumer/store/local_file_offset_store.rs +++ b/rocketmq-client/src/consumer/store/local_file_offset_store.rs @@ -16,53 +16,235 @@ */ use std::collections::HashMap; use std::collections::HashSet; +use std::path::PathBuf; +use std::sync::atomic::AtomicI64; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use once_cell::sync::Lazy; use rocketmq_common::common::message::message_queue::MessageQueue; +use rocketmq_common::utils::file_utils; use rocketmq_common::ArcRefCellWrapper; +use rocketmq_remoting::protocol::RemotingDeserializable; +use rocketmq_remoting::protocol::RemotingSerializable; +use tokio::sync::Mutex; +use tracing::error; +use tracing::info; +use crate::consumer::store::controllable_offset::ControllableOffset; +use crate::consumer::store::offset_serialize_wrapper::OffsetSerializeWrapper; use crate::consumer::store::offset_store::OffsetStoreTrait; use crate::consumer::store::read_offset_type::ReadOffsetType; +use crate::error::MQClientError; use crate::factory::mq_client_instance::MQClientInstance; +use crate::Result; -pub struct LocalFileOffsetStore; +static LOCAL_OFFSET_STORE_DIR: Lazy = Lazy::new(|| { + #[cfg(target_os = "windows")] + let home = std::env::var("USERPROFILE") + .map_or(PathBuf::from("C:\\tmp\\.rocketmq_offsets"), |home| { + PathBuf::from(home).join(".rocketmq_offsets") + }); + + #[cfg(not(target_os = "windows"))] + let home = std::env::var("HOME").map_or(PathBuf::from("/tmp/.rocketmq_offsets"), |home| { + PathBuf::from(home).join(".rocketmq_offsets") + }); + + std::env::var("rocketmq.client.localOffsetStoreDir").map_or(home, PathBuf::from) +}); + +pub struct LocalFileOffsetStore { + client_instance: ArcRefCellWrapper, + group_name: String, + store_path: String, + offset_table: Arc>>, +} impl LocalFileOffsetStore { - pub fn new(mq_client_factory: ArcRefCellWrapper, group_name: String) -> Self { - Self + pub fn new(client_instance: ArcRefCellWrapper, group_name: String) -> Self { + Self { + client_instance, + group_name, + store_path: LOCAL_OFFSET_STORE_DIR + .clone() + .join("offsets.json") + .to_string_lossy() + .to_string(), + offset_table: Arc::new(Mutex::new(HashMap::new())), + } + } + + fn read_local_offset(&self) -> Result> { + let content = + file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content); + if content.is_empty() { + self.read_local_offset_bak() + } else { + match OffsetSerializeWrapper::decode(content.as_bytes()) { + Ok(value) => Ok(Some(value)), + Err(e) => Err(MQClientError::MQClientErr( + -1, + format!("Failed to deserialize local offset: {}", e), + )), + } + } + } + fn read_local_offset_bak(&self) -> Result> { + let content = file_utils::file_to_string(&format!("{}{}", self.store_path, ".bak")) + .map_or("".to_string(), |content| content); + if content.is_empty() { + Ok(None) + } else { + match OffsetSerializeWrapper::decode(content.as_bytes()) { + Ok(value) => Ok(Some(value)), + Err(_) => Err(MQClientError::MQClientErr( + -1, + format!("read local offset bak failed, content: {}", content), + )), + } + } } } impl OffsetStoreTrait for LocalFileOffsetStore { async fn load(&self) -> crate::Result<()> { - todo!() + let offset_serialize_wrapper = self.read_local_offset()?; + if let Some(offset_serialize_wrapper) = offset_serialize_wrapper { + let offset_table = offset_serialize_wrapper.offset_table; + let mut offset_table_inner = self.offset_table.lock().await; + for (mq, offset) in offset_table { + let offset = offset.load(Ordering::Relaxed); + info!( + "load consumer's offset, {} {} {}", + self.group_name, mq, offset + ); + offset_table_inner.insert(mq, ControllableOffset::new(offset)); + } + } + Ok(()) } async fn update_offset(&self, mq: &MessageQueue, offset: i64, increase_only: bool) { - todo!() + let mut offset_table = self.offset_table.lock().await; + let offset_old = offset_table + .entry(mq.clone()) + .or_insert_with(|| ControllableOffset::new(offset)); + if increase_only { + offset_old.update(offset, true); + } else { + offset_old.update_unconditionally(offset); + } } async fn update_and_freeze_offset(&self, mq: &MessageQueue, offset: i64) { - todo!() + let mut offset_table = self.offset_table.lock().await; + offset_table + .entry(mq.clone()) + .or_insert_with(|| ControllableOffset::new(offset)) + .update_and_freeze(offset); } async fn read_offset(&self, mq: &MessageQueue, type_: ReadOffsetType) -> i64 { - todo!() + match type_ { + ReadOffsetType::ReadFromMemory | ReadOffsetType::MemoryFirstThenStore => { + let offset_table = self.offset_table.lock().await; + if let Some(offset) = offset_table.get(mq) { + offset.get_offset() + } else { + -1 + } + } + ReadOffsetType::ReadFromStore => match self.read_local_offset() { + Ok(offset_serialize_wrapper) => { + if let Some(offset_serialize_wrapper) = offset_serialize_wrapper { + if let Some(offset) = offset_serialize_wrapper.offset_table.get(mq) { + offset.load(Ordering::Relaxed) + } else { + -1 + } + } else { + -1 + } + } + Err(_) => -1, + }, + } } async fn persist_all(&mut self, mqs: &HashSet) { - todo!() + if mqs.is_empty() { + return; + } + let mut offset_serialize_wrapper = match self.read_local_offset() { + Ok(value) => value.unwrap_or_default(), + Err(e) => { + error!("read local offset failed: {}", e); + return; + } + }; + let offset_table = self.offset_table.lock().await; + for (mq, offset) in offset_table.iter() { + if mqs.contains(mq) { + offset_serialize_wrapper + .offset_table + .insert(mq.clone(), AtomicI64::new(offset.get_offset())); + } + } + let content = offset_serialize_wrapper.to_json_pretty(); + if !content.is_empty() { + if let Err(e) = file_utils::string_to_file(&content, &self.store_path) { + error!( + "persistAll consumer offset Exception, {},{}", + self.store_path, e + ); + } + } } async fn persist(&mut self, mq: &MessageQueue) { - todo!() + let offset_table = self.offset_table.lock().await; + if let Some(offset) = offset_table.get(mq) { + let mut offset_serialize_wrapper = match self.read_local_offset() { + Ok(value) => value.unwrap_or_default(), + Err(e) => { + error!("read local offset failed: {}", e); + return; + } + }; + offset_serialize_wrapper + .offset_table + .insert(mq.clone(), AtomicI64::new(offset.get_offset())); + let content = offset_serialize_wrapper.to_json_pretty(); + if !content.is_empty() { + if let Err(e) = file_utils::string_to_file(&content, &self.store_path) { + error!( + "persist consumer offset Exception, {},{}", + self.store_path, e + ); + } + } + } } async fn remove_offset(&self, mq: &MessageQueue) { - todo!() + let mut offset_table = self.offset_table.lock().await; + offset_table.remove(mq); + info!( + "remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}", + mq, + self.group_name, + offset_table.len() + ); } async fn clone_offset_table(&self, topic: &str) -> HashMap { - todo!() + let offset_table = self.offset_table.lock().await; + offset_table + .iter() + .filter(|(mq, _)| topic.is_empty() || mq.get_topic() == topic) + .map(|(mq, offset)| (mq.clone(), offset.get_offset())) + .collect() } async fn update_consume_offset_to_broker( @@ -71,6 +253,6 @@ impl OffsetStoreTrait for LocalFileOffsetStore { offset: i64, is_oneway: bool, ) -> crate::Result<()> { - todo!() + Ok(()) } } diff --git a/rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs b/rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs new file mode 100644 index 00000000..f609689a --- /dev/null +++ b/rocketmq-client/src/consumer/store/offset_serialize_wrapper.rs @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +use std::collections::HashMap; +use std::sync::atomic::AtomicI64; + +use rocketmq_common::common::message::message_queue::MessageQueue; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Serialize, Deserialize, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct OffsetSerializeWrapper { + pub offset_table: HashMap, +}