在 Dataproc on Google Kubernetes Engine 上运行 Spark 作业

准备工作

  1. 您必须已创建一个标准(非 Autopilot)Google Kubernetes Engine (GKE) 区域级区域级集群,并且在该集群上启用了 Workload Identity

创建 Dataproc on GKE 虚拟集群

系统会创建一个 Dataproc on GKE 虚拟集群,作为 Dataproc 组件的部署平台。它是一种虚拟资源,并且与 Compute Engine 集群上的 Dataproc 不同,它不包含单独的 Dataproc 主实例和工作器虚拟机。

  • 当您创建 Dataproc on GKE 虚拟集群时,Dataproc on GKE 会在 GKE 集群中创建节点池。

  • Dataproc on GKE 作业作为 Pod 在这些节点池上运行。节点池以及节点池上的 Pod 调度由 GKE 管理。

  • 创建多个虚拟集群。您可以在 GKE 集群上创建并运行多个虚拟集群,通过在虚拟集群之间共享节点池来提高资源利用率。

    • 每个虚拟集群:
      • 使用单独的属性创建,包括 Spark 引擎版本和工作负载身份
      • 隔离在 GKE 集群上单独的 GKE 命名空间中

控制台

  1. 在 Google Cloud 控制台中,转到 Dataproc 集群页面。

    转到集群

  2. 点击创建集群

  3. 创建 Dataproc 集群对话框中,点击 GKE 上的集群行中的创建

  4. 设置集群面板中,执行以下操作:

    1. 集群名称字段中,输入集群的名称。
    2. 区域列表中,为 Dataproc on GKE 虚拟集群选择一个区域。此区域必须是现有 GKE 集群所在的区域(您将在下一项中选择)。
    3. Kubernetes 集群字段中,点击浏览以选择现有 GKE 集群所在的区域。
    4. 可选:在 Cloud Storage 暂存存储桶字段中,点击浏览以选择现有的 Cloud Storage 存储桶。Dataproc on GKE 将在存储桶中暂存工件。请忽略此字段,以便让 Dataproc on GKE 创建暂存存储桶。
  5. 在左侧面板中,点击配置节点池,然后在节点池面板中点击添加池

    1. 要重复使用现有的 Dataproc on GKE 节点池,请执行以下操作:
      1. 点击重复使用现有节点池
      2. 输入现有节点池的名称并选择其角色。至少一个节点池必须具有 DEFAULT 角色。
      3. 点击完成
    2. 如需创建新的 Dataproc on GKE 节点池,请执行以下操作:
      1. 点击创建新节点池
      2. 输入以下节点池值:
        • 节点池名称
        • 角色:必须至少有一个节点池具有 DEFAULT 角色。
        • 位置:在 Dataproc on GKE 集群区域内指定一个可用区。
        • 节点池机器类型
        • CPU 平台
        • 抢占
        • 最小:节点数下限。
        • Max:节点数上限。节点数上限必须大于 0。
    3. 点击添加池以添加更多节点池。所有节点池都必须具有该位置。您总共可以添加四个节点池。
  6. (可选)如果您已设置 Dataproc 永久性历史记录服务器 (PHS) 来查看 Spark 作业历史记录,请在活跃和已删除的 Dataproc on GKE 集群上点击自定义集群。然后,在历史记录服务器集群字段中,浏览并选择您的 PHS 集群。PHS 集群必须与 Dataproc on GKE 虚拟集群位于同一区域。

  7. 点击创建以创建 Dataproc 集群。您的 Dataproc on GKE 集群会显示在集群页面上的列表中。其状态为正在预配,直到集群可供使用,然后状态变为正在运行

gcloud

设置环境变量,然后在本地或 Cloud Shell 中运行 gcloud dataproc clusters gke create 命令以创建 Dataproc on GKE 集群。

  1. 设置环境变量:

    DP_CLUSTER=Dataproc on GKE  cluster-name \
      REGION=region \
      GKE_CLUSTER=GKE cluster-name \
      BUCKET=Cloud Storage bucket-name \
      DP_POOLNAME=node pool-name
      PHS_CLUSTER=Dataproc PHS server name
    
    注意:

    • DP_CLUSTER:设置 Dataproc 虚拟集群名称,该名称必须以小写字母开头,后面最多可跟 54 个小写字母、数字或连字符,并且不能以连字符结尾。
    • REGIONregion 必须与 GKE 集群所在的区域相同。
    • GKE_CLUSTER:现有 GKE 集群的名称。
    • BUCKET:(可选)您可以指定 Cloud Storage 存储桶的名称,Dataproc 将使用该存储分区暂存工件。如果您未指定存储桶,则 GKE 上的 Dataproc 将创建一个暂存存储桶。
    • DP_POOLNAME:要在 GKE 集群上创建的节点池的名称。
    • PHS_CLUSTER:(可选)Dataproc PHS 服务器,用于查看活跃和已删除的 Dataproc on GKE 集群上的 Spark 作业历史记录。PHS 集群必须与 Dataproc on GKE 虚拟集群位于同一区域。
  2. 运行以下命令:

    gcloud dataproc clusters gke create ${DP_CLUSTER} \
        --region=${REGION} \
        --gke-cluster=${GKE_CLUSTER} \
        --spark-engine-version=latest \
        --staging-bucket=${BUCKET} \
        --pools="name=${DP_POOLNAME},roles=default" \
        --setup-workload-identity \
        --history-server-cluster=${PHS_CLUSTER}
    
    请注意:

    • --spark-engine-version:Dataproc 集群上使用的 Spark 映像版本。您可以使用标识符(如 33.1latest),也可以指定完整的次要版本(如 3.1-dataproc-5)。
    • --staging-bucket:删除此标志,让 Dataproc on GKE 创建暂存存储桶。
    • --pools:此标志用于指定 Dataproc 将创建或用于执行工作负载的新节点池或现有节点池。列出 Dataproc on GKE 节点池设置,以英文逗号分隔,例如:
      --pools=name=dp-default,roles=default,machineType=e2-standard-4,min=0,max=10
      
      。您必须指定节点池 namerole。其他节点池设置是可选的。您可以使用多个 --pools 标志来指定多个节点池。至少一个节点池必须具有 default 角色。所有节点池都必须具有相同的位置。
    • --setup-workload-identity:此标志启用 Workload Identity 绑定。这些绑定允许 Kubernetes 服务帐号 (KSA) 充当虚拟集群的默认 Dataproc 虚拟机服务帐号(数据平面身份)

REST

完成 virtualClusterConfig 作为 Dataproc API cluster.create 请求的一部分。

在使用任何请求数据之前,请先进行以下替换:

  • PROJECT:Google Cloud 项目 ID
  • REGION:Dataproc 虚拟集群区域(与现有 GKE 集群区域相同)
  • DP_CLUSTER:Dataproc 集群名称
  • GKE_CLUSTER:GKE 集群名称
  • NODE_POOL:节点池名称
  • PHS_CLUSTERPersistent History Server (PHS) 集群名称
  • BUCKET:(可选)暂存存储桶的名称。将此字段留空可让 Dataproc on GKE 创建暂存存储桶。

HTTP 方法和网址:

POST https://dataproc.googleapis.com/v1/projects/project-id/regions/region/clusters

请求 JSON 正文:

{
  "clusterName":"DP_CLUSTER",
  "projectId":"PROJECT",
  "virtualClusterConfig":{
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    },
    "kubernetesClusterConfig":{
      "gkeClusterConfig":{
        "gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"latest"
        }
      }
    },
    "stagingBucket":"BUCKET"
  }
}

如需发送您的请求,请展开以下选项之一:

您应该收到类似以下内容的 JSON 响应:

{
  "projectId":"PROJECT",
  "clusterName":"DP_CLUSTER",
  "status":{
    "state":"RUNNING",
    "stateStartTime":"2022-04-01T19:16:39.865716Z"
  },
  "clusterUuid":"98060b77-...",
  "statusHistory":[
    {
      "state":"CREATING",
      "stateStartTime":"2022-04-01T19:14:27.340544Z"
    }
  ],
  "labels":{
    "goog-dataproc-cluster-name":"DP_CLUSTER",
    "goog-dataproc-cluster-uuid":"98060b77-...",
    "goog-dataproc-location":"REGION",
    "goog-dataproc-environment":"prod"
  },
  "virtualClusterConfig":{
    "stagingBucket":"BUCKET",
    "kubernetesClusterConfig":{
      "kubernetesNamespace":"dp-cluster",
      "gkeClusterConfig":{
"gkeClusterTarget":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER",
        "nodePoolTarget":[
          {
"nodePool":"projects/PROJECT/locations/REGION/clusters/GKE_CLUSTER/nodePools/NODE_POOL",
            "roles":[
              "DEFAULT"
            ]
          }
        ]
      },
      "kubernetesSoftwareConfig":{
        "componentVersion":{
          "SPARK":"3.1-..."
        },
        "properties":{
          "dpgke:dpgke.unstable.outputOnly.endpoints.sparkHistoryServer":"https://...",
          "spark:spark.eventLog.dir":"gs://BUCKET/.../spark-job-history",
          "spark:spark.eventLog.enabled":"true"
        }
      }
    },
    "auxiliaryServicesConfig":{
      "sparkHistoryServerConfig":{
        "dataprocCluster":"projects/PROJECT/regions/REGION/clusters/PHS_CLUSTER"
      }
    }
  }

提交 Spark 作业

在 Dataproc on GKE 虚拟集群运行后,使用 Google Cloud 控制台、gcloud CLI 或 Dataproc jobs.submit API(通过使用直接 HTTP 请求或 Cloud 客户端库提交 Spark 作业

gcloud CLI Spark 作业示例

gcloud dataproc jobs submit spark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    --class=org.apache.spark.examples.SparkPi \
    --jars=local:///usr/lib/spark/examples/jars/spark-examples.jar \
    -- 1000

gcloud CLI PySpark 作业示例

gcloud dataproc jobs submit pyspark \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/python/pi.py \
    -- 10

gcloud CLI SparkR 作业示例

gcloud dataproc jobs submit spark-r \
    --region=${REGION} \
    --cluster=${DP_CLUSTER} \
    local:///usr/lib/spark/examples/src/main/r/dataframe.R

清理