Skip to content

Commit

Permalink
some mitmproxy script tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
diogotr7 committed Dec 29, 2024
1 parent 5e1f2be commit db39f24
Showing 1 changed file with 45 additions and 34 deletions.
79 changes: 45 additions & 34 deletions scripts/stream-sc.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
from google.protobuf.descriptor_pool import DescriptorPool
from google.protobuf.message_factory import MessageFactory
from google.protobuf.message import DecodeError
from google.protobuf.text_format import MessageToString
from mitmproxy import ctx
import logging
from google.protobuf.json_format import MessageToJson


class ProtobufModifier:
Expand Down Expand Up @@ -79,10 +79,10 @@ def deserialize(
except DecodeError as e:
raise ValueError("Unable to deserialize input") from e

return MessageToString(
return MessageToJson(
message=message,
descriptor_pool=self.descriptor_pool,
print_unknown_fields=True,
always_print_fields_with_no_presence=True,
)

def find_method_by_path(self, path: str) -> MethodDescriptor:
Expand Down Expand Up @@ -145,25 +145,21 @@ def render_priority(
return float(content_type in self.supported_content_types)


class GrpcProtobufDebugWriter:
def write_dump(req_or_rep, path, content):
if not os.path.exists("dump"):
os.makedirs("dump")

with open(
os.path.join(
"dump",
f"[{datetime.datetime.now().strftime('%Y%m%d.%H%M%S.%f')}] [{req_or_rep}] {path}.json",
),
"wb",
) as f:
f.write(content)

def __write(self, req_or_rep, path, content):
if not os.path.exists("dump"):
os.makedirs("dump")

with open(
os.path.join(
"dump",
datetime.datetime.now().strftime("%Y%m%d.%H%M%S.%f")
+ "-"
+ req_or_rep
+ "-"
+ path
+ ".grpc",
),
"wb",
) as f:
f.write(content.encode())

class GrpcProtobufDebugWriter:

def __init__(self, protobuf_modifier: ProtobufModifier) -> None:
self.protobuf_modifier = protobuf_modifier
Expand All @@ -172,15 +168,24 @@ def request(self, flow: mitmproxy.http.HTTPFlow):
req = self.protobuf_modifier.deserialize(
flow.request, flow.request.path, flow.request.content
)
path = flow.request.path.replace("/", "_")
self.__write("request", path, req)
path = flow.request.path.replace("/", ".")
write_dump("REQ", path, req.encode())

def response(self, flow: mitmproxy.http.HTTPFlow):
rep = self.protobuf_modifier.deserialize(
flow.response, flow.request.path, flow.response.content
)
path = flow.request.path.replace("/", "_")
self.__write("response", path, rep)
path = flow.request.path.replace("/", ".")
write_dump("REP", path, rep.encode())
if "ExternalEntitlementService" in flow.request.path:
with open(
os.path.join(
"dump",
f"[{datetime.datetime.now().strftime('%Y%m%d.%H%M%S.%f')}] [REP] {path}.bin",
),
"wb",
) as f:
f.write(flow.response.content)


class StreamSaver:
Expand Down Expand Up @@ -223,16 +228,22 @@ def __call__(self, data):
data,
)

print(
"Decoded streamed "
+ self.direction
+ ". Path: "
+ self.flow.request.path
+ ". Length: "
+ str(len(data))
+ ". Data: "
+ decodedData
write_dump(
self.direction == "request" and "SOUT" or "SIN",
self.flow.request.path.replace("/", ".") + "." + self.direction,
decodedData.encode(),
)

# print(
# "Decoded streamed "
# + self.direction
# + ". Path: "
# + self.flow.request.path
# + ". Length: "
# + str(len(data))
# + ". Data: "
# + decodedData
# )
except Exception as e:
print(
"Failed to decode "
Expand Down

0 comments on commit db39f24

Please sign in to comment.