Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Questa pagina descrive come utilizzare Cloud Composer 2 per eseguire Carichi di lavoro Dataproc Serverless on in Google Cloud.
Gli esempi nelle seguenti sezioni mostrano come utilizzare operatori per la gestione dei carichi di lavoro batch serverless di Dataproc. Questi operatori vengono utilizzati nei DAG che creano, elimina, elenca e recupera un carico di lavoro batch Spark serverless di Dataproc:
Crea DAG per gli operatori che funzionano con i carichi di lavoro batch serverless di Dataproc:
Crea DAG che utilizzano container personalizzati e Dataproc Metastore.
Configura il server di cronologia permanente per questi DAG.
Prima di iniziare
Abilita l'API Dataproc:
Console
Enable the Dataproc API.
gcloud
Enable the Dataproc API:
gcloud services enable dataproc.googleapis.com
Seleziona la località per il file del carico di lavoro batch. Puoi utilizzare uno qualsiasi dei le seguenti opzioni:
- Crea un bucket Cloud Storage che archivia questo file.
- Utilizza il bucket del tuo ambiente. Perché non devi sincronizzare questo file
con Airflow, puoi creare una sottocartella separata all'esterno di
/dags
o/data
cartelle. Ad esempio:/batches
. - Utilizza un bucket esistente.
configura file e variabili Airflow
Questa sezione illustra come impostare i file e configurare le variabili Airflow per questo tutorial.
Carica un file del carico di lavoro Spark ML serverless Dataproc in un bucket
Il carico di lavoro in questo tutorial esegue uno script pyspark:
Salva qualsiasi script pyspark in un file locale denominato
spark-job.py
. Ad esempio, puoi utilizzare script pyspark di esempio.Carica il file nel percorso selezionato. in Prima di iniziare.
Imposta variabili Airflow
Gli esempi nelle sezioni seguenti utilizzano le variabili Airflow. Imposti i valori per queste variabili in Airflow, il codice DAG può accedere a questi valori.
Gli esempi in questo tutorial utilizzano le seguenti variabili Airflow. Puoi impostarle secondo necessità, a seconda dell'esempio utilizzato.
Imposta le seguenti variabili Airflow da utilizzare nel codice DAG:
project_id
: ID progetto.bucket_name
: URI di un bucket in cui è presente il file Python principale si trova il carico di lavoro (spark-job.py
). Hai selezionato questa località in Prima di iniziare.phs_cluster
: il nome del cluster del server di cronologia permanente. Hai impostato questa variabile quando crei un server di cronologia permanente.image_name
: nome e tag dell'immagine container personalizzata (image:tag
). Tu imposta questa variabile quando utilizza immagine container personalizzata con DataprocCreateBatchOperator.metastore_cluster
: nome del servizio Dataproc Metastore. Imposta questa variabile quando utilizza il servizio Dataproc Metastore con DataprocCreateBatchOperator.region_name
: regione in cui il servizio Dataproc Metastore in cui viene localizzato. Imposti questa variabile quando utilizzi il servizio Dataproc Metastore con DataprocCreateBatchOperator.
Usa la console Google Cloud e la UI di Airflow per impostare ogni variabile Airflow
Nella console Google Cloud, vai alla pagina Ambienti.
Nell'elenco degli ambienti, fai clic sul link Airflow per il tuo completamente gestito di Google Cloud. Si apre la UI di Airflow.
Nella UI di Airflow, seleziona Amministratore > Variabili.
Fai clic su Add a new record (Aggiungi un nuovo record).
Specifica il nome della variabile nel campo Chiave e imposta il valore per nel campo Val (Valore).
Fai clic su Salva.
Crea un server di cronologia permanente
Usa un server di cronologia permanente (PHS) per visualizzare i file di cronologia di Spark del tuo batch carichi di lavoro standard:
- Crea un server di cronologia permanente.
- Assicurati di aver specificato il nome del cluster PHS nel
phs_cluster
Variabile Airflow.
DataprocCreateBatchOperator
Il DAG seguente avvia un carico di lavoro batch serverless di Dataproc.
Per ulteriori informazioni sugli argomenti DataprocCreateBatchOperator
, vedi
codice sorgente dell'operatore.
Per ulteriori informazioni sugli attributi che puoi trasmettere nel batch
di DataprocCreateBatchOperator
, consulta le
descrizione della classe Batch.
Usa l'immagine container personalizzata con DataprocCreateBatchOperator
L'esempio seguente mostra come utilizzare un'immagine container personalizzata per eseguire carichi di lavoro con scale out impegnativi. Puoi utilizzare un container personalizzato, ad esempio, per aggiungere Python le dipendenze non fornite dall'immagine container predefinita.
Per utilizzare un'immagine container personalizzata:
Crea un'immagine container personalizzata e caricala in Container Registry.
Specifica l'immagine nella variabile Airflow
image_name
.Utilizza DataprocCreateBatchOperator con la tua immagine personalizzata:
Utilizzo del servizio Dataproc Metastore con DataprocCreateBatchOperator
Per utilizzare un servizio Dataproc Metastore da un DAG:
Verifica che il servizio metastore sia già avviato.
Per scoprire come avviare un servizio metastore, vedi Abilita e disabilita Dataproc Metastore.
Per informazioni dettagliate sull'operatore batch per la creazione della configurazione, consulta PeripheralsConfig.
Quando il servizio metastore è attivo e in esecuzione, specifica il suo nome in la variabile
metastore_cluster
e la rispettiva regione nella variabile Airflowregion_name
.Usa il servizio metastore in DataprocCreateBatchOperator:
DataprocDeleteBatchOperator
Puoi utilizzare DataprocDeleteBatchOperator per eliminare un batch in base all'ID del batch del carico di lavoro.
DataprocListBatchesOperator
DataprocDeleteBatchOperator elenca i batch esistenti in un determinato project_id e nella regione.
DataprocGetBatchOperator
DataprocGetBatchOperator recupera un determinato carico di lavoro batch.