Skip to content

Commit

Permalink
XFR zone updating. (#376)
Browse files Browse the repository at this point in the history
Major changes:
- Add ZoneUpdater, ZoneDiffBuilder and ZoneDiff types.

Other changes:
- ZoneUpdate:
  - Renamed the inner record type of ZoneUpdate from XfrRecord to ParsedRecord as it is not XFR specific.
  - Added variant ZoneUpdate::DeleteAllRecords which is now emitted at the start of an AXFR update by XfrZoneUpdateIterator.
  - Removed the superfluous serial number data members associated with some ZoneUpdate variants.

- zonetree::in_memory:
  - Factor out ZoneApex::prepare_name() as zonetree::util::rel_name_rev_iter() as it is also now used by ZoneUpdater.
  - Improve some naming, e.g. there was mix of terms remove and clean, all references to clean have now been renamed to remove based names instead.
  - Altered the logic in Versioned::remove_all() (formerly Versioned::clean()) as it made destructive changes to the zone that would have impacted readers of the current zone version while the new zone version was being created.
  - Adds an arc_into_inner() function for Rust <1.70.0 which doesn't have Arc::into_inner().

- Additional trait bounds where needed.

- Removed some unused code.
  • Loading branch information
ximon18 authored Sep 23, 2024
1 parent e08bfe7 commit 2f61942
Show file tree
Hide file tree
Showing 18 changed files with 2,358 additions and 208 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ parking_lot = { version = "0.12", optional = true }
moka = { version = "0.12.3", optional = true, features = ["future"] }
proc-macro2 = { version = "1.0.69", optional = true } # Force proc-macro2 to at least 1.0.69 for minimal-version build
ring = { version = "0.17", optional = true }
rustversion = { version = "1", optional = true }
serde = { version = "1.0.130", optional = true, features = ["derive"] }
siphasher = { version = "1", optional = true }
smallvec = { version = "1.3", optional = true }
Expand Down Expand Up @@ -64,7 +65,7 @@ unstable-server-transport = ["arc-swap", "chrono/clock", "libc", "net", "siphash
unstable-stelline = ["tokio/test-util", "tracing", "tracing-subscriber", "unstable-server-transport", "zonefile"]
unstable-validator = ["validate", "zonefile", "unstable-client-transport"]
unstable-xfr = []
unstable-zonetree = ["futures-util", "parking_lot", "serde", "tokio", "tracing"]
unstable-zonetree = ["futures-util", "parking_lot", "rustversion", "serde", "tokio", "tracing"]

[dev-dependencies]
lazy_static = { version = "1.4.0" }
Expand Down
28 changes: 13 additions & 15 deletions src/net/xfr/protocol/interpreter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::rdata::{Soa, ZoneRecordData};
use crate::zonetree::types::ZoneUpdate;

use super::iterator::XfrZoneUpdateIterator;
use super::types::{Error, IxfrUpdateMode, XfrRecord, XfrType};
use super::types::{Error, IxfrUpdateMode, ParsedRecord, XfrType};

//------------ XfrResponseInterpreter -----------------------------------------

Expand Down Expand Up @@ -40,6 +40,8 @@ pub struct XfrResponseInterpreter {
/// Internal state.
///
/// None until the first call to [`interpret_response()`].
///
/// [`interpret_response()`]: XfrResponseInterpreter::interpret_response()
inner: Option<Inner>,
}

Expand Down Expand Up @@ -148,6 +150,8 @@ impl XfrResponseInterpreter {
///
/// Separated out from [`XfrResponseInterpreter`] to avoid needing multiple
/// mutable self references in [`interpret_response()`].
///
/// [`interpret_response()`]: XfrResponseInterpreter::interpret_response()
struct Inner {
/// The response message currently being processed.
resp: Message<Bytes>,
Expand Down Expand Up @@ -259,8 +263,8 @@ impl RecordProcessor {
/// record, if any.
pub(super) fn process_record(
&mut self,
rec: XfrRecord,
) -> ZoneUpdate<XfrRecord> {
rec: ParsedRecord,
) -> ZoneUpdate<ParsedRecord> {
self.rr_count += 1;

// https://datatracker.ietf.org/doc/html/rfc5936#section-2.2
Expand Down Expand Up @@ -297,9 +301,7 @@ impl RecordProcessor {
ZoneUpdate::Finished(rec)
}

XfrType::Axfr => {
ZoneUpdate::AddRecord(self.current_soa.serial(), rec)
}
XfrType::Axfr => ZoneUpdate::AddRecord(rec),

XfrType::Ixfr if self.rr_count < 2 => unreachable!(),

Expand Down Expand Up @@ -348,7 +350,7 @@ impl RecordProcessor {
// assume that "incremental zone transfer is not available"
// and so "the behaviour is the same as an AXFR response",
self.actual_xfr_type = XfrType::Axfr;
ZoneUpdate::AddRecord(self.current_soa.serial(), rec)
ZoneUpdate::AddRecord(rec)
}
}

Expand Down Expand Up @@ -377,14 +379,10 @@ impl RecordProcessor {
}
} else {
match self.ixfr_update_mode {
IxfrUpdateMode::Deleting => ZoneUpdate::DeleteRecord(
self.current_soa.serial(),
rec,
),
IxfrUpdateMode::Adding => ZoneUpdate::AddRecord(
self.current_soa.serial(),
rec,
),
IxfrUpdateMode::Deleting => {
ZoneUpdate::DeleteRecord(rec)
}
IxfrUpdateMode::Adding => ZoneUpdate::AddRecord(rec),
}
}
}
Expand Down
21 changes: 18 additions & 3 deletions src/net/xfr/protocol/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::rdata::ZoneRecordData;
use crate::zonetree::types::ZoneUpdate;

use super::interpreter::RecordProcessor;
use super::types::{Error, IterationError, XfrRecord};
use super::types::{Error, IterationError, ParsedRecord, XfrType};

//------------ XfrZoneUpdateIterator ------------------------------------------

Expand Down Expand Up @@ -54,17 +54,32 @@ impl<'a, 'b> XfrZoneUpdateIterator<'a, 'b> {
let Some(Ok(_)) = iter.next() else {
return Err(Error::Malformed);
};
state.rr_count += 1;
}

Ok(Self { state, iter })
}
}

impl<'a, 'b> Iterator for XfrZoneUpdateIterator<'a, 'b> {
type Item = Result<ZoneUpdate<XfrRecord>, IterationError>;
type Item = Result<ZoneUpdate<ParsedRecord>, IterationError>;

fn next(&mut self) -> Option<Self::Item> {
if self.state.rr_count == 0 {
// We already skipped the first record in new() above by calling
// iter.next(). We didn't reflect that yet in rr_count because we
// wanted to still be able to detect the first call to next() and
// handle it specially for AXFR.
self.state.rr_count += 1;

if self.state.actual_xfr_type == XfrType::Axfr {
// For AXFR we're not making incremental changes to a zone,
// we're replacing its entire contents, so before returning
// any actual updates to apply first instruct the consumer to
// "discard" everything it has.
return Some(Ok(ZoneUpdate::DeleteAllRecords));
}
}

match self.iter.next()? {
Ok(record) => {
trace!("XFR record {}: {record:?}", self.state.rr_count);
Expand Down
2 changes: 1 addition & 1 deletion src/net/xfr/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ mod tests;

pub use interpreter::XfrResponseInterpreter;
pub use iterator::XfrZoneUpdateIterator;
pub use types::{Error, IterationError, XfrRecord};
pub use types::{Error, IterationError, ParsedRecord};
80 changes: 38 additions & 42 deletions src/net/xfr/protocol/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ use crate::base::iana::Rcode;
use crate::base::message_builder::{
AnswerBuilder, AuthorityBuilder, QuestionBuilder,
};
use crate::base::net::Ipv4Addr;
use crate::base::net::{Ipv4Addr, Ipv6Addr};
use crate::base::rdata::ComposeRecordData;
use crate::base::{
Message, MessageBuilder, ParsedName, Record, Rtype, Serial, Ttl,
};
use crate::base::{Name, ToName};
use crate::rdata::{Soa, ZoneRecordData, A};
use crate::rdata::{Aaaa, Soa, ZoneRecordData, A};
use crate::zonetree::types::{ZoneUpdate, ZoneUpdate as ZU};

use super::interpreter::XfrResponseInterpreter;
use super::types::{Error, IterationError, XfrRecord};
use super::types::{Error, IterationError, ParsedRecord};

#[test]
fn non_xfr_response_is_rejected() {
Expand Down Expand Up @@ -112,7 +112,8 @@ fn incomplete_axfr_response_is_accepted() {
// Process the response.
let mut it = interpreter.interpret_response(resp).unwrap();

// Verify that no updates are by the XFR interpreter.
// Verify that no updates are output by the XFR interpreter.
assert_eq!(it.next(), Some(Ok(ZoneUpdate::DeleteAllRecords)));
assert!(it.next().is_none());
}

Expand Down Expand Up @@ -141,6 +142,7 @@ fn axfr_response_with_only_soas_is_accepted() {
let mut it = interpreter.interpret_response(resp).unwrap();

// Verify the updates emitted by the XFR interpreter.
assert_eq!(it.next(), Some(Ok(ZoneUpdate::DeleteAllRecords)));
assert!(matches!(it.next(), Some(Ok(ZU::Finished(_)))));
assert!(it.next().is_none());
}
Expand Down Expand Up @@ -169,6 +171,7 @@ fn axfr_multi_response_with_only_soas_is_accepted() {
let mut it = interpreter.interpret_response(resp).unwrap();

// Verify the updates emitted by the XFR interpreter.
assert_eq!(it.next(), Some(Ok(ZoneUpdate::DeleteAllRecords)));
assert!(it.next().is_none());

// Create another AXFR response to complete the transfer.
Expand Down Expand Up @@ -200,18 +203,22 @@ fn axfr_response_generates_expected_updates() {
let soa = mk_soa(serial);
add_answer_record(&req, &mut answer, soa.clone());
add_answer_record(&req, &mut answer, A::new(Ipv4Addr::LOCALHOST));
add_answer_record(&req, &mut answer, A::new(Ipv4Addr::BROADCAST));
add_answer_record(&req, &mut answer, Aaaa::new(Ipv6Addr::LOCALHOST));
add_answer_record(&req, &mut answer, soa);
let resp = answer.into_message();

// Process the response.
let mut it = interpreter.interpret_response(resp).unwrap();

// Verify the updates emitted by the XFR interpreter.
let s = serial;
assert!(matches!(it.next(), Some(Ok(ZU::AddRecord(n, _))) if n == s));
assert!(matches!(it.next(), Some(Ok(ZU::AddRecord(n, _))) if n == s));
assert!(matches!(it.next(), Some(Ok(ZU::Finished(_)))));
assert_eq!(it.next(), Some(Ok(ZoneUpdate::DeleteAllRecords)));
assert!(
matches!(it.next(), Some(Ok(ZoneUpdate::AddRecord(r))) if r.rtype() == Rtype::A)
);
assert!(
matches!(it.next(), Some(Ok(ZoneUpdate::AddRecord(r))) if r.rtype() == Rtype::AAAA)
);
assert!(matches!(it.next(), Some(Ok(ZoneUpdate::Finished(_)))));
assert!(it.next().is_none());
}

Expand Down Expand Up @@ -276,49 +283,38 @@ fn ixfr_response_generates_expected_updates() {
// Verify the updates emitted by the XFR interpreter.
let owner =
ParsedName::<Bytes>::from(Name::from_str("example.com").unwrap());
let expected_updates: [Result<ZoneUpdate<XfrRecord>, IterationError>; 7] = [
let expected_updates: [Result<ZoneUpdate<ParsedRecord>, IterationError>;
7] = [
Ok(ZoneUpdate::BeginBatchDelete(Record::from((
owner.clone(),
0,
ZoneRecordData::Soa(expected_old_soa),
)))),
Ok(ZoneUpdate::DeleteRecord(
old_serial,
Record::from((
owner.clone(),
0,
ZoneRecordData::A(A::new(Ipv4Addr::LOCALHOST)),
)),
)),
Ok(ZoneUpdate::DeleteRecord(
old_serial,
Record::from((
owner.clone(),
0,
ZoneRecordData::A(A::new(Ipv4Addr::BROADCAST)),
)),
)),
Ok(ZoneUpdate::DeleteRecord(Record::from((
owner.clone(),
0,
ZoneRecordData::A(A::new(Ipv4Addr::LOCALHOST)),
)))),
Ok(ZoneUpdate::DeleteRecord(Record::from((
owner.clone(),
0,
ZoneRecordData::A(A::new(Ipv4Addr::BROADCAST)),
)))),
Ok(ZoneUpdate::BeginBatchAdd(Record::from((
owner.clone(),
0,
ZoneRecordData::Soa(expected_new_soa.clone()),
)))),
Ok(ZoneUpdate::AddRecord(
new_serial,
Record::from((
owner.clone(),
0,
ZoneRecordData::A(A::new(Ipv4Addr::BROADCAST)),
)),
)),
Ok(ZoneUpdate::AddRecord(
new_serial,
Record::from((
owner.clone(),
0,
ZoneRecordData::A(A::new(Ipv4Addr::LOCALHOST)),
)),
)),
Ok(ZoneUpdate::AddRecord(Record::from((
owner.clone(),
0,
ZoneRecordData::A(A::new(Ipv4Addr::BROADCAST)),
)))),
Ok(ZoneUpdate::AddRecord(Record::from((
owner.clone(),
0,
ZoneRecordData::A(A::new(Ipv4Addr::LOCALHOST)),
)))),
Ok(ZoneUpdate::Finished(Record::from((
owner.clone(),
0,
Expand Down
2 changes: 1 addition & 1 deletion src/net/xfr/protocol/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
/// The type of record processed by [`XfrResponseInterpreter`].
///
/// [`XfrResponseInterpreter`]: super::interpreter::XfrResponseInterpreter
pub type XfrRecord =
pub type ParsedRecord =
Record<ParsedName<Bytes>, ZoneRecordData<Bytes, ParsedName<Bytes>>>;

//------------ XfrType --------------------------------------------------------
Expand Down
3 changes: 3 additions & 0 deletions src/zonetree/in_memory/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
//! An in-memory backing store for [`Zone`]s.
//!
//! [`Zone`]: super::Zone
mod builder;
mod nodes;
mod read;
Expand Down
Loading

0 comments on commit 2f61942

Please sign in to comment.