さて、前回セットアップしたVerneMQを動かしてみましょう。MQTTクライアントはphpで実装します。ライブラリは、phpMQTTを使ってみます。
それでは、まず最初にvernemqサービスの起動をします。ファイルディスクリプタ上限数を推奨の65536に変えていないと、以下のように警告が出ます。個人的には親切な設計だと思います。
| 1 2 3 4 5 6 | $ sudo service vernemq start Starting vernemq: !!!! !!!! WARNING: ulimit -n is 1024; 65536 is the recommended minimum. !!!!                                                            [  OK  ] | 
MQTTクライアントの作成
MQTTクライアントを実装していきます。phpMQTTのソースコードはgithubで公開されているので、ダウンロードしてお好きなディレクトリに配置してください。
publisher
まずは、publisherから。phpMQTT.phpをrequireし、phpMQTTクラスを生成、認証情報をセットしてBrokerへ接続、その後publishという流れになります。
接続する各クライアントは、user/(ユーザ名) というトピックを subscribeしているものとし、publisherは、ユーザ名のみを指定して、特定のsubscriberのみに任意のメッセージを送る仕様とします。
引数は2つで、1つ目はtopic、2つ目はメッセージです。ファイル名は、publisher.phpとします。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | <?php   require("phpMQTT/phpMQTT.php");   $username = "pub";   $password = "password";   $target = isset($argv[1]) ? $argv[1] : null;   $message = isset($argv[2]) ? $argv[2] : null;   if ($target === null || $message === null){     die("invalid parameter\n\n");   }   $mqtt = new phpMQTT("localhost", 1883, "ClientID".rand());   if (!$mqtt->connect(true, null, $username, $password)){     exit;   }   $mqtt->publish("user/{$target}", $message, 0);     print "send: 'user/{$target}' -> '{$message}'\n\n";   $mqtt->close(); | 
subscriber
次は、subscriberです。引数は2つで、 Brokerに接続するユーザ名とパスワードです。自分宛のメッセージを受信すると、トピックとメッセージを標準出力へ出す仕様とします。ファイル名は、subscriber.phpです。
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | <?php   require("phpMQTT/phpMQTT.php");   $username = isset($argv[1]) ? $argv[1] : null;   $password = isset($argv[2]) ? $argv[2] : null;   $mqtt = new phpMQTT("localhost", 1883, "ClientID".rand());   if (!$mqtt->connect(true, null, $username, $password)){     exit;   }   $topics = array();   $topics["user/{$username}"] = array("qos" => 1, "function" => "procmsg");   print "subscribe 'user/{$username}'\n";   $mqtt->subscribe($topics, 1);   while ($mqtt->proc()){       // receive loop   }   $mqtt->close();   function procmsg($topic, $msg){     print "Msg Recieved: ".date("r")."\nTopic:{$topic}\n$msg\n";   } | 
実行
まずはsubscriberを2つ起動します。
| 1 2 3 | $ php subscriber.php sub1 password subscribe 'user/sub1' | 
| 1 2 3 | $ php subscriber.php sub2 password subscribe 'user/sub2' | 
この状態で、publisherを起動します。sub1にメッセージを送ってみましょう。
| 1 2 3 | $ php publisher.php sub1 "Hello MQTT" send: 'user/sub1' -> 'Hello MQTT' | 
| 1 2 3 4 5 6 | $ php subscriber.php sub1 password subscribe 'user/sub1' Msg Recieved: Mon, 17 Jul 2017 21:20:59 +0900 Topic:user/sub1 Hello MQTT | 
一見上手くいっているようですが、/var/log/vernemq/console.log をみると
| 1 2 | [warning] <0.2273.0>@vmq_ranch:teardown:127 session stopped abnormally due to '{error,unexpected_message,{error,cant_parse_variable_header}}' | 
subscriberがエラーで切断されてしまいます。んー、なんでだろうと調べていたのですが、一点怪しいところが。phpMQTT.php の function subsucribe() にて、MQTTのsubscribeコマンドを生成しているところで、
| 1 2 | $cmd = 0x80; | 
となっているのですが、これはMQTT3.1.1の仕様書をみると、subscribeの固定ヘッダ1バイト目の0~3ビット目は(0010)でなければならないようです。
さらに、このパラメータが正しくない場合は、切断をしなければならない、というように記述されています。VerneMQはこの仕様に沿った実装なのかと思います。
3.8 SUBSCRIBE – Subscribe to topics
3.8.1 Fixed header
Bits 3,2,1 and 0 of the fixed header of the SUBSCRIBE Control Packet are reserved and MUST be set to 0,0,1 and 0 respectively. The Server MUST treat any other value as malformed and close the Network Connection [MQTT-3.8.1-1].
ちなみに、MQTT3.1仕様書でもsubscribeの場合は0x82とするように書かれてはいますが、予期せぬパラメータが来た場合の挙動は特には記述されていません。
それでは、気を取り直して phpMQTT.php の該当部分を以下のように直します。
| 1 2 | $cmd = 0x82; | 
subscriberを立ち上げ直します。この状態でpublisherしても、エラーが出なくなりました。うまくいったようです。
いかがでしたでしょうか。次回は、クラスターを組んでみます。お楽しみに!
 
      




