Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polish samples #137

Merged
merged 1 commit into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The sample application test uses Testcontainers, and creates a temporary Oracle
To run the test application, run the following command:

```shell
mvn test
mvn test -Dtest=JSONDualitySampleApplicationTest
```

## Configure your project to use Oracle JSON Relational Duality Views
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,13 @@
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.jsonduality;

import java.util.UUID;

public class Course {
private String _id;
private String name;
private String description;
private Integer credits;
private LectureHall lecture_hall;

public static Course createCourse() {
Course course = new Course();
course.set_id(UUID.randomUUID().toString());
return course;
}

public Course() {
}

Expand Down Expand Up @@ -67,4 +59,15 @@ public LectureHall getLecture_hall() {
public void setLecture_hall(LectureHall lecture_hall) {
this.lecture_hall = lecture_hall;
}

@Override
public String toString() {
return "Course{" +
"_id='" + _id + '\'' +
", name='" + name + '\'' +
", description='" + description + '\'' +
", credits=" + credits +
", lecture_hall=" + lecture_hall +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,10 @@
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.jsonduality;

import java.util.UUID;

public class Enrollment {
private String _id;
private Course course;

public Enrollment createEnrollment() {
Enrollment enrollment = new Enrollment();
enrollment.set_id(UUID.randomUUID().toString());
return enrollment;
}

public Enrollment() {}

public Enrollment(String _id, Course course) {
Expand All @@ -36,4 +28,12 @@ public Course getCourse() {
public void setCourse(Course course) {
this.course = course;
}

@Override
public String toString() {
return "Enrollment{" +
"_id='" + _id + '\'' +
", course=" + course +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,10 @@
// Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl.
package com.oracle.database.spring.jsonduality;

import java.util.UUID;

public class LectureHall {
private String _id;
private String name;

public static LectureHall createLecureHall() {
LectureHall hall = new LectureHall();
hall.set_id(UUID.randomUUID().toString());
return hall;
}

public LectureHall() {}

public LectureHall(String _id, String name) {
Expand All @@ -36,4 +28,12 @@ public String getName() {
public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return "LectureHall{" +
"_id='" + _id + '\'' +
", name='" + name + '\'' +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@
package com.oracle.database.spring.jsonduality;

import java.util.List;
import java.util.UUID;


public class Student {
public final class Student {
private String _id;
private String first_name;
private String last_name;
Expand All @@ -16,12 +15,6 @@ public class Student {
private double credits;
private List<Enrollment> enrollments;

public static Student createStudent() {
Student student = new Student();
student.set_id(UUID.randomUUID().toString());
return student;
}

public Student() {}

public Student(String _id, String first_name, String last_name, String email, String major, double gpa, double credits, List<Enrollment> enrollments) {
Expand Down Expand Up @@ -98,4 +91,18 @@ public List<Enrollment> getEnrollments() {
public void setEnrollments(List<Enrollment> enrollments) {
this.enrollments = enrollments;
}

@Override
public String toString() {
return "Student{" +
"_id='" + _id + '\'' +
", first_name='" + first_name + '\'' +
", last_name='" + last_name + '\'' +
", email='" + email + '\'' +
", major='" + major + '\'' +
", gpa=" + gpa +
", credits=" + credits +
", enrollments=" + enrollments +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,18 @@ static void properties(DynamicPropertyRegistry registry) {

@Test
void jsonDualityViewsSampleApplication() {
System.out.println("#### Querying Courses By Name:");
// fetch courses
List<Course> courseByName = courseService.getCourseByName("Introduction to Computer Science");
assertThat(courseByName).hasSize(1);
Course introToCS = courseByName.get(0);
System.out.println("#### Intro to Computer Science:\n" + introToCS);
courseByName = courseService.getCourseByName("Data Structures and Algorithms");
assertThat(courseByName).hasSize(1);
Course dsAndAlgo = courseByName.get(0);
System.out.println("\n#### Data Structures and Algorithms:\n" + dsAndAlgo);


System.out.println("\n\n\n#### Enrolling Student in CS101");
// Enroll existing student in a new course
Student aliceSmith = getStudent("Alice", "Smith");
Enrollment introToCSEnrollment = new Enrollment();
Expand All @@ -76,7 +79,10 @@ void jsonDualityViewsSampleApplication() {
.isEqualTo(introToCS.getName());
assertThat(asEnrollments.get(0).getCourse().getLecture_hall().getName())
.isEqualTo("Hoffman Hall");
System.out.println("#### Enrollment created:\n" + aliceSmith);


System.out.println("\n\n\n#### Creating new student with two enrollments");
// Create a new student with two enrollments
Student bobSwanson = new Student();
bobSwanson.setFirst_name("Robert");
Expand All @@ -94,6 +100,7 @@ void jsonDualityViewsSampleApplication() {
// Verify student created with enrollments
bobSwanson = getStudent("Robert", "Swanson");
assertThat(bobSwanson.getEnrollments()).hasSize(2);
System.out.println("#### Student created:\n" + bobSwanson);
}

private Student getStudent(String firstName, String lastName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ The sample application test uses Testcontainers, and creates a temporary Oracle
To run application test, run the following command:

```shell
mvn test
mvn test -Dtest=JSONEventsSampleTest
```

The test starts a sensor data consumer, and sends a series of raw weather station events to the producer. The test verifies that the events have been processed and saved to the database, available in JSON Relational Duality View form.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import jakarta.annotation.PostConstruct;
import org.apache.kafka.clients.admin.Admin;
Expand All @@ -13,24 +14,26 @@
import org.oracle.okafka.clients.admin.AdminClient;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.stereotype.Component;

/**
* OKafkaSetup creates the app's OKafka topic, and starts the consumer thread.
*/
@Configuration
public class OKafkaSetup {
@Component
public class OKafkaComponent {
private final AsyncTaskExecutor asyncTaskExecutor;
private final SensorConsumer sensorConsumer;
private final Properties okafkaProperties;

@Value("${app.topic}")
@Value("${app.topic:weathersensor}")
private String topic;

public OKafkaSetup(@Qualifier("applicationTaskExecutor") AsyncTaskExecutor asyncTaskExecutor,
SensorConsumer sensorConsumer,
@Qualifier("okafkaProperties") Properties okafkaProperties) {
private Future<?> consumer;

public OKafkaComponent(@Qualifier("applicationTaskExecutor") AsyncTaskExecutor asyncTaskExecutor,
SensorConsumer sensorConsumer,
@Qualifier("okafkaProperties") Properties okafkaProperties) {
this.asyncTaskExecutor = asyncTaskExecutor;
this.sensorConsumer = sensorConsumer;
this.okafkaProperties = okafkaProperties;
Expand All @@ -50,6 +53,14 @@ void init() {
throw new RuntimeException(e);
}
}
asyncTaskExecutor.submit(sensorConsumer);
consumer = asyncTaskExecutor.submit(sensorConsumer);
}

public void await() {
try {
consumer.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public Consumer<String, Sensor> okafkaConsumer() {
props.put("group.id", consumerGroup);
props.put("enable.auto.commit","false");
props.put("max.poll.records", 2000);
props.put("auto.offset.reset", "earliest");

Deserializer<String> keyDeserializer = new StringDeserializer();
Deserializer<Sensor> valueDeserializer = new JSONBDeserializer<>(jsonb, Sensor.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,36 @@ public class SensorConsumer implements Runnable, AutoCloseable {
private final String topic;
private final SensorEnricher sensorEnricher;
private final SensorService sensorService;
// Used to end the consumer for example/testing purposes
private final int limit;

public SensorConsumer(@Qualifier("okafkaConsumer") Consumer<String, Sensor> consumer,
@Value("${app.topic}") String topic,
@Value("${app.topic:weathersensor}") String topic,
@Value("${app.consumer.limit:15}") int limit,
SensorEnricher sensorEnricher,
SensorService sensorService) {
this.consumer = consumer;
this.topic = topic;
this.limit = limit;
this.sensorEnricher = sensorEnricher;
this.sensorService = sensorService;

}

@Override
public void run() {
consumer.subscribe(List.of(topic));
int consumedRecords = 0;
while (true) {
ConsumerRecords<String, Sensor> records = consumer.poll(Duration.ofMillis(100));
processRecords(records);
// Commit records when done processing.
consumer.commitAsync();
consumer.commitSync();
// End the consumed once we have consumed all records.
consumedRecords += records.count();
if (consumedRecords >= limit) {
return;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public class SensorEventProducer implements AutoCloseable {
private final SensorEventParser sensorEventParser;

public SensorEventProducer(@Qualifier("okafkaProducer") Producer<String, Sensor> producer,
@Value("${app.topic}") String topic,
@Value("${app.topic:weathersensor}") String topic,
SensorEventParser sensorEventParser) {
this.producer = producer;
this.topic = topic;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
app:
topic: weathersensor
consumerGroup: weathersensor
bootstrapServers: localhost:1521
ojdbcPath: /path/to/ojbdc/properties

spring:
jpa:
hibernate:
Expand All @@ -27,5 +21,5 @@ server:

logging:
level:
org.apache.kafka: WARN
org.oracle.okafka: WARN
org.apache.kafka: FATAL
org.oracle.okafka: FATAL
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
log4j.rootLogger=INFO, stdout
log4j.logger.org.apache.kafka=WARN
log4j.logger.org.oracle.okafka=WARN
log4j.logger.org.apache.kafka=FATAL
log4j.logger.org.oracle.okafka=FATAL
Original file line number Diff line number Diff line change
Expand Up @@ -65,19 +65,31 @@ public static void setUp() throws Exception {
@Autowired
SensorController sensorController;

@Autowired
OKafkaComponent okafkaComponent;


@Test
void jsonEventsSampleAppTest() throws InterruptedException {
// Produce events for two different stations
System.out.println("Produced events for ST001");
sensorController.produce(event1());
System.out.println("Produced events for ST002");
sensorController.produce(event2());

System.out.println("Waiting for consumer to process all events");
// Wait for queues to process all events
Thread.sleep(3000);
okafkaComponent.await();

// Assert all events have been processed and are available in the database
ResponseEntity<List<Sensor>> st001Events = sensorController.getEvents("ST001");
ResponseEntity<List<Sensor>> st002Events = sensorController.getEvents("ST002");

System.out.printf("Received %d events for ST001\n", st001Events.getBody().size());
assertEquals(st001Events.getBody().size(), 5);
System.out.printf("Received %d events for ST002\n", st002Events.getBody().size());
assertEquals(st002Events.getBody().size(), 10);

}

private SensorEvent event1() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ The sample application test uses Testcontainers, and creates a temporary Oracle
To run the test application, run the following command:

```shell
mvn test
mvn test -Dtest=OKafkaSampleTest
```

## Configure your project to use the Kafka Java Client for Oracle Database Transactional Event Queues
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public SampleConsumer<String> sampleConsumer() {
props.put("max.poll.records", 2000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
// Note the use of the org.oracle.okafka.clients.producer.KafkaConsumer class, for Oracle TxEventQ.
Consumer<String, String> okafkaConsumer = new KafkaConsumer<>(props);
return new SampleConsumer<>(okafkaConsumer, TOPIC_NAME, expectedMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public void run() {
consumer.subscribe(List.of(topic));
int consumedRecords = 0;
while (true) {
ConsumerRecords<String, T> records = consumer.poll(Duration.ofMillis(100));
ConsumerRecords<String, T> records = consumer.poll(Duration.ofMillis(500));
System.out.println("Consumed records: " + records.count());
consumedRecords += records.count();
if (consumedRecords >= expectedMessages) {
Expand Down
Loading
Loading