diff --git a/fhem/contrib/PRESENCE/collectord b/fhem/contrib/PRESENCE/collectord index 9db1ff348..6192d5ba4 100755 --- a/fhem/contrib/PRESENCE/collectord +++ b/fhem/contrib/PRESENCE/collectord @@ -70,6 +70,22 @@ my %socket_to_handle; my $uuid; +$SIG{__DIE__} = sub { + my ($msg) = @_; + + Log 1, "PERL ERROR: $msg"; + +}; + + +$SIG{__WARN__} = sub { + my ($msg) = @_; + + Log 1, "PERL WARN: $msg"; + +}; + + Getopt::Long::Configure('bundling'); GetOptions( "d" => \$opt_d, "daemon" => \$opt_d, @@ -107,15 +123,15 @@ sub print_usage () { if($opt_h) { - print_usage(); - exit; + print_usage(); + exit; } if(-e "$opt_P") { - print STDERR timestamp()." another process already running (PID file found at $opt_P)\n"; - print STDERR timestamp()." aborted...\n"; - exit 1; + print STDERR timestamp()." another process already running (PID file found at $opt_P)\n"; + print STDERR timestamp()." aborted...\n"; + exit 1; } if(not $opt_c) @@ -141,7 +157,7 @@ readConfig($opt_c); if($opt_d) { - daemonize(); + daemonize(); } # Write PID file @@ -204,27 +220,27 @@ Log 2, "finished initialization. entering main loop"; while(1) { - # Cleaning up the status hash for obsolete devices - foreach $uuid (keys %state) - { - my %handle_to_socket = reverse %socket_to_handle; - unless(exists($handle_to_socket{$uuid})) - { - Log 2, "cleaning up status values (UUID: $uuid)"; - delete $state{$uuid}; - } - - } - - # process all status messages from all threads via status queue - while($status_queue->pending) - { - ($uuid,$room,$value,$name) = split(";", $status_queue->dequeue); - - Log 2, "processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid)"; - - if(not $value =~ /^(absence|present)$/) - { + # Cleaning up the status hash for obsolete devices + foreach $uuid (keys %state) + { + my %handle_to_socket = reverse %socket_to_handle; + unless(exists($handle_to_socket{$uuid})) + { + Log 2, "cleaning up status values (UUID: $uuid)"; + delete $state{$uuid}; + } + + } + + # process all status messages from all threads via status queue + while($status_queue->pending) + { + ($uuid,$room,$value,$name) = split(";", $status_queue->dequeue); + + Log 2, "processing state message for device ".(defined($name)?$name." ":"")."in room $room (UUID: $uuid)"; + + if(not $value =~ /^(absence|present)$/) + { $handle{$uuid}{client}->send("$value;$room\n") if(defined($handle{$uuid}{client})); if($value eq "socket_closed") @@ -250,105 +266,105 @@ while(1) } } } - - } - - #print Dumper(%state); - } - - - # If a thread has something reported via Log Queue, print it out if verbose is activated - while($log_queue->pending) - { - $logline = $log_queue->dequeue; - Log 2, $logline; - $logline = undef; - } - - - - # If a INET socket has anything to report - if(@new_handles = $listener->can_read(1)) - { + + } + + #print Dumper(\%state); + } + + + # If a thread has something reported via Log Queue, print it out if verbose is activated + while($log_queue->pending) + { + $logline = $log_queue->dequeue; + Log 2, $logline; + $logline = undef; + } + + + + # If a INET socket has anything to report + if(@new_handles = $listener->can_read(1)) + { - foreach my $client (@new_handles) - { - # if the socket is the server socket, accept new client and add it to the socket selector - if($client == $server) - { - $new_client = $server->accept(); - - $listener->add($new_client); - Log 1, "new connection from ".$new_client->peerhost().":".$new_client->peerport(); - } - else # else is must be a client, so read the message and process it - { - $buf = ''; - $buf = <$client>; - - # if the message is defined, it is a real message, else the connection is closed (EOF) - if($buf) - { - # replace leading and trailing white spaces + foreach my $client (@new_handles) + { + # if the socket is the server socket, accept new client and add it to the socket selector + if($client == $server) + { + $new_client = $server->accept(); + + $listener->add($new_client); + Log 1, "new connection from ".$new_client->peerhost().":".$new_client->peerport(); + } + else # else is must be a client, so read the message and process it + { + $buf = ''; + $buf = <$client>; + + # if the message is defined, it is a real message, else the connection is closed (EOF) + if($buf) + { + # replace leading and trailing white spaces $buf =~ s/(^\s*|\s*$)//g; - - # if the message is a new command, accept the command and create threads for all rooms to process the command - if($buf =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/) - { - # send the acknowledgment back to the sender - $client->send("command accepted\n"); - Log 2, "received new command from ".$client->peerhost().":".$client->peerport()." - $buf"; - - # Split the message into bluetooth address and the timeout value - # (timeout is ignored within the collectord, as it is given by configuration) - ($address, $timeout) = split("\\|", $buf); + + # if the message is a new command, accept the command and create threads for all rooms to process the command + if($buf =~ /^\s*([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}\s*\|\s*\d+\s*$/) + { + # send the acknowledgment back to the sender + $client->send("command accepted\n"); + Log 2, "received new command from ".$client->peerhost().":".$client->peerport()." - $buf"; + + # Split the message into bluetooth address and the timeout value + # (timeout is ignored within the collectord, as it is given by configuration) + ($address, $timeout) = split("\\|", $buf); - # remove any containing white spaces - $address =~ s/\s*//g; - $timeout =~ s/\s*//g; + # remove any containing white spaces + $address =~ s/\s*//g; + $timeout =~ s/\s*//g; - # if the client has already a request running, stop at first the old request - if(defined($socket_to_handle{$client})) - { - $uuid = $socket_to_handle{$client}; - # get all threads for this socket and send them a termination signal - my $temp = $handle{$uuid}{threads}; - foreach $room (keys %$temp) - { - Log 2, "sending thread ".$handle{$uuid}{threads}{$room}->tid()." new address $address for room $room"; - $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("new|$address"); - $state{$uuid}{rooms}{$room} = "" - } - - $handle{$uuid}{timeout} = $timeout; - $state{$uuid}{lastresult}{timestamp} = 0; - } - else - { - # create a new uuid if not exist for socket - if(not defined($socket_to_handle{$client})) - { + # if the client has already a request running, stop at first the old request + if(defined($socket_to_handle{$client})) + { + $uuid = $socket_to_handle{$client}; + # get all threads for this socket and send them a termination signal + my $temp = $handle{$uuid}{threads}; + foreach $room (keys %$temp) + { + Log 2, "sending thread ".$handle{$uuid}{threads}{$room}->tid()." new address $address for room $room"; + $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("new|$address"); + $state{$uuid}{rooms}{$room} = "" + } + + $handle{$uuid}{timeout} = $timeout; + $state{$uuid}{lastresult}{timestamp} = 0; + } + else + { + # create a new uuid if not exist for socket + if(not defined($socket_to_handle{$client})) + { $socket_to_handle{$client} = generateUUID(); Log 2, "generating new UUID for client ".$client->peerhost()." - ".$socket_to_handle{$client}; - } - - $uuid = $socket_to_handle{$client}; - - $handle{$uuid}{address} = $address; - $handle{$uuid}{client} = $client; - $handle{$uuid}{timeout} = $timeout; - - $state{$uuid}{lastresult}{value} = "absence"; - $state{$uuid}{lastresult}{timestamp} = 0; - - # create a new reqester thread for each configured room to perform the query - while (($room, $value) = each %config) - { - + } + + $uuid = $socket_to_handle{$client}; + + $handle{$uuid}{address} = $address; + $handle{$uuid}{client} = $client; + $handle{$uuid}{timeout} = $timeout; + + $state{$uuid}{lastresult}{value} = "absence"; + $state{$uuid}{lastresult}{timestamp} = 0; + + # create a new reqester thread for each configured room to perform the query + while (($room, $value) = each %config) + { + $thread_counter++; $queues{$thread_counter} = Thread::Queue->new(); my $new_thread = threads->new(\&doQuery, ($value, $room, $address, $uuid)); - Log 1, "created thread ".$new_thread->tid()." for processing device $address in room $room for peer ".$client->peerhost()." (UUID: $uuid)"; + Log 1, "created thread ".$new_thread->tid()." for processing device $address in room $room for peer ".$client->peerhost()." (UUID: $uuid)"; # detach from the thread, so the thread starts processing independantly $new_thread->detach(); @@ -356,115 +372,115 @@ while(1) # save the socket/room relationship to know which thread belongs to which client request (for stop command) $handle{$uuid}{threads}{$room} = $new_thread; $state{$uuid}{rooms}{$room} = ""; - } - } - } - elsif(lc($buf) =~ /^\s*now\s*$/) # if a now command is received, all threads need to be signaled to send a now command to the presenced server - { - Log 2, "received now command from client ".$client->peerhost(); - - # just to be sure if the client has really a running request - if(defined($socket_to_handle{$client})) - { - $uuid = $socket_to_handle{$client}; - # get all threads for this socket and send them a termination signal - my $temp = $handle{$uuid}{threads}; - foreach $room (keys %$temp) - { - Log 2, "signalling thread ".$handle{$uuid}{threads}{$room}->tid()." to send \"now\"-request for room $room for client ".$client->peerhost(); - $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("now"); - $state{$uuid}{rooms}{$room} = ""; - } - - $state{$uuid}{lastresult}{timestamp} = 0; - $client->send("command accepted\n"); - - } - else - { - # if there is no command running, just tell the client he's wrong - $client->send("no command running\n"); - } - } - elsif(lc($buf) =~ /^\s*stop\s*$/) # if a stop command is received, the running request threads must be stopped - { - Log 1, "received stop command from client ".$client->peerhost(); - - # just to be sure if the client has really a running request - if(defined($socket_to_handle{$client})) - { - $uuid = $socket_to_handle{$client}; - # get all threads for this socket and send them a termination signal - my $temp = $handle{$uuid}{threads}; - foreach $room (keys %$temp) - { - Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost(); - $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop"); - delete($handle{$uuid}{threads}{$room}); - } - - # when all threads are signaled, delete all relationship entry for this client - delete($handle{$uuid}); - delete($socket_to_handle{$client}); - - $client->send("command accepted\n"); - - } - else - { - # if there is no command running, just tell the client he's wrong - $client->send("no command running\n"); - } - } + } + } + } + elsif(lc($buf) =~ /^\s*now\s*$/) # if a now command is received, all threads need to be signaled to send a now command to the presenced server + { + Log 2, "received now command from client ".$client->peerhost(); + + # just to be sure if the client has really a running request + if(defined($socket_to_handle{$client})) + { + $uuid = $socket_to_handle{$client}; + # get all threads for this socket and send them a termination signal + my $temp = $handle{$uuid}{threads}; + foreach $room (keys %$temp) + { + Log 2, "signalling thread ".$handle{$uuid}{threads}{$room}->tid()." to send \"now\"-request for room $room for client ".$client->peerhost(); + $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("now"); + $state{$uuid}{rooms}{$room} = "" if(exists($state{$uuid}{rooms}{$room})); + } + + delete($state{$uuid}{lastresult}) if(exists($state{$uuid}{lastresult})); + $client->send("command accepted\n"); + + } + else + { + # if there is no command running, just tell the client he's wrong + $client->send("no command running\n"); + } + } + elsif(lc($buf) =~ /^\s*stop\s*$/) # if a stop command is received, the running request threads must be stopped + { + Log 1, "received stop command from client ".$client->peerhost(); + + # just to be sure if the client has really a running request + if(defined($socket_to_handle{$client})) + { + $uuid = $socket_to_handle{$client}; + # get all threads for this socket and send them a termination signal + my $temp = $handle{$uuid}{threads}; + foreach $room (keys %$temp) + { + Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost(); + $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop"); + delete($handle{$uuid}{threads}{$room}); + } + + # when all threads are signaled, delete all relationship entry for this client + delete($handle{$uuid}); + delete($socket_to_handle{$client}); + + $client->send("command accepted\n"); + + } + else + { + # if there is no command running, just tell the client he's wrong + $client->send("no command running\n"); + } + } - else - { # if the message does not match a regular command or a stop signal, just tell the client and make a entry for logging. - $client->send("command rejected\n"); - Log 1, "received invalid command >>$buf<< from client ".$client->peerhost(); - } + else + { # if the message does not match a regular command or a stop signal, just tell the client and make a entry for logging. + $client->send("command rejected\n"); + Log 1, "received invalid command >>$buf<< from client ".$client->peerhost(); + } - } - else # if the message is not defined (EOF) the connection was closed. Now let's clean up - { - # make a log entry and remove the socket from the socket selector - Log 1, "closed connection from ".$client->peerhost(); - $listener->remove($client); - - - # if there is a running command, stop it first and clean up (same as stop command, see above) - if(defined($socket_to_handle{$client})) - { - $uuid = $socket_to_handle{$client}; - # get all threads for this socket and send them a termination signal - my $temp = $handle{$uuid}{threads}; - foreach $room (keys %$temp) - { - Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost(); - $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop"); - delete($handle{$uuid}{threads}{$room}); - } - - # when all threads are signaled, delete all relationship entry for this client - delete($handle{$uuid}); - delete($socket_to_handle{$client}); - } - - # now close the socket, that's it - close $client; - } - } - } - } + } + else # if the message is not defined (EOF) the connection was closed. Now let's clean up + { + # make a log entry and remove the socket from the socket selector + Log 1, "closed connection from ".$client->peerhost(); + $listener->remove($client); + + + # if there is a running command, stop it first and clean up (same as stop command, see above) + if(defined($socket_to_handle{$client})) + { + $uuid = $socket_to_handle{$client}; + # get all threads for this socket and send them a termination signal + my $temp = $handle{$uuid}{threads}; + foreach $room (keys %$temp) + { + Log 2, "killing thread ".$handle{$uuid}{threads}{$room}->tid()." for room $room for client ".$client->peerhost(); + $queues{$handle{$uuid}{threads}{$room}->tid()}->enqueue("stop"); + delete($handle{$uuid}{threads}{$room}); + } + + # when all threads are signaled, delete all relationship entry for this client + delete($handle{$uuid}); + delete($socket_to_handle{$client}); + } + + # now close the socket, that's it + close $client; + } + } + } + } - # in case we have received a process signal, remove the pid file and shutdown - if(defined($sig_received)) - { - Log 1, "Caught $sig_received exiting"; - unlink($opt_P); - Log 1, "removed PID-File $opt_P"; - Log 1, "server shutdown"; - exit; - } + # in case we have received a process signal, remove the pid file and shutdown + if(defined($sig_received)) + { + Log 1, "Caught $sig_received exiting"; + unlink($opt_P); + Log 1, "removed PID-File $opt_P"; + Log 1, "server shutdown"; + exit; + } } Log 2, "leaving main loop"; @@ -490,7 +506,7 @@ sub daemonize } elsif($pid) { - Log 0, "forked with PID $pid"; + Log 0, "forked with PID $pid"; exit 0; } @@ -532,7 +548,7 @@ sub doQuery($$$) Type => SOCK_STREAM, KeepAlive => 1, Blocking => 1 - ) or ( $log_queue->enqueue(threads->tid()."|$room : could not create socket to ".$values{address}." - $! -")); + ) or ( $log_queue->enqueue(threads->tid()."|$room : could not create socket to ".$values{address}." - $! -")); $selector = IO::Select->new($client_socket); @@ -556,11 +572,11 @@ sub doQuery($$$) $log_queue->enqueue(threads->tid()."|$room socket to ".$values{address}.":".$values{port}." did not report anything in expected time, resetting socket (last contact: ".strftime("%Y-%m-%d %H:%M:%S", localtime($last_contact)).")"); $selector->remove($client_socket); - shutdown($client_socket, 2); + $client_socket->shutdown(2); close($client_socket); $client_socket = undef; } - + if(exists($queues{threads->tid()}) and $queues{threads->tid()}->pending) { $cmd = $queues{threads->tid()}->dequeue; @@ -574,7 +590,7 @@ sub doQuery($$$) elsif($cmd eq "stop") { $log_queue->enqueue(threads->tid()."|$room terminating thread ".threads->tid()." for $address"); - $client_socket->shutdown() if(defined($client_socket)); + $client_socket->shutdown(2) if(defined($client_socket)); $selector->remove($client_socket) if(defined($selector)); close($client_socket) if(defined($client_socket)); $client_socket = undef; @@ -598,7 +614,7 @@ sub doQuery($$$) } } } - + if(not defined($client_socket)) { # if it's the first occurance @@ -643,7 +659,7 @@ sub doQuery($$$) sleep(9); } } - + # if the socket has a message available if(@client_handle = $selector->can_read(1)) { @@ -705,11 +721,11 @@ sub doQuery($$$) } } else # the socket is EOF which means the connection was closed - { + { $selector->remove($local_client); - shutdown($local_client, 2); + $local_client->shutdown(2); close($local_client); $client_socket = undef; } @@ -800,7 +816,7 @@ sub readConfig $errorcount++; } } - + if(not exists($config{$room}{presence_timeout})) { Log 0, "room >>$room<< has no value for >>presence_timeout<< configured";