Skip to content

Commit

Permalink
WCM-285: write channels column
Browse files Browse the repository at this point in the history
  • Loading branch information
stollero committed Sep 13, 2024
1 parent 0e6a462 commit 34f71e6
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 5 deletions.
19 changes: 18 additions & 1 deletion core/src/zeit/connector/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Uuid,
)
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import declared_attr, mapped_column, relationship
from sqlalchemy.orm import declared_attr, mapped_column, relationship, validates
import pytz
import sqlalchemy

Expand Down Expand Up @@ -42,6 +42,20 @@ def table_args(tablename):
channels = mapped_column(ARRAY(Unicode), nullable=True)
subchannels = mapped_column(ARRAY(Unicode), nullable=True)

@validates('channels')
def validate_channels(self, _, value):
"""channels are separated by ; and subchannels by whitespace"""
channels = []
subchannels = []
if value:
elements = [i.split() for i in value.split(';')]
for element in elements:
channels.extend(element[:1])
if len(element) > 1:
subchannels.extend(element[1:])
self.subchannels = subchannels
return channels


class DevelopmentCommonMetadata:
access = mapped_column(Unicode, index=True, info={'namespace': 'document', 'name': 'access'})
Expand Down Expand Up @@ -180,6 +194,9 @@ def from_webdav(self, props):
if value is not self:
setattr(self, column.name, value)

if FEATURE_TOGGLES.find('write_metadata_column_channels'):
self.channels = props.get(('channels', self.NS + 'document'))

unsorted = collections.defaultdict(dict)
for (k, ns), v in props.items():
if v is DeleteProperty:
Expand Down
76 changes: 72 additions & 4 deletions core/src/zeit/connector/tests/test_postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@
import transaction

from zeit.cms.content.sources import FEATURE_TOGGLES
from zeit.cms.interfaces import DOCUMENT_SCHEMA_NS
from zeit.cms.repository.interfaces import ConflictError
from zeit.connector.interfaces import INTERNAL_PROPERTY
from zeit.connector.models import Content, Lock
from zeit.connector.models import Lock
from zeit.connector.postgresql import _unlock_overdue_locks
from zeit.connector.resource import Resource, WriteableCachedResource
from zeit.connector.search import SearchVar
Expand Down Expand Up @@ -334,15 +335,15 @@ class PropertiesColumnTest(zeit.connector.testing.SQLTest):
def test_properties_can_be_stored_in_separate_columns(self):
FEATURE_TOGGLES.set('write_metadata_columns', True)
FEATURE_TOGGLES.set('read_metadata_columns', True)
res = self.add_resource('foo', properties={('access', Content.NS + 'document'): 'foo'})
self.assertEqual('foo', res.properties[('access', Content.NS + 'document')])
res = self.add_resource('foo', properties={('access', DOCUMENT_SCHEMA_NS): 'foo'})
self.assertEqual('foo', res.properties[('access', DOCUMENT_SCHEMA_NS)])
content = self.connector._get_content(res.id)
self.assertEqual('foo', content.access)

def test_search_looks_in_columns_or_unsorted_depending_on_toggle(self):
FEATURE_TOGGLES.set('write_metadata_columns', True)

res = self.add_resource('foo', properties={('access', Content.NS + 'document'): 'foo'})
res = self.add_resource('foo', properties={('access', DOCUMENT_SCHEMA_NS): 'foo'})
access = SearchVar('access', 'http://namespaces.zeit.de/CMS/document')
for toggle in [False, True]: # XXX parametrize would be nice
FEATURE_TOGGLES.set('read_metadata_columns', toggle)
Expand All @@ -352,3 +353,70 @@ def test_search_looks_in_columns_or_unsorted_depending_on_toggle(self):
result = self.connector.search([access], access == 'foo')
unique_id, uuid = next(result)
self.assertEqual(res.id, unique_id)


class ChannelsColumnTest(zeit.connector.testing.SQLTest):
layer = zeit.connector.testing.SQL_CONTENT_LAYER

def setUp(self):
super().setUp()
FEATURE_TOGGLES.set('write_metadata_column_channels', True)

def _make_resource(self, channels):
res = self.add_resource('foo', properties={('channels', DOCUMENT_SCHEMA_NS): channels})
self.assertEqual(channels, res.properties[('channels', DOCUMENT_SCHEMA_NS)])
return self.connector._get_content(res.id)

def assert_channels(self, content, expected_channels, expected_subchannels):
self.assertEqual(expected_channels, content.channels)
self.assertEqual(expected_subchannels, content.subchannels)

def test_modify_channels(self):
content = self._make_resource('channel1;channel2 sub1 sub2')
self.assert_channels(content, ['channel1', 'channel2'], ['sub1', 'sub2'])
self.connector.changeProperties(
content.uniqueid, {('channels', DOCUMENT_SCHEMA_NS): 'channel2'}
)
content = self.connector._get_content(content.uniqueid)
self.assert_channels(content, ['channel2'], [])

def test_empty_input(self):
content = self._make_resource('')
self.assert_channels(content, [], [])

def test_single_channel(self):
content = self._make_resource('channel1')
self.assert_channels(content, ['channel1'], [])

def test_multiple_channels(self):
content = self._make_resource('channel1;channel2;channel3')
self.assert_channels(content, ['channel1', 'channel2', 'channel3'], [])

def test_single_channel_with_subchannels(self):
content = self._make_resource('channel1 sub1 sub2')
self.assert_channels(content, ['channel1'], ['sub1', 'sub2'])

def test_multiple_channels_with_subchannels(self):
content = self._make_resource('channel1 sub1;channel2 sub2 sub3;channel3 sub4')
self.assert_channels(
content, ['channel1', 'channel2', 'channel3'], ['sub1', 'sub2', 'sub3', 'sub4']
)

def test_whitespace_handling(self):
channels = ' channel1 sub1 ; channel2 sub2 sub3 ; channel3 sub4 '
content = self._make_resource(channels)
self.assert_channels(
content, ['channel1', 'channel2', 'channel3'], ['sub1', 'sub2', 'sub3', 'sub4']
)

def test_trailing_semicolon(self):
content = self._make_resource('channel1;channel2;')
self.assert_channels(content, ['channel1', 'channel2'], [])

def test_leading_semicolon(self):
content = self._make_resource(';channel1;channel2')
self.assert_channels(content, ['channel1', 'channel2'], [])

def test_multiple_semicolons(self):
content = self._make_resource('channel1;;channel2;;;channel3')
self.assert_channels(content, ['channel1', 'channel2', 'channel3'], [])

0 comments on commit 34f71e6

Please sign in to comment.