2017年8月23日
目次
embulkとは大量のログデータ等をbigQuery、redshift、elasticsearch等のデータウエアハウスに転送する際に使われるルーツです。
同じトレジャーデータのツールにfluentdがありますがこちらは逐次データ転送をするツールがあります。
対してこちらのembulkは大量のデータ転送を行います。
今回はこちらを使ってmysqlからbigqueryへデータを転送する方法を記します。
こういうツール系は書いた時と後でやってみると違うことが多いのでコマンドというよりも公式のリンクと主に必要な物を記載します。
主に必要なのはjavaとrubyでしょう。
下記のURLを参考に各環境に応じて実施します。
embulkの公式インストール
schema.json
実行ファイル
2週類のファイルが必要なファイルになります。
1つが読子きさきに作成するデータベースの設計jsonファイルです。
もう1つがembulkの実行内容のファイルです。
[
{
"name": "id",
"type": "INTEGER"
},
{
"name": "カラム名",
"type": "STRING"
},
]
ここのjsonファイルは下記のデーターロード先で作成されるテーブルとカラム構造となります。
データ抽出元とここを合わせる必要があります。
in:
type: mysql
port: 3306
user: データベース接続ユーザー
password: データベースパスワード
database: データベース名
host: データベースの接続ホスト
query: |
SELECT *
FROM テーブル名
WHERE date BETWEEN '2017-07-01 00:00:00' AND '2017-07-31 23:59:59'
out:
type: bigquery
mode: append
auto_create_table: true
table: 作成されるテーブル名
options: {connectTimeout: 720000} # 長目がいい
ignore_unknown_values: 1 # 1行目はヘッダーだから無視
auth_method: private_key
schema_file: テーブルの設計jsonファイル
service_account_email: GCPで作成してます
p12_keyfile: GCPのキーファファイル通常はp12
source_format: CSV
file_ext: .csv
delete_from_local_when_job_end: 1
project: 'GCPのプロジェクトID'
dataset: データセットここは自分の作る
formatter:
type: csv
header_line: false
timezone: Asia/Tokyo
上記ではmodeがappendになっています。
新規のテーブルの場合は新規にテーブルが参加されますがすでにテーブルがある場合はテーブルにデータが追加されます。
inのquery部分で実行しているSQLで抽出された結果を取得しています。
ちなみに他のサイトでここのembulkをバッチ化する際に月ごとのバッチ化する際にshllスクリプトで変数として実行時間の期間を設定してましたがmysqlの関数をここに書けば大丈夫ですね。
こちらのコマンドにより実行できますが、データ量によって時間がかかります。
embulk run 実行ファイル.yml