-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Df dev with cfgURL and datacatalog in release graph #57
base: dev
Are you sure you want to change the base?
Changes from all commits
59c259b
98053e1
22099ff
c897b6f
e34ad75
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
2.0.18-df-development | ||
2.0.19-df-development |
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,19 +4,44 @@ import ( | |
"bufio" | ||
"bytes" | ||
"context" | ||
"crypto/sha256" | ||
"encoding/hex" | ||
"fmt" | ||
"github.com/spf13/viper" | ||
"io" | ||
"strings" | ||
"sync" | ||
|
||
"github.com/spf13/viper" | ||
"time" | ||
|
||
"github.com/gleanerio/nabu/internal/graph" | ||
log "github.com/sirupsen/logrus" | ||
|
||
"github.com/minio/minio-go/v7" | ||
) | ||
|
||
// getLastElement returns the final "/"-separated segment of s.
//
// Trailing slashes are ignored, so "summoned/dataverse/" yields "dataverse"
// rather than an empty string; an empty segment would otherwise leak into
// identifiers built from the result. A string with no "/" is returned as is,
// and an empty (or all-slash) string yields "".
func getLastElement(s string) string {
	// Drop trailing separators first so the last *non-empty* segment is found.
	s = strings.TrimRight(s, "/")
	if i := strings.LastIndex(s, "/"); i >= 0 {
		return s[i+1:]
	}
	return s
}
|
||
// generateDateHash returns the hex-encoded SHA-256 digest of the current
// time, for use as a time-derived suffix in generated identifiers.
//
// The timestamp is taken in UTC at nanosecond resolution so that calls made
// within the same wall-clock second do not hash to the same value. The result
// is always 64 lowercase hexadecimal characters.
func generateDateHash() string {
	// RFC3339Nano keeps sub-second precision; a seconds-only layout would make
	// every call within one second produce an identical digest.
	stamp := time.Now().UTC().Format(time.RFC3339Nano)

	sum := sha256.Sum256([]byte(stamp))
	return hex.EncodeToString(sum[:])
}
|
||
// PipeCopy writes a new object based on a prefix; this function assumes the objects are valid when concatenated | ||
// v1: viper config object | ||
// mc: minio client pointer | ||
|
@@ -26,7 +51,8 @@ import ( | |
// destprefix: destination prefix | ||
// sf: boolean to declare if single file or not. If so, skip Skolemization since JSON-LD library output is enough | ||
func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefix string) error { | ||
log.Printf("PipeCopy with name: %s bucket: %s prefix: %s", name, bucket, prefix) | ||
orgname := v1.GetString("implementation_network.orgname") | ||
log.Printf("PipeCopy with name: %s bucket: %s prefix: %s org name: %s", name, bucket, prefix, orgname) | ||
|
||
pr, pw := io.Pipe() // TeeReader of use? | ||
lwg := sync.WaitGroup{} // work group for the pipe writes... | ||
|
@@ -47,10 +73,6 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi | |
} | ||
}(pw) | ||
|
||
// Set and use a "single file flag" to bypass skolimaization since if it is a single file | ||
// the JSON-LD to RDF will correctly map blank nodes. | ||
// NOTE: with a background context we can't get the len(channel) so we have to iterate it. | ||
// This is fast, but it means we have to do the ListObjects twice | ||
clen := 0 | ||
sf := false | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
|
@@ -67,25 +89,23 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi | |
|
||
objectCh := mc.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: isRecursive}) | ||
|
||
// for object := range mc.ListObjects(context.Background(), bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: isRecursive}, doneCh) { | ||
lastProcessed := false | ||
idList := make([]string, 0) | ||
for object := range objectCh { | ||
fo, err := mc.GetObject(context.Background(), bucket, object.Key, minio.GetObjectOptions{}) | ||
if err != nil { | ||
fmt.Println(err) | ||
continue | ||
} | ||
|
||
var b bytes.Buffer | ||
bw := bufio.NewWriter(&b) | ||
|
||
_, err = io.Copy(bw, fo) | ||
if err != nil { | ||
log.Println(err) | ||
continue | ||
} | ||
|
||
s := string(b.Bytes()) | ||
|
||
nq := "" | ||
//log.Println("Calling JSONLDtoNQ") | ||
if strings.HasSuffix(object.Key, ".nq") { | ||
nq = s | ||
} else { | ||
|
@@ -95,33 +115,58 @@ func PipeCopy(v1 *viper.Viper, mc *minio.Client, name, bucket, prefix, destprefi | |
return | ||
} | ||
} | ||
|
||
var snq string | ||
|
||
if sf { | ||
snq = nq // just pass through the RDF without trying to Skolemize since we ar a single fil | ||
snq = nq | ||
} else { | ||
snq, err = graph.Skolemization(nq, object.Key) | ||
if err != nil { | ||
return | ||
} | ||
} | ||
|
||
// 1) get graph URI | ||
ctx, err := graph.MakeURN(v1, object.Key) | ||
if err != nil { | ||
return | ||
} | ||
// 2) convert NT to NQ | ||
csnq, err := graph.NtToNq(snq, ctx) | ||
if err != nil { | ||
return | ||
} | ||
|
||
_, err = pw.Write([]byte(csnq)) | ||
if err != nil { | ||
return | ||
} | ||
idList = append(idList, ctx) | ||
lastProcessed = true | ||
} | ||
|
||
// Once we are done with the loop, put in the triples to associate all the graphURIs with the org. | ||
if lastProcessed { | ||
|
||
data := `<urn:gleaner.io:` + orgname + `:datacatalog> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/DataCatalog> . | ||
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/description> "GleanerIO Nabu generated catalog" . | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. urn:gleaner.io:ORG:SOURCE:datacatalog |
||
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/dateCreated> "` + time.Now().Format("2006-01-02 15:04:05") + `" . | ||
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/provider> <urn:gleaner.io:` + orgname + `:provider> . | ||
<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/publisher> <urn:gleaner.io:` + getLastElement(prefix) + `:publisher> . | ||
<urn:gleaner.io:` + orgname + `:provider> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/Organization> . | ||
<urn:gleaner.io:` + orgname + `:provider> <https://schema.org/name> "` + orgname + `" . | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since there is one provider, this is good |
||
<urn:gleaner.io:` + getLastElement(prefix) + `:publisher> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <https://schema.org/Organization> . | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. urn:gleaner.io:ORG:SOURCE:publisher |
||
<urn:gleaner.io:` + getLastElement(prefix) + `:publisher> <https://schema.org/name> "` + getLastElement(prefix) + `" . | ||
` | ||
|
||
for _, item := range idList { | ||
data += `<urn:gleaner.io:` + orgname + `:datacatalog> <https://schema.org/dataset> <` + item + `> .` + "\n" | ||
} | ||
|
||
namedgraph := "urn:gleaner.io:" + orgname + ":" + getLastElement(prefix) + ":datacatalog:" + generateDateHash() | ||
sdata, err := graph.NtToNq(data, namedgraph) | ||
|
||
// Perform the final write to the pipe here | ||
// ilstr := strings.Join(idList, ",") | ||
_, err = pw.Write([]byte(sdata)) | ||
if err != nil { | ||
log.Println(err) | ||
} | ||
} | ||
}() | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nabu release --cfgURL https://provisium.io/data/nabuconfig.yaml --prefix summoned/dataverse --endpoint localoxi