Utiliser Apache Spark avec HBase sur Dataproc


Objectifs

Ce tutoriel vous explique comment :

  1. Créer un cluster Dataproc, en y installant Apache HBase et Apache ZooKeeper
  2. Créer une table HBase à l'aide de l'interface système HBase exécutée sur le nœud maître du cluster Dataproc.
  3. Utilisez Cloud Shell pour envoyer une tâche Java ou PySpark Spark au service Dataproc qui écrit et lit des données dans la table HBase.

Coûts

Dans ce document, vous utilisez les composants facturables suivants de Google Cloud :

Obtenez une estimation des coûts en fonction de votre utilisation prévue à l'aide du simulateur de coût. Les nouveaux utilisateurs de Google Cloud peuvent bénéficier d'un essai gratuit.

Avant de commencer

Si ce n'est pas déjà fait, créez un projet Google Cloud Platform.

  1. Connectez-vous à votre compte Google Cloud. Si vous débutez sur Google Cloud, créez un compte pour évaluer les performances de nos produits en conditions réelles. Les nouveaux clients bénéficient également de 300 $ de crédits gratuits pour exécuter, tester et déployer des charges de travail.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  4. Activer les API Dataproc and Compute Engine.

    Activer les API

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Vérifiez que la facturation est activée pour votre projet Google Cloud.

  7. Activer les API Dataproc and Compute Engine.

    Activer les API

Créer un cluster Dataproc

  1. Exécutez la commande suivante dans un terminal de session Cloud Shell pour:

    • Installez les composants HBase et ZooKeeper.
    • Provisionnez trois nœuds de calcul (il est recommandé d'utiliser trois à cinq nœuds de calcul pour exécuter le code dans ce tutoriel)
    • Activez la passerelle des composants.
    • Utiliser la version d'image 2.0
    • Utilisez l'option --properties pour ajouter la configuration HBase et la bibliothèque HBase aux classpaths du pilote et de l'exécuteur Spark.
gcloud dataproc clusters create cluster-name \
    --region=region \
    --optional-components=HBASE,ZOOKEEPER \
    --num-workers=3 \
    --enable-component-gateway \
    --image-version=2.0 \
    --properties='spark:spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark:spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'

Vérifier l'installation du connecteur

  1. Depuis la console Google Cloud ou un terminal de session Cloud Shell, connectez-vous en SSH au nœud maître du cluster Dataproc.

  2. Vérifiez l'installation du connecteur Spark Apache HBase sur le nœud maître:

    ls -l /usr/lib/spark/jars | grep hbase-spark
    
    Exemple de résultat :
    -rw-r--r-- 1 root root size date time hbase-spark-connector.version.jar
    

  3. Gardez le terminal de session SSH ouvert pour:

    1. Créer une table HBase
    2. (Utilisateurs de Java): exécuter des commandes sur le nœud maître du cluster pour déterminer les versions des composants installés sur le cluster
    3. Analyser la table Hbase après avoir exécuté le code

Créer une table HBase

Exécutez les commandes répertoriées dans cette section dans le terminal de session SSH du nœud maître que vous avez ouvert à l'étape précédente.

  1. Ouvrez le shell HBase:

    hbase shell
    

  2. Créez une table HBase "my-table" avec une famille de colonnes "cf" :

    create 'my_table','cf'
    

    1. Pour confirmer la création de la table, dans la console Google Cloud, cliquez sur HBase dans les liens vers la passerelle des composants de la console Google Cloud pour ouvrir l'interface utilisateur Apache HBase. my-table est répertorié dans la section Tables de la page Accueil.

Afficher le code Spark

Java

package hbase;

import org.apache.hadoop.hbase.spark.datasources.HBaseTableCatalog;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class SparkHBaseMain {
    public static class SampleData implements Serializable {
        private String key;
        private String name;

        public SampleData(String key, String name) {
            this.key = key;
            this.name = name;
        }

        public SampleData() {
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getKey() {
            return key;
        }

        public void setKey(String key) {
            this.key = key;
        }
    }
    public static void main(String[] args) {
        // Init SparkSession
        SparkSession spark = SparkSession
                .builder()
                .master("yarn")
                .appName("spark-hbase-tutorial")
                .getOrCreate();

        // Data Schema
        String catalog = "{"+"\"table\":{\"namespace\":\"default\", \"name\":\"my_table\"}," +
                "\"rowkey\":\"key\"," +
                "\"columns\":{" +
                "\"key\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "\"name\":{\"cf\":\"cf\", \"col\":\"name\", \"type\":\"string\"}" +
                "}" +
                "}";

        Map<String, String> optionsMap = new HashMap<String, String>();
        optionsMap.put(HBaseTableCatalog.tableCatalog(), catalog);

        Dataset<Row> ds= spark.createDataFrame(Arrays.asList(
                new SampleData("key1", "foo"),
                new SampleData("key2", "bar")), SampleData.class);

        // Write to HBase
        ds.write()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .mode("overwrite")
                .save();

        // Read from HBase
        Dataset dataset = spark.read()
                .format("org.apache.hadoop.hbase.spark")
                .options(optionsMap)
                .option("hbase.spark.use.hbasecontext", "false")
                .load();
        dataset.show();
    }
}

Python

from pyspark.sql import SparkSession

# Initialize Spark Session
spark = SparkSession \
  .builder \
  .master('yarn') \
  .appName('spark-hbase-tutorial') \
  .getOrCreate()

data_source_format = ''

# Create some test data
df = spark.createDataFrame(
    [
        ("key1", "foo"),
        ("key2", "bar"),
    ],
    ["key", "name"]
)

# Define the schema for catalog
catalog = ''.join("""{
    "table":{"namespace":"default", "name":"my_table"},
    "rowkey":"key",
    "columns":{
        "key":{"cf":"rowkey", "col":"key", "type":"string"},
        "name":{"cf":"cf", "col":"name", "type":"string"}
    }
}""".split())

# Write to HBase
df.write.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").mode("overwrite").save()

# Read from HBase
result = spark.read.format('org.apache.hadoop.hbase.spark').options(catalog=catalog).option("hbase.spark.use.hbasecontext", "false").load()
result.show()

Exécuter le code

  1. Ouvrez un terminal de session Cloud Shell.

  2. Clonez le dépôt GitHub GoogleCloudDataproc/cloud-dataproc dans le terminal de votre session Cloud Shell:

    git clone https://github.com/GoogleCloudDataproc/cloud-dataproc.git
    

  3. Accédez au répertoire cloud-dataproc/spark-hbase :

    cd cloud-dataproc/spark-hbase
    
    Exemple de résultat :
    user-name@cloudshell:~/cloud-dataproc/spark-hbase (project-id)$
    

  4. Envoyez le job Dataproc.

Java

  1. Définissez les versions des composants dans le fichier pom.xml.
    1. La page Versions de version 2.0.x de Dataproc indique les versions de composants Scala, Spark et HBase installées avec les quatre dernières versions de correction d'image 2.0 et les plus récentes.
      1. Pour trouver la version de correction de votre cluster de version d'image 2.0, cliquez sur le nom du cluster sur la page Clusters de la console Google Cloud pour ouvrir la page Détails du cluster, qui contient la Version de l'image du cluster.
    2. Vous pouvez également exécuter les commandes suivantes dans un terminal de session SSH à partir du nœud maître de votre cluster pour déterminer les versions des composants :
      1. Vérifiez la version de scala:
        scala -version
        
      2. Vérifiez la version de Spark (Ctrl+D pour quitter):
        spark-shell
        
      3. Vérifiez la version de HBase:
        hbase version
        
      4. Identifiez les dépendances de version Spark, Scala et HBase dans le fichier pom.xml Maven:
        <properties>
          <scala.version>scala full version (for example, 2.12.14)</scala.version>
          <scala.main.version>scala main version (for example, 2.12)</scala.main.version>
          <spark.version>spark version (for example, 3.1.2)</spark.version>
          <hbase.client.version>hbase version (for example, 2.2.7)</hbase.client.version>
          <hbase-spark.version>1.0.0(the current Apache HBase Spark Connector version)>
        </properties>
        
        Remarque: hbase-spark.version correspond à la version actuelle du connecteur Spark HBase. Ne modifiez pas ce numéro de version.
    3. Modifiez le fichier pom.xml dans l'éditeur Cloud Shell pour insérer les numéros de version Scala, Spark et HBase corrects. Cliquez sur Ouvrir le terminal lorsque vous avez terminé les modifications pour revenir à la ligne de commande du terminal Cloud Shell.
      cloudshell edit .
      
    4. Passez à Java 8 dans Cloud Shell. Cette version du JDK est nécessaire pour compiler le code (vous pouvez ignorer les messages d'avertissement du plug-in):
      sudo update-java-alternatives -s java-1.8.0-openjdk-amd64 && export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
      
    5. Vérifiez l'installation de Java 8:
      java -version
      
      Exemple de résultat :
      openjdk version "1.8..."
       
  2. Créez le fichier jar:
    mvn clean package
    
    Le fichier .jar est placé dans le sous-répertoire /target (par exemple, target/spark-hbase-1.0-SNAPSHOT.jar).
  3. Envoyez le job.

    gcloud dataproc jobs submit spark \
        --class=hbase.SparkHBaseMain  \
        --jars=target/filename.jar \
        --region=cluster-region \
        --cluster=cluster-name
    
    • --jars: insérez le nom de votre fichier .jar après "target/" et avant ".jar".
    • Si vous n'avez pas défini les classpaths HBase du pilote et de l'exécuteur Spark lors de la création de votre cluster, vous devez les définir à chaque envoi de tâche en incluant l'option ‑‑properties suivante dans votre commande d'envoi de tâche:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  4. Affichez la sortie de la table HBase dans le résultat du terminal de la session Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Python

  1. Envoyez le job.

    gcloud dataproc jobs submit pyspark scripts/pyspark-hbase.py \
        --region=cluster-region \
        --cluster=cluster-name
    
    • Si vous n'avez pas défini les classpaths HBase du pilote et de l'exécuteur Spark lors de la création de votre cluster, vous devez les définir à chaque envoi de tâche en incluant l'option ‑‑properties suivante dans votre commande d'envoi de tâche:
      --properties='spark.driver.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*,spark.executor.extraClassPath=/etc/hbase/conf:/usr/lib/hbase/*'
             

  2. Affichez la sortie de la table HBase dans le résultat du terminal de la session Cloud Shell:

    Waiting for job output...
    ...
    +----+----+
    | key|name|
    +----+----+
    |key1| foo|
    |key2| bar|
    +----+----+
    

Analyser la table HBase

Vous pouvez analyser le contenu de votre table HBase en exécutant les commandes suivantes dans le terminal de session SSH du nœud maître que vous avez ouvert à la section Vérifier l'installation du connecteur:

  1. Ouvrez le shell HBase:
    hbase shell
    
  2. Analyser "my-table":
    scan 'my_table'
    
    Exemple de résultat :
    ROW               COLUMN+CELL
     key1             column=cf:name, timestamp=1647364013561, value=foo
     key2             column=cf:name, timestamp=1647364012817, value=bar
    2 row(s)
    Took 0.5009 seconds
    

Effectuer un nettoyage

Une fois le tutoriel terminé, vous pouvez procéder au nettoyage des ressources que vous avez créées afin qu'elles ne soient plus comptabilisées dans votre quota et qu'elles ne vous soient plus facturées. Dans les sections suivantes, nous allons voir comment supprimer ou désactiver ces ressources.

Supprimer le projet

Le moyen le plus simple d'empêcher la facturation est de supprimer le projet que vous avez créé pour ce tutoriel.

Pour supprimer le projet :

  1. Dans la console Google Cloud, accédez à la page Gérer les ressources.

    Accéder à la page Gérer les ressources

  2. Dans la liste des projets, sélectionnez le projet que vous souhaitez supprimer, puis cliquez sur Supprimer.
  3. Dans la boîte de dialogue, saisissez l'ID du projet, puis cliquez sur Arrêter pour supprimer le projet.

Supprimer le cluster

  • Pour supprimer le cluster :
    gcloud dataproc clusters delete cluster-name \
        --region=${REGION}