Skip to content

Commit 8c74161

Browse files
committed
[ISSUE #1012]🚀Support client Broadcasting consume-local file store⚡️
1 parent 5cefbc9 commit 8c74161

File tree

4 files changed

+230
-12
lines changed

4 files changed

+230
-12
lines changed

rocketmq-client/src/consumer/store.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
mod controllable_offset;
1919
pub(crate) mod local_file_offset_store;
20+
mod offset_serialize_wrapper;
2021
pub(crate) mod offset_store;
2122
pub(crate) mod read_offset_type;
2223
pub(crate) mod remote_broker_offset_store;

rocketmq-client/src/consumer/store/controllable_offset.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@ impl ControllableOffset {
3232
}
3333
}
3434

35+
pub fn new_atomic(value: AtomicI64) -> Self {
36+
Self {
37+
value: Arc::new(value),
38+
allow_to_update: Arc::new(AtomicBool::new(true)),
39+
}
40+
}
41+
3542
pub fn update(&self, target: i64, increase_only: bool) {
3643
if self.allow_to_update.load(Ordering::SeqCst) {
3744
self.value

rocketmq-client/src/consumer/store/local_file_offset_store.rs

Lines changed: 194 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,53 +16,235 @@
1616
*/
1717
use std::collections::HashMap;
1818
use std::collections::HashSet;
19+
use std::path::PathBuf;
20+
use std::sync::atomic::AtomicI64;
21+
use std::sync::atomic::Ordering;
22+
use std::sync::Arc;
1923

24+
use once_cell::sync::Lazy;
2025
use rocketmq_common::common::message::message_queue::MessageQueue;
26+
use rocketmq_common::utils::file_utils;
2127
use rocketmq_common::ArcRefCellWrapper;
28+
use rocketmq_remoting::protocol::RemotingDeserializable;
29+
use rocketmq_remoting::protocol::RemotingSerializable;
30+
use tokio::sync::Mutex;
31+
use tracing::error;
32+
use tracing::info;
2233

34+
use crate::consumer::store::controllable_offset::ControllableOffset;
35+
use crate::consumer::store::offset_serialize_wrapper::OffsetSerializeWrapper;
2336
use crate::consumer::store::offset_store::OffsetStoreTrait;
2437
use crate::consumer::store::read_offset_type::ReadOffsetType;
38+
use crate::error::MQClientError;
2539
use crate::factory::mq_client_instance::MQClientInstance;
40+
use crate::Result;
2641

27-
pub struct LocalFileOffsetStore;
42+
static LOCAL_OFFSET_STORE_DIR: Lazy<PathBuf> = Lazy::new(|| {
43+
#[cfg(target_os = "windows")]
44+
let home = std::env::var("user.home")
45+
.map_or(PathBuf::from("C:\\tmp\\.rocketmq_offsets"), |home| {
46+
PathBuf::from(home).join(".rocketmq_offsets")
47+
});
48+
49+
#[cfg(not(target_os = "windows"))]
50+
let home = std::env::var("user.home").map_or(PathBuf::from("/tmp/.rocketmq_offsets"), |home| {
51+
PathBuf::from(home).join(".rocketmq_offsets")
52+
});
53+
54+
std::env::var("rocketmq.client.localOffsetStoreDir").map_or(home, PathBuf::from)
55+
});
56+
57+
pub struct LocalFileOffsetStore {
58+
client_instance: ArcRefCellWrapper<MQClientInstance>,
59+
group_name: String,
60+
store_path: String,
61+
offset_table: Arc<Mutex<HashMap<MessageQueue, ControllableOffset>>>,
62+
}
2863

2964
impl LocalFileOffsetStore {
30-
pub fn new(mq_client_factory: ArcRefCellWrapper<MQClientInstance>, group_name: String) -> Self {
31-
Self
65+
pub fn new(client_instance: ArcRefCellWrapper<MQClientInstance>, group_name: String) -> Self {
66+
Self {
67+
client_instance,
68+
group_name,
69+
store_path: LOCAL_OFFSET_STORE_DIR
70+
.clone()
71+
.join("offsets.json")
72+
.to_string_lossy()
73+
.to_string(),
74+
offset_table: Arc::new(Mutex::new(HashMap::new())),
75+
}
76+
}
77+
78+
fn read_local_offset(&self) -> Result<Option<OffsetSerializeWrapper>> {
79+
let content =
80+
file_utils::file_to_string(&self.store_path).map_or("".to_string(), |content| content);
81+
if content.is_empty() {
82+
self.read_local_offset_bak()
83+
} else {
84+
match OffsetSerializeWrapper::decode(content.as_bytes()) {
85+
Ok(value) => Ok(Some(value)),
86+
Err(_) => Err(MQClientError::MQClientErr(
87+
-1,
88+
format!("read local offset failed, content: {}", content),
89+
)),
90+
}
91+
}
92+
}
93+
fn read_local_offset_bak(&self) -> Result<Option<OffsetSerializeWrapper>> {
94+
let content = file_utils::file_to_string(&format!("{}{}", self.store_path, ".bak"))
95+
.map_or("".to_string(), |content| content);
96+
if content.is_empty() {
97+
Ok(None)
98+
} else {
99+
match OffsetSerializeWrapper::decode(content.as_bytes()) {
100+
Ok(value) => Ok(Some(value)),
101+
Err(_) => Err(MQClientError::MQClientErr(
102+
-1,
103+
format!("read local offset bak failed, content: {}", content),
104+
)),
105+
}
106+
}
32107
}
33108
}
34109

35110
impl OffsetStoreTrait for LocalFileOffsetStore {
36111
async fn load(&self) -> crate::Result<()> {
37-
todo!()
112+
let offset_serialize_wrapper = self.read_local_offset()?;
113+
if let Some(offset_serialize_wrapper) = offset_serialize_wrapper {
114+
let offset_table = offset_serialize_wrapper.offset_table;
115+
let mut offset_table_inner = self.offset_table.lock().await;
116+
for (mq, offset) in offset_table {
117+
let offset = offset.load(Ordering::Relaxed);
118+
info!(
119+
"load consumer's offset, {} {} {}",
120+
self.group_name, mq, offset
121+
);
122+
offset_table_inner.insert(mq, ControllableOffset::new(offset));
123+
}
124+
}
125+
Ok(())
38126
}
39127

40128
async fn update_offset(&self, mq: &MessageQueue, offset: i64, increase_only: bool) {
41-
todo!()
129+
let mut offset_table = self.offset_table.lock().await;
130+
let offset_old = offset_table
131+
.entry(mq.clone())
132+
.or_insert_with(|| ControllableOffset::new(offset));
133+
if increase_only {
134+
offset_old.update(offset, true);
135+
} else {
136+
offset_old.update_unconditionally(offset);
137+
}
42138
}
43139

44140
async fn update_and_freeze_offset(&self, mq: &MessageQueue, offset: i64) {
45-
todo!()
141+
let mut offset_table = self.offset_table.lock().await;
142+
offset_table
143+
.entry(mq.clone())
144+
.or_insert_with(|| ControllableOffset::new(offset))
145+
.update_and_freeze(offset);
46146
}
47147

48148
async fn read_offset(&self, mq: &MessageQueue, type_: ReadOffsetType) -> i64 {
49-
todo!()
149+
match type_ {
150+
ReadOffsetType::ReadFromMemory | ReadOffsetType::MemoryFirstThenStore => {
151+
let offset_table = self.offset_table.lock().await;
152+
if let Some(offset) = offset_table.get(mq) {
153+
offset.get_offset()
154+
} else {
155+
-1
156+
}
157+
}
158+
ReadOffsetType::ReadFromStore => match self.read_local_offset() {
159+
Ok(offset_serialize_wrapper) => {
160+
if let Some(offset_serialize_wrapper) = offset_serialize_wrapper {
161+
if let Some(offset) = offset_serialize_wrapper.offset_table.get(mq) {
162+
offset.load(Ordering::Relaxed)
163+
} else {
164+
-1
165+
}
166+
} else {
167+
-1
168+
}
169+
}
170+
Err(_) => -1,
171+
},
172+
}
50173
}
51174

52175
async fn persist_all(&mut self, mqs: &HashSet<MessageQueue>) {
53-
todo!()
176+
if mqs.is_empty() {
177+
return;
178+
}
179+
let mut offset_serialize_wrapper = match self.read_local_offset() {
180+
Ok(value) => value.unwrap_or_default(),
181+
Err(e) => {
182+
error!("read local offset failed: {}", e);
183+
return;
184+
}
185+
};
186+
let offset_table = self.offset_table.lock().await;
187+
for (mq, offset) in offset_table.iter() {
188+
if mqs.contains(mq) {
189+
offset_serialize_wrapper
190+
.offset_table
191+
.insert(mq.clone(), AtomicI64::new(offset.get_offset()));
192+
}
193+
}
194+
let content = offset_serialize_wrapper.to_json_pretty();
195+
if !content.is_empty() {
196+
if let Err(e) = file_utils::string_to_file(&content, &self.store_path) {
197+
error!(
198+
"persistAll consumer offset Exception, {},{}",
199+
self.store_path, e
200+
);
201+
}
202+
}
54203
}
55204

56205
async fn persist(&mut self, mq: &MessageQueue) {
57-
todo!()
206+
let offset_table = self.offset_table.lock().await;
207+
if let Some(offset) = offset_table.get(mq) {
208+
let mut offset_serialize_wrapper = match self.read_local_offset() {
209+
Ok(value) => value.unwrap_or_default(),
210+
Err(e) => {
211+
error!("read local offset failed: {}", e);
212+
return;
213+
}
214+
};
215+
offset_serialize_wrapper
216+
.offset_table
217+
.insert(mq.clone(), AtomicI64::new(offset.get_offset()));
218+
let content = offset_serialize_wrapper.to_json_pretty();
219+
if !content.is_empty() {
220+
if let Err(e) = file_utils::string_to_file(&content, &self.store_path) {
221+
error!(
222+
"persist consumer offset Exception, {},{}",
223+
self.store_path, e
224+
);
225+
}
226+
}
227+
}
58228
}
59229

60230
async fn remove_offset(&self, mq: &MessageQueue) {
61-
todo!()
231+
let mut offset_table = self.offset_table.lock().await;
232+
offset_table.remove(mq);
233+
info!(
234+
"remove unnecessary messageQueue offset. group={}, mq={}, offsetTableSize={}",
235+
mq,
236+
self.group_name,
237+
offset_table.len()
238+
);
62239
}
63240

64241
async fn clone_offset_table(&self, topic: &str) -> HashMap<MessageQueue, i64> {
65-
todo!()
242+
let offset_table = self.offset_table.lock().await;
243+
offset_table
244+
.iter()
245+
.filter(|(mq, _)| topic.is_empty() || mq.get_topic() == topic)
246+
.map(|(mq, offset)| (mq.clone(), offset.get_offset()))
247+
.collect()
66248
}
67249

68250
async fn update_consume_offset_to_broker(
@@ -71,6 +253,6 @@ impl OffsetStoreTrait for LocalFileOffsetStore {
71253
offset: i64,
72254
is_oneway: bool,
73255
) -> crate::Result<()> {
74-
todo!()
256+
Ok(())
75257
}
76258
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
use std::collections::HashMap;
18+
use std::sync::atomic::AtomicI64;
19+
20+
use rocketmq_common::common::message::message_queue::MessageQueue;
21+
use serde::Deserialize;
22+
use serde::Serialize;
23+
24+
#[derive(Serialize, Deserialize, Debug, Default)]
25+
#[serde(rename_all = "camelCase")]
26+
pub struct OffsetSerializeWrapper {
27+
pub offset_table: HashMap<MessageQueue, AtomicI64>,
28+
}

0 commit comments

Comments
 (0)