Microsoft Azure Event Hubsから外部テーブル(MySQL、PostgreSQLなど)へ簡単にデータ連携・接続する方法について


最近、Microsoft Azure Event Hubsから外部DB(データベース)(External Table)へデータ連携したいというシチュエーションがあった。

しかし、Event Hubsからは通常はStream Analyticsを介してAzureのStorage TableやBlob Tableなどに連携することが普通のようである。

ある程度調べた段階では、Event HubsからMySQLや、Stream AnalyticsからMySQLへも連携できず、かなり困っていた。

しかし、AzureのFunction Appというアプリを使って、最終的には簡単・簡易にできた。

簡単にできたけれども、そこに至る過程ではものすごく苦労して、別のアプローチをものすごく試行錯誤してしまった。

ということで、私と同じ苦悩を抱えている人は少なからずいると思うので、ここにその方法を記しておきます。

この方法はオンプレミスのMySQLでも、Azure上のMySQLでも、それは問わずに可能な方法です。

こういうことをMicrosoftは広く、分かりやすく知らしめれば敷居が下がり、更に広く発展すると思うけど、概してMicrosoftのページは分かりにくく、取っ掛かりにくいんですよね。

あ、ちなみにAzure上のMySQLに接続する場合は、ネットワークの構成から3306番ポートのACLを、接続可能なように構成してください。

Event Hubsを作成する

多分、このページを見ている人は、既にEvent Hubsは作っているのだと思う。

なので、こういう名前のEvent Hubs(gps-eh)を作ったという前提を伝えるだけの画像です。

 

Function APPで関数を作成する

次に、Function Appというものを作成します。

 

こんな感じで、適当に作っていきます。

次に、関数という部分(画像の左側の赤枠)を押して、新規という部分(画像の上の赤枠)を押し、新しい関数を作っていきます。

そして、Event Hub Trigger – Javascriptというテンプレートを選びます。

そして、関数名は適当に決めて、画像右下の方に見えにくいですが赤枠で囲っている部分に「新規」というリンクが表示されているのでそこをクリックすると、画像のように、既に存在するEvent Hubが選択できるようになっていますので選択します。

そして、作成ボタンを押下します。

すると、以下のようなコードがテンプレートとして記載されています。

MySQLライブラリのインストール

そしてここでこのFunction APPの概要の部分まで戻って、プラットフォーム機能という部分(画像の赤枠)を押します。

次に、高度なツール(画像の赤枠)を押します。

 

次に、Debug ConsoleのCMD(画像赤枠)を押します。

そして、表示されたコマンドプロンプトの画面で、画像のようにlsとcdを駆使して、作った関数名のフォルダの下に移動します。

そして次に、この画面で「npm install mysql」とコマンドを実行します。

これによって、mysql用のライブラリがインストールされ、関数内で利用できるようになります。

Warningがでるけど気にせずに続行します。

オンプレミスなどの外部MySQLDBへの接続

そして最後に、以下のようなコードを書きます。

これで、Azure Event Hubsから外部のMySQLなどにデータ連携することが可能になりました!

Insert,Delete,Select,UpdateをJavascriptで好きに駆使して、システムを作り上げてください!

// requireの設定
const mysql = require('mysql');


module.exports = function (context, eventHubMessages) {
    context.log(`JavaScript eventhub trigger function called for message array ${eventHubMessages}`);
 
    // MySQLとのコネクションの作成
    const connection = mysql.createConnection({
    host : '123.123.123.123', // 実際のホスト名やIPアドレスを記載
    user : 'user', // 実際のMySQLのログインユーザ名を記載
    password : 'password' , // 実際のMySQLのパスワードを記載
    database: 'dbname' , // 実際のMySQLのデータベース名を記載
    connectTimeout : 10000
    });
    
    // 接続
    connection.connect();

    eventHubMessages.forEach(message => {
        // ここ以降は実際のデータに基づいて変更してください。これはあくまでサンプルです。
        var num1 = Number(message['timestamp']);
        var num2 = parseInt(num1 / 1000,10) ;
        var str1 = String(num2);
        context.log(`Processed message ${message['payloads']['time']}`);
        connection.query(
            'INSERT INTO GPS (imei, EventEnqueuedUtcTime, time, latitude, longitude, sealevel, url) VALUES (?,FROM_UNIXTIME(?),?,?,?,?,?)' 
            , [message['imei'],str1,message['payloads']['time'],message['payloads']['latitude'],message['payloads']['longitude'],message['payloads']['sealevel'],message['payloads']['url']] 
            , function (err, rows, fields) {
                if (err) { context.log('err: ' + err); } 
                context.log('id: ' + rows[0].id);}
        );
    });

    
    // 接続終了
    connection.end();
    context.done();
};

以上です。

ここまで来るのにものすごく苦労しました。

それでは。

 


コメントを残す

メールアドレスが公開されることはありません。 * が付いている欄は必須項目です