-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbash_builtins.py
151 lines (126 loc) · 4.45 KB
/
bash_builtins.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import re
import os
import argparse
import threading
import signal
import globals
class BuiltinThreadWrapper:
    """
    Adapter that gives a Thread object a wait() method.
    The wrapped thread stays reachable through the ``thread`` attribute.
    """

    def __init__(self, thread_object):
        # Only stores the reference; the thread is neither started nor checked here.
        self.thread = thread_object

    def wait(self):
        """
        blocks until the wrapped thread has finished (delegates to join())
        :return: nothing
        """
        worker = self.thread
        worker.join()
### BUILTIN_FUNCTIONS
def cat_function(args, stdin, stdout):
    """
    copies the content of a file to stdout;
    when no file argument is given, copies stdin to stdout instead
    :param args: arguments; the optional first positional one is the file path
    :param stdin: input file descriptor, read only when no file path is given
    :param stdout: output file descriptor
    :return: nothing
    """
    cat_parser = argparse.ArgumentParser(prog="cat", description='print file content')
    cat_parser.add_argument("FILE", nargs='?', help='path to file')
    path = cat_parser.parse_args(args).FILE

    if path:
        # A file we opened ourselves is fully closed when the with-block ends.
        source = open(path, "r")
    else:
        # closefd=False: the caller's stdin descriptor must stay open afterwards.
        source = open(stdin, "r", closefd=False)

    # closefd=False on the sink as well: leaving the block flushes the data
    # but does not close the stdout descriptor.
    with source, open(stdout, "w", closefd=False) as sink:
        content = source.read()
        sink.write(content)
def echo_function(args, stdin, stdout):
    """
    prints arguments separated by ' ' and terminated by a newline
    :param args: arguments; at least one is required by the argument parser
    :param stdin: input file descriptor (not used)
    :param stdout: output file descriptor
    :return: nothing
    """
    parser = argparse.ArgumentParser(prog="echo", description='print arguments')
    parser.add_argument("argument", nargs='+')
    words = parser.parse_args(args).argument
    # The with-block flushes the text deterministically instead of relying on
    # garbage collection; closefd=False keeps the stdout descriptor itself open.
    with open(stdout, "w", closefd=False) as fout:
        # Text mode already translates "\n" into the platform line ending, so
        # writing os.linesep here would yield "\r\r\n" on Windows.
        fout.write(' '.join(words) + "\n")
def wc_function(args, stdin, stdout):
    """
    prints count of lines, words and bytes in 1st argument file.
    if file is not specified, prints count of lines, words and bytes in stdin
    the three numbers are written separated by single spaces, without a trailing newline
    :param args: arguments; the optional first positional one is the file path
    :param stdin: input file descriptor, read only when no file path is given
    :param stdout: output file descriptor
    :return: nothing
    """
    parser = argparse.ArgumentParser(prog="wc", description='print number of lines, words and bytes in FILE')
    parser.add_argument("FILE", nargs='?', help='path to file')
    filepath = parser.parse_args(args).FILE

    # Binary mode: the size must be a byte count, not a count of decoded
    # characters, and input that is not valid text must not raise.
    if filepath:
        fin = open(filepath, "rb")
    else:
        # closefd=False: the caller's stdin descriptor must stay open.
        fin = open(stdin, "rb", closefd=False)
    with fin:
        lines = fin.readlines()

    # bytes.split() with no argument splits on runs of ASCII whitespace.
    words_count = sum(len(line.split()) for line in lines)
    file_size = sum(len(line) for line in lines)

    # The with-block flushes the result deterministically; closefd=False
    # keeps the stdout descriptor itself open.
    with open(stdout, "w", closefd=False) as fout:
        fout.write(" ".join(str(count) for count in (len(lines), words_count, file_size)))
def pwd_function(args, stdin, stdout):
    """
    prints absolute path to current directory (without a trailing newline)
    :param args: arguments; only parsed for validation, no positional ones are accepted
    :param stdin: input file descriptor (not used)
    :param stdout: output file descriptor
    :return: nothing
    """
    parser = argparse.ArgumentParser(prog="pwd", description='print current directory')
    parser.parse_args(args)
    # The with-block flushes the path deterministically instead of relying on
    # garbage collection; closefd=False keeps the stdout descriptor itself open.
    with open(stdout, "w", closefd=False) as fout:
        fout.write(os.getcwd())
def exit_function(args, stdin, stdout):
    """
    requests termination by calling globals.set_should_exit(True).
    no signal is sent from here; how the flag is acted upon is decided
    elsewhere (presumably by the main loop -- verify in the globals module).
    :param args: arguments (not used)
    :param stdin: input file descriptor (not used)
    :param stdout: output file descriptor (not used)
    :return: nothing
    """
    should_exit = True
    globals.set_should_exit(should_exit)
# Dispatch table: builtin command name -> function implementing it.
# Every function takes (args, stdin, stdout).
command_to_function = dict(
    cat=cat_function,
    echo=echo_function,
    wc=wc_function,
    pwd=pwd_function,
    exit=exit_function,
)
class _FinishedCommand:
    """Result for commands that complete synchronously; wait() is a no-op."""

    def wait(self):
        pass


def simple_interpret_single_builtin_command(command, stdin, stdout):
    """
    execute single(without pipes) builtin command with stdin=stdin, stdout=stdout
    an empty command does nothing; a first word of the form NAME=VALUE sets
    the environment variable NAME (VALUE may be empty or span several lines);
    any other known builtin is started in its own thread
    :param stdout: output file descriptor
    :param stdin: input file descriptor
    :param command: list of attributes of command
    :return: Object with .wait(), None if command unrecognized
    """
    if not command:
        return _FinishedCommand()

    # fullmatch + DOTALL: the whole first word must be an assignment, and the
    # value part may be empty ("NAME=") or contain newlines.
    assignment = re.fullmatch(r"(\w+)=(.*)", command[0], re.DOTALL)
    if assignment:
        name, value = assignment.groups()
        os.environ[name] = value
        return _FinishedCommand()

    builtin = command_to_function.get(command[0])
    if builtin is None:
        return None

    # Builtins run in a separate thread; the caller joins it through wait().
    thread = threading.Thread(target=builtin, args=(command[1:], stdin, stdout))
    thread.start()
    return BuiltinThreadWrapper(thread)