;============================================================ ; iron_mqtt.hsp — MQTT 3.1.1 クライアント (.NET版) ; ; System.Net.Sockets.TcpClient で MQTT プロトコルを直接実装。 ; CONNECT/CONNACK/PUBLISH/SUBSCRIBE/SUBACK/PINGREQ パケットを ; C# で手動構築。hsp3net 専用。 ; ; API: ; mqtt_connect "broker", port, "client_id" 接続 → stat (0=成功) ; mqtt_set_auth "user", "pass" 認証情報設定 ; mqtt_publish "topic", "payload", qos パブリッシュ ; mqtt_subscribe "topic", qos サブスクライブ ; mqtt_poll topic, payload, timeout_ms 受信 → stat (1=受信あり, 0=なし) ; mqtt_disconnect 切断 ; ; 例: ; #include "iron_mqtt.hsp" ; mqtt_connect "test.mosquitto.org", 1883, "hsp_client" ; if stat == 0 { ; mqtt_subscribe "test/topic", 0 ; mqtt_publish "test/topic", "Hello MQTT", 0 ; mqtt_poll topic, payload, 5000 ; if stat == 1 : mes topic + " : " + payload ; mqtt_disconnect ; } ;============================================================ #ifndef __iron_mqtt_hsp__ #define __iron_mqtt_hsp__ #module iron_mqtt dim _mqtt_cs_loaded, 1 #deffunc _mqtt_load_cs if _mqtt_cs_loaded : return sdim cs, 16384 cs = "using System;using System.IO;using System.Net.Sockets;using System.Text;" cs += "using System.Collections.Generic;using System.Threading;" cs += "public class HspMqtt {" cs += " static TcpClient _tcp; static NetworkStream _ns;" cs += " static string _user=\"\",_pass=\"\";" cs += " static ushort _pid=1;" cs += " static Queue _queue = new Queue();" ; EncodeRemaining cs += " static byte[] EncRL(int len) {" cs += " var ms=new MemoryStream();" cs += " do { byte b=(byte)(len%128); len/=128;" cs += " if(len>0) b|=0x80; ms.WriteByte(b); } while(len>0);" cs += " return ms.ToArray(); }" ; EncodeString cs += " static byte[] EncStr(string s) {" cs += " var b=Encoding.UTF8.GetBytes(s);" cs += " var ms=new MemoryStream();" cs += " ms.WriteByte((byte)(b.Length>>8)); ms.WriteByte((byte)(b.Length&0xFF));" cs += " ms.Write(b,0,b.Length); return ms.ToArray(); }" ; Connect cs += " public static string Connect(string host, int port, string clientId) {" cs += " try { _tcp=new TcpClient(); _tcp.Connect(host,port);" cs += " _ns=_tcp.GetStream();" cs += " var pay=new MemoryStream();" cs += " pay.Write(EncStr(\"MQTT\"),0,6);" ; protocol name cs += " pay.WriteByte(4);" ; protocol level 3.1.1 cs += " byte flags=0x02;" ; clean session cs += " if(_user!=\"\") { flags|=0x80; if(_pass!=\"\") flags|=0x40; }" cs += " pay.WriteByte(flags);" cs += " pay.WriteByte(0); pay.WriteByte(60);" ; keepalive 60s cs += " pay.Write(EncStr(clientId),0,EncStr(clientId).Length);" cs += " if(_user!=\"\") { pay.Write(EncStr(_user),0,EncStr(_user).Length);" cs += " if(_pass!=\"\") pay.Write(EncStr(_pass),0,EncStr(_pass).Length); }" cs += " var pb=pay.ToArray();" cs += " var pkt=new MemoryStream();" cs += " pkt.WriteByte(0x10);" ; CONNECT packet type cs += " pkt.Write(EncRL(pb.Length),0,EncRL(pb.Length).Length);" cs += " pkt.Write(pb,0,pb.Length);" cs += " var d=pkt.ToArray(); _ns.Write(d,0,d.Length);" ; Read CONNACK cs += " var hdr=new byte[4]; _ns.Read(hdr,0,4);" cs += " if(hdr[0]!=0x20) return \"-1|CONNACK not received\";" cs += " if(hdr[3]!=0) return \"-1|CONNACK rc=\"+hdr[3];" cs += " return \"0|\"; }" cs += " catch(Exception e) { return \"-1|\"+e.Message; } }" ; SetAuth cs += " public static string SetAuth(string user, string pass) {" cs += " _user=user; _pass=pass; return \"0|\"; }" ; Publish cs += " public static string Publish(string topic, string payload, int qos) {" cs += " try { var ms=new MemoryStream();" cs += " ms.Write(EncStr(topic),0,EncStr(topic).Length);" cs += " if(qos>0) { ms.WriteByte((byte)(_pid>>8)); ms.WriteByte((byte)(_pid&0xFF)); _pid++; }" cs += " var pb=Encoding.UTF8.GetBytes(payload); ms.Write(pb,0,pb.Length);" cs += " var body=ms.ToArray(); var pkt=new MemoryStream();" cs += " pkt.WriteByte((byte)(0x30|(qos<<1)));" cs += " pkt.Write(EncRL(body.Length),0,EncRL(body.Length).Length);" cs += " pkt.Write(body,0,body.Length);" cs += " var d=pkt.ToArray(); _ns.Write(d,0,d.Length);" cs += " return \"0|\"; }" cs += " catch(Exception e) { return \"-1|\"+e.Message; } }" ; Subscribe cs += " public static string Subscribe(string topic, int qos) {" cs += " try { var ms=new MemoryStream();" cs += " ms.WriteByte((byte)(_pid>>8)); ms.WriteByte((byte)(_pid&0xFF)); _pid++;" cs += " ms.Write(EncStr(topic),0,EncStr(topic).Length);" cs += " ms.WriteByte((byte)qos);" cs += " var body=ms.ToArray(); var pkt=new MemoryStream();" cs += " pkt.WriteByte(0x82);" ; SUBSCRIBE cs += " pkt.Write(EncRL(body.Length),0,EncRL(body.Length).Length);" cs += " pkt.Write(body,0,body.Length);" cs += " var d=pkt.ToArray(); _ns.Write(d,0,d.Length);" ; Read SUBACK cs += " var hdr=new byte[5]; _ns.Read(hdr,0,5);" cs += " return \"0|\"; }" cs += " catch(Exception e) { return \"-1|\"+e.Message; } }" ; Poll - try to read a PUBLISH packet cs += " public static string Poll(int timeout_ms) {" cs += " try { if(_tcp==null||!_tcp.Connected) return \"0||\"; " cs += " _ns.ReadTimeout=timeout_ms;" cs += " byte[] hdr=new byte[1];" cs += " int n;" cs += " try { n=_ns.Read(hdr,0,1); } catch(IOException) { return \"0||\"; }" cs += " if(n==0) return \"0||\";" cs += " int type=hdr[0]>>4;" ; Read remaining length cs += " int rem=0,shift=0; byte bx;" cs += " do { var tb=new byte[1]; _ns.Read(tb,0,1); bx=tb[0];" cs += " rem|=(bx&0x7F)<