diff --git a/README.md b/README.md
index 592eaac..f3f7a0e 100644
--- a/README.md
+++ b/README.md
@@ -1,30 +1,45 @@
-# Sparql input plugin for Embulk
+# SPARQL input plugin for Embulk
 
-TODO: Write short description here and build.gradle file.
+An Embulk input plugin that loads records from any SPARQL endpoint by executing a SPARQL SELECT query.
 
 ## Overview
 
 * **Plugin type**: input
-* **Resume supported**: yes
-* **Cleanup supported**: yes
+* **Resume supported**: no
+* **Cleanup supported**: no
 * **Guess supported**: no
 
 ## Configuration
 
-- **option1**: description (integer, required)
-- **option2**: description (string, default: `"myvalue"`)
-- **option3**: description (string, default: `null`)
+- **endpoint**: SPARQL endpoint URL (string, required)
+- **query**: SPARQL query (string, required)
+- **columns**: output columns list (list, required)
 
 ## Example
 
 ```yaml
 in:
   type: sparql
-  option1: example1
-  option2: example2
+  endpoint: https://data.e-stat.go.jp/lod/sparql/alldata/query
+  query: |
+    PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+    PREFIX foaf: <http://xmlns.com/foaf/0.1/>
+    PREFIX dcterms: <http://purl.org/dc/terms/>
+    SELECT ?publisher ?label ?homepage
+    WHERE {
+      ?s rdf:type <http://purl.org/linked-data/cube#DataSet> ;
+         rdfs:label ?label ;
+         foaf:homepage ?homepage ;
+         dcterms:publisher ?publisher
+    }
+    order by ?publisher
+  columns:
+    - { name: publisher, type: string }
+    - { name: label, type: string }
+    - { name: homepage, type: string }
 ```
-
 ## Build
 
 ```
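The configuration above ultimately drives ordinary Jena ARQ calls: parse the query, open a `QueryExecution` against the endpoint, and iterate the `ResultSet`. A minimal standalone sketch of that flow, reusing the endpoint from the example with a trivial query (`SparqlSketch` is a hypothetical class name, not part of this patch):

```java
import org.apache.jena.query.*;

public class SparqlSketch
{
    public static void main(String[] args)
    {
        String endpoint = "https://data.e-stat.go.jp/lod/sparql/alldata/query";
        // Parse the query text, then execute it remotely against the endpoint.
        Query query = QueryFactory.create("SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 10");
        try (QueryExecution qexec = QueryExecutionFactory.sparqlService(endpoint, query)) {
            ResultSet results = qexec.execSelect();
            while (results.hasNext()) {
                QuerySolution solution = results.next();
                // Each result variable is bound to an RDFNode (IRI or literal).
                for (String var : results.getResultVars()) {
                    System.out.println(var + " = " + solution.get(var));
                }
            }
        }
    }
}
```

The plugin's `run()` method further down wraps exactly this loop, mapping each `QuerySolution` onto the configured Embulk columns.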
file("${project.rootDir}/config/checkstyle/default.xml") - ignoreFailures = true + configFile = file("${project.rootDir}/config/checkstyle/default.xml") + ignoreFailures = true } task checkstyle(type: Checkstyle) { - classpath = sourceSets.main.output + sourceSets.test.output - source = sourceSets.main.allJava + sourceSets.test.allJava + classpath = sourceSets.main.output + sourceSets.test.output + source = sourceSets.main.allJava + sourceSets.test.allJava } task gem(type: JRubyExec, dependsOn: ["gemspec", "classpath"]) { - jrubyArgs "-S" - script "gem" - scriptArgs "build", "${project.name}.gemspec" - doLast { ant.move(file: "${project.name}-${project.version}.gem", todir: "pkg") } + jrubyArgs "-S" + script "gem" + scriptArgs "build", "${project.name}.gemspec" + doLast { ant.move(file: "${project.name}-${project.version}.gem", todir: "pkg") } } task gemPush(type: JRubyExec, dependsOn: ["gem"]) { - jrubyArgs "-S" - script "gem" - scriptArgs "push", "pkg/${project.name}-${project.version}.gem" + jrubyArgs "-S" + script "gem" + scriptArgs "push", "pkg/${project.name}-${project.version}.gem" } task "package"(dependsOn: ["gemspec", "classpath"]) { - doLast { - println "> Build succeeded." - println "> You can run embulk with '-L ${file(".").absolutePath}' argument." - } + doLast { + println "> Build succeeded." + println "> You can run embulk with '-L ${file(".").absolutePath}' argument." + } } task gemspec { - ext.gemspecFile = file("${project.name}.gemspec") - inputs.file "build.gradle" - outputs.file gemspecFile - doLast { gemspecFile.write($/ + ext.gemspecFile = file("${project.name}.gemspec") + inputs.file "build.gradle" + outputs.file gemspecFile + doLast { + gemspecFile.write($/ Gem::Specification.new do |spec| spec.name = "${project.name}" spec.version = "${project.version}" @@ -82,7 +92,7 @@ Gem::Specification.new do |spec| spec.description = %[Loads records from Sparql.] 
diff --git a/src/main/java/org/embulk/input/sparql/SparqlInputPlugin.java b/src/main/java/org/embulk/input/sparql/SparqlInputPlugin.java
index 6fc3470..b6041b4 100644
--- a/src/main/java/org/embulk/input/sparql/SparqlInputPlugin.java
+++ b/src/main/java/org/embulk/input/sparql/SparqlInputPlugin.java
@@ -1,41 +1,42 @@
 package org.embulk.input.sparql;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.logging.Logger;
 
-import com.google.common.base.Optional;
+import org.apache.jena.query.*;
+import org.apache.jena.rdf.model.Literal;
 import org.embulk.config.Config;
-import org.embulk.config.ConfigDefault;
 import org.embulk.config.ConfigDiff;
 import org.embulk.config.ConfigSource;
 import org.embulk.config.Task;
 import org.embulk.config.TaskReport;
 import org.embulk.config.TaskSource;
-import org.embulk.spi.Exec;
-import org.embulk.spi.InputPlugin;
-import org.embulk.spi.PageOutput;
-import org.embulk.spi.Schema;
-import org.embulk.spi.SchemaConfig;
+import org.embulk.spi.*;
+import org.embulk.spi.type.Type;
+import org.embulk.spi.type.Types;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 
 public class SparqlInputPlugin
         implements InputPlugin
 {
+    private static final Logger logger = Logger.getLogger(SparqlInputPlugin.class.getName());
+
     public interface PluginTask
             extends Task
     {
-        // configuration option 1 (required integer)
-        @Config("option1")
-        public int getOption1();
-
-        // configuration option 2 (optional string, null is not allowed)
-        @Config("option2")
-        @ConfigDefault("\"myvalue\"")
-        public String getOption2();
+        // endpoint: SPARQL endpoint URL (required)
+        @Config("endpoint")
+        public String getEndpoint();
 
-        // configuration option 3 (optional string, null is allowed)
-        @Config("option3")
-        @ConfigDefault("null")
-        public Optional<String> getOption3();
+        // query: SPARQL query (required)
+        @Config("query")
+        public String getQuery();
 
         // if you get schema from config
         @Config("columns")
@@ -49,7 +50,7 @@ public ConfigDiff transaction(ConfigSource config,
         PluginTask task = config.loadConfig(PluginTask.class);
 
         Schema schema = task.getColumns().toSchema();
-        int taskCount = 1;  // number of run() method calls
+        int taskCount = 1;
 
         return resume(task.dump(), schema, taskCount, control);
     }
@@ -77,8 +78,57 @@ public TaskReport run(TaskSource taskSource,
     {
         PluginTask task = taskSource.loadTask(PluginTask.class);
 
-        // Write your code here :)
-        throw new UnsupportedOperationException("SparqlInputPlugin.run method is not implemented yet");
+        BufferAllocator allocator = Exec.getBufferAllocator();
+        PageBuilder pageBuilder = new PageBuilder(allocator, schema, output);
+
+        // execute query
+        QueryExecution qexec = null;
+        try {
+            Query query = QueryFactory.create(task.getQuery());
+            qexec = QueryExecutionFactory.sparqlService(task.getEndpoint(), query);
+            ResultSet results = qexec.execSelect();
+
+            // requested columns must be a subset of the query result variables
+            List<String> cols = results.getResultVars();
+            Optional<Column> ngCol = schema.getColumns().stream()
+                    .filter(col -> !cols.contains(col.getName()))
+                    .findFirst();
+            if (ngCol.isPresent()) {
+                throw new NoSuchElementException(String.format("SPARQL query does not return column %s", ngCol.get().getName()));
+            }
+
+            // result set loop
+            while (results.hasNext()) {
+                QuerySolution solution = results.next();
+
+                for (Column col : schema.getColumns()) {
+                    String colName = col.getName();
+                    Type colType = col.getType();
+                    int colIndex = col.getIndex();
+                    if (colType.equals(Types.LONG)) {
+                        pageBuilder.setLong(colIndex, getLongValue(solution, colName));
+                    }
+                    else if (colType.equals(Types.DOUBLE)) {
+                        pageBuilder.setDouble(colIndex, getDoubleValue(solution, colName));
+                    }
+                    else if (colType.equals(Types.TIMESTAMP)) {
+                        pageBuilder.setTimestamp(colIndex, getTimestampValue(solution, colName));
+                    }
+                    else {
+                        pageBuilder.setString(colIndex, getStringValue(solution, colName));
+                    }
+                }
+                pageBuilder.addRecord();
+            }
+        }
+        finally {
+            if (qexec != null) {
+                qexec.close();
+            }
+        }
+
+        pageBuilder.finish();
+        return Exec.newTaskReport();
     }
@@ -86,4 +136,53 @@ public ConfigDiff guess(ConfigSource config)
     {
         return Exec.newConfigDiff();
     }
+
+    // parse values by RDF datatypes
+    private String getStringValue(QuerySolution solution, String column)
+    {
+        if (solution.get(column).isLiteral()) {
+            Literal literal = solution.getLiteral(column);
+            return literal.getString();
+        }
+        return solution.get(column).toString();
+    }
+
+    private long getLongValue(QuerySolution solution, String column)
+    {
+        if (solution.get(column).isLiteral()) {
+            Literal literal = solution.getLiteral(column);
+            if (literal.getDatatype().getJavaClass() == BigInteger.class) {
+                // xsd:integer maps to BigInteger; read it out as a long
+                return literal.getLong();
+            }
+            else {
+                return Long.parseLong(literal.getString());
+            }
+        }
+        return Long.parseLong(solution.get(column).toString());
+    }
+
+    private double getDoubleValue(QuerySolution solution, String column)
+    {
+        if (solution.get(column).isLiteral()) {
+            Literal literal = solution.getLiteral(column);
+            if (literal.getDatatype().getJavaClass() == BigDecimal.class) {
+                return literal.getDouble();
+            }
+            else {
+                return Double.parseDouble(literal.getString());
+            }
+        }
+        return Double.parseDouble(solution.get(column).toString());
+    }
+
+    private org.embulk.spi.time.Timestamp getTimestampValue(QuerySolution solution, String column)
+    {
+        String stringValue = getStringValue(solution, column);
+        // parse in UTC by temporarily swapping the JVM default time zone
+        DateTimeZone defaultTZ = DateTimeZone.getDefault();
+        DateTimeZone.setDefault(DateTimeZone.UTC);
+        DateTime dt = DateTime.parse(stringValue);
+        org.embulk.spi.time.Timestamp ts = org.embulk.spi.time.Timestamp.ofEpochMilli(dt.getMillis());
+        DateTimeZone.setDefault(defaultTZ);
+        return ts;
+    }
 }
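One note on `getTimestampValue()`: it forces UTC by swapping the JVM-wide default time zone around `DateTime.parse`, which is what makes `test_expected_attrs.csv` below render the birth date as `2000-01-01 00:00:00.000000 +0000`. `DateTimeZone.setDefault` is process-global state, though, and the swap is visible to every other thread while it lasts. A thread-safe sketch of the same parsing behavior, assuming ISO-8601 input strings as in the tests (`UtcTimestampParser` is a hypothetical helper, not part of this patch):

```java
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;

// Parses an ISO-8601 date/time string as UTC without mutating the
// JVM-wide default zone: the formatter itself carries the zone.
public final class UtcTimestampParser
{
    private static final DateTimeFormatter PARSER =
            ISODateTimeFormat.dateTimeParser().withZoneUTC();

    private UtcTimestampParser() {}

    public static org.embulk.spi.time.Timestamp parse(String stringValue)
    {
        DateTime dt = PARSER.parseDateTime(stringValue);
        return org.embulk.spi.time.Timestamp.ofEpochMilli(dt.getMillis());
    }
}
```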
diff --git a/src/test/java/org/embulk/input/sparql/TestSparqlInputPlugin.java b/src/test/java/org/embulk/input/sparql/TestSparqlInputPlugin.java
index 642d32c..15be8ba 100644
--- a/src/test/java/org/embulk/input/sparql/TestSparqlInputPlugin.java
+++ b/src/test/java/org/embulk/input/sparql/TestSparqlInputPlugin.java
@@ -1,5 +1,134 @@
 package org.embulk.input.sparql;
 
+import org.apache.jena.fuseki.main.FusekiServer;
+import org.apache.jena.graph.Factory;
+import org.apache.jena.graph.Graph;
+import org.apache.jena.query.Dataset;
+import org.apache.jena.query.DatasetFactory;
+import org.apache.jena.rdf.model.ModelFactory;
+import org.apache.jena.riot.RDFParser;
+import org.embulk.config.ConfigSource;
+import org.embulk.spi.InputPlugin;
+import org.embulk.test.EmbulkTests;
+import org.embulk.test.TestingEmbulk;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.URL;
+import java.nio.file.Path;
+import java.util.logging.Logger;
+
+import static org.embulk.test.EmbulkTests.readSortedFile;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
 public class TestSparqlInputPlugin
 {
+    private static final Logger logger = Logger.getLogger(TestSparqlInputPlugin.class.getName());
+
+    private static final String BASIC_RESOURCE_PATH = "org/embulk/input/sparql/test/expect/";
+
+    private static ConfigSource loadYamlResource(TestingEmbulk embulk, String fileName)
+    {
+        return embulk.loadYamlResource(BASIC_RESOURCE_PATH + fileName);
+    }
+
+    private static String readResource(String fileName)
+    {
+        return EmbulkTests.readResource(BASIC_RESOURCE_PATH + fileName);
+    }
+
+    @Rule
+    public TestingEmbulk embulk = TestingEmbulk.builder()
+            .registerPlugin(InputPlugin.class, "sparql", SparqlInputPlugin.class)
+            .build();
+
+    private static FusekiServer fusekiServer = null;
+
+    @BeforeClass
+    public static void setup()
+    {
+        // detect an available port
+        int availablePort = -1;
+        for (int i = 1025; i < 65535; i++) {
+            try (ServerSocket socket = new ServerSocket(i, 1, InetAddress.getByName("localhost"))) {
+                availablePort = socket.getLocalPort();
+                break;
+            }
+            catch (IOException e) {
+                // port in use; try the next one
+            }
+        }
+        if (availablePort == -1) {
+            fail("cannot find an available port.");
+        }
+
+        // start fuseki server
+        URL rootUrl = TestSparqlInputPlugin.class.getClassLoader().getResource(BASIC_RESOURCE_PATH + "unittest.ttl");
+        assertNotNull("unittest.ttl not found", rootUrl);
+        String ttlPath = rootUrl.getPath();
+
+        Graph g = Factory.createGraphMem();
+        RDFParser.source(ttlPath).parse(g);
+        Dataset ds = DatasetFactory.create(ModelFactory.createModelForGraph(g));
+        fusekiServer = FusekiServer.create()
+                .add("/dataset", ds)
+                .port(availablePort)
+                .build();
+        fusekiServer.start();
+        logger.info("started fuseki server for unit tests.");
+    }
+
+    @AfterClass
+    public static void tearDown()
+    {
+        // stop fuseki server
+        fusekiServer.stop();
+        logger.info("stopped fuseki server for unit tests.");
+    }
+
+    private ConfigSource baseConfig;
+
+    @Before
+    public void init()
+    {
+        // init config
+        baseConfig = embulk.newConfig();
+        baseConfig.set("type", "sparql");
+        baseConfig.set("endpoint", String.format("http://localhost:%d/dataset", fusekiServer.getPort()));
+    }
+
+    @Test
+    public void testBasic()
+            throws Exception
+    {
+        Path out1 = embulk.createTempFile("csv");
+        embulk.runInput(baseConfig.merge(loadYamlResource(embulk, "test_config.yml")), out1);
+        assertThat(readSortedFile(out1), is(readResource("test_expected.csv")));
+    }
+
+    @Test
+    public void testAttrs()
+            throws Exception
+    {
+        Path out1 = embulk.createTempFile("csv");
+        embulk.runInput(baseConfig.merge(loadYamlResource(embulk, "test_config_attrs.yml")), out1);
+        assertThat(readSortedFile(out1), is(readResource("test_expected_attrs.csv")));
+    }
+
+    @Test(expected = org.embulk.exec.PartialExecutionException.class)
+    public void testInvalidCols()
+            throws Exception
+    {
+        Path out1 = embulk.createTempFile("csv");
+        embulk.runInput(baseConfig.merge(loadYamlResource(embulk, "test_config_invalid_cols.yml")), out1);
+    }
 }
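The port scan in `setup()` probes ports 1025-65534 one by one; the JDK can also hand back a free ephemeral port directly, since binding a `ServerSocket` to port 0 asks the OS to pick one. A sketch of that alternative (`PortFinder` is a hypothetical helper, not part of this patch):

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Alternative to the port-scanning loop in setup(): binding to
// port 0 lets the OS assign any free ephemeral port.
public final class PortFinder
{
    private PortFinder() {}

    public static int findAvailablePort() throws IOException
    {
        try (ServerSocket socket = new ServerSocket(0, 1, InetAddress.getByName("localhost"))) {
            return socket.getLocalPort();
        }
    }
}
```

Either way there is a small race between closing the probe socket and Fuseki binding the port, so this is behaviorally equivalent to the loop, just shorter.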
diff --git a/src/test/resources/org/embulk/input/sparql/test/expect/test_config.yml b/src/test/resources/org/embulk/input/sparql/test/expect/test_config.yml
new file mode 100644
index 0000000..511ec18
--- /dev/null
+++ b/src/test/resources/org/embulk/input/sparql/test/expect/test_config.yml
@@ -0,0 +1,10 @@
+query: |
+  select ?s ?p ?o
+  where {
+    ?s ?p ?o.
+    filter(?s = <http://example.com/something>)
+  }
+columns:
+  - { name: s, type: string }
+  - { name: p, type: string }
+  - { name: o, type: string }
\ No newline at end of file
diff --git a/src/test/resources/org/embulk/input/sparql/test/expect/test_config_attrs.yml b/src/test/resources/org/embulk/input/sparql/test/expect/test_config_attrs.yml
new file mode 100644
index 0000000..1375242
--- /dev/null
+++ b/src/test/resources/org/embulk/input/sparql/test/expect/test_config_attrs.yml
@@ -0,0 +1,19 @@
+query: |
+  select ?s ?name ?birth ?position ?height ?url ?knows
+  where {
+    ?s <http://schema.org/name> ?name ;
+       <http://schema.org/birthDate> ?birth ;
+       <http://schema.org/position> ?position ;
+       <http://schema.org/height> ?height ;
+       <http://schema.org/url> ?url ;
+       <http://schema.org/knows> ?knows .
+    filter(?s = <http://example.com/something>)
+  }
+columns:
+  - { name: s, type: string }
+  - { name: name, type: string }
+  - { name: birth, type: timestamp }
+  - { name: position, type: long }
+  - { name: height, type: double }
+  - { name: url, type: string }
+  - { name: knows, type: string }
diff --git a/src/test/resources/org/embulk/input/sparql/test/expect/test_config_invalid_cols.yml b/src/test/resources/org/embulk/input/sparql/test/expect/test_config_invalid_cols.yml
new file mode 100644
index 0000000..f790639
--- /dev/null
+++ b/src/test/resources/org/embulk/input/sparql/test/expect/test_config_invalid_cols.yml
@@ -0,0 +1,8 @@
+query: |
+  select ?s ?p ?o
+  where {
+    ?s ?p ?o.
+    filter(?s = <http://example.com/something>)
+  }
+columns:
+  - { name: x, type: string }
diff --git a/src/test/resources/org/embulk/input/sparql/test/expect/test_expected.csv b/src/test/resources/org/embulk/input/sparql/test/expect/test_expected.csv
new file mode 100644
index 0000000..944d8d9
--- /dev/null
+++ b/src/test/resources/org/embulk/input/sparql/test/expect/test_expected.csv
@@ -0,0 +1,7 @@
+http://example.com/something,http://schema.org/birthDate,2000-01-01
+http://example.com/something,http://schema.org/height,170.0
+http://example.com/something,http://schema.org/knows,http://example.com/anotherthing
+http://example.com/something,http://schema.org/name,familyName
+http://example.com/something,http://schema.org/position,1
+http://example.com/something,http://schema.org/url,http://example.com/something
+http://example.com/something,http://www.w3.org/1999/02/22-rdf-syntax-ns#type,http://schema.org/Person
diff --git a/src/test/resources/org/embulk/input/sparql/test/expect/test_expected_attrs.csv b/src/test/resources/org/embulk/input/sparql/test/expect/test_expected_attrs.csv
new file mode 100644
index 0000000..0e0e7c6
--- /dev/null
+++ b/src/test/resources/org/embulk/input/sparql/test/expect/test_expected_attrs.csv
@@ -0,0 +1 @@
+http://example.com/something,familyName,2000-01-01 00:00:00.000000 +0000,1,170.0,http://example.com/something,http://example.com/anotherthing
diff --git a/src/test/resources/org/embulk/input/sparql/test/expect/unittest.ttl b/src/test/resources/org/embulk/input/sparql/test/expect/unittest.ttl
new file mode 100644
index 0000000..3fc9155
--- /dev/null
+++ b/src/test/resources/org/embulk/input/sparql/test/expect/unittest.ttl
@@ -0,0 +1,14 @@
+@base <http://example.com/> .
+@prefix schema: <http://schema.org/> .
+@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
+@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
+
+<something> a schema:Person ;
+    schema:name "familyName" ;
+    schema:url <something> ;
+    schema:position 1 ;
+    schema:height 170.0 ;
+    schema:birthDate "2000-01-01"^^xsd:date ;
+    schema:knows <anotherthing> .
+
+<anotherthing> a schema:Person .
\ No newline at end of file