Skip to content

Commit

Permalink
Pacman demo - switch to ksql query-stream API
Browse files Browse the repository at this point in the history
  • Loading branch information
gianlucanatali committed May 3, 2021
1 parent 3e30f22 commit 96e5741
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 24 deletions.
8 changes: 6 additions & 2 deletions streaming-pacman/create_ksqldb_app.sh
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ source $DELTA_CONFIGS_DIR/env.delta
#################################################################
# Confluent Cloud ksqlDB application
#################################################################
echo -e "\nConfluent Cloud ksqlDB application\n"
echo -e "\nConfluent Cloud ksqlDB application endpoin $KSQLDB_ENDPOINT\n"
ccloud::validate_ksqldb_up "$KSQLDB_ENDPOINT" || exit 1

# Create required topics and ACLs
Expand All @@ -34,12 +34,16 @@ do
ccloud kafka topic create "$TOPIC"
done

ccloud ksql app list

ksqlDBAppId=$(ccloud ksql app list | grep "$KSQLDB_ENDPOINT" | awk '{print $1}')
echo "ksqldb app id: ksqlDBAppId"
ccloud ksql app configure-acls $ksqlDBAppId $TOPICS_TO_CREATE

SERVICE_ACCOUNT_ID=$(ccloud kafka cluster list -o json | jq -r '.[0].name' | awk -F'-' '{print $4;}')
for TOPIC in $TOPICS_TO_CREATE
do
ccloud kafka acl create --allow --service-account $(ccloud service-account list | grep $ksqlDBAppId | awk '{print $1;}') --operation WRITE --topic $TOPIC
ccloud kafka acl create --allow --service-account $SERVICE_ACCOUNT_ID --operation WRITE --topic $TOPIC
done

# Submit KSQL queries
Expand Down
16 changes: 8 additions & 8 deletions streaming-pacman/pacman/game/js/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ function loadHighestScore(player, callback) {
if (this.status == 200) {
var result = JSON.parse(this.responseText);
if (result[1] != undefined || result[1] != null) {
var row = result[1].row;
highestScore = row.columns[0];
var row = result[1];
highestScore = row[0];
}
}
callback(highestScore);
Expand All @@ -34,9 +34,9 @@ function loadSummaryStats(callback) {
if (this.status == 200) {
var result = JSON.parse(this.responseText);
if (result[1] != undefined || result[1] != null) {
var row = result[1].row;
highestScore = row.columns[0];
usersSet = row.columns[1];
var row = result[1];
highestScore = row[0];
usersSet = row[1];
}
}
callback(highestScore, usersSet);
Expand All @@ -59,8 +59,8 @@ function sendksqlDBStmt(request, ksqlQuery){

function sendksqlDBQuery(request, ksqlQuery){
var query = {};
query.ksql = ksqlQuery;
query.endpoint = "query";
query.sql = ksqlQuery;
query.endpoint = "query-stream";
request.open('POST', KSQLDB_QUERY_API, true);
request.setRequestHeader('Accept', 'application/json');
request.setRequestHeader('Content-Type', 'application/json');
Expand All @@ -84,7 +84,7 @@ function getScoreboardJson(callback,userList) {

//First element is the header
result.shift();
var playersScores = result.map((item) => ({ user: item.row.columns[0], score:item.row.columns[1],level:item.row.columns[2],losses:item.row.columns[3] }));
var playersScores = result.map((item) => ({ user: item[0], score:item[1],level:item[2],losses:item[3] }));

playersScores = playersScores.sort(function(a, b) {
var res=0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@

public class EventHandler implements RequestHandler<Map<String, Object>, Map<String, Object>> {

public static final MediaType MEDIATYPE_JSON = MediaType.get("application/json; charset=utf-8");
public static final MediaType MEDIATYPE_KSQL = MediaType.get("application/vnd.ksql.v1+json");
public static final MediaType MEDIATYPE_JSON = MediaType.parse("application/json; charset=utf-8");
public static final MediaType MEDIATYPE_KSQL = MediaType.parse("application/vnd.ksql.v1+json; charset=utf-8");
Gson gson = new GsonBuilder().setPrettyPrinting().create();

OkHttpClient client = new OkHttpClient.Builder().authenticator(new Authenticator() {
Expand All @@ -51,10 +51,17 @@ public Request authenticate(Route route, Response response) throws IOException {
.build();


private String post(String url, String json) throws IOException {
RequestBody body = RequestBody.create(json, MEDIATYPE_KSQL);
Request request = new Request.Builder()
.url(url)
private String post(String url, String json, String accept) throws IOException {
RequestBody body = RequestBody.create(MEDIATYPE_KSQL, json);
okhttp3.Request.Builder requestBuilder = new Request.Builder()
.url(url);

if(accept != null){
requestBuilder.addHeader("Accept", accept);
}


Request request = requestBuilder
.post(body)
.build();
try (Response response = client.newCall(request).execute()) {
Expand Down Expand Up @@ -101,17 +108,25 @@ public Map<String, Object> handleRequest(final Map<String, Object> request, fina
if (event != null ) {

JsonElement payloadRoot = JsonParser.parseString(event);
String payloadQuery = payloadRoot.getAsJsonObject().get("ksql").getAsString();

String payloadEndpoint = payloadRoot.getAsJsonObject().get("endpoint").getAsString();

logger.log("payloadQuery: "+payloadQuery);
String payloadQuery;

logger.log("payloadEndpoint: "+payloadEndpoint);

String endpoint;
String queryObjName;
String accept = null;


if(Constants.KSQLDB_ENDPOINT_QUERY.equals(payloadEndpoint)){
endpoint = Constants.KSQLDB_ENDPOINT_QUERY;
queryObjName = "sql";
accept = "application/json";
}else if(Constants.KSQLDB_ENDPOINT_KSQL.equals(payloadEndpoint)){
endpoint = Constants.KSQLDB_ENDPOINT_KSQL;
queryObjName = "ksql";

} else {
StringBuilder message = new StringBuilder();
message.append("The endpoint provided ("+payloadEndpoint+") is not supported");
Expand All @@ -121,14 +136,18 @@ public Map<String, Object> handleRequest(final Map<String, Object> request, fina
return response;
}

payloadQuery = payloadRoot.getAsJsonObject().get(queryObjName).getAsString();
logger.log("payloadQuery: "+payloadQuery);

JsonObject newPayload = new JsonObject();
newPayload.add("ksql", payloadRoot.getAsJsonObject().get("ksql"));
newPayload.add(queryObjName, payloadRoot.getAsJsonObject().get(queryObjName));


try {
logger.log("Sending POST to : "+Constants.KSQLDB_ENDPOINT + "/" + endpoint);
logger.log("Payload : "+ gson.toJson(newPayload));
result = post(Constants.KSQLDB_ENDPOINT + "/" + endpoint, gson.toJson(newPayload));
logger.log("Post worked: "+endpoint);
result = post(Constants.KSQLDB_ENDPOINT + "/" + endpoint, gson.toJson(newPayload), accept);
logger.log("Post worked: "+endpoint+" result: "+result);
} catch (Exception e) {
logger.log("Error! "+e.getMessage());
logger.log(ExceptionUtils.getStackTrace(e));
Expand Down Expand Up @@ -161,8 +180,9 @@ public Map<String, Object> handleRequest(final Map<String, Object> request, fina
logger.log("No origin!");
}

logger.log("Response will be sent : "+ gson.toJson(response));
return response;

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public interface Constants {

public static final String ENDPOINT_PARAMETER = "endpoint";
public static final String QUERY_PARAMETER = "ksql";
public static final String KSQLDB_ENDPOINT_QUERY = "query";
public static final String KSQLDB_ENDPOINT_QUERY = "query-stream";
public static final String KSQLDB_ENDPOINT_KSQL = "ksql";

public static final String USER_GAME_TOPIC = "USER_GAME";
Expand Down

0 comments on commit 96e5741

Please sign in to comment.