Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC python files #217

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions src/main/grpc/OpaqueClient.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import sys
import grpc
import atexit

import rpc_pb2
import rpc_pb2_grpc

from ctypes import cdll, c_uint8, c_size_t, byref, POINTER, addressof, c_char, c_char_p, c_wchar_p
lib = cdll.LoadLibrary('/home/opaque/opaque/src/enclave/ServiceProvider/build/libra_jni.so')
sp = lib.sp_new()

with open("user1.crt", "rb") as file:
cert = file.read()

cert_bytes = bytearray(cert)
cert_len = c_size_t(len(cert_bytes))

c_cert = c_uint8 * cert_len.value
c_cert_bytes = c_cert.from_buffer(cert_bytes)

lib.sp_init_wrapper(sp, c_cert_bytes, cert_len)

def perform_ra(stub):

response = stub.relayGenerateReport(rpc_pb2.RARequest(name = "user"))
key_request = rpc_pb2.KeyRequest(name = "user", success = True)

# Parse and process
try:
# Parse and remove first item (which is list of eids) and last item (which is filler tail)
splitter = "2d2d2d2d2d424547494e205055424c4943204b45592d2d2d2d2d" # hex of '-----BEGIN PUBLIC KEY-----'
parsed = response.report.split(splitter)

eids = parsed[0].split()
print(eids)
reports = [(splitter + x) for x in parsed[1:][:-1]]

if len(eids) != len(reports):
raise Exception("number of enclaves and reports don't match")

num_enclaves = len(eids)

for i in range(num_enclaves):

report_bytes = bytearray.fromhex(reports[i])
report_len = c_size_t(len(report_bytes))

c_report = c_uint8 * report_len.value
c_report_bytes = c_report.from_buffer(report_bytes)

ret_val = POINTER(c_uint8)()
ret_len = c_size_t()

f_process = lib.sp_process_enclave_report
f_process(sp, c_report_bytes, byref(report_len), byref(ret_val), byref(ret_len))

msg_carray = c_uint8 * ret_len.value
msg_carray_bytes = msg_carray.from_address(addressof(ret_val.contents))

# Copy ServiceProvider response into bytes
msg = bytes(msg_carray_bytes)

key_request.key.append(msg)
key_request.eid.append(eids[i])

response = stub.relayFinishAttestation(key_request)

if response.success:
print("Attestation successful")
else:
print("Attestation failed")

f_free = lib.sp_free_array
f_free(sp, byref(ret_val))

except Exception as e:
print("Report verification fail: " + str(e))

def decrypt(cipher):

print(cipher)

cipher_bytes = bytes(cipher, 'utf-8')
cipher_len = c_size_t(len(cipher_bytes))
c_cipher_bytes = c_char_p(cipher_bytes)

ret_val = POINTER(c_uint8)()
ret_len = c_size_t()

f_process = lib.sp_decrypt
f_process(sp, c_cipher_bytes, byref(cipher_len), byref(ret_val), byref(ret_len))

msg_carray = c_uint8 * ret_len.value
msg_carray_bytes = msg_carray.from_address(addressof(ret_val.contents))

# Copy into bytes
decrypted_bytes = bytes(msg_carray_bytes)
f_free = lib.sp_free_array
f_free(sp, byref(ret_val))

return decrypted_bytes

def send_query(stub, query):
response = stub.relayQuery(rpc_pb2.QueryRequest(query=query))
if "postVerifyAndPrint" in query.split("(")[0]:
parsed = response.data.splitlines()[:-1]
for enc in parsed:
try:
print(decrypt(enc))
except:
print(response.data)
else:
print(response.data)

def shell(stub):
while True:
user_input = input("opaque> " )
while user_input:
send_query(stub, user_input)
user_input = input("opaque> ")

def clean_up(channel):
lib.sp_clean(sp)
channel.close()
print("Channel closed")

def run():
channel = grpc.insecure_channel('localhost:50060')
stub = rpc_pb2_grpc.OpaqueRPCStub(channel)

atexit.register(clean_up, channel=channel)

# Perform ra
perform_ra(stub)

# Start shell
shell(stub)

if __name__ == '__main__':
run()
125 changes: 125 additions & 0 deletions src/main/grpc/OpaqueRPCListener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from concurrent import futures

import grpc

import os
import sys, getopt
import atexit
from pexpect import replwrap

import rpc_pb2
import rpc_pb2_grpc

# Remove the first line of the output which is a repeat of the input
def clean_shell_output(output):
parsed_output = output.split("\n",1)[1]
return parsed_output.rstrip()

'''
Spark cluster needs to be started before running
Spark configurations can be set at conf/spark-env.sh

To match configurations with MultiPartition Test Suite use:

SPARK_WORKER_INSTANCES=3
SPARK_WORKER_CORES=1
SPARK_WORKER_MEMORY=1G

'''
class OpaqueRPCListener(rpc_pb2_grpc.OpaqueRPCServicer):

def __init__(self):

spark_home = os.getenv("SPARK_HOME")
opaque_jar = ''
master = ''

# Determine opaque jar and ip of master
argv = sys.argv[1:]

if len(argv) == 0:
print('Need arguments: OpaqueRPCListener.py -j <opaque jar> -m <master cluster ip>')
sys.exit(2)

try:
opts, args = getopt.getopt(argv,"hj:m:",["jar=","master="])
except getopt.GetoptError:
print('Need arguments: OpaqueRPCListener.py -j <opaque jar> -m <master cluster ip>')
sys.exit(2)
for opt, arg in opts:
if opt == '-h':
print('OpaqueRPCListener.py -j <opaque jar> -m <master cluster ip>')
sys.exit()
elif opt in ("-j", "--jar"):
opaque_jar = arg
elif opt in ("-m", "--master"):
master = arg

# MultiPartition Test Suite Spark Configurations
spark_shell_bin = spark_home + "/bin/spark-shell" + " --jars " + opaque_jar + " --master " + master\
+ " --conf spark.executor.instances=3 --conf spark.sql.shuffle.partitions=3 --conf spark.executor.memory=4g"

self.proc = replwrap.REPLWrapper(spark_shell_bin, "scala> ", prompt_change=None)
print("Instantiate subprocess")

self.proc.run_command("import edu.berkeley.cs.rise.opaque.implicits._")
self.proc.run_command("edu.berkeley.cs.rise.opaque.Utils.initSQLContext(spark.sqlContext)", timeout=None)
print("Imported Opaque libraries")

def relayGenerateReport(self, request, context):
output = self.proc.run_command("edu.berkeley.cs.rise.opaque.RA.printReport()", timeout=None)
clean_output = clean_shell_output(output)
reply = rpc_pb2.RAReply(success = True, report = clean_output)
return reply

''' TODO: Do not need repeated field anymore. Can remove and only send string back.
Other things that I can add for a better shell:
1. Handle multi-line input
2. Allow for up/down arrow of input (this should be done on client side)
'''
def relayQuery(self, request, context):

if not request.query:
return rpc_pb2.QueryReply(success = True, data = "")

output = self.proc.run_command(request.query, timeout=None)
clean_output = clean_shell_output(output)

return rpc_pb2.QueryReply(success = True, data = clean_output)

def relayFinishAttestation(self, request, context):
key_arg = ""
eid_arg = ""
for key in request.key:
key_arg += key.hex() + " "
for eid in request.eid:
eid_arg += eid + " "

# edu.berkeley.cs.rise.opaque.RA.grpcFinishAttestation(key_arg, eid_arg)
finish_ra_cmd = "edu.berkeley.cs.rise.opaque.RA.grpcFinishAttestation(\"" + key_arg + \
"\", " + "\"" + eid_arg + "\")"

output = self.proc.run_command(finish_ra_cmd, timeout=None)

reply = rpc_pb2.KeyReply(success = True)

if "OpaqueException" not in clean_shell_output(output) :
reply.success = True
else:
reply.success = False
return reply

def clean_up():
print("Server shutoff")

def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
rpc_pb2_grpc.add_OpaqueRPCServicer_to_server(OpaqueRPCListener(), server)
server.add_insecure_port('localhost:50060')
server.start()
server.wait_for_termination()
print("Hello world - serve")

if __name__ == '__main__':
atexit.register(clean_up)
serve()