Skip to content
This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

fill the data field in analytics events #31

Merged
merged 2 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions capture/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub enum CaptureError {

#[error("request holds no event")]
EmptyBatch,
#[error("event submitted with an empty event name")]
MissingEventName,
#[error("event submitted without a distinct_id")]
MissingDistinctId,

Expand All @@ -58,6 +60,7 @@ impl IntoResponse for CaptureError {
CaptureError::RequestDecodingError(_)
| CaptureError::RequestParsingError(_)
| CaptureError::EmptyBatch
| CaptureError::MissingEventName
| CaptureError::MissingDistinctId
| CaptureError::EventTooBig
| CaptureError::NonRetryableSinkError => (StatusCode::BAD_REQUEST, self.to_string()),
Expand Down
26 changes: 25 additions & 1 deletion capture/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,20 @@ pub fn process_single_event(
_ => return Err(CaptureError::MissingDistinctId),
},
};
if event.event.is_empty() {
return Err(CaptureError::MissingEventName);
}

let data = serde_json::to_string(&event).map_err(|e| {
tracing::error!("failed to encode data field: {}", e);
CaptureError::NonRetryableSinkError
})?;

Ok(ProcessedEvent {
uuid: event.uuid.unwrap_or_else(uuid_v7),
distinct_id: distinct_id.to_string(),
ip: context.client_ip.clone(),
data: String::from("hallo I am some data 😊"),
data,
now: context.now.clone(),
sent_at: context.sent_at,
token: context.token.clone(),
Expand Down Expand Up @@ -158,13 +166,21 @@ mod tests {
uuid: None,
event: String::new(),
properties: HashMap::new(),
timestamp: None,
offset: None,
set: Default::default(),
set_once: Default::default(),
},
RawEvent {
token: None,
distinct_id: Some("testing".to_string()),
uuid: None,
event: String::new(),
properties: HashMap::from([(String::from("token"), json!("hello"))]),
timestamp: None,
offset: None,
set: Default::default(),
set_once: Default::default(),
},
];

Expand All @@ -181,13 +197,21 @@ mod tests {
uuid: None,
event: String::new(),
properties: HashMap::new(),
timestamp: None,
offset: None,
set: Default::default(),
set_once: Default::default(),
},
RawEvent {
token: None,
distinct_id: Some("testing".to_string()),
uuid: None,
event: String::new(),
properties: HashMap::from([(String::from("token"), json!("goodbye"))]),
timestamp: None,
offset: None,
set: Default::default(),
set_once: Default::default(),
},
];

Expand Down
20 changes: 17 additions & 3 deletions capture/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,26 @@ pub struct EventFormData {

#[derive(Default, Debug, Deserialize, Serialize)]
pub struct RawEvent {
#[serde(alias = "$token", alias = "api_key")]
#[serde(
alias = "$token",
alias = "api_key",
skip_serializing_if = "Option::is_none"
)]
pub token: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub distinct_id: Option<String>,
pub uuid: Option<Uuid>,
pub event: String,
#[serde(default)]
pub properties: HashMap<String, Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub timestamp: Option<String>, // Passed through if provided, parsed by ingestion
#[serde(skip_serializing_if = "Option::is_none")]
pub offset: Option<i64>, // Passed through if provided, parsed by ingestion
#[serde(rename = "$set", skip_serializing_if = "Option::is_none")]
pub set: Option<HashMap<String, Value>>,
#[serde(rename = "$set_once", skip_serializing_if = "Option::is_none")]
pub set_once: Option<HashMap<String, Value>>,
}

#[derive(Deserialize)]
Expand All @@ -51,14 +65,14 @@ enum RawRequest {
/// Batch of events
Batch(Vec<RawEvent>),
/// Single event
One(RawEvent),
One(Box<RawEvent>),
}

impl RawRequest {
pub fn events(self) -> Vec<RawEvent> {
match self {
RawRequest::Batch(events) => events,
RawRequest::One(event) => vec![event],
RawRequest::One(event) => vec![*event],
ellie marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
26 changes: 24 additions & 2 deletions capture/tests/django_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ impl EventSink for MemorySink {
}

#[tokio::test]
#[ignore]
async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
let file = File::open(REQUESTS_DUMP_FILE_NAME)?;
let reader = BufReader::new(file);
Expand Down Expand Up @@ -107,6 +106,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
if !case.ip.is_empty() {
req = req.header("X-Forwarded-For", case.ip);
}

let res = req.send().await;
assert_eq!(
res.status(),
Expand Down Expand Up @@ -140,16 +140,38 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
OffsetDateTime::parse(value.as_str().expect("empty"), &Iso8601::DEFAULT)?;
*value = Value::String(sent_at.format(&Rfc3339)?)
}
if let Some(expected_data) = expected.get_mut("data") {
// Data is a serialized JSON map. Unmarshall both and compare them,
// instead of expecting the serialized bytes to be equal
let expected_props: Value =
serde_json::from_str(expected_data.as_str().expect("not str"))?;
let found_props: Value = serde_json::from_str(&message.data)?;
let match_config =
assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict);
if let Err(e) =
assert_json_matches_no_panic(&expected_props, &found_props, match_config)
{
println!(
"data field mismatch at line {}, event {}: {}",
line_number, event_number, e
);
mismatches += 1;
} else {
*expected_data = json!(&message.data)
}
}

if let Some(object) = expected.as_object_mut() {
// site_url is unused in the pipeline now, let's drop it
object.remove("site_url");
}

let match_config = assert_json_diff::Config::new(assert_json_diff::CompareMode::Strict);
if let Err(e) =
assert_json_matches_no_panic(&json!(expected), &json!(message), match_config)
{
println!(
"mismatch at line {}, event {}: {}",
"record mismatch at line {}, event {}: {}",
line_number, event_number, e
);
mismatches += 1;
Expand Down