Menjalankan DAG Apache Airflow di Cloud Composer 1 (Google Cloud CLI)

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Panduan memulai ini menunjukkan cara membuat lingkungan Cloud Composer dan menjalankan DAG Apache Airflow di Cloud Composer 1.

Sebelum memulai

  1. Login ke akun Google Cloud Anda. Jika Anda baru menggunakan Google Cloud, buat akun untuk mengevaluasi performa produk kami dalam skenario dunia nyata. Pelanggan baru juga mendapatkan kredit gratis senilai $300 untuk menjalankan, menguji, dan men-deploy workload.
  2. Menginstal Google Cloud CLI.
  3. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  4. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  5. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  6. Menginstal Google Cloud CLI.
  7. Untuk initialize gcloud CLI, jalankan perintah berikut:

    gcloud init
  8. Buat atau pilih project Google Cloud.

    • Membuat project Google Cloud:

      gcloud projects create PROJECT_ID

      Ganti PROJECT_ID dengan nama untuk project Google Cloud yang Anda buat.

    • Pilih project Google Cloud yang Anda buat:

      gcloud config set project PROJECT_ID

      Ganti PROJECT_ID dengan nama project Google Cloud Anda.

  9. Pastikan penagihan telah diaktifkan untuk project Google Cloud Anda.

  10. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  11. Untuk mendapatkan izin yang diperlukan guna menyelesaikan panduan memulai ini, minta administrator untuk memberi Anda peran IAM berikut pada project Anda:

    Untuk mengetahui informasi selengkapnya tentang pemberian peran, lihat Mengelola akses.

    Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Membuat lingkungan

Buat lingkungan baru bernama example-environment di region us-central1, dengan versi Cloud Composer 1 terbaru.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-1.20.12-airflow-1.10.15

Membuat file DAG

DAG Airflow adalah kumpulan tugas terorganisir yang ingin Anda jadwalkan dan jalankan. DAG ditentukan dalam file Python standar.

Panduan ini menggunakan contoh DAG Airflow yang ditentukan dalam file quickstart.py. Kode Python dalam file ini melakukan hal berikut:

  1. Membuat DAG, composer_sample_dag. DAG ini berjalan setiap hari.
  2. Menjalankan satu tugas, print_dag_run_conf. Tugas ini mencetak konfigurasi run DAG menggunakan operator bash.

Simpan salinan file quickstart.py di komputer lokal Anda:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Mengupload file DAG ke bucket lingkungan Anda

Setiap lingkungan Cloud Composer memiliki bucket Cloud Storage yang terkait. Airflow di Cloud Composer hanya menjadwalkan DAG yang berada di folder /dags di bucket ini.

Untuk menjadwalkan DAG, upload quickstart.py dari komputer lokal Anda ke folder /dags lingkungan Anda:

Untuk mengupload quickstart.py dengan Google Cloud CLI, jalankan perintah berikut di folder tempat file quickstart.py berada:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

Melihat DAG

Setelah Anda mengupload file DAG, Airflow akan melakukan hal berikut:

  1. Mengurai file DAG yang Anda upload. Mungkin perlu waktu beberapa menit agar DAG tersedia untuk Airflow.
  2. Menambahkan DAG ke daftar DAG yang tersedia.
  3. Mengeksekusi DAG sesuai dengan jadwal yang Anda berikan di file DAG.

Periksa apakah DAG Anda diproses tanpa error dan tersedia di Airflow dengan melihatnya di UI DAG. UI DAG adalah antarmuka Cloud Composer untuk melihat informasi DAG di Konsol Google Cloud. Cloud Composer juga menyediakan akses ke UI Airflow, yang merupakan antarmuka web Airflow native.

  1. Tunggu sekitar lima menit guna memberi Airflow waktu untuk memproses file DAG yang Anda upload sebelumnya, dan untuk menyelesaikan proses DAG pertama (akan dijelaskan nanti).

  2. Jalankan perintah berikut di Google Cloud CLI. Perintah ini menjalankan perintah Airflow CLI dags list yang mencantumkan DAG di lingkungan Anda.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Pastikan DAG composer_quickstart tercantum dalam output perintah.

    Contoh output:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Melihat detail operasi DAG

Satu eksekusi DAG disebut run DAG. Airflow segera mengeksekusi proses DAG untuk contoh DAG karena tanggal mulai di file DAG ditetapkan ke kemarin. Dengan cara ini, Airflow akan mengikuti jadwal DAG yang ditentukan.

Contoh DAG berisi satu tugas, print_dag_run_conf, yang menjalankan perintah echo di konsol. Perintah ini menghasilkan informasi meta tentang DAG (ID numerik run DAG).

Jalankan perintah berikut di Google Cloud CLI. Perintah ini mencantumkan operasi DAG untuk DAG composer_quickstart:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Contoh output:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

Airflow CLI tidak menyediakan perintah untuk melihat log tugas. Anda dapat menggunakan metode lain untuk melihat log tugas Airflow: Cloud Composer DAG UI, Airflow UI, atau Cloud Logging. Panduan ini menunjukkan cara membuat kueri Cloud Logging untuk mendapatkan log dari proses DAG tertentu.

Jalankan perintah berikut di Google Cloud CLI. Perintah ini membaca log dari Cloud Logging untuk menjalankan DAG tertentu dari DAG composer_quickstart. Argumen --format memformat output sehingga hanya teks pesan log yang ditampilkan.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Ganti:

  • RUN_ID dengan nilai run_id dari output perintah tasks states-for-dag-run yang Anda jalankan sebelumnya. Misalnya, 2024-02-17T15:38:38.969307+00:00.

Contoh output:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Pembersihan

Agar tidak menimbulkan biaya pada akun Google Cloud Anda untuk resource yang digunakan pada halaman ini, hapus project Google Cloud yang berisi resource tersebut.

Hapus resource yang digunakan dalam tutorial ini:

  1. Hapus lingkungan Cloud Composer:

    1. Di Konsol Google Cloud, buka halaman Environments.

      Buka Lingkungan

    2. Pilih example-environment, lalu klik Hapus.

    3. Tunggu hingga lingkungan dihapus.

  2. Hapus bucket lingkungan Anda. Menghapus lingkungan Cloud Composer tidak akan menghapus bucket-nya.

    1. Di Konsol Google Cloud, buka halaman Penyimpanan > Browser.

      Buka Penyimpanan > Browser

    2. Pilih bucket lingkungan dan klik Delete. Misalnya, bucket ini dapat diberi nama us-central1-example-environ-c1616fe8-bucket.

  3. Hapus persistent disk dari antrean Redis lingkungan Anda. Menghapus lingkungan Cloud Composer tidak akan menghapus persistent disk-nya.

    1. Di Konsol Google Cloud, buka Compute Engine > Disks.

      Buka Disk

    2. Pilih persistent disk antrean Redis lingkungan lingkungan Anda, lalu klik Delete.

      Misalnya, disk ini dapat diberi nama gke-us-central1-exampl-pvc-b12055b6-c92c-43ff-9de9-10f2cc6fc0ee. Disk untuk Cloud Composer 1 selalu memiliki jenis Standard persistent disk dan berukuran 2 GB.

Langkah selanjutnya