diff --git a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraRandomReader.java b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraRandomReader.java index 462a19cd7..d4a94557a 100644 --- a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraRandomReader.java +++ b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraRandomReader.java @@ -29,6 +29,7 @@ import cz.o2.proxima.direct.core.randomaccess.RandomOffset; import cz.o2.proxima.direct.io.cassandra.CassandraDBAccessor.ClusterHolder; import cz.o2.proxima.direct.io.cassandra.CqlFactory.KvIterable; +import cz.o2.proxima.direct.io.cassandra.Offsets.Raw; import java.nio.ByteBuffer; import java.util.Objects; import java.util.Optional; @@ -68,7 +69,7 @@ public Optional> get( byte[] rowValue = val.array(); try { - return Optional.of( + return Optional.ofNullable( accessor .getCqlFactory() .toKeyValue( @@ -133,23 +134,27 @@ public void scanWildcard( byte[] rowValue = val.array(); // by convention String name = wildcard.toAttributePrefix() + accessor.asString(attribute); + @Nullable + KeyValue keyValue = + accessor + .getCqlFactory() + .toKeyValue( + getEntityDescriptor(), + wildcard, + key, + name, + System.currentTimeMillis(), + new Raw(name), + rowValue); - Optional parsed = wildcard.getValueSerializer().deserialize(rowValue); - - if (parsed.isPresent()) { - consumer.accept( - accessor - .getCqlFactory() - .toKeyValue( - getEntityDescriptor(), - wildcard, - key, - name, - System.currentTimeMillis(), - new Offsets.Raw(name), - rowValue)); + if (keyValue != null) { + consumer.accept(keyValue); } else { - log.error("Failed to parse value for key {} attribute {}.{}", key, wildcard, attribute); + log.error( + "Failed to parse value for key {} attribute {} using class {}", + key, + name, + wildcard.getValueSerializer().getClass()); } } } diff --git a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CqlFactory.java b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CqlFactory.java index 72e2a5f52..468c0fe13 100644 --- a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CqlFactory.java +++ b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CqlFactory.java @@ -130,7 +130,7 @@ Statement scanPartition( List> attributes, CassandraPartition partition, Session session); /** Convert the byte[] stored in the database into {@link KeyValue}. */ - KeyValue toKeyValue( + @Nullable KeyValue toKeyValue( EntityDescriptor entityDescriptor, AttributeDescriptor attributeDescriptor, String key, diff --git a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactory.java b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactory.java index cbb2d6642..cb0e7f888 100644 --- a/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactory.java +++ b/direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactory.java @@ -25,14 +25,15 @@ import cz.o2.proxima.core.repository.AttributeDescriptor; import cz.o2.proxima.core.repository.EntityDescriptor; import cz.o2.proxima.core.storage.StreamElement; -import cz.o2.proxima.core.util.ExceptionUtils; import cz.o2.proxima.direct.core.randomaccess.KeyValue; import cz.o2.proxima.direct.core.randomaccess.RandomOffset; import cz.o2.proxima.io.serialization.proto.Serialization; import cz.o2.proxima.io.serialization.proto.Serialization.Cell; import cz.o2.proxima.io.serialization.shaded.com.google.protobuf.ByteString; +import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; @@ -60,7 +61,7 @@ public class DefaultCqlFactory extends CacheableCqlFactory { interface Serializer extends Serializable { byte[] asCellBytes(StreamElement element); - KeyValue fromCellBytes( + @Nullable KeyValue fromCellBytes( EntityDescriptor entityDescriptor, AttributeDescriptor attributeDescriptor, String key, @@ -78,7 +79,7 @@ public byte[] asCellBytes(StreamElement element) { } @Override - public KeyValue fromCellBytes( + public @Nullable KeyValue fromCellBytes( EntityDescriptor entityDescriptor, AttributeDescriptor attributeDescriptor, String key, @@ -104,7 +105,7 @@ public byte[] asCellBytes(StreamElement element) { } @Override - public KeyValue fromCellBytes( + public @Nullable KeyValue fromCellBytes( EntityDescriptor entityDescriptor, AttributeDescriptor attributeDescriptor, String key, @@ -113,26 +114,36 @@ public KeyValue fromCellBytes( RandomOffset offset, byte[] serializedValue) { - Cell cell = ExceptionUtils.uncheckedFactory(() -> Cell.parseFrom(serializedValue)); - if (cell.getSeqId() > 0) { + try { + Cell cell = Cell.parseFrom(serializedValue); + if (cell.getSeqId() > 0) { + return KeyValue.of( + entityDescriptor, + attributeDescriptor, + cell.getSeqId(), + key, + attribute, + offset, + cell.getValue().toByteArray(), + stamp); + } return KeyValue.of( entityDescriptor, attributeDescriptor, - cell.getSeqId(), key, attribute, offset, cell.getValue().toByteArray(), stamp); + } catch (IOException ex) { + log.warn( + "Failed to parse cell from bytes {} in key {}, entity {}, attribute {}", + Arrays.toString(serializedValue), + key, + entityDescriptor, + attribute); } - return KeyValue.of( - entityDescriptor, - attributeDescriptor, - key, - attribute, - offset, - cell.getValue().toByteArray(), - stamp); + return null; } } @@ -403,7 +414,7 @@ public Statement scanPartition( } @Override - public KeyValue toKeyValue( + public @Nullable KeyValue toKeyValue( EntityDescriptor entityDescriptor, AttributeDescriptor attributeDescriptor, String key, diff --git a/direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactoryTest.java b/direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactoryTest.java index ea9a1097a..48c26629d 100644 --- a/direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactoryTest.java +++ b/direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/DefaultCqlFactoryTest.java @@ -549,4 +549,37 @@ public void testV2SerializerIngestWildcard() { "INSERT INTO my_table (hgw, device, my_col) VALUES (?, ?, ?) USING TIMESTAMP ?", preparedStatement.get(0)); } + + @Test + public void testV2SerializerRead() { + factory.setup( + entity, + URI.create("cassandra://whatever/my_table?primary=hgw&data=my_col&serializer=v2"), + StringConverter.getDefault()); + long now = System.currentTimeMillis(); + KeyValue kv = + factory.toKeyValue( + entity, + attrWildcard, + "key", + attrWildcard.toAttributePrefix(true) + "1", + now, + Offsets.empty(), + Cell.newBuilder() + .setSeqId(1L) + .setValue(ByteString.copyFrom(new byte[] {1})) + .build() + .toByteArray()); + assertNotNull(kv); + kv = + factory.toKeyValue( + entity, + attrWildcard, + "key", + attrWildcard.toAttributePrefix(true) + "1", + System.currentTimeMillis(), + Offsets.empty(), + new byte[] {(byte) 199, 0}); + assertNull(kv); + } }