From 21396dda110db945f00ffac2feccfce597d3ae00 Mon Sep 17 00:00:00 2001 From: Eric Feng Date: Tue, 27 Apr 2021 02:55:09 +0000 Subject: [PATCH 1/2] gRPC python files --- src/main/grpc/OpaqueClient.py | 140 +++++++++++++++++++++++++++++ src/main/grpc/OpaqueRPCListener.py | 116 ++++++++++++++++++++++++ 2 files changed, 256 insertions(+) create mode 100644 src/main/grpc/OpaqueClient.py create mode 100644 src/main/grpc/OpaqueRPCListener.py diff --git a/src/main/grpc/OpaqueClient.py b/src/main/grpc/OpaqueClient.py new file mode 100644 index 0000000000..3f31bf3183 --- /dev/null +++ b/src/main/grpc/OpaqueClient.py @@ -0,0 +1,140 @@ +import sys +import grpc +import atexit + +import rpc_pb2 +import rpc_pb2_grpc + +from ctypes import cdll, c_uint8, c_size_t, byref, POINTER, addressof, c_char, c_char_p, c_wchar_p +lib = cdll.LoadLibrary('/home/opaque/opaque/src/enclave/ServiceProvider/build/libra_jni.so') +sp = lib.sp_new() + +with open("user1.crt", "rb") as file: + cert = file.read() + + cert_bytes = bytearray(cert) + cert_len = c_size_t(len(cert_bytes)) + + c_cert = c_uint8 * cert_len.value + c_cert_bytes = c_cert.from_buffer(cert_bytes) + + lib.sp_init_wrapper(sp, c_cert_bytes, cert_len) + +def perform_ra(stub): + + response = stub.relayGenerateReport(rpc_pb2.RARequest(name = "user")) + key_request = rpc_pb2.KeyRequest(name = "user", success = True) + + # Parse and process + try: + # Parse and remove first item (which is list of eids) and last item (which is filler tail) + splitter = "2d2d2d2d2d424547494e205055424c4943204b45592d2d2d2d2d" # hex of '-----BEGIN PUBLIC KEY-----' + parsed = response.report.split(splitter) + + eids = parsed[0].split() + print(eids) + reports = [(splitter + x) for x in parsed[1:][:-1]] + + if len(eids) != len(reports): + raise Exception("number of enclaves and reports don't match") + + num_enclaves = len(eids) + + for i in range(num_enclaves): + + report_bytes = bytearray.fromhex(reports[i]) + report_len = c_size_t(len(report_bytes)) + + c_report = c_uint8 * report_len.value + c_report_bytes = c_report.from_buffer(report_bytes) + + ret_val = POINTER(c_uint8)() + ret_len = c_size_t() + + f_process = lib.sp_process_enclave_report + f_process(sp, c_report_bytes, byref(report_len), byref(ret_val), byref(ret_len)) + + msg_carray = c_uint8 * ret_len.value + msg_carray_bytes = msg_carray.from_address(addressof(ret_val.contents)) + + # Copy ServiceProvider response into bytes + msg = bytes(msg_carray_bytes) + + key_request.key.append(msg) + key_request.eid.append(eids[i]) + + response = stub.relayFinishAttestation(key_request) + + if response.success: + print("Attestation successful") + else: + print("Attestation failed") + + f_free = lib.sp_free_array + f_free(sp, byref(ret_val)) + + except Exception as e: + print("Report verification fail: " + str(e)) + +def decrypt(cipher): + + print(cipher) + + cipher_bytes = bytes(cipher, 'utf-8') + cipher_len = c_size_t(len(cipher_bytes)) + c_cipher_bytes = c_char_p(cipher_bytes) + + ret_val = POINTER(c_uint8)() + ret_len = c_size_t() + + f_process = lib.sp_decrypt + f_process(sp, c_cipher_bytes, byref(cipher_len), byref(ret_val), byref(ret_len)) + + msg_carray = c_uint8 * ret_len.value + msg_carray_bytes = msg_carray.from_address(addressof(ret_val.contents)) + + # Copy into bytes + decrypted_bytes = bytes(msg_carray_bytes) + f_free = lib.sp_free_array + f_free(sp, byref(ret_val)) + + return decrypted_bytes + +def send_query(stub, query): + response = stub.relayQuery(rpc_pb2.QueryRequest(query=query)) + if "postVerifyAndPrint" in query.split("(")[0]: + parsed = response.data.splitlines()[:-1] + for enc in parsed: + try: + print(decrypt(enc)) + except: + print(response.data) + else: + print(response.data) + +def shell(stub): + while True: + user_input = input("opaque> " ) + while user_input: + send_query(stub, user_input) + user_input = input("opaque> ") + +def clean_up(channel): + lib.sp_clean(sp) + channel.close() + print("Channel closed") + +def run(): + channel = grpc.insecure_channel('localhost:50060') + stub = rpc_pb2_grpc.OpaqueRPCStub(channel) + + atexit.register(clean_up, channel=channel) + + # Perform ra + perform_ra(stub) + + # Start shell + shell(stub) + +if __name__ == '__main__': + run() diff --git a/src/main/grpc/OpaqueRPCListener.py b/src/main/grpc/OpaqueRPCListener.py new file mode 100644 index 0000000000..14e57f9a57 --- /dev/null +++ b/src/main/grpc/OpaqueRPCListener.py @@ -0,0 +1,116 @@ +from concurrent import futures + +import grpc + +import os +import sys, getopt +import atexit +from pexpect import replwrap + +import rpc_pb2 +import rpc_pb2_grpc + +# Remove the first line of the output which is a repeat of the input +def clean_shell_output(output): + parsed_output = output.split("\n",1)[1] + return parsed_output.rstrip() + +class OpaqueRPCListener(rpc_pb2_grpc.OpaqueRPCServicer): + + def __init__(self): + + spark_home = os.getenv("SPARK_HOME") + opaque_jar = '' + master = '' + + # Determine opaque jar and ip of master + argv = sys.argv[1:] + + if len(argv) == 0: + print('Need arguments: OpaqueRPCListener.py -j -m ') + sys.exit(2) + + try: + opts, args = getopt.getopt(argv,"hj:m:",["jar=","master="]) + except getopt.GetoptError: + print('Need arguments: OpaqueRPCListener.py -j -m ') + sys.exit(2) + for opt, arg in opts: + if opt == '-h': + print('OpaqueRPCListener.py -j -m ') + sys.exit() + elif opt in ("-j", "--jar"): + opaque_jar = arg + elif opt in ("-m", "--master"): + master = arg + + # TODO: Currently hardcoding memory capacities. Will need to change to be dynamic eventually + spark_shell_bin = spark_home + "/bin/spark-shell" + " --jars " + opaque_jar + " --master " + master\ + + " --driver-memory 1G --executor-memory=2G --conf spark.executor.instances=2" + +# spark_shell_bin = spark_home + "/bin/spark-shell" + " --jars " + opaque_jar + + self.proc = replwrap.REPLWrapper(spark_shell_bin, "scala> ", prompt_change=None) + print("Instantiate subprocess") + + self.proc.run_command("import edu.berkeley.cs.rise.opaque.implicits._") + self.proc.run_command("edu.berkeley.cs.rise.opaque.Utils.initSQLContext(spark.sqlContext)") + print("Imported Opaque libraries") + + def relayGenerateReport(self, request, context): + output = self.proc.run_command("edu.berkeley.cs.rise.opaque.RA.printReport()") + clean_output = clean_shell_output(output) + reply = rpc_pb2.RAReply(success = True, report = clean_output) + return reply + + ''' TODO: Do not need repeated field anymore. Can remove and only send string back. + Other things that I can add for a better shell: + 1. Handle multi-line input + 2. Allow for up/down arrow of input (this should be done on client side) + ''' + def relayQuery(self, request, context): + + if not request.query: + return rpc_pb2.QueryReply(success = True, data = "") + + output = self.proc.run_command(request.query) + clean_output = clean_shell_output(output) + + return rpc_pb2.QueryReply(success = True, data = clean_output) + + def relayFinishAttestation(self, request, context): + key_arg = "" + eid_arg = "" + for key in request.key: + key_arg += key.hex() + " " + for eid in request.eid: + eid_arg += eid + " " + + # edu.berkeley.cs.rise.opaque.RA.grpcFinishAttestation(key_arg, eid_arg) + finish_ra_cmd = "edu.berkeley.cs.rise.opaque.RA.grpcFinishAttestation(\"" + key_arg + \ + "\", " + "\"" + eid_arg + "\")" + + output = self.proc.run_command(finish_ra_cmd) + + reply = rpc_pb2.KeyReply(success = True) + + if "OpaqueException" not in clean_shell_output(output) : + reply.success = True + else: + reply.success = False + return reply + +def clean_up(): + print("Server shutoff") + +def serve(): + server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) + rpc_pb2_grpc.add_OpaqueRPCServicer_to_server(OpaqueRPCListener(), server) + server.add_insecure_port('localhost:50060') + server.start() + server.wait_for_termination() + print("Hello world - serve") + +if __name__ == '__main__': + atexit.register(clean_up) + serve() From 3133eede1378fe1870f40164c5c52e2a4f4f42bd Mon Sep 17 00:00:00 2001 From: Eric Feng Date: Fri, 14 May 2021 02:49:48 +0000 Subject: [PATCH 2/2] Add specific parameters to RPC Listener --- src/main/grpc/OpaqueRPCListener.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/main/grpc/OpaqueRPCListener.py b/src/main/grpc/OpaqueRPCListener.py index 14e57f9a57..f7b8efdd3f 100644 --- a/src/main/grpc/OpaqueRPCListener.py +++ b/src/main/grpc/OpaqueRPCListener.py @@ -15,6 +15,17 @@ def clean_shell_output(output): parsed_output = output.split("\n",1)[1] return parsed_output.rstrip() +''' +Spark cluster needs to be started before running +Spark configurations can be set at conf/spark-env.sh + +To match configurations with MultiPartition Test Suite use: + +SPARK_WORKER_INSTANCES=3 +SPARK_WORKER_CORES=1 +SPARK_WORKER_MEMORY=1G + +''' class OpaqueRPCListener(rpc_pb2_grpc.OpaqueRPCServicer): def __init__(self): @@ -44,21 +55,19 @@ def __init__(self): elif opt in ("-m", "--master"): master = arg - # TODO: Currently hardcoding memory capacities. Will need to change to be dynamic eventually + # MultiPartition Test Suite Spark Configurations spark_shell_bin = spark_home + "/bin/spark-shell" + " --jars " + opaque_jar + " --master " + master\ - + " --driver-memory 1G --executor-memory=2G --conf spark.executor.instances=2" - -# spark_shell_bin = spark_home + "/bin/spark-shell" + " --jars " + opaque_jar + + " --conf spark.executor.instances=3 --conf spark.sql.shuffle.partitions=3 --conf spark.executor.memory=4g" self.proc = replwrap.REPLWrapper(spark_shell_bin, "scala> ", prompt_change=None) print("Instantiate subprocess") self.proc.run_command("import edu.berkeley.cs.rise.opaque.implicits._") - self.proc.run_command("edu.berkeley.cs.rise.opaque.Utils.initSQLContext(spark.sqlContext)") + self.proc.run_command("edu.berkeley.cs.rise.opaque.Utils.initSQLContext(spark.sqlContext)", timeout=None) print("Imported Opaque libraries") def relayGenerateReport(self, request, context): - output = self.proc.run_command("edu.berkeley.cs.rise.opaque.RA.printReport()") + output = self.proc.run_command("edu.berkeley.cs.rise.opaque.RA.printReport()", timeout=None) clean_output = clean_shell_output(output) reply = rpc_pb2.RAReply(success = True, report = clean_output) return reply @@ -73,7 +82,7 @@ def relayQuery(self, request, context): if not request.query: return rpc_pb2.QueryReply(success = True, data = "") - output = self.proc.run_command(request.query) + output = self.proc.run_command(request.query, timeout=None) clean_output = clean_shell_output(output) return rpc_pb2.QueryReply(success = True, data = clean_output) @@ -90,7 +99,7 @@ def relayFinishAttestation(self, request, context): finish_ra_cmd = "edu.berkeley.cs.rise.opaque.RA.grpcFinishAttestation(\"" + key_arg + \ "\", " + "\"" + eid_arg + "\")" - output = self.proc.run_command(finish_ra_cmd) + output = self.proc.run_command(finish_ra_cmd, timeout=None) reply = rpc_pb2.KeyReply(success = True)