Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Cette page explique comment utiliser Cloud Composer 2 pour exécuter les charges de travail Dataproc sans serveur sur Google Cloud.
Dans les sections suivantes, les exemples vous montrent comment utiliser opérateurs permettant de gérer les charges de travail par lot Dataproc sans serveur. Vous utilisez ces opérateurs dans les DAG qui créent, Supprimez, répertoriez et obtenez une charge de travail par lot Spark sans serveur pour Dataproc:
Créer des DAG pour les opérateurs compatibles avec les charges de travail par lot Dataproc sans serveur:
Créer des DAG qui utilisent des conteneurs personnalisés, et Dataproc Metastore.
Configurez le serveur d'historique persistant pour ces DAG.
Avant de commencer
Activez l'API Dataproc:
Console
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
Sélectionnez l'emplacement de votre fichier de charge de travail Batch. Vous pouvez utiliser n'importe quelle les options suivantes:
- Créez un bucket Cloud Storage qui stocke ce fichier.
- Utilisez le bucket de votre environnement. Parce que vous n'avez pas besoin de synchroniser ce fichier
avec Airflow, vous pouvez créer un sous-dossier distinct en dehors de
/dags
ou/data
dossiers. Par exemple,/batches
. - Utilisez un bucket existant.
Configurer les fichiers et les variables Airflow
Cette section explique comment configurer les fichiers et les variables Airflow pour ce tutoriel.
Importer un fichier de charge de travail Spark ML sans serveur Dataproc dans un bucket
La charge de travail de ce tutoriel exécute un script pyspark:
Enregistrez n'importe quel script pyspark dans un fichier local nommé
spark-job.py
. Par exemple, vous pouvez utiliser la méthode exemple de script pyspark.Importez le fichier à l'emplacement que vous avez sélectionné. dans la section Avant de commencer.
Définir les variables Airflow
Les exemples des sections suivantes utilisent des variables Airflow. Vous définissez des valeurs pour ces variables dans Airflow, le code du DAG peut accéder à ces valeurs.
Les exemples de ce tutoriel utilisent les variables Airflow suivantes. Vous pouvez les configurer au besoin, en fonction de l'exemple que vous utilisez.
Définissez les variables Airflow suivantes à utiliser dans le code de votre DAG:
project_id
: ID du projetbucket_name
: URI d'un bucket où se trouve le fichier Python principal de où se trouve la charge de travail (spark-job.py
). Vous avez sélectionné ce lieu dans Avant de commencerphs_cluster
: nom du cluster du serveur d'historique persistant. Vous définissez cette variable lorsque vous créez un serveur d'historique persistant.image_name
: nom et tag de l'image de conteneur personnalisé (image:tag
). Toi définissez cette variable lorsque vous utiliser une image de conteneur personnalisée avec DataprocCreateBatchOperator.metastore_cluster
: nom du service Dataproc Metastore. Vous définissez cette variable lorsque vous utilisez le service Dataproc Metastore avec DataprocCreateBatchOperator.region_name
: région où le service Dataproc Metastore est se trouve. Vous définissez cette variable lorsque vous utilisez le service Dataproc Metastore avec DataprocCreateBatchOperator.
Utiliser la console Google Cloud et l'interface utilisateur Airflow pour définir chaque variable Airflow
Dans la console Google Cloud, accédez à la page Environnements.
Dans la liste des environnements, cliquez sur le lien Airflow correspondant à votre environnement. L'interface utilisateur d'Airflow s'ouvre.
Dans l'interface utilisateur d'Airflow, sélectionnez Admin > Variables.
Cliquez sur Ajouter un enregistrement.
Spécifiez le nom de la variable dans le champ Clé, puis définissez sa valeur dans le champ Val.
Cliquez sur Enregistrer.
Créer un serveur d'historique persistant
Utiliser un serveur d'historique persistant (PHS) pour afficher les fichiers d'historique Spark de votre lot charges de travail:
- Créez un serveur d'historique persistant.
- Assurez-vous d'avoir spécifié le nom du cluster PHS dans le fichier
phs_cluster
Variable Airflow.
DataprocCreateBatchOperator
Le DAG suivant démarre une charge de travail par lot Dataproc sans serveur.
Pour en savoir plus sur les arguments DataprocCreateBatchOperator
, consultez
code source de l'opérateur.
Pour en savoir plus sur les attributs que vous pouvez transmettre dans le paramètre batch
de DataprocCreateBatchOperator
, consultez la description de la classe Batch.
Utiliser une image de conteneur personnalisé avec DataprocCreateBatchOperator
L'exemple suivant montre comment utiliser une image de conteneur personnalisé pour exécuter votre charges de travail. Vous pouvez utiliser un conteneur personnalisé, par exemple, pour ajouter Python les dépendances non fournies par l'image de conteneur par défaut.
Pour utiliser une image de conteneur personnalisée :
Créez une image de conteneur personnalisé et importez-la dans Container Registry.
Spécifiez l'image dans la variable Airflow
image_name
.Utilisez DataprocCreateBatchOperator avec votre image personnalisée :
Utiliser le service Dataproc Metastore avec DataprocCreateBatchOperator
Utiliser un service Dataproc Metastore à partir d'un DAG:
Vérifiez que votre service de métastore a déjà démarré.
Pour savoir comment démarrer un service de métastore, consultez la section Activer et désactiver Dataproc Metastore.
Pour en savoir plus sur l'opérateur de traitement par lot permettant de créer la configuration, consultez la section PeripheralsConfig.
Une fois que le service de métastore est opérationnel, spécifiez son nom dans La variable
metastore_cluster
et sa région dans la variable Airflowregion_name
.Utilisez le service de métastore dans DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Vous pouvez utiliser DataprocDeleteBatchOperator pour supprimer un lot en fonction de son ID. de la charge de travail.
DataprocListBatchesOperator
DataprocDeleteBatchOperator liste les lots qui existent dans un project_id et une région donnés.
DataprocGetBatchOperator
DataprocGetBatchOperator récupère une charge de travail par lot particulière.