From 0804d6fed7ef2e2b12c8566cfa803809c91fefa9 Mon Sep 17 00:00:00 2001 From: Borja Hernandez Crespo Date: Tue, 12 Apr 2022 12:59:34 +0200 Subject: [PATCH] Version 7.0.0-v1.0.0 --- .DS_Store | Bin 6148 -> 6148 bytes .../dotnet-producer-avro.csproj | 16 +- .../src/main/java/clients/Producer.java | 2 +- ...ion_distance.avsc => position_string.avsc} | 4 +- .../src/main/java/clients/StreamsApp.java | 86 ++-- .../java/clients/avro/PositionString.java | 470 ++++++++++++++++++ .../main/java/clients/avro/PositionValue.java | 390 +++++++++++++++ challenge/python-consumer-avro/main.py | 29 +- .../python-consumer-avro/position_value.avsc | 8 + challenge/python-producer-avro/main.py | 47 +- .../python-producer-avro/position_key.avsc | 7 - docker-compose.yml | 210 ++++---- dotnet-install.sh | 13 + ...ion_distance.avsc => position_string.avsc} | 4 +- .../src/main/java/clients/StreamsApp.java | 88 ++-- .../java/clients/avro/PositionString.java | 470 ++++++++++++++++++ .../main/java/clients/avro/PositionValue.java | 390 +++++++++++++++ solution/python-consumer-avro/main.py | 29 +- .../python-consumer-avro/position_value.avsc | 8 + solution/python-producer-avro/main.py | 47 +- .../python-producer-avro/position_key.avsc | 7 - webserver-avro/server.js | 9 + 22 files changed, 2041 insertions(+), 293 deletions(-) rename challenge/java-streams-avro/src/main/avro/{position_distance.avsc => position_string.avsc} (67%) create mode 100644 challenge/java-streams-avro/src/main/java/clients/avro/PositionString.java create mode 100644 challenge/java-streams-avro/src/main/java/clients/avro/PositionValue.java create mode 100644 challenge/python-consumer-avro/position_value.avsc delete mode 100644 challenge/python-producer-avro/position_key.avsc create mode 100755 dotnet-install.sh rename solution/java-streams-avro/src/main/avro/{position_distance.avsc => position_string.avsc} (67%) create mode 100644 solution/java-streams-avro/src/main/java/clients/avro/PositionString.java create mode 100644 solution/java-streams-avro/src/main/java/clients/avro/PositionValue.java create mode 100644 solution/python-consumer-avro/position_value.avsc delete mode 100644 solution/python-producer-avro/position_key.avsc diff --git a/.DS_Store b/.DS_Store index 8d4776ffe67847699dc84c10d01ee83cf2085ff8..878e33c7cf6075f0625f82bbe16ae6f3746e3360 100644 GIT binary patch delta 133 zcmZoMXfc=|&e%S&P;8=}q9`*10|O%ig8&0V4nrzK9z!}qYD)3Mjq0`_2~LJ&h75*8 zpcIm1a!yiyehyHUfrFudA)ld`p#-R_2&lU_r5GwXu}5a(MrHQR>>L6djO`mYerKM{ RFQUuHu-Q@MF!RI)76A6uAzc6f delta 79 zcmZoMXfc=|&e%4wP;8=}q9`K+0|O8XFff!bWHRJ4w&zlbg?NY{bQwjzg_0WED6LI3~& diff --git a/challenge/dotnet-producer-avro/dotnet-producer-avro.csproj b/challenge/dotnet-producer-avro/dotnet-producer-avro.csproj index 62e010e..dc52407 100644 --- a/challenge/dotnet-producer-avro/dotnet-producer-avro.csproj +++ b/challenge/dotnet-producer-avro/dotnet-producer-avro.csproj @@ -9,14 +9,14 @@ 1591 - - - - - - runtime; build; native; contentfiles; analyzers; buildtransitive - all - + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + diff --git a/challenge/java-producer/src/main/java/clients/Producer.java b/challenge/java-producer/src/main/java/clients/Producer.java index 6589ec0..c1f8212 100644 --- a/challenge/java-producer/src/main/java/clients/Producer.java +++ b/challenge/java-producer/src/main/java/clients/Producer.java @@ -61,7 +61,7 @@ public static void main(String[] args) throws IOException, InterruptedException // TODO: write the lat/long position to a Kafka topic // TODO: print the key and value in the callback lambda producer.send(???, (md, e) -> { - System.out.println(??? + System.out.printf(??? }); Thread.sleep(1000); pos = (pos + 1) % rows.length; diff --git a/challenge/java-streams-avro/src/main/avro/position_distance.avsc b/challenge/java-streams-avro/src/main/avro/position_string.avsc similarity index 67% rename from challenge/java-streams-avro/src/main/avro/position_distance.avsc rename to challenge/java-streams-avro/src/main/avro/position_string.avsc index 2a0d2c5..311f20b 100644 --- a/challenge/java-streams-avro/src/main/avro/position_distance.avsc +++ b/challenge/java-streams-avro/src/main/avro/position_string.avsc @@ -1,9 +1,9 @@ {"namespace": "clients.avro", "type": "record", - "name": "PositionDistance", + "name": "PositionString", "fields": [ {"name": "latitude", "type": "double"}, {"name": "longitude", "type": "double"}, - {"name": "distance", "type": "double"} + {"name": "positionString", "type": "string"} ] } \ No newline at end of file diff --git a/challenge/java-streams-avro/src/main/java/clients/StreamsApp.java b/challenge/java-streams-avro/src/main/java/clients/StreamsApp.java index 7532264..a77495f 100644 --- a/challenge/java-streams-avro/src/main/java/clients/StreamsApp.java +++ b/challenge/java-streams-avro/src/main/java/clients/StreamsApp.java @@ -1,6 +1,6 @@ package clients; -import clients.avro.PositionDistance; +import clients.avro.PositionString; import clients.avro.PositionValue; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; @@ -10,8 +10,6 @@ import java.util.Properties; import java.util.concurrent.CountDownLatch; -import net.sf.geographiclib.Geodesic; - import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; @@ -20,8 +18,6 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; public class StreamsApp { @@ -37,7 +33,7 @@ public static void main(String[] args) { settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app-1"); settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, - Serdes.String().getClass().getName()); + Serdes.String().getClass().getName()); // Disabling caching ensures we get a complete "changelog" from the // aggregate(...) (i.e. every input event will have a corresponding output event. // see @@ -68,54 +64,52 @@ public static void main(String[] args) { } private static Topology getTopology() { + // When you want to override serdes explicitly/selectively final Map serdeConfig = Collections.singletonMap("schema.registry.url", - "http://schema-registry:8081"); + "http://schema-registry:8081"); final Serde positionValueSerde = new SpecificAvroSerde<>(); - positionValueSerde.configure(serdeConfig, false); - final Serde positionDistanceSerde = new SpecificAvroSerde<>(); - positionDistanceSerde.configure(serdeConfig, false); + positionValueSerde.configure(serdeConfig, false); + final Serde positionStringSerde = new SpecificAvroSerde<>(); + positionStringSerde.configure(serdeConfig, false); + // Create the StreamsBuilder object to create our Topology final StreamsBuilder builder = new StreamsBuilder(); - // create a KStream from the driver-positions-avro topic + // Create a KStream from the `driver-positions-avro` topic // configure a serdes that can read the string key, and avro value final KStream positions = builder.stream( - "driver-positions-avro", - Consumed.with(Serdes.String(), - positionValueSerde)); - - - // We do a groupByKey on the ‘positions’ stream which returns an - // intermediate KGroupedStream, we then aggregate to return a KTable. - final KTable reduced = positions.groupByKey().aggregate( - () -> null, - (aggKey, newValue, aggValue) -> { - final Double newLatitude = newValue.getLatitude(); - final Double newLongitude = newValue.getLongitude(); - - // initial record - no distance to calculate - if (aggValue == null) { - return new PositionDistance(newLatitude, newLongitude, 0.0); - } - - // cacluate the distance between the new value and the aggregate value - final Double aggLatitude = aggValue.getLatitude(); - final Double aggLongitude = aggValue.getLongitude(); - Double aggDistance = aggValue.getDistance(); - final Double distance = Geodesic.WGS84.Inverse(aggLatitude, aggLongitude, - newLatitude, newLongitude).s12; - aggDistance += distance; - - // return the new value and distance as the new aggregate - return new PositionDistance(newLatitude, newLongitude, aggDistance); - }, Materialized.with( - Serdes.String(), - positionDistanceSerde)); - - reduced.toStream().to( - "driver-distance-avro", - Produced.with(Serdes.String(), positionDistanceSerde)); + "driver-positions-avro", + Consumed.with(Serdes.String(), + positionValueSerde)); + + // TO-DO: Use filter() method to filter out the events from `driver-2`. + // Define the predicate in the lambda expression of the filter(). + final KStream positionsFiltered = positions.filter( + (key,value) -> ???); + + // TO-DO: Use mapValues() method to change the value of each + // event from PositionValue to PositionString class. + // You can check the two schemas under src/main/avro/. + // Notice that position_string.avsc contains a new field + // `positionString` as String type. + final KStream positionsString = positionsFiltered.mapValues( + value -> { + final Double latitude = ???; + final Double longitude = ???; + final String positionString = "Latitude: " + String.valueOf(???) + + ", Longitude: " + String.valueOf(???); + return new PositionString(???, ???, ???); + } + ); + + // Write the results to topic `driver-positions-string-avro` + // configure a serdes that can write the string key, and new avro value + positionsString.to( + "driver-positions-string-avro", + Produced.with(Serdes.String(), positionStringSerde)); + + // Build the Topology final Topology topology = builder.build(); return topology; } diff --git a/challenge/java-streams-avro/src/main/java/clients/avro/PositionString.java b/challenge/java-streams-avro/src/main/java/clients/avro/PositionString.java new file mode 100644 index 0000000..90da576 --- /dev/null +++ b/challenge/java-streams-avro/src/main/java/clients/avro/PositionString.java @@ -0,0 +1,470 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package clients.avro; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@org.apache.avro.specific.AvroGenerated +public class PositionString extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = -1651819559036848637L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"PositionString\",\"namespace\":\"clients.avro\",\"fields\":[{\"name\":\"latitude\",\"type\":\"double\"},{\"name\":\"longitude\",\"type\":\"double\"},{\"name\":\"positionString\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this PositionString to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a PositionString from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a PositionString instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static PositionString fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + @Deprecated public double latitude; + @Deprecated public double longitude; + @Deprecated public java.lang.String positionString; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public PositionString() {} + + /** + * All-args constructor. + * @param latitude The new value for latitude + * @param longitude The new value for longitude + * @param positionString The new value for positionString + */ + public PositionString(java.lang.Double latitude, java.lang.Double longitude, java.lang.String positionString) { + this.latitude = latitude; + this.longitude = longitude; + this.positionString = positionString; + } + + public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return latitude; + case 1: return longitude; + case 2: return positionString; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: latitude = (java.lang.Double)value$; break; + case 1: longitude = (java.lang.Double)value$; break; + case 2: positionString = value$ != null ? value$.toString() : null; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'latitude' field. + * @return The value of the 'latitude' field. + */ + public double getLatitude() { + return latitude; + } + + + /** + * Sets the value of the 'latitude' field. + * @param value the value to set. + */ + public void setLatitude(double value) { + this.latitude = value; + } + + /** + * Gets the value of the 'longitude' field. + * @return The value of the 'longitude' field. + */ + public double getLongitude() { + return longitude; + } + + + /** + * Sets the value of the 'longitude' field. + * @param value the value to set. + */ + public void setLongitude(double value) { + this.longitude = value; + } + + /** + * Gets the value of the 'positionString' field. + * @return The value of the 'positionString' field. + */ + public java.lang.String getPositionString() { + return positionString; + } + + + /** + * Sets the value of the 'positionString' field. + * @param value the value to set. + */ + public void setPositionString(java.lang.String value) { + this.positionString = value; + } + + /** + * Creates a new PositionString RecordBuilder. + * @return A new PositionString RecordBuilder + */ + public static clients.avro.PositionString.Builder newBuilder() { + return new clients.avro.PositionString.Builder(); + } + + /** + * Creates a new PositionString RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new PositionString RecordBuilder + */ + public static clients.avro.PositionString.Builder newBuilder(clients.avro.PositionString.Builder other) { + if (other == null) { + return new clients.avro.PositionString.Builder(); + } else { + return new clients.avro.PositionString.Builder(other); + } + } + + /** + * Creates a new PositionString RecordBuilder by copying an existing PositionString instance. + * @param other The existing instance to copy. + * @return A new PositionString RecordBuilder + */ + public static clients.avro.PositionString.Builder newBuilder(clients.avro.PositionString other) { + if (other == null) { + return new clients.avro.PositionString.Builder(); + } else { + return new clients.avro.PositionString.Builder(other); + } + } + + /** + * RecordBuilder for PositionString instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private double latitude; + private double longitude; + private java.lang.String positionString; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(clients.avro.PositionString.Builder other) { + super(other); + if (isValidValue(fields()[0], other.latitude)) { + this.latitude = data().deepCopy(fields()[0].schema(), other.latitude); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.longitude)) { + this.longitude = data().deepCopy(fields()[1].schema(), other.longitude); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + if (isValidValue(fields()[2], other.positionString)) { + this.positionString = data().deepCopy(fields()[2].schema(), other.positionString); + fieldSetFlags()[2] = other.fieldSetFlags()[2]; + } + } + + /** + * Creates a Builder by copying an existing PositionString instance + * @param other The existing instance to copy. + */ + private Builder(clients.avro.PositionString other) { + super(SCHEMA$); + if (isValidValue(fields()[0], other.latitude)) { + this.latitude = data().deepCopy(fields()[0].schema(), other.latitude); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.longitude)) { + this.longitude = data().deepCopy(fields()[1].schema(), other.longitude); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.positionString)) { + this.positionString = data().deepCopy(fields()[2].schema(), other.positionString); + fieldSetFlags()[2] = true; + } + } + + /** + * Gets the value of the 'latitude' field. + * @return The value. + */ + public double getLatitude() { + return latitude; + } + + + /** + * Sets the value of the 'latitude' field. + * @param value The value of 'latitude'. + * @return This builder. + */ + public clients.avro.PositionString.Builder setLatitude(double value) { + validate(fields()[0], value); + this.latitude = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'latitude' field has been set. + * @return True if the 'latitude' field has been set, false otherwise. + */ + public boolean hasLatitude() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'latitude' field. + * @return This builder. + */ + public clients.avro.PositionString.Builder clearLatitude() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'longitude' field. + * @return The value. + */ + public double getLongitude() { + return longitude; + } + + + /** + * Sets the value of the 'longitude' field. + * @param value The value of 'longitude'. + * @return This builder. + */ + public clients.avro.PositionString.Builder setLongitude(double value) { + validate(fields()[1], value); + this.longitude = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'longitude' field has been set. + * @return True if the 'longitude' field has been set, false otherwise. + */ + public boolean hasLongitude() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'longitude' field. + * @return This builder. + */ + public clients.avro.PositionString.Builder clearLongitude() { + fieldSetFlags()[1] = false; + return this; + } + + /** + * Gets the value of the 'positionString' field. + * @return The value. + */ + public java.lang.String getPositionString() { + return positionString; + } + + + /** + * Sets the value of the 'positionString' field. + * @param value The value of 'positionString'. + * @return This builder. + */ + public clients.avro.PositionString.Builder setPositionString(java.lang.String value) { + validate(fields()[2], value); + this.positionString = value; + fieldSetFlags()[2] = true; + return this; + } + + /** + * Checks whether the 'positionString' field has been set. + * @return True if the 'positionString' field has been set, false otherwise. + */ + public boolean hasPositionString() { + return fieldSetFlags()[2]; + } + + + /** + * Clears the value of the 'positionString' field. + * @return This builder. + */ + public clients.avro.PositionString.Builder clearPositionString() { + positionString = null; + fieldSetFlags()[2] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public PositionString build() { + try { + PositionString record = new PositionString(); + record.latitude = fieldSetFlags()[0] ? this.latitude : (java.lang.Double) defaultValue(fields()[0]); + record.longitude = fieldSetFlags()[1] ? this.longitude : (java.lang.Double) defaultValue(fields()[1]); + record.positionString = fieldSetFlags()[2] ? this.positionString : (java.lang.String) defaultValue(fields()[2]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override protected boolean hasCustomCoders() { return true; } + + @Override public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException + { + out.writeDouble(this.latitude); + + out.writeDouble(this.longitude); + + out.writeString(this.positionString); + + } + + @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException + { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.latitude = in.readDouble(); + + this.longitude = in.readDouble(); + + this.positionString = in.readString(); + + } else { + for (int i = 0; i < 3; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.latitude = in.readDouble(); + break; + + case 1: + this.longitude = in.readDouble(); + break; + + case 2: + this.positionString = in.readString(); + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} + + + + + + + + + + + diff --git a/challenge/java-streams-avro/src/main/java/clients/avro/PositionValue.java b/challenge/java-streams-avro/src/main/java/clients/avro/PositionValue.java new file mode 100644 index 0000000..7889893 --- /dev/null +++ b/challenge/java-streams-avro/src/main/java/clients/avro/PositionValue.java @@ -0,0 +1,390 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package clients.avro; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@org.apache.avro.specific.AvroGenerated +public class PositionValue extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = -3016058280124914056L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"PositionValue\",\"namespace\":\"clients.avro\",\"fields\":[{\"name\":\"latitude\",\"type\":\"double\"},{\"name\":\"longitude\",\"type\":\"double\"}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this PositionValue to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a PositionValue from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a PositionValue instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static PositionValue fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + @Deprecated public double latitude; + @Deprecated public double longitude; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public PositionValue() {} + + /** + * All-args constructor. + * @param latitude The new value for latitude + * @param longitude The new value for longitude + */ + public PositionValue(java.lang.Double latitude, java.lang.Double longitude) { + this.latitude = latitude; + this.longitude = longitude; + } + + public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return latitude; + case 1: return longitude; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: latitude = (java.lang.Double)value$; break; + case 1: longitude = (java.lang.Double)value$; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'latitude' field. + * @return The value of the 'latitude' field. + */ + public double getLatitude() { + return latitude; + } + + + /** + * Sets the value of the 'latitude' field. + * @param value the value to set. + */ + public void setLatitude(double value) { + this.latitude = value; + } + + /** + * Gets the value of the 'longitude' field. + * @return The value of the 'longitude' field. + */ + public double getLongitude() { + return longitude; + } + + + /** + * Sets the value of the 'longitude' field. + * @param value the value to set. + */ + public void setLongitude(double value) { + this.longitude = value; + } + + /** + * Creates a new PositionValue RecordBuilder. + * @return A new PositionValue RecordBuilder + */ + public static clients.avro.PositionValue.Builder newBuilder() { + return new clients.avro.PositionValue.Builder(); + } + + /** + * Creates a new PositionValue RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new PositionValue RecordBuilder + */ + public static clients.avro.PositionValue.Builder newBuilder(clients.avro.PositionValue.Builder other) { + if (other == null) { + return new clients.avro.PositionValue.Builder(); + } else { + return new clients.avro.PositionValue.Builder(other); + } + } + + /** + * Creates a new PositionValue RecordBuilder by copying an existing PositionValue instance. + * @param other The existing instance to copy. + * @return A new PositionValue RecordBuilder + */ + public static clients.avro.PositionValue.Builder newBuilder(clients.avro.PositionValue other) { + if (other == null) { + return new clients.avro.PositionValue.Builder(); + } else { + return new clients.avro.PositionValue.Builder(other); + } + } + + /** + * RecordBuilder for PositionValue instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private double latitude; + private double longitude; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(clients.avro.PositionValue.Builder other) { + super(other); + if (isValidValue(fields()[0], other.latitude)) { + this.latitude = data().deepCopy(fields()[0].schema(), other.latitude); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.longitude)) { + this.longitude = data().deepCopy(fields()[1].schema(), other.longitude); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + } + + /** + * Creates a Builder by copying an existing PositionValue instance + * @param other The existing instance to copy. + */ + private Builder(clients.avro.PositionValue other) { + super(SCHEMA$); + if (isValidValue(fields()[0], other.latitude)) { + this.latitude = data().deepCopy(fields()[0].schema(), other.latitude); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.longitude)) { + this.longitude = data().deepCopy(fields()[1].schema(), other.longitude); + fieldSetFlags()[1] = true; + } + } + + /** + * Gets the value of the 'latitude' field. + * @return The value. + */ + public double getLatitude() { + return latitude; + } + + + /** + * Sets the value of the 'latitude' field. + * @param value The value of 'latitude'. + * @return This builder. + */ + public clients.avro.PositionValue.Builder setLatitude(double value) { + validate(fields()[0], value); + this.latitude = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'latitude' field has been set. + * @return True if the 'latitude' field has been set, false otherwise. + */ + public boolean hasLatitude() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'latitude' field. + * @return This builder. + */ + public clients.avro.PositionValue.Builder clearLatitude() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'longitude' field. + * @return The value. + */ + public double getLongitude() { + return longitude; + } + + + /** + * Sets the value of the 'longitude' field. + * @param value The value of 'longitude'. + * @return This builder. + */ + public clients.avro.PositionValue.Builder setLongitude(double value) { + validate(fields()[1], value); + this.longitude = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'longitude' field has been set. + * @return True if the 'longitude' field has been set, false otherwise. + */ + public boolean hasLongitude() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'longitude' field. + * @return This builder. + */ + public clients.avro.PositionValue.Builder clearLongitude() { + fieldSetFlags()[1] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public PositionValue build() { + try { + PositionValue record = new PositionValue(); + record.latitude = fieldSetFlags()[0] ? this.latitude : (java.lang.Double) defaultValue(fields()[0]); + record.longitude = fieldSetFlags()[1] ? this.longitude : (java.lang.Double) defaultValue(fields()[1]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override protected boolean hasCustomCoders() { return true; } + + @Override public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException + { + out.writeDouble(this.latitude); + + out.writeDouble(this.longitude); + + } + + @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException + { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.latitude = in.readDouble(); + + this.longitude = in.readDouble(); + + } else { + for (int i = 0; i < 2; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.latitude = in.readDouble(); + break; + + case 1: + this.longitude = in.readDouble(); + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} + + + + + + + + + + + diff --git a/challenge/python-consumer-avro/main.py b/challenge/python-consumer-avro/main.py index 5a84656..23e5e68 100644 --- a/challenge/python-consumer-avro/main.py +++ b/challenge/python-consumer-avro/main.py @@ -1,20 +1,31 @@ "Python Avro Consumer" -from confluent_kafka.avro import AvroConsumer +from confluent_kafka import DeserializingConsumer +from confluent_kafka.schema_registry import SchemaRegistryClient +from confluent_kafka.schema_registry.avro import AvroDeserializer +from confluent_kafka.serialization import StringDeserializer from confluent_kafka.avro.serializer import SerializerError -KAFKA_TOPIC = "driver-positions-pyavro" +KAFKA_TOPIC = "driver-positions-avro" print("Starting Python Avro Consumer.") +with open("position_value.avsc","r") as avro_file: + value_schema = avro_file.read() + +schema_registry_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'}) + +avro_deserializer = AvroDeserializer(value_schema, schema_registry_client) + # Configure the group id, location of the bootstrap server, # Confluent interceptors, and schema registry location -consumer = AvroConsumer({ - 'bootstrap.servers': 'kafka:9092', - 'plugin.library.paths': 'monitoring-interceptor', - 'group.id': 'python-consumer-avro', - 'auto.offset.reset': 'earliest', - 'schema.registry.url': 'http://schema-registry:8081' -}) +consumer_conf = {'bootstrap.servers': "kafka:9092", + 'key.deserializer': StringDeserializer('utf_8'), + 'value.deserializer': avro_deserializer, + 'group.id': 'python-consumer-avro', + 'plugin.library.paths': 'monitoring-interceptor', + 'auto.offset.reset': "earliest"} + +consumer = DeserializingConsumer(consumer_conf) # Subscribe to our topic consumer.subscribe([KAFKA_TOPIC]) diff --git a/challenge/python-consumer-avro/position_value.avsc b/challenge/python-consumer-avro/position_value.avsc new file mode 100644 index 0000000..e1131af --- /dev/null +++ b/challenge/python-consumer-avro/position_value.avsc @@ -0,0 +1,8 @@ +{"namespace": "clients.avro", + "type": "record", + "name": "PositionValue", + "fields": [ + {"name": "latitude", "type": "double"}, + {"name": "longitude", "type": "double"} + ] +} \ No newline at end of file diff --git a/challenge/python-producer-avro/main.py b/challenge/python-producer-avro/main.py index 5bd6a16..0051110 100644 --- a/challenge/python-producer-avro/main.py +++ b/challenge/python-producer-avro/main.py @@ -1,33 +1,47 @@ -"Python Producer" +"Python Avro Producer" + from time import sleep import os import atexit -from confluent_kafka import avro -from confluent_kafka.avro import AvroProducer +from confluent_kafka import SerializingProducer +from confluent_kafka.serialization import StringSerializer +from confluent_kafka.schema_registry import SchemaRegistryClient +from confluent_kafka.schema_registry.avro import AvroSerializer DRIVER_FILE_PREFIX = "./drivers/" -KAFKA_TOPIC = "driver-positions-pyavro" +KAFKA_TOPIC = "driver-positions-avro" # Load a driver id from an environment variable # if it isn't present use "driver-3" DRIVER_ID = os.getenv("DRIVER_ID", "driver-3") print("Starting Python Avro producer.") -value_schema = avro.load("position_value.avsc") -key_schema = avro.load("position_key.avsc") +with open("position_value.avsc","r") as avro_file: + value_schema = avro_file.read() # Configure the location of the bootstrap server, Confluent interceptors # and a partitioner compatible with Java, and key/value schemas # see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md -producer = AvroProducer({ - 'bootstrap.servers': 'kafka:9092', - 'plugin.library.paths': 'monitoring-interceptor', - 'partitioner': 'murmur2_random', - 'schema.registry.url': 'http://schema-registry:8081' -} - , default_key_schema=key_schema - , default_value_schema=value_schema) +schema_registry_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'}) + +avro_serializer = AvroSerializer(value_schema, schema_registry_client) + +producer_conf = {'bootstrap.servers': 'kafka:9092', + 'key.serializer': StringSerializer('utf_8'), + 'value.serializer': avro_serializer, + 'plugin.library.paths': 'monitoring-interceptor', + 'partitioner': 'murmur2_random'} + +producer = SerializingProducer(producer_conf) + + +def delivery_report(err, msg): + if err is not None: + print("Delivery failed {}: {}".format(msg.key(), err)) + return + print("Sent Key:{} Value:{}".format(key, value)) + def exit_handler(): """Run this on exit""" @@ -47,7 +61,7 @@ def exit_handler(): line = lines[pos] # Trigger any available delivery report callbacks from previous produce() calls producer.poll(0) - key = {"key" : DRIVER_ID} + key = DRIVER_ID latitude = line.split(",")[0].strip() longitude = line.split(",")[1].strip() value = {"latitude" : float(latitude), "longitude" : float(longitude)} @@ -56,8 +70,7 @@ def exit_handler(): topic=KAFKA_TOPIC, value=value, key=key, - callback=lambda err, msg: - print("Sent Key:{} Value:{}".format(key, value) if err is None else err) + on_delivery=delivery_report ) sleep(1) pos = (pos + 1) % len(lines) diff --git a/challenge/python-producer-avro/position_key.avsc b/challenge/python-producer-avro/position_key.avsc deleted file mode 100644 index e7bb9fa..0000000 --- a/challenge/python-producer-avro/position_key.avsc +++ /dev/null @@ -1,7 +0,0 @@ -{"namespace": "clients.avro", - "type": "record", - "name": "PositionKey", - "fields": [ - {"name": "key", "type": "string"} - ] -} \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index d2ed9d8..e363638 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,7 +2,7 @@ version: '2' services: zookeeper: - image: confluentinc/cp-zookeeper:6.0.0-1-ubi8 + image: confluentinc/cp-zookeeper:7.0.0-1-ubi8 restart: always container_name: zookeeper ports: @@ -12,7 +12,7 @@ services: ZOOKEEPER_TICK_TIME: 2000 kafka: - image: confluentinc/cp-enterprise-kafka:6.0.0-1-ubi8 + image: confluentinc/cp-enterprise-kafka:7.0.0-1-ubi8 restart: always container_name: kafka ports: @@ -32,9 +32,88 @@ services: CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: "kafka:9092" CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1 + schema-registry: + image: confluentinc/cp-schema-registry:7.0.0-1-ubi8 + restart: always + container_name: schema-registry + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092' + KAFKA_REST_CUB_KAFKA_TIMEOUT: 180 # https://github.com/confluentinc/cp-docker-images/issues/807 + + ksqldb-server: + image: confluentinc/cp-ksqldb-server:7.0.0-1-ubi8 + restart: always + container_name: ksqldb-server + ports: + - "8088:8088" + environment: + KSQL_CONFIG_DIR: "/etc/ksql" + KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties" + KSQL_BOOTSTRAP_SERVERS: "kafka:9092" + KSQL_HOST_NAME: ksqldb + KSQL_APPLICATION_ID: "dev-class" + KSQL_KSQL_CONNECT_URL: http://connect:8083/ + KSQL_LISTENERS: "http://0.0.0.0:8088" + KSQL_CACHE_MAX_BYTES_BUFFERING: 0 + KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" + KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" + KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" + KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" + KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" + + connect: + image: confluentinc/cp-kafka-connect:7.0.0-1-ubi8 + restart: always + container_name: connect + ports: + - "8083:8083" + environment: + CONNECT_REST_PORT: 8083 + CONNECT_BOOTSTRAP_SERVERS: kafka:9092 + CONNECT_GROUP_ID: "connect" + CONNECT_CONFIG_STORAGE_TOPIC: "connect-config" + CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets" + CONNECT_STATUS_STORAGE_TOPIC: "connect-status" + CONNECT_REPLICATION_FACTOR: 1 + CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 + CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter" + CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter" + CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" + CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" + CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" + CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/,/data/connect-jars' + CONNECT_REST_ADVERTISED_HOST_NAME: "connect" + CONNECT_PRODUCER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor + CONNECT_CONSUMER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor + + control-center: + image: confluentinc/cp-enterprise-control-center:7.0.0-1-ubi8 + restart: always + container_name: control-center + ports: + - "9021:9021" + environment: + CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka:9092 + CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181 + CONTROL_CENTER_REPLICATION_FACTOR: 1 + CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1 + CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1 + CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1 + CONTROL_CENTER_METRICS_TOPIC_REPLICATION: 1 + CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1 + CONTROL_CENTER_STREAMS_CONSUMER_REQUEST_TIMEOUT_MS: "960032" + CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" + CONTROL_CENTER_CONNECT_CONNECT_CLUSTER: "connect:8083" + CONTROL_CENTER_KSQL_KSQLDB_URL: "http://ksqldb-server:8088" + CONTROL_CENTER_UI_AUTOUPDATE_ENABLE: "false" + producer1: image: cnfltraining/java-producer-avro:1.0 - restart: always container_name: producer1 environment: DRIVER_ID: driver-1 @@ -46,7 +125,6 @@ services: producer2: image: cnfltraining/java-producer-avro:1.0 - restart: always container_name: producer2 environment: DRIVER_ID: driver-2 @@ -58,7 +136,6 @@ services: producer3: image: cnfltraining/java-producer-avro:1.0 - restart: always container_name: producer3 environment: DRIVER_ID: driver-3 @@ -67,10 +144,9 @@ services: echo waiting for schema; \ done; \ java -classpath \"lib/*\" clients.Producer'" - + producer4: image: cnfltraining/java-producer-avro:1.0 - restart: always container_name: producer4 environment: DRIVER_ID: driver-4 @@ -82,7 +158,6 @@ services: producer5: image: cnfltraining/java-producer-avro:1.0 - restart: always container_name: producer5 environment: DRIVER_ID: driver-5 @@ -94,7 +169,6 @@ services: webserver: image: cnfltraining/node-webserver:1.0 - restart: always container_name: webserver hostname: webserver ports: @@ -107,7 +181,6 @@ services: webserver-avro: image: cnfltraining/node-webserver-avro:1.0 - restart: always container_name: webserver-avro hostname: webserver-avro ports: @@ -120,31 +193,15 @@ services: done; \ npm run start'" - webserver-pyavro: - image: cnfltraining/node-webserver-avro:1.0 - restart: always - container_name: webserver-pyavro - hostname: webserver-pyavro - ports: - - 3012:3000 - environment: - TOPIC: driver-positions-pyavro - command: "bash -c 'until curl -fsSL schema-registry:8081/subjects/ | grep driver-positions-pyavro-value ; \ - do sleep 1; \ - echo waiting for schema; \ - done; \ - npm run start'" - webserver-streams: - image: cnfltraining/node-webserver-avro:1.0 - restart: always + image: cnfltraining/node-webserver-avro:2.0 container_name: webserver-streams hostname: webserver-streams ports: - 3003:3000 environment: - TOPIC: driver-distance-avro - command: "bash -c 'until curl -fsSL schema-registry:8081/subjects/ | grep driver-distance-avro-value ; \ + TOPIC: driver-positions-string-avro + command: "bash -c 'until curl -fsSL schema-registry:8081/subjects/ | grep driver-positions-string-avro-value ; \ do sleep 1; \ echo waiting for schema; \ done; \ @@ -152,7 +209,6 @@ services: webserver-ksql: image: cnfltraining/node-webserver-avro:1.0 - restart: always container_name: webserver-ksql hostname: webserver-ksql ports: @@ -165,36 +221,6 @@ services: done; \ npm run start'" - schema-registry: - image: confluentinc/cp-schema-registry:6.0.0-1-ubi8 - restart: always - container_name: schema-registry - ports: - - "8081:8081" - environment: - SCHEMA_REGISTRY_HOST_NAME: schema-registry - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092' - KAFKA_REST_CUB_KAFKA_TIMEOUT: 180 # https://github.com/confluentinc/cp-docker-images/issues/807 - - ksqldb-server: - image: confluentinc/cp-ksqldb-server:6.0.0-1-ubi8 - restart: always - container_name: ksqldb-server - ports: - - "8088:8088" - environment: - KSQL_CONFIG_DIR: "/etc/ksql" - KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties" - KSQL_BOOTSTRAP_SERVERS: "kafka:9092" - KSQL_HOST_NAME: ksqldb-server - KSQL_APPLICATION_ID: "dev-class" - KSQL_KSQL_CONNECT_URL: http://connect:8083/ - KSQL_LISTENERS: "http://0.0.0.0:8088" - KSQL_CACHE_MAX_BYTES_BUFFERING: 0 - KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor" - KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor" - postgres: image: postgres:11.2-alpine restart: always @@ -204,79 +230,21 @@ services: ports: - 5432:5432 - connect: - image: confluentinc/cp-kafka-connect:6.0.0-1-ubi8 - restart: always - container_name: connect - ports: - - "8083:8083" - environment: - CONNECT_REST_PORT: 8083 - CONNECT_BOOTSTRAP_SERVERS: kafka:9092 - CONNECT_GROUP_ID: "connect" - CONNECT_CONFIG_STORAGE_TOPIC: "connect-config" - CONNECT_OFFSET_STORAGE_TOPIC: "connect-offsets" - CONNECT_STATUS_STORAGE_TOPIC: "connect-status" - CONNECT_REPLICATION_FACTOR: 1 - CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1 - CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter" - CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter" - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" - CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter" - CONNECT_PLUGIN_PATH: /usr/share/java - CONNECT_REST_ADVERTISED_HOST_NAME: "connect" - CONNECT_PRODUCER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor - CONNECT_CONSUMER_INTERCEPTOR_CLASSES: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor - - control-center: - image: confluentinc/cp-enterprise-control-center:6.0.0-1-ubi8 - restart: always - container_name: control-center - ports: - - "9021:9021" - environment: - CONTROL_CENTER_BOOTSTRAP_SERVERS: kafka:9092 - CONTROL_CENTER_ZOOKEEPER_CONNECT: zookeeper:2181 - CONTROL_CENTER_REPLICATION_FACTOR: 1 - CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_REPLICATION: 1 - CONTROL_CENTER_INTERNAL_TOPICS_REPLICATION: 1 - CONTROL_CENTER_COMMAND_TOPIC_REPLICATION: 1 - CONTROL_CENTER_METRICS_TOPIC_REPLICATION: 1 - CONTROL_CENTER_STREAMS_NUM_STREAM_THREADS: 1 - CONTROL_CENTER_STREAMS_CONSUMER_REQUEST_TIMEOUT_MS: "960032" - CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081" - CONTROL_CENTER_CONNECT_CLUSTER: "connect:8083" - CONTROL_CENTER_KSQL_URL: "http://ksqldb-server:8088" - CONTROL_CENTER_UI_AUTOUPDATE_ENABLE: "false" - create-topics: - image: confluentinc/cp-enterprise-kafka:6.0.0-1-ubi8 + image: confluentinc/cp-enterprise-kafka:7.0.0-1-ubi8 container_name: create-topics command: > bash -c 'echo Waiting for Kafka to be ready... ; cub kafka-ready -b kafka:9092 1 300 ; echo Creating topics... ; kafka-topics --bootstrap-server kafka:9092 --create --topic driver-positions --partitions 3 --replication-factor 1 ; - kafka-topics --bootstrap-server kafka:9092 --create --topic driver-positions-pyavro --partitions 3 --replication-factor 1 ; kafka-topics --bootstrap-server kafka:9092 --create --topic driver-profiles-avro --partitions 3 --replication-factor 1 --config cleanup.policy=compact ; kafka-topics --bootstrap-server kafka:9092 --create --topic driver-profiles-protobuf --partitions 3 --replication-factor 1 --config cleanup.policy=compact ; - kafka-topics --bootstrap-server kafka:9092 --create --topic driver-profiles-ksqlavro --partitions 3 --replication-factor 1 --config cleanup.policy=compact ; + kafka-topics --bootstrap-server kafka:9092 --create --topic driver-profiles-ksql --partitions 3 --replication-factor 1 --config cleanup.policy=compact ; kafka-topics --bootstrap-server kafka:9092 --create --topic driver-positions-avro --partitions 3 --replication-factor 1 ; kafka-topics --bootstrap-server kafka:9092 --create --topic driver-distance-avro --partitions 3 --replication-factor 1 ; kafka-topics --bootstrap-server kafka:9092 --create --topic driver-augmented-avro --partitions 3 --replication-factor 1 ; kafka-topics --bootstrap-server kafka:9092 --create --topic _confluent-monitoring --partitions 12 --replication-factor 1 --config retention.ms=259200000 --config message.timestamp.type=LogAppendTime ; + kafka-topics --bootstrap-server kafka:9092 --create --topic driver-positions-string-avro --partitions 3 --replication-factor 1 ; true' - - tools: - image: cnfltraining/training-tools:6.0 - container_name: tools - hostname: tools - volumes: - - .:/root/confluent-dev/ - command: /bin/sh - tty: true - diff --git a/dotnet-install.sh b/dotnet-install.sh new file mode 100755 index 0000000..630985f --- /dev/null +++ b/dotnet-install.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +echo Installing .NET..... + +wget https://packages.microsoft.com/config/ubuntu/18.04/packages-microsoft-prod.deb -O packages-microsoft-prod.deb +sudo dpkg -i packages-microsoft-prod.deb +rm packages-microsoft-prod.deb + +sudo apt update +sudo apt install -y apt-transport-https +sudo apt-get install -y dotnet-sdk-2.1 +sudo apt install -y aspnetcore-runtime-2.1 + diff --git a/solution/java-streams-avro/src/main/avro/position_distance.avsc b/solution/java-streams-avro/src/main/avro/position_string.avsc similarity index 67% rename from solution/java-streams-avro/src/main/avro/position_distance.avsc rename to solution/java-streams-avro/src/main/avro/position_string.avsc index 2a0d2c5..311f20b 100644 --- a/solution/java-streams-avro/src/main/avro/position_distance.avsc +++ b/solution/java-streams-avro/src/main/avro/position_string.avsc @@ -1,9 +1,9 @@ {"namespace": "clients.avro", "type": "record", - "name": "PositionDistance", + "name": "PositionString", "fields": [ {"name": "latitude", "type": "double"}, {"name": "longitude", "type": "double"}, - {"name": "distance", "type": "double"} + {"name": "positionString", "type": "string"} ] } \ No newline at end of file diff --git a/solution/java-streams-avro/src/main/java/clients/StreamsApp.java b/solution/java-streams-avro/src/main/java/clients/StreamsApp.java index 7532264..02f60aa 100644 --- a/solution/java-streams-avro/src/main/java/clients/StreamsApp.java +++ b/solution/java-streams-avro/src/main/java/clients/StreamsApp.java @@ -1,6 +1,6 @@ package clients; -import clients.avro.PositionDistance; +import clients.avro.PositionString; import clients.avro.PositionValue; import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde; @@ -10,8 +10,6 @@ import java.util.Properties; import java.util.concurrent.CountDownLatch; -import net.sf.geographiclib.Geodesic; - import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; @@ -20,8 +18,6 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; public class StreamsApp { @@ -37,7 +33,7 @@ public static void main(String[] args) { settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-app-1"); settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092"); settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, - Serdes.String().getClass().getName()); + Serdes.String().getClass().getName()); // Disabling caching ensures we get a complete "changelog" from the // aggregate(...) (i.e. every input event will have a corresponding output event. // see @@ -68,56 +64,54 @@ public static void main(String[] args) { } private static Topology getTopology() { + // When you want to override serdes explicitly/selectively final Map serdeConfig = Collections.singletonMap("schema.registry.url", - "http://schema-registry:8081"); + "http://schema-registry:8081"); final Serde positionValueSerde = new SpecificAvroSerde<>(); - positionValueSerde.configure(serdeConfig, false); - final Serde positionDistanceSerde = new SpecificAvroSerde<>(); - positionDistanceSerde.configure(serdeConfig, false); + positionValueSerde.configure(serdeConfig, false); + final Serde positionStringSerde = new SpecificAvroSerde<>(); + positionStringSerde.configure(serdeConfig, false); + // Create the StreamsBuilder object to create our Topology final StreamsBuilder builder = new StreamsBuilder(); - // create a KStream from the driver-positions-avro topic + // Create a KStream from the `driver-positions-avro` topic // configure a serdes that can read the string key, and avro value final KStream positions = builder.stream( - "driver-positions-avro", - Consumed.with(Serdes.String(), - positionValueSerde)); - - - // We do a groupByKey on the ‘positions’ stream which returns an - // intermediate KGroupedStream, we then aggregate to return a KTable. - final KTable reduced = positions.groupByKey().aggregate( - () -> null, - (aggKey, newValue, aggValue) -> { - final Double newLatitude = newValue.getLatitude(); - final Double newLongitude = newValue.getLongitude(); - - // initial record - no distance to calculate - if (aggValue == null) { - return new PositionDistance(newLatitude, newLongitude, 0.0); - } - - // cacluate the distance between the new value and the aggregate value - final Double aggLatitude = aggValue.getLatitude(); - final Double aggLongitude = aggValue.getLongitude(); - Double aggDistance = aggValue.getDistance(); - final Double distance = Geodesic.WGS84.Inverse(aggLatitude, aggLongitude, - newLatitude, newLongitude).s12; - aggDistance += distance; - - // return the new value and distance as the new aggregate - return new PositionDistance(newLatitude, newLongitude, aggDistance); - }, Materialized.with( - Serdes.String(), - positionDistanceSerde)); - - reduced.toStream().to( - "driver-distance-avro", - Produced.with(Serdes.String(), positionDistanceSerde)); + "driver-positions-avro", + Consumed.with(Serdes.String(), + positionValueSerde)); + + // TO-DO: Use filter() method to filter out the events from `driver-2`. + // Define the predicate in the lambda expression of the filter(). + final KStream positionsFiltered = positions.filter( + (key,value) -> !key.equals("driver-2")); + + // TO-DO: Use mapValues() method to change the value of each + // event from PositionValue to PositionString class. + // You can check the two schemas under src/main/avro/. + // Notice that position_string.avsc contains a new field + // `positionString` as String type. + final KStream positionsString = positionsFiltered.mapValues( + value -> { + final Double latitude = value.getLatitude(); + final Double longitude = value.getLongitude(); + final String positionString = "Latitude: " + String.valueOf(latitude) + + ", Longitude: " + String.valueOf(longitude); + return new PositionString(latitude, longitude, positionString); + } + ); + + // Write the results to topic `driver-positions-string-avro` + // configure a serdes that can write the string key, and new avro value + positionsString.to( + "driver-positions-string-avro", + Produced.with(Serdes.String(), positionStringSerde)); + + // Build the Topology final Topology topology = builder.build(); return topology; } -} +} \ No newline at end of file diff --git a/solution/java-streams-avro/src/main/java/clients/avro/PositionString.java b/solution/java-streams-avro/src/main/java/clients/avro/PositionString.java new file mode 100644 index 0000000..90da576 --- /dev/null +++ b/solution/java-streams-avro/src/main/java/clients/avro/PositionString.java @@ -0,0 +1,470 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package clients.avro; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@org.apache.avro.specific.AvroGenerated +public class PositionString extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = -1651819559036848637L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"PositionString\",\"namespace\":\"clients.avro\",\"fields\":[{\"name\":\"latitude\",\"type\":\"double\"},{\"name\":\"longitude\",\"type\":\"double\"},{\"name\":\"positionString\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this PositionString to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a PositionString from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a PositionString instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static PositionString fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + @Deprecated public double latitude; + @Deprecated public double longitude; + @Deprecated public java.lang.String positionString; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public PositionString() {} + + /** + * All-args constructor. + * @param latitude The new value for latitude + * @param longitude The new value for longitude + * @param positionString The new value for positionString + */ + public PositionString(java.lang.Double latitude, java.lang.Double longitude, java.lang.String positionString) { + this.latitude = latitude; + this.longitude = longitude; + this.positionString = positionString; + } + + public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return latitude; + case 1: return longitude; + case 2: return positionString; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: latitude = (java.lang.Double)value$; break; + case 1: longitude = (java.lang.Double)value$; break; + case 2: positionString = value$ != null ? value$.toString() : null; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'latitude' field. + * @return The value of the 'latitude' field. + */ + public double getLatitude() { + return latitude; + } + + + /** + * Sets the value of the 'latitude' field. + * @param value the value to set. + */ + public void setLatitude(double value) { + this.latitude = value; + } + + /** + * Gets the value of the 'longitude' field. + * @return The value of the 'longitude' field. + */ + public double getLongitude() { + return longitude; + } + + + /** + * Sets the value of the 'longitude' field. + * @param value the value to set. + */ + public void setLongitude(double value) { + this.longitude = value; + } + + /** + * Gets the value of the 'positionString' field. + * @return The value of the 'positionString' field. + */ + public java.lang.String getPositionString() { + return positionString; + } + + + /** + * Sets the value of the 'positionString' field. + * @param value the value to set. + */ + public void setPositionString(java.lang.String value) { + this.positionString = value; + } + + /** + * Creates a new PositionString RecordBuilder. + * @return A new PositionString RecordBuilder + */ + public static clients.avro.PositionString.Builder newBuilder() { + return new clients.avro.PositionString.Builder(); + } + + /** + * Creates a new PositionString RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new PositionString RecordBuilder + */ + public static clients.avro.PositionString.Builder newBuilder(clients.avro.PositionString.Builder other) { + if (other == null) { + return new clients.avro.PositionString.Builder(); + } else { + return new clients.avro.PositionString.Builder(other); + } + } + + /** + * Creates a new PositionString RecordBuilder by copying an existing PositionString instance. + * @param other The existing instance to copy. + * @return A new PositionString RecordBuilder + */ + public static clients.avro.PositionString.Builder newBuilder(clients.avro.PositionString other) { + if (other == null) { + return new clients.avro.PositionString.Builder(); + } else { + return new clients.avro.PositionString.Builder(other); + } + } + + /** + * RecordBuilder for PositionString instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private double latitude; + private double longitude; + private java.lang.String positionString; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(clients.avro.PositionString.Builder other) { + super(other); + if (isValidValue(fields()[0], other.latitude)) { + this.latitude = data().deepCopy(fields()[0].schema(), other.latitude); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.longitude)) { + this.longitude = data().deepCopy(fields()[1].schema(), other.longitude); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + if (isValidValue(fields()[2], other.positionString)) { + this.positionString = data().deepCopy(fields()[2].schema(), other.positionString); + fieldSetFlags()[2] = other.fieldSetFlags()[2]; + } + } + + /** + * Creates a Builder by copying an existing PositionString instance + * @param other The existing instance to copy. + */ + private Builder(clients.avro.PositionString other) { + super(SCHEMA$); + if (isValidValue(fields()[0], other.latitude)) { + this.latitude = data().deepCopy(fields()[0].schema(), other.latitude); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.longitude)) { + this.longitude = data().deepCopy(fields()[1].schema(), other.longitude); + fieldSetFlags()[1] = true; + } + if (isValidValue(fields()[2], other.positionString)) { + this.positionString = data().deepCopy(fields()[2].schema(), other.positionString); + fieldSetFlags()[2] = true; + } + } + + /** + * Gets the value of the 'latitude' field. + * @return The value. + */ + public double getLatitude() { + return latitude; + } + + + /** + * Sets the value of the 'latitude' field. + * @param value The value of 'latitude'. + * @return This builder. + */ + public clients.avro.PositionString.Builder setLatitude(double value) { + validate(fields()[0], value); + this.latitude = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'latitude' field has been set. + * @return True if the 'latitude' field has been set, false otherwise. + */ + public boolean hasLatitude() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'latitude' field. + * @return This builder. + */ + public clients.avro.PositionString.Builder clearLatitude() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'longitude' field. + * @return The value. + */ + public double getLongitude() { + return longitude; + } + + + /** + * Sets the value of the 'longitude' field. + * @param value The value of 'longitude'. + * @return This builder. + */ + public clients.avro.PositionString.Builder setLongitude(double value) { + validate(fields()[1], value); + this.longitude = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'longitude' field has been set. + * @return True if the 'longitude' field has been set, false otherwise. + */ + public boolean hasLongitude() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'longitude' field. + * @return This builder. + */ + public clients.avro.PositionString.Builder clearLongitude() { + fieldSetFlags()[1] = false; + return this; + } + + /** + * Gets the value of the 'positionString' field. + * @return The value. + */ + public java.lang.String getPositionString() { + return positionString; + } + + + /** + * Sets the value of the 'positionString' field. + * @param value The value of 'positionString'. + * @return This builder. + */ + public clients.avro.PositionString.Builder setPositionString(java.lang.String value) { + validate(fields()[2], value); + this.positionString = value; + fieldSetFlags()[2] = true; + return this; + } + + /** + * Checks whether the 'positionString' field has been set. + * @return True if the 'positionString' field has been set, false otherwise. + */ + public boolean hasPositionString() { + return fieldSetFlags()[2]; + } + + + /** + * Clears the value of the 'positionString' field. + * @return This builder. + */ + public clients.avro.PositionString.Builder clearPositionString() { + positionString = null; + fieldSetFlags()[2] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public PositionString build() { + try { + PositionString record = new PositionString(); + record.latitude = fieldSetFlags()[0] ? this.latitude : (java.lang.Double) defaultValue(fields()[0]); + record.longitude = fieldSetFlags()[1] ? this.longitude : (java.lang.Double) defaultValue(fields()[1]); + record.positionString = fieldSetFlags()[2] ? this.positionString : (java.lang.String) defaultValue(fields()[2]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override protected boolean hasCustomCoders() { return true; } + + @Override public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException + { + out.writeDouble(this.latitude); + + out.writeDouble(this.longitude); + + out.writeString(this.positionString); + + } + + @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException + { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.latitude = in.readDouble(); + + this.longitude = in.readDouble(); + + this.positionString = in.readString(); + + } else { + for (int i = 0; i < 3; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.latitude = in.readDouble(); + break; + + case 1: + this.longitude = in.readDouble(); + break; + + case 2: + this.positionString = in.readString(); + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} + + + + + + + + + + + diff --git a/solution/java-streams-avro/src/main/java/clients/avro/PositionValue.java b/solution/java-streams-avro/src/main/java/clients/avro/PositionValue.java new file mode 100644 index 0000000..7889893 --- /dev/null +++ b/solution/java-streams-avro/src/main/java/clients/avro/PositionValue.java @@ -0,0 +1,390 @@ +/** + * Autogenerated by Avro + * + * DO NOT EDIT DIRECTLY + */ +package clients.avro; + +import org.apache.avro.generic.GenericArray; +import org.apache.avro.specific.SpecificData; +import org.apache.avro.util.Utf8; +import org.apache.avro.message.BinaryMessageEncoder; +import org.apache.avro.message.BinaryMessageDecoder; +import org.apache.avro.message.SchemaStore; + +@org.apache.avro.specific.AvroGenerated +public class PositionValue extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord { + private static final long serialVersionUID = -3016058280124914056L; + public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"PositionValue\",\"namespace\":\"clients.avro\",\"fields\":[{\"name\":\"latitude\",\"type\":\"double\"},{\"name\":\"longitude\",\"type\":\"double\"}]}"); + public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; } + + private static SpecificData MODEL$ = new SpecificData(); + + private static final BinaryMessageEncoder ENCODER = + new BinaryMessageEncoder(MODEL$, SCHEMA$); + + private static final BinaryMessageDecoder DECODER = + new BinaryMessageDecoder(MODEL$, SCHEMA$); + + /** + * Return the BinaryMessageEncoder instance used by this class. + * @return the message encoder used by this class + */ + public static BinaryMessageEncoder getEncoder() { + return ENCODER; + } + + /** + * Return the BinaryMessageDecoder instance used by this class. + * @return the message decoder used by this class + */ + public static BinaryMessageDecoder getDecoder() { + return DECODER; + } + + /** + * Create a new BinaryMessageDecoder instance for this class that uses the specified {@link SchemaStore}. + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + * @return a BinaryMessageDecoder instance for this class backed by the given SchemaStore + */ + public static BinaryMessageDecoder createDecoder(SchemaStore resolver) { + return new BinaryMessageDecoder(MODEL$, SCHEMA$, resolver); + } + + /** + * Serializes this PositionValue to a ByteBuffer. + * @return a buffer holding the serialized data for this instance + * @throws java.io.IOException if this instance could not be serialized + */ + public java.nio.ByteBuffer toByteBuffer() throws java.io.IOException { + return ENCODER.encode(this); + } + + /** + * Deserializes a PositionValue from a ByteBuffer. + * @param b a byte buffer holding serialized data for an instance of this class + * @return a PositionValue instance decoded from the given buffer + * @throws java.io.IOException if the given bytes could not be deserialized into an instance of this class + */ + public static PositionValue fromByteBuffer( + java.nio.ByteBuffer b) throws java.io.IOException { + return DECODER.decode(b); + } + + @Deprecated public double latitude; + @Deprecated public double longitude; + + /** + * Default constructor. Note that this does not initialize fields + * to their default values from the schema. If that is desired then + * one should use newBuilder(). + */ + public PositionValue() {} + + /** + * All-args constructor. + * @param latitude The new value for latitude + * @param longitude The new value for longitude + */ + public PositionValue(java.lang.Double latitude, java.lang.Double longitude) { + this.latitude = latitude; + this.longitude = longitude; + } + + public org.apache.avro.specific.SpecificData getSpecificData() { return MODEL$; } + public org.apache.avro.Schema getSchema() { return SCHEMA$; } + // Used by DatumWriter. Applications should not call. + public java.lang.Object get(int field$) { + switch (field$) { + case 0: return latitude; + case 1: return longitude; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + // Used by DatumReader. Applications should not call. + @SuppressWarnings(value="unchecked") + public void put(int field$, java.lang.Object value$) { + switch (field$) { + case 0: latitude = (java.lang.Double)value$; break; + case 1: longitude = (java.lang.Double)value$; break; + default: throw new IndexOutOfBoundsException("Invalid index: " + field$); + } + } + + /** + * Gets the value of the 'latitude' field. + * @return The value of the 'latitude' field. + */ + public double getLatitude() { + return latitude; + } + + + /** + * Sets the value of the 'latitude' field. + * @param value the value to set. + */ + public void setLatitude(double value) { + this.latitude = value; + } + + /** + * Gets the value of the 'longitude' field. + * @return The value of the 'longitude' field. + */ + public double getLongitude() { + return longitude; + } + + + /** + * Sets the value of the 'longitude' field. + * @param value the value to set. + */ + public void setLongitude(double value) { + this.longitude = value; + } + + /** + * Creates a new PositionValue RecordBuilder. + * @return A new PositionValue RecordBuilder + */ + public static clients.avro.PositionValue.Builder newBuilder() { + return new clients.avro.PositionValue.Builder(); + } + + /** + * Creates a new PositionValue RecordBuilder by copying an existing Builder. + * @param other The existing builder to copy. + * @return A new PositionValue RecordBuilder + */ + public static clients.avro.PositionValue.Builder newBuilder(clients.avro.PositionValue.Builder other) { + if (other == null) { + return new clients.avro.PositionValue.Builder(); + } else { + return new clients.avro.PositionValue.Builder(other); + } + } + + /** + * Creates a new PositionValue RecordBuilder by copying an existing PositionValue instance. + * @param other The existing instance to copy. + * @return A new PositionValue RecordBuilder + */ + public static clients.avro.PositionValue.Builder newBuilder(clients.avro.PositionValue other) { + if (other == null) { + return new clients.avro.PositionValue.Builder(); + } else { + return new clients.avro.PositionValue.Builder(other); + } + } + + /** + * RecordBuilder for PositionValue instances. + */ + @org.apache.avro.specific.AvroGenerated + public static class Builder extends org.apache.avro.specific.SpecificRecordBuilderBase + implements org.apache.avro.data.RecordBuilder { + + private double latitude; + private double longitude; + + /** Creates a new Builder */ + private Builder() { + super(SCHEMA$); + } + + /** + * Creates a Builder by copying an existing Builder. + * @param other The existing Builder to copy. + */ + private Builder(clients.avro.PositionValue.Builder other) { + super(other); + if (isValidValue(fields()[0], other.latitude)) { + this.latitude = data().deepCopy(fields()[0].schema(), other.latitude); + fieldSetFlags()[0] = other.fieldSetFlags()[0]; + } + if (isValidValue(fields()[1], other.longitude)) { + this.longitude = data().deepCopy(fields()[1].schema(), other.longitude); + fieldSetFlags()[1] = other.fieldSetFlags()[1]; + } + } + + /** + * Creates a Builder by copying an existing PositionValue instance + * @param other The existing instance to copy. + */ + private Builder(clients.avro.PositionValue other) { + super(SCHEMA$); + if (isValidValue(fields()[0], other.latitude)) { + this.latitude = data().deepCopy(fields()[0].schema(), other.latitude); + fieldSetFlags()[0] = true; + } + if (isValidValue(fields()[1], other.longitude)) { + this.longitude = data().deepCopy(fields()[1].schema(), other.longitude); + fieldSetFlags()[1] = true; + } + } + + /** + * Gets the value of the 'latitude' field. + * @return The value. + */ + public double getLatitude() { + return latitude; + } + + + /** + * Sets the value of the 'latitude' field. + * @param value The value of 'latitude'. + * @return This builder. + */ + public clients.avro.PositionValue.Builder setLatitude(double value) { + validate(fields()[0], value); + this.latitude = value; + fieldSetFlags()[0] = true; + return this; + } + + /** + * Checks whether the 'latitude' field has been set. + * @return True if the 'latitude' field has been set, false otherwise. + */ + public boolean hasLatitude() { + return fieldSetFlags()[0]; + } + + + /** + * Clears the value of the 'latitude' field. + * @return This builder. + */ + public clients.avro.PositionValue.Builder clearLatitude() { + fieldSetFlags()[0] = false; + return this; + } + + /** + * Gets the value of the 'longitude' field. + * @return The value. + */ + public double getLongitude() { + return longitude; + } + + + /** + * Sets the value of the 'longitude' field. + * @param value The value of 'longitude'. + * @return This builder. + */ + public clients.avro.PositionValue.Builder setLongitude(double value) { + validate(fields()[1], value); + this.longitude = value; + fieldSetFlags()[1] = true; + return this; + } + + /** + * Checks whether the 'longitude' field has been set. + * @return True if the 'longitude' field has been set, false otherwise. + */ + public boolean hasLongitude() { + return fieldSetFlags()[1]; + } + + + /** + * Clears the value of the 'longitude' field. + * @return This builder. + */ + public clients.avro.PositionValue.Builder clearLongitude() { + fieldSetFlags()[1] = false; + return this; + } + + @Override + @SuppressWarnings("unchecked") + public PositionValue build() { + try { + PositionValue record = new PositionValue(); + record.latitude = fieldSetFlags()[0] ? this.latitude : (java.lang.Double) defaultValue(fields()[0]); + record.longitude = fieldSetFlags()[1] ? this.longitude : (java.lang.Double) defaultValue(fields()[1]); + return record; + } catch (org.apache.avro.AvroMissingFieldException e) { + throw e; + } catch (java.lang.Exception e) { + throw new org.apache.avro.AvroRuntimeException(e); + } + } + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumWriter + WRITER$ = (org.apache.avro.io.DatumWriter)MODEL$.createDatumWriter(SCHEMA$); + + @Override public void writeExternal(java.io.ObjectOutput out) + throws java.io.IOException { + WRITER$.write(this, SpecificData.getEncoder(out)); + } + + @SuppressWarnings("unchecked") + private static final org.apache.avro.io.DatumReader + READER$ = (org.apache.avro.io.DatumReader)MODEL$.createDatumReader(SCHEMA$); + + @Override public void readExternal(java.io.ObjectInput in) + throws java.io.IOException { + READER$.read(this, SpecificData.getDecoder(in)); + } + + @Override protected boolean hasCustomCoders() { return true; } + + @Override public void customEncode(org.apache.avro.io.Encoder out) + throws java.io.IOException + { + out.writeDouble(this.latitude); + + out.writeDouble(this.longitude); + + } + + @Override public void customDecode(org.apache.avro.io.ResolvingDecoder in) + throws java.io.IOException + { + org.apache.avro.Schema.Field[] fieldOrder = in.readFieldOrderIfDiff(); + if (fieldOrder == null) { + this.latitude = in.readDouble(); + + this.longitude = in.readDouble(); + + } else { + for (int i = 0; i < 2; i++) { + switch (fieldOrder[i].pos()) { + case 0: + this.latitude = in.readDouble(); + break; + + case 1: + this.longitude = in.readDouble(); + break; + + default: + throw new java.io.IOException("Corrupt ResolvingDecoder."); + } + } + } + } +} + + + + + + + + + + + diff --git a/solution/python-consumer-avro/main.py b/solution/python-consumer-avro/main.py index 5a84656..23e5e68 100644 --- a/solution/python-consumer-avro/main.py +++ b/solution/python-consumer-avro/main.py @@ -1,20 +1,31 @@ "Python Avro Consumer" -from confluent_kafka.avro import AvroConsumer +from confluent_kafka import DeserializingConsumer +from confluent_kafka.schema_registry import SchemaRegistryClient +from confluent_kafka.schema_registry.avro import AvroDeserializer +from confluent_kafka.serialization import StringDeserializer from confluent_kafka.avro.serializer import SerializerError -KAFKA_TOPIC = "driver-positions-pyavro" +KAFKA_TOPIC = "driver-positions-avro" print("Starting Python Avro Consumer.") +with open("position_value.avsc","r") as avro_file: + value_schema = avro_file.read() + +schema_registry_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'}) + +avro_deserializer = AvroDeserializer(value_schema, schema_registry_client) + # Configure the group id, location of the bootstrap server, # Confluent interceptors, and schema registry location -consumer = AvroConsumer({ - 'bootstrap.servers': 'kafka:9092', - 'plugin.library.paths': 'monitoring-interceptor', - 'group.id': 'python-consumer-avro', - 'auto.offset.reset': 'earliest', - 'schema.registry.url': 'http://schema-registry:8081' -}) +consumer_conf = {'bootstrap.servers': "kafka:9092", + 'key.deserializer': StringDeserializer('utf_8'), + 'value.deserializer': avro_deserializer, + 'group.id': 'python-consumer-avro', + 'plugin.library.paths': 'monitoring-interceptor', + 'auto.offset.reset': "earliest"} + +consumer = DeserializingConsumer(consumer_conf) # Subscribe to our topic consumer.subscribe([KAFKA_TOPIC]) diff --git a/solution/python-consumer-avro/position_value.avsc b/solution/python-consumer-avro/position_value.avsc new file mode 100644 index 0000000..e1131af --- /dev/null +++ b/solution/python-consumer-avro/position_value.avsc @@ -0,0 +1,8 @@ +{"namespace": "clients.avro", + "type": "record", + "name": "PositionValue", + "fields": [ + {"name": "latitude", "type": "double"}, + {"name": "longitude", "type": "double"} + ] +} \ No newline at end of file diff --git a/solution/python-producer-avro/main.py b/solution/python-producer-avro/main.py index 5bd6a16..0051110 100644 --- a/solution/python-producer-avro/main.py +++ b/solution/python-producer-avro/main.py @@ -1,33 +1,47 @@ -"Python Producer" +"Python Avro Producer" + from time import sleep import os import atexit -from confluent_kafka import avro -from confluent_kafka.avro import AvroProducer +from confluent_kafka import SerializingProducer +from confluent_kafka.serialization import StringSerializer +from confluent_kafka.schema_registry import SchemaRegistryClient +from confluent_kafka.schema_registry.avro import AvroSerializer DRIVER_FILE_PREFIX = "./drivers/" -KAFKA_TOPIC = "driver-positions-pyavro" +KAFKA_TOPIC = "driver-positions-avro" # Load a driver id from an environment variable # if it isn't present use "driver-3" DRIVER_ID = os.getenv("DRIVER_ID", "driver-3") print("Starting Python Avro producer.") -value_schema = avro.load("position_value.avsc") -key_schema = avro.load("position_key.avsc") +with open("position_value.avsc","r") as avro_file: + value_schema = avro_file.read() # Configure the location of the bootstrap server, Confluent interceptors # and a partitioner compatible with Java, and key/value schemas # see https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md -producer = AvroProducer({ - 'bootstrap.servers': 'kafka:9092', - 'plugin.library.paths': 'monitoring-interceptor', - 'partitioner': 'murmur2_random', - 'schema.registry.url': 'http://schema-registry:8081' -} - , default_key_schema=key_schema - , default_value_schema=value_schema) +schema_registry_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'}) + +avro_serializer = AvroSerializer(value_schema, schema_registry_client) + +producer_conf = {'bootstrap.servers': 'kafka:9092', + 'key.serializer': StringSerializer('utf_8'), + 'value.serializer': avro_serializer, + 'plugin.library.paths': 'monitoring-interceptor', + 'partitioner': 'murmur2_random'} + +producer = SerializingProducer(producer_conf) + + +def delivery_report(err, msg): + if err is not None: + print("Delivery failed {}: {}".format(msg.key(), err)) + return + print("Sent Key:{} Value:{}".format(key, value)) + def exit_handler(): """Run this on exit""" @@ -47,7 +61,7 @@ def exit_handler(): line = lines[pos] # Trigger any available delivery report callbacks from previous produce() calls producer.poll(0) - key = {"key" : DRIVER_ID} + key = DRIVER_ID latitude = line.split(",")[0].strip() longitude = line.split(",")[1].strip() value = {"latitude" : float(latitude), "longitude" : float(longitude)} @@ -56,8 +70,7 @@ def exit_handler(): topic=KAFKA_TOPIC, value=value, key=key, - callback=lambda err, msg: - print("Sent Key:{} Value:{}".format(key, value) if err is None else err) + on_delivery=delivery_report ) sleep(1) pos = (pos + 1) % len(lines) diff --git a/solution/python-producer-avro/position_key.avsc b/solution/python-producer-avro/position_key.avsc deleted file mode 100644 index e7bb9fa..0000000 --- a/solution/python-producer-avro/position_key.avsc +++ /dev/null @@ -1,7 +0,0 @@ -{"namespace": "clients.avro", - "type": "record", - "name": "PositionKey", - "fields": [ - {"name": "key", "type": "string"} - ] -} \ No newline at end of file diff --git a/webserver-avro/server.js b/webserver-avro/server.js index b991077..18990c4 100644 --- a/webserver-avro/server.js +++ b/webserver-avro/server.js @@ -36,6 +36,14 @@ const schemas = { {'name': 'MODEL', 'type': ['null', 'string'], 'default': null}, ], }, + 'driver-positions-string-avro': { + type: 'record', + fields: [ + {'name': 'latitude', 'type': 'double'}, + {'name': 'longitude', 'type': 'double'}, + {'name': 'positionString', 'type': 'string'}, + ], + }, }; // only the python topic has an avro key @@ -72,6 +80,7 @@ stream.on('data', function(avroData) { if (data.LATITUDE) message['latitude'] = data.LATITUDE; if (data.LONGITUDE) message['longitude'] = data.LONGITUDE; if (data.distance) message['distance'] = Math.round(data.distance); + if (data.positionString) message['position-string'] = data.positionString; // different format for ksql avro stream if (avroData.topic == 'driver-augmented-avro') {