Procesamiento paralelo

Las canalizaciones se ejecutan en clústeres de máquinas. Logran una alta capacidad de procesamiento gracias a dividir el trabajo que hay que hacer y, luego, ejecutarlo en paralelos a los múltiples ejecutores distribuidos en el clúster. En general, cuanto mayor sea la cantidad de divisiones (también llamadas particiones), más rápido se puede ejecutar la canalización. El nivel de paralelismo en tu canalización está determinado por entre las fuentes y las etapas de Shuffle de la canalización.

Fuente

Al comienzo de cada ejecución de canalización, qué fuente calcula cada fuente de la canalización los datos deben leerse y cómo se pueden dividir. Para ejemplo, considera una canalización básica que lee desde Cloud Storage, realiza algunas transformaciones de Wrangler y, luego, escribe en Google Cloud Storage.

Canalización básica que muestra la fuente de Cloud Storage, la transformación de Wrangler y el receptor de Cloud Storage

Cuando se inicia la canalización, la fuente de Cloud Storage examina los archivos de entrada y los divide en función de los tamaños de los archivos. Por ejemplo, un de un solo gigabyte se puede dividir en 100 divisiones, cada una de 10 MB en de tamaño del ensamble. Cada ejecutor lee los datos para esa división, ejecuta el Wrangler transformaciones y, luego, escribe el resultado en un archivo part.

Datos particionados en Cloud Storage en transformaciones de Wrangler paralelas en archivos de partes

Si la canalización se ejecuta lentamente, lo primero que debes verificar Tus fuentes crean suficientes divisiones para aprovechar al máximo el paralelismo. Por ejemplo, algunos tipos de compresión hacen que los archivos de texto simple no se puedan dividir. Si están leyendo archivos que se comprimieron con gzip, es posible que notes que tu canalización se ejecuta mucho más lento que si lees archivos sin comprimir o incluso se puede comprimir con BZIP (que se puede dividir). Del mismo modo, si usas fuente de base de datos y la configuraste para que use una sola división, ejecuta mucho más lento que si lo configuraras para usar más divisiones.

Mezclas

Ciertos tipos de complementos hacen que los datos se mezclen en el clúster. Esto ocurre cuando los registros que procesa un ejecutor deben enviarse a otro ejecutor para realizar el procesamiento. Las Shuffles son operaciones costosas porque involucran mucho I/O. Los complementos que causan la mezcla de los datos aparecen en en la sección Analytics de Pipeline Studio. Estos incluyen complementos, como Agrupar por, Eliminar duplicados, Distinto y Vincular. Por ejemplo, supongamos que un valor Agrupar por se agrega a la canalización en el ejemplo anterior.

También supongamos que los datos que se leen representan compras realizadas en un supermercado. Cada registro contiene un campo item y un campo num_purchased. En el Grupo Por etapa, configuramos la canalización para agrupar registros en el campo item y calcular la suma del campo num_purchased.

Cuando se ejecuta la canalización, los archivos de entrada se dividen como se describió antes. Después de eso, cada registro se redistribuye en el clúster de modo que cada registro con el mismo elemento pertenezca al mismo ejecutor.

Como se ilustra en el ejemplo anterior, los registros de compras de manzanas se distribuida originalmente en varios ejecutores. Para realizar la agregación, de esos registros debían enviarse a través del clúster al mismo ejecutor.

La mayoría de los complementos que requieren Shuffle te permiten especificar la cantidad de particiones. para usar cuando se redistribuyen los datos. Esto controla cuántos ejecutores se usan para procesar los datos redistribuidos.

En el ejemplo anterior, si la cantidad de particiones se establece en 2, cada ejecutor calcula las agregaciones de dos elementos en lugar de uno.

Ten en cuenta que es posible disminuir el paralelismo de la canalización si lo haces. etapa. Por ejemplo, considera la vista lógica de la canalización:

Si la fuente divide los datos en 500 particiones, pero la opción Agrupar por redistribuye mediante 200 particiones, el nivel máximo de paralelismo después de que la agrupación de 500 a 200. En lugar de 500 archivos de partes diferentes escritos Cloud Storage, solo tienes 200.

Elige particiones

Si la cantidad de particiones es demasiado baja, no usarás la capacidad completa de para paralelizar la mayor cantidad de trabajo posible. Si estableces las particiones demasiado altas, aumenta la cantidad de sobrecarga innecesaria. En general, es mejor usar demasiadas particiones que muy pocas. La sobrecarga adicional es algo de lo que debes preocuparte. Si tu canalización tarda unos minutos en ejecutarse y tratas de reducir un un par de minutos. Si tu canalización tarda horas en ejecutarse, la sobrecarga no suele ser algo de lo que debes preocuparte.

Una forma útil, pero demasiado simplista, de determinar la cantidad de particiones para usar es establecerlo en max(cluster CPUs, input records / 500,000). En otro palabras, toma el número de registros de entrada y divídelo por 500,000. Si ese número es mayor que la cantidad de CPU del clúster, úsala para la cantidad de particiones. De lo contrario, usa la cantidad de CPUs del clúster. Por ejemplo, si tu clúster tiene 100 CPU y se espera que la etapa de mezcla tenga 100 millones de registros de entrada, usa 200 particiones.

Una respuesta más completa es que las Shuffles funcionan mejor cuando los datos aleatorios para cada partición pueden caber completamente en la memoria del ejecutor para que no haya que derramar nada en el disco. Spark reserva un poco menos del 30% de un del ejecutor para conservar los datos de Shuffle. El número exacto es (memoria total - 300 MB) * 30%. Si suponemos que cada ejecutor está configurado para usar 2 GB de memoria, Esto significa que cada partición no debe contener más de (2 GB - 300 MB) * 30% = aproximadamente 500 MB de registros. Si suponemos que cada registro se comprime a 1 KB de tamaño, eso significa que (500 MB/partición)/ (1 KB/registro) = 500,000 registros por partición. Si tus ejecutores usan más memoria o si tus registros son más pequeños, puedes ajustar este número según corresponda.

Sesgo de datos

Ten en cuenta que, en el ejemplo anterior, las compras de varios artículos se distribuyeron de manera uniforme. Es decir, hubo tres compras de manzanas, bananas zanahorias y huevos. La mezcla en una clave distribuida de forma uniforme es la más eficaz de Shuffle, pero muchos conjuntos de datos no tienen esta propiedad. Continuación del proceso compras en la tienda de comestibles en el ejemplo anterior, esperarías tener muchas más compras de huevos que de tarjetas de boda. Cuando hay algunas mezclas que son mucho más comunes que otras, se trata de problemas de datos no estructurados. Los datos sesgados pueden tener un rendimiento significativamente peor que los no sesgados porque un una cantidad desproporcionada de trabajo está siendo realizada por un pequeño puñado de ejecutores. Hace que un pequeño subconjunto de particiones sea mucho más grande que todos los otras personas.

En este ejemplo, hay cinco veces más compras de huevos que compras con tarjeta lo que significa que el agregado de huevos tarda aproximadamente cinco veces más en calcularse. Integra no importa mucho cuando se trata de solo 10 registros, en lugar de dos, pero hace una gran diferencia cuando se trata de cinco mil millones de registros en lugar de uno mil millones. Cuando hay un sesgo de datos, la cantidad de particiones usadas no afecta mucho el rendimiento de la canalización.

Para reconocer la distorsión de datos, examina el gráfico en busca de registros de salida a lo largo del tiempo. Si la etapa está generando registros a un ritmo mucho más alto al comienzo del de la canalización y, luego, se ralentiza repentinamente, lo que podría significar que tienes datos sesgados.

También puedes reconocer el sesgo de los datos examinando el uso de memoria del clúster en el tiempo. Si su clúster estuvo al límite de su capacidad por un tiempo, pero de repente tiene un uso bajo de memoria en un período, esto también es una señal de que estás lidiando con el sesgo de datos.

Los datos sesgados tienen un mayor impacto en el rendimiento cuando se realiza una unión una tarea. Existen varias técnicas que pueden usarse para mejorar el rendimiento para las uniones sesgadas. Para obtener más información, consulta Procesamiento paralelo para operaciones de JOIN.

Ajuste adaptable para la ejecución

Para ajustar la ejecución de forma adaptable, especifica el rango de particiones que se usarán, no el rango el número de partición exacto. El número de partición exacto, incluso si se configuró en la canalización predeterminada, se ignora cuando la ejecución adaptable está habilitada.

Si usas un clúster efímero de Dataproc, Cloud Data Fusion establece la configuración adecuada automáticamente, pero para los clústeres estáticos de Dataproc o Hadoop, se pueden establecer los siguientes dos parámetros de configuración:

  • spark.default.parallelism: Establécelo en la cantidad total de vCore disponibles. en el clúster. Esto garantiza que tu clúster no esté sobrecargado y define la límite inferior para la cantidad de particiones.
  • spark.sql.adaptive.coalescePartitions.initialPartitionNum: Establécelo en 32 veces. de la cantidad de vCores disponibles en el clúster. Esto define la parte superior limitado al número de particiones.
  • Spark.sql.adaptive.enabled: Para habilitar las optimizaciones, establece este valor como true Dataproc lo configura automáticamente, pero si usas clústeres de Hadoop genéricos, debes asegurarte de que estén habilitados .

Estos parámetros pueden fijarse en la configuración del motor de un o en las propiedades del clúster de un Dataproc estático clúster.

¿Qué sigue?