-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathkwg_util.py
303 lines (252 loc) · 14.1 KB
/
kwg_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
import statistics
from collections import defaultdict
from PyQt5.QtCore import QVariant
from qgis._core import QgsMessageLog, Qgis, QgsVectorLayer, QgsField
from .kwg_sparqlutil import kwg_sparqlutil
class kwg_util:
    """Utility helpers used by the kwg_geoenrichment plugin.

    Groups WKT inspection, SPARQL JSON result parsing, GeoPackage field
    bookkeeping (via QGIS ``ogr`` layers) and merging of multi-valued
    properties into a feature class.
    """

    _DEFAULT_GPKG = '/var/local/QGIS/kwg_results.gpkg'

    # Merge rules that only make sense for numeric source fields.
    _NUMERIC_MERGE_RULES = ('STDEV', 'MEAN', 'SUM', 'MIN', 'MAX')

    # Field type names treated as numeric. The first four are the names the
    # original check used; 'Integer64' and 'Real' are added because
    # OGR-backed layers report those names.
    # NOTE(review): confirm the exact names reported by the provider in use.
    _NUMERIC_FIELD_TYPES = ('Single', 'Double', 'SmallInteger', 'Integer',
                            'Integer64', 'Real')

    # Suffix appended to the source field name for each merge rule.
    # 'MEN' (for MEAN) is kept byte-for-byte so generated field names do not
    # change.
    _MERGE_RULE_SUFFIXES = {
        'SUM': 'SUM',
        'MIN': 'MIN',
        'MAX': 'MAX',
        'STD-DEV': 'STD',
        'STDEV': 'STD',
        'MEAN': 'MEN',
        'COUNT': 'COUNT',
        'FIRST': 'FIRST',
        'LAST': 'LAST',
        'CONCATENATE': 'CONCAT',
    }

    def __init__(self):
        pass

    def get_geometry_type_from_wkt(self, wkt):
        """Return the base geometry family named in a WKT string.

        The match is a case-insensitive substring test, so ``MULTIPOINT``
        maps to ``"Point"``, ``MULTILINESTRING`` to ``"LineString"`` and
        ``MULTIPOLYGON`` to ``"Polygon"``. Returns the string ``"0"`` when
        none of the three families is mentioned.
        """
        lowered = wkt.lower()
        if "point" in lowered:
            return "Point"
        if "linestring" in lowered:
            return "LineString"
        if "polygon" in lowered:
            return "Polygon"
        return "0"

    def extractCommonPropertyJSON(self, commonPropertyJSON,
                                  p_url_list=None, p_name_list=None, url_dict=None,
                                  p_var="p", plabel_var="pLabel", numofsub_var="NumofSub"):
        """Build a ``{display name: property URL}`` mapping from SPARQL JSON bindings.

        Args:
            commonPropertyJSON: iterable of binding dicts; each must contain
                ``p_var`` and ``numofsub_var`` entries shaped like
                ``{"value": ...}`` and may contain ``plabel_var``.
            p_url_list: optional list of already known property URLs; new
                URLs are appended to it in place.
            p_name_list: optional list of display names kept parallel to
                ``p_url_list``; new names are appended to it in place.
            url_dict: accepted for backward compatibility; it is not read.
            p_var, plabel_var, numofsub_var: binding keys to read.

        Returns:
            dict built by zipping ``p_name_list`` with ``p_url_list``.
        """
        # Fresh lists per call: list defaults in the signature would be
        # shared between calls and leak results from one call into the next.
        if p_url_list is None:
            p_url_list = []
        if p_name_list is None:
            p_name_list = []

        sparql_util = None
        for jsonItem in commonPropertyJSON:
            propertyURL = jsonItem[p_var]["value"]
            if propertyURL in p_url_list:
                continue
            p_url_list.append(propertyURL)

            label = jsonItem[plabel_var]["value"] if plabel_var in jsonItem else ""
            if label.strip() == "":
                # Only build the SPARQL helper when a label is actually missing.
                if sparql_util is None:
                    sparql_util = kwg_sparqlutil()
                label = sparql_util.make_prefixed_iri(propertyURL)

            subject_count = jsonItem[numofsub_var]["value"]
            p_name_list.append(f"{label} [{subject_count}]")

        return dict(zip(p_name_list, p_url_list))

    def getPropertyName(self, propertyURL):
        """Return the part of ``propertyURL`` after the last ``#`` or, if there is no ``#``, after the last ``/``.

        When neither separator is present the whole string is returned.
        """
        separator = "#" if "#" in propertyURL else "/"
        # rfind returns -1 when the separator is absent, so the slice starts at 0.
        return propertyURL[propertyURL.rfind(separator) + 1:]

    def getFieldNameWithTable(self, propertyName, featureClassName, gpkgLocation=_DEFAULT_GPKG):
        """Return a field name that is not yet used in ``featureClassName``.

        Returns ``propertyName`` unchanged when it is free, otherwise the
        result of ``changeFieldNameWithTable`` (which is ``-1`` when no free
        variant was found).
        """
        if not self.isFieldNameInTable(propertyName, featureClassName, gpkgLocation):
            return propertyName
        return self.changeFieldNameWithTable(propertyName, featureClassName, gpkgLocation)

    def _openLayer(self, layerName, gpkgLocation, displayName=None):
        """Open ``layerName`` from the GeoPackage at ``gpkgLocation`` as an ``ogr`` vector layer."""
        uri = gpkgLocation + "|layername=%s" % (layerName)
        return QgsVectorLayer(uri, layerName if displayName is None else displayName, "ogr")

    def _layerFields(self, layerName, gpkgLocation):
        """Return the provider's fields for ``layerName`` as a list (empty if there is no provider)."""
        prov = self._openLayer(layerName, gpkgLocation).dataProvider()
        if prov is None:
            # dataProvider() may be null when the layer could not be opened.
            return []
        return [field for field in prov.fields()]

    def isFieldNameInTable(self, fieldName, featureClassName, gpkgLocation=_DEFAULT_GPKG):
        """Return True when ``featureClassName`` already has a field named ``fieldName``."""
        return any(field.name() == fieldName
                   for field in self._layerFields(featureClassName, gpkgLocation))

    def changeFieldNameWithTable(self, propertyName, featureClassName, gpkgLocation):
        """Search for a free variant of ``propertyName`` by swapping its last character for 1..9.

        Each attempt replaces the last character of the previous candidate
        with the next digit, so the name length stays constant. Returns the
        first candidate not present in the table, or ``-1`` when all nine
        candidates are taken.
        """
        for i in range(1, 10):
            propertyName = propertyName[:(len(propertyName) - 1)] + str(i)
            if not self.isFieldNameInTable(propertyName, featureClassName, gpkgLocation):
                return propertyName
        return -1

    def getFieldDataTypeInTable(self, fieldName, featureClassName, gpkgLocation=_DEFAULT_GPKG):
        """Return the provider type name of ``fieldName`` in ``featureClassName``, or ``-1`` if absent."""
        # Iterate the field objects themselves; a list of name strings has no
        # name()/typeName() methods.
        for field in self._layerFields(featureClassName, gpkgLocation):
            if field.name() == fieldName:
                return field.typeName()
        return -1

    def buildMultiValueDictFromNoFunctionalProperty(self, fieldName, tableName, URLFieldName='wikiURL',
                                                    featureClassName="geo_results",
                                                    gpkgLocation=_DEFAULT_GPKG):
        """Group the values of a multi-valued property by their subject key.

        Reads every row of ``tableName`` and appends the value in attribute
        position 2 to the list stored under the key in attribute position 1,
        skipping values that are ``None``.
        NOTE(review): the positional columns are presumably the foreign key
        and the property value; confirm against the table layout.

        ``URLFieldName`` and ``featureClassName`` are accepted for interface
        compatibility and are not used.

        Returns:
            A ``collections.defaultdict(list)`` (empty when the layer is not
            valid), or ``-1`` when ``fieldName`` is not a field of
            ``tableName``.
        """
        # Check the same GeoPackage the rows are read from.
        if not self.isFieldNameInTable(fieldName, tableName, gpkgLocation):
            return -1

        noFunctionalPropertyDict = defaultdict(list)
        vlayer = self._openLayer(tableName, gpkgLocation)
        if vlayer.isValid():
            for feature in vlayer.getFeatures():
                row = feature.attributes()
                foreignKeyValue = row[1]
                noFunctionalPropertyValue = row[2]
                if noFunctionalPropertyValue is not None:
                    noFunctionalPropertyDict[foreignKeyValue].append(noFunctionalPropertyValue)
        return noFunctionalPropertyDict

    def _mergeValues(self, values, mergeRule, isNumericField, delimiter):
        """Collapse a non-empty list of values into one value according to ``mergeRule``.

        Returns ``""`` for a numeric rule applied to a non-numeric field and
        for an unknown rule, and ``None`` for ``STDEV`` with fewer than two
        values (the sample standard deviation is undefined there).
        """
        if mergeRule in self._NUMERIC_MERGE_RULES:
            if not isNumericField:
                return ""
            if mergeRule == 'MEAN':
                return statistics.mean(values)
            if mergeRule == 'STDEV':
                # statistics.stdev raises StatisticsError for a single value.
                return statistics.stdev(values) if len(values) > 1 else None
            if mergeRule == 'SUM':
                return sum(values)
            if mergeRule == 'MIN':
                return min(values)
            return max(values)
        if mergeRule == 'COUNT':
            return len(values)
        if mergeRule == 'FIRST':
            return values[0]
        if mergeRule == 'LAST':
            return values[-1]
        if mergeRule == 'CONCATENATE':
            # Distinct values, stringified, in sorted order.
            return delimiter.join(sorted({str(val) for val in values if val is not None}))
        return ""

    def appendFieldInFeatureClassByMergeRule(self, inputFeatureClassName, noFunctionalPropertyDict, appendFieldName,
                                             relatedTableName, mergeRule, delimiter,
                                             gpkgLocation=_DEFAULT_GPKG):
        """Add a field to ``inputFeatureClassName`` holding the merged values of a multi-valued property.

        Args:
            inputFeatureClassName: layer in the GeoPackage that receives the
                new field.
            noFunctionalPropertyDict: mapping from ``place_iri`` value to the
                list of property values for that feature.
            appendFieldName: name of the source property field; the new field
                is named ``<appendFieldName>_<suffix>``.
            relatedTableName: table consulted for the source field's type when
                the field is not present in the input layer.
            mergeRule: one of 'SUM', 'MIN', 'MAX', 'STDEV' (or 'STD-DEV'),
                'MEAN', 'COUNT', 'FIRST', 'LAST', 'CONCATENATE'.
            delimiter: separator used by 'CONCATENATE'.
            gpkgLocation: path of the GeoPackage.

        Returns:
            ``1`` on success; ``0`` when the layer cannot be read, no free
            field name is available, or the edits cannot be committed.
        """
        # Treat both spellings of the standard deviation rule identically.
        if mergeRule == 'STD-DEV':
            mergeRule = 'STDEV'

        vlayer = self._openLayer(inputFeatureClassName, gpkgLocation, "geo_results")
        if not vlayer.isValid():
            QgsMessageLog.logMessage("Error reading the table",
                                     "kwg_geoenrichment", level=Qgis.Warning)
            return 0
        prov = vlayer.dataProvider()

        # Resolve the data type of the source field from field objects.
        appendFieldType = ''
        for field in prov.fields():
            if field.name() == appendFieldName:
                appendFieldType = field.typeName()
                break
        if appendFieldType == '' and relatedTableName:
            relatedType = self.getFieldDataTypeInTable(appendFieldName, relatedTableName, gpkgLocation)
            if relatedType != -1:
                appendFieldType = relatedType

        mergeRuleField = self._MERGE_RULE_SUFFIXES.get(mergeRule, '')
        featureClassAppendFieldName = appendFieldName + "_" + mergeRuleField
        newAppendFieldName = self.getFieldNameWithTable(featureClassAppendFieldName, inputFeatureClassName,
                                                        gpkgLocation)
        if newAppendFieldName == -1:
            # No free field name: there is no field to write the values into.
            QgsMessageLog.logMessage(
                "No free field name available for {0}".format(featureClassAppendFieldName),
                "kwg_geoenrichment", level=Qgis.Warning)
            return 0

        if mergeRule == 'COUNT':
            newFieldType = QVariant.Int
        elif mergeRule == 'STDEV' or mergeRule == 'MEAN':
            newFieldType = QVariant.Double
        else:
            newFieldType = QVariant.String
        prov.addAttributes([QgsField(newAppendFieldName, newFieldType)])
        vlayer.updateFields()

        if not self.isFieldNameInTable("place_iri", inputFeatureClassName, gpkgLocation):
            return 1

        isNumericField = appendFieldType in self._NUMERIC_FIELD_TYPES
        if mergeRule in self._NUMERIC_MERGE_RULES and not isNumericField:
            # Reported once instead of once per feature.
            QgsMessageLog.logMessage(
                "The {0} data type of Field {1} does not support {2} merge rule".format(
                    appendFieldType,
                    appendFieldName,
                    mergeRule), "kwg_geoenrichment", level=Qgis.Warning)

        vlayer.startEditing()
        for feature in vlayer.getFeatures():
            # .get() avoids inserting empty entries into a defaultdict.
            noFunctionalPropertyValueList = noFunctionalPropertyDict.get(feature["place_iri"], [])
            if len(noFunctionalPropertyValueList) == 0:
                continue
            feature[newAppendFieldName] = self._mergeValues(
                noFunctionalPropertyValueList, mergeRule, isNumericField, delimiter)
            vlayer.updateFeature(feature)

        if not vlayer.commitChanges():
            QgsMessageLog.logMessage("Error committing the merged values",
                                     "kwg_geoenrichment", level=Qgis.Warning)
            return 0
        return 1

    def mergeTripleStoreDicts(self, superTripleStore, childTripleStore):
        """Merge ``childTripleStore`` into ``superTripleStore`` in place and return it.

        Both arguments map a triple key to a degree. A triple missing from
        the super store is added; for a triple present in both, the smaller
        degree is kept.
        """
        for triple, degree in childTripleStore.items():
            if triple not in superTripleStore or superTripleStore[triple] > degree:
                superTripleStore[triple] = degree
        return superTripleStore

    def directionListFromBoth2OD(self, propertyDirectionList):
        """Expand every ``"BOTH"`` entry into ``"ORIGIN"`` and ``"DESTINATION"`` variants.

        Example: ``["BOTH", "ORIGIN"]`` becomes
        ``[["ORIGIN", "ORIGIN"], ["DESTINATION", "ORIGIN"]]``.

        Returns a list of new lists (the input is not modified), ordered with
        ``"ORIGIN"`` before ``"DESTINATION"`` at each expanded position,
        leftmost position varying slowest.
        """
        expandedLists = [[]]
        for direction in propertyDirectionList:
            choices = ("ORIGIN", "DESTINATION") if direction == "BOTH" else (direction,)
            expandedLists = [prefix + [choice] for prefix in expandedLists for choice in choices]
        return expandedLists