-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathget-connection-trail.py
107 lines (83 loc) · 2.96 KB
/
get-connection-trail.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""
This example fetches a trail file and prints stdin events seen in the trail.
"""
import base64
import json
import sys
import config
# this importation for demonstration purpose only
# for proper importation of privx_api module
# see https://github.com/SSHcom/privx-sdk-for-python#getting-started
try:
# Running example with pip-installed SDK
import privx_api
except ImportError:
# Running example without installing SDK
from utils import load_privx_api_lib_path
load_privx_api_lib_path()
import privx_api
# Replace with the ID of the connection to output
CONNECTION_ID = "CONNECTION_ID"
FORMAT = "jsonl"
# Initialize the API.
api = privx_api.PrivXAPI(
config.HOSTNAME,
config.HOSTPORT,
config.CA_CERT,
config.OAUTH_CLIENT_ID,
config.OAUTH_CLIENT_SECRET,
)
# Authenticate.
# NOTE: fill in your credentials from secure storage, this is just an example
api.authenticate(config.API_CLIENT_ID, config.API_CLIENT_SECRET)
def get_connection(conn_id: str):
"""Fetch connection object"""
connection = api.get_connection(conn_id)
if connection.ok:
return connection.data
print(connection.data["details"])
sys.exit(1)
def create_trail_session(conn_id: str, chan_id: str):
"""Create sessionId for trail log download"""
session = api.create_trail_log_download_handle(conn_id, chan_id)
if session.ok:
return session.data
print(session.data["details"])
sys.exit(1)
def download_trail_log(conn_id: str, chan_id: str, sess_id: str, log_format: str):
"""Download trail log"""
trail = api.download_trail_log(conn_id, chan_id, sess_id, log_format)
if trail.ok:
return b"".join(trail.iter_content()).decode()
sys.exit(1)
def print_trail(trail: str):
"""Parse and print trail log"""
trail_lines = trail.splitlines()
input_str = ""
for line_item in trail_lines:
line = json.loads(line_item)
if line["type"] == "stdin":
input_event = base64.b64decode(line["data"]).decode("latin1")
if input_event == "\r":
print(line["ts"][0:19] + " " + input_str)
input_str = ""
else:
input_str += input_event
def main():
"""Fetch the connection, create sessionId, download trail and parse it."""
connection = get_connection(CONNECTION_ID)
# Iterate channels
for channel in connection["trail"]["channels"]:
if channel["type"] == "shell":
# Get a session ID for trail download
session = create_trail_session(CONNECTION_ID, channel["id"])
# Get the trail
trail_log = download_trail_log(
CONNECTION_ID, channel["id"], session["session_id"], FORMAT
)
# Parse the trail
print("\r\nstdin conn " + CONNECTION_ID + " chan " + channel["id"])
print("------------------------------------------------------")
print_trail(trail_log)
if __name__ == "__main__":
main()