Skip to content

Commit

Permalink
fix the logs rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
tommady committed Sep 10, 2024
1 parent 82627f6 commit 6b1ba2c
Showing 1 changed file with 183 additions and 130 deletions.
313 changes: 183 additions & 130 deletions web3-service/src/rpc/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1662,156 +1662,209 @@ impl EthApi for EthService {

fn logs(&self, filter: Filter) -> BoxFuture<Result<Vec<Log>>> {
log::info!(target: "eth api", "logs filter:{:?}", &filter);
let mut ret: Vec<Log> = Vec::new();
if let Some(block_hash) = filter.block_hash {
let block = match self.getter.get_block_by_hash(block_hash) {
Ok(value) => {
if let Some(b) = value {
b
} else {
return Box::pin(future::err(internal_err(String::new())));
}
}
Err(e) => {
return Box::pin(future::err(internal_err(format!(
"eth api logs get_block_by_hash error:{:?}",
e.to_string()
))));
}
};
let getter = self.getter.clone();

match self.getter.get_transaction_status_by_block_hash(block_hash) {
Ok(value) => {
if let Some(statuses) = value {
filter_block_logs(&mut ret, &filter, block, statuses);
}
}
Err(e) => {
return Box::pin(future::err(internal_err(format!(
"eth api logs get_transaction_status_by_block_hash error:{:?}",
e.to_string()
))));
}
};
} else {
let current_number = match self.getter.latest_height() {
Ok(height) => height,
Err(e) => {
return Box::pin(future::err(internal_err(format!(
"eth api logs latest_height error:{:?}",
Box::pin(async move {
let mut ret: Vec<Log> = Vec::new();

if let Some(block_hash) = filter.block_hash {
let getter_clone = getter.clone();
let block_result = tokio::task::spawn_blocking(move || {
getter_clone.get_block_by_hash(block_hash).map_err(|e| {
internal_err(format!(
"eth api logs get_block_by_hash error:{:?}",
e.to_string()
))
})
})
.await
.map_err(|e| {
internal_err(format!(
"eth api logs spawn_blocking get_block_by_hash error:{:?}",
e.to_string()
))));
}
};
))
})?;

let from_number = filter
.from_block
.clone()
.and_then(|v| v.to_min_block_num())
.map(|from| {
if from as u32 > current_number {
current_number
} else {
from as u32
}
let block = match block_result {
Ok(Some(b)) => b,
Ok(None) => return Err(internal_err("Block not found".to_string())),
Err(e) => return Err(e),
};

let statuses_result = tokio::task::spawn_blocking(move || {
getter
.get_transaction_status_by_block_hash(block_hash)
.map_err(|e| {
internal_err(format!(
"eth api logs get_transaction_status_by_block_hash error:{:?}",
e.to_string()
))
})
})
.unwrap_or(current_number);

let to_number = filter
.to_block
.clone()
.and_then(|v| v.to_min_block_num())
.map(|to| {
if to as u32 > current_number {
current_number
} else {
to as u32
.await
.map_err(|e| {
internal_err(format!(
"eth api logs spawn_blocking get_transaction_status_by_block_hash error:{:?}",
e.to_string()
))
})?;

let statuses = match statuses_result {
Ok(Some(s)) => s,
Ok(None) => {
return Err(internal_err("Transaction statuses not found".to_string()))
}
})
.unwrap_or(current_number);
Err(e) => return Err(e),
};

let topics_input = if filter.topics.is_some() {
let filtered_params = FilteredParams::new(Some(filter.clone()));
Some(filtered_params.flat_topics)
filter_block_logs(&mut ret, &filter, block, statuses);
} else {
None
};
let address_bloom_filter = FilteredParams::addresses_bloom_filter(&filter.address);
let topics_bloom_filter = FilteredParams::topics_bloom_filter(&topics_input);

let mut current = to_number;
while current >= from_number {
let block_hash = match self.getter.get_block_hash_by_height(U256::from(current)) {
Ok(value) => {
if let Some(hash) = value {
hash
} else {
return Box::pin(future::err(internal_err(
"eth api logs get_block_hash_by_height return none",
)));
}
}
Err(e) => {
return Box::pin(future::err(internal_err(format!(
"eth api logs get_block_hash_by_height error:{:?}",
let getter_clone = getter.clone();
let current_number = tokio::task::spawn_blocking(move || {
getter_clone.latest_height().map_err(|e| {
internal_err(format!(
"eth api logs latest_height error:{:?}",
e.to_string()
))));
}
))
})
})
.await
.map_err(|e| {
internal_err(format!(
"eth api logs spawn_blocking latest_height error:{:?}",
e.to_string()
))
})?
.map_err(|e| {
internal_err(format!(
"eth api logs spawn_blocking latest_height error:{:?}",
e.to_string()
))
})?;

let from_number = match filter.from_block.clone() {
Some(v) => match v.to_min_block_num() {
Some(from) => (from as u32).max(current_number),
None => current_number,
},
None => current_number,
};

let block = match self.getter.get_block_by_hash(block_hash) {
Ok(value) => {
if let Some(b) = value {
b
} else {
return Box::pin(future::err(internal_err(
"eth api logs get_block_by_hash return none",
)));
}
}
Err(e) => {
return Box::pin(future::err(internal_err(format!(
"eth api logs get_block_by_hash error:{:?}",
e.to_string()
))));
}
let to_number = match filter.to_block.clone() {
Some(v) => match v.to_min_block_num() {
Some(to) => (to as u32).min(current_number),
None => current_number,
},
None => current_number,
};

let topics_input = if filter.topics.is_some() {
let filtered_params = FilteredParams::new(Some(filter.clone()));
Some(filtered_params.flat_topics)
} else {
None
};
let address_bloom_filter = FilteredParams::addresses_bloom_filter(&filter.address);
let topics_bloom_filter = FilteredParams::topics_bloom_filter(&topics_input);

let mut current = to_number;
while current >= from_number {
let getter_clone = getter.clone();
let block_hash_result = tokio::task::spawn_blocking(move || {
getter_clone
.get_block_hash_by_height(U256::from(current))
.map_err(|e| {
internal_err(format!(
"eth api logs get_block_hash_by_height error:{:?}",
e.to_string()
))
})
})
.await
.map_err(|e| {
internal_err(format!(
"eth api logs spawn_blocking get_block_hash_by_height error:{:?}",
e.to_string()
))
})?;

let block_hash = match block_hash_result {
Ok(Some(h)) => h,
Ok(None) => return Err(internal_err("Block hash not found".to_string())),
Err(e) => return Err(e),
};

let getter_clone = getter.clone();
let block_result = tokio::task::spawn_blocking(move || {
getter_clone.get_block_by_hash(block_hash).map_err(|e| {
internal_err(format!(
"eth api logs get_block_by_hash error:{:?}",
e.to_string()
))
})
})
.await
.map_err(|e| {
internal_err(format!(
"eth api logs spawn_blocking get_block_by_hash error:{:?}",
e.to_string()
))
})?;

let block = match block_result {
Ok(Some(b)) => b,
Ok(None) => return Err(internal_err("Block not found".to_string())),
Err(e) => return Err(e),
};

if FilteredParams::address_in_bloom(block.header.logs_bloom, &address_bloom_filter)
&& FilteredParams::topics_in_bloom(
if FilteredParams::address_in_bloom(
block.header.logs_bloom,
&address_bloom_filter,
) && FilteredParams::topics_in_bloom(
block.header.logs_bloom,
&topics_bloom_filter,
)
{
match self.getter.get_transaction_status_by_block_hash(block_hash) {
Ok(value) => {
if let Some(statuses) = value {
let mut logs: Vec<Log> = Vec::new();
filter_block_logs(&mut logs, &filter, block, statuses);
ret.append(&mut logs);
}
}
Err(e) => {
return Box::pin(future::err(internal_err(format!(
) {
let getter_clone = getter.clone();
let statuses_result = tokio::task::spawn_blocking(move || {
getter_clone .get_transaction_status_by_block_hash(block_hash)
.map_err(|e| internal_err(format!(
"eth api logs get_transaction_status_by_block_hash error:{:?}",
e.to_string()
))));
}
};
}
)))
})
.await
.map_err(|e| internal_err(format!(
"eth api logs spawn_blocking get_transaction_status_by_block_hash error:{:?}",
e.to_string()
)))?;

let statuses = match statuses_result {
Ok(Some(s)) => s,
Ok(None) => {
return Err(internal_err(
"Transaction statuses not found".to_string(),
))
}
Err(e) => return Err(e),
};

// Check for restrictions
if ret.len() as u32 > MAX_PAST_LOGS {
break;
}
if 0 == current {
break;
} else {
let mut logs: Vec<Log> = Vec::new();
filter_block_logs(&mut logs, &filter, block, statuses);
ret.append(&mut logs);
}

if ret.len() as u32 > MAX_PAST_LOGS {
break;
}
if current == 0 {
break;
}
current -= 1;
}
}
}
Box::pin(future::ok(ret))

Ok(ret)
})
}

fn work(&self) -> Result<Work> {
Expand Down

0 comments on commit 6b1ba2c

Please sign in to comment.