From 34f71e6447502a1106b0f6e5dcfd2f6feb65fddd Mon Sep 17 00:00:00 2001 From: Martin Stolle Date: Fri, 13 Sep 2024 14:47:11 +0200 Subject: [PATCH] WCM-285: write channels column --- core/src/zeit/connector/models.py | 19 ++++- .../zeit/connector/tests/test_postgresql.py | 76 ++++++++++++++++++- 2 files changed, 90 insertions(+), 5 deletions(-) diff --git a/core/src/zeit/connector/models.py b/core/src/zeit/connector/models.py index aeec87eafe..f4cc1ef8fa 100644 --- a/core/src/zeit/connector/models.py +++ b/core/src/zeit/connector/models.py @@ -14,7 +14,7 @@ Uuid, ) from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.orm import declared_attr, mapped_column, relationship +from sqlalchemy.orm import declared_attr, mapped_column, relationship, validates import pytz import sqlalchemy @@ -42,6 +42,20 @@ def table_args(tablename): channels = mapped_column(ARRAY(Unicode), nullable=True) subchannels = mapped_column(ARRAY(Unicode), nullable=True) + @validates('channels') + def validate_channels(self, _, value): + """channels are separated by ; and subchannels by whitespace""" + channels = [] + subchannels = [] + if value: + elements = [i.split() for i in value.split(';')] + for element in elements: + channels.extend(element[:1]) + if len(element) > 1: + subchannels.extend(element[1:]) + self.subchannels = subchannels + return channels + class DevelopmentCommonMetadata: access = mapped_column(Unicode, index=True, info={'namespace': 'document', 'name': 'access'}) @@ -180,6 +194,9 @@ def from_webdav(self, props): if value is not self: setattr(self, column.name, value) + if FEATURE_TOGGLES.find('write_metadata_column_channels'): + self.channels = props.get(('channels', self.NS + 'document')) + unsorted = collections.defaultdict(dict) for (k, ns), v in props.items(): if v is DeleteProperty: diff --git a/core/src/zeit/connector/tests/test_postgresql.py b/core/src/zeit/connector/tests/test_postgresql.py index bb3f7da7c6..bc5a90756e 100644 --- a/core/src/zeit/connector/tests/test_postgresql.py +++ b/core/src/zeit/connector/tests/test_postgresql.py @@ -10,9 +10,10 @@ import transaction from zeit.cms.content.sources import FEATURE_TOGGLES +from zeit.cms.interfaces import DOCUMENT_SCHEMA_NS from zeit.cms.repository.interfaces import ConflictError from zeit.connector.interfaces import INTERNAL_PROPERTY -from zeit.connector.models import Content, Lock +from zeit.connector.models import Lock from zeit.connector.postgresql import _unlock_overdue_locks from zeit.connector.resource import Resource, WriteableCachedResource from zeit.connector.search import SearchVar @@ -334,15 +335,15 @@ class PropertiesColumnTest(zeit.connector.testing.SQLTest): def test_properties_can_be_stored_in_separate_columns(self): FEATURE_TOGGLES.set('write_metadata_columns', True) FEATURE_TOGGLES.set('read_metadata_columns', True) - res = self.add_resource('foo', properties={('access', Content.NS + 'document'): 'foo'}) - self.assertEqual('foo', res.properties[('access', Content.NS + 'document')]) + res = self.add_resource('foo', properties={('access', DOCUMENT_SCHEMA_NS): 'foo'}) + self.assertEqual('foo', res.properties[('access', DOCUMENT_SCHEMA_NS)]) content = self.connector._get_content(res.id) self.assertEqual('foo', content.access) def test_search_looks_in_columns_or_unsorted_depending_on_toggle(self): FEATURE_TOGGLES.set('write_metadata_columns', True) - res = self.add_resource('foo', properties={('access', Content.NS + 'document'): 'foo'}) + res = self.add_resource('foo', properties={('access', DOCUMENT_SCHEMA_NS): 'foo'}) access = SearchVar('access', 'http://namespaces.zeit.de/CMS/document') for toggle in [False, True]: # XXX parametrize would be nice FEATURE_TOGGLES.set('read_metadata_columns', toggle) @@ -352,3 +353,70 @@ def test_search_looks_in_columns_or_unsorted_depending_on_toggle(self): result = self.connector.search([access], access == 'foo') unique_id, uuid = next(result) self.assertEqual(res.id, unique_id) + + +class ChannelsColumnTest(zeit.connector.testing.SQLTest): + layer = zeit.connector.testing.SQL_CONTENT_LAYER + + def setUp(self): + super().setUp() + FEATURE_TOGGLES.set('write_metadata_column_channels', True) + + def _make_resource(self, channels): + res = self.add_resource('foo', properties={('channels', DOCUMENT_SCHEMA_NS): channels}) + self.assertEqual(channels, res.properties[('channels', DOCUMENT_SCHEMA_NS)]) + return self.connector._get_content(res.id) + + def assert_channels(self, content, expected_channels, expected_subchannels): + self.assertEqual(expected_channels, content.channels) + self.assertEqual(expected_subchannels, content.subchannels) + + def test_modify_channels(self): + content = self._make_resource('channel1;channel2 sub1 sub2') + self.assert_channels(content, ['channel1', 'channel2'], ['sub1', 'sub2']) + self.connector.changeProperties( + content.uniqueid, {('channels', DOCUMENT_SCHEMA_NS): 'channel2'} + ) + content = self.connector._get_content(content.uniqueid) + self.assert_channels(content, ['channel2'], []) + + def test_empty_input(self): + content = self._make_resource('') + self.assert_channels(content, [], []) + + def test_single_channel(self): + content = self._make_resource('channel1') + self.assert_channels(content, ['channel1'], []) + + def test_multiple_channels(self): + content = self._make_resource('channel1;channel2;channel3') + self.assert_channels(content, ['channel1', 'channel2', 'channel3'], []) + + def test_single_channel_with_subchannels(self): + content = self._make_resource('channel1 sub1 sub2') + self.assert_channels(content, ['channel1'], ['sub1', 'sub2']) + + def test_multiple_channels_with_subchannels(self): + content = self._make_resource('channel1 sub1;channel2 sub2 sub3;channel3 sub4') + self.assert_channels( + content, ['channel1', 'channel2', 'channel3'], ['sub1', 'sub2', 'sub3', 'sub4'] + ) + + def test_whitespace_handling(self): + channels = ' channel1 sub1 ; channel2 sub2 sub3 ; channel3 sub4 ' + content = self._make_resource(channels) + self.assert_channels( + content, ['channel1', 'channel2', 'channel3'], ['sub1', 'sub2', 'sub3', 'sub4'] + ) + + def test_trailing_semicolon(self): + content = self._make_resource('channel1;channel2;') + self.assert_channels(content, ['channel1', 'channel2'], []) + + def test_leading_semicolon(self): + content = self._make_resource(';channel1;channel2') + self.assert_channels(content, ['channel1', 'channel2'], []) + + def test_multiple_semicolons(self): + content = self._make_resource('channel1;;channel2;;;channel3') + self.assert_channels(content, ['channel1', 'channel2', 'channel3'], [])