O spark-bigquery-connector é usado com o Apache Spark para ler e gravar dados do e para o BigQuery. Neste tutorial, fornecemos um código de exemplo que usa o conector Spark-bigquery em um aplicativo Spark. Para instruções sobre como criar um cluster, consulte os Guias de início rápido do Dataproc.
Disponibilize o conector para seu aplicativo
Você pode disponibilizar o conector spark-bigquery para seu aplicativo de uma das seguintes maneiras:
Instale o conector spark-bigquery- no diretório jars do Spark de cada nó usando o Ação de inicialização dos conectores do Dataproc durante a criação do cluster.
Forneça o URI do conector ao enviar o job:
- Console do Google Cloud:use o item
Jars files
do job do Spark em a página Enviar um job do Dataproc. - CLI gcloud:use a sinalização
gcloud dataproc jobs submit spark --jars
. - API Dataproc: use o
Campo
SparkJob.jarFileUris
.
- Console do Google Cloud:use o item
Inclua o jar no seu aplicativo Scala ou Java Spark como uma dependência (consulte Compilação no conector (em inglês).
Como especificar o URI do jar do conector
As versões do conector Spark-BigQuery estão listadas no repositório do GitHub GoogleCloudDataproc/spark-bigquery-connector.
Especifique o jar do conector substituindo o Scala e a versão do conector
informações na seguinte string de URI:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Use o Scala
2.12
com as versões de imagem do Dataproc1.5+
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
Exemplo da CLI gcloud:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \ -- job-args
Use o Scala
2.11
com a versão de imagem do Dataproc1.4
e anteriores:gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
Exemplo da CLI gcloud:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \ -- job-args
Cálculo de custos
Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços.
Como gravar e ler dados do BigQuery
Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.
O conector grava os dados no BigQuery
primeiro armazenando todos os dados em buffer
em uma tabela temporária do Cloud Storage. Em seguida, ele copia todos os dados do BigQuery em uma única operação. O conector tenta excluir os arquivos temporários depois que a operação de carregamento do BigQuery for bem-sucedida e mais uma vez quando o aplicativo Spark é encerrado.
Se o job falhar, remova as chaves temporárias restantes
do Google Cloud Storage. Normalmente, os arquivos temporários do BigQuery
estão localizados em gs://[bucket]/.spark-bigquery-[jobid]-[UUID]
.
Como configurar o faturamento
Por padrão, o projeto associado às credenciais ou à conta de serviço
são cobrados pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>")
.
Ele também pode ser adicionado a uma operação de leitura/gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>")
.
Como executar o código
Antes de executar este exemplo, crie um conjunto de dados chamado "wordcount_dataset" ou altere o conjunto de dados de saída no código para um conjunto de dados existente do BigQuery no projeto do Google Cloud.
Use o comando bq para criar o wordcount_dataset
:
bq mk wordcount_dataset
Use o comando da Google Cloud CLI. crie um bucket do Cloud Storage, que será usado na exportação BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Analise o código e substitua o marcador [bucket] por
o bucket do Cloud Storage criado anteriormente.
/* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-bigquery-demo") .getOrCreate() */ // Use the Cloud Storage bucket for temporary BigQuery export data used // by the connector. val bucket = "[bucket]" spark.conf.set("temporaryGcsBucket", bucket) // Load data in from BigQuery. See // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties // for option information. val wordsDF = (spark.read.format("bigquery") .option("table","bigquery-public-data:samples.shakespeare") .load() .cache()) wordsDF.createOrReplaceTempView("words") // Perform word count. val wordCountDF = spark.sql( "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word") wordCountDF.show() wordCountDF.printSchema() // Saving the data to BigQuery. (wordCountDF.write.format("bigquery") .option("table","wordcount_dataset.wordcount_output") .save())
- Executar o código no seu cluster
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc
- Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
- Na página >Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em
SSH
à direita do nome do nó mestre do cluster.
Uma janela do navegador é aberta no diretório principal do nó mestreConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crie
wordcount.scala
com o editor de textovi
,vim
ounano
pré-instalado e cole o código da lista de códigos Scalanano wordcount.scala
- Inicie o REPL
spark-shell
.$ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar ... Using Scala version ... Type in expressions to have them evaluated. Type :help for more information. ... Spark context available as sc. ... SQL context available as sqlContext. scala>
- Execute o wordcount.scala com o comando
:load wordcount.scala
para criar a tabelawordcount_output
do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.:load wordcount.scala ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída, abra oBigQuery
selecione a tabelawordcount_output
e clique em Prévia.
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc
PySpark
- Analise o código e substitua o marcador [bucket] por
o bucket do Cloud Storage criado anteriormente.
#!/usr/bin/env python """BigQuery I/O PySpark example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-bigquery-demo') \ .getOrCreate() # Use the Cloud Storage bucket for temporary BigQuery export data used # by the connector. bucket = "[bucket]" spark.conf.set('temporaryGcsBucket', bucket) # Load data from BigQuery. words = spark.read.format('bigquery') \ .option('table', 'bigquery-public-data:samples.shakespeare') \ .load() words.createOrReplaceTempView('words') # Perform word count. word_count = spark.sql( 'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word') word_count.show() word_count.printSchema() # Save the data to BigQuery word_count.write.format('bigquery') \ .option('table', 'wordcount_dataset.wordcount_output') \ .save()
- Executar o código no cluster
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc
- Acesse o Clusters do Dataproc página no console do Google Cloud e, em seguida, clique no nome do seu cluster
- Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em
SSH
à direita do nome do nó mestre do cluster.
Uma janela do navegador é aberta no diretório principal do nó mestreConnected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Crie
wordcount.py
com o editor de textovi
,vim
ounano
pré-instalado e cole o código PySpark da lista de códigos PySparknano wordcount.py
- Execute a contagem de palavras com
spark-submit
para criar a tabelawordcount_output
do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py ... +---------+----------+ | word|word_count| +---------+----------+ | XVII| 2| | spoil| 28| | Drink| 7| |forgetful| 5| | Cannot| 46| | cures| 10| | harder| 13| | tresses| 3| | few| 62| | steel'd| 5| | tripping| 7| | travel| 35| | ransom| 55| | hope| 366| | By| 816| | some| 1169| | those| 508| | still| 567| | art| 893| | feign| 10| +---------+----------+ only showing top 20 rows root |-- word: string (nullable = false) |-- word_count: long (nullable = true)
Para visualizar a tabela de saída, abra a páginaBigQuery
, selecione a tabelawordcount_output
e clique em Visualizar.
- Use o SSH para se conectar ao nó mestre do cluster do Dataproc
Para mais informações
- Armazenamento do BigQuery e Spark SQL, Python
- Como criar um arquivo de definição de tabela para uma fonte de dados externa
- Como consultar dados particionados externamente
- Dicas de ajuste de jobs do Spark