forked from wwkimball/yamlpath
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathyaml_get.py
214 lines (187 loc) · 7.5 KB
/
yaml_get.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
"""
Enable users to get data out of YAML/Compatible files using YAML Paths.
Retrieves one or more values from a YAML file at a specified YAML Path.
Output is printed to STDOUT, one line per match. When a result is a complex
data-type (Array or Hash), a JSON dump is produced to represent each complex
result. EYAML can be employed to decrypt the values.
Copyright 2018, 2019, 2020, 2021 William W. Kimball, Jr. MBA MSIS
"""
import sys
import argparse
import json
from os import access, R_OK
from os.path import isfile
from ruamel.yaml.comments import CommentedSet
from yamlpath.patches.timestamp import (
AnchoredTimeStamp,
AnchoredDate,
)
from yamlpath import __version__ as YAMLPATH_VERSION
from yamlpath.common import Parsers, Nodes
from yamlpath import YAMLPath
from yamlpath.exceptions import YAMLPathException
from yamlpath.eyaml.exceptions import EYAMLCommandException
from yamlpath.enums import PathSeparators
from yamlpath.wrappers import NodeCoords
from yamlpath.eyaml import EYAMLProcessor
from yamlpath.wrappers import ConsolePrinter
# pylint: enable=wrong-import-position,ungrouped-imports
def processcli():
    """Define the command-line interface and parse the arguments given.

    Multi-line help and description texts are assembled from adjacent
    string literals rather than a backslash line-continuation inside one
    literal, so the rendered text never depends on how the source lines
    happen to be indented.

    Returns:  the namespace produced by ``ArgumentParser.parse_args()``

    Raises:
    - `SystemExit` when argparse rejects the arguments, or after it prints
      the --help or --version text
    """
    parser = argparse.ArgumentParser(
        description=(
            "Retrieves one or more values from a YAML/JSON/Compatible"
            " file at a specified YAML Path.  Output is printed to STDOUT, one"
            " line per result.  When a result is a complex data-type (Array or"
            " Hash), a JSON dump is produced to represent it.  EYAML can be"
            " employed to decrypt the values."),
        epilog=(
            "For more information about YAML Paths, please visit"
            " https://github.com/wwkimball/yamlpath/wiki.  To report issues"
            " with this tool or to request enhancements, please visit"
            " https://github.com/wwkimball/yamlpath/issues.")
    )
    parser.add_argument("-V", "--version", action="version",
                        version="%(prog)s " + YAMLPATH_VERSION)

    required_group = parser.add_argument_group("required settings")
    required_group.add_argument(
        "-p", "--query",
        required=True,
        metavar="YAML_PATH",
        help="YAML Path to query"
    )

    # argparse passes a string default through `type`, so "dot" is converted
    # by PathSeparators.from_str just like a user-supplied value.
    parser.add_argument(
        "-t", "--pathsep",
        default="dot",
        choices=PathSeparators,
        metavar=PathSeparators.get_choices(),
        type=PathSeparators.from_str,
        help=(
            "indicate which YAML Path separator to use when rendering"
            " results; default=dot"))

    parser.add_argument(
        "-S", "--nostdin", action="store_true",
        help=(
            "Do not implicitly read from STDIN, even when YAML_FILE is not set"
            " and the session is non-TTY"))

    eyaml_group = parser.add_argument_group(
        "EYAML options",
        (
            "Left unset, the EYAML keys will default to your"
            " system or user defaults.  Both keys must be set either here or"
            " in your system or user EYAML configuration file when using"
            " EYAML."))
    eyaml_group.add_argument(
        "-x", "--eyaml",
        default="eyaml",
        help="the eyaml binary to use when it isn't on the PATH")
    eyaml_group.add_argument("-r", "--privatekey", help="EYAML private key")
    eyaml_group.add_argument("-u", "--publickey", help="EYAML public key")

    # At most one of --debug, --verbose, and --quiet may be given
    noise_group = parser.add_mutually_exclusive_group()
    noise_group.add_argument(
        "-d", "--debug",
        action="store_true",
        help="output debugging details")
    noise_group.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="increase output verbosity")
    noise_group.add_argument(
        "-q", "--quiet",
        action="store_true",
        help="suppress all output except errors")

    parser.add_argument(
        "yaml_file", metavar="YAML_FILE",
        nargs="?",
        help="the YAML file to query; omit or use - to read from STDIN")

    return parser.parse_args()
def validateargs(args, log):
    """Validate command-line arguments, exiting when any are unusable.

    Every problem found is reported through ``log.error`` before exiting,
    so all issues are shown in a single run.

    Parameters:
    1. args:  the parsed command-line arguments; its ``quiet``, ``verbose``,
       and ``debug`` attributes are overwritten when the input comes from
       STDIN and neither --verbose nor --debug was requested
    2. log:  an object exposing an ``error(message)`` method

    Returns:  N/A

    Raises:
    - `SystemExit` (code 1) when at least one validation fails
    """
    has_errors = False
    in_file = args.yaml_file if args.yaml_file else ""

    # sys.stdin is None when the interpreter was started without a usable
    # STDIN (e.g. descriptor 0 closed).  Treat that as "nothing piped in"
    # rather than failing with an AttributeError on isatty().  The STDIN
    # probe stays last so it is evaluated only when it is actually needed.
    in_stream_mode = in_file.strip() == "-" or (
        not in_file
        and not args.nostdin
        and sys.stdin is not None
        and not sys.stdin.isatty()
    )

    # When there is no YAML_FILE and no STDIN, there is nothing to read
    if not (in_file or in_stream_mode):
        has_errors = True
        log.error("YAML_FILE must be set or be read from STDIN.")

    # When set, --privatekey and --publickey must each be a readable file
    for key_kind, key_file in (
        ("private", args.privatekey),
        ("public", args.publickey),
    ):
        if key_file and not (isfile(key_file) and access(key_file, R_OK)):
            has_errors = True
            log.error(
                "EYAML {} key is not a readable file: ".format(key_kind)
                + key_file
            )

    # When either --publickey or --privatekey are set, the other must also
    # be.  This is because the `eyaml` command requires them both when
    # decrypting values.
    if bool(args.publickey) != bool(args.privatekey):
        has_errors = True
        log.error("Both private and public EYAML keys must be set.")

    # When reading the document from STDIN, request quiet output unless the
    # user explicitly asked for --verbose or --debug
    if in_stream_mode and not (args.verbose or args.debug):
        args.quiet = True
        args.verbose = False
        args.debug = False

    if has_errors:
        sys.exit(1)
def main():
    """Run the query requested on the command-line and print each match.

    The arguments are parsed and validated, the YAML document is loaded
    from YAML_FILE (or "-" when no file was named), and every node matching
    the --query YAML Path is printed to STDOUT, one per line.  dict, list,
    and CommentedSet results are printed as JSON; a None result is printed
    as a single NUL character; newlines within any other result are printed
    as a literal backslash-n.
    """
    args = processcli()
    log = ConsolePrinter(args)
    validateargs(args, log)
    query_path = YAMLPath(args.query, pathsep=args.pathsep)

    # Load the document; a failed load stops the program here
    editor = Parsers.get_yaml_editor()
    source_name = args.yaml_file or "-"
    (document, was_loaded) = Parsers.get_yaml_data(editor, log, source_name)
    if not was_loaded:
        # An error message has already been logged
        sys.exit(1)

    # Gather every node matching the query
    eyaml = EYAMLProcessor(
        log, document, binary=args.eyaml,
        publickey=args.publickey, privatekey=args.privatekey)
    matches = []
    try:
        for match in eyaml.get_eyaml_values(query_path, mustexist=True):
            log.debug(
                "Got node from {}:".format(query_path), data=match,
                prefix="yaml_get::main: ")
            matches.append(NodeCoords.unwrap_node_coords(match))
    except YAMLPathException as ex:
        # NOTE(review): presumably the second argument is the exit code
        log.critical(ex, 1)
    except EYAMLCommandException as ex:
        log.critical(ex, 2)

    # Print the matches, one per line
    json_types = (dict, list, CommentedSet)
    try:
        for found in matches:
            if isinstance(found, json_types):
                print(json.dumps(Parsers.jsonify_yaml_data(found)))
                continue

            if found is None:
                found = "\x00"
            elif isinstance(found, AnchoredDate):
                found = found.date().isoformat()
            elif isinstance(found, AnchoredTimeStamp):
                found = Nodes.get_timestamp_with_tzinfo(found).isoformat()

            # Keep each result on a single output line
            print(str(found).replace("\n", r"\n"))
    except RecursionError:
        log.critical(
            "The YAML data contains an infinitely recursing YAML Alias!", 1)
# Run the command only when this file is executed as a script; importing
# it as a module does not call main().
if __name__ == "__main__":
    main()  # pragma: no cover