Skip to content

Commit cf59e4d

Browse files
author
Devdutt Shenoi
authored
refactor: process ain't async (#1180)
- Streamlined the event processing flow by converting operations from asynchronous to synchronous execution. - Enhanced error management to simplify control flow and improve overall system responsiveness. This update refines internal event handling, potentially contributing to better system stability and performance.
1 parent 671abae commit cf59e4d

File tree

4 files changed

+5
-9
lines changed

4 files changed

+5
-9
lines changed

src/connectors/kafka/processor.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,7 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
109109
let len = records.len();
110110
debug!("Processing {} records", len);
111111

112-
self.build_event_from_chunk(&records)
113-
.await?
114-
.process()
115-
.await?;
112+
self.build_event_from_chunk(&records).await?.process()?;
116113

117114
debug!("Processed {} records", len);
118115
Ok(())

src/event/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ pub struct Event {
4646

4747
// Events holds the schema related to a each event for a single log stream
4848
impl Event {
49-
pub async fn process(self) -> Result<(), EventError> {
49+
pub fn process(self) -> Result<(), EventError> {
5050
let mut key = get_schema_key(&self.rb.schema().fields);
5151
if self.time_partition.is_some() {
5252
let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();

src/handlers/http/ingest.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,8 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
106106
custom_partition_values: HashMap::new(),
107107
stream_type: StreamType::Internal,
108108
}
109-
.process()
110-
.await?;
109+
.process()?;
110+
111111
Ok(())
112112
}
113113

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,7 @@ async fn push_logs(
160160
custom_partition_values,
161161
stream_type: StreamType::UserDefined,
162162
}
163-
.process()
164-
.await?;
163+
.process()?;
165164
}
166165
Ok(())
167166
}

0 commit comments

Comments
 (0)