Menggunakan konektor Bigtable Spark

Konektor Bigtable Spark memungkinkan Anda membaca dan menulis data ke dan dari Bigtable. Anda dapat membaca data dari aplikasi Spark menggunakan Spark SQL dan DataFrame. Operasi Bigtable berikut didukung menggunakan konektor Bigtable Spark:

  • Menulis data
  • Membaca data
  • Membuat tabel baru

Dokumen ini menunjukkan cara mengonversi tabel DataFrame Spark SQL ke tabel Bigtable, lalu mengompilasi dan membuat file JAR untuk mengirimkan tugas Spark.

Status dukungan Spark dan Scala

Konektor Bigtable Spark hanya mendukung versi Scala 2.12, dan versi Spark berikut:

Konektor Bigtable Spark mendukung versi Dataproc berikut:

Menghitung biaya

Jika Anda memutuskan untuk menggunakan komponen Google Cloud yang dapat ditagih berikut, Anda akan dikenai biaya untuk resource yang Anda gunakan:

  • Bigtable (Anda tidak dikenai biaya untuk menggunakan emulator Bigtable)
  • Dataproc
  • Cloud Storage

Harga Dataproc berlaku untuk penggunaan Dataproc di cluster Compute Engine. Dataproc Serverless harga berlaku untuk workload dan sesi yang berjalan di Dataproc Serverless untuk Spark.

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan, gunakan kalkulator harga.

Sebelum memulai

Selesaikan prasyarat berikut sebelum menggunakan konektor Bigtable Spark.

Peran yang diperlukan

Untuk mendapatkan izin yang diperlukan untuk menggunakan konektor Bigtable Spark, minta administrator untuk memberi Anda peran IAM berikut pada project Anda:

  • Administrator Bigtable (roles/bigtable.admin)(Opsional): memungkinkan Anda membaca atau menulis data dan membuat tabel baru.
  • Pengguna Bigtable (roles/bigtable.user): dapat membaca atau menulis data, tetapi tidak membuat tabel baru.

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Jika Anda menggunakan Dataproc atau Cloud Storage, izin tambahan mungkin diperlukan. Untuk mengetahui informasi selengkapnya, lihat Izin Dataproc dan izin Cloud Storage.

Menyiapkan Spark

Selain membuat instance Bigtable, Anda juga perlu menyiapkan instance Spark. Anda dapat melakukannya secara lokal atau memilih salah satu opsi berikut untuk menggunakan Spark dengan Dataproc:

  • Cluster Dataproc
  • Dataproc Serverless

Untuk mengetahui informasi selengkapnya tentang memilih antara cluster Dataproc atau opsi serverless, lihat artikel Dataproc Serverless untuk Spark dibandingkan dengan Dataproc di Compute Engine .

Download file JAR konektor

Anda dapat menemukan kode sumber konektor Bigtable Spark dengan contohnya di repositori GitHub konektor Bigtable Spark.

Berdasarkan penyiapan Spark, Anda dapat mengakses file JAR sebagai berikut:

  • Jika menjalankan PySpark secara lokal, Anda harus mendownload file JAR konektor dari lokasi Cloud Storage gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.

    Ganti SCALA_VERSION dengan versi Scala, tetapkan ke 2.12 sebagai satu-satunya versi yang didukung, dan CONNECTOR_VERSION dengan versi konektor yang ingin Anda gunakan.

  • Untuk opsi tanpa server atau cluster Dataproc, gunakan file JAR terbaru sebagai artefak yang dapat ditambahkan di aplikasi Scala atau Java Spark Anda. Untuk mengetahui informasi selengkapnya tentang penggunaan file JAR sebagai artefak, lihat Mengelola dependensi.

  • Jika Anda mengirimkan tugas PySpark ke Dataproc, gunakan flag gcloud dataproc jobs submit pyspark --jars untuk menetapkan URI ke lokasi file JAR di Cloud Storage—misalnya, gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.

Menentukan jenis komputasi

Untuk tugas hanya-baca, Anda dapat menggunakan komputasi serverless Data Boost (Pratinjau), yang memungkinkan Anda agar tidak memengaruhi cluster penyaluran aplikasi Anda. Spark Anda aplikasi harus menggunakan konektor Spark versi 1.1.0 atau yang lebih baru agar dapat menggunakan Peningkatan Data.

Untuk menggunakan Peningkatan Data, Anda harus membuat profil aplikasi Data Boost, lalu memberikan ID profil aplikasi untuk spark.bigtable.app_profile.id Spark saat Anda menambahkan Bigtable ke aplikasi Spark Anda. Jika Anda sudah membuat aplikasi untuk tugas baca Spark Anda dan Anda ingin terus menggunakannya tanpa mengubah kode aplikasi, Anda dapat mengonversi profil aplikasi menjadi Profil aplikasi Data Boost. Untuk informasi selengkapnya, lihat Mengonversi aplikasi profil Anda.

Untuk mengetahui informasi selengkapnya, lihat Bigtable Data Booster ringkasan.

Untuk tugas yang melibatkan baca dan tulis, Anda dapat menggunakan cluster instance node untuk komputasi dengan menentukan profil aplikasi standar dengan permintaan Anda.

Mengidentifikasi atau membuat profil aplikasi yang akan digunakan

Jika Anda tidak menentukan ID profil aplikasi, konektor akan menggunakan aplikasi default untuk profil.

Sebaiknya gunakan profil aplikasi unik untuk setiap aplikasi yang Anda dijalankan, termasuk aplikasi Spark. Untuk mengetahui informasi selengkapnya tentang profil aplikasi jenis dan setelan, lihat Profil aplikasi ringkasan. Untuk mengetahui petunjuknya, lihat Membuat dan mengonfigurasi profil aplikasi.

Menambahkan konfigurasi Bigtable ke aplikasi Spark Anda

Di aplikasi Spark, tambahkan opsi Spark yang memungkinkan Anda berinteraksi dengan Bigtable.

Opsi Spark yang didukung

Gunakan opsi Spark yang tersedia sebagai bagian dari paket com.google.cloud.spark.bigtable.

Nama opsi Wajib Nilai default Arti
spark.bigtable.project.id Ya T/A Tetapkan project ID Bigtable.
spark.bigtable.instance.id Ya T/A Tetapkan ID instance Bigtable.
catalog Ya T/A Tetapkan format JSON yang menentukan format konversi antara skema mirip SQL DataFrame dan skema tabel Bigtable.

Lihat Membuat metadata tabel dalam format JSON untuk mengetahui informasi selengkapnya.
spark.bigtable.app_profile.id Tidak default Tetapkan ID profil aplikasi Bigtable.
spark.bigtable.write.timestamp.milliseconds Tidak Waktu sistem saat ini Setel stempel waktu dalam milidetik untuk digunakan saat menulis DataFrame ke Bigtable.

Perlu diperhatikan bahwa karena semua baris dalam DataFrame menggunakan stempel waktu yang sama, baris dengan kolom kunci baris yang sama dalam DataFrame akan dipertahankan sebagai satu versi di Bigtable karena baris tersebut memiliki stempel waktu yang sama.
spark.bigtable.create.new.table Tidak false Tetapkan ke true untuk membuat tabel baru sebelum menulis ke Bigtable.
spark.bigtable.read.timerange.start.milliseconds atau spark.bigtable.read.timerange.end.milliseconds Tidak T/A Setel stempel waktu (dalam milidetik sejak waktu epoch) untuk memfilter sel dengan tanggal mulai dan tanggal akhir tertentu.
spark.bigtable.push.down.row.key.filters Tidak true Tetapkan ke true untuk mengizinkan pemfilteran row key sederhana di sisi server. Pemfilteran pada kunci baris gabungan diterapkan di sisi klien.

Lihat Membaca baris DataFrame tertentu menggunakan filter untuk mengetahui informasi selengkapnya.
spark.bigtable.read.rows.attempt.timeout.milliseconds Tidak 30 mnt Setel durasi waktu tunggu untuk upaya baris baca yang sesuai dengan satu partisi DataFrame di klien Bigtable untuk Java.
spark.bigtable.read.rows.total.timeout.milliseconds Tidak 12 j Tetapkan durasi total waktu tunggu untuk upaya baris baca yang sesuai dengan satu partisi DataFrame di klien Bigtable untuk Java.
spark.bigtable.mutate.rows.attempt.timeout.milliseconds Tidak 1m Setel durasi timeout untuk upaya baris bermutasi yang sesuai dengan satu partisi DataFrame di klien Bigtable untuk Java.
spark.bigtable.mutate.rows.total.timeout.milliseconds Tidak 10 mnt Tetapkan durasi total waktu tunggu untuk upaya baris mutasi yang sesuai dengan satu partisi DataFrame di klien Bigtable untuk Java.
spark.bigtable.batch.mutate.size Tidak 100 Tetapkan ke jumlah mutasi di setiap batch. Nilai maksimum yang dapat Anda tetapkan adalah 100000.
spark.bigtable.enable.batch_mutate.flow_control Tidak false Tetapkan ke true untuk mengaktifkan kontrol alur untuk mutasi batch.

Membuat metadata tabel dalam format JSON

Format tabel DataFrame Spark SQL harus dikonversi menjadi tabel Bigtable menggunakan string dengan format JSON. Format JSON string ini membuat format data tersebut kompatibel dengan Bigtable. Anda dapat meneruskan format JSON dalam kode aplikasi menggunakan opsi .option("catalog", catalog_json_string).

Sebagai contoh, pertimbangkan tabel DataFrame berikut dan tabel Bigtable yang sesuai.

Dalam contoh ini, kolom name dan birthYear dalam DataFrame dikelompokkan bersama di bagian keluarga kolom info dan masing-masing diganti namanya menjadi name dan birth_year. Demikian pula, kolom address disimpan di bawah kelompok kolom location dengan nama kolom yang sama. Kolom id dari DataFrame dikonversi menjadi kunci baris Bigtable.

Kunci baris tidak memiliki nama kolom khusus di Bigtable dan dalam contoh ini, id_rowkey hanya digunakan untuk menunjukkan kepada konektor bahwa ini adalah kolom kunci baris. Anda dapat menggunakan nama apa pun untuk kolom kunci baris dan memastikan Anda menggunakan nama yang sama saat mendeklarasikan kolom "rowkey":"column_name" dalam format JSON.

DataFrame Tabel Bigtable = t1
Kolom Tombol baris Kelompok kolom
info lokasi
Kolom Kolom
id name birthYear alamat id_rowkey name birth_year alamat

Format JSON untuk katalog adalah sebagai berikut:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

Kunci dan nilai yang digunakan dalam format JSON adalah sebagai berikut:

Kunci katalog Nilai katalog Format JSON
tabel Nama tabel Bigtable. "table":{"name":"t1"}

Jika tabel tidak ada, gunakan .option("spark.bigtable.create.new.table", "true") untuk membuat tabel.
rowkey Nama kolom yang akan digunakan sebagai kunci baris Bigtable. Pastikan nama kolom kolom DataFrame digunakan sebagai kunci baris—misalnya, id_rowkey.

Kunci gabungan juga diterima sebagai kunci baris. Contohnya, "rowkey":"name:address" Pendekatan ini mungkin menghasilkan row key yang memerlukan pemindaian tabel penuh untuk semua permintaan baca.
"rowkey":"id_rowkey",
kolom Pemetaan setiap kolom DataFrame ke dalam kelompok kolom Bigtable ("cf") dan nama kolom ("col") yang sesuai. Nama kolom dapat berbeda dari nama kolom di tabel DataFrame. Jenis data yang didukung mencakup string, long, dan binary. "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

Dalam contoh ini, id_rowkey adalah kunci baris, dan info serta location adalah grup kolom.

Jenis data yang didukung

Konektor mendukung penggunaan jenis string, long, dan binary (array byte) dalam katalog. Sebelum dukungan untuk jenis lain seperti int dan float ditambahkan, Anda bisa secara manual mengonversi tipe data tersebut ke array byte (pengaturan BinaryType) sebelum menggunakan konektor untuk menulisnya Bigtable.

Selain itu, Anda dapat menggunakan Avro untuk serialisasi kompleks yang signifikan, seperti ArrayType. Untuk informasi selengkapnya, lihat Menserialisasi data kompleks menggunakan Apache Avro.

Menulis ke Bigtable

Gunakan fungsi .write() dan opsi yang didukung untuk menulis data Anda ke Bigtable.

Java

Kode berikut dari repositori GitHub menggunakan Java dan Maven untuk menulis ke Bigtable.

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");



  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

Kode berikut dari repositori GitHub menggunakan Python untuk menulis ke Bigtable.

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  

Membaca dari Bigtable

Gunakan fungsi .read() untuk memeriksa apakah tabel berhasil diimpor ke Bigtable.

Java

  
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

Mengompilasi project Anda

Buat file JAR yang digunakan untuk menjalankan tugas di cluster Dataproc, instance Dataproc Serverless, atau Spark lokal. Anda dapat mengompilasi file JAR secara lokal, lalu menggunakannya untuk mengirimkan tugas. Jalur ke JAR yang dikompilasi ditetapkan sebagai variabel lingkungan PATH_TO_COMPILED_JAR saat Anda mengirimkan tugas.

Langkah ini tidak berlaku untuk aplikasi PySpark.

Mengelola dependensi

Konektor Bigtable Spark mendukung alat pengelolaan dependensi berikut:

Mengompilasi file JAR

Maven

  1. Tambahkan dependensi spark-bigtable ke file pom.xml Anda.

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Tambahkan plugin Maven Shade ke file pom.xml Anda untuk membuat uber JAR:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. Jalankan perintah mvn clean install untuk menghasilkan file JAR.

sbt

  1. Tambahkan dependensi spark-bigtable ke file build.sbt Anda:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
    
  2. Tambahkan plugin sbt-assembly ke file project/plugins.sbt atau project/assembly.sbt Anda untuk membuat file Uber JAR.

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
    
  3. Jalankan perintah sbt clean assembly untuk menghasilkan file JAR.

Gradle

  1. Tambahkan dependensi spark-bigtable ke file build.gradle Anda.

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
    
  2. Tambahkan plugin Shadow di file build.gradle Anda untuk membuat file JAR uber:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
    
  3. Lihat dokumentasi plugin Bayangan untuk informasi konfigurasi dan kompilasi JAR lebih lanjut.

Mengirim tugas

Kirim tugas Spark menggunakan Dataproc, Dataproc Serverless, atau instance Spark lokal untuk meluncurkan aplikasi Anda.

Menetapkan lingkungan runtime

Setel variabel lingkungan berikut.

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

Ganti kode berikut:

  • PROJECT_ID: ID permanen untuk project Bigtable.
  • INSTANCE_ID: ID permanen untuk instance Bigtable.
  • TABLE_NAME: ID permanen untuk tabel.
  • DATAPROC_CLUSTER: ID permanen untuk cluster Dataproc.
  • DATAPROC_REGION: Region Dataproc yang berisi salah satu cluster dalam instance Dataproc Anda—misalnya, northamerica-northeast2.
  • DATAPROC_ZONE: Zona tempat cluster Dataproc berjalan.
  • SUBNET: Jalur resource lengkap subnet.
  • GCS_BUCKET_NAME: Bucket Cloud Storage untuk mengupload dependensi workload Spark.
  • PATH_TO_COMPILED_JAR: Jalur lengkap atau relatif ke JAR yang dikompilasi—misalnya, /path/to/project/root/target/<compiled_JAR_name> untuk Maven.
  • GCS_PATH_TO_CONNECTOR_JAR: Bucket Cloud Storage gs://spark-lib/bigtable, tempat file spark-bigtable_SCALA_VERSION_CONNECTOR_VERSION.jar berada.
  • PATH_TO_PYTHON_FILE: Untuk aplikasi PySpark, jalur ke file Python yang akan digunakan untuk menulis dan membaca data dari Bigtable.
  • LOCAL_PATH_TO_CONNECTOR_JAR: Untuk aplikasi PySpark, jalur ke file JAR konektor Bigtable Spark yang didownload.

Mengirim tugas Spark

Untuk instance Dataproc atau penyiapan Spark lokal, jalankan tugas Spark untuk mengupload data ke Bigtable.

Cluster Dataproc

Menggunakan file JAR yang telah dikompilasi dan membuat tugas cluster Dataproc yang membaca serta menulis data dari dan ke Bigtable.

  1. Membuat cluster Dataproc. Contoh berikut menunjukkan contoh perintah untuk membuat cluster Dataproc v2.0 dengan Debian 10, dua worker node, dan konfigurasi default.

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. Kirim tugas.

    Scala/Java

    Contoh berikut menunjukkan class spark.bigtable.example.WordCount yang menyertakan logika untuk membuat tabel pengujian di DataFrame, menulis tabel ke Bigtable, lalu menghitung jumlah kata dalam tabel.

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc Serverless

Gunakan file JAR yang telah dikompilasi, lalu buat tugas Dataproc yang membaca dan menulis data dari dan ke Bigtable dengan instance Dataproc Serverless.

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Spark Lokal

Gunakan file JAR yang didownload dan buat tugas Spark yang membaca dan menulis data dari dan ke Bigtable dengan instance Spark lokal. Anda juga dapat menggunakan emulator Bigtable untuk mengirimkan tugas Spark.

Menggunakan emulator Bigtable

Jika Anda memutuskan untuk menggunakan emulator Bigtable, ikuti langkah-langkah berikut:

  1. Jalankan perintah berikut untuk memulai emulator:

    gcloud beta emulators bigtable start
    

    Secara default, emulator akan memilih localhost:8086.

  2. Tetapkan variabel lingkungan BIGTABLE_EMULATOR_HOST:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. Kirim tugas Spark.

Untuk mengetahui informasi selengkapnya tentang menggunakan emulator Bigtable, lihat Menguji menggunakan emulator.

Mengirim tugas Spark

Gunakan perintah spark-submit untuk mengirimkan tugas Spark, terlepas dari apakah Anda menggunakan emulator Bigtable lokal.

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Memverifikasi data tabel

Jalankan perintah berikut CLI cbt untuk memverifikasi bahwa data ditulis ke Bigtable. Tujuan CLI cbt adalah komponen dari Google Cloud CLI. Untuk informasi selengkapnya, lihat CLI cbt ringkasan.

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

Solusi tambahan

Gunakan konektor Bigtable Spark untuk mendapatkan solusi tertentu, seperti membuat serialisasi jenis Spark SQL yang kompleks, membaca baris tertentu, dan membuat metrik sisi klien.

Membaca baris DataFrame tertentu menggunakan filter

Saat menggunakan DataFrame untuk membaca dari Bigtable, Anda dapat menentukan filter agar hanya membaca baris tertentu. Filter sederhana seperti ==, <=, dan startsWith pada kolom kunci baris diterapkan di sisi server untuk menghindari pemindaian tabel penuh. Filter pada kunci baris gabungan atau filter kompleks seperti filter LIKE di kolom kunci baris diterapkan di sisi klien.

Jika Anda membaca tabel besar, sebaiknya gunakan filter tombol baris sederhana untuk menghindari melakukan pemindaian tabel penuh. Contoh pernyataan berikut menunjukkan cara membaca menggunakan filter sederhana. Pastikan bahwa dalam filter Spark, Anda menggunakan nama kolom DataFrame yang dikonversi menjadi row key:

    dataframe.filter("id == 'some_id'").show()
  

Saat menerapkan filter, gunakan nama kolom DataFrame, bukan nama kolom tabel Bigtable.

Melakukan serialisasi tipe data kompleks menggunakan Apache Avro

Konektor Bigtable Spark memberikan dukungan untuk menggunakan Apache Avro untuk membuat serialisasi jenis SQL Spark yang kompleks, seperti ArrayType, MapType, atau StructType. Apache Avro menyediakan serialisasi data untuk data kumpulan data yang biasa digunakan untuk memproses dan menyimpan struktur data yang kompleks.

Gunakan sintaksis seperti "avro":"avroSchema" untuk menentukan bahwa kolom di Bigtable harus dienkode menggunakan Avro. Selanjutnya Anda dapat menggunakan .option("avroSchema", avroSchemaString) saat membaca dari atau menulis ke Bigtable untuk menentukan skema Avro yang sesuai dengan kolom tersebut dalam format string. Anda dapat menggunakan nama opsi yang berbeda—misalnya, "anotherAvroSchema" untuk kolom yang berbeda dan meneruskan skema Avro untuk beberapa kolom.

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

Menggunakan metrik sisi klien

Karena konektor Bigtable Spark didasarkan pada Klien Bigtable untuk Java, metrik sisi klien diaktifkan di dalam konektor secara default. Anda dapat membaca dokumentasi metrik sisi klien untuk menemukan detail selengkapnya tentang cara mengakses dan menafsirkan metrik ini.

Menggunakan klien Bigtable untuk Java dengan fungsi RDD level rendah

Karena konektor Bigtable Spark didasarkan pada klien Bigtable untuk Java, Anda dapat langsung menggunakan klien tersebut di aplikasi Spark dan melakukan permintaan baca atau tulis terdistribusi dalam fungsi RDD tingkat rendah seperti mapPartitions dan foreachPartition.

Agar dapat menggunakan klien Bigtable untuk class Java, tambahkan awalan com.google.cloud.spark.bigtable.repackaged ke nama paket. Misalnya, gunakan com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient, bukan nama class sebagai com.google.cloud.bigtable.data.v2.BigtableDataClient.

Untuk mengetahui informasi selengkapnya tentang klien Bigtable untuk Java, lihat klien Bigtable untuk Java.

Langkah selanjutnya