ひらめの日常

日常のメモをつらつらと

Mercari Dataflow Template を使って、BigQueryからPubSub Topicへデータを転送する

はじめに

Cloud Dataflow に興味があったため、休日を使って少し遊んでみました。 この記事では、0の状態から、Mercari社が提供している Dataflow Template を使って、BigQueryからPubSub Topicへとデータを転送することをゴールとしています。 具体的には次のようなことを行います。

  • Mercari Dataflow Templateを利用し、BigQueryからPubSub Topicへとデータを転送する
  • PubSub TopicへはProtobufの形式で送信する
  • Dataflow Job用のService Accountを作成し、IAMの設定をする

まず前準備として必要なツールとサービスのセットアップから始め、その後、Dataflowのテンプレートの使い方について詳述します。最後に、実際にDataflowジョブを実行してその過程を確認します。

構成としては次のようになります。至ってシンプルですね

Dataflow Templateとは

本来であれば、Dataflow のパイプラインを構築しようとすると Apatch Beam などを用いて実装が必要になります。Dataflow Template を用いることで、コードを書くことなくパラメータなどを設定してパイプラインをデプロイすることが可能になります。

Dataflow templates allow you to package a Dataflow pipeline for deployment. Anyone with the correct permissions can then use the template to deploy the packaged pipeline. You can create your own custom Dataflow templates, and Google provides pre-built templates for common scenarios.

ref: Dataflow templates  |  Google Cloud

自分で組んだパイプラインをテンプレートとして配布することはもちろん、Google社が提供しているテンプレートもあるため、それを用いてパイプラインを動かすことができます。

Mercari Dataflow Template では、公式が提供しているより多くのデータ入出力先をJsonファイルで指定・実行できます。

The Mercari Dataflow Template enables you to run various pipelines without writing programs by simply defining a configuration file.

ref: GitHub - mercari/DataflowTemplate: Mercari Dataflow Template

前準備

関連リソースのセットアップ

次を準備していきます。

  • gcloud コマンド
  • Google Cloud プロジェクト
  • BigQuery
  • Cloud PubSub
  • Cloud Storage
  • Cloud Artifact Registry

ではまず、gcloud コマンドをインストール、セットアップします。

cloud.google.com

続いて、Google Cloud プロジェクトを準備します。これから先、プロジェクトのIDが変数 PROJECT_ID に設定されているとします。

$ gcloud projects create $PROJECT_ID
$ gcloud config set project $PROJECT_ID

データ転送元のBigQueryにサンプルとして使うデータを用意します。今回はBigQueryのQuickStartに従ってサンプルデータを用意します。

cloud.google.com

これに従ってデータを用意することで、PROJECT_ID.babynames.names2010 データが準備できるはずです。

そしてCloud PubSub のセットアップです。BigQueryからデータを連携する先である Topic と Subscription を準備しておきましょう。

$ gcloud services enable pubsub.googleapis.com
$ gcloud auth application-default login
$ gcloud projects add-iam-policy-binding $PROJECT_ID --member="user:$EMAIL_ADDRESS" --role=roles/pubsub.admin
$ gcloud pubsub topics create my-topic
$ gcloud pubsub subscriptions create my-sub --topic my-topic

ref: Quickstart: Publish and receive messages in Pub/Sub by using a client library  |  Pub/Sub Documentation  |  Google Cloud

これで、Topic my-topic とSubscription my-sub が作成できました。

最後に、Mercari Dataflow Template を実行する上で必要なリソースをセットアップしていきます。 まずは、Cloud Artifact Registry をセットアップしましょう。Mercari Dataflow Template では、ビルドしたパイプラインを Cloud Artifact Registry の docker repository にアップロードして利用します。下記コマンドで docker repository を作成できます。

$ gcloud artifacts repositories create quickstart-docker-repo --repository-format=docker \
    --location=us-central1 --description="Docker repository" \
    --project=$PROJECT_ID

ref: Quickstart: Store Docker container images in Artifact Registry  |  Artifact Registry documentation  |  Google Cloud

そしてテンプレートファイルのアップロードやJsonの設定ファイルの保存先として、Cloud Storage のセットアップも必要です。バケットを作成しましょう。

$ gcloud storage buckets create gs://BUCKET_NAME --location=BUCKET_LOCATION

ref: Create buckets  |  Cloud Storage  |  Google Cloud

ここまでで、依存しているインフラ周りの設定が完了しました。次のセクションで Service Account 周りの設定をしましょう。

Service Account の設定

Dataflow Job を実行するための Service Account を設定します。デフォルトでは Compute Engine のデフォルトサービスアカウントを利用するそうなので、自分で用意した Service Account を設定します。

こちらの公式ドキュメントに従ってセットアップすれば良いです。

cloud.google.com

概要を説明すると、それぞれの Service Account に対して次のような権限を付与します

  • 自前の Service Account
    • roles/dataflow.worker
    • roles/dataflow.admin
  • Dataflow Service Account と Compute Engine Service Agent
    • iam.serviceAccountTokenCreator
    • iam.serviceAccountUser

これで、Dataflow Job を自分が作成した Service Account で実行できるようになりました。

追加で、そのDataflow Jobを実行する際にアクセスするリソースに対する権限が必要です。

  • roles/artifactregistry.reader
  • roles/storage.objectAdmin
  • roles/bigquery.dataViewer
  • roles/bigquery.readSessionUser
  • roles/pubsub.publisher

それぞれ、次のコマンドで ROLE の部分を置き換えて付与していきましょう。

$ gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" --role=ROLE

設定ファイル等の作成

Protoファイルの用意

まずはPubSubに送信する際のProtoファイルを作成しましょう。bq show --schema babynames.names2010 によって対象となるBigQueryのスキーマを確認できます。

[{"name":"name","type":"STRING","mode":"NULLABLE"},{"name":"assigned_sex_at_birth","type":"STRING","mode":"NULLABLE"},{"name":"count","type":"INTEGER","mode":"NULLABLE"}]

name, assigned_sex_at_birth, count の三つのフィールドがあるようですね。それと同じフィールド名のProtoファイルを作成しましょう。特にデータ変換などを行わない場合は、Profoファイルの各フィールド名とBigQueryのスキーマの各フィールド名は一致している必要があります。今回SubscriberはGo言語で書くので、go_packageも指定しています。

syntax = "proto3";
package proto;

option go_package = "github.com/hiramekun/pubsub-sample/proto;proto";

message BabyName {
  string name = 1;
  string assigned_sex_at_birth = 2;
  int64 count = 3;
}

Jsonの設定ファイルの作成

Mercari Dataflow Template の設定ファイルを書いていきます。今回はモジュールとして BigQuery Source ModulePubSub Sink Module を利用します。その名の通り、source がデータの出てくる元で、sink がデータを送信する先です。

PubSub Sink Module のドキュメントに書いてあるように、パラメータとして次の値を設定する必要があります

  • formatprotobuf を指定
  • protobufDescriptor でGCSにアップロードしたdescriptorへのパスを指定
  • protobufMessageName でpackage nameを含めた対象となるメッセージ名を指定

なので、protocコマンドで先ほど作ったProtoファイルをdescriptorへと変換してGCSにアップロードしましょう。--descriptor_set_out オプションを使ってdescriptorを出力できます。

$ protoc ---descriptor_set_out=message.pb message.proto

テンプレートが読み込むための設定ファイルは次のようになります。

{
  "sources": [
    {
      "name": "bigqueryInput",
      "module": "bigquery",
      "parameters": {
        "table": "PROJECT_ID.babynames.names2010"
      }
    }
  ],
  "sinks": [
    {
      "name": "pubsubOutput",
      "module": "pubsub",
      "input": "bigqueryInput",
      "outputAvroSchema": "gs://BUCKET_ID/output_schema.avsc",
      "parameters": {
        "topic": "projects/PROJECT_ID/topics/my-topic",
        "format": "protobuf",
        "protobufDescriptor": "gs://BUCKET_ID/message.pb",
        "protobufMessageName": "proto.BabyName"
      }
    }
  ]
}

実行

ここまで準備ができたら、Mercari Dataflow TemplateのREADMEに従って実行します!一点違うところは、--service-account-email で先ほど設定した Service Account で実行するように指定する点です。

github.com

$ gcloud auth configure-docker us-central1-docker.pkg.dev
$ mvn clean package -DskipTests -Dimage=us-central1-docker.pkg.dev/$PROJECT_ID/{template_repo_name}/cloud:latest
$ gcloud dataflow flex-template build gs://{path/to/template_file} \
  --image "us-central1-docker.pkg.dev/$PROJECT_ID/{template_repo_name}/cloud:latest" \
  --sdk-language "JAVA"
$ gsutil cp config.json gs://{path/to/config.json}
$ gcloud dataflow flex-template run {job_name} \
  --template-file-gcs-location=gs://{path/to/template_file} \
  --parameters=config=gs://{path/to/config.json} \
  --service-account-email=SERVICE_ACCOUNT_EMAIL

https://console.cloud.google.com/dataflow/jobsから実行結果やパイプラインの状況を確認することができます。

一応PubSubのTopicにメッセージが送信されてるかを確認しましょう。簡単なSubscriberをGoで書いてローカルで受信していることが確認できると思います。

func main() {
    projectID := os.Getenv("GCP_PROJECT_ID")
    subID := os.Getenv("PUBSUB_SUBSCRIPTION_ID")
    if err := pullMsgs(projectID, subID); err != nil {
        fmt.Println(err)
    }
}

func pullMsgs(projectID, subID string) error {
    ctx := context.Background()
    client, err := pubsub.NewClient(ctx, projectID)
    if err != nil {
        return fmt.Errorf("pubsub.NewClient: %w", err)
    }
    defer client.Close()

    sub := client.Subscription(subID)

    ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
    defer cancel()

    var received int32
    err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) {
        var m pb.BabyName
        if err := proto.Unmarshal(msg.Data, &m); err != nil {
            fmt.Printf("proto.Unmarshal: %v\n", err)
            return
        }
        fmt.Printf("Got message: %s, %s, %d\n", m.Name, m.AssignedSexAtBirth, m.Count)
        atomic.AddInt32(&received, 1)
        msg.Ack()
    })
    if err != nil {
        return fmt.Errorf("sub.Receive: %w", err)
    }
    fmt.Printf("Received %d messages\n", received)

    return nil
}

まとめ

今回は、Mercari Dataflow Template を使って、何も設定していないところからBigQueryからPubSub Topicへデータを転送する方法を記載しました。意外と前準備などが多いため、参考になれば幸いです。

これからも色々なDataflow Templateを触って遊んだり、機会があれば仕事で使ってみたりしたいと思います。