Skip to content

Commit

Permalink
[ZEPPELIN-5962] LivyInterpreter support more parameter when create se…
Browse files Browse the repository at this point in the history
…ssion (apache#4657)

* LivyInterpreter support more parameter when create session

* Add test for ZEPPELIN-5962

---------

Co-authored-by: mrzhao <[email protected]>
  • Loading branch information
2 people authored and akoira committed Feb 1, 2024
1 parent 5aa88d0 commit bdd31af
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import com.google.gson.annotations.SerializedName;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -302,15 +303,19 @@ private SessionInfo createSession(String user, String kind)
throws LivyException {
try {
Map<String, String> conf = new HashMap<>();
Map<String, String> params = new HashMap<>();
for (Map.Entry<Object, Object> entry : getProperties().entrySet()) {
if (entry.getKey().toString().startsWith("livy.spark.") &&
!entry.getValue().toString().isEmpty()) {
conf.put(entry.getKey().toString().substring(5), entry.getValue().toString());
} else if (entry.getKey().toString().startsWith("livy.") &&
!entry.getValue().toString().isEmpty()) {
params.put(entry.getKey().toString().substring(5), entry.getValue().toString());
}
}

CreateSessionRequest request = new CreateSessionRequest(kind,
user == null || user.equals("anonymous") ? null : user, conf);
user == null || user.equals("anonymous") ? null : user, conf, params);
SessionInfo sessionInfo = SessionInfo.fromJson(
callRestAPI("/sessions", "POST", request.toJson()));
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -776,15 +781,23 @@ private static class CreateSessionRequest {
@SerializedName("proxyUser")
public final String user;
public final Map<String, String> conf;
public final Map<String, String> params;

CreateSessionRequest(String kind, String user, Map<String, String> conf) {
CreateSessionRequest(String kind, String user, Map<String, String> conf,
Map<String, String> params) {
this.kind = kind;
this.user = user;
this.conf = conf;
this.params = params;
}

public String toJson() {
return gson.toJson(this);
JsonObject jsonObject = new JsonObject();
jsonObject.add("conf", gson.toJsonTree(conf));
params.forEach(jsonObject::addProperty);
jsonObject.addProperty("kind", kind);
jsonObject.addProperty("proxyUser", user);
return gson.toJson(jsonObject);
}
}

Expand Down
36 changes: 36 additions & 0 deletions livy/src/test/java/org/apache/zeppelin/livy/LivyInterpreterIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,42 @@ public void run() {
}
}

@Test
void testLivyParams() throws InterpreterException {
if (!checkPreCondition()) {
return;
}
InterpreterGroup interpreterGroup = new InterpreterGroup("group_1");
interpreterGroup.put("session_1", new ArrayList<Interpreter>());
Properties props = new Properties(properties);
props.setProperty("livy.spark.executor.cores", "4");
props.setProperty("livy.name", "zeppelin-livy");
LivySparkInterpreter sparkInterpreter = new LivySparkInterpreter(props);
sparkInterpreter.setInterpreterGroup(interpreterGroup);
interpreterGroup.get("session_1").add(sparkInterpreter);
AuthenticationInfo authInfo = new AuthenticationInfo("user1");
MyInterpreterOutputListener outputListener = new MyInterpreterOutputListener();
InterpreterOutput output = new InterpreterOutput(outputListener);
InterpreterContext context = InterpreterContext.builder()
.setNoteId("noteId")
.setParagraphId("paragraphId")
.setAuthenticationInfo(authInfo)
.setInterpreterOut(output)
.build();
sparkInterpreter.open();

try {
InterpreterResult result = sparkInterpreter.interpret("sc.version\n" +
"assert(sc.getConf.get(\"spark.executor.cores\") == \"4\" && " +
"sc.getConf.get(\"spark.app.name\") == \"zeppelin-livy\")"
, context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code(), result.toString());
assertEquals(1, result.message().size());
} finally {
sparkInterpreter.close();
}
}

@Test
void testLivyTutorialNote() throws IOException, InterpreterException {
if (!checkPreCondition()) {
Expand Down

0 comments on commit bdd31af

Please sign in to comment.