爱吃芒果

Presto任务执行流程

// TaskResouce /v1/task/{taskId}
public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
{
    requireNonNull(taskUpdateRequest, "taskUpdateRequest is null");

    Session session = taskUpdateRequest.getSession().toSession(sessionPropertyManager, taskUpdateRequest.getExtraCredentials());
    TaskInfo taskInfo = taskManager.updateTask(session,
            taskId,
            taskUpdateRequest.getFragment(),
            taskUpdateRequest.getSources(),
            taskUpdateRequest.getOutputIds(),
            taskUpdateRequest.getTotalPartitions());

    if (shouldSummarize(uriInfo)) {
        taskInfo = taskInfo.summarize();
    }

    return Response.ok().entity(taskInfo).build();
}

Presto执行计划的调度

在Presto的执行模型中,SQL的执行被划分为如下几个层次:

  • 查询:用户提交一个SQL,触发Presto的一次查询,在代码中对应一个QueryInfo。每个查询都有一个字符串形式的QueryId
  • 查询执行阶段:Presto生成查询的执行计划时,根据是否需要做跨查询执行节点的数据交换来划分PlanFragment。调度执行计划时,每个PlanFragment对应一个查询执行阶段,在代码中对应一个StageInfo,其中有StageId,StageId的形式为QueryId + 从0自增id。查询执行阶段之间有数据依赖关系,即不能并行执行,存在执行上的顺序关系,需要注意的是,StageId越小,这个查询执行阶段的执行顺序越靠后。Presto的查询执行阶段类似于Spark的查询执行阶段的概念,他们的不同是Presto不像Spark批式处理那样,需要前面的查询执行阶段执行完再执行后面的查询执行阶段,Presto采用的是流水线(Pipeline)处理机制。
  • 任务(Task):任务是Presto分布式任务的执行单元,每个查询执行阶段可以有多个任务,这些任务可以并行执行,同一个查询执行阶段中的所有任务的执行逻辑完全相同。一个查询执行阶段的任务个数就是此查询执行阶段的并发度。在Presto的任务调度代码中,可以看到任务的个数是根据查询执行阶段的数据分布方式(Source,Fixed,Single)以及查询执行节点的个数来决定的。
// SqlQueryExecution.java
private void planDistribution(PlanRoot plan)
{
  	// 遍历执行计划PlanNode树,找到所有的TableScanNode(也就是连接器对应的PlanNode),获取到他们的SplitSource
    DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, metadata, dynamicFilterService);
    StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), stateMachine.getSession());

  	// 创建最后一个查询执行阶段的Outputbuffer,这个OutputBuffer用于给Presto SQL客户端输出查询的最终计算结果
    PartitioningHandle partitioningHandle = plan.getRoot().getFragment().getPartitioningScheme().getPartitioning().getHandle();
    OutputBuffers rootOutputBuffers = createInitialEmptyOutputBuffers(partitioningHandle)
            .withBuffer(OUTPUT_BUFFER_ID, BROADCAST_PARTITION_ID)
            .withNoMoreBufferIds();

    // 创建SqlStageExecution,并将其封装在SqlQueryScheduler里面返回
  	// 这里只是创建Stage,但是不会去调度执行它
    SqlQueryScheduler scheduler = createSqlQueryScheduler(
            stateMachine,
            outputStageExecutionPlan,
            nodePartitioningManager,
            nodeScheduler,
            remoteTaskFactory,
            stateMachine.getSession(),
            plan.isSummarizeTaskInfos(),
            scheduleSplitBatchSize,
            queryExecutor,
            schedulerExecutor,
            failureDetector,
            rootOutputBuffers,
            nodeTaskMap,
            executionPolicy,
            schedulerStats,
            dynamicFilterService);

    queryScheduler.set(scheduler);
}

获取数据源分片

SqlQueryExecution.planDistribution首先从数据源连接器中获取到所有的分片数据源。分片是Presto中分块组织数据的方式,Presto连接器会将待处理的所有数据划分为若干分片让Presto读取,而这些分片也会被安排到多个Presto查询执行节点上来处理以实现分布式高性能计算。分布式OLAP引擎几乎全都有分片的抽象设计,例如Spark、Flink等。

Presto环境搭建

前言

Trino是从Presto分离出来的项目,在后面的文章中不会严格区分Presto和Trino,除非某些代码只在其中一个项目中存在,根据《OLAP引擎底层原理与设计实践》的推荐,后面基本会通过trino项目的v350版本为例分析presto的一些源码级的实现,希望能够比较系统地理解OLAP引擎的整体实现。