diff --git a/pyx12/error_handler.py b/pyx12/error_handler.py index 6763b354..69d2355b 100644 --- a/pyx12/error_handler.py +++ b/pyx12/error_handler.py @@ -23,7 +23,6 @@ class err_iter(object): """ Iterate over the error tree - Implements an odd iterator??? """ @@ -78,7 +77,7 @@ class err_handler(object): """ The interface to the error handling structures. """ - def __init__(self): + def __init__(self, errhandler): """ """ @@ -93,6 +92,16 @@ def __init__(self): self.seg_node_added = False self.cur_ele_node = None self.cur_line = 0 + self.errhandler = errhandler + self.err_cnt = 0 + + def add_summary(self, element_count): + """ + Params: element_count - reference to how many elements written in json output + """ + sout = "Data Elements Written: {}".format(element_count) + sout += "Number of Errors Found: {}".format(self.err_cnt) + self.errhandler.write(sout) def accept(self, visitor): """ @@ -228,6 +237,8 @@ def isa_error(self, err_cde, err_str): sout = '' sout += 'Line:%i ' % (self.cur_isa_node.get_cur_line()) sout += 'ISA:%s - %s' % (err_cde, err_str) + self.errhandler.write(sout) + self.err_cnt += 1 logger.error(sout) self.cur_isa_node.add_error(err_cde, err_str) @@ -241,6 +252,8 @@ def gs_error(self, err_cde, err_str): sout = '' sout += 'Line:%i ' % (self.cur_gs_node.get_cur_line()) sout += 'GS:%s - %s' % (err_cde, err_str) + self.errhandler.write(sout) + self.err_cnt += 1 logger.error(sout) self.cur_gs_node.add_error(err_cde, err_str) @@ -254,6 +267,8 @@ def st_error(self, err_cde, err_str): sout = '' sout += 'Line:%i ' % (self.cur_st_node.get_cur_line()) sout += 'ST:%s - %s' % (err_cde, err_str) + self.errhandler.write(sout) + self.err_cnt += 1 logger.error(sout) self.cur_st_node.add_error(err_cde, err_str) @@ -278,6 +293,9 @@ def seg_error(self, err_cde, err_str, err_value=None, src_line=None): sout += 'SEG:%s - %s' % (err_cde, err_str) if err_value: sout += ' (%s)' % err_value + sout += '' + self.errhandler.write(sout) + self.err_cnt += 1 logger.error(sout) def ele_error(self, err_cde, err_str, bad_value, refdes=None): @@ -295,6 +313,9 @@ def ele_error(self, err_cde, err_str, bad_value, refdes=None): sout += 'ELE:%s - %s' % (err_cde, err_str) if bad_value: sout += ' (%s)' % (bad_value) + sout += '' + self.errhandler.write(sout) + self.err_cnt += 1 logger.error(sout) #print self.cur_ele_node.errors @@ -587,7 +608,6 @@ def __init__(self, parent, seg_data, src): @type seg_data: L{segment} @param src: X12file source @type src: L{X12file} - """ self.seg_data = seg_data self.isa_id = src.get_isa_id() @@ -720,7 +740,6 @@ def __repr__(self): class err_st(err_node): """ ST loops - Needs: 1. Transaction set id code (837, 834) 2. Transaction set control number @@ -771,7 +790,6 @@ def add_error(self, err_cde, err_str): def close(self, node, seg_data, src): """ Close ST loop - @param node: SE node @type node: L{node} @param seg_data: Segment object @@ -935,7 +953,6 @@ class err_ele(err_node): """ Element Errors - Holds and generates output for element and composite/sub-element errors - Each element with an error creates a new err_ele instance. """ def __init__(self, parent, map_node): @@ -1307,4 +1324,4 @@ def is_closed(self): def __repr__(self): """ """ - return '%i: %s' % (-1, self.id) + return '%i: %s' % (-1, self.id) \ No newline at end of file diff --git a/pyx12/jsonwriter.py b/pyx12/jsonwriter.py new file mode 100644 index 00000000..33b4d901 --- /dev/null +++ b/pyx12/jsonwriter.py @@ -0,0 +1,129 @@ +# A simple JSON Generator Based off of the XML Writer in pyx12.xmlwriter +import sys + + +class JSONriter(object): + """ + Doctest: + >>>from jsonwriter import JSONriter + >>>writer = JSONriter() + >>>writer.push(u"xsa") + >>>#Another element with child elements + >>>writer.push(u"vendor") + >>>#Element with simple text (#PCDATA) content + >>>writer.elem(u"name", u"Centigrade systems") + >>>writer.elem(u"email", u"info@centigrade.bogus") + >>>writer.elem(u"vendor", u"Centigrade systems") + >>>#Close currently open element ("vendor) + >>>writer.pop() + >>>#Element with an attribute + >>>writer.push(u"product", {u"id": u"100\\u00B0"}) + >>>writer.elem(u"name", u"100\\u00B0 Server") + >>>writer.elem(u"version", u"1.0") + >>>writer.elem(u"last-release", u"20030401") + """ + + def __init__(self, out=sys.stdout, encoding="utf-8", indent="\t", words_mode=True): + """ + out - a stream for the output + encoding - an encoding used to wrap the output for unicode + indent - white space used for indentation + words_mode - boolean for using string fields rather than codes in output + """ + self.encoding = encoding + self.out = out + self.stack = [] + self.indent = indent + self.words_mode = words_mode + self.element_count = 0 + + def push(self, elem, attrs={}, first=False): + """ + Create an element which will have child elements + """ + self._indent() + + if first: + for (_, v) in list(attrs.items()): + if elem == "loop": + self._write("""{"%s": [""" % self._escape_attr(v)) + elif elem == "seg": + self._write("""{"%s": {""" % self._escape_attr(v)) + elif elem == "comp": + self._write(""""%s": {""" % self._escape_attr(v)) + else: + for (_, v) in list(attrs.items()): + if elem == "loop": + self._write(""",{"%s": [""" % self._escape_attr(v)) + elif elem == "seg": + self._write(""",{"%s": {""" % self._escape_attr(v)) + elif elem == "comp": + self._write(""""%s": {""" % self._escape_attr(v)) #component is essentially an element + self.stack.append(elem) + + def elem(self, elem, content, attrs={}, last=False): + """ + Create an element with text content only + """ + self._indent() + for (_, v) in list(attrs.items()): + self.element_count += 1 + if last: + self._write('''"%s": "%s"''' % (self._escape_attr(v), self._escape_cont(content))) #Newline + else: + self._write(""""%s": "%s",""" % (self._escape_attr(v), self._escape_cont(content))) # Newline + + def pop(self, last=True): + """ + Close an element started with the push() method + """ + if len(self.stack) > 0: + elem = self.stack[-1] + del self.stack[-1] + if last: + if elem == "seg": + self._indent() + self._write("}}") #newline + elif elem == "loop": + self._indent() + self._write("]}") #newline + elif elem == "comp": + self._indent() + self._write("}") + else: + if elem == "seg": + self._indent() + self._write("}},") #newline + elif elem == "loop": + self._indent() + self._write("]},") #newline + elif elem == "comp": + self._indent() + self._write("},") + + def __len__(self): + return len(self.stack) + + def _indent(self): + return + # Todo : enable multi line json output + # This gets tricky with formatting commas and indents! + self._write(self.indent * (len(self.stack) * 2)) + + def _escape_cont(self, text): + if text is None: + return None + return text.replace("&", "&")\ + .replace("<", "<").replace(">", ">").replace("\t", "") + + def _escape_attr(self, text): + if self.words_mode: + return text.replace(' ', '_').replace(',', '').replace('(', '[').replace(')', ']') + if text is None: + return None + return text.replace("&", "&") \ + .replace("'", "'").replace("<", "<")\ + .replace(">", ">") + + def _write(self, strval): + self.out.write(strval) \ No newline at end of file diff --git a/pyx12/test/test_jsonwriter.py b/pyx12/test/test_jsonwriter.py new file mode 100644 index 00000000..93732081 --- /dev/null +++ b/pyx12/test/test_jsonwriter.py @@ -0,0 +1,86 @@ +import os.path +import sys +import os +import unittest + +try: + from StringIO import StringIO +except: + from io import StringIO + +import tempfile + +from pyx12.jsonwriter import JSONriter + +class TestJsonWriter(unittest.TestCase): + """ + """ + + def test_write_loop_first(self): + # With StringIO Object + try: + fd = StringIO(encoding='ascii') + except: + fd = StringIO() + + writer = JSONriter(fd) + attrs = {'id': 'TestLoopFirst'} + writer.push('loop', attrs, first=True) + while len(writer) > 0: + writer.pop() + writer.pop() + expected = """{"TestLoopFirst": []}""" + self.assertEqual(fd.getvalue(), expected) + fd.close() + + # With Temp File + _, filename = tempfile.mkstemp('.json', 'pyx12_') + with open(filename, 'w') as fd: + writer = JSONriter(fd) + attrs = {'id': 'TestLoopFirst'} + writer.push('loop', attrs, first=True) + while len(writer) > 0: + writer.pop() + + with open(filename, 'r') as fd: + self.assertEqual(fd.read(), expected) + + try: + os.remove(filename) + except: + pass + + def test_write_loop(self): + # With StringIO Object + try: + fd = StringIO(encoding='ascii') + except: + fd = StringIO() + + writer = JSONriter(fd) + attrs = {'id': 'TestLoopFirst'} + writer.push('loop', attrs) + writer.pop() + expected = """,{"TestLoopFirst": []}""" + self.assertEqual(fd.getvalue(), expected) + fd.close() + + # With Temp File + _, filename = tempfile.mkstemp('.json', 'pyx12_') + with open(filename, 'w') as fd: + writer = JSONriter(fd) + attrs = {'id': 'TestLoopFirst'} + writer.push('loop', attrs) + while len(writer) > 0: + writer.pop() + + with open(filename, 'r') as fd: + self.assertEqual(fd.read(), expected) + + try: + os.remove(filename) + except: + pass + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/pyx12/x12json_simple.py b/pyx12/x12json_simple.py new file mode 100644 index 00000000..06dd3c47 --- /dev/null +++ b/pyx12/x12json_simple.py @@ -0,0 +1,249 @@ +from os.path import commonprefix +import logging + +from .x12xml_simple import x12xml_simple +from .jsonwriter import JSONriter +from .errors import EngineError +from .map_walker import pop_to_parent_loop + +logger = logging.getLogger('pyx12.x12json.simple') + +class X12JsonSimple(x12xml_simple): + def __init__(self, fd, words_mode=True): + """ + @param fd: File stream for output + @param words_mode: Dump JSON using string names for fields rather than codes. + """ + self.writer = JSONriter(fd, words_mode=words_mode) + self.last_path = [] + self.visited = [] + self.words_mode = words_mode + + def __del__(self): + self.finalize() + + def finalize(self): + while len(self.writer) > 0: + self.writer.pop() + + @staticmethod + def get_parents_in_path(node): + # Todo: Ensure all x12 files start with Interchange Control Header + def safe_get_parent(node): + try: + if node.parent.id == 'ISA_LOOP': + return [] + else: + return [str(node.parent.name)] + except AttributeError: + return [] + parent_nodes_in_path = [] + parents = safe_get_parent(node) + while len(parents) > 0: + parent_nodes_in_path += parents + node = node.parent + parents = safe_get_parent(node) + # if parents[0] == "Application Sender's Code": + # import pdb;pdb.set_trace() + # print("hello") + return ['Interchange Control Header'] + list(reversed(parent_nodes_in_path)) + + def seg_with_names(self, seg_node, seg_data): + """ + Generate JSON for the segment data and matching map node. + Essentially the same as "seg", however this will write + String field names, rather than codes. + @param seg_node: Map Node + @type seg_node: L{node} + @param seg_data: Segment object + @type seg_data: L{segment} + """ + if not seg_node.is_segment(): + raise EngineError('Node must be a segment') + parent = pop_to_parent_loop(seg_node) # Get enclosing loop + # check path for new loops to be added + cur_path = self._path_list(parent.get_path()) + if self.last_path == cur_path and seg_node.is_first_seg_in_loop(): + # loop repeat + self.writer.pop() + (xname, attrib) = self._get_loop_info(cur_path[-1]) + attrib['id'] = parent.name + self.writer.push(xname, attrib, first=False) + else: + last_path = self.last_path + match_idx = self._get_path_match_idx(last_path, cur_path) + root_path = self._path_list(commonprefix(['/'.join(cur_path), '/'.join(last_path)])) + if seg_node.is_first_seg_in_loop() and root_path == cur_path: + match_idx -= 1 + loop_struct = range(len(last_path) - 1, match_idx - 1, -1) + for i in loop_struct: + if i == loop_struct[-1]: + self.writer.pop() + else: + self.writer.pop() + for i in range(match_idx, len(cur_path)): + (xname, attrib) = self._get_loop_info(cur_path[i]) + # Write a Loop + parent_path_nodes = self.get_parents_in_path(seg_node) + attrib['id'] = parent_path_nodes[i] + parent_loop = cur_path[i-1] + if parent_loop not in self.visited: + self.visited.append(parent_loop) + self.writer.push(xname, attrib, first=True) + else: + self.writer.push(xname, attrib, first=False) + seg_node_id = self._get_node_id(seg_node, parent, seg_data) + (xname, attrib) = self._get_seg_info(seg_node_id) + attrib['id'] = seg_node.name + if seg_node.is_first_seg_in_loop(): + self.writer.push(xname, attrib, first=True) + else: + self.writer.push(xname, attrib, first=False) + loop_struct = range(len(seg_data)) + for i in loop_struct: + if i == loop_struct[-1]: + last = True + else: + # Check to see if any of the next children exist. + # If no, then we are on last node + try: + next_children = [seg_node.get_child_node_by_idx(index) for index in loop_struct[i+1:]] + except IndexError: + next_children = [] + next_node_exists = [not(child_node.usage == 'N' or seg_data.get('%02i' % (i + 1)).is_empty()) for child_node in next_children] + if any(next_node_exists): + last = False + else: + last = True + child_node = seg_node.get_child_node_by_idx(i) + if child_node.usage == 'N' or seg_data.get('%02i' % (i + 1)).is_empty(): + pass # Do not try to ouput for invalid or empty elements + elif child_node.is_composite(): + (xname, attrib) = self._get_comp_info(child_node.id) + attrib['id'] = child_node.name + if i == loop_struct[0]: + self.writer.push(xname, attrib, first=True) + else: + self.writer.push(xname, attrib, first=False) + comp_data = seg_data.get('%02i' % (i + 1)) + for j in range(len(comp_data)): + if j == range(len(comp_data))[-1]: + elem_last = True + else: + elem_last = False + subele_node = child_node.get_child_node_by_idx(j) + (xname, attrib) = self._get_subele_info(subele_node.id) + attrib['id'] = subele_node.name + self.writer.elem(xname, comp_data[j].get_value(), attrib, elem_last) + self.writer.pop(last=last) # end composite + elif child_node.is_element(): + if seg_data.get_value('%02i' % (i + 1)) == '': + pass + else: + attrib['id'] = child_node.name + self.writer.elem(xname, seg_data.get_value('%02i' % (i + 1)), attrib, last) + else: + raise EngineError('Node must be a either an element or a composite') + self.writer.pop() # end segment + if parent.id not in self.visited: + self.visited.append(parent.id) + self.last_path = cur_path + + + def seg(self, seg_node, seg_data): + """ + Generate JSON for the segment data and matching map node + @param seg_node: Map Node + @type seg_node: L{node} + @param seg_data: Segment object + @type seg_data: L{segment} + """ + if self.words_mode: + self.seg_with_names(seg_node, seg_data) + return + + if not seg_node.is_segment(): + raise EngineError('Node must be a segment') + parent = pop_to_parent_loop(seg_node) # Get enclosing loop + # check path for new loops to be added + cur_path = self._path_list(parent.get_path()) + if self.last_path == cur_path and seg_node.is_first_seg_in_loop(): + # loop repeat + self.writer.pop() + (xname, attrib) = self._get_loop_info(cur_path[-1]) + self.writer.push(xname, attrib, first=False) + else: + last_path = self.last_path + match_idx = self._get_path_match_idx(last_path, cur_path) + root_path = self._path_list(commonprefix(['/'.join(cur_path), '/'.join(last_path)])) + if seg_node.is_first_seg_in_loop() and root_path == cur_path: + match_idx -= 1 + loop_struct = range(len(last_path) - 1, match_idx - 1, -1) + for i in loop_struct: + if i == loop_struct[-1]: + self.writer.pop() + else: + self.writer.pop() + for i in range(match_idx, len(cur_path)): + (xname, attrib) = self._get_loop_info(cur_path[i]) + # Write a Loop + parent_loop = cur_path[i-1] + if parent_loop not in self.visited: + self.visited.append(parent_loop) + self.writer.push(xname, attrib, first=True) + else: + self.writer.push(xname, attrib, first=False) + seg_node_id = self._get_node_id(seg_node, parent, seg_data) + (xname, attrib) = self._get_seg_info(seg_node_id) + if seg_node.is_first_seg_in_loop(): + self.writer.push(xname, attrib, first=True) + else: + self.writer.push(xname, attrib, first=False) + loop_struct = range(len(seg_data)) + for i in loop_struct: + if i == loop_struct[-1]: + last = True + else: + # Check to see if any of the next children exist. + # If no, then we are on last node + try: + next_children = [seg_node.get_child_node_by_idx(index) for index in loop_struct[i+1:]] + except IndexError: + next_children = [] + next_node_exists = [not(child_node.usage == 'N' or seg_data.get('%02i' % (i + 1)).is_empty()) for child_node in next_children] + if any(next_node_exists): + last = False + else: + last = True + child_node = seg_node.get_child_node_by_idx(i) + if child_node.usage == 'N' or seg_data.get('%02i' % (i + 1)).is_empty(): + pass # Do not try to ouput for invalid or empty elements + elif child_node.is_composite(): + (xname, attrib) = self._get_comp_info(child_node.id) # formerly seg_node_id + if i == loop_struct[0]: + self.writer.push(xname, attrib, first=True) + else: + self.writer.push(xname, attrib, first=False) + comp_data = seg_data.get('%02i' % (i + 1)) + for j in range(len(comp_data)): + if j == range(len(comp_data))[-1]: + elem_last = True + else: + elem_last = False + subele_node = child_node.get_child_node_by_idx(j) + (xname, attrib) = self._get_subele_info(subele_node.id) + self.writer.elem(xname, comp_data[j].get_value(), attrib, elem_last) + self.writer.pop(last=last) # end composite + elif child_node.is_element(): + if seg_data.get_value('%02i' % (i + 1)) == '': + pass + #self.writer.empty(u"ele", attrs={u'id': child_node.id}) + else: + (xname, attrib) = self._get_ele_info(child_node.id) + self.writer.elem(xname, seg_data.get_value('%02i' % (i + 1)), attrib, last) + else: + raise EngineError('Node must be a either an element or a composite') + self.writer.pop() # end segment + if parent.id not in self.visited: + self.visited.append(parent.id) + self.last_path = cur_path \ No newline at end of file diff --git a/pyx12/x12n_document.py b/pyx12/x12n_document.py index 3a83b729..9dbee3f1 100644 --- a/pyx12/x12n_document.py +++ b/pyx12/x12n_document.py @@ -14,6 +14,7 @@ """ import logging +import sys # Intrapackage imports import pyx12.error_handler @@ -26,6 +27,7 @@ import pyx12.x12file from pyx12.map_walker import walk_tree import pyx12.x12xml_simple +import pyx12.x12json_simple def _reset_counter_to_isa_counts(walker): @@ -47,8 +49,8 @@ def _reset_counter_to_gs_counts(walker): def x12n_document(param, src_file, fd_997, fd_html, - fd_xmldoc=None, xslt_files=None, map_path=None, - callback=None): + fd_xmldoc=None, fd_jsondoc=None, dump_json_to_words=False, xslt_files=None, map_path=None, + callback=None, errhandler=sys.stderr): """ Primary X12 validation function @param param: pyx12.param instance @@ -60,10 +62,14 @@ def x12n_document(param, src_file, fd_997, fd_html, @type fd_html: file descriptor @param fd_xmldoc: XML output document @type fd_xmldoc: file descriptor + @param fd_jsondoc: JSON output document (outputs to single line) + @type fd_jsondoc: file descriptor @rtype: boolean + @param errhandler: Error Output Document + @type errhandler: file descriptor """ logger = logging.getLogger('pyx12') - errh = pyx12.error_handler.err_handler() + errh = pyx12.error_handler.err_handler(errhandler=errhandler) # Get X12 DATA file try: @@ -89,6 +95,8 @@ def x12n_document(param, src_file, fd_997, fd_html, err_iter = pyx12.error_handler.err_iter(errh) if fd_xmldoc: xmldoc = pyx12.x12xml_simple.x12xml_simple(fd_xmldoc, param.get('simple_dtd')) + if fd_jsondoc: + fd_jsondoc = pyx12.x12json_simple.X12JsonSimple(fd_jsondoc, words_mode=dump_json_to_words) #basedir = os.path.dirname(src_file) #erx = errh_xml.err_handler(basedir=basedir) @@ -216,11 +224,14 @@ def x12n_document(param, src_file, fd_997, fd_html, if fd_xmldoc: xmldoc.seg(node, seg) + + if fd_jsondoc: + fd_jsondoc.seg(node, seg) if False: print('\n\n') #erx.Write(src.cur_line) - + #erx.handleErrors(src.pop_errors()) src.cleanup() # Catch any skipped loop trailers errh.handle_errors(src.pop_errors()) @@ -233,6 +244,10 @@ def x12n_document(param, src_file, fd_997, fd_html, if fd_xmldoc: del xmldoc + + if fd_jsondoc: + element_cnt = fd_jsondoc.writer.element_count + errh.add_summary(element_cnt) #visit_debug = pyx12.error_debug.error_debug_visitor(sys.stdout) #errh.accept(visit_debug) @@ -262,9 +277,15 @@ def x12n_document(param, src_file, fd_997, fd_html, pass try: if not valid or errh.get_error_count() > 0: - return False + success = False else: - return True + success = True except Exception: - print(errh) - return False + print(errh) #todo: change to logging + success = False + + if fd_jsondoc: + fd_jsondoc.finalize() + + return success + \ No newline at end of file