View the Generated Code for a UDF
As we have discussed, Drill's UDF mechanism is just Drill's built-in function mechanism with a new name. The mechanism copies the source of our UDF into the code that Drill generates. To help us learn the mechanism, or to track down issues, it can be very helpful to examine the generated code.
The generated code is written to the drillbit.log file when the proper logging level is set:
- Logger: org.apache.drill.exec.compile.JaninoClassCompiler
- Level: DEBUG
The code is written on each node where it is generated (typically all nodes, for a fully parallelized query), and it is written at execution time, not plan time.
To enable this logging in your Drill server, locate the $DRILL_HOME/conf/logback.xml file. (Or, if you are using a site directory, look in $SITE_DIR/logback.xml.)
Add the following to the end of the file, just before the </configuration> line:
<logger name="org.apache.drill.exec.compile.JaninoClassCompiler" additivity="false">
  <level value="debug" />
  <appender-ref ref="FILE" />
</logger>
You can also see the source file when using the debug framework discussed previously. Just add the following to your test:
@Test
public void example() throws Exception {
  LogFixtureBuilder logBuilder = new LogFixtureBuilder()
      .logger("org.apache.drill.exec.compile.JaninoClassCompiler", Level.DEBUG);
  ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
      .configProperty("drill.classpath.scanning.cache.enabled", false);
  try (LogFixture logFixture = logBuilder.build();
       ClusterFixture cluster = builder.build();
       ClientFixture client = cluster.clientFixture()) {
    // Your test goes here
  }
}
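Inside the try block, run the query whose generated code you want to inspect; any query that triggers code generation will cause the compiler to log the source. A minimal sketch, assuming the test framework's QueryBuilder API:

// Run a query; the generated code for its operators is
// logged by the Janino compiler at DEBUG level.
client.queryBuilder()
    .sql("SELECT a, b + c AS d FROM cp.`example.json`")
    .run();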
The following is an example of the code generated for the Project operator for this query:
SELECT a, b + c AS d FROM cp.`example.json`
On this input file:
{a: 1, b: 2, c: 3}
The generated source (somewhat simplified) is below. Notice the inlined add function on line 45. That's where the body of our UDF would go.
1:
2: package org.apache.drill.exec.test.generated;
3:
4: import org.apache.drill.exec.exception.SchemaChangeException;
5: import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
6: import org.apache.drill.exec.ops.FragmentContext;
7: import org.apache.drill.exec.record.RecordBatch;
8: import org.apache.drill.exec.vector.NullableBigIntVector;
9:
10: public class ProjectorGen0 {
11:
12: NullableBigIntVector vv0;
13: NullableBigIntVector vv4;
14: NullableBigIntVector vv9;
15:
16: public void doEval(int inIndex, int outIndex)
17: throws SchemaChangeException
18: {
19: {
20: NullableBigIntHolder out3 = new NullableBigIntHolder();
21: {
22: out3 .isSet = vv0 .getAccessor().isSet((inIndex));
23: if (out3 .isSet == 1) {
24: out3 .value = vv0 .getAccessor().get((inIndex));
25: }
26: }
27: NullableBigIntHolder out7 = new NullableBigIntHolder();
28: {
29: out7 .isSet = vv4 .getAccessor().isSet((inIndex));
30: if (out7 .isSet == 1) {
31: out7 .value = vv4 .getAccessor().get((inIndex));
32: }
33: }
34: //---- start of eval portion of add function. ----//
35: NullableBigIntHolder out8 = new NullableBigIntHolder();
36: {
37: if ((out3 .isSet*out7 .isSet) == 0) {
38: out8 .isSet = 0;
39: } else {
40: final NullableBigIntHolder out = new NullableBigIntHolder();
41: NullableBigIntHolder in1 = out3;
42: NullableBigIntHolder in2 = out7;
43:
44: AddFunctions$BigIntBigIntAdd_eval: {
45: out.value = (long) (in1.value + in2.value);
46: }
47:
48: out.isSet = 1;
49: out8 = out;
50: out.isSet = 1;
51: }
52: }
53: //---- end of eval portion of add function. ----//
54: if (!(out8 .isSet == 0)) {
55: vv9 .getMutator().set((outIndex), out8 .isSet, out8 .value);
56: }
57: }
58: }
59:
60: public void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing)
61: throws SchemaChangeException
62: {
63: {
64: int[] fieldIds1 = new int[ 1 ] ;
65: fieldIds1 [ 0 ] = 1;
66: Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, fieldIds1).getValueVector();
...
70: vv0 = ((NullableBigIntVector) tmp2);
71: int[] fieldIds5 = new int[ 1 ] ;
72: fieldIds5 [ 0 ] = 2;
73: Object tmp6 = (incoming).getValueAccessorById(NullableBigIntVector.class, fieldIds5).getValueVector();
...
77: vv4 = ((NullableBigIntVector) tmp6);
78: /** start SETUP for function add **/
79: {
80: {}
81: }
82: /** end SETUP for function add **/
83: int[] fieldIds10 = new int[ 1 ] ;
84: fieldIds10 [ 0 ] = 1;
85: Object tmp11 = (outgoing).getValueAccessorById(NullableBigIntVector.class, fieldIds10).getValueVector();
...
89: vv9 = ((NullableBigIntVector) tmp11);
90: }
91: }
...
98: }
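For comparison, here is roughly what the underlying add function looks like in Drill's simple-function form. This is a sketch, not the exact source (the real BigIntBigIntAdd is generated from a template), but the body of its eval() method is what appears inlined at line 45 above:

@FunctionTemplate(name = "add",
    scope = FunctionTemplate.FunctionScope.SIMPLE,
    nulls = FunctionTemplate.NullHandling.NULL_IF_NULL)
public static class BigIntBigIntAdd implements DrillSimpleFunc {
  @Param BigIntHolder in1;
  @Param BigIntHolder in2;
  @Output BigIntHolder out;

  public void setup() { }

  public void eval() {
    // This body is copied verbatim into the generated class.
    out.value = (long) (in1.value + in2.value);
  }
}

Note that the function itself never checks for nulls: because of NULL_IF_NULL handling, Drill generated the isSet checks that wrap the inlined eval() body.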
Aggregate UDFs are used in the aggregate operators: StreamingAggregate or HashAggregate. The following is an abbreviated example of the code generated for the example average aggregate discussed in the Aggregate UDFs section:
1:
2: package org.apache.drill.exec.test.generated;
3:
...
15:
16: public class StreamingAggregatorGen5 {
17:
...
22: Float8Holder work18;
23: IntHolder work19;
24: NullableFloat8Vector[] vv20;
25: Float8Vector vv25;
...
31:
32: public void addRecord(int index)
33: throws SchemaChangeException
34: {
35: {
36: NullableFloat8Holder out23 = new NullableFloat8Holder();
...
43: NullableFloat8Holder input = out23;
44: Float8Holder sum = work18;
45: IntHolder count = work19;
46:
// Inlined aggregate function add() method
47: WeightedAverageImpl$MyAvgFunc_add: {
48: if (input.isSet == 1) {
49: count.value++;
50: sum.value += input.value;
51: }
52: }
53:
54: work18 = sum;
55: work19 = count;
56: }
57: }
...
259: public void outputRecordValues(int outIndex)
260: throws SchemaChangeException
261: {
262: {
263: Float8Holder out24;
264: {
265: final Float8Holder output = new Float8Holder();
266: Float8Holder sum = work18;
267: IntHolder count = work19;
268:
// Inlined aggregate function output() method
269: WeightedAverageImpl$MyAvgFunc_output: {
270: if (count.value == 0) {
271: output.value = 0;
272: } else
273: {
274: output.value = sum.value / count.value;
275: }
276: }
277:
278: work18 = sum;
279: work19 = count;
280: out24 = output;
281: }
282: vv25 .getMutator().setSafe((outIndex), out24 .value);
283: }
284: }
285:
286: public boolean resetValues()
287: throws SchemaChangeException
288: {
289: {
290: /** start RESET for function myAvg **/
291: {
292: Float8Holder sum = work18;
293: IntHolder count = work19;
294:
// Inlined aggregate function reset() method
295: WeightedAverageImpl$MyAvgFunc_reset: {
296: sum.value = 0;
297: count.value = 0;
298: }
299:
300: work18 = sum;
301: work19 = count;
302: }
303: /** end RESET for function myAvg **/
304: }
305: {
306: return true;
307: }
308: }
309:
310: public void setupInterior(RecordBatch incoming, RecordBatch outgoing)
311: throws SchemaChangeException
312: {
...
346: {
347: /** start SETUP for function myAvg **/
348: {
349: Float8Holder sum = work18;
350: IntHolder count = work19;
// Inlined (empty) aggregate function setup() method
351: {}
352: work18 = sum;
353: work19 = count;
354: }
355: /** end SETUP for function myAvg **/
...
396: }
397: }
398:
399: public void __DRILL_INIT__()
400: throws SchemaChangeException
401: {
402: {
// The generated code creates our working fields here.
403: work18 = new Float8Holder();
404: work19 = new IntHolder();
405: }
406: }
407:
408: }
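For reference, the aggregate whose methods were inlined above, the myAvg example from the Aggregate UDFs section, has roughly this shape (a sketch, not the exact source):

@FunctionTemplate(name = "myAvg",
    scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
public static class MyAvgFunc implements DrillAggFunc {
  @Param NullableFloat8Holder input;
  @Workspace Float8Holder sum;
  @Workspace IntHolder count;
  @Output Float8Holder output;

  public void setup() { }

  public void add() {
    if (input.isSet == 1) {
      count.value++;
      sum.value += input.value;
    }
  }

  public void output() {
    if (count.value == 0) {
      output.value = 0;
    } else {
      output.value = sum.value / count.value;
    }
  }

  public void reset() {
    sum.value = 0;
    count.value = 0;
  }
}

Compare the add(), output(), and reset() bodies with the labeled blocks in the generated code: each method body is copied verbatim into the generated class, with the @Workspace holders (sum, count) becoming the class fields work18 and work19 created in __DRILL_INIT__().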
The generated class also handles key comparisons, but the listing above omits that code for brevity.
The Aggregate UDFs section discusses a weighted average example and describes how the Hash Aggregate maintains multiple aggregations in parallel by keeping intermediate values in value vectors. Below is the code generated for that example, which shows the mechanism in action:
2: package org.apache.drill.exec.test.generated;
...
14: public class HashAggregatorGen0 {
...
43: private final class BatchHolder {
44:
// Holder and vector for our sum workspace
45: Float8Holder work0;
46: Float8Vector vv1;
// Holder and vector for our totalWeights workspace
47: Float8Holder work4;
48: Float8Vector vv5;
49: NullableFloat8Vector vv8;
50: NullableBigIntVector vv12;
51: Float8Vector vv18;
52:
53: public void outputRecordValues(int htRowIdx, int outRowIdx)
54: throws SchemaChangeException
55: {
56: {
57: Float8Holder out17;
58: {
// Copy our workspace values out of vectors
59: final Float8Holder output = new Float8Holder();
60: work0 .value = vv1 .getAccessor().get((htRowIdx));
61: Float8Holder sum = work0;
62: work4 .value = vv5 .getAccessor().get((htRowIdx));
63: Float8Holder totalWeights = work4;
64:
// Inlined aggregate output() method
65: WeightedAverageImpl$WeightedAvgFunc_output: {
66: if (totalWeights.value == 0) {
67: output.value = 0;
68: } else
69: {
70: output.value = sum.value / totalWeights.value;
71: }
72: }
73:
// Write workspace values back into vectors
74: work0 = sum;
75: vv1 .getMutator().set((htRowIdx), work0 .value);
76: work4 = totalWeights;
77: vv5 .getMutator().set((htRowIdx), work4 .value);
78: out17 = output;
79: }
80: vv18 .getMutator().setSafe((outRowIdx), out17 .value);
81: }
82: }
83:
84: public void setupInterior(RecordBatch incoming, RecordBatch outgoing, VectorContainer aggrValuesContainer)
85: throws SchemaChangeException
86: {
...
// Loop over the set of internal groups that correspond to
// hash bins. Create an aggregate (that is, workspace vector)
// for each group.
105: work0 = new Float8Holder();
106: work4 = new Float8Holder();
107: for (int drill_internal_i = 0; (drill_internal_i<vectorSize); drill_internal_i += 1) {
108: {
109: /** start SETUP for function wtAvg **/
110: {
111: work0 .value = vv1 .getAccessor().get(drill_internal_i);
112: Float8Holder sum = work0;
113: work4 .value = vv5 .getAccessor().get(drill_internal_i);
114: Float8Holder totalWeights = work4;
115: {}
116: work0 = sum;
117: vv1 .getMutator().set(drill_internal_i, work0 .value);
118: work4 = totalWeights;
119: vv5 .getMutator().set(drill_internal_i, work4 .value);
120: }
121: /** end SETUP for function wtAvg **/
122: }
123: }
...
151: }
152:
153: public void updateAggrValuesInternal(int incomingRowIdx, int htRowIdx)
154: throws SchemaChangeException
155: {
156: {
// Read the nullable (JSON) values into holders in
// preparation for invoking the aggregate add() method
157: NullableFloat8Holder out11 = new NullableFloat8Holder();
158: {
159: out11 .isSet = vv8 .getAccessor().isSet((incomingRowIdx));
160: if (out11 .isSet == 1) {
161: out11 .value = vv8 .getAccessor().get((incomingRowIdx));
162: }
163: }
164: NullableBigIntHolder out15 = new NullableBigIntHolder();
165: {
166: out15 .isSet = vv12 .getAccessor().isSet((incomingRowIdx));
167: if (out15 .isSet == 1) {
168: out15 .value = vv12 .getAccessor().get((incomingRowIdx));
169: }
170: }
171: //---- start of eval portion of castFLOAT8 function. ----//
172: NullableFloat8Holder out16 = new NullableFloat8Holder();
173: {
174: if (out15 .isSet == 0) {
175: out16 .isSet = 0;
176: } else {
177: final NullableFloat8Holder out = new NullableFloat8Holder();
178: NullableBigIntHolder in = out15;
179:
...
// Get the intermediate values out of vectors
192: work0 .value = vv1 .getAccessor().get((htRowIdx));
193: Float8Holder sum = work0;
194: work4 .value = vv5 .getAccessor().get((htRowIdx));
195: Float8Holder totalWeights = work4;
196:
// Inlined copy of the aggregate's add() method
197: WeightedAverageImpl$WeightedAvgFunc_add: {
198: if (input.isSet == 1 && weight.isSet == 1) {
199: totalWeights.value += weight.value;
200: sum.value += input.value * weight.value;
201: }
202: }
203:
// Write intermediate values back out to vectors
204: work0 = sum;
205: vv1 .getMutator().set((htRowIdx), work0 .value);
206: work4 = totalWeights;
207: vv5 .getMutator().set((htRowIdx), work4 .value);
208: }
209: }
...
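The workspace handling is the key difference from the streaming case. In the UDF, the weighted average declares its workspace values as ordinary holders, roughly:

@Workspace Float8Holder sum;           // backed by vector vv1 in the generated code
@Workspace Float8Holder totalWeights;  // backed by vector vv5 in the generated code

Because the Hash Aggregate accumulates many groups at once, it cannot keep a single copy of each workspace value. Instead it backs each @Workspace holder with a value vector indexed by the hash-table row: the generated code copies the vector slot into the holder before the inlined add() or output() block, and copies the holder back into the vector afterward, as seen at lines 192-207 above.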