Skip to content

Commit

Permalink
WIP - online writer and reader
Browse files Browse the repository at this point in the history
  • Loading branch information
LesTR committed Jan 13, 2020
1 parent c672859 commit 36a4055
Show file tree
Hide file tree
Showing 15 changed files with 721 additions and 228 deletions.
48 changes: 41 additions & 7 deletions direct/io-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,40 @@
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven.shade.version}</version>
<configuration>
<artifactSet>
<excludes>
<!-- exclude proxima and core dependencies -->
<exclude>cz.o2.proxima:proxima-core</exclude>
<exclude>cz.o2.proxima:proxima-direct-core</exclude>
</excludes>
</artifactSet>
<relocations>
<relocation>
<pattern>com.google.common.</pattern>
<shadedPattern>cz.o2.proxima.cassandra.shaded.com.google.common.</shadedPattern>
</relocation>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>

<dependency>
Expand All @@ -50,6 +84,13 @@
<version>3.4.1</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand All @@ -72,13 +113,6 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<version>1.4.200</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>cz.o2.proxima</groupId>
<artifactId>proxima-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright 2017-2020 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.direct.jdbc;

import cz.o2.proxima.repository.AttributeDescriptor;
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;

public interface Converter<T> extends Serializable {
default void setup() {}

String getKeyFromResult(ResultSet result) throws SQLException;

Object attributeValue(ResultSet resultSet, AttributeDescriptor<T> attr) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,33 @@
import cz.o2.proxima.direct.randomaccess.RandomAccessReader;
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.AbstractStorage;
import lombok.extern.slf4j.Slf4j;

import cz.o2.proxima.util.Classpath;
import java.net.URI;
import java.sql.SQLException;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JdbcDataAccessor extends AbstractStorage implements DataAccessor {
static final String JDBC_URI_STORAGE_PREFIX = "jdbc://";
static final String JDBC_DRIVER_CFG = "driverClassName";
static final String JDBC_USERNAME_CFG = "username";
static final String JDBC_PASSWORD_CFG = "password";
static final String JDBC_SQL_QUERY_FACTORY = "sqlQueryfactory";
static final String JDBC_RESULT_CONVERTER = "converter";

private final Map<String, Object> cfg;
private final String jdbcUri;

private final EntityDescriptor entityDescriptor;
private final URI uri;

private final SqlStatementFactory sqlStatementFactory;

@Getter private final Converter resultConverter;

private transient HikariDataSource dataSource;

protected JdbcDataAccessor(EntityDescriptor entityDesc, URI uri, Map<String, Object> cfg) {
Expand All @@ -51,6 +59,38 @@ protected JdbcDataAccessor(EntityDescriptor entityDesc, URI uri, Map<String, Obj
this.entityDescriptor = entityDesc;
this.uri = uri;

if (!cfg.containsKey(JDBC_SQL_QUERY_FACTORY)) {
log.error("Missing configuration param {}.", JDBC_URI_STORAGE_PREFIX);
throw new IllegalStateException(
String.format("Missing configuration param %s", JDBC_SQL_QUERY_FACTORY));
} else {
log.info("Using '{}' as SqlStatementFactory.", cfg.get(JDBC_SQL_QUERY_FACTORY));
sqlStatementFactory =
Classpath.newInstance(
cfg.get(JDBC_SQL_QUERY_FACTORY).toString(), SqlStatementFactory.class);
try {
sqlStatementFactory.setup(entityDesc, uri, this.getDataSource());
} catch (SQLException e) {
log.error(
"Unable to setup {} from class {}.",
JDBC_SQL_QUERY_FACTORY,
cfg.get(JDBC_SQL_QUERY_FACTORY),
e);
throw new IllegalStateException(e.getMessage(), e);
}
}

if (!cfg.containsKey(JDBC_RESULT_CONVERTER)) {
log.error("Missing configuration param {}.", JDBC_RESULT_CONVERTER);
throw new IllegalStateException(
String.format("Missing configuration param %s", JDBC_RESULT_CONVERTER));
} else {
log.info("Using '{}' as SqlStatementFactory.", cfg.get(JDBC_RESULT_CONVERTER));
resultConverter =
Classpath.newInstance(cfg.get(JDBC_RESULT_CONVERTER).toString(), Converter.class);
resultConverter.setup();
}

/* @TODO
for (Map.Entry<String, Object> entry : cfg.entrySet()) {
log.debug("Setting property {} to value {}.", entry.getKey(), entry.getValue());
Expand All @@ -70,11 +110,11 @@ public Optional<RandomAccessReader> getRandomAccessReader(Context context) {
}

AttributeWriterBase newWriter() {
return new JdbcOnlineAttributeWriter(this, entityDescriptor, uri);
return new JdbcOnlineAttributeWriter(this, this.sqlStatementFactory, entityDescriptor, uri);
}

RandomAccessReader newRandomAccessReader() {
return new JdbcOnlineAttributeReader(this, entityDescriptor, uri);
return new JdbcOnlineAttributeReader(this, this.sqlStatementFactory, entityDescriptor, uri);
}

HikariDataSource getDataSource() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,20 @@
/**
* Copyright 2017-2020 O2 Czech Republic, a.s.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package cz.o2.proxima.direct.jdbc;

import com.zaxxer.hikari.HikariDataSource;
import cz.o2.proxima.direct.randomaccess.KeyValue;
import cz.o2.proxima.direct.randomaccess.RandomAccessReader;
import cz.o2.proxima.direct.randomaccess.RandomOffset;
Expand All @@ -9,25 +23,29 @@
import cz.o2.proxima.repository.EntityDescriptor;
import cz.o2.proxima.storage.AbstractStorage;
import cz.o2.proxima.util.Pair;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nullable;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Optional;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class JdbcOnlineAttributeReader extends AbstractStorage implements RandomAccessReader {

private final JdbcDataAccessor accessor;
private final SqlStatementFactory sqlStatementFactory;

public JdbcOnlineAttributeReader(
JdbcDataAccessor accessor, EntityDescriptor entityDesc, URI uri) {
JdbcDataAccessor accessor,
SqlStatementFactory sqlStatementFactory,
EntityDescriptor entityDesc,
URI uri) {
super(entityDesc, uri);
this.accessor = accessor;
this.sqlStatementFactory = sqlStatementFactory;
}

@Override
Expand All @@ -38,29 +56,26 @@ public RandomOffset fetchOffset(Listing type, String key) {
@Override
public <T> Optional<KeyValue<T>> get(
String key, String attribute, AttributeDescriptor<T> desc, long stamp) {
HikariDataSource dataSource = accessor.getDataSource();
try {
try (PreparedStatement stmt =
dataSource.getConnection().prepareStatement("SELECT id, attribute FROM DUMMYTABLE where id = ? LIMIT 1")) {
stmt.setString(1, key);
log.debug("Execute statement {}", stmt);
try (ResultSet resultSet = stmt.executeQuery()) {
resultSet.next();//TODO
return Optional.of(
KeyValue.of(
getEntityDescriptor(),
desc,
key,
desc.getName(),
new Offsets.Raw(key),
resultSet.getString("attribute").getBytes()));
}
try (PreparedStatement statement =
sqlStatementFactory.get(accessor.getDataSource(), desc, key);
ResultSet result = statement.executeQuery()) {
log.debug("Executed statement {}", statement);
if (!result.next()) {
return Optional.empty();
} else {
return Optional.of(
KeyValue.<T>of(
getEntityDescriptor(),
desc,
key,
desc.getName(),
new Offsets.Raw(accessor.getResultConverter().getKeyFromResult(result)),
result.getString(desc.getName()).getBytes()));
}
} catch (SQLException e) {
e.printStackTrace();
log.error("Error during query execution: {}", e.getMessage(), e);
return Optional.empty();
}

return Optional.empty();
}

@Override
Expand All @@ -69,7 +84,10 @@ public void scanWildcardAll(
@Nullable RandomOffset offset,
long stamp,
int limit,
Consumer<KeyValue<?>> consumer) {}
Consumer<KeyValue<?>> consumer) {

throw new UnsupportedOperationException("Not implemented");
}

@Override
public <T> void scanWildcard(
Expand All @@ -78,11 +96,28 @@ public <T> void scanWildcard(
@Nullable RandomOffset offset,
long stamp,
int limit,
Consumer<KeyValue<T>> consumer) {}
Consumer<KeyValue<T>> consumer) {
throw new UnsupportedOperationException("Not implemented");
}

@Override
public void listEntities(
@Nullable RandomOffset offset, int limit, Consumer<Pair<RandomOffset, String>> consumer) {}
@Nullable RandomOffset offset, int limit, Consumer<Pair<RandomOffset, String>> consumer) {
try (PreparedStatement statement =
sqlStatementFactory.list(accessor.getDataSource(), offset, limit);
ResultSet resultSet = statement.executeQuery()) {
log.debug("Executed statement {}", statement);
while (resultSet.next()) {
String key = accessor.getResultConverter().getKeyFromResult(resultSet);
consumer.accept(Pair.of(new Offsets.Raw(key), key));
}
} catch (SQLException e) {
log.error(
"Error during query execution: {}.",
e.getMessage(),
e); // @TODO: what to do in this case?
}
}

@Override
public void close() throws IOException {}
Expand Down
Loading

0 comments on commit 36a4055

Please sign in to comment.