Skip to content

Commit d313c0c

Browse files
committed
Initial commit
0 parents  commit d313c0c

9 files changed

+897
-0
lines changed

README.md

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
#Kendo data query
2+
Does not support multiple sort
3+
Only supports `and` filter conditions

apply.go

+348
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,348 @@
1+
package kendo
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/globalsign/mgo"
7+
"github.com/globalsign/mgo/bson"
8+
)
9+
10+
func (d *DataState) Apply(collection mgo.Collection) (dataResult DataResult) {
11+
if err := d.parse(); err != nil {
12+
return
13+
}
14+
15+
total, err := d.getTotal(collection)
16+
if err != nil {
17+
return
18+
}
19+
20+
aggregate := collection.Pipe(d.GetPipeline())
21+
22+
data := []interface{}{}
23+
err = aggregate.All(&data)
24+
fmt.Println(err)
25+
26+
return DataResult{
27+
Data: data,
28+
Total: total,
29+
}
30+
}
31+
32+
func (d *DataState) GetPipeline() (pipeline []bson.M) {
33+
34+
pipeline = []bson.M{}
35+
36+
if len(d.preprocessing) > 0 {
37+
pipeline = append(pipeline, d.preprocessing...)
38+
}
39+
40+
if len(d.Lookup) > 0 {
41+
pipeline = append(pipeline, d.getLookup()...)
42+
}
43+
44+
//replace _id by id
45+
pipeline = append(pipeline, bson.M{"$addFields": bson.M{
46+
"id": "$_id",
47+
}})
48+
pipeline = append(pipeline, bson.M{"$project": bson.M{
49+
"_id": 0,
50+
}})
51+
52+
if len(d.Filter.Filters) > 0 {
53+
pipeline = append(pipeline, bson.M{"$match": d.getFilter()})
54+
}
55+
56+
if len(d.Group) > 0 {
57+
pipeline = append(pipeline, d.getAggregate()...)
58+
pipeline = append(pipeline, d.getProject())
59+
}
60+
61+
if len(d.Sort) > 0 {
62+
pipeline = append(pipeline, d.getSortFields())
63+
}
64+
65+
if d.PageSize > 0 {
66+
page := d.Page - 1
67+
pipeline = append(pipeline, []bson.M{
68+
bson.M{"$skip": page * d.PageSize},
69+
bson.M{"$limit": d.PageSize},
70+
}...)
71+
}
72+
73+
return
74+
}
75+
76+
func (d *DataState) getTotal(collection mgo.Collection) (total int, err error) {
77+
78+
lookupsMap := map[string]LookupDescriptor{}
79+
for _, lookup := range d.Lookup {
80+
lookupsMap[lookup.As] = lookup
81+
}
82+
83+
// apply lookups only if needed by filter
84+
lookupsToApply := []LookupDescriptor{}
85+
for _, f := range d.Filter.Filters {
86+
if lookup, found := lookupsMap[f.Field]; found {
87+
lookupsToApply = append(lookupsToApply, lookup)
88+
}
89+
}
90+
91+
filter := d.getFilter()
92+
if len(lookupsToApply) > 0 {
93+
pipeline := []bson.M{}
94+
for _, l := range d.Lookup {
95+
pipeline = append(pipeline, bson.M{
96+
"$lookup": bson.M{
97+
"from": l.From,
98+
"localField": l.LocalField,
99+
"foreignField": l.ForeignField,
100+
"as": l.As,
101+
},
102+
})
103+
}
104+
105+
//test
106+
data := []interface{}{}
107+
collection.Pipe(pipeline).All(&data)
108+
} else {
109+
total, err = collection.Find(filter).Count()
110+
}
111+
112+
return
113+
}
114+
115+
func (d *DataState) getLookup() (lookup []bson.M) {
116+
117+
lookup = []bson.M{}
118+
119+
for _, l := range d.Lookup {
120+
lookup = append(lookup, bson.M{
121+
"$lookup": bson.M{
122+
"from": l.From,
123+
"localField": l.LocalField,
124+
"foreignField": l.ForeignField,
125+
"as": l.As,
126+
},
127+
})
128+
if l.Single { // should be single doc instead of array
129+
lookup = append(lookup, bson.M{
130+
"$addFields": bson.M{
131+
l.As: bson.M{
132+
"$ifNull": []interface{}{
133+
bson.M{"$arrayElemAt": []interface{}{fmt.Sprintf("$%s", l.As), 0}},
134+
nil,
135+
},
136+
},
137+
},
138+
})
139+
}
140+
}
141+
142+
return
143+
}
144+
145+
func (d *DataState) getAggregate() (aggregate []bson.M) {
146+
147+
aggregate = []bson.M{}
148+
149+
ids := bson.M{}
150+
for _, group := range d.Group {
151+
key := group.getKey()
152+
ids[key] = fmt.Sprintf("$_id.%s", key)
153+
}
154+
155+
aggregate = append(aggregate, d.getFirstGrouping())
156+
157+
nbGroups := len(d.Group) - 1
158+
for i := nbGroups; i >= 0; i-- {
159+
group := d.Group[i]
160+
key := group.getKey()
161+
162+
sortId := fmt.Sprintf("_id.%s", key)
163+
164+
if (nbGroups) == i {
165+
aggregate = append(aggregate, d.getFirstGrouping())
166+
} else {
167+
previousGroup := d.Group[i+1]
168+
previousField := previousGroup.Field
169+
previousKey := previousGroup.getKey()
170+
var groupKey interface{}
171+
if i == 0 {
172+
sortId = fmt.Sprintf("_id")
173+
groupKey = fmt.Sprintf("$_id.%s", key)
174+
} else {
175+
delete(ids, previousKey)
176+
groupKey = copyM(ids) //map elements are by reference we have to copy
177+
}
178+
aggregate = append(aggregate, d.getGroup(groupKey, previousKey, previousField, i))
179+
}
180+
181+
aggregate = append(aggregate, bson.M{
182+
"$sort": bson.M{
183+
sortId: getSort(group.Dir),
184+
},
185+
})
186+
}
187+
188+
return
189+
}
190+
191+
func (d *DataState) getFirstGrouping() (group bson.M) {
192+
193+
ids := bson.M{}
194+
for _, group := range d.Group {
195+
f := group.Field
196+
key := group.getKey()
197+
ids[key] = fmt.Sprintf("$%s", f)
198+
}
199+
200+
group = bson.M{
201+
"$group": bson.M{
202+
"_id": ids,
203+
"items": bson.M{
204+
"$push": "$$ROOT",
205+
},
206+
},
207+
}
208+
209+
return
210+
}
211+
212+
func (d *DataState) addAggregates(m bson.M, firstlevel bool) bson.M {
213+
214+
aggregates := bson.M{}
215+
216+
for _, a := range d.Aggregates {
217+
aggregateKey := fmt.Sprintf("$%s", d.toMongoAggregate(a.Aggregate))
218+
aggregate := bson.M{
219+
aggregateKey: getAggregateExpression(a, firstlevel),
220+
}
221+
222+
key := a.getKey()
223+
if agg, ok := aggregates[key]; ok {
224+
m, _ := agg.(bson.M)
225+
m[a.Aggregate] = aggregate
226+
aggregates[key] = m
227+
} else {
228+
aggregates[key] = bson.M{
229+
a.Aggregate: aggregate,
230+
}
231+
}
232+
}
233+
234+
if len(aggregates) == 0 {
235+
aggregates["_"] = nil //cannot project an empty object
236+
}
237+
238+
m["aggregates"] = aggregates
239+
240+
return m
241+
}
242+
243+
func getAggregateExpression(a AggregateDescriptor, firstlevel bool) (expression string) {
244+
245+
if firstlevel {
246+
expression = fmt.Sprintf("$items.%s", a.Field)
247+
} else {
248+
expression = fmt.Sprintf("$items.aggregates.%s.%s", a.getKey(), a.Aggregate)
249+
}
250+
251+
return
252+
}
253+
254+
func (d *DataState) toMongoAggregate(s string) (a string) {
255+
256+
switch s {
257+
case "average":
258+
a = "avg"
259+
default:
260+
a = s
261+
}
262+
263+
return
264+
}
265+
266+
func (d *DataState) getProject() (project bson.M) {
267+
firstGroup := d.Group[0]
268+
269+
value := "$_id"
270+
singleGroup := (len(d.Group) == 1)
271+
if singleGroup {
272+
value = fmt.Sprintf("$_id.%s", firstGroup.getKey())
273+
}
274+
project = bson.M{
275+
"$project": d.addAggregates(bson.M{
276+
"_id": 0,
277+
"value": value,
278+
"items": "$items",
279+
"field": firstGroup.Field,
280+
}, singleGroup),
281+
}
282+
283+
return
284+
}
285+
286+
func (d *DataState) getSortFields() (sort bson.M) {
287+
var fields bson.M
288+
for _, s := range d.Sort {
289+
fields = bson.M{
290+
s.Field: 1,
291+
}
292+
293+
if s.Dir == "desc" {
294+
fields[s.Field] = -1
295+
}
296+
}
297+
298+
return bson.M{
299+
"$sort": fields,
300+
}
301+
}
302+
303+
func (d *DataState) getFilter() (filter bson.M) {
304+
filter = bson.M{}
305+
306+
for _, f := range d.Filter.Filters {
307+
f.Filter(filter)
308+
}
309+
310+
return
311+
}
312+
313+
func (d *DataState) getGroup(id interface{}, value string, field string, depth int) (group bson.M) {
314+
isSecondGroup := (len(d.Group) - 2) == depth
315+
group = bson.M{
316+
"$group": bson.M{
317+
"_id": id,
318+
"items": bson.M{
319+
"$push": d.addAggregates(bson.M{
320+
"value": fmt.Sprintf("$_id.%s", value),
321+
"items": "$items",
322+
"field": field,
323+
}, isSecondGroup),
324+
},
325+
},
326+
}
327+
328+
return
329+
}
330+
331+
func copyM(m bson.M) (copy bson.M) {
332+
copy = bson.M{}
333+
for k, v := range m {
334+
copy[k] = v
335+
}
336+
337+
return
338+
}
339+
340+
func getSort(s string) (i int) {
341+
342+
i = 1
343+
if s == "desc" {
344+
i = -1
345+
}
346+
347+
return
348+
}

0 commit comments

Comments
 (0)