Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding JSON writing feature and error summarization #72

Open
wants to merge 23 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions pyx12/error_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
class err_iter(object):
"""
Iterate over the error tree

Implements an odd iterator???
"""

Expand Down Expand Up @@ -78,7 +77,7 @@ class err_handler(object):
"""
The interface to the error handling structures.
"""
def __init__(self):
def __init__(self, errhandler):
"""
"""

Expand All @@ -93,6 +92,16 @@ def __init__(self):
self.seg_node_added = False
self.cur_ele_node = None
self.cur_line = 0
self.errhandler = errhandler
self.err_cnt = 0

def add_summary(self, element_count):
"""
Params: element_count - reference to how many elements written in json output
"""
sout = "Data Elements Written: {}".format(element_count)
sout += "Number of Errors Found: {}".format(self.err_cnt)
self.errhandler.write(sout)

def accept(self, visitor):
"""
Expand Down Expand Up @@ -228,6 +237,8 @@ def isa_error(self, err_cde, err_str):
sout = ''
sout += 'Line:%i ' % (self.cur_isa_node.get_cur_line())
sout += 'ISA:%s - %s' % (err_cde, err_str)
self.errhandler.write(sout)
self.err_cnt += 1
logger.error(sout)
self.cur_isa_node.add_error(err_cde, err_str)

Expand All @@ -241,6 +252,8 @@ def gs_error(self, err_cde, err_str):
sout = ''
sout += 'Line:%i ' % (self.cur_gs_node.get_cur_line())
sout += 'GS:%s - %s' % (err_cde, err_str)
self.errhandler.write(sout)
self.err_cnt += 1
logger.error(sout)
self.cur_gs_node.add_error(err_cde, err_str)

Expand All @@ -254,6 +267,8 @@ def st_error(self, err_cde, err_str):
sout = ''
sout += 'Line:%i ' % (self.cur_st_node.get_cur_line())
sout += 'ST:%s - %s' % (err_cde, err_str)
self.errhandler.write(sout)
self.err_cnt += 1
logger.error(sout)
self.cur_st_node.add_error(err_cde, err_str)

Expand All @@ -278,6 +293,9 @@ def seg_error(self, err_cde, err_str, err_value=None, src_line=None):
sout += 'SEG:%s - %s' % (err_cde, err_str)
if err_value:
sout += ' (%s)' % err_value
sout += ''
self.errhandler.write(sout)
self.err_cnt += 1
logger.error(sout)

def ele_error(self, err_cde, err_str, bad_value, refdes=None):
Expand All @@ -295,6 +313,9 @@ def ele_error(self, err_cde, err_str, bad_value, refdes=None):
sout += 'ELE:%s - %s' % (err_cde, err_str)
if bad_value:
sout += ' (%s)' % (bad_value)
sout += ''
self.errhandler.write(sout)
self.err_cnt += 1
logger.error(sout)
#print self.cur_ele_node.errors

Expand Down Expand Up @@ -587,7 +608,6 @@ def __init__(self, parent, seg_data, src):
@type seg_data: L{segment<segment.Segment>}
@param src: X12file source
@type src: L{X12file<x12file.X12Reader>}

"""
self.seg_data = seg_data
self.isa_id = src.get_isa_id()
Expand Down Expand Up @@ -720,7 +740,6 @@ def __repr__(self):
class err_st(err_node):
"""
ST loops

Needs:
1. Transaction set id code (837, 834)
2. Transaction set control number
Expand Down Expand Up @@ -771,7 +790,6 @@ def add_error(self, err_cde, err_str):
def close(self, node, seg_data, src):
"""
Close ST loop

@param node: SE node
@type node: L{node<map_if.x12_node>}
@param seg_data: Segment object
Expand Down Expand Up @@ -935,7 +953,6 @@ class err_ele(err_node):
"""
Element Errors - Holds and generates output for element and
composite/sub-element errors

Each element with an error creates a new err_ele instance.
"""
def __init__(self, parent, map_node):
Expand Down Expand Up @@ -1307,4 +1324,4 @@ def is_closed(self):
def __repr__(self):
"""
"""
return '%i: %s' % (-1, self.id)
return '%i: %s' % (-1, self.id)
129 changes: 129 additions & 0 deletions pyx12/jsonwriter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
# A simple JSON Generator Based off of the XML Writer in pyx12.xmlwriter
import sys


class JSONriter(object):
"""
Doctest:
>>>from jsonwriter import JSONriter
>>>writer = JSONriter()
>>>writer.push(u"xsa")
>>>#Another element with child elements
>>>writer.push(u"vendor")
>>>#Element with simple text (#PCDATA) content
>>>writer.elem(u"name", u"Centigrade systems")
>>>writer.elem(u"email", u"[email protected]")
>>>writer.elem(u"vendor", u"Centigrade systems")
>>>#Close currently open element ("vendor)
>>>writer.pop()
>>>#Element with an attribute
>>>writer.push(u"product", {u"id": u"100\\u00B0"})
>>>writer.elem(u"name", u"100\\u00B0 Server")
>>>writer.elem(u"version", u"1.0")
>>>writer.elem(u"last-release", u"20030401")
"""

def __init__(self, out=sys.stdout, encoding="utf-8", indent="\t", words_mode=True):
"""
out - a stream for the output
encoding - an encoding used to wrap the output for unicode
indent - white space used for indentation
words_mode - boolean for using string fields rather than codes in output
"""
self.encoding = encoding
self.out = out
self.stack = []
self.indent = indent
self.words_mode = words_mode
self.element_count = 0

def push(self, elem, attrs={}, first=False):
"""
Create an element which will have child elements
"""
self._indent()

if first:
for (_, v) in list(attrs.items()):
if elem == "loop":
self._write("""{"%s": [""" % self._escape_attr(v))
elif elem == "seg":
self._write("""{"%s": {""" % self._escape_attr(v))
elif elem == "comp":
self._write(""""%s": {""" % self._escape_attr(v))
else:
for (_, v) in list(attrs.items()):
if elem == "loop":
self._write(""",{"%s": [""" % self._escape_attr(v))
elif elem == "seg":
self._write(""",{"%s": {""" % self._escape_attr(v))
elif elem == "comp":
self._write(""""%s": {""" % self._escape_attr(v)) #component is essentially an element
self.stack.append(elem)

def elem(self, elem, content, attrs={}, last=False):
"""
Create an element with text content only
"""
self._indent()
for (_, v) in list(attrs.items()):
self.element_count += 1
if last:
self._write('''"%s": "%s"''' % (self._escape_attr(v), self._escape_cont(content))) #Newline
else:
self._write(""""%s": "%s",""" % (self._escape_attr(v), self._escape_cont(content))) # Newline

def pop(self, last=True):
"""
Close an element started with the push() method
"""
if len(self.stack) > 0:
elem = self.stack[-1]
del self.stack[-1]
if last:
if elem == "seg":
self._indent()
self._write("}}") #newline
elif elem == "loop":
self._indent()
self._write("]}") #newline
elif elem == "comp":
self._indent()
self._write("}")
else:
if elem == "seg":
self._indent()
self._write("}},") #newline
elif elem == "loop":
self._indent()
self._write("]},") #newline
elif elem == "comp":
self._indent()
self._write("},")

def __len__(self):
return len(self.stack)

def _indent(self):
return
# Todo : enable multi line json output
# This gets tricky with formatting commas and indents!
self._write(self.indent * (len(self.stack) * 2))

def _escape_cont(self, text):
if text is None:
return None
return text.replace("&", "&amp;")\
.replace("<", "&lt;").replace(">", "&gt;").replace("\t", "")

def _escape_attr(self, text):
if self.words_mode:
return text.replace(' ', '_').replace(',', '').replace('(', '[').replace(')', ']')
if text is None:
return None
return text.replace("&", "&amp;") \
.replace("'", "&apos;").replace("<", "&lt;")\
.replace(">", "&gt;")

def _write(self, strval):
self.out.write(strval)
86 changes: 86 additions & 0 deletions pyx12/test/test_jsonwriter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import os.path
import sys
import os
import unittest

try:
from StringIO import StringIO
except:
from io import StringIO

import tempfile

from pyx12.jsonwriter import JSONriter

class TestJsonWriter(unittest.TestCase):
"""
"""

def test_write_loop_first(self):
# With StringIO Object
try:
fd = StringIO(encoding='ascii')
except:
fd = StringIO()

writer = JSONriter(fd)
attrs = {'id': 'TestLoopFirst'}
writer.push('loop', attrs, first=True)
while len(writer) > 0:
writer.pop()
writer.pop()
expected = """{"TestLoopFirst": []}"""
self.assertEqual(fd.getvalue(), expected)
fd.close()

# With Temp File
_, filename = tempfile.mkstemp('.json', 'pyx12_')
with open(filename, 'w') as fd:
writer = JSONriter(fd)
attrs = {'id': 'TestLoopFirst'}
writer.push('loop', attrs, first=True)
while len(writer) > 0:
writer.pop()

with open(filename, 'r') as fd:
self.assertEqual(fd.read(), expected)

try:
os.remove(filename)
except:
pass

def test_write_loop(self):
# With StringIO Object
try:
fd = StringIO(encoding='ascii')
except:
fd = StringIO()

writer = JSONriter(fd)
attrs = {'id': 'TestLoopFirst'}
writer.push('loop', attrs)
writer.pop()
expected = """,{"TestLoopFirst": []}"""
self.assertEqual(fd.getvalue(), expected)
fd.close()

# With Temp File
_, filename = tempfile.mkstemp('.json', 'pyx12_')
with open(filename, 'w') as fd:
writer = JSONriter(fd)
attrs = {'id': 'TestLoopFirst'}
writer.push('loop', attrs)
while len(writer) > 0:
writer.pop()

with open(filename, 'r') as fd:
self.assertEqual(fd.read(), expected)

try:
os.remove(filename)
except:
pass

if __name__ == "__main__":
unittest.main()
Loading