Skip to content

Commit

Permalink
Merge pull request #236 from influxdata/dependabot/cargo/j4rs-0.18.0
Browse files Browse the repository at this point in the history
chore(deps): update j4rs requirement from 0.17.0 to 0.18.0
  • Loading branch information
kodiakhq[bot] authored Apr 15, 2024
2 parents 626e2d6 + adf09a9 commit 1a3e411
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ assert_matches = "1.5"
criterion = { version = "0.5", features = ["async_tokio"] }
dotenvy = "0.15.1"
futures = "0.3"
j4rs = "0.17.0"
j4rs = "0.18.0"
once_cell = "1.9"
procspawn = "1.0"
proptest = "1"
Expand Down
68 changes: 48 additions & 20 deletions tests/java_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ pub async fn produce(
let headers = jvm
.create_instance(
"org.apache.kafka.common.header.internals.RecordHeaders",
&[],
InvocationArg::empty(),
)
.expect("creating KafkaProducer");
for (k, v) in record.headers {
Expand Down Expand Up @@ -116,12 +116,15 @@ pub async fn produce(
}

// https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#flush()
jvm.invoke(&producer, "flush", &[]).expect("flush");
jvm.invoke(&producer, "flush", InvocationArg::empty())
.expect("flush");

let mut offsets = vec![];
for fut in futures {
// https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/Future.html#get()
let record_metadata = jvm.invoke(&fut, "get", &[]).expect("polling future");
let record_metadata = jvm
.invoke(&fut, "get", InvocationArg::empty())
.expect("polling future");

// future returns `java.lang.Object`, cast that to the known result type
let record_metadata = jvm
Expand All @@ -133,15 +136,16 @@ pub async fn produce(

// https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/producer/RecordMetadata.html#offset()
let offset = jvm
.invoke(&record_metadata, "offset", &[])
.invoke(&record_metadata, "offset", InvocationArg::empty())
.expect("getting offset");

let offset: i64 = jvm.to_rust(offset).expect("converting offset to Rust");
offsets.push(offset);
}

// https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()
jvm.invoke(&producer, "close", &[]).expect("close");
jvm.invoke(&producer, "close", InvocationArg::empty())
.expect("close");

offsets
}
Expand Down Expand Up @@ -221,7 +225,7 @@ pub async fn consume(

// https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerRecords.html#iterator()
let it = jvm
.invoke(&consumer_records, "iterator", &[])
.invoke(&consumer_records, "iterator", InvocationArg::empty())
.expect("iterator");
for consumer_record in JavaIterator::new(&jvm, it) {
// iterator returns `java.lang.Object` which we need to cast to the known type
Expand All @@ -233,42 +237,56 @@ pub async fn consume(
.expect("cast to ConsumerRecord");

// https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html
let key = jvm.invoke(&consumer_record, "key", &[]).expect("key");
let key = jvm
.invoke(&consumer_record, "key", InvocationArg::empty())
.expect("key");
let key: String = jvm.to_rust(key).expect("key to Rust");

let offset = jvm.invoke(&consumer_record, "offset", &[]).expect("offset");
let offset = jvm
.invoke(&consumer_record, "offset", InvocationArg::empty())
.expect("offset");
let offset: i64 = jvm.to_rust(offset).expect("offset to Rust");

let timestamp = jvm
.invoke(&consumer_record, "timestamp", &[])
.invoke(&consumer_record, "timestamp", InvocationArg::empty())
.expect("timestamp");
let timestamp: i64 = jvm.to_rust(timestamp).expect("timestamp to Rust");

let value = jvm.invoke(&consumer_record, "value", &[]).expect("value");
let value = jvm
.invoke(&consumer_record, "value", InvocationArg::empty())
.expect("value");
let value: String = jvm.to_rust(value).expect("value to Rust");

let headers = jvm
.invoke(&consumer_record, "headers", &[])
.invoke(&consumer_record, "headers", InvocationArg::empty())
.expect("headers");
let headers = jvm.invoke(&headers, "toArray", &[]).expect("toArray");
let headers = jvm
.invoke(&headers, "toArray", InvocationArg::empty())
.expect("toArray");
let headers = jvm
.invoke_static(
"java.util.Arrays",
"asList",
&[InvocationArg::from(headers)],
)
.expect("headers asList");
let headers_it = jvm.invoke(&headers, "iterator", &[]).expect("iterator");
let headers_it = jvm
.invoke(&headers, "iterator", InvocationArg::empty())
.expect("iterator");
let mut headers = BTreeMap::new();
for header in JavaIterator::new(&jvm, headers_it) {
let header = jvm
.cast(&header, "org.apache.kafka.common.header.Header")
.expect("cast to Header");

let key = jvm.invoke(&header, "key", &[]).expect("key");
let key = jvm
.invoke(&header, "key", InvocationArg::empty())
.expect("key");
let key: String = jvm.to_rust(key).expect("key to Rust");

let value = jvm.invoke(&header, "value", &[]).expect("value");
let value = jvm
.invoke(&header, "value", InvocationArg::empty())
.expect("value");
let value = from_java_bytes(&jvm, value);

headers.insert(key, value);
Expand All @@ -286,7 +304,8 @@ pub async fn consume(
}

// https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#close()
jvm.invoke(&consumer, "close", &[]).expect("close");
jvm.invoke(&consumer, "close", InvocationArg::empty())
.expect("close");

results
}
Expand Down Expand Up @@ -337,7 +356,7 @@ fn setup_jvm() -> Jvm {

fn create_properties(jvm: &Jvm, properties: &[(&str, &str)]) -> Instance {
let props = jvm
.create_instance("java.util.Properties", &[])
.create_instance("java.util.Properties", InvocationArg::empty())
.expect("creating Properties");

for (k, v) in properties {
Expand Down Expand Up @@ -380,7 +399,9 @@ fn from_java_bytes(jvm: &Jvm, bytes: Instance) -> Vec<u8> {
.invoke_static("java.util.Arrays", "asList", &[InvocationArg::from(bytes)])
.expect("bytes asList");

let it = jvm.invoke(&bytes, "iterator", &[]).expect("iterator");
let it = jvm
.invoke(&bytes, "iterator", InvocationArg::empty())
.expect("iterator");

let mut res = vec![];
for byte in JavaIterator::new(jvm, it) {
Expand Down Expand Up @@ -408,11 +429,18 @@ impl<'a> Iterator for JavaIterator<'a> {

fn next(&mut self) -> Option<Self::Item> {
// https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/Iterator.html#hasNext()
let has_next = self.jvm.invoke(&self.it, "hasNext", &[]).expect("hasNext");
let has_next = self
.jvm
.invoke(&self.it, "hasNext", InvocationArg::empty())
.expect("hasNext");
let has_next: bool = self.jvm.to_rust(has_next).expect("hasNext to Rust");
if has_next {
// https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/Iterator.html#next()
Some(self.jvm.invoke(&self.it, "next", &[]).expect("next"))
Some(
self.jvm
.invoke(&self.it, "next", InvocationArg::empty())
.expect("next"),
)
} else {
None
}
Expand Down

0 comments on commit 1a3e411

Please sign in to comment.