Df dev with cfgURL and datacatalog in release graph #57

Status: Open. Wants to merge 5 commits into base `dev`.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-2.0.18-df-development
+2.0.19-df-development
8 changes: 8 additions & 0 deletions docs/README.md
@@ -177,4 +177,12 @@ nabu object --cfgPath directory --cfgName name objectId
e.g. use generated
```
nabu object --cfgPath ../gleaner/configs --cfgName local milled/opentopography/ffa0df033bb3a8fc9f600c80df3501fe1a2dbe93.rdf
```

### Using URL-based configuration

Nabu can also read its configuration file over the network:

```
go run ../../cmd/nabu/main.go release --cfgURL https://provisium.io/data/nabuconfig.yaml --prefix summoned/dataverse --endpoint localoxi
```

**Review comment (Member):** suggested invocation via the installed binary:

```
nabu release --cfgURL https://provisium.io/data/nabuconfig.yaml --prefix summoned/dataverse --endpoint localoxi
```
11 changes: 11 additions & 0 deletions docs/httpSPARQL.md
@@ -23,4 +23,15 @@ curl -H 'Accept: application/sparql-results+json' http://coreos.lan:9090/blazegr

```bash
curl -H 'Accept: application/sparql-results+json' http://coreos.lan:9090/blazegraph/namespace/iow/sparql --data-urlencode 'query=SELECT (COUNT(DISTINCT ?graph) AS ?namedGraphsCount)(COUNT(*) AS ?triplesCount)WHERE {GRAPH ?graph {?subject ?predicate ?object}}'
```
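
This query returns, in a single result row, the number of distinct named graphs and the total triple count for the `iow` namespace.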


### Oxigraph

```bash
curl -i -X PUT -H 'Content-Type:text/x-nquads' --data-binary @veupathdb_release.nq http://localhost:7878/store
```

```bash
curl -i -X POST -H 'Content-Type:text/x-nquads' --data-binary @veupathdb_release.nq http://localhost:7878/store
```
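
For Oxigraph the two verbs differ in the usual Graph Store way: `PUT` replaces the contents of the store, while `POST` adds to whatever is already loaded. A quick way to check what landed (assuming a default Oxigraph instance on port 7878) is the same counting query as above against its `/query` endpoint:

```bash
curl -H 'Accept: application/sparql-results+json' http://localhost:7878/query --data-urlencode 'query=SELECT (COUNT(DISTINCT ?graph) AS ?namedGraphsCount) (COUNT(*) AS ?triplesCount) WHERE { GRAPH ?graph { ?subject ?predicate ?object } }'
```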
6 changes: 0 additions & 6 deletions docs/images/prefix.d2

This file was deleted.

35 changes: 0 additions & 35 deletions docs/images/prefix.svg

This file was deleted.

3 changes: 0 additions & 3 deletions docs/images/prune.d2

This file was deleted.

35 changes: 0 additions & 35 deletions docs/images/prune.svg

This file was deleted.

77 changes: 0 additions & 77 deletions docs/images/workflow.d2

This file was deleted.

102 changes: 0 additions & 102 deletions docs/images/workflow.svg

This file was deleted.

2 changes: 1 addition & 1 deletion internal/graph/toFromRDF.go
@@ -4,7 +4,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
log "github.com/sirupsen/logrus"
"strings"

"github.com/knakk/rdf"
85 changes: 65 additions & 20 deletions internal/objects/pipecopy.go
@@ -4,19 +4,44 @@ import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"github.com/spf13/viper"
"io"
"strings"
"sync"

"github.com/spf13/viper"
"time"

"github.com/gleanerio/nabu/internal/graph"
log "github.com/sirupsen/logrus"

"github.com/minio/minio-go/v7"
)

// getLastElement returns the final slash-delimited segment of s,
// e.g. getLastElement("summoned/dataverse") == "dataverse".
func getLastElement(s string) string {
	parts := strings.Split(s, "/")
	return parts[len(parts)-1]
}

// generateDateHash generates a hash based on the current date and time
// (second resolution).
func generateDateHash() string {
// Get the current date and time
now := time.Now()

// Format the date and time as a string
dateString := now.Format("2006-01-02 15:04:05")

// Create a SHA256 hash
hash := sha256.New()
hash.Write([]byte(dateString))

// Convert the hash to a hex string
hashString := hex.EncodeToString(hash.Sum(nil))

return hashString
}
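
// Note: the timestamp above has second resolution, so two calls within the
// same second return the same hash; formatting with time.RFC3339Nano or adding
// a random component would make collisions practically impossible.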

// PipeCopy writes a new object based on a prefix; this function assumes the objects are valid when concatenated
// v1: viper config object
// mc: minio client pointer
@@ -26,7 +51,8 @@ import (
// destprefix: destination prefix
// sf: boolean to declare if single file or not. If so, skip skolemization since JSON-LD library output is enough
func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefix string) error {
log.Printf("PipeCopy with name: %s bucket: %s prefix: %s", name, bucket, prefix)
orgname := v1.GetString("implementation_network.orgname")
log.Printf("PipeCopy with name: %s bucket: %s prefix: %s org name: %s", name, bucket, prefix, orgname)

pr, pw := io.Pipe() // TeeReader of use?
lwg := sync.WaitGroup{} // work group for the pipe writes...
@@ -47,10 +73,6 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi
}
}(pw)

-	// Set and use a "single file flag" to bypass skolimaization since if it is a single file
-	// the JSON-LD to RDF will correctly map blank nodes.
-	// NOTE: with a background context we can't get the len(channel) so we have to iterate it.
-	// This is fast, but it means we have to do the ListObjects twice
clen := 0
sf := false
ctx, cancel := context.WithCancel(context.Background())
@@ -67,25 +89,23 @@

objectCh := mc.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: isRecursive})

// for object := range mc.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: isRecursive}, doneCh) {
lastProcessed := false
idList := make([]string, 0)
for object := range objectCh {
fo, err := mc.GetObject(context.Background(), bucket, object.Key, minio.GetObjectOptions{})
if err != nil {
fmt.Println(err)
continue
}

var b bytes.Buffer
bw := bufio.NewWriter(&b)

_, err = io.Copy(bw, fo)
if err != nil {
log.Println(err)
continue
}

s := string(b.Bytes())

nq := ""
//log.Println("Calling JSONLDtoNQ")
if strings.HasSuffix(object.Key, ".nq") {
nq = s
} else {
@@ -95,33 +115,58 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi
return
}
}

var snq string

if sf {
-			snq = nq // just pass through the RDF without trying to Skolemize since we ar a single fil
+			snq = nq // single file: pass the RDF through without skolemizing
} else {
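			// Skolemization rewrites blank node labels (e.g. _:b0) into globally
			// unique IRIs so blank nodes from different objects cannot collide
			// when the objects are concatenated into one graph.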
snq, err = graph.Skolemization(nq, object.Key)
if err != nil {
return
}
}

// 1) get graph URI
ctx, err := graph.MakeURN(v1, object.Key)
if err != nil {
return
}
// 2) convert NT to NQ
csnq, err := graph.NtToNq(snq, ctx)
if err != nil {
return
}

_, err = pw.Write([]byte(csnq))
if err != nil {
return
}
idList = append(idList, ctx)
lastProcessed = true
}

// Once we are done with the loop, put in the triples to associate all the graphURIs with the org.
if lastProcessed {

data := `<urn:gleaner.io:` + orgname + `:datacatalog> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/DataCatalog> .
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/description> "GleanerIO Nabu generated catalog" .
**Review comment (Member):** suggested URN pattern: `urn:gleaner.io:ORG:SOURCE:datacatalog`

<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/dateCreated> "` + time.Now().Format("2006-01-02 15:04:05") + `" .
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/provider> <urn:gleaner.io:` + orgname + `:provider> .
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/publisher> <urn:gleaner.io:` + getLastElement(prefix) + `:publisher> .
<urn:gleaner.io:` + orgname + `:provider> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/Organization> .
<urn:gleaner.io:` + orgname + `:provider> <https://schema.org/name> "` + orgname + `" .
**Review comment (Member):** Since there is one provider, this is good: `urn:gleaner.io:eco:provider`

<urn:gleaner.io:` + getLastElement(prefix) + `:publisher> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/Organization> .
**Review comment (Member):** suggested URN pattern: `urn:gleaner.io:ORG:SOURCE:publisher`

<urn:gleaner.io:` + getLastElement(prefix) + `:publisher> <https://schema.org/name> "` + getLastElement(prefix) + `" .
`
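
		// Illustration (hypothetical values: orgname "eco", prefix "summoned/dataverse"):
		// the template above renders to triples such as
		//   <urn:gleaner.io:eco:datacatalog> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/DataCatalog> .
		//   <urn:gleaner.io:eco:datacatalog> <https://schema.org/publisher> <urn:gleaner.io:dataverse:publisher> .
		//   <urn:gleaner.io:dataverse:publisher> <https://schema.org/name> "dataverse" .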

for _, item := range idList {
data += `<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/dataset> <` + item + `> .` + "\n"
}

namedgraph := "urn:gleaner.io:" + orgname + ":" + getLastElement(prefix) + ":datacatalog:" + generateDateHash()
		sdata, err := graph.NtToNq(data, namedgraph)
		if err != nil {
			log.Println(err) // check the conversion error; previously err was silently overwritten below
		}

		// Perform the final write to the pipe here
_, err = pw.Write([]byte(sdata))
if err != nil {
log.Println(err)
}
}
}()

25 changes: 16 additions & 9 deletions internal/services/bulk/function.go
@@ -3,7 +3,6 @@ package bulk
import (
"bytes"
"errors"
"fmt"
"io"
"net/http"
"strings"
@@ -37,7 +36,7 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string)

// check for the required bulk endpoint, no need to move on from here
if spql.URL == "" {
return "", errors.New("The configuration file lacks an endpointBulk entry")
return "", errors.New("configuration file lacks an endpointBulk entry")
}

log.Printf("Object %s:%s for %s with method %s type %s", bucketName, item, ep, md, ct)
@@ -53,12 +52,13 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string)
// Review if this graph g should b here since we are loading quads
// I don't think it should b. validate with all the tested triple stores
//bn := strings.Replace(bucketName, ".", ":", -1) // convert to urn : values, buckets with . are not valid IRIs
-	g, err := graph.MakeURN(v1, item)
+	//g, err := graph.MakeURN(v1, item)
if err != nil {
log.Error("gets3Bytes %v\n", err)
return "", err // Assume return. since on this error things are not good?
}
-	url := fmt.Sprintf("%s?graph=%s", ep, g)
+	//url := fmt.Sprintf("%s?graph=%s", ep, g) // NOTE 11-13-2023 ?graph with nquads fails with Oxigraph
+	url := ep // testing

// check if JSON-LD and convert to RDF
if strings.Contains(item, ".jsonld") {
@@ -73,8 +73,16 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string)
if err != nil {
return "", err
}
req.Header.Set("Content-Type", ct) // needs to be x-nquads for blaze, n-quads for jena and graphdb
req.Header.Set("User-Agent", "EarthCube_DataBot/1.0")

headers := map[string]string{
"Content-Type": ct, // replace value with actual content
"User-Agent": "EarthCube_DataBot/1.0",
// add other headers here
}

for k, v := range headers {
req.Header.Add(k, v)
}

client := &http.Client{}
resp, err := client.Do(req)
Expand All @@ -87,16 +95,15 @@ func BulkLoad(v1 *viper.Viper, mc *minio.Client, bucketName string, item string)
}
}(resp.Body)

-	log.Println(resp)
-	body, err := io.ReadAll(resp.Body) // return body if you want to debugg test with it
+	body, err := io.ReadAll(resp.Body) // return body if you want to debug or test with it
if err != nil {
log.Println(string(body))
return string(body), err
}

// report
log.Println(string(body))
log.Printf("success: %s : %d : %s\n", item, len(b), ep)
log.Printf("status: %s : %d : %s\n", item, len(b), ep)

return string(body), err
}
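
For orientation, here is a minimal sketch of how `BulkLoad` might be invoked. The endpoint, credentials, bucket, and object key are placeholders, nabu loads its config elsewhere, and `internal/` packages are only importable from within the nabu module itself:

```go
package main

import (
	"log"

	"github.com/gleanerio/nabu/internal/services/bulk"
	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
	"github.com/spf13/viper"
)

func main() {
	// Load the nabu configuration (hypothetical local file).
	v1 := viper.New()
	v1.SetConfigFile("nabuconfig.yaml")
	if err := v1.ReadInConfig(); err != nil {
		log.Fatal(err)
	}

	// Connect to the object store (placeholder endpoint and credentials).
	mc, err := minio.New("localhost:9000", &minio.Options{
		Creds: credentials.NewStaticV4("minioadmin", "minioadmin", ""),
	})
	if err != nil {
		log.Fatal(err)
	}

	// Bulk-load one release graph object into the configured SPARQL endpoint.
	body, err := bulk.BulkLoad(v1, mc, "gleaner", "graphs/latest/dataverse_release.nq")
	if err != nil {
		log.Fatal(err)
	}
	log.Println(body)
}
```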
1 change: 1 addition & 0 deletions internal/services/releases/bulkLoader.go
@@ -57,6 +57,7 @@ func BulkRelease(v1 *viper.Viper, mc *minio.Client) error {
return err
}

+	// TODO Should this be optional / controlled by flag?
// Copy the "latest" graph just made to archive with a date
// This means the graph in latests is a duplicate of the most recently dated version in archive/{provider}
const layout = "2006-01-02-15-04-05"
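	// e.g. time.Now().Format(layout) yields "2023-11-13-15-04-05" style strings,
	// which sort lexicographically by date and are safe in object names.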