并行处理

流水线可在机器集群中执行。它们通过 拆分需要完成的工作,然后在 并行处理。一般来说, 分块(也称为分区)的数量越多, 可以运行流水线中的并行级别由 流水线中的来源和 shuffle 阶段。

来源

在每个流水线运行开始时,流水线中的每个来源都会计算 以及如何将数据拆分为多个块。对于 假设有一个从 Cloud Storage 读取数据的基本流水线, 执行一些 Wrangler 转换,然后将内容写回 Cloud Storage

显示 Cloud Storage 源、Wrangler 转换和 Cloud Storage 接收器的基本流水线

流水线启动时,Cloud Storage 来源会检查输入 文件,并根据文件大小将其拆分为多个文件。例如, 单个 GB 的文件可以拆分为 100 个拆分文件,每个文件 10 MB 。每个执行器都会读取该分块的数据,并运行 Wrangler 转换,然后将输出写入 part 文件。

将 Cloud Storage 中的数据分区为并行 Wrangler 转换为分块文件

如果流水线运行缓慢,首先要检查的事项之一是, 您的来源会创建足够的分块来充分利用并行性。 例如,某些类型的压缩会导致明文文件不可拆分。如果您 正在读取经过 gzip 压缩的文件 与读取未压缩文件或 使用 BZIP(可拆分)压缩。同样,如果您使用的是数据库源,并将其配置为仅使用一个分块,那么与将其配置为使用更多分块相比,其运行速度会慢得多。

随机播放

某些类型的插件会导致数据在集群中进行重排。这个 需要将一个执行器处理的记录发送到另一个执行程序时,会发生以下情况: Executor 来执行计算。Shuffle 的操作成本高昂, 它们涉及很多 I/O 操作导致数据重排的插件都显示在 在 Pipeline Studio 的 Analytics 部分中操作。其中包括 分组依据、去重、唯一和连接符。例如,假设在前面的示例中向流水线添加了分组阶段。

另外,假设正在读取的数据表示在杂货店进行的购买。 每条记录都包含一个 item 字段和一个 num_purchased 字段。在小组中 在阶段,我们配置流水线以对item字段上的记录进行分组,并 计算 num_purchased 字段的总和。

流水线运行时,输入文件会按前面所述的方式拆分。之后,系统会在集群中对每条记录进行重排,以便具有相同项的每条记录都属于同一执行程序。

如前面的示例所示,苹果购买的记录为 最初分布在多个执行器中。要执行聚合操作 需要跨集群发送到同一个执行器。

大多数需要 shuffle 的插件都允许您指定分区数量 数据重排时使用。这控制着用于执行 会处理重排的数据。

在前面的示例中,如果将分区数量设置为 2,则每个执行器都会计算两个项(而非一个项)的汇总。

请注意,在这之后,您可以降低流水线的并行处理量 阶段。例如,请考虑流水线的逻辑视图:

如果来源将数据划分到 500 个分区,但“分组依据”使用 200 个分区, 500 至 200。您只会写入 200 个不同的分块文件到 Cloud Storage,而不是 500 个。

选择分区

如果分区数量太少,您将无法使用 并行处理尽可能多的工作。同时设置分区 就会增加不必要的开销一般来说, 使用的分区要多于少的分区。您需要担心额外的开销 如果您的流水线需要几分钟的运行时间,而您想要缩减 几分钟。如果您的流水线需要数小时的运行时间,那么开销通常不会 一些需要担心的问题

一种有用但过于简单的方法,用于确定要 方法是将其设置为 max(cluster CPUs, input records / 500,000)。在其他 将输入记录数除以 500,000。如果该数字大于集群 CPU 数量,请将其用作分区数量。否则,请使用集群 CPU 数量。例如,如果您的集群 100 个 CPU 和 Shuffle 阶段预计有 1 亿个输入 记录,使用 200 个分区。

更完整的答案是,当每个分区的中间 Shuffle 数据完全可以放入执行器的内存中,而无需将任何数据溢写到磁盘时,Shuffle 的性能最好。Spark 预留出来的 用于保存 shuffle 数据的执行程序内存。确切数字为 (总内存 - 300 MB) * 30%。如果我们假设每个执行器都设置为使用 2 GB 的内存, 这意味着每个分区的大小不应超过 (2 GB - 300 MB) * 30% = 大约 500 MB 记录。如果我们假设每条记录向下压缩, 大小为 1 KB,则意味着 (500 MB / 分区) / (1 KB / record) = 每个分区 500,000 条记录。如果您的执行器使用 或者您的记录较小,则可以相应地调整此数值。

数据倾斜

请注意,在前面的示例中,各种商品的购买量平均进行了 分发。也就是说,苹果、香蕉、胡萝卜和鸡蛋各有 3 次购买交易。对均匀分布键进行随机化处理的效果最佳 是 shuffle 类型的,但许多数据集没有此属性。继续学习 那么前面的示例中提到的杂货店购买,那么您应该会获得很多 鸡蛋的购买量要高于婚礼卡片的购买量当存在几次重排时 比其他键更常见,则您需要处理偏差 数据。与非偏差数据相比,偏差数据的效果会显著下降, 为数不多的工作量 执行器。它会导致一小部分分区比所有分区都大得多 其他。

在本例中,鸡蛋的购买次数是卡片购买次数的 5 倍, 也就是说,计算蛋蛋聚合所需的时间大约是前者的 5 倍。它 在处理只有 10 条(而非两条)记录时,意义不大, 在处理 50 亿条记录(而不是一条)时意义重大 亿。当存在数据倾斜时,Shuffle 中使用的分区数量对流水线性能没有太大影响。

您可以通过检查随时间变化的输出记录图表来识别数据偏差。 如果相应阶段在流水线运行开始时以更快的速度输出记录,然后突然减速,则可能表示数据存在偏差。

您还可以通过检查一段时间内的集群内存使用情况来识别数据倾斜。如果 您的集群已用尽容量,但突然出现内存用量偏低的情况 这也表示您遇到了数据偏差。

联接时,偏差数据对性能的影响最大 错误。有几个技巧可用于提高性能 用于偏差联接的计算方法。如需了解详情,请参阅 JOIN 操作的并行处理

针对执行进行自适应调整

如需自适应调整执行,请指定要使用的分区范围,而不是 确切的分区号。确切的分区号(即使在流水线中设置了分区编号) 配置,在启用自适应执行时会被忽略。

如果您使用的是临时 Dataproc 集群, Cloud Data Fusion 会自动设置适当的配置,但对于静态数据, Dataproc 或 Hadoop 集群,接下来的两个配置 参数:

  • spark.default.parallelism:将其设置为可用 vCore 的总数 资源。这可确保集群不会过载,并为分区数量定义下限。
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum:设置为 32 倍 集群中可用的 vCore 数量的百分比。这定义了分区数量的上限。
  • Spark.sql.adaptive.enabled:若要启用优化,请将此值设置为 true。Dataproc 会自动设置 您必须确保已启用 Hadoop 集群。

您可以在特定 API 的引擎配置中设置这些参数 流水线或静态 Dataproc 的集群属性中 集群。

后续步骤