最近、Microsoft Azure Event Hubsから外部DB(データベース)(External Table)へデータ連携したいというシチュエーションがあった。
しかし、Event Hubsからは通常はStream Analyticsを介してAzureのStorage TableやBlob Tableなどに連携することが普通のようである。
ある程度調べた段階では、Event HubsからMySQLや、Stream AnalyticsからMySQLへも連携できず、かなり困っていた。
しかし、AzureのFunction Appというアプリを使って、最終的には簡単・簡易にできた。
簡単にできたけれども、そこに至る過程ではものすごく苦労して、別のアプローチをものすごく試行錯誤してしまった。
ということで、私と同じ苦悩を抱えている人は少なからずいると思うので、ここにその方法を記しておきます。
この方法はオンプレミスのMySQLでも、Azure上のMySQLでも、それは問わずに可能な方法です。
こういうことをMicrosoftは広く、分かりやすく知らしめれば敷居が下がり、更に広く発展すると思うけど、概してMicrosoftのページは分かりにくく、取っ掛かりにくいんですよね。
あ、ちなみにAzure上のMySQLに接続する場合は、ネットワークの構成から3306番ポートのACLを、接続可能なように構成してください。
Event Hubsを作成する
多分、このページを見ている人は、既にEvent Hubsは作っているのだと思う。
なので、こういう名前のEvent Hubs(gps-eh)を作ったという前提を伝えるだけの画像です。
Function APPで関数を作成する
次に、Function Appというものを作成します。
こんな感じで、適当に作っていきます。
次に、関数という部分(画像の左側の赤枠)を押して、新規という部分(画像の上の赤枠)を押し、新しい関数を作っていきます。
そして、Event Hub Trigger – Javascriptというテンプレートを選びます。
そして、関数名は適当に決めて、画像右下の方に見えにくいですが赤枠で囲っている部分に「新規」というリンクが表示されているのでそこをクリックすると、画像のように、既に存在するEvent Hubが選択できるようになっていますので選択します。
そして、作成ボタンを押下します。
すると、以下のようなコードがテンプレートとして記載されています。
MySQLライブラリのインストール
そしてここでこのFunction APPの概要の部分まで戻って、プラットフォーム機能という部分(画像の赤枠)を押します。
次に、高度なツール(画像の赤枠)を押します。
次に、Debug ConsoleのCMD(画像赤枠)を押します。
そして、表示されたコマンドプロンプトの画面で、画像のようにlsとcdを駆使して、作った関数名のフォルダの下に移動します。
そして次に、この画面で「npm install mysql」とコマンドを実行します。
これによって、mysql用のライブラリがインストールされ、関数内で利用できるようになります。
Warningがでるけど気にせずに続行します。
オンプレミスなどの外部MySQLDBへの接続
そして最後に、以下のようなコードを書きます。
これで、Azure Event Hubsから外部のMySQLなどにデータ連携することが可能になりました!
Insert,Delete,Select,UpdateをJavascriptで好きに駆使して、システムを作り上げてください!
// requireの設定 const mysql = require('mysql'); module.exports = function (context, eventHubMessages) { context.log(`JavaScript eventhub trigger function called for message array ${eventHubMessages}`); // MySQLとのコネクションの作成 const connection = mysql.createConnection({ host : '123.123.123.123', // 実際のホスト名やIPアドレスを記載 user : 'user', // 実際のMySQLのログインユーザ名を記載 password : 'password' , // 実際のMySQLのパスワードを記載 database: 'dbname' , // 実際のMySQLのデータベース名を記載 connectTimeout : 10000 }); // 接続 connection.connect(); eventHubMessages.forEach(message => { // ここ以降は実際のデータに基づいて変更してください。これはあくまでサンプルです。 var num1 = Number(message['timestamp']); var num2 = parseInt(num1 / 1000,10) ; var str1 = String(num2); context.log(`Processed message ${message['payloads']['time']}`); connection.query( 'INSERT INTO GPS (imei, EventEnqueuedUtcTime, time, latitude, longitude, sealevel, url) VALUES (?,FROM_UNIXTIME(?),?,?,?,?,?)' , [message['imei'],str1,message['payloads']['time'],message['payloads']['latitude'],message['payloads']['longitude'],message['payloads']['sealevel'],message['payloads']['url']] , function (err, rows, fields) { if (err) { context.log('err: ' + err); } context.log('id: ' + rows[0].id);} ); }); // 接続終了 connection.end(); context.done(); };
以上です。
ここまで来るのにものすごく苦労しました。
それでは。