-
Notifications
You must be signed in to change notification settings - Fork 0
/
interactiveShell.py
175 lines (157 loc) · 6.08 KB
/
interactiveShell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
#!/usr/bin/env python3
#** *****************************************************************************
# *
# * If not stated otherwise in this file or this component's LICENSE file the
# * following copyright and licenses apply:
# *
# * Copyright 2024 RDK Management
# *
# * Licensed under the Apache License, Version 2.0 (the "License");
# * you may not use this file except in compliance with the License.
# * You may obtain a copy of the License at
# *
# *
# http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# *
#* ******************************************************************************
import os
import pty
import subprocess
import sys
import select
import fcntl
import time
import pexpect
import atexit
import re
#TODO: Move to raft framework, this module is ideal for local testing
#TODO: Write data to a file and have a read / write pointer, so the flushing of data is moving the read = write
dir_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(dir_path+"/../../../")
from framework.core.commandModules.consoleInterface import consoleInterface
from framework.core.logModule import logModule
gProcess = None
def InteractiveShellCleanUp():
if gProcess is not None:
print("Exiting and terminating shell...")
gProcess.terminate()
gProcess.wait()
class InteractiveShell(consoleInterface):
"""
Represents an interactive shell interface.
"""
def __init__(self, log:logModule=None):
"""
Initializes an InteractiveShell by opening a pseudo-terminal.
"""
# Open a pseudo-terminal
if log is None:
self.log = logModule(self.__class__.__name__)
self.log.setLevel( logModule.INFO )
self.prompt = r"\$ "
self.sessionOpen = False
def open(self):
"""
Starts the shell process (e.g., bash) using pexpect.
"""
current_cwd = os.getcwd()
self.process = pexpect.spawnu('/bin/bash', cwd=current_cwd)
atexit.register(InteractiveShellCleanUp)
gProcess = self.process
# Register the cleanup function to be called on exit
self.sessionOpen = True
# Always start the session in the cwd
result = self.read_until( self.prompt )
self.write( "cd {}".format(current_cwd) )
self.read_all()
return result
def write(self, command):
"""Sends a command to the shell using pexpect."""
self.process.sendline(command) # Use sendline to send commands and include newline
#self.process.sendline('') # To flush the data through
self.log.debug(command)
def read_until(self, message):
"""
Reads output from the shell until a specific message is encountered using pexpect.
"""
max_attempts=2
output = ""
intermediate_string = re.sub(r"\\(.)", r"\1", message)
non_raw_message = intermediate_string.encode('utf-8').decode('unicode_escape')
escaped_message = re.escape(message)
for attempt in range(max_attempts):
try:
found = self.process.expect(escaped_message, timeout=10) # Wait for the specific message
data = self.process.before # This doesn't contain the message
if isinstance(data, bytes):
output = data.decode('utf-8') + non_raw_message # Decode if it's bytes
else:
output = data + non_raw_message # No need to decode if it's already a string
self.log.debug("[{}]".format(output))
break;
except pexpect.TIMEOUT:
if attempt == max_attempts - 1: # Last attempt
self.process.before=""
return output # Return an empty string if the message is not found after all attempts
else:
continue # Continue to the next attempt
except pexpect.EOF:
self.log.error("Reached EOF - process has ended")
self.process.before=""
return output
self.process.before=""
return output
def read_all(self):
"""Reads all available output from the shell using pexpect."""
output = ""
loop = True
while loop:
try:
found = self.process.expect(pexpect.TIMEOUT, timeout=1) # Check for data without blocking
if found == 0:
loop = False
except pexpect.TIMEOUT:
break # No more data available
except pexpect.EOF:
print("Reached EOF - process has ended")
break
else:
data = self.process.before
if isinstance(data, bytes):
decodeData += data.decode('utf-8') # Decode if it's bytes
self.log.DEBUG(decodeData)
output += decodeData
else:
output += data # No need to decode if it's already a string
self.log.debug(data)
self.process.before=""
return output
def close(self):
"""Closes the shell process."""
self.process.close() # Use pexpect's close method
self.sessionOpen = False
if __name__ == '__main__':
# Create the interactive shell instance
shell = InteractiveShell()
# Send a command
result = shell.open()
print("result [{}]".format(result))
shell.write('echo "Hello, world!"')
# Read and print the output
output = shell.read_all()
print(output)
shell.write('echo "Hello, world!"')
result = shell.read_until( shell.prompt )
print( result )
# send another command and check
shell.write('ls')
output = shell.read_all()
print(output)
# When done, close the shell
shell.close()