はじめに
Cloud Dataflow に興味があったため、休日を使って少し遊んでみました。 この記事では、0の状態から、Mercari社が提供している Dataflow Template を使って、BigQueryからPubSub Topicへとデータを転送することをゴールとしています。 具体的には次のようなことを行います。
- Mercari Dataflow Templateを利用し、BigQueryからPubSub Topicへとデータを転送する
- PubSub TopicへはProtobufの形式で送信する
- Dataflow Job用のService Accountを作成し、IAMの設定をする
まず前準備として必要なツールとサービスのセットアップから始め、その後、Dataflowのテンプレートの使い方について詳述します。最後に、実際にDataflowジョブを実行してその過程を確認します。
構成としては次のようになります。至ってシンプルですね
Dataflow Templateとは
本来であれば、Dataflow のパイプラインを構築しようとすると Apatch Beam などを用いて実装が必要になります。Dataflow Template を用いることで、コードを書くことなくパラメータなどを設定してパイプラインをデプロイすることが可能になります。
Dataflow templates allow you to package a Dataflow pipeline for deployment. Anyone with the correct permissions can then use the template to deploy the packaged pipeline. You can create your own custom Dataflow templates, and Google provides pre-built templates for common scenarios.
ref: Dataflow templates | Google Cloud
自分で組んだパイプラインをテンプレートとして配布することはもちろん、Google社が提供しているテンプレートもあるため、それを用いてパイプラインを動かすことができます。
- Google-provided templates | Cloud Dataflow | Google Cloud
- https://github.com/GoogleCloudPlatform/DataflowTemplates
Mercari Dataflow Template では、公式が提供しているより多くのデータ入出力先をJsonファイルで指定・実行できます。
The Mercari Dataflow Template enables you to run various pipelines without writing programs by simply defining a configuration file.
ref: GitHub - mercari/DataflowTemplate: Mercari Dataflow Template
前準備
関連リソースのセットアップ
次を準備していきます。
- gcloud コマンド
- Google Cloud プロジェクト
- BigQuery
- Cloud PubSub
- Cloud Storage
- Cloud Artifact Registry
ではまず、gcloud コマンドをインストール、セットアップします。
続いて、Google Cloud プロジェクトを準備します。これから先、プロジェクトのIDが変数 PROJECT_ID
に設定されているとします。
$ gcloud projects create $PROJECT_ID $ gcloud config set project $PROJECT_ID
データ転送元のBigQueryにサンプルとして使うデータを用意します。今回はBigQueryのQuickStartに従ってサンプルデータを用意します。
これに従ってデータを用意することで、PROJECT_ID.babynames.names2010
データが準備できるはずです。
そしてCloud PubSub のセットアップです。BigQueryからデータを連携する先である Topic と Subscription を準備しておきましょう。
$ gcloud services enable pubsub.googleapis.com $ gcloud auth application-default login $ gcloud projects add-iam-policy-binding $PROJECT_ID --member="user:$EMAIL_ADDRESS" --role=roles/pubsub.admin $ gcloud pubsub topics create my-topic $ gcloud pubsub subscriptions create my-sub --topic my-topic
これで、Topic my-topic
とSubscription my-sub
が作成できました。
最後に、Mercari Dataflow Template を実行する上で必要なリソースをセットアップしていきます。 まずは、Cloud Artifact Registry をセットアップしましょう。Mercari Dataflow Template では、ビルドしたパイプラインを Cloud Artifact Registry の docker repository にアップロードして利用します。下記コマンドで docker repository を作成できます。
$ gcloud artifacts repositories create quickstart-docker-repo --repository-format=docker \ --location=us-central1 --description="Docker repository" \ --project=$PROJECT_ID
そしてテンプレートファイルのアップロードやJsonの設定ファイルの保存先として、Cloud Storage のセットアップも必要です。バケットを作成しましょう。
$ gcloud storage buckets create gs://BUCKET_NAME --location=BUCKET_LOCATION
ref: Create buckets | Cloud Storage | Google Cloud
ここまでで、依存しているインフラ周りの設定が完了しました。次のセクションで Service Account 周りの設定をしましょう。
Service Account の設定
Dataflow Job を実行するための Service Account を設定します。デフォルトでは Compute Engine のデフォルトサービスアカウントを利用するそうなので、自分で用意した Service Account を設定します。
こちらの公式ドキュメントに従ってセットアップすれば良いです。
概要を説明すると、それぞれの Service Account に対して次のような権限を付与します
- 自前の Service Account
roles/dataflow.worker
roles/dataflow.admin
- Dataflow Service Account と Compute Engine Service Agent
iam.serviceAccountTokenCreator
iam.serviceAccountUser
これで、Dataflow Job を自分が作成した Service Account で実行できるようになりました。
追加で、そのDataflow Jobを実行する際にアクセスするリソースに対する権限が必要です。
roles/artifactregistry.reader
roles/storage.objectAdmin
roles/bigquery.dataViewer
roles/bigquery.readSessionUser
roles/pubsub.publisher
それぞれ、次のコマンドで ROLE の部分を置き換えて付与していきましょう。
$ gcloud projects add-iam-policy-binding $PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_EMAIL" --role=ROLE
設定ファイル等の作成
Protoファイルの用意
まずはPubSubに送信する際のProtoファイルを作成しましょう。bq show --schema babynames.names2010
によって対象となるBigQueryのスキーマを確認できます。
[{"name":"name","type":"STRING","mode":"NULLABLE"},{"name":"assigned_sex_at_birth","type":"STRING","mode":"NULLABLE"},{"name":"count","type":"INTEGER","mode":"NULLABLE"}]
name
, assigned_sex_at_birth
, count
の三つのフィールドがあるようですね。それと同じフィールド名のProtoファイルを作成しましょう。特にデータ変換などを行わない場合は、Profoファイルの各フィールド名とBigQueryのスキーマの各フィールド名は一致している必要があります。今回SubscriberはGo言語で書くので、go_packageも指定しています。
syntax = "proto3"; package proto; option go_package = "github.com/hiramekun/pubsub-sample/proto;proto"; message BabyName { string name = 1; string assigned_sex_at_birth = 2; int64 count = 3; }
Jsonの設定ファイルの作成
Mercari Dataflow Template の設定ファイルを書いていきます。今回はモジュールとして BigQuery Source Module と PubSub Sink Module を利用します。その名の通り、source がデータの出てくる元で、sink がデータを送信する先です。
PubSub Sink Module のドキュメントに書いてあるように、パラメータとして次の値を設定する必要があります
format
にprotobuf
を指定protobufDescriptor
でGCSにアップロードしたdescriptorへのパスを指定protobufMessageName
でpackage nameを含めた対象となるメッセージ名を指定
なので、protocコマンドで先ほど作ったProtoファイルをdescriptorへと変換してGCSにアップロードしましょう。--descriptor_set_out
オプションを使ってdescriptorを出力できます。
$ protoc ---descriptor_set_out=message.pb message.proto
テンプレートが読み込むための設定ファイルは次のようになります。
{ "sources": [ { "name": "bigqueryInput", "module": "bigquery", "parameters": { "table": "PROJECT_ID.babynames.names2010" } } ], "sinks": [ { "name": "pubsubOutput", "module": "pubsub", "input": "bigqueryInput", "outputAvroSchema": "gs://BUCKET_ID/output_schema.avsc", "parameters": { "topic": "projects/PROJECT_ID/topics/my-topic", "format": "protobuf", "protobufDescriptor": "gs://BUCKET_ID/message.pb", "protobufMessageName": "proto.BabyName" } } ] }
実行
ここまで準備ができたら、Mercari Dataflow TemplateのREADMEに従って実行します!一点違うところは、--service-account-email
で先ほど設定した Service Account で実行するように指定する点です。
$ gcloud auth configure-docker us-central1-docker.pkg.dev $ mvn clean package -DskipTests -Dimage=us-central1-docker.pkg.dev/$PROJECT_ID/{template_repo_name}/cloud:latest $ gcloud dataflow flex-template build gs://{path/to/template_file} \ --image "us-central1-docker.pkg.dev/$PROJECT_ID/{template_repo_name}/cloud:latest" \ --sdk-language "JAVA" $ gsutil cp config.json gs://{path/to/config.json} $ gcloud dataflow flex-template run {job_name} \ --template-file-gcs-location=gs://{path/to/template_file} \ --parameters=config=gs://{path/to/config.json} \ --service-account-email=SERVICE_ACCOUNT_EMAIL
https://console.cloud.google.com/dataflow/jobsから実行結果やパイプラインの状況を確認することができます。
一応PubSubのTopicにメッセージが送信されてるかを確認しましょう。簡単なSubscriberをGoで書いてローカルで受信していることが確認できると思います。
func main() { projectID := os.Getenv("GCP_PROJECT_ID") subID := os.Getenv("PUBSUB_SUBSCRIPTION_ID") if err := pullMsgs(projectID, subID); err != nil { fmt.Println(err) } } func pullMsgs(projectID, subID string) error { ctx := context.Background() client, err := pubsub.NewClient(ctx, projectID) if err != nil { return fmt.Errorf("pubsub.NewClient: %w", err) } defer client.Close() sub := client.Subscription(subID) ctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() var received int32 err = sub.Receive(ctx, func(_ context.Context, msg *pubsub.Message) { var m pb.BabyName if err := proto.Unmarshal(msg.Data, &m); err != nil { fmt.Printf("proto.Unmarshal: %v\n", err) return } fmt.Printf("Got message: %s, %s, %d\n", m.Name, m.AssignedSexAtBirth, m.Count) atomic.AddInt32(&received, 1) msg.Ack() }) if err != nil { return fmt.Errorf("sub.Receive: %w", err) } fmt.Printf("Received %d messages\n", received) return nil }
まとめ
今回は、Mercari Dataflow Template を使って、何も設定していないところからBigQueryからPubSub Topicへデータを転送する方法を記載しました。意外と前準備などが多いため、参考になれば幸いです。
これからも色々なDataflow Templateを触って遊んだり、機会があれば仕事で使ってみたりしたいと思います。