00_MQTT2_CLIENT.pm: add qosMaxQueueLength Attribute (Forum #91304)
git-svn-id: https://svn.fhem.de/fhem/trunk@22264 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
@@ -48,6 +48,7 @@ MQTT2_CLIENT_Initialize($)
|
|||||||
msgBeforeDisconnect
|
msgBeforeDisconnect
|
||||||
mqttVersion:3.1.1,3.1
|
mqttVersion:3.1.1,3.1
|
||||||
privacy:0,1
|
privacy:0,1
|
||||||
|
qosMaxQueueLength
|
||||||
rawEvents
|
rawEvents
|
||||||
subscriptions
|
subscriptions
|
||||||
SSL
|
SSL
|
||||||
@@ -99,6 +100,7 @@ MQTT2_CLIENT_doinit($)
|
|||||||
if($hash->{connecting} == 1) {
|
if($hash->{connecting} == 1) {
|
||||||
my $usr = AttrVal($name, "username", "");
|
my $usr = AttrVal($name, "username", "");
|
||||||
my ($err, $pwd) = getKeyValue($name);
|
my ($err, $pwd) = getKeyValue($name);
|
||||||
|
$pwd = undef if($usr eq "");
|
||||||
if($err) {
|
if($err) {
|
||||||
Log 1, "ERROR: $err";
|
Log 1, "ERROR: $err";
|
||||||
return;
|
return;
|
||||||
@@ -130,7 +132,7 @@ MQTT2_CLIENT_doinit($)
|
|||||||
InternalTimer(gettimeofday()+$keepalive,"MQTT2_CLIENT_keepalive",$hash,0);
|
InternalTimer(gettimeofday()+$keepalive,"MQTT2_CLIENT_keepalive",$hash,0);
|
||||||
}
|
}
|
||||||
|
|
||||||
############################## SUBSCRIBE
|
############################## SUBSCRIBE & RESEND Non-Acked
|
||||||
} elsif($hash->{connecting} == 2) {
|
} elsif($hash->{connecting} == 2) {
|
||||||
my $s = AttrVal($name, "subscriptions", "#");
|
my $s = AttrVal($name, "subscriptions", "#");
|
||||||
if($s eq "setByTheProgram") {
|
if($s eq "setByTheProgram") {
|
||||||
@@ -145,7 +147,24 @@ MQTT2_CLIENT_doinit($)
|
|||||||
MQTT2_CLIENT_calcRemainingLength(length($msg)).$msg, 0, 1);
|
MQTT2_CLIENT_calcRemainingLength(length($msg)).$msg, 0, 1);
|
||||||
$hash->{connecting} = 3;
|
$hash->{connecting} = 3;
|
||||||
|
|
||||||
|
|
||||||
|
} elsif($hash->{connecting} == 3) {
|
||||||
|
my $onc = AttrVal($name, "msgAfterConnect", "");
|
||||||
|
delete($hash->{connecting});
|
||||||
|
MQTT2_CLIENT_doPublish($hash, $2, $3, $1, 1)
|
||||||
|
if($onc && $onc =~ m/^(-r\s)?([^\s]*)\s*(.*)$/);
|
||||||
|
|
||||||
|
if($hash->{qosQueue} && @{$hash->{qosQueue}}) {
|
||||||
|
my $pa = $hash->{qosQueue};
|
||||||
|
$hash->{qosQueue} = [];
|
||||||
|
for(my $i1=0; $i1<@{$pa}; $i1++) {
|
||||||
|
my $r = $pa->[$i1];
|
||||||
|
MQTT2_CLIENT_doPublish($hash, $r->[0], $r->[1], $r->[2], 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return undef;
|
return undef;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -233,6 +252,18 @@ MQTT2_CLIENT_Attr(@)
|
|||||||
delete($hash->{authError});
|
delete($hash->{authError});
|
||||||
MQTT2_CLIENT_Disco($hash);
|
MQTT2_CLIENT_Disco($hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if($attrName eq "qosMaxQueueLength") {
|
||||||
|
if($type eq "set" && $param[0] ne "0") {
|
||||||
|
return "qosMaxQueueLength must be an integer" if($param[0] !~ m/^\d+$/);
|
||||||
|
$hash->{qosMaxQueueLength} = $param[0];
|
||||||
|
$hash->{qosQueue} = [];
|
||||||
|
} else {
|
||||||
|
delete $hash->{qosQueue};
|
||||||
|
delete $hash->{qosMaxQueueLength};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return undef;
|
return undef;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -345,14 +376,12 @@ MQTT2_CLIENT_Read($@)
|
|||||||
MQTT2_CLIENT_Disco($hash);
|
MQTT2_CLIENT_Disco($hash);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} elsif($cpt eq "PUBACK") { # ignore it
|
|
||||||
} elsif($cpt eq "SUBACK") {
|
} elsif($cpt eq "PUBACK") {
|
||||||
if($hash->{connecting}) {
|
shift(@{$hash->{qosQueue}}) if($hash->{qosQueue});
|
||||||
delete($hash->{connecting});
|
|
||||||
my $onc = AttrVal($name, "msgAfterConnect", "");
|
} elsif($cpt eq "SUBACK") {
|
||||||
MQTT2_CLIENT_doPublish($hash, $2, $3, $1, 1)
|
MQTT2_CLIENT_doinit($hash) if($hash->{connecting});
|
||||||
if($onc && $onc =~ m/^(-r\s)?([^\s]*)\s*(.*)$/);
|
|
||||||
}
|
|
||||||
|
|
||||||
} elsif($cpt eq "PINGRESP") {
|
} elsif($cpt eq "PINGRESP") {
|
||||||
delete($hash->{waitingForPingRespSince});
|
delete($hash->{waitingForPingRespSince});
|
||||||
@@ -402,10 +431,21 @@ MQTT2_CLIENT_doPublish($@)
|
|||||||
my $name = $hash->{NAME};
|
my $name = $hash->{NAME};
|
||||||
return if(IsDisabled($name));
|
return if(IsDisabled($name));
|
||||||
$val = "" if(!defined($val));
|
$val = "" if(!defined($val));
|
||||||
my $msg = pack("C", $retain ? 0x31:0x30).
|
|
||||||
MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)).
|
my $hdr = 0x30;
|
||||||
|
my $pi = "";
|
||||||
|
$hdr += 1 if($retain);
|
||||||
|
if(defined($hash->{qosQueue}) &&
|
||||||
|
@{$hash->{qosQueue}} < $hash->{qosMaxQueueLength}) {
|
||||||
|
$hdr += 2; # QoS:1
|
||||||
|
push(@{$hash->{qosQueue}}, [$topic,$val,$retain]);
|
||||||
|
$pi = "FM"; # Packet Identifier, if QoS > 0
|
||||||
|
}
|
||||||
|
my $msg = pack("C", $hdr).
|
||||||
|
MQTT2_CLIENT_calcRemainingLength(2+length($topic)+length($val)+
|
||||||
|
length($pi)).
|
||||||
pack("n", length($topic)).
|
pack("n", length($topic)).
|
||||||
$topic.$val;
|
$topic.$pi.$val;
|
||||||
MQTT2_CLIENT_send($hash, $msg, $immediate)
|
MQTT2_CLIENT_send($hash, $msg, $immediate)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -618,6 +658,14 @@ MQTT2_CLIENT_getStr($$)
|
|||||||
If the optional -r is specified, then the publish sets the retain flag.
|
If the optional -r is specified, then the publish sets the retain flag.
|
||||||
</li></br>
|
</li></br>
|
||||||
|
|
||||||
|
<a name="MQTT_CLIENTqosMaxQueueLength"></a>
|
||||||
|
<li>qosMaxQueueLength <number><br>
|
||||||
|
if set to a nonzero value, messages are published with QoS=1, and are
|
||||||
|
kept in a memory-only buffer until acknowledged by the server.
|
||||||
|
If there is no connection to the server, up to <number> messages
|
||||||
|
are queued, and resent when the connection is esablished.
|
||||||
|
</li></br>
|
||||||
|
|
||||||
<a name="MQTT_CLIENTrawEvents"></a>
|
<a name="MQTT_CLIENTrawEvents"></a>
|
||||||
<li>rawEvents <topic-regexp><br>
|
<li>rawEvents <topic-regexp><br>
|
||||||
send all messages as events attributed to this MQTT2_CLIENT instance.
|
send all messages as events attributed to this MQTT2_CLIENT instance.
|
||||||
|
|||||||
Reference in New Issue
Block a user