Skip to content

Commit

Permalink
first implement (#2)
Browse files Browse the repository at this point in the history
  • Loading branch information
takemikami authored Oct 15, 2020
1 parent be5fb34 commit cf7253e
Show file tree
Hide file tree
Showing 10 changed files with 383 additions and 71 deletions.
35 changes: 25 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,30 +1,45 @@
# Sparql input plugin for Embulk
# SPARQL input plugin for Embulk

TODO: Write short description here and build.gradle file.
SPARQL input plugin for Embulk loads records from any SPARQL endpoint by executing a SPARQL query.

## Overview

* **Plugin type**: input
* **Resume supported**: yes
* **Cleanup supported**: yes
* **Resume supported**: no
* **Cleanup supported**: no
* **Guess supported**: no

## Configuration

- **option1**: description (integer, required)
- **option2**: description (string, default: `"myvalue"`)
- **option3**: description (string, default: `null`)
- **endpoint**: SPARQL endpoint URL (string, required)
- **query**: SPARQL query (string, required)
- **columns**: Output columns list (list, required)

## Example

```yaml
in:
type: sparql
option1: example1
option2: example2
endpoint: https://data.e-stat.go.jp/lod/sparql/alldata/query
query: |
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX foaf: <http://xmlns.com/foaf/0.1/>
PREFIX dcterms: <http://purl.org/dc/terms/>
SELECT ?publisher ?label ?homepage
WHERE {
?s rdf:type <http://data.e-stat.go.jp/lod/otherSurvey/Concept>;
rdfs:label ?label;
foaf:homepage ?homepage;
dcterms:publisher ?publisher
}
order by ?publisher
columns:
- { name: publisher, type: string }
- { name: label, type: string }
- { name: homepage, type: string }
```
## Build
```
Expand Down
88 changes: 49 additions & 39 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
plugins {
id "com.jfrog.bintray" version "1.1"
id "com.github.jruby-gradle.base" version "1.5.0"
id "java"
id "checkstyle"
id "com.jfrog.bintray" version "1.1"
id "com.github.jruby-gradle.base" version "1.5.0"
id "java"
id "checkstyle"
}
import com.github.jrubygradle.JRubyExec

repositories {
mavenCentral()
jcenter()
mavenCentral()
jcenter()
}
configurations {
provided
provided
}

version = "0.1.0"
Expand All @@ -19,61 +20,70 @@ sourceCompatibility = 1.8
targetCompatibility = 1.8

dependencies {
compile "org.embulk:embulk-core:0.9.23"
provided "org.embulk:embulk-core:0.9.23"
// compile "YOUR_JAR_DEPENDENCY_GROUP:YOUR_JAR_DEPENDENCY_MODULE:YOUR_JAR_DEPENDENCY_VERSION"
testCompile "junit:junit:4.+"
compile "org.embulk:embulk-core:0.9.23"
provided "org.embulk:embulk-core:0.9.23"

compile "org.apache.jena:jena-arq:3.16.0"

testCompile "junit:junit:4.+"
testCompile 'org.hamcrest:hamcrest-core:2.2'
testCompile "org.embulk:embulk-core:0.9.23"
testCompile "org.embulk:embulk-test:0.9.23"
testCompile "org.embulk:embulk-deps-buffer:0.9.23"
testCompile "org.embulk:embulk-deps-config:0.9.23"
testCompile 'org.embulk:embulk-standards:0.9.23'
testCompile 'org.apache.jena:jena-fuseki-main:3.16.0'
}

task classpath(type: Copy, dependsOn: ["jar"]) {
doFirst { file("classpath").deleteDir() }
from (configurations.runtime - configurations.provided + files(jar.archivePath))
into "classpath"
doFirst { file("classpath").deleteDir() }
from(configurations.runtime - configurations.provided + files(jar.archivePath))
into "classpath"
}
clean { delete "classpath" }

checkstyle {
configFile = file("${project.rootDir}/config/checkstyle/checkstyle.xml")
toolVersion = '6.14.1'
configFile = file("${project.rootDir}/config/checkstyle/checkstyle.xml")
toolVersion = '6.14.1'
}
checkstyleMain {
configFile = file("${project.rootDir}/config/checkstyle/default.xml")
ignoreFailures = true
configFile = file("${project.rootDir}/config/checkstyle/default.xml")
}
checkstyleTest {
configFile = file("${project.rootDir}/config/checkstyle/default.xml")
ignoreFailures = true
configFile = file("${project.rootDir}/config/checkstyle/default.xml")
ignoreFailures = true
}
task checkstyle(type: Checkstyle) {
classpath = sourceSets.main.output + sourceSets.test.output
source = sourceSets.main.allJava + sourceSets.test.allJava
classpath = sourceSets.main.output + sourceSets.test.output
source = sourceSets.main.allJava + sourceSets.test.allJava
}

task gem(type: JRubyExec, dependsOn: ["gemspec", "classpath"]) {
jrubyArgs "-S"
script "gem"
scriptArgs "build", "${project.name}.gemspec"
doLast { ant.move(file: "${project.name}-${project.version}.gem", todir: "pkg") }
jrubyArgs "-S"
script "gem"
scriptArgs "build", "${project.name}.gemspec"
doLast { ant.move(file: "${project.name}-${project.version}.gem", todir: "pkg") }
}

task gemPush(type: JRubyExec, dependsOn: ["gem"]) {
jrubyArgs "-S"
script "gem"
scriptArgs "push", "pkg/${project.name}-${project.version}.gem"
jrubyArgs "-S"
script "gem"
scriptArgs "push", "pkg/${project.name}-${project.version}.gem"
}

task "package"(dependsOn: ["gemspec", "classpath"]) {
doLast {
println "> Build succeeded."
println "> You can run embulk with '-L ${file(".").absolutePath}' argument."
}
doLast {
println "> Build succeeded."
println "> You can run embulk with '-L ${file(".").absolutePath}' argument."
}
}

task gemspec {
ext.gemspecFile = file("${project.name}.gemspec")
inputs.file "build.gradle"
outputs.file gemspecFile
doLast { gemspecFile.write($/
ext.gemspecFile = file("${project.name}.gemspec")
inputs.file "build.gradle"
outputs.file gemspecFile
doLast {
gemspecFile.write($/
Gem::Specification.new do |spec|
spec.name = "${project.name}"
spec.version = "${project.version}"
Expand All @@ -82,7 +92,7 @@ Gem::Specification.new do |spec|
spec.description = %[Loads records from Sparql.]
spec.email = ["[email protected]"]
spec.licenses = ["MIT"]
# TODO set this: spec.homepage = "https://github.com/takeshi.mikami/embulk-input-sparql"
spec.homepage = "https://github.com/takemikami/embulk-input-sparql"

spec.files = `git ls-files`.split("\n") + Dir["classpath/*.jar"]
spec.test_files = spec.files.grep(%r"^(test|spec)/")
Expand All @@ -93,6 +103,6 @@ Gem::Specification.new do |spec|
spec.add_development_dependency 'rake', ['~> 12.0']
end
/$)
}
}
}
clean { delete "${project.name}.gemspec" }
143 changes: 121 additions & 22 deletions src/main/java/org/embulk/input/sparql/SparqlInputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,41 +1,42 @@
package org.embulk.input.sparql;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.logging.Logger;

import com.google.common.base.Optional;
import org.apache.jena.query.*;

import org.apache.jena.rdf.model.Literal;
import org.embulk.config.Config;
import org.embulk.config.ConfigDefault;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.Task;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.InputPlugin;
import org.embulk.spi.PageOutput;
import org.embulk.spi.Schema;
import org.embulk.spi.SchemaConfig;
import org.embulk.spi.*;
import org.embulk.spi.type.Type;
import org.embulk.spi.type.Types;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;

public class SparqlInputPlugin
implements InputPlugin
{
private static final Logger logger = Logger.getLogger(SparqlInputPlugin.class.getName());

public interface PluginTask
extends Task
{
// configuration option 1 (required integer)
@Config("option1")
public int getOption1();

// configuration option 2 (optional string, null is not allowed)
@Config("option2")
@ConfigDefault("\"myvalue\"")
public String getOption2();
// endpoint: sparql endpoint (required)
@Config("endpoint")
public String getEndpoint();

// configuration option 3 (optional string, null is allowed)
@Config("option3")
@ConfigDefault("null")
public Optional<String> getOption3();
// query: sparql query (required)
@Config("query")
public String getQuery();

// if you get schema from config
@Config("columns")
Expand All @@ -49,7 +50,7 @@ public ConfigDiff transaction(ConfigSource config,
PluginTask task = config.loadConfig(PluginTask.class);

Schema schema = task.getColumns().toSchema();
int taskCount = 1; // number of run() method calls
int taskCount = 1;

return resume(task.dump(), schema, taskCount, control);
}
Expand Down Expand Up @@ -77,13 +78,111 @@ public TaskReport run(TaskSource taskSource,
{
PluginTask task = taskSource.loadTask(PluginTask.class);

// Write your code here :)
throw new UnsupportedOperationException("SparqlInputPlugin.run method is not implemented yet");
BufferAllocator allocator = Exec.getBufferAllocator();
PageBuilder pageBuilder = new PageBuilder(allocator, schema, output);

// execute query
QueryExecution qexec = null;
try {
Query query = QueryFactory.create(task.getQuery());
qexec = QueryExecutionFactory.sparqlService(task.getEndpoint(), query);
ResultSet results = qexec.execSelect();

// return columns should be subset of query result columns.
List<String> cols = results.getResultVars();
Optional<Column> ngCol = schema.getColumns().stream()
.filter(col -> !cols.contains(col.getName()))
.findFirst();
if (ngCol.isPresent()) {
throw new NoSuchElementException(String.format("Sparql query do not return column %s", ngCol.get().getName()));
}

// result set loop
while (results.hasNext()) {
QuerySolution solution = results.next();

for (Column col : schema.getColumns()) {
String colName = col.getName();
Type colType = col.getType();
int colIndex = col.getIndex();
if (colType.equals(Types.LONG)) {
pageBuilder.setLong(colIndex, getLongValue(solution, colName));
}
else if (colType.equals(Types.DOUBLE)) {
pageBuilder.setDouble(colIndex, getDoubleValue(solution, colName));
}
else if (colType.equals(Types.TIMESTAMP)) {
pageBuilder.setTimestamp(colIndex, getTimestampValue(solution, colName));
}
else {
pageBuilder.setString(colIndex, getStringValue(solution, colName));
}
}
pageBuilder.addRecord();
}
}
finally {
if (qexec != null) {
qexec.close();
}
}

pageBuilder.finish();
return Exec.newTaskReport();
}

@Override
public ConfigDiff guess(ConfigSource config)
{
return Exec.newConfigDiff();
}

// parse values by RDFDataTypes
private String getStringValue(QuerySolution solution, String column)
{
if (solution.get(column).isLiteral()) {
Literal literal = solution.getLiteral(column);
return literal.getString();
}
return solution.get(column).toString();
}

private long getLongValue(QuerySolution solution, String column)
{
if (solution.get(column).isLiteral()) {
Literal literal = solution.getLiteral(column);
if (literal.getDatatype().getJavaClass() == BigInteger.class) {
return literal.getInt();
}
else {
return Long.parseLong(literal.toString());
}
}
return Long.parseLong(solution.get(column).toString());
}

private double getDoubleValue(QuerySolution solution, String column)
{
if (solution.get(column).isLiteral()) {
Literal literal = solution.getLiteral(column);
if (literal.getDatatype().getJavaClass() == BigDecimal.class) {
return literal.getDouble();
}
else {
return Double.parseDouble(literal.toString());
}
}
return Double.parseDouble(solution.get(column).toString());
}

private org.embulk.spi.time.Timestamp getTimestampValue(QuerySolution solution, String column)
{
String stringValue = getStringValue(solution, column);
DateTimeZone defaultTZ = DateTimeZone.getDefault();
DateTimeZone.setDefault(DateTimeZone.UTC);
DateTime dt = DateTime.parse(stringValue);
org.embulk.spi.time.Timestamp ts = org.embulk.spi.time.Timestamp.ofEpochMilli(dt.getMillis());
DateTimeZone.setDefault(defaultTZ);
return ts;
}
}
Loading

0 comments on commit cf7253e

Please sign in to comment.