# View the Generated Code for a UDF
As we have discussed, Drill's UDF mechanism is just Drill's internal built-in function mechanism with a new name. The mechanism copies the source of our UDF into code that Drill generates. To help us learn the mechanism, or to track down issues, it can be very helpful to examine the generated code.
The generated code is written to the `drillbit.log` file when the proper logging level is set:
- Logger: `org.apache.drill.exec.compile.JaninoClassCompiler`
- Level: `DEBUG`
To enable the above logging in your Drill server, locate the `$DRILL_HOME/conf/logback.xml` file. (Or, if you are using a site directory, look in `$SITE_DIR/logback.xml`.)
Add the following to the end of the file, before the `</configuration>` line:
<logger name="org.apache.drill.exec.compile.JaninoClassCompiler" additivity="false">
<level value="debug" />
<appender-ref ref="FILE" />
</logger>
You can also see the source file when using the debug framework discussed previously. Just add the following to your test:
@Test
public void example() throws Exception {
LogFixtureBuilder logBuilder = new LogFixtureBuilder()
.logger("org.apache.drill.exec.compile.JaninoClassCompiler", Level.DEBUG)
;
ClusterFixtureBuilder builder = ClusterFixture.builder(dirTestWatcher)
.configProperty("drill.classpath.scanning.cache.enabled", false);
try (LogFixture logFixture = logBuilder.build();
ClusterFixture cluster = builder.build();
ClientFixture client = cluster.clientFixture()) {
// Your test goes here
}
}
The following is an example of the code for the project operator for the following query:
SELECT a, b + c AS d FROM cp.`example.json`
On this input file:
{a: 1, b: 2, c: 3}
The generated source (somewhat simplified) is below. Notice the inlined `add` function on line 45. That's where the body of our UDF would go.
1:
2: package org.apache.drill.exec.test.generated;
3:
4: import org.apache.drill.exec.exception.SchemaChangeException;
5: import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
6: import org.apache.drill.exec.ops.FragmentContext;
7: import org.apache.drill.exec.record.RecordBatch;
8: import org.apache.drill.exec.vector.NullableBigIntVector;
9:
10: public class ProjectorGen0 {
11:
12: NullableBigIntVector vv0;
13: NullableBigIntVector vv4;
14: NullableBigIntVector vv9;
15:
16: public void doEval(int inIndex, int outIndex)
17: throws SchemaChangeException
18: {
19: {
20: NullableBigIntHolder out3 = new NullableBigIntHolder();
21: {
22: out3 .isSet = vv0 .getAccessor().isSet((inIndex));
23: if (out3 .isSet == 1) {
24: out3 .value = vv0 .getAccessor().get((inIndex));
25: }
26: }
27: NullableBigIntHolder out7 = new NullableBigIntHolder();
28: {
29: out7 .isSet = vv4 .getAccessor().isSet((inIndex));
30: if (out7 .isSet == 1) {
31: out7 .value = vv4 .getAccessor().get((inIndex));
32: }
33: }
34: //---- start of eval portion of add function. ----//
35: NullableBigIntHolder out8 = new NullableBigIntHolder();
36: {
37: if ((out3 .isSet*out7 .isSet) == 0) {
38: out8 .isSet = 0;
39: } else {
40: final NullableBigIntHolder out = new NullableBigIntHolder();
41: NullableBigIntHolder in1 = out3;
42: NullableBigIntHolder in2 = out7;
43:
44: AddFunctions$BigIntBigIntAdd_eval: {
45: out.value = (long) (in1.value + in2.value);
46: }
47:
48: out.isSet = 1;
49: out8 = out;
50: out.isSet = 1;
51: }
52: }
53: //---- end of eval portion of add function. ----//
54: if (!(out8 .isSet == 0)) {
55: vv9 .getMutator().set((outIndex), out8 .isSet, out8 .value);
56: }
57: }
58: }
59:
60: public void doSetup(FragmentContext context, RecordBatch incoming, RecordBatch outgoing)
61: throws SchemaChangeException
62: {
63: {
64: int[] fieldIds1 = new int[ 1 ] ;
65: fieldIds1 [ 0 ] = 1;
66: Object tmp2 = (incoming).getValueAccessorById(NullableBigIntVector.class, fieldIds1).getValueVector();
...
70: vv0 = ((NullableBigIntVector) tmp2);
71: int[] fieldIds5 = new int[ 1 ] ;
72: fieldIds5 [ 0 ] = 2;
73: Object tmp6 = (incoming).getValueAccessorById(NullableBigIntVector.class, fieldIds5).getValueVector();
...
77: vv4 = ((NullableBigIntVector) tmp6);
78: /** start SETUP for function add **/
79: {
80: {}
81: }
82: /** end SETUP for function add **/
83: int[] fieldIds10 = new int[ 1 ] ;
84: fieldIds10 [ 0 ] = 1;
85: Object tmp11 = (outgoing).getValueAccessorById(NullableBigIntVector.class, fieldIds10).getValueVector();
...
89: vv9 = ((NullableBigIntVector) tmp11);
90: }
91: }
...
98: }
Aggregate UDFs are used in the aggregate operators: `StreamingAggregate` or `HashAggregate`. The following is an abbreviated example of the code generated for the example average aggregate discussed in the Aggregate UDFs section:
1:
2: package org.apache.drill.exec.test.generated;
3:
...
15:
16: public class StreamingAggregatorGen5 {
17:
18: NullableVarCharVector[] vv0;
19: NullableVarCharVector[] vv4;
20: NullableVarCharVector[] vv9;
21: NullableVarCharVector[] vv13;
22: Float8Holder work18;
23: IntHolder work19;
24: NullableFloat8Vector[] vv20;
25: Float8Vector vv25;
26: NullableVarCharVector[] vv28;
27: NullableVarCharVector vv32;
28: NullableVarCharVector[] vv35;
29: NullableVarCharVector vv39;
30: SelectionVector4 sv4_42;
31:
32: public void addRecord(int index)
33: throws SchemaChangeException
34: {
35: {
36: NullableFloat8Holder out23 = new NullableFloat8Holder();
37: {
38: out23 .isSet = vv20 [((index)>>> 16)].getAccessor().isSet(((index)& 65535));
39: if (out23 .isSet == 1) {
40: out23 .value = vv20 [((index)>>> 16)].getAccessor().get(((index)& 65535));
41: }
42: }
43: NullableFloat8Holder input = out23;
44: Float8Holder sum = work18;
45: IntHolder count = work19;
46:
// Inlined aggregate function add() method
47: WeightedAverageImpl$MyAvgFunc_add: {
48: if (input.isSet == 1) {
49: count.value++;
50: sum.value += input.value;
51: }
52: }
53:
54: work18 = sum;
55: work19 = count;
56: }
57: }
...
259: public void outputRecordValues(int outIndex)
260: throws SchemaChangeException
261: {
262: {
263: Float8Holder out24;
264: {
265: final Float8Holder output = new Float8Holder();
266: Float8Holder sum = work18;
267: IntHolder count = work19;
268:
// Inlined aggregate function output() method
269: WeightedAverageImpl$MyAvgFunc_output: {
270: if (count.value == 0) {
271: output.value = 0;
272: } else
273: {
274: output.value = sum.value / count.value;
275: }
276: }
277:
278: work18 = sum;
279: work19 = count;
280: out24 = output;
281: }
282: vv25 .getMutator().setSafe((outIndex), out24 .value);
283: }
284: }
285:
286: public boolean resetValues()
287: throws SchemaChangeException
288: {
289: {
290: /** start RESET for function myAvg **/
291: {
292: Float8Holder sum = work18;
293: IntHolder count = work19;
294:
// Inlined aggregate function reset() method
295: WeightedAverageImpl$MyAvgFunc_reset: {
296: sum.value = 0;
297: count.value = 0;
298: }
299:
300: work18 = sum;
301: work19 = count;
302: }
303: /** end RESET for function myAvg **/
304: }
305: {
306: return true;
307: }
308: }
309:
310: public void setupInterior(RecordBatch incoming, RecordBatch outgoing)
311: throws SchemaChangeException
312: {
...
346: {
347: /** start SETUP for function myAvg **/
348: {
349: Float8Holder sum = work18;
350: IntHolder count = work19;
// Inlined (empty) aggregate function setup() method
351: {}
352: work18 = sum;
353: work19 = count;
354: }
355: /** end SETUP for function myAvg **/
...
396: }
397: }
398:
399: public void __DRILL_INIT__()
400: throws SchemaChangeException
401: {
402: {
// The generated code creates our working fields here.
403: work18 = new Float8Holder();
404: work19 = new IntHolder();
405: }
406: }
407:
408: }