nkmtの日記

日常のことをつらつら書きます

embulkの使い方

インフラ

2017年8月23日

embulk 概要

embulkとは大量のログデータ等をbigQuery、redshift、elasticsearch等のデータウエアハウスに転送する際に使われるルーツです。
同じトレジャーデータのツールにfluentdがありますがこちらは逐次データ転送をするツールがあります。
対してこちらのembulkは大量のデータ転送を行います。
今回はこちらを使ってmysqlからbigqueryへデータを転送する方法を記します。

embulk インストール

こういうツール系は書いた時と後でやってみると違うことが多いのでコマンドというよりも公式のリンクと主に必要な物を記載します。
主に必要なのはjavaとrubyでしょう。
下記のURLを参考に各環境に応じて実施します。
embulkの公式インストール

embulk 必要なファイル

schema.json

実行ファイル
2週類のファイルが必要なファイルになります。
1つが読子きさきに作成するデータベースの設計jsonファイルです。
もう1つがembulkの実行内容のファイルです。

テーブル.json

[
  {
    "name": "id",
    "type": "INTEGER"
  },
  {
    "name": "カラム名",
    "type": "STRING"
  },
]

ここのjsonファイルは下記のデーターロード先で作成されるテーブルとカラム構造となります。
データ抽出元とここを合わせる必要があります。

実行.yml

in:
  type: mysql
  port: 3306
  user: データベース接続ユーザー
  password: データベースパスワード
  database: データベース名
  host: データベースの接続ホスト
  query: |
    SELECT *
    FROM テーブル名
    WHERE date BETWEEN '2017-07-01 00:00:00' AND '2017-07-31 23:59:59'

out:
  type: bigquery
  mode: append
  auto_create_table: true
  table: 作成されるテーブル名
  options: {connectTimeout: 720000} # 長目がいい
  ignore_unknown_values: 1 # 1行目はヘッダーだから無視
  auth_method: private_key
  schema_file: テーブルの設計jsonファイル
  service_account_email: GCPで作成してます
  p12_keyfile: GCPのキーファファイル通常はp12
  source_format: CSV
  file_ext: .csv
  delete_from_local_when_job_end: 1
  project: 'GCPのプロジェクトID'
  dataset: データセットここは自分の作る
  formatter:
    type: csv
    header_line: false
    timezone: Asia/Tokyo

上記ではmodeがappendになっています。
新規のテーブルの場合は新規にテーブルが参加されますがすでにテーブルがある場合はテーブルにデータが追加されます。
inのquery部分で実行しているSQLで抽出された結果を取得しています。
ちなみに他のサイトでここのembulkをバッチ化する際に月ごとのバッチ化する際にshllスクリプトで変数として実行時間の期間を設定してましたがmysqlの関数をここに書けば大丈夫ですね。

実行

こちらのコマンドにより実行できますが、データ量によって時間がかかります。
embulk run 実行ファイル.yml