Menggunakan Apache Spark dengan HBase di Dataproc


Tujuan

Tutorial ini menunjukkan kepada Anda cara:

  1. Membuat cluster Dataproc, menginstal Apache HBase dan Apache ZooKeeper di cluster
  2. Buat tabel HBase menggunakan shell HBase yang berjalan di node master cluster Dataproc
  3. Gunakan Cloud Shell untuk mengirimkan tugas Spark Java atau PySpark ke layanan Dataproc yang menulis data ke, lalu membaca data dari, tabel HBase

Biaya

Dalam dokumen ini, Anda menggunakan komponen Google Cloud yang dapat ditagih berikut:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga. Pengguna baru Google Cloud mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

Jika Anda belum melakukannya, buat project Google Cloud Platform.

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  3. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  4. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

  5. Di konsol Google Cloud, pada halaman pemilih project, pilih atau buat project Google Cloud.

    Buka pemilih project

  6. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  7. Enable the Dataproc and Compute Engine APIs.

    Enable the APIs

Membuat cluster Dataproc

  1. Jalankan perintah berikut di terminal sesi Cloud Shell untuk:

    • Instal komponen HBase dan ZooKeeper
    • Sediakan tiga worker node (tiga hingga lima pekerja direkomendasikan untuk menjalankan kode dalam tutorial ini)
    • Mengaktifkan Gateway Komponen
    • Gunakan image versi 2.0
    • Gunakan flag --properties untuk menambahkan konfigurasi HBase dan library HBase ke classpath driver dan eksekutor Spark.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Memverifikasi penginstalan konektor

  1. Dari konsol Google Cloud atau terminal sesi Cloud Shell, SSH ke node master cluster Dataproc.

  2. Verifikasi penginstalan konektor Apache HBase Spark pada node master:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Contoh output:
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Biarkan terminal sesi SSH tetap terbuka untuk:

    1. Membuat tabel HBase
    2. (pengguna Java): menjalankan perintah di node master cluster untuk menentukan versi komponen yang diinstal di cluster tersebut
    3. Pindai tabel Hbase setelah Anda menjalankan kode

Membuat tabel HBase

Jalankan perintah yang tercantum di bagian ini di terminal sesi SSH node master yang Anda buka pada langkah sebelumnya.

  1. Buka shell HBase:

    hbase shell
    

  2. Buat 'my-table' HBase dengan grup kolom 'cf':

    create 'my_table','cf'
    

    1. Untuk mengonfirmasi pembuatan tabel, di Konsol Google Cloud, klik HBase di Link Gateway Komponen Google Cloud Console untuk membuka UI Apache HBase. my-table tercantum di bagian Tabel pada halaman Beranda.

Melihat kode Spark

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Menjalankan kode

  1. Buka terminal sesi Cloud Shell.

  2. Clone repositori GoogleCloudDataproc/cloud-dataproc GitHub ke terminal sesi Cloud Shell Anda:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Ubah ke direktori cloud-dataproc/spark-hbase:

    cd cloud-dataproc/spark-hbase
    
    Contoh output:
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Kirim tugas Dataproc.

Java

  1. Tetapkan versi komponen di file pom.xml.
    1. Halaman versi rilis 2.0.x Dataproc mencantumkan versi komponen Scala, Spark, dan HBase yang diinstal dengan empat versi subminor image 2.0 terbaru dan terakhir.
      1. Untuk menemukan versi subminor dari cluster versi image 2.0, klik nama cluster di halaman Clusters di Konsol Google Cloud untuk membuka halaman Cluster details, tempat Image version cluster dicantumkan.
    2. Atau, Anda dapat menjalankan perintah berikut di terminal sesi SSH dari node master cluster untuk menentukan versi komponen:
      1. Periksa versi scala:
        scala -version
        
      2. Periksa versi Spark (control-D untuk keluar):
        spark-shell
        
      3. Periksa versi HBase:
        hbase version
        
      4. Identifikasi dependensi versi Spark, Scala, dan HBase di Maven pom.xml:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Catatan: hbase-spark.version adalah versi konektor HBase Spark saat ini; jangan ubah nomor versi ini.
    3. Edit file pom.xml di editor Cloud Shell untuk menyisipkan nomor versi Scala, Spark, dan HBase yang benar. Klik Open Terminal setelah selesai mengedit untuk kembali ke command line terminal Cloud Shell.
      cloudshell edit .
      
    4. Beralihlah ke Java 8 di Cloud Shell. Versi JDK ini diperlukan untuk membuat kode (Anda dapat mengabaikan pesan peringatan plugin apa pun):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Verifikasi penginstalan Java 8:
      java -version
      
      Contoh output:
      openjdk version "1.8..."
       
  2. Build file jar:
    mvn clean package
    
    File .jar ditempatkan di subdirektori /target (misalnya, target/spark-hbase-1.0-SNAPSHOT.jar.
  3. Kirim tugas.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: Masukkan nama file .jar Anda setelah "target/" dan sebelum ".jar".
    • Jika tidak menetapkan classpath HBase driver dan eksekutor Spark saat membuat cluster, Anda harus menetapkannya bersama setiap pengiriman tugas dengan menyertakan flag ‑‑properties berikut dalam perintah pengiriman tugas:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Lihat output tabel HBase di output terminal sesi Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Kirim tugas.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Jika tidak menetapkan classpath HBase driver dan eksekutor Spark saat membuat cluster, Anda harus menetapkannya bersama setiap pengiriman tugas dengan menyertakan flag ‑‑properties berikut dalam perintah pengiriman tugas:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Lihat output tabel HBase di output terminal sesi Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Memindai tabel HBase

Anda dapat memindai konten tabel HBase dengan menjalankan perintah berikut di terminal sesi SSH node master yang Anda buka di bagian Verify Connector installation:

  1. Buka shell HBase:
    hbase shell
    
  2. Pindai 'tabel-saya':
    scan 'my_table'
    
    Contoh output:
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Pembersihan

Setelah menyelesaikan tutorial, Anda dapat membersihkan resource yang dibuat agar resource tersebut berhenti menggunakan kuota dan dikenai biaya. Bagian berikut menjelaskan cara menghapus atau menonaktifkan resource ini.

Menghapus project

Cara termudah untuk menghilangkan penagihan adalah dengan menghapus project yang Anda buat untuk tutorial.

Untuk menghapus project:

  1. Di konsol Google Cloud, buka halaman Manage resource.

    Buka Manage resource

  2. Pada daftar project, pilih project yang ingin Anda hapus, lalu klik Delete.
  3. Pada dialog, ketik project ID, lalu klik Shut down untuk menghapus project.

Menghapus cluster

  • Untuk menghapus cluster:
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}