在Presto的执行模型中,SQL的执行被划分为如下几个层次:
- 查询:用户提交一个SQL,触发Presto的一次查询,在代码中对应一个QueryInfo。每个查询都有一个字符串形式的QueryId
- 查询执行阶段:Presto生成查询的执行计划时,根据是否需要做跨查询执行节点的数据交换来划分PlanFragment。调度执行计划时,每个PlanFragment对应一个查询执行阶段,在代码中对应一个StageInfo,其中有StageId,StageId的形式为QueryId + 从0自增id。查询执行阶段之间有数据依赖关系,即不能并行执行,存在执行上的顺序关系,需要注意的是,StageId越小,这个查询执行阶段的执行顺序越靠后。Presto的查询执行阶段类似于Spark的查询执行阶段的概念,他们的不同是Presto不像Spark批式处理那样,需要前面的查询执行阶段执行完再执行后面的查询执行阶段,Presto采用的是流水线(Pipeline)处理机制。
- 任务(Task):任务是Presto分布式任务的执行单元,每个查询执行阶段可以有多个任务,这些任务可以并行执行,同一个查询执行阶段中的所有任务的执行逻辑完全相同。一个查询执行阶段的任务个数就是此查询执行阶段的并发度。在Presto的任务调度代码中,可以看到任务的个数是根据查询执行阶段的数据分布方式(Source,Fixed,Single)以及查询执行节点的个数来决定的。
// SqlQueryExecution.java
private void planDistribution(PlanRoot plan)
{
// 遍历执行计划PlanNode树,找到所有的TableScanNode(也就是连接器对应的PlanNode),获取到他们的SplitSource
DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, metadata, dynamicFilterService);
StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), stateMachine.getSession());
// 创建最后一个查询执行阶段的Outputbuffer,这个OutputBuffer用于给Presto SQL客户端输出查询的最终计算结果
PartitioningHandle partitioningHandle = plan.getRoot().getFragment().getPartitioningScheme().getPartitioning().getHandle();
OutputBuffers rootOutputBuffers = createInitialEmptyOutputBuffers(partitioningHandle)
.withBuffer(OUTPUT_BUFFER_ID, BROADCAST_PARTITION_ID)
.withNoMoreBufferIds();
// 创建SqlStageExecution,并将其封装在SqlQueryScheduler里面返回
// 这里只是创建Stage,但是不会去调度执行它
SqlQueryScheduler scheduler = createSqlQueryScheduler(
stateMachine,
outputStageExecutionPlan,
nodePartitioningManager,
nodeScheduler,
remoteTaskFactory,
stateMachine.getSession(),
plan.isSummarizeTaskInfos(),
scheduleSplitBatchSize,
queryExecutor,
schedulerExecutor,
failureDetector,
rootOutputBuffers,
nodeTaskMap,
executionPolicy,
schedulerStats,
dynamicFilterService);
queryScheduler.set(scheduler);
}
获取数据源分片
SqlQueryExecution.planDistribution首先从数据源连接器中获取到所有的分片数据源。分片是Presto中分块组织数据的方式,Presto连接器会将待处理的所有数据划分为若干分片让Presto读取,而这些分片也会被安排到多个Presto查询执行节点上来处理以实现分布式高性能计算。分布式OLAP引擎几乎全都有分片的抽象设计,例如Spark、Flink等。