diff --git a/fhem/FHEM/00_MQTT2_CLIENT.pm b/fhem/FHEM/00_MQTT2_CLIENT.pm index 8f6ec4ff3..0130ff72d 100644 --- a/fhem/FHEM/00_MQTT2_CLIENT.pm +++ b/fhem/FHEM/00_MQTT2_CLIENT.pm @@ -48,6 +48,7 @@ MQTT2_CLIENT_Initialize($) msgBeforeDisconnect mqttVersion:3.1.1,3.1 privacy:0,1 + qosMaxQueueLength rawEvents subscriptions SSL @@ -99,6 +100,7 @@ MQTT2_CLIENT_doinit($) if($hash->{connecting} == 1) { my $usr = AttrVal($name, "username", ""); my ($err, $pwd) = getKeyValue($name); + $pwd = undef if($usr eq ""); if($err) { Log 1, "ERROR: $err"; return; @@ -130,7 +132,7 @@ MQTT2_CLIENT_doinit($) InternalTimer(gettimeofday()+$keepalive,"MQTT2_CLIENT_keepalive",$hash,0); } - ############################## SUBSCRIBE + ############################## SUBSCRIBE & RESEND Non-Acked } elsif($hash->{connecting} == 2) { my $s = AttrVal($name, "subscriptions", "#"); if($s eq "setByTheProgram") { @@ -145,7 +147,24 @@ MQTT2_CLIENT_doinit($) MQTT2_CLIENT_calcRemainingLength(length($msg)).$msg, 0, 1); $hash->{connecting} = 3; + + } elsif($hash->{connecting} == 3) { + my $onc = AttrVal($name, "msgAfterConnect", ""); + delete($hash->{connecting}); + MQTT2_CLIENT_doPublish($hash, $2, $3, $1, 1) + if($onc && $onc =~ m/^(-r\s)?([^\s]*)\s*(.*)$/); + + if($hash->{qosQueue} && @{$hash->{qosQueue}}) { + my $pa = $hash->{qosQueue}; + $hash->{qosQueue} = []; + for(my $i1=0; $i1<@{$pa}; $i1++) { + my $r = $pa->[$i1]; + MQTT2_CLIENT_doPublish($hash, $r->[0], $r->[1], $r->[2], 0); + } + } + } + return undef; } @@ -233,8 +252,20 @@ MQTT2_CLIENT_Attr(@) delete($hash->{authError}); MQTT2_CLIENT_Disco($hash); } + + if($attrName eq "qosMaxQueueLength") { + if($type eq "set" && $param[0] ne "0") { + return "qosMaxQueueLength must be an integer" if($param[0] !~ m/^\d+$/); + $hash->{qosMaxQueueLength} = $param[0]; + $hash->{qosQueue} = []; + } else { + delete $hash->{qosQueue}; + delete $hash->{qosMaxQueueLength}; + } + } + return undef; -} +} sub MQTT2_CLIENT_Set($@) @@ -345,14 +376,12 @@ MQTT2_CLIENT_Read($@) MQTT2_CLIENT_Disco($hash); return; } - } elsif($cpt eq "PUBACK") { # ignore it - } elsif($cpt eq "SUBACK") { - if($hash->{connecting}) { - delete($hash->{connecting}); - my $onc = AttrVal($name, "msgAfterConnect", ""); - MQTT2_CLIENT_doPublish($hash, $2, $3, $1, 1) - if($onc && $onc =~ m/^(-r\s)?([^\s]*)\s*(.*)$/); - } + + } elsif($cpt eq "PUBACK") { + shift(@{$hash->{qosQueue}}) if($hash->{qosQueue}); + + } elsif($cpt eq "SUBACK") { + MQTT2_CLIENT_doinit($hash) if($hash->{connecting}); } elsif($cpt eq "PINGRESP") { delete($hash->{waitingForPingRespSince}); @@ -402,10 +431,21 @@ MQTT2_CLIENT_doPublish($@) my $name = $hash->{NAME}; return if(IsDisabled($name)); $val = "" if(!defined($val)); - my $msg = pack("C", $retain ? 0x31:0x30). - MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)). + + my $hdr = 0x30; + my $pi = ""; + $hdr += 1 if($retain); + if(defined($hash->{qosQueue}) && + @{$hash->{qosQueue}} < $hash->{qosMaxQueueLength}) { + $hdr += 2; # QoS:1 + push(@{$hash->{qosQueue}}, [$topic,$val,$retain]); + $pi = "FM"; # Packet Identifier, if QoS > 0 + } + my $msg = pack("C", $hdr). + MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)+ + length($pi)). pack("n", length($topic)). - $topic.$val; + $topic.$pi.$val; MQTT2_CLIENT_send($hash, $msg, $immediate) } @@ -618,6 +658,14 @@ MQTT2_CLIENT_getStr($$) If the optional -r is specified, then the publish sets the retain flag. + +