Skip to content

Commit

Permalink
consolidate TTL conversion logic (twitter#457)
Browse files Browse the repository at this point in the history
Adds a new type `Ttl` to be used throughout the memcache protocol
crate. This allows us to define and document the conversion
between external and internal representations of the TTL.
  • Loading branch information
brayniac authored Sep 9, 2022
1 parent 8d71b4d commit 627cb92
Show file tree
Hide file tree
Showing 13 changed files with 182 additions and 137 deletions.
38 changes: 25 additions & 13 deletions src/entrystore/src/seg/memcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ impl Storage for Seg {
}

fn set(&mut self, set: &Set) -> Response {
if set.ttl() == Some(0) {
let ttl = set.ttl().get().unwrap_or(0);

if ttl < 0 {
// immediate expire maps to a delete
self.data.delete(set.key());
Response::stored(set.noreply())
Expand All @@ -98,7 +100,7 @@ impl Storage for Seg {
set.key(),
v,
Some(&set.flags().to_be_bytes()),
Duration::from_secs(set.ttl().unwrap_or(0).into()),
Duration::from_secs(ttl as u64),
)
.is_ok()
{
Expand All @@ -112,7 +114,7 @@ impl Storage for Seg {
set.key(),
set.value(),
Some(&set.flags().to_be_bytes()),
Duration::from_secs(set.ttl().unwrap_or(0).into()),
Duration::from_secs(ttl as u64),
)
.is_ok()
{
Expand All @@ -126,7 +128,7 @@ impl Storage for Seg {
set.key(),
set.value(),
Some(&set.flags().to_be_bytes()),
Duration::from_secs(set.ttl().unwrap_or(0).into()),
Duration::from_secs(ttl as u64),
)
.is_ok()
{
Expand All @@ -141,7 +143,9 @@ impl Storage for Seg {
return Response::not_stored(add.noreply());
}

if add.ttl() == Some(0) {
let ttl = add.ttl().get().unwrap_or(0);

if ttl < 0 {
// immediate expire maps to a delete
self.data.delete(add.key());
Response::stored(add.noreply())
Expand All @@ -153,7 +157,7 @@ impl Storage for Seg {
add.key(),
v,
Some(&add.flags().to_be_bytes()),
Duration::from_secs(add.ttl().unwrap_or(0).into()),
Duration::from_secs(ttl as u64),
)
.is_ok()
{
Expand All @@ -167,7 +171,7 @@ impl Storage for Seg {
add.key(),
add.value(),
Some(&add.flags().to_be_bytes()),
Duration::from_secs(add.ttl().unwrap_or(0).into()),
Duration::from_secs(ttl as u64),
)
.is_ok()
{
Expand All @@ -181,7 +185,7 @@ impl Storage for Seg {
add.key(),
add.value(),
Some(&add.flags().to_be_bytes()),
Duration::from_secs(add.ttl().unwrap_or(0).into()),
Duration::from_secs(ttl as u64),
)
.is_ok()
{
Expand All @@ -196,7 +200,9 @@ impl Storage for Seg {
return Response::not_stored(replace.noreply());
}

if replace.ttl() == Some(0) {
let ttl = replace.ttl().get().unwrap_or(0);

if ttl < 0 {
// immediate expire maps to a delete
self.data.delete(replace.key());
Response::stored(replace.noreply())
Expand All @@ -208,7 +214,7 @@ impl Storage for Seg {
replace.key(),
v,
Some(&replace.flags().to_be_bytes()),
Duration::from_secs(replace.ttl().unwrap_or(0).into()),
Duration::from_secs(ttl as u64),
)
.is_ok()
{
Expand All @@ -222,7 +228,7 @@ impl Storage for Seg {
replace.key(),
replace.value(),
Some(&replace.flags().to_be_bytes()),
Duration::from_secs(replace.ttl().unwrap_or(0).into()),
Duration::from_secs(ttl as u64),
)
.is_ok()
{
Expand All @@ -236,7 +242,7 @@ impl Storage for Seg {
replace.key(),
replace.value(),
Some(&replace.flags().to_be_bytes()),
Duration::from_secs(replace.ttl().unwrap_or(0).into()),
Duration::from_secs(ttl as u64),
)
.is_ok()
{
Expand Down Expand Up @@ -283,7 +289,13 @@ impl Storage for Seg {
// no way of checking the cas value without performing a cas
// and checking the result, setting the shortest possible ttl
// results in nearly immediate expiry
let ttl = Duration::from_secs(cas.ttl().unwrap_or(1).into());
let ttl = cas.ttl().get().unwrap_or(1);

let ttl = if ttl < 0 {
Duration::from_secs(0)
} else {
Duration::from_secs(ttl as u64)
};

if let Ok(s) = std::str::from_utf8(cas.value()) {
if let Ok(v) = s.parse::<u64>() {
Expand Down
3 changes: 1 addition & 2 deletions src/protocol/memcache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ pub use storage::*;

pub use protocol_common::*;

use common::expiry::TimeType;
use logger::Klog;
use rustcommon_metrics::*;

// use common::expiry::TimeType;

const CRLF: &[u8] = b"\r\n";

pub enum MemcacheError {
Expand Down
17 changes: 6 additions & 11 deletions src/protocol/memcache/src/request/add.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct Add {
pub(crate) key: Box<[u8]>,
pub(crate) value: Box<[u8]>,
pub(crate) flags: u32,
pub(crate) ttl: Option<u32>,
pub(crate) ttl: Ttl,
pub(crate) noreply: bool,
}

Expand All @@ -22,7 +22,7 @@ impl Add {
&self.value
}

pub fn ttl(&self) -> Option<u32> {
pub fn ttl(&self) -> Ttl {
self.ttl
}

Expand Down Expand Up @@ -68,7 +68,7 @@ impl Compose for Add {
fn compose(&self, session: &mut dyn BufMut) -> usize {
let verb = b"add ";
let flags = format!(" {}", self.flags).into_bytes();
let ttl = convert_ttl(self.ttl);
let ttl = format!(" {}", self.ttl.get().unwrap_or(0)).into_bytes();
let vlen = format!(" {}", self.value.len()).into_bytes();
let header_end = if self.noreply {
" noreply\r\n".as_bytes()
Expand Down Expand Up @@ -102,11 +102,6 @@ impl Klog for Add {
type Response = Response;

fn klog(&self, response: &Self::Response) {
let ttl: i64 = match self.ttl() {
None => 0,
Some(0) => -1,
Some(t) => t as _,
};
let (code, len) = match response {
Response::Stored(ref res) => {
ADD_STORED.increment();
Expand All @@ -124,7 +119,7 @@ impl Klog for Add {
"\"add {} {} {} {}\" {} {}",
string_key(self.key()),
self.flags(),
ttl,
self.ttl.get().unwrap_or(0),
self.value().len(),
code,
len
Expand All @@ -149,7 +144,7 @@ mod tests {
key: b"0".to_vec().into_boxed_slice(),
value: b"0".to_vec().into_boxed_slice(),
flags: 0,
ttl: None,
ttl: Ttl::none(),
noreply: false,
})
))
Expand All @@ -164,7 +159,7 @@ mod tests {
key: b"0".to_vec().into_boxed_slice(),
value: b"0".to_vec().into_boxed_slice(),
flags: 0,
ttl: None,
ttl: Ttl::none(),
noreply: true,
})
))
Expand Down
17 changes: 6 additions & 11 deletions src/protocol/memcache/src/request/append.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub struct Append {
pub(crate) key: Box<[u8]>,
pub(crate) value: Box<[u8]>,
pub(crate) flags: u32,
pub(crate) ttl: Option<u32>,
pub(crate) ttl: Ttl,
pub(crate) noreply: bool,
}

Expand All @@ -22,7 +22,7 @@ impl Append {
&self.value
}

pub fn ttl(&self) -> Option<u32> {
pub fn ttl(&self) -> Ttl {
self.ttl
}

Expand Down Expand Up @@ -68,7 +68,7 @@ impl Compose for Append {
fn compose(&self, session: &mut dyn BufMut) -> usize {
let verb = b"append ";
let flags = format!(" {}", self.flags).into_bytes();
let ttl = convert_ttl(self.ttl);
let ttl = format!(" {}", self.ttl.get().unwrap_or(0)).into_bytes();
let vlen = format!(" {}", self.value.len());
let header_end = if self.noreply {
" noreply\r\n".as_bytes()
Expand Down Expand Up @@ -102,11 +102,6 @@ impl Klog for Append {
type Response = Response;

fn klog(&self, response: &Self::Response) {
let ttl: i64 = match self.ttl() {
None => 0,
Some(0) => -1,
Some(t) => t as _,
};
let (code, len) = match response {
Response::Stored(ref res) => {
APPEND_STORED.increment();
Expand All @@ -124,7 +119,7 @@ impl Klog for Append {
"\"append {} {} {} {}\" {} {}",
string_key(self.key()),
self.flags(),
ttl,
self.ttl.get().unwrap_or(0),
self.value().len(),
code,
len
Expand All @@ -149,7 +144,7 @@ mod tests {
key: b"0".to_vec().into_boxed_slice(),
value: b"0".to_vec().into_boxed_slice(),
flags: 0,
ttl: None,
ttl: Ttl::none(),
noreply: false,
})
))
Expand All @@ -164,7 +159,7 @@ mod tests {
key: b"0".to_vec().into_boxed_slice(),
value: b"0".to_vec().into_boxed_slice(),
flags: 0,
ttl: None,
ttl: Ttl::none(),
noreply: true,
})
))
Expand Down
38 changes: 7 additions & 31 deletions src/protocol/memcache/src/request/cas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@
// http://www.apache.org/licenses/LICENSE-2.0

use super::*;
use common::time::Seconds;
use common::time::UnixInstant;

#[derive(Debug, PartialEq, Eq)]
pub struct Cas {
pub(crate) key: Box<[u8]>,
pub(crate) value: Box<[u8]>,
pub(crate) flags: u32,
pub(crate) ttl: Option<u32>,
pub(crate) ttl: Ttl,
pub(crate) cas: u64,
pub(crate) noreply: bool,
}
Expand All @@ -25,7 +23,7 @@ impl Cas {
&self.value
}

pub fn ttl(&self) -> Option<u32> {
pub fn ttl(&self) -> Ttl {
self.ttl
}

Expand Down Expand Up @@ -60,7 +58,7 @@ impl RequestParser {
let (input, _) = space1(input)?;
let (input, flags) = parse_u32(input)?;
let (input, _) = space1(input)?;
let (input, exptime) = parse_i64(input)?;
let (input, ttl) = parse_ttl(input, self.time_type)?;
let (input, _) = space1(input)?;
let (input, bytes) = parse_usize(input)?;

Expand All @@ -84,23 +82,6 @@ impl RequestParser {
let (input, value) = take(bytes)(input)?;
let (input, _) = crlf(input)?;

let ttl = if exptime < 0 {
Some(0)
} else if exptime == 0 {
None
} else if self.time_type == TimeType::Unix
|| (self.time_type == TimeType::Memcache && exptime > 60 * 60 * 24 * 30)
{
Some(
UnixInstant::from_secs(exptime as u32)
.checked_duration_since(UnixInstant::<Seconds<u32>>::recent())
.map(|v| v.as_secs())
.unwrap_or(0),
)
} else {
Some(exptime as u32)
};

Ok((
input,
Cas {
Expand Down Expand Up @@ -136,7 +117,7 @@ impl Compose for Cas {
fn compose(&self, session: &mut dyn BufMut) -> usize {
let verb = b"cas ";
let flags = format!(" {}", self.flags).into_bytes();
let ttl = convert_ttl(self.ttl);
let ttl = format!(" {}", self.ttl.get().unwrap_or(0)).into_bytes();
let vlen = format!(" {}", self.value.len()).into_bytes();
let cas = format!(" {}", self.cas).into_bytes();
let header_end = if self.noreply {
Expand Down Expand Up @@ -173,11 +154,6 @@ impl Klog for Cas {
type Response = Response;

fn klog(&self, response: &Self::Response) {
let ttl: i64 = match self.ttl() {
None => 0,
Some(0) => -1,
Some(t) => t as _,
};
let (code, len) = match response {
Response::Stored(ref res) => {
CAS_STORED.increment();
Expand All @@ -199,7 +175,7 @@ impl Klog for Cas {
"\"cas {} {} {} {} {}\" {} {}",
string_key(self.key()),
self.flags(),
ttl,
self.ttl.get().unwrap_or(0),
self.value().len(),
self.cas(),
code,
Expand All @@ -225,7 +201,7 @@ mod tests {
key: b"0".to_vec().into_boxed_slice(),
value: b"0".to_vec().into_boxed_slice(),
flags: 0,
ttl: None,
ttl: Ttl::none(),
cas: 42,
noreply: false,
})
Expand All @@ -241,7 +217,7 @@ mod tests {
key: b"0".to_vec().into_boxed_slice(),
value: b"0".to_vec().into_boxed_slice(),
flags: 0,
ttl: None,
ttl: Ttl::none(),
cas: 42,
noreply: true,
})
Expand Down
Loading

0 comments on commit 627cb92

Please sign in to comment.