さて、前回セットアップしたVerneMQを動かしてみましょう。MQTTクライアントはphpで実装します。ライブラリは、phpMQTTを使ってみます。
それでは、まず最初にvernemqサービスの起動をします。ファイルディスクリプタ上限数を推奨の65536に変えていないと、以下のように警告が出ます。個人的には親切な設計だと思います。
1 2 3 4 5 6 |
$ sudo service vernemq start Starting vernemq: !!!! !!!! WARNING: ulimit -n is 1024; 65536 is the recommended minimum. !!!! [ OK ] |
MQTTクライアントの作成
MQTTクライアントを実装していきます。phpMQTTのソースコードはgithubで公開されているので、ダウンロードしてお好きなディレクトリに配置してください。
publisher
まずは、publisherから。phpMQTT.phpをrequireし、phpMQTTクラスを生成、認証情報をセットしてBrokerへ接続、その後publishという流れになります。
接続する各クライアントは、user/(ユーザ名) というトピックを subscribeしているものとし、publisherは、ユーザ名のみを指定して、特定のsubscriberのみに任意のメッセージを送る仕様とします。
引数は2つで、1つ目はtopic、2つ目はメッセージです。ファイル名は、publisher.phpとします。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
<?php require("phpMQTT/phpMQTT.php"); $username = "pub"; $password = "password"; $target = isset($argv[1]) ? $argv[1] : null; $message = isset($argv[2]) ? $argv[2] : null; if ($target === null || $message === null){ die("invalid parameter\n\n"); } $mqtt = new phpMQTT("localhost", 1883, "ClientID".rand()); if (!$mqtt->connect(true, null, $username, $password)){ exit; } $mqtt->publish("user/{$target}", $message, 0); print "send: 'user/{$target}' -> '{$message}'\n\n"; $mqtt->close(); |
subscriber
次は、subscriberです。引数は2つで、 Brokerに接続するユーザ名とパスワードです。自分宛のメッセージを受信すると、トピックとメッセージを標準出力へ出す仕様とします。ファイル名は、subscriber.phpです。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
<?php require("phpMQTT/phpMQTT.php"); $username = isset($argv[1]) ? $argv[1] : null; $password = isset($argv[2]) ? $argv[2] : null; $mqtt = new phpMQTT("localhost", 1883, "ClientID".rand()); if (!$mqtt->connect(true, null, $username, $password)){ exit; } $topics = array(); $topics["user/{$username}"] = array("qos" => 1, "function" => "procmsg"); print "subscribe 'user/{$username}'\n"; $mqtt->subscribe($topics, 1); while ($mqtt->proc()){ // receive loop } $mqtt->close(); function procmsg($topic, $msg){ print "Msg Recieved: ".date("r")."\nTopic:{$topic}\n$msg\n"; } |
実行
まずはsubscriberを2つ起動します。
1 2 3 |
$ php subscriber.php sub1 password subscribe 'user/sub1' |
1 2 3 |
$ php subscriber.php sub2 password subscribe 'user/sub2' |
この状態で、publisherを起動します。sub1にメッセージを送ってみましょう。
1 2 3 |
$ php publisher.php sub1 "Hello MQTT" send: 'user/sub1' -> 'Hello MQTT' |
1 2 3 4 5 6 |
$ php subscriber.php sub1 password subscribe 'user/sub1' Msg Recieved: Mon, 17 Jul 2017 21:20:59 +0900 Topic:user/sub1 Hello MQTT |
一見上手くいっているようですが、/var/log/vernemq/console.log をみると
1 2 |
[warning] <0.2273.0>@vmq_ranch:teardown:127 session stopped abnormally due to '{error,unexpected_message,{error,cant_parse_variable_header}}' |
subscriberがエラーで切断されてしまいます。んー、なんでだろうと調べていたのですが、一点怪しいところが。phpMQTT.php の function subsucribe()
にて、MQTTのsubscribeコマンドを生成しているところで、
1 2 |
$cmd = 0x80; |
となっているのですが、これはMQTT3.1.1の仕様書をみると、subscribeの固定ヘッダ1バイト目の0~3ビット目は(0010)でなければならないようです。
さらに、このパラメータが正しくない場合は、切断をしなければならない、というように記述されています。VerneMQはこの仕様に沿った実装なのかと思います。
3.8 SUBSCRIBE – Subscribe to topics
3.8.1 Fixed header
Bits 3,2,1 and 0 of the fixed header of the SUBSCRIBE Control Packet are reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat any other value as malformed and close the Network Connection [MQTT-3.8.1-1].
ちなみに、MQTT3.1仕様書でもsubscribeの場合は0x82とするように書かれてはいますが、予期せぬパラメータが来た場合の挙動は特には記述されていません。
それでは、気を取り直して phpMQTT.php の該当部分を以下のように直します。
1 2 |
$cmd = 0x82; |
subscriberを立ち上げ直します。この状態でpublisherしても、エラーが出なくなりました。うまくいったようです。
いかがでしたでしょうか。次回は、クラスターを組んでみます。お楽しみに!