Skip to content

Commit

Permalink
Size DB serialize/deserialize, and miss TTL
Browse files Browse the repository at this point in the history
  • Loading branch information
taoky committed Mar 28, 2024
1 parent b4ebd14 commit 81b2bbf
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 18 deletions.
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ description = "YUKI-based Next-generation Async-cache"

[dependencies]
anyhow = { version = "1.0.81", features = ["backtrace"] }
chrono = "0.4.35"
chrono = { version = "0.4.35", features = ["serde"] }
clap = { version = "4.5.3", features = ["derive"] }
console = { version = "0.15", default-features = false, features = ["ansi-parsing"] }
humansize = "2.1.3"
Expand All @@ -24,6 +24,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] }
url = "2.5.0"
walkdir = "2.5.0"
shadow-rs = "0.26.1"
serde = { version = "1.0", features = ["derive"] }
bincode = "1.3.3"
humantime = "2.1.0"

[dev-dependencies]
test-log = { version = "0.2.14", default-features = false, features = ["trace"] }
Expand Down
84 changes: 67 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use clap::Parser;
use ipnetwork::{IpNetwork, Ipv4Network, Ipv6Network};
use parse_size::parse_size;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
io::{BufRead, BufReader},
Expand Down Expand Up @@ -68,6 +69,10 @@ struct Cli {
/// A kv database of file size to speed up stage3 in case yukina would run frequently
#[clap(long)]
size_database: Option<PathBuf>,

/// Size database Miss TTL
#[clap(long, default_value = "2d")]
size_database_ttl: humantime::Duration,
}

enum LogFileType {
Expand Down Expand Up @@ -183,6 +188,12 @@ impl Ord for VoteValue {
}
}

#[derive(Debug, Serialize, Deserialize)]
struct SizeDBItem {
size: Option<u64>,
record_time: DateTime<Utc>,
}

/// Analyse nginx logs and get user votes
fn stage1(args: &Cli) -> UserVote {
let mut entries: Vec<_> = std::fs::read_dir(&args.log_path)
Expand Down Expand Up @@ -360,6 +371,18 @@ fn stage2(args: &Cli) -> FileStats {
FileStats::new(res)
}

fn insert_db(db: Option<&sled::Db>, key: &str, size: Option<u64>) {
if let Some(db) = db {
let size_item = SizeDBItem {
size,
record_time: Utc::now(),
};
if let Err(e) = db.insert(key, bincode::serialize(&size_item).unwrap()) {
tracing::warn!("Size db insert failed: {}", e);
}
}
}

/// Get size of non-existing files, and normalize vote value
async fn stage3(
args: &Cli,
Expand All @@ -373,6 +396,7 @@ async fn stage3(
// Stats counters
let mut local_hit = 0;
let mut sizedb_hit = 0;
let mut sizedb_nonexist = 0;
let mut remote_hit = 0;
let mut remote_miss = 0;

Expand All @@ -386,22 +410,50 @@ async fn stage3(
(*value, true, true)
} else {
// if size_db, require sled first
let mut size: Option<u64> = None;
let mut size_item: Option<SizeDBItem> = None;
let mut exceeded_miss_ttl = false;
if let Some(db) = &size_db {
if let Ok(Some(s)) = db.get(url_path) {
if let [b0, b1, b2, b3, b4, b5, b6, b7] = *s {
size = Some(u64::from_le_bytes([b0, b1, b2, b3, b4, b5, b6, b7]));
if let Ok(s) = bincode::deserialize::<SizeDBItem>(&s) {
if s.size.is_none() {
let ttl: std::time::Duration = args.size_database_ttl.into();
let duration = Utc::now()
.signed_duration_since(s.record_time)
.to_std()
.unwrap_or(std::time::Duration::default());
if duration > ttl {
exceeded_miss_ttl = true;
let _ = db.remove(url_path);
}
}
size_item = Some(s);
}
}
}
if let Some(size) = size {
tracing::debug!(
"File does not exist locally: {} (sizedb {})",
url_path,
size
);
sizedb_hit += 1;
(size, false, true)

// a bit ugly but seems no better solution without nightly rust
let size_db_condition = size_item.is_some() && !exceeded_miss_ttl;
if size_db_condition {
let size_item = size_item.unwrap();
match size_item.size {
Some(size) => {
if size == 0 {
tracing::warn!("Empty file: {}", url_path);
}
tracing::debug!(
"File does not exist locally: {} (sizedb {})",
url_path,
size
);
sizedb_hit += 1;
(size, false, true)
}
None => {
tracing::info!("File not found at remote (from sizedb): {}", url_path);
sizedb_nonexist += 1;
(0, false, false)
}
}
} else {
let url = args
.url
Expand All @@ -428,15 +480,12 @@ async fn stage3(
size
);
remote_hit += 1;
if let Some(db) = &size_db {
if let Err(e) = db.insert(url_path, &size.to_le_bytes()) {
tracing::warn!("Size db insert failed: {}", e);
}
}
insert_db(size_db.as_ref(), url_path, Some(size));
(size, false, true)
}
Err(e) => {
tracing::info!("Invalid file ({}): {}", e, url_path);
insert_db(size_db.as_ref(), url_path, None);
remote_miss += 1;
(0, false, false)
}
Expand All @@ -456,9 +505,10 @@ async fn stage3(
}
progressbar.finish();
tracing::info!(
"Local hit: {}, SizeDB hit: {}, Remote hit: {}, Remote miss: {}",
"Local hit: {}, SizeDB hit: {}, SizeDB 404: {}, Remote hit: {}, Remote miss: {}",
local_hit,
sizedb_hit,
sizedb_nonexist,
remote_hit,
remote_miss
);
Expand Down

0 comments on commit 81b2bbf

Please sign in to comment.