Runtime Operator Protocol
Drill's execution model consists of a large variety of runtime operators assembled by the planner to implement a specific SQL query. Operators use the Volcano model: each is an iterator that returns records from a call to a `next()` method. The devil, as they say, is in the details. We explore those details here.
Each operator implements some relational operator: scan, sort, join, broadcast, etc. The Volcano iterator approach provides a unified iterator model implemented by all operators. No matter whether the operator reads data, sorts, or performs aggregations, it implements the same basic iterator model. Here we focus on that iterator model, not on the unique behavior of each operator.
Each operator has a `Creator` class. For example, `FilterRecordBatch` has a `FilterBatchCreator`. The creator builds the operator. The operator constructor performs one-time initial setup. The key fact to understand is that the constructor does not have visibility into the schema of the data, so the constructor cannot do setup that requires this information. Most operators therefore have some internal state (sometimes explicit as an enum, sometimes implicit via some variable) to track that they are in the "have not yet seen a schema" state.
As [[described elsewhere|Runtime Model]], operators are implemented in a somewhat non-intuitive way. The term "operator" in Drill is what we might call an "operator definition": the information that describes the operator. The actual runtime operator is called a "record batch" in Drill. All such runtime operators derive from `RecordBatch`. The basic (highly simplified) protocol is:
```java
public interface RecordBatch {
  public static enum IterOutcome { NONE, OK_NEW_SCHEMA, OK, STOP };
  public IterOutcome next();
  public BatchSchema getSchema();
  public VectorContainer getOutgoingContainer();
  public void close();
}
```
The actual code is somewhat more complex, but contains thorough comments that you should read for the details.
The heart of the protocol is the `next()` method. The theory in Volcano is simple: each call returns a record until all records are read. In Drill, the operation is a bit more complex because operators return batches of records (as value vectors), not individual records. Drill also allows the schema to vary as the query runs, and handles error cases. This results in the protocol explained here.

First, note that `next()` returns a variety of exit codes:
- `OK_NEW_SCHEMA`: Returned a schema (and optionally a record batch). Returned (ideally) each time the schema from this call to `next()` differs from that of previous calls.
- `OK`: Returned a record batch (which always includes a schema). The schema is the same as that from the previous call.
- `NONE`: No data returned; end of data. Equivalent to an EOF from a file reader.
- `STOP`: Error condition: stop processing.
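To make the contract concrete, here is a minimal sketch of a consumer loop written against the simplified interface above. It is not actual Drill code; `setupNewSchema()` and `processBatch()` are hypothetical helpers standing in for operator-specific logic:

```java
// Sketch only: a consumer draining its upstream operator, dispatching on
// the simplified IterOutcome values shown above.
void drain(RecordBatch incoming) {
  while (true) {
    switch (incoming.next()) {
      case OK_NEW_SCHEMA:
        setupNewSchema(incoming.getSchema()); // hypothetical: rebuild generated code and vectors
        processBatch(incoming);               // the batch may contain zero records
        break;
      case OK:
        processBatch(incoming);               // same schema as the previous batch
        break;
      case NONE:
        return;                               // EOF: no more batches will arrive
      case STOP:
        throw new IllegalStateException("upstream operator reported an error");
    }
  }
}
```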
The `FragmentExecutor` runs the fragment, which consists of a tree of operators, one of which is the root. The fragment executor calls `next()` on the root operator to start execution:
```java
while (shouldContinue() && root.next()) {
  // loop
}
```
In general, the protocol is that the downstream operator calls `next()` on the upstream operator. The implementation adds an additional layer, defined in `AbstractRecordBatch`, that performs some standard processing in `next()` before calling `innerNext()`, which does the work unique to each operator. The very first batch is handled specially, however. On the initial call to `next()`, `AbstractRecordBatch` calls a "fast path" to get the schema: `buildSchema()`, which calls `next()` on the upstream operator. By the time that `AbstractRecordBatch` calls `innerNext()`, a batch has already been fetched. Note that this behavior differs from all subsequent calls, in which `innerNext()` is responsible for fetching any needed batch.
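A greatly simplified, illustrative sketch of that template structure follows; the real `AbstractRecordBatch` tracks more states and outcomes than shown here:

```java
// Illustrative sketch of the next()/buildSchema()/innerNext() split
// described above; not the real AbstractRecordBatch.
abstract class SketchRecordBatch implements RecordBatch {
  private boolean first = true;

  @Override
  public IterOutcome next() {
    if (first) {
      first = false;
      // "Fast path" for the first call: fetch a batch from the upstream
      // operator solely to learn the schema.
      return buildSchema();
    }
    // On later calls, innerNext() must fetch any batches it needs.
    return innerNext();
  }

  protected abstract IterOutcome buildSchema();
  protected abstract IterOutcome innerNext();
}
```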
When discussing the behavior of `next()`, we have to consider two views:

- The "consumer": the bit of code (usually an operator) that calls `next()` and handles the results.
- The "producer": the bit of code that implements the `next()` method.
We discuss both views below.
The `next()` call propagates down the tree (the order is highly dependent on the particular type of operator). Any given operator will eventually see a first call to `next()`.

At this point the operator does not know the data schema. Therefore, the operator must call `next()` on its own input in order to get the first batch. (That call may, in turn, cascade down the operator tree until it reaches a leaf: a scanner or a network receiver.) Once it has a schema, the operator can complete initialization:
- Call `next()` on the input to get a first batch.
- Initialize the present operator based on the returned schema.
- Process the record batch.
That is, the first `next()` call both initializes the operator and processes records the same way that subsequent `next()` calls will.

The operator now must consider what to do based on the return value from its input's `next()` (see the sketch after the following list). For example:
- `OK`: Indicates that the child (input) operator returned a batch of records (along with a schema). Since this is the first batch, the present operator must usually do some form of setup, which often involves generating code based on the schema.
- `OK_NEW_SCHEMA`: In theory, the input should return the `OK_NEW_SCHEMA` status each time the schema changes, including the first time. In practice, the first batch seems to be returned (for some operators) as simply `OK`. Operators contain code to handle this ambiguity.
- `NONE`: It could be that the query has no data at all: a scanner read an empty file, a filter removed all records, etc. In this case, the very first call to the input's `next()` can return `NONE`, indicating that no data is available.
- `STOP`: Indicates that an error occurred and that the operator should stop processing and exit.
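A sketch of how an operator might handle these first-call cases, including the `OK` versus `OK_NEW_SCHEMA` ambiguity; `doSetup()` and `processBatch()` are hypothetical stand-ins for operator-specific work:

```java
// Sketch only: first-call handling. A first batch returned as plain OK is
// treated as if it were OK_NEW_SCHEMA, since some operators skip the
// new-schema notification on the first batch.
private boolean setupDone = false;

IterOutcome handleFirstBatch(RecordBatch incoming) {
  switch (incoming.next()) {   // may cascade down to a scanner or receiver
    case OK:                   // some inputs return the first batch as plain OK
    case OK_NEW_SCHEMA:
      if (!setupDone) {
        doSetup(incoming.getSchema()); // hypothetical: often code generation
        setupDone = true;
      }
      processBatch(incoming);
      return IterOutcome.OK_NEW_SCHEMA; // announce the schema downstream
    case NONE:
      return IterOutcome.NONE;          // empty query: no data at all
    case STOP:
    default:
      return IterOutcome.STOP;          // propagate the error
  }
}
```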
Each operator processes the first batch differently. A filter will process the one batch; a sort will read all of its incoming batches before returning from the first `next()`. In general, the return values are those above, but seen from the consumer's perspective:
- `OK_NEW_SCHEMA`: Should be returned from the first `next()` call for successful results. Note that the actual results may be empty if all rows in the batch were filtered away.
- `NONE`: No data from the query. Either no data was received from the input, or this operator discarded all the data.
- `STOP`: An error occurred.
Subsequent calls to `next()` work similarly to the first. The first call returned a schema and caused setup to occur. Since Drill has late schema binding, schemas may change. Thus, any call to `next()` may return a new schema, requiring new initialization (assuming that the consumer can handle schema changes). Expected return codes are thus:
- `OK`: Indicates another batch with the same schema as the previous one.
- `OK_NEW_SCHEMA`: Indicates a schema change (with optional data).
- `NONE`: End of data.
- `STOP`: Error condition.
Again, the operator must return a status as a producer using the same codes as above:
- `OK`: Indicates another batch with the same schema as the previous one.
- `OK_NEW_SCHEMA`: Indicates this operator encountered a schema change (with optional data).
- `NONE`: End of data.
- `STOP`: Error condition.
Every query must end at some point. When a scanner finds it has no more records to read, it returns `NONE` from the (last) call to `next()`. `NONE` never includes data; it instead indicates EOF.
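For example, a leaf producer such as a scanner might follow this sketch, written against the simplified interface above; `readNextBatch()` is a made-up reader method:

```java
// Sketch only: a scanner-style producer. The first batch announces the
// schema, later batches reuse it, and EOF is reported as NONE.
private boolean sentSchema = false;

public IterOutcome next() {
  if (!readNextBatch()) {              // hypothetical: fill the value vectors
    return IterOutcome.NONE;           // EOF: NONE never carries data
  }
  if (!sentSchema) {
    sentSchema = true;
    return IterOutcome.OK_NEW_SCHEMA;  // the first batch carries the schema
  }
  return IterOutcome.OK;               // same schema as the previous batch
}
```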
Each operator includes some internal state that must be shut down: release (direct memory) buffers, close files, etc. This is, in general, not done in response to end-of-data, but is done later, in the `close()` call.
The protocol should dictate that once a producer returns `NONE`, its consumer must never again call `next()`. At present, this rule is vague: an operator must be prepared for further calls to `next()`. Such spurious calls must continue to return `NONE`.
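A sketch of that defensive pattern, assuming a hypothetical `doWork()` method that holds the operator's normal logic:

```java
// Sketch only: guard against spurious next() calls after end of data.
private boolean done = false;

public IterOutcome next() {
  if (done) {
    return IterOutcome.NONE;       // keep reporting EOF on extra calls
  }
  IterOutcome outcome = doWork();  // hypothetical operator-specific logic
  if (outcome == IterOutcome.NONE) {
    done = true;                   // remember that EOF has been reached
  }
  return outcome;
}
```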
Once the root operator reports end of data in the loop shown earlier, the fragment executor starts the process of operator shutdown. The fragment executor (actually, a helper class) loops over the operators and invokes `close()` on each. This occurs in root-to-leaf order, down all branches of the operator tree. `close()` must release all resources, especially direct memory buffers, open files, open connections, etc. While this seems simple, the actual implementation can be quite complex in some operators.
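A sketch of that traversal, assuming a hypothetical `childrenOf()` accessor for an operator's inputs and a hypothetical `logError()` helper:

```java
// Sketch only: close operators from root to leaf, continuing past failures
// so that every operator gets a chance to release its resources.
void closeAll(RecordBatch op) {
  try {
    op.close();
  } catch (Exception e) {
    logError(e);                              // hypothetical logging helper
  }
  for (RecordBatch child : childrenOf(op)) {  // hypothetical accessor
    closeAll(child);
  }
}
```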
Some general rules:
- `close()` should invoke no methods on either its parent or child operators. (However, some operators violate this rule.)
- `close()` should allocate no new direct memory (since it may be called in an out-of-memory condition).
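A sketch of a `close()` that follows both rules, assuming the operator holds a `container` of value vectors and its own `allocator` (illustrative names):

```java
// Sketch only: release resources without calling into other operators and
// without allocating new direct memory.
@Override
public void close() {
  try {
    container.clear();   // free this operator's value vectors
  } finally {
    allocator.close();   // release the operator's direct-memory allocator
  }
}
```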
A complexity is that the record iterator operator does, in fact, call `next()` on its input operator in an attempt to clear incoming batches. This appears to be more of a "bug" than a feature.
Similarly, the merge join directly calls `close()` on its child operators. Again, this seems to be an attempt to fix a specific bug rather than by design.
Error handling in Drill operators is a complex topic explained on its own page.