Skip to content

Commit

Permalink
Duplicate timestamp and input-sorting changes
Browse files Browse the repository at this point in the history
IPAv3 sorts the input records, so add sorting of input records to
ipa_in_the_clear, and remove it from input drivers.

IPAv3 also shuffles the input records, so if there are duplicate
timestamps among a single user's records, the output may be
non-deterministic. To avoid end-to-end test failures, modify
the test data generator to avoid generating duplicate timestamps.
Add a directed test to exercise duplicate timestamps.
  • Loading branch information
andyleiserson committed Apr 16, 2024
1 parent 428d1e9 commit cc22bcc
Show file tree
Hide file tree
Showing 5 changed files with 231 additions and 105 deletions.
5 changes: 1 addition & 4 deletions ipa-core/benches/oneshot/ipa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ async fn run(args: Args) -> Result<(), Error> {
args.query_size,
)
};
let mut raw_data = EventGenerator::with_config(
let raw_data = EventGenerator::with_config(
rng,
EventGeneratorConfig {
user_count,
Expand All @@ -143,9 +143,6 @@ async fn run(args: Args) -> Result<(), Error> {
)
.take(query_size)
.collect::<Vec<_>>();
// EventGenerator produces events in random order, but IPA requires them to be sorted by
// timestamp.
raw_data.sort_by_key(|e| e.timestamp);

let order = CappingOrder::CapMostRecentFirst;

Expand Down
3 changes: 1 addition & 2 deletions ipa-core/src/bin/report_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,9 @@ fn gen_inputs(
let rng = seed
.map(StdRng::seed_from_u64)
.unwrap_or_else(StdRng::from_entropy);
let mut event_gen = EventGenerator::with_config(rng, args)
let event_gen = EventGenerator::with_config(rng, args)
.take(count as usize)
.collect::<Vec<_>>();
event_gen.sort_by_key(|e| e.timestamp);
let mut writer: Box<dyn Write> = if let Some(path) = output_file {
Box::new(OpenOptions::new().write(true).create_new(true).open(path)?)
} else {
Expand Down
99 changes: 64 additions & 35 deletions ipa-core/src/protocol/ipa_prf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,23 @@ pub mod tests {
test_executor::run,
test_fixture::{ipa::TestRawDataRecord, Reconstruct, Runner, TestWorld},
};
use rand::{seq::SliceRandom, thread_rng};

fn test_input(
timestamp: u64,
user_id: u64,
is_trigger_report: bool,
breakdown_key: u32,
trigger_value: u32,
) -> TestRawDataRecord {
TestRawDataRecord {
timestamp,
user_id,
is_trigger_report,
breakdown_key,
trigger_value,
}
}

#[test]
fn semi_honest() {
Expand All @@ -322,41 +339,11 @@ pub mod tests {
let world = TestWorld::default();

let records: Vec<TestRawDataRecord> = vec![
TestRawDataRecord {
timestamp: 0,
user_id: 12345,
is_trigger_report: false,
breakdown_key: 1,
trigger_value: 0,
},
TestRawDataRecord {
timestamp: 5,
user_id: 12345,
is_trigger_report: false,
breakdown_key: 2,
trigger_value: 0,
},
TestRawDataRecord {
timestamp: 10,
user_id: 12345,
is_trigger_report: true,
breakdown_key: 0,
trigger_value: 5,
},
TestRawDataRecord {
timestamp: 0,
user_id: 68362,
is_trigger_report: false,
breakdown_key: 1,
trigger_value: 0,
},
TestRawDataRecord {
timestamp: 20,
user_id: 68362,
is_trigger_report: true,
breakdown_key: 0,
trigger_value: 2,
},
test_input(0, 12345, false, 1, 0),
test_input(5, 12345, false, 2, 0),
test_input(10, 12345, true, 0, 5),
test_input(0, 68362, false, 1, 0),
test_input(20, 68362, true, 0, 2),
];

let mut result: Vec<_> = world
Expand All @@ -377,4 +364,46 @@ pub mod tests {
);
});
}

// Test that IPA tolerates duplicate timestamps among a user's records. The end-to-end test
// harness does not generate data like this because the attribution result is non-deterministic.
// To make the output deterministic for this case, all of the duplicate timestamp records are
// identical.
#[test]
fn duplicate_timestamps() {
const EXPECTED: &[u128] = &[0, 2, 10, 0, 0, 0, 0, 0];

run(|| async {
let world = TestWorld::default();

let mut records: Vec<TestRawDataRecord> = vec![
test_input(0, 12345, false, 1, 0),
test_input(5, 12345, false, 2, 0),
test_input(5, 12345, false, 2, 0),
test_input(10, 12345, true, 0, 5),
test_input(10, 12345, true, 0, 5),
test_input(0, 68362, false, 1, 0),
test_input(20, 68362, true, 0, 2),
];

records.shuffle(&mut thread_rng());

let mut result: Vec<_> = world
.semi_honest(records.into_iter(), |ctx, input_rows| async move {
oprf_ipa::<_, BA8, BA3, BA20, BA5, Fp31>(ctx, input_rows, None)
.await
.unwrap()
})
.await
.reconstruct();
result.truncate(EXPECTED.len());
assert_eq!(
result,
EXPECTED
.iter()
.map(|i| Fp31::try_from(*i).unwrap())
.collect::<Vec<_>>()
);
});
}
}
99 changes: 57 additions & 42 deletions ipa-core/src/test_fixture/event_gen.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
use std::{
collections::HashSet,
num::{NonZeroU32, NonZeroU64},
};

use crate::{rand::Rng, test_fixture::ipa::TestRawDataRecord};

#[derive(Copy, Clone, Hash, Ord, PartialOrd, Eq, PartialEq)]
struct UserId(u64);

Expand Down Expand Up @@ -26,6 +33,10 @@ impl UserId {
pub const FIRST: Self = Self(1);
}

// 7 days = 604800 seconds fits in 20 bits
pub type Timestamp = u32;
pub type NonZeroTimestamp = NonZeroU32;

#[derive(Debug, Copy, Clone)]
#[cfg_attr(feature = "clap", derive(clap::ValueEnum))]
pub enum ReportFilter {
Expand All @@ -46,8 +57,7 @@ pub struct Config {
#[cfg_attr(feature = "clap", arg(long, default_value = "20"))]
pub max_breakdown_key: NonZeroU32,
#[cfg_attr(feature = "clap", arg(long, hide = true, default_value = "604800"))]
// 7 days < 20 bits
pub max_timestamp: NonZeroU32,
pub max_timestamp: NonZeroTimestamp,
#[cfg_attr(feature = "clap", arg(long, default_value = "10"))]
pub max_events_per_user: NonZeroU32,
#[cfg_attr(feature = "clap", arg(long, default_value = "1"))]
Expand Down Expand Up @@ -89,14 +99,14 @@ impl Config {
max_breakdown_key: u32,
min_events_per_user: u32,
max_events_per_user: u32,
max_timestamp: u32,
max_timestamp: Timestamp,
) -> Self {
assert!(min_events_per_user < max_events_per_user);
Self {
user_count: NonZeroU64::try_from(user_count).unwrap(),
max_trigger_value: NonZeroU32::try_from(max_trigger_value).unwrap(),
max_breakdown_key: NonZeroU32::try_from(max_breakdown_key).unwrap(),
max_timestamp: NonZeroU32::try_from(max_timestamp).unwrap(),
max_timestamp: NonZeroTimestamp::try_from(max_timestamp).unwrap(),
min_events_per_user: NonZeroU32::try_from(min_events_per_user).unwrap(),
max_events_per_user: NonZeroU32::try_from(max_events_per_user).unwrap(),
report_filter: ReportFilter::All,
Expand All @@ -111,17 +121,11 @@ impl Config {
}
}

use std::{
collections::HashSet,
num::{NonZeroU32, NonZeroU64},
};

use crate::{rand::Rng, test_fixture::ipa::TestRawDataRecord};

struct UserStats {
user_id: UserId,
generated: u32,
max: u32,
used_timestamps: HashSet<Timestamp>,
}

impl UserStats {
Expand All @@ -130,6 +134,7 @@ impl UserStats {
user_id,
generated: 0,
max: max_events,
used_timestamps: HashSet::new(),
}
}

Expand All @@ -152,8 +157,7 @@ pub struct EventGenerator<R: Rng> {
config: Config,
rng: R,
users: Vec<UserStats>,
// even bit vector takes too long to initialize. Need a sparse structure here
used: HashSet<UserId>,
used_ids: HashSet<UserId>,
}

impl<R: Rng> EventGenerator<R> {
Expand All @@ -166,15 +170,21 @@ impl<R: Rng> EventGenerator<R> {
config,
rng,
users: vec![],
used: HashSet::new(),
used_ids: HashSet::new(),
}
}

fn gen_event(&mut self, user_id: UserId) -> TestRawDataRecord {
// Generate a new random timestamp between [0..`max_timestamp`).
// This means the generated events must be sorted by timestamp before being
// fed into the IPA protocols.
let current_ts = self.rng.gen_range(0..self.config.max_timestamp.get());
fn gen_event(&mut self, idx: usize) -> TestRawDataRecord {
let user_id = self.users[idx].user_id;

// Generate a new random timestamp between [0..`max_timestamp`) and distinct from
// already-used timestamps.
let current_ts = loop {
let ts = self.rng.gen_range(0..self.config.max_timestamp.get());
if self.users[idx].used_timestamps.insert(ts) {
break ts;
}
};

match self.config.report_filter {
ReportFilter::All => {
Expand All @@ -198,7 +208,7 @@ impl<R: Rng> EventGenerator<R> {
}
}

fn gen_trigger(&mut self, user_id: UserId, timestamp: u32) -> TestRawDataRecord {
fn gen_trigger(&mut self, user_id: UserId, timestamp: Timestamp) -> TestRawDataRecord {
let trigger_value = self.rng.gen_range(1..self.config.max_trigger_value.get());

TestRawDataRecord {
Expand All @@ -210,7 +220,7 @@ impl<R: Rng> EventGenerator<R> {
}
}

fn gen_source(&mut self, user_id: UserId, timestamp: u32) -> TestRawDataRecord {
fn gen_source(&mut self, user_id: UserId, timestamp: Timestamp) -> TestRawDataRecord {
let breakdown_key = self.rng.gen_range(0..self.config.max_breakdown_key.get());

TestRawDataRecord {
Expand All @@ -223,28 +233,28 @@ impl<R: Rng> EventGenerator<R> {
}

fn sample_user(&mut self) -> Option<UserStats> {
if self.used.len() == self.config.user_count() {
if self.used_ids.len() == self.config.user_count() {
return None;
}

let valid = |user_id| -> bool { !self.used.contains(&user_id) };

Some(loop {
let next = UserId::from(
loop {
let user_id = UserId::from(
self.rng
.gen_range(UserId::FIRST.into()..=self.config.user_count.get()),
);
if valid(next) {
self.used.insert(next);
break UserStats::new(
next,
self.rng.gen_range(
self.config.min_events_per_user.get()
..=self.config.max_events_per_user.get(),
),
);
if self.used_ids.contains(&user_id) {
continue;
}
})
self.used_ids.insert(user_id);

break Some(UserStats::new(
user_id,
self.rng.gen_range(
self.config.min_events_per_user.get()
..=self.config.max_events_per_user.get(),
),
));
}
}
}

Expand All @@ -266,12 +276,13 @@ impl<R: Rng> Iterator for EventGenerator<R> {
}

let idx = self.rng.gen_range(0..self.users.len());
let user_id = self.users[idx].user_id;
let event = self.gen_event(idx);

if self.users[idx].add_one() {
self.users.swap_remove(idx);
}

Some(self.gen_event(user_id))
Some(event)
}
}

Expand Down Expand Up @@ -366,6 +377,12 @@ mod tests {
"Found source report with trigger value set"
);
}

assert!(
event.timestamp < u64::from(self.max_timestamp.get()),
"Timestamp should not exceed configured maximum",
)

}
}

Expand All @@ -392,7 +409,7 @@ mod tests {
user_count: NonZeroU64::new(10_000).unwrap(),
max_trigger_value: NonZeroU32::new(max_trigger_value).unwrap(),
max_breakdown_key: NonZeroU32::new(max_breakdown_key).unwrap(),
max_timestamp: NonZeroU32::new(604_800).unwrap(),
max_timestamp: NonZeroTimestamp::new(604_800).unwrap(),
min_events_per_user: NonZeroU32::new(min_events_per_user).unwrap(),
max_events_per_user: NonZeroU32::new(max_events_per_user).unwrap(),
report_filter,
Expand Down Expand Up @@ -423,9 +440,7 @@ mod tests {
"Generated breakdown key greater than {max_breakdown}"
);

// Basic correctness checks. timestamps are not checked as the order of events
// is not guaranteed. The caller must sort the events by timestamp before
// feeding them into IPA.
// Basic correctness checks.
config.is_valid(&event);
}
}
Expand Down
Loading

0 comments on commit cc22bcc

Please sign in to comment.