From 857c446150dcd39ace230ef49282bbf76ef166ea Mon Sep 17 00:00:00 2001 From: Andrew Wason Date: Fri, 15 Apr 2022 13:54:19 -0400 Subject: [PATCH] Support wildcard Accessors for segment and repeat. --- docs/accessors.rst | 18 ++++++++++++ hl7/accessor.py | 33 +++++++++++++++++---- hl7/containers.py | 59 ++++++++++++++++++++++++++++---------- tests/samples.py | 9 ++++++ tests/test_accessor.py | 18 ++++++++++++ tests/test_construction.py | 6 ++++ tests/test_containers.py | 42 ++++++++++++++++++++++++++- 7 files changed, 163 insertions(+), 22 deletions(-) diff --git a/docs/accessors.rst b/docs/accessors.rst index 22b1d7b..35b3730 100644 --- a/docs/accessors.rst +++ b/docs/accessors.rst @@ -58,6 +58,8 @@ you need to specify: Field 3, Repeat 1, Component 2, Sub-Component 2 (PID.F1.R1. Reading values from a tree structure in this manner is the only safe way to read data from a message. +The segment and repeat indexes may each be replaced by the wildcard "*", in which case a list is returned with one entry per matching segment or repetition. + .. doctest:: >>> h['PID.F1.R1'] @@ -66,6 +68,9 @@ from a message. >>> h['PID.F2.R1.C1'] 'Component1' + >>> h['PID.F4.*.C1'] + ['Repeat1', 'Repeat2'] + You can also access values using :py:class:`hl7.Accessor`, or by directly calling :py:meth:`hl7.Message.extract_field`. The following are all equivalent: @@ -80,6 +85,19 @@ You can also access values using :py:class:`hl7.Accessor`, or by directly callin >>> h.extract_field('PID', 1, 2, 1, 1) 'Component1' +The following are also all equivalent: + +.. doctest:: + + >>> h['PID.F4.*.C1'] + ['Repeat1', 'Repeat2'] + + >>> h[hl7.Accessor('PID', 1, 4, hl7.Accessor.WILDCARD, 1)] + ['Repeat1', 'Repeat2'] + + >>> h.extract_field('PID', 1, 4, hl7.Accessor.WILDCARD, 1) + ['Repeat1', 'Repeat2'] + All values should be accessed in this manner. Even if a field is marked as being non-repeating a repeat of "1" should be specified as later version messages could have a repeating value. 
diff --git a/hl7/accessor.py b/hl7/accessor.py index 42f4f72..6fe1e48 100644 --- a/hl7/accessor.py +++ b/hl7/accessor.py @@ -17,6 +17,8 @@ class Accessor( ): __slots__ = () + WILDCARD = object() + def __new__( cls, segment, @@ -27,6 +29,12 @@ def __new__( subcomponent_num=None, ): """Create a new instance of Accessor for *segment*. Index numbers start from 1.""" + if ( + field_num is cls.WILDCARD + or component_num is cls.WILDCARD + or subcomponent_num is cls.WILDCARD + ): + raise ValueError("wildcard only supported for segment and repeat") return super(Accessor, cls).__new__( cls, segment, @@ -43,14 +51,15 @@ def key(self): seg = ( self.segment if self.segment_num == 1 - else self.segment + str(self.segment_num) + else self.segment + + str("*" if self.segment_num is self.WILDCARD else self.segment_num) ) return ".".join( str(f) for f in [ seg, self.field_num, - self.repeat_num, + "*" if self.repeat_num is self.WILDCARD else self.repeat_num, self.component_num, self.subcomponent_num, ] @@ -73,6 +82,7 @@ def parse_key(cls, key): | S Sub-Component | | *Indexing is from 1 for compatibility with HL7 spec numbering.* + | 'n' may be the wildcard specifier '*' for SEG[n] and Rn Example: @@ -83,25 +93,36 @@ def parse_key(cls, key): | R1 (repeat counting from 1) | C2 (component 2 counting from 1) | S2 (component 2 counting from 1) + | + | PID.1.*.2.2 + | + | return a list of sub-components, one for each repetition """ - def parse_part(keyparts, index, prefix): + def parse_part(keyparts, index, prefix, allow_wildcard=False): if len(keyparts) > index: num = keyparts[index] if num[0].upper() == prefix: num = num[1:] - return int(num) + if num == "*": + if allow_wildcard: + return Accessor.WILDCARD + else: + raise ValueError(f"wildcard not supported for {prefix}") + else: + return int(num) else: return None parts = key.split(".") segment = parts[0][:3] if len(parts[0]) > 3: - segment_num = int(parts[0][3:]) + snum = parts[0][3:] + segment_num = Accessor.WILDCARD if snum == "*" 
else int(snum) else: segment_num = 1 field_num = parse_part(parts, 1, "F") - repeat_num = parse_part(parts, 2, "R") + repeat_num = parse_part(parts, 2, "R", allow_wildcard=True) component_num = parse_part(parts, 3, "C") subcomponent_num = parse_part(parts, 4, "S") return cls( diff --git a/hl7/containers.py b/hl7/containers.py index 1620b38..4061e5a 100644 --- a/hl7/containers.py +++ b/hl7/containers.py @@ -466,9 +466,17 @@ def extract_field( | PID.F4.R1.C1.SC1 = 'Repeat1' (ignore .SC1) """ - return self.segments(segment)(segment_num).extract_field( - segment_num, field_num, repeat_num, component_num, subcomponent_num - ) + if segment_num is Accessor.WILDCARD: + return [ + segment.extract_field( + num, field_num, repeat_num, component_num, subcomponent_num + ) + for num, segment in enumerate(self.segments(segment), start=1) + ] + else: + return self.segments(segment)(segment_num).extract_field( + segment_num, field_num, repeat_num, component_num, subcomponent_num + ) def assign_field( self, @@ -487,6 +495,8 @@ def assign_field( Extract a field using a future proofed approach, based on rules in: http://wiki.medical-objects.com.au/index.php/Hl7v2_parsing """ + if segment_num is Accessor.WILDCARD or repeat_num is Accessor.WILDCARD: + raise ValueError("wildcards not supported for assignment") self.segments(segment)(segment_num).assign_field( value, field_num, repeat_num, component_num, subcomponent_num ) @@ -664,7 +674,7 @@ def extract_field( | F4.R1.C1.SC1 = 'Repeat1' (ignore .SC1) """ # Save original values for error messages - accessor = Accessor( + original_accessor = Accessor( self[0][0], segment_num, field_num, @@ -683,13 +693,32 @@ def extract_field( else: if repeat_num == 1 and component_num == 1 and subcomponent_num == 1: return "" # Assume non-present optional value - raise IndexError("Field not present: {0}".format(accessor.key)) + raise IndexError("Field not present: {0}".format(original_accessor.key)) + + accessor = Accessor( + original_accessor.segment, + 
segment_num, + field_num, + repeat_num, + component_num, + subcomponent_num, + ) + if repeat_num is Accessor.WILDCARD: + return [ + self._extract_repetition(field, accessor, original_accessor, rnum) + for rnum in range(1, len(field) + 1) + ] + else: + return self._extract_repetition( + field, accessor, original_accessor, repeat_num + ) + def _extract_repetition(self, field, accessor, original_accessor, repeat_num): rep = field(repeat_num) if not isinstance(rep, Repetition): # leaf - if component_num == 1 and subcomponent_num == 1: + if accessor.component_num == 1 and accessor.subcomponent_num == 1: return ( rep if accessor.segment == "MSH" and accessor.field_num in (1, 2) @@ -697,28 +726,28 @@ def extract_field( ) raise IndexError( "Field reaches leaf node before completing path: {0}".format( - accessor.key + original_accessor.key ) ) - if component_num > len(rep): - if subcomponent_num == 1: + if accessor.component_num > len(rep): + if accessor.subcomponent_num == 1: return "" # Assume non-present optional value - raise IndexError("Component not present: {0}".format(accessor.key)) + raise IndexError("Component not present: {0}".format(original_accessor.key)) - component = rep(component_num) + component = rep(accessor.component_num) if not isinstance(component, Component): # leaf - if subcomponent_num == 1: + if accessor.subcomponent_num == 1: return unescape(self, component) raise IndexError( "Field reaches leaf node before completing path: {0}".format( - accessor.key + original_accessor.key ) ) - if subcomponent_num <= len(component): - subcomponent = component(subcomponent_num) + if accessor.subcomponent_num <= len(component): + subcomponent = component(accessor.subcomponent_num) return unescape(self, subcomponent) else: return "" # Assume non-present optional value diff --git a/tests/samples.py b/tests/samples.py index b9da86d..12c05e3 100644 --- a/tests/samples.py +++ b/tests/samples.py @@ -21,6 +21,15 @@ ] ) +rep_sample_2 = "\r".join( + [ + "MSH|^~\\&|GHH 
LAB|ELAB-3|GHH OE|BLDG4|200202150930||ORU^R01|CNTRL-3456|P|2.4", + "PID|AField1|AComponent1^AComponent2|A1Component1^A1Sub-Component1&A1Sub-Component2^A1Component3~A2Component1^A2Sub-Component1&A2Sub-Component2^A2Component3|ARepeat1~ARepeat2", + "PID|BField1|BComponent1^BComponent2|B1Component1^B1Sub-Component1&B1Sub-Component2^B1Component3~B2Component1^B2Sub-Component1&B2Sub-Component2^B2Component3|BRepeat1~BRepeat2~BRepeat3", + "", + ] +) + # Source: http://www.health.vic.gov.au/hdss/vinah/2006-07/appendix-a-sample-messages.pdf sample_batch = "\r".join( [ diff --git a/tests/test_accessor.py b/tests/test_accessor.py index 295280b..018eafd 100644 --- a/tests/test_accessor.py +++ b/tests/test_accessor.py @@ -10,12 +10,30 @@ def test_key(self): self.assertEqual("FOO2", Accessor("FOO", 2).key) self.assertEqual("FOO2.3", Accessor("FOO", 2, 3).key) self.assertEqual("FOO2.3.1.4.6", Accessor("FOO", 2, 3, 1, 4, 6).key) + self.assertEqual("FOO*", Accessor("FOO*").key) + self.assertEqual("FOO2.3.*.4", Accessor("FOO", 2, 3, Accessor.WILDCARD, 4).key) + self.assertEqual( + "FOO*.3.*.4", + Accessor("FOO", Accessor.WILDCARD, 3, Accessor.WILDCARD, 4).key, + ) + with self.assertRaises(ValueError) as cm: + Accessor("FOO", 1, Accessor.WILDCARD, 1) + self.assertIn( + "wildcard only supported for segment and repeat", cm.exception.args[0] + ) def test_parse(self): self.assertEqual(Accessor("FOO"), Accessor.parse_key("FOO")) self.assertEqual( Accessor("FOO", 2, 3, 1, 4, 6), Accessor.parse_key("FOO2.3.1.4.6") ) + self.assertEqual( + Accessor("FOO", Accessor.WILDCARD, 3, Accessor.WILDCARD, 4, 6), + Accessor.parse_key("FOO*.3.*.4.6"), + ) + with self.assertRaises(ValueError) as cm: + Accessor.parse_key("FOO.*.1") + self.assertIn("wildcard not supported for F", cm.exception.args[0]) def test_equality(self): self.assertEqual(Accessor("FOO", 1, 3, 4), Accessor("FOO", 1, 3, 4)) diff --git a/tests/test_construction.py b/tests/test_construction.py index f6af481..d087e5d 100644 --- 
a/tests/test_construction.py +++ b/tests/test_construction.py @@ -19,6 +19,12 @@ def test_create_msg(self): response["MSH.F2.R1"] = SEP[1:] self.assertEqual(str(response), "MSH|^~\\&|\rMSA\r") + def test_disallow_wildcard(self): + src_msg = hl7.parse(rep_sample_hl7) + with self.assertRaises(ValueError) as cm: + src_msg["PID.4.*"] = "X" + self.assertIn("wildcards not supported for assignment", cm.exception.args[0]) + def test_append(self): # Append a segment to a message MSH = hl7.Segment(SEP[0], [hl7.Field(SEP[2], ["MSH"])]) diff --git a/tests/test_containers.py b/tests/test_containers.py index b79d9e2..b0bbc1e 100644 --- a/tests/test_containers.py +++ b/tests/test_containers.py @@ -4,7 +4,7 @@ import hl7 from hl7 import Field, Segment -from .samples import sample_hl7 +from .samples import rep_sample_2, sample_hl7, sample_msh class ContainerTest(TestCase): @@ -32,6 +32,12 @@ def test_segments(self): self.assertIsInstance(s[0][1], Field) + def test_segments_wildcard(self): + msg = hl7.parse(sample_hl7) + s = msg["OBX*.2"] + self.assertEqual(len(s), 2) + self.assertEqual(s, ["SN", "FN"]) + def test_segments_does_not_exist(self): msg = hl7.parse(sample_hl7) self.assertRaises(KeyError, msg.segments, "BAD") @@ -52,6 +58,40 @@ def test_segments_dict_key(self): self.assertEqual(s[0][0:3], [["OBX"], ["1"], ["SN"]]) self.assertEqual(s[1][0:3], [["OBX"], ["2"], ["FN"]]) + def test_repetition_wildcard(self): + msg = hl7.parse(sample_msh) + s = msg["PID.3.*"] + self.assertEqual(s, ["2148790", "162840"]) + + def test_segment_repetition_wildcard(self): + msg = hl7.parse(rep_sample_2) + self.assertEqual(msg["PID.3.*"], ["A1Component1", "A2Component1"]) + self.assertEqual(msg["PID.3.*.2"], ["A1Sub-Component1", "A2Sub-Component1"]) + self.assertEqual(msg["PID.3.*.2.2"], ["A1Sub-Component2", "A2Sub-Component2"]) + self.assertEqual( + msg["PID*.3.*"], + [["A1Component1", "A2Component1"], ["B1Component1", "B2Component1"]], + ) + self.assertEqual( + msg["PID*.3.*.2"], + [ + 
["A1Sub-Component1", "A2Sub-Component1"], + ["B1Sub-Component1", "B2Sub-Component1"], + ], + ) + self.assertEqual( + msg["PID*.3.*.2.2"], + [ + ["A1Sub-Component2", "A2Sub-Component2"], + ["B1Sub-Component2", "B2Sub-Component2"], + ], + ) + self.assertEqual(msg["PID.4.*"], ["ARepeat1", "ARepeat2"]) + self.assertEqual( + msg["PID*.4.*"], + [["ARepeat1", "ARepeat2"], ["BRepeat1", "BRepeat2", "BRepeat3"]], + ) + def test_MSH_1_field(self): msg = hl7.parse(sample_hl7) f = msg["MSH.1"]