Use o conector do BigQuery com o Spark

O spark-bigquery-connector é usado com o Apache Spark para ler e gravar dados do e para o BigQuery. Neste tutorial, fornecemos um código de exemplo que usa o conector Spark-bigquery em um aplicativo Spark. Para instruções sobre como criar um cluster, consulte os Guias de início rápido do Dataproc.

Disponibilize o conector para seu aplicativo

Você pode disponibilizar o conector spark-bigquery para seu aplicativo de uma das seguintes maneiras:

  1. Instale o conector spark-bigquery- no diretório jars do Spark de cada nó usando o Ação de inicialização dos conectores do Dataproc durante a criação do cluster.

  2. Forneça o URI do conector ao enviar o job:

    1. Console do Google Cloud:use o item Jars files do job do Spark em a página Enviar um job do Dataproc.
    2. CLI gcloud:use a sinalização gcloud dataproc jobs submit spark --jars.
    3. API Dataproc: use o Campo SparkJob.jarFileUris.
  3. Inclua o jar no seu aplicativo Scala ou Java Spark como uma dependência (consulte Compilação no conector (em inglês).

.

Como especificar o URI do jar do conector

As versões do conector Spark-BigQuery estão listadas no repositório do GitHub GoogleCloudDataproc/spark-bigquery-connector.

Especifique o jar do conector substituindo o Scala e a versão do conector informações na seguinte string de URI:

gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar

  • Use o Scala 2.12 com as versões de imagem do Dataproc 1.5+

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
    

    Exemplo da CLI gcloud:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
    

  • Use o Scala 2.11 com a versão de imagem do Dataproc 1.4 e anteriores:

    gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
    

    Exemplo da CLI gcloud:

    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
    

.

Cálculo de custos

Neste documento, você usará os seguintes componentes faturáveis do Google Cloud:

  • Dataproc
  • BigQuery
  • Cloud Storage

Para gerar uma estimativa de custo baseada na projeção de uso deste tutorial, use a calculadora de preços. Novos usuários do Google Cloud podem estar qualificados para uma avaliação gratuita.

Como gravar e ler dados do BigQuery

Este exemplo lê dados do BigQuery em um DataFrame do Spark para executar uma contagem de palavras usando a API de origem de dados padrão.

O conector grava os dados no BigQuery primeiro armazenando todos os dados em buffer em uma tabela temporária do Cloud Storage. Em seguida, ele copia todos os dados do BigQuery em uma única operação. O conector tenta excluir os arquivos temporários depois que a operação de carregamento do BigQuery for bem-sucedida e mais uma vez quando o aplicativo Spark é encerrado. Se o job falhar, remova as chaves temporárias restantes do Google Cloud Storage. Normalmente, os arquivos temporários do BigQuery estão localizados em gs://[bucket]/.spark-bigquery-[jobid]-[UUID].

Como configurar o faturamento

Por padrão, o projeto associado às credenciais ou à conta de serviço são cobrados pelo uso da API. Para faturar um projeto diferente, defina a seguinte configuração: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

Ele também pode ser adicionado a uma operação de leitura/gravação, da seguinte maneira: .option("parentProject", "<BILLED-GCP-PROJECT>").

Como executar o código

Antes de executar este exemplo, crie um conjunto de dados chamado "wordcount_dataset" ou altere o conjunto de dados de saída no código para um conjunto de dados existente do BigQuery no projeto do Google Cloud.

Use o comando bq para criar o wordcount_dataset:

bq mk wordcount_dataset

Use o comando da Google Cloud CLI. crie um bucket do Cloud Storage, que será usado na exportação BigQuery:

gcloud storage buckets create gs://[bucket]

Scala

  1. Analise o código e substitua o marcador [bucket] por o bucket do Cloud Storage criado anteriormente.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */
    
    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)
    
    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())
    
    wordsDF.createOrReplaceTempView("words")
    
    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()
    
    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
  2. Executar o código no seu cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc
      1. Acesse a página Clusters do Dataproc no console do Google Cloud e clique no nome do cluster.
        Página de clusters do Dataproc no Console do Cloud.
      2. Na página >Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster.
        Página de detalhes do cluster do Dataproc no console do Cloud.

        Uma janela do navegador é aberta no diretório principal do nó mestre
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie wordcount.scala com o editor de texto vi, vim ou nano pré-instalado e cole o código da lista de códigos Scala
      nano wordcount.scala
        
    3. Inicie o REPL spark-shell.
      $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
      ...
      Using Scala version ...
      Type in expressions to have them evaluated.
      Type :help for more information.
      ...
      Spark context available as sc.
      ...
      SQL context available as sqlContext.
      scala>
      
    4. Execute o wordcount.scala com o comando :load wordcount.scala para criar a tabela wordcount_output do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.
      :load wordcount.scala
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para visualizar a tabela de saída, abra o BigQuery selecione a tabela wordcount_output e clique em Prévia.
      Prévia da tabela na página do BigQuery Explorer no console do Cloud.

PySpark

  1. Analise o código e substitua o marcador [bucket] por o bucket do Cloud Storage criado anteriormente.
    #!/usr/bin/env python
    
    """BigQuery I/O PySpark example."""
    
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  2. Executar o código no cluster
    1. Use o SSH para se conectar ao nó mestre do cluster do Dataproc
      1. Acesse o Clusters do Dataproc página no console do Google Cloud e, em seguida, clique no nome do seu cluster
        Página &quot;Clusters&quot; no console do Cloud.
      2. Na página Detalhes do cluster, selecione a guia "Instâncias de VM". Em seguida, clique em SSH à direita do nome do nó mestre do cluster.
        Selecione &quot;SSH na linha do nome do cluster&quot; na página de detalhes do cluster no console do Cloud.

        Uma janela do navegador é aberta no diretório principal do nó mestre
            Connected, host fingerprint: ssh-rsa 2048 ...
            ...
            user@clusterName-m:~$
            
    2. Crie wordcount.py com o editor de texto vi, vim ou nano pré-instalado e cole o código PySpark da lista de códigos PySpark
      nano wordcount.py
      
    3. Execute a contagem de palavras com spark-submit para criar a tabela wordcount_output do BigQuery. A listagem de saída exibe 20 linhas a partir da saída de wordcount.
      spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
      ...
      +---------+----------+
      |     word|word_count|
      +---------+----------+
      |     XVII|         2|
      |    spoil|        28|
      |    Drink|         7|
      |forgetful|         5|
      |   Cannot|        46|
      |    cures|        10|
      |   harder|        13|
      |  tresses|         3|
      |      few|        62|
      |  steel'd|         5|
      | tripping|         7|
      |   travel|        35|
      |   ransom|        55|
      |     hope|       366|
      |       By|       816|
      |     some|      1169|
      |    those|       508|
      |    still|       567|
      |      art|       893|
      |    feign|        10|
      +---------+----------+
      only showing top 20 rows
      
      root
       |-- word: string (nullable = false)
       |-- word_count: long (nullable = true)
      

      Para visualizar a tabela de saída, abra a página BigQuery, selecione a tabela wordcount_output e clique em Visualizar.
      Visualização da tabela na página do BigQuery Explorer no console do Cloud.

Para mais informações