00_MQTT2_CLIENT.pm: more AWS/Worx extensions (Forum #111959)

git-svn-id: https://svn.fhem.de/fhem/trunk@27389 2b470e98-0d58-463d-a4d8-8e2adae1ed80
This commit is contained in:
rudolfkoenig
2023-04-03 16:41:04 +00:00
parent 17955d9964
commit b861937c29

View File

@@ -49,7 +49,7 @@ MQTT2_CLIENT_Initialize($)
lwt lwt
lwtRetain lwtRetain
keepaliveTimeout keepaliveTimeout
maxNrConnects maxFailedConnects
msgAfterConnect msgAfterConnect
msgBeforeDisconnect msgBeforeDisconnect
mqttVersion:3.1.1,3.1 mqttVersion:3.1.1,3.1
@@ -98,41 +98,41 @@ MQTT2_CLIENT_Define($$)
$hash->{connecting} = 1; $hash->{connecting} = 1;
$hash->{nrConnects} = 0; $hash->{nrConnects} = 0;
InternalTimer(1, sub($){ InternalTimer(1, \&MQTT2_CLIENT_connect, $hash, 0);
my $cfn = AttrVal($hash->{NAME}, "connectFn", undef);
if($cfn) {
my %specials= ("%NAME" => $hash->{NAME});
$cfn = EvalSpecials($cfn, %specials);
AnalyzeCommand(undef, $cfn); # $cfn calls connect
} else {
MQTT2_CLIENT_connect($hash);
}
}, $hash, 0);
return undef; return undef;
} }
sub sub
MQTT2_CLIENT_connect($) MQTT2_CLIENT_connect($;$)
{ {
my ($hash) = @_; my ($hash,$calledFromConnectFn) = @_;
my $me = $hash->{NAME}; my $me = $hash->{NAME};
return if($hash->{authError} || AttrVal($me, "disable", 0)); return if($hash->{authError} || AttrVal($me, "disable", 0));
my $mc = AttrVal($me, "maxNrConnects", -1); my $mc = AttrVal($me, "maxFailedConnects", -1);
if($mc ne -1 && $hash->{nrConnects} >= $mc) { $hash->{nrFailedConnects} = 0 if(!defined($hash->{nrFailedConnects}));
Log3 $me, 2, "maxNrConnects ($mc) reached, no more reconnect attemtps"; if($mc ne -1 && $hash->{nrFailedConnects} >= $mc) {
Log3 $me, 2, "maxFailedConnects ($mc) reached, no more reconnect attemtps";
delete($readyfnlist{"$me.".$hash->{DeviceName}}); # Source of retry delete($readyfnlist{"$me.".$hash->{DeviceName}}); # Source of retry
return; return;
} }
my $cfn = AttrVal($hash->{NAME}, "connectFn", undef); # for AWS-IOT / auth
if($cfn && !$calledFromConnectFn) {
$cfn = EvalSpecials($cfn, ("%NAME" => $hash->{NAME}));
return AnalyzeCommand(undef, $cfn);
}
my $disco = (DevIo_getState($hash) eq "disconnected"); my $disco = (DevIo_getState($hash) eq "disconnected");
$hash->{connecting} = 1 if($disco && !$hash->{connecting}); $hash->{connecting} = 1 if($disco && !$hash->{connecting});
$hash->{nextOpenDelay} = 5; $hash->{nextOpenDelay} = 10;
$hash->{BUF}=""; $hash->{BUF}="";
if($hash->{DeviceName} =~ m/^wss?:/) { if($hash->{DeviceName} =~ m/^wss?:/) {
$hash->{binary} = 1; $hash->{binary} = 1;
$hash->{header}{"Sec-WebSocket-Protocol"} = "mqtt"; $hash->{header}{"Sec-WebSocket-Protocol"} = "mqtt";
} }
$hash->{nrConnects}++; $hash->{nrConnects}++;
$hash->{nrFailedConnects}++; # delete on CONNACK
return DevIo_OpenDev($hash, $disco, "MQTT2_CLIENT_doinit", sub(){}) return DevIo_OpenDev($hash, $disco, "MQTT2_CLIENT_doinit", sub(){})
if($hash->{connecting}); if($hash->{connecting});
} }
@@ -147,8 +147,7 @@ MQTT2_CLIENT_doinit($)
if($hash->{connecting} == 1) { if($hash->{connecting} == 1) {
my $usr = AttrVal($name, "username", ""); my $usr = AttrVal($name, "username", "");
my ($err, $pwd) = getKeyValue($name); my ($err, $pwd) = getKeyValue($name);
$usr = $hash->{".usr"} if($hash->{".usr"}); # AWS/WORX $usr = $hash->{".usr"} if(defined($hash->{".usr"})); # AWS-IOT/WORX
$pwd = $hash->{".pwd"} if($hash->{".pwd"});
$pwd = undef if($usr eq ""); $pwd = undef if($usr eq "");
if($err) { if($err) {
Log 1, "ERROR: $err"; Log 1, "ERROR: $err";
@@ -199,10 +198,17 @@ MQTT2_CLIENT_doinit($)
} elsif($hash->{connecting} == 3) { } elsif($hash->{connecting} == 3) {
my $onc = AttrVal($name, "msgAfterConnect", "");
delete($hash->{connecting}); delete($hash->{connecting});
my $mac = AttrVal($name, "msgAfterConnect", "");
MQTT2_CLIENT_doPublish($hash, $2, $3, $1, 1) MQTT2_CLIENT_doPublish($hash, $2, $3, $1, 1)
if($onc && $onc =~ m/^(-r\s)?([^\s]*)\s*(.*)$/); if($mac && $mac =~ m/^(-r\s)?([^\s]*)\s*(.*)$/);
my $eac = AttrVal($name, "execAfterConnect", "");
if($eac) {
$eac = EvalSpecials($eac, ("%NAME" => $name));
return AnalyzeCommand(undef, $eac);
}
if($hash->{qosQueue} && @{$hash->{qosQueue}}) { if($hash->{qosQueue} && @{$hash->{qosQueue}}) {
my $pa = $hash->{qosQueue}; my $pa = $hash->{qosQueue};
@@ -448,6 +454,7 @@ MQTT2_CLIENT_Set($@)
MQTT2_CLIENT_Disco($hash) if($init_done); MQTT2_CLIENT_Disco($hash) if($init_done);
} elsif($a[0] eq "connect") { } elsif($a[0] eq "connect") {
$hash->{maxFailedConnects} = 0;
MQTT2_CLIENT_connect($hash) if(!$hash->{FD}); MQTT2_CLIENT_connect($hash) if(!$hash->{FD});
} elsif($a[0] eq "disconnect") { } elsif($a[0] eq "disconnect") {
@@ -511,6 +518,7 @@ MQTT2_CLIENT_Read($@)
$hash->{lastMsgTime} = gettimeofday(); $hash->{lastMsgTime} = gettimeofday();
# Lowlevel debugging # Lowlevel debugging
Log3 $name, 4, "$name received $cpt";
if(AttrVal($name, "verbose", 1) >= 5) { if(AttrVal($name, "verbose", 1) >= 5) {
my $pltxt = $pl; my $pltxt = $pl;
$pltxt =~ s/([^ -~])/"(".ord($1).")"/ge; $pltxt =~ s/([^ -~])/"(".ord($1).")"/ge;
@@ -523,6 +531,7 @@ MQTT2_CLIENT_Read($@)
my $rc = ord(substr($pl,1,1)); my $rc = ord(substr($pl,1,1));
if($rc == 0) { if($rc == 0) {
MQTT2_CLIENT_doinit($hash); MQTT2_CLIENT_doinit($hash);
delete($hash->{nrFailedConnects});
} else { } else {
my @txt = ("Accepted", "bad proto", "bad id", "server unavailable", my @txt = ("Accepted", "bad proto", "bad id", "server unavailable",
@@ -580,7 +589,7 @@ MQTT2_CLIENT_Read($@)
MQTT2_CLIENT_feedTheList($hash, $tp, $val, 1); MQTT2_CLIENT_feedTheList($hash, $tp, $val, 1);
} else { } else {
Log 1, "M2: Unhandled packet $cpt, disconneting $name"; Log 1, "MQTT2_CLIENT: Unhandled packet $cpt, disconneting $name";
MQTT2_CLIENT_Disco($hash); MQTT2_CLIENT_Disco($hash);
return; return;
} }
@@ -903,13 +912,13 @@ MQTT2_CLIENT_feedTheList($$$)
necessary for some MQTT servers in robotic vacuum cleaners. necessary for some MQTT servers in robotic vacuum cleaners.
</li></br> </li></br>
<a id="MQTT2_CLIENT-attr-maxNrConnects"></a> <a id="MQTT2_CLIENT-attr-maxFailedConnects"></a>
<li>maxNrConnects &lt;number&gt;<br> <li>maxFailedConnects &lt;number&gt;<br>
maximum number of established connections. Useful when experimenting with maximum number of failed connections. Useful when experimenting with a
public server, where repeatedly failing connection attempts lead to public server, where repeatedly failing connection attempts lead to
temporary suspension of the account. The counter is increased after the temporary suspension of the account. The counter is increased before the
TCP connection is established, before the MQTT handshake. Reset the TCP connection is established, and reset after the MQTT CONNACK message
counter with the modify or defmod command. or by the set connect command.
</li></br> </li></br>
<li><a href="#disable">disable</a><br> <li><a href="#disable">disable</a><br>
@@ -971,6 +980,12 @@ MQTT2_CLIENT_feedTheList($$$)
If the optional -r is specified, then the publish sets the retain flag. If the optional -r is specified, then the publish sets the retain flag.
</li></br> </li></br>
<a id="MQTT2_CLIENT-attr-execAfterConnect"></a>
<li>execAfterConnect FHEM-command<br>
FHEM-command is executed after connect.
</li></br>
<a id="MQTT2_CLIENT-attr-msgBeforeDisconnect"></a> <a id="MQTT2_CLIENT-attr-msgBeforeDisconnect"></a>
<li>msgBeforeDisconnect [-r] topic message<br> <li>msgBeforeDisconnect [-r] topic message<br>
publish the topic bofore each disconnect.<br> publish the topic bofore each disconnect.<br>