diff --git a/fhem/contrib/DS_Starter/93_DbLog.pm b/fhem/contrib/DS_Starter/93_DbLog.pm index 6111c546a..4f3dcbd68 100644 --- a/fhem/contrib/DS_Starter/93_DbLog.pm +++ b/fhem/contrib/DS_Starter/93_DbLog.pm @@ -1,5 +1,5 @@ ############################################################################################################################################ -# $Id: 93_DbLog.pm 26750 2022-12-13 16:38:54Z DS_Starter $ +# $Id: 93_DbLog.pm 26750 2022-12-19 16:38:54Z DS_Starter $ # # 93_DbLog.pm # written by Dr. Boris Neubert 2007-12-30 @@ -11,7 +11,7 @@ # redesigned and maintained 2016-2022 by DS_Starter with credits by: JoeAllb, DeeSpe # e-mail: heiko dot maaz at t-online dot de # -# reduceLog() created by Claudiu Schuster (rapster) +# reduceLog() created by Claudiu Schuster (rapster) adapted by DS_Starter # ############################################################################################################################################ # @@ -38,6 +38,8 @@ no if $] >= 5.017011, warnings => 'experimental::smartmatch'; # Version History intern by DS_Starter: my %DbLog_vNotesIntern = ( + "5.5.7" => "19.12.2022 cutted _DbLog_SBP_onRun_Log into _DbLog_SBP_onRun_LogArray and _DbLog_SBP_onRun_LogBulk ". + "__DbLog_SBP_onRun_LogCurrent, __DbLog_SBP_fieldArrays ", "5.5.6" => "12.12.2022 Serialize with Storable instead of JSON, more code rework ", "5.5.5" => "11.12.2022 Array Log -> may be better error processing ", "5.5.4" => "11.12.2022 Array Log -> print out all cache not saved, DbLog_DelayedShutdown processing changed ", @@ -880,7 +882,9 @@ sub _DbLog_setstopSubProcess { ## no critic "not used" my $hash = $paref->{hash}; DbLog_SBP_CleanUp ($hash); # SubProcess beenden + my $ret = 'SubProcess stopped and will be automatically restarted if needed'; + DbLog_setReadingstate ($hash, $ret); return $ret; @@ -1267,6 +1271,18 @@ sub _DbLog_setimportCachefile { ## no critic "not used" else { $infile = $dir.$prop; } + + my $err = DbLog_SBP_CheckAndInit ($hash); # Subprocess checken und ggf. initialisieren + return $err if(!defined $hash->{".fhem"}{subprocess}); + + if (defined $hash->{HELPER}{LONGRUN_PID}) { + return 'Another operation is in progress, try again a little later.'; + } + + my $rst = DbLog_SBP_sendConnectionData ($hash); + if (!$rst) { + Log3 ($name, 3, "DbLog $name - requested DB connection parameters are transmitted"); + } DbLog_SBP_sendCommand ($hash, 'importCachefile', $infile); @@ -2401,15 +2417,30 @@ sub DbLog_SBP_onRun { ## Event Logging ######################################################### if ($operation =~ /log_/xs) { - _DbLog_SBP_onRun_Log ( { subprocess => $subprocess, - name => $name, - memc => $memc, - store => $store, - logstore => $logstore, - useta => $useta, - bst => $bst - } - ); + my $bi = $memc->{bi}; # Bulk-Insert 0|1 + + if ($bi) { + _DbLog_SBP_onRun_LogBulk ( { subprocess => $subprocess, + name => $name, + memc => $memc, + store => $store, + logstore => $logstore, + useta => $useta, + bst => $bst + } + ); + } + else { + _DbLog_SBP_onRun_LogArray ( { subprocess => $subprocess, + name => $name, + memc => $memc, + store => $store, + logstore => $logstore, + useta => $useta, + bst => $bst + } + ); + } } ## Kommando: count @@ -2562,8 +2593,9 @@ return ($err, $dbh); ################################################################# # SubProcess - Log-Routine +# Bulk-Insert ################################################################# -sub _DbLog_SBP_onRun_Log { +sub _DbLog_SBP_onRun_LogBulk { my $paref = shift; my $subprocess = $paref->{subprocess}; @@ -2576,7 +2608,6 @@ sub _DbLog_SBP_onRun_Log { my $DbLogType = $memc->{DbLogType}; # Log-Ziele my $nsupk = $memc->{nsupk}; # No Support PK 0|1 - my $bi = $memc->{bi}; # Bulk-Insert 0|1 my $tl = $memc->{tl}; # traceLevel my $tf = $memc->{tf}; # traceFlag my $operation = $memc->{operation} // 'unknown'; # aktuell angeforderte Operation (log, etc.) @@ -2590,8 +2621,7 @@ sub _DbLog_SBP_onRun_Log { my $current = $store->{dbparams}{current}; my $error = q{}; - my $doins = 0; # Hilfsvariable, wenn "1" sollen inserts in Tabelle current erfolgen (updates schlugen fehl) - my $rowlback = q{}; # Eventliste für Rückgabe wenn Fehler + my $rowlback = {}; # Hashreferenz Eventliste für Rückgabe wenn Fehler my $nins_hist = 0; my $ret; @@ -2603,12 +2633,7 @@ sub _DbLog_SBP_onRun_Log { $dbh->{TraceLevel} = '0'; } - my $ac = $dbh->{AutoCommit} ? "ON" : "OFF"; - my $tm = $useta ? "ON" : "OFF"; - - Log3 ($name, 5, "DbLog $name - DbLogType is: $DbLogType"); - Log3 ($name, 4, "DbLog $name - AutoCommit mode: $ac, Transaction mode: $tm"); - Log3 ($name, 4, "DbLog $name - Insert mode: ".($bi ? "Bulk" : "Array")); + __DbLog_SBP_logLogmodes ($paref); my ($usepkh,$usepkc,$pkh,$pkc); @@ -2625,10 +2650,9 @@ sub _DbLog_SBP_onRun_Log { Log3 ($name, 5, "DbLog $name - Primary Key usage suppressed by attribute noSupportPK"); } - my $ln = scalar keys %{$logstore}; + if ($ln) { # temporär gespeicherte Daten hinzufügen - for my $index (sort {$a<=>$b} keys %{$logstore}) { Log3 ($name, 4, "DbLog $name - add stored data: $index -> ".$logstore->{$index}); @@ -2639,489 +2663,112 @@ sub _DbLog_SBP_onRun_Log { Log3 ($name, 4, "DbLog $name - logstore deleted - $ln stored datasets added for processing"); } - - my $ceti = scalar keys %{$cdata}; - - my (@timestamp,@device,@type,@event,@reading,@value,@unit); - my (@timestamp_cur,@device_cur,@type_cur,@event_cur,@reading_cur,@value_cur,@unit_cur); + + my $faref = __DbLog_SBP_fieldArrays ($name, $cdata); # Feldarrays erstellen + my $ceti = scalar keys %{$cdata}; + my ($st,$sth_ih,$sth_ic,$sth_uc,$sqlins,$ins_hist); - my ($tuples, $rows); - my @tuple_status; - no warnings 'uninitialized'; + $st = [gettimeofday]; # SQL-Startzeit - for my $key (sort {$a<=>$b} keys %{$cdata}) { - my $row = $cdata->{$key}; - my @a = split "\\|", $row; - s/_ESC_/\|/gxs for @a; # escaped Pipe back to "|" + if (lc($DbLogType) =~ m(history)) { # insert history mit/ohne primary key + $sqlins = __DbLog_SBP_sqlInsHistory ($history, $model, $usepkh); - push @timestamp, $a[0]; - push @device, $a[1]; - push @type, $a[2]; - push @event, $a[3]; - push @reading, $a[4]; - push @value, $a[5]; - push @unit, $a[6]; + no warnings 'uninitialized'; - Log3 ($name, 5, "DbLog $name - processing $key -> TS: $a[0], Dev: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Val: $a[5], Unit: $a[6]"); - } + for my $key (sort {$a<=>$b} keys %{$cdata}) { + my $row = $cdata->{$key}; + my @a = split "\\|", $row; + s/_ESC_/\|/gxs for @a; # escaped Pipe back to "|" - use warnings; + $a[3] =~ s/'/''/g; # escape ' with '' + $a[5] =~ s/'/''/g; # escape ' with '' + $a[6] =~ s/'/''/g; # escape ' with '' + $a[3] =~ s/\\/\\\\/g; # escape \ with \\ + $a[5] =~ s/\\/\\\\/g; # escape \ with \\ + $a[6] =~ s/\\/\\\\/g; # escape \ with \\ - if($bi) { - ## Bulk-Insert - ####################### - $st = [gettimeofday]; # SQL-Startzeit - - if (lc($DbLogType) =~ m(history)) { # insert history mit/ohne primary key - $sqlins = __DbLog_SBP_sqlInsHistory ($history, $model, $usepkh); - - no warnings 'uninitialized'; - - for my $key (sort {$a<=>$b} keys %{$cdata}) { - my $row = $cdata->{$key}; - my @a = split "\\|", $row; - s/_ESC_/\|/gxs for @a; # escaped Pipe back to "|" - - $a[3] =~ s/'/''/g; # escape ' with '' - $a[5] =~ s/'/''/g; # escape ' with '' - $a[6] =~ s/'/''/g; # escape ' with '' - $a[3] =~ s/\\/\\\\/g; # escape \ with \\ - $a[5] =~ s/\\/\\\\/g; # escape \ with \\ - $a[6] =~ s/\\/\\\\/g; # escape \ with \\ - - $sqlins .= "('$a[0]','$a[1]','$a[2]','$a[3]','$a[4]','$a[5]','$a[6]'),"; - } - - use warnings; - - chop($sqlins); - - if ($usepkh && $model eq 'POSTGRESQL') { - $sqlins .= " ON CONFLICT DO NOTHING"; - } - - $error = __DbLog_SBP_beginTransaction ($name, $dbh, $useta); - - eval { $sth_ih = $dbh->prepare($sqlins); - - if ($tl) { # Tracelevel setzen - $sth_ih->{TraceLevel} = "$tl|$tf"; - } - else { - $sth_ih->{TraceLevel} = '0'; - } - - $ins_hist = $sth_ih->execute(); - $ins_hist = 0 if($ins_hist eq "0E0"); - 1; - } - or do { $error = $@; - - Log3 ($name, 2, "DbLog $name - Error table $history - $error"); - - if($useta) { - $rowlback = $cdata; # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein - __DbLog_SBP_rollbackOnly ($name, $dbh, $history); - } - else { - __DbLog_SBP_commitOnly ($name, $dbh, $history); - } - - $ret = { - name => $name, - msg => $error, - ot => 0, - oper => $operation, - rowlback => $rowlback - }; - - __DbLog_SBP_sendToParent ($subprocess, $ret); - return; - }; - - if($ins_hist == $ceti) { - Log3 ($name, 4, "DbLog $name - $ins_hist of $ceti events inserted into table $history".($usepkh ? " using PK on columns $pkh" : "")); - } - else { - if($usepkh) { - Log3 ($name, 3, "DbLog $name - INFO - ".$ins_hist." of $ceti events inserted into table $history due to PK on columns $pkh"); - } - else { - Log3 ($name, 2, "DbLog $name - WARNING - only ".$ins_hist." of $ceti events inserted into table $history"); - } - } - - __DbLog_SBP_commitOnly ($name, $dbh, $history); + $sqlins .= "('$a[0]','$a[1]','$a[2]','$a[3]','$a[4]','$a[5]','$a[6]'),"; } - # insert current mit/ohne primary key - if (lc($DbLogType) =~ m(current)) { # Array-Insert wird auch bei Bulk verwendet weil im Bulk-Mode die nicht upgedateten Sätze nicht identifiziert werden können - ($error, $sth_ic) = __DbLog_SBP_sthInsTable ( { table => $current, - dbh => $dbh, - model => $model, - usepk => $usepkc - } - ); + use warnings; - if ($error) { - Log3 ($name, 2, "DbLog $name - Error: $error"); + chop($sqlins); - $dbh->disconnect(); - delete $store->{dbh}; + if ($usepkh && $model eq 'POSTGRESQL') { + $sqlins .= " ON CONFLICT DO NOTHING"; + } - $ret = { - name => $name, - msg => $error, - ot => 0, - oper => $operation - }; + $error = __DbLog_SBP_beginTransaction ($name, $dbh, $useta); - __DbLog_SBP_sendToParent ($subprocess, $ret); - return; - } + eval { $sth_ih = $dbh->prepare($sqlins); + + if ($tl) { # Tracelevel setzen + $sth_ih->{TraceLevel} = "$tl|$tf"; + } + else { + $sth_ih->{TraceLevel} = '0'; + } + + $ins_hist = $sth_ih->execute(); + $ins_hist = 0 if($ins_hist eq "0E0"); + 1; + } + or do { $error = $@; - ($error, $sth_uc) = __DbLog_SBP_sthUpdTable ( { table => $current, # Statement Handle "Update" current erstellen - dbh => $dbh, - model => $model, - usepk => $usepkc, - pk => $pkc - } - ); + Log3 ($name, 2, "DbLog $name - Error table $history - $error"); - if ($tl) { # Tracelevel setzen - $sth_uc->{TraceLevel} = "$tl|$tf"; - $sth_ic->{TraceLevel} = "$tl|$tf"; - } - else { - $sth_uc->{TraceLevel} = '0'; - $sth_ic->{TraceLevel} = '0'; - } + if($useta) { + $rowlback = $cdata; # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein + + Log3 ($name, 4, "DbLog $name - Transaction is switched on. Transferred data is returned to the cache."); + } + else { + Log3 ($name, 2, "DbLog $name - Transaction is switched off. Transferred data is lost."); + } + + __DbLog_SBP_rollbackOnly ($name, $dbh, $history); - $sth_uc->bind_param_array (1, [@timestamp]); - $sth_uc->bind_param_array (2, [@type]); - $sth_uc->bind_param_array (3, [@event]); - $sth_uc->bind_param_array (4, [@value]); - $sth_uc->bind_param_array (5, [@unit]); - $sth_uc->bind_param_array (6, [@device]); - $sth_uc->bind_param_array (7, [@reading]); - - $error = __DbLog_SBP_beginTransaction ($name, $dbh, $useta); - - eval { ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \@tuple_status } ); - }; - - my $nupd_cur = 0; - - for my $tuple (0..$ceti-1) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - - next if($status); # $status ist "1" wenn update ok - - Log3 ($name, 5, "DbLog $name - Failed to update in $current - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"); - - push @timestamp_cur, $timestamp[$tuple]; - push @device_cur, $device[$tuple]; - push @type_cur, $type[$tuple]; - push @event_cur, $event[$tuple]; - push @reading_cur, $reading[$tuple]; - push @value_cur, $value[$tuple]; - push @unit_cur, $unit[$tuple]; - - $nupd_cur++; - } - - if(!$nupd_cur) { - Log3 ($name, 4, "DbLog $name - $ceti of $ceti events updated in table $current".($usepkc ? " using PK on columns $pkc" : "")); - } - else { - Log3 ($name, 4, "DbLog $name - $nupd_cur of $ceti events not updated in table $current. Try to insert ".($usepkc ? " using PK on columns $pkc " : " ")."..."); - $doins = 1; - } - - if ($doins) { # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt - $sth_ic->bind_param_array (1, [@timestamp_cur]); - $sth_ic->bind_param_array (2, [@device_cur]); - $sth_ic->bind_param_array (3, [@type_cur]); - $sth_ic->bind_param_array (4, [@event_cur]); - $sth_ic->bind_param_array (5, [@reading_cur]); - $sth_ic->bind_param_array (6, [@value_cur]); - $sth_ic->bind_param_array (7, [@unit_cur]); - - undef @tuple_status; - - eval { ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \@tuple_status } ); + $ret = { + name => $name, + msg => $error, + ot => 0, + oper => $operation, + rowlback => $rowlback }; - my $nins_cur = 0; + __DbLog_SBP_sendToParent ($subprocess, $ret); + return; + }; - for my $tuple (0..$#device_cur) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - - next if($status); # $status ist "1" wenn insert ok - - Log3 ($name, 3, "DbLog $name - Insert into $current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"); - - $nins_cur++; - } - - if(!$nins_cur) { - Log3 ($name, 4, "DbLog $name - ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table $current ".($usepkc ? " using PK on columns $pkc" : "")); - } - else { - Log3 ($name, 4, "DbLog $name - ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table $current".($usepkc ? " using PK on columns $pkc" : "")); - } - } - - $error = __DbLog_SBP_commitOnly ($name, $dbh, $current); + if($ins_hist == $ceti) { + Log3 ($name, 4, "DbLog $name - $ins_hist of $ceti events inserted into table $history".($usepkh ? " using PK on columns $pkh" : "")); } + else { + if($usepkh) { + Log3 ($name, 3, "DbLog $name - INFO - ".$ins_hist." of $ceti events inserted into table $history due to PK on columns $pkh"); + } + else { + Log3 ($name, 2, "DbLog $name - WARNING - only ".$ins_hist." of $ceti events inserted into table $history"); + } + } + + __DbLog_SBP_commitOnly ($name, $dbh, $history); } - else { - ## Array-Insert - ####################### - $st = [gettimeofday]; # SQL-Startzeit - - if (lc($DbLogType) =~ m(history)) { # insert history mit/ohne primary key - ($error, $sth_ih) = __DbLog_SBP_sthInsTable ( { table => $history, - dbh => $dbh, - model => $model, - usepk => $usepkh - } - ); - - if ($error) { # Eventliste zurückgeben wenn z.B. Disk I/O Error bei SQLITE - Log3 ($name, 2, "DbLog $name - Error: $error"); - - $dbh->disconnect(); - delete $store->{dbh}; - - $ret = { - name => $name, - msg => $error, - ot => 0, - oper => $operation, - rowlback => $cdata - }; - - __DbLog_SBP_sendToParent ($subprocess, $ret); - return; - } - - if ($tl) { # Tracelevel setzen - $sth_ih->{TraceLevel} = "$tl|$tf"; - } - else { - $sth_ih->{TraceLevel} = '0'; - } - - $sth_ih->bind_param_array (1, [@timestamp]); - $sth_ih->bind_param_array (2, [@device]); - $sth_ih->bind_param_array (3, [@type]); - $sth_ih->bind_param_array (4, [@event]); - $sth_ih->bind_param_array (5, [@reading]); - $sth_ih->bind_param_array (6, [@value]); - $sth_ih->bind_param_array (7, [@unit]); - - my @n2hist; - my $rowhref; - - $error = __DbLog_SBP_beginTransaction ($name, $dbh, $useta); - - eval { - ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \@tuple_status } ); - }; - - if ($@) { - $error = $@; - - Log3 ($name, 2, "DbLog $name - Error table $history - $error"); - - if($useta) { - $rowlback = $cdata; # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein - __DbLog_SBP_rollbackOnly ($name, $dbh, $history); - } - }; - - no warnings 'uninitialized'; - - for my $tuple (0..$ceti-1) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - - next if($status); # $status ist "1" wenn insert ok - - Log3 ($name, 4, "DbLog $name - Insert into $history rejected".($usepkh ? " (possible PK violation) " : " ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple]"); - - $event[$tuple] =~ s/\|/_ESC_/gxs; # escape Pipe "|" - $reading[$tuple] =~ s/\|/_ESC_/gxs; - $value[$tuple] =~ s/\|/_ESC_/gxs; - $unit[$tuple] =~ s/\|/_ESC_/gxs; - - my $nlh = $timestamp[$tuple]."|".$device[$tuple]."|".$type[$tuple]."|".$event[$tuple]."|".$reading[$tuple]."|".$value[$tuple]."|".$unit[$tuple]; - - push @n2hist, $nlh; - - $nins_hist++; - } - - use warnings; - - if(!$nins_hist) { - Log3 ($name, 4, "DbLog $name - $ceti of $ceti events inserted into table $history".($usepkh ? " using PK on columns $pkh" : "")); - } - else { - if($usepkh) { - Log3 ($name, 3, "DbLog $name - INFO - ".($ceti-$nins_hist)." of $ceti events inserted into table history due to PK on columns $pkh"); - } - else { - Log3 ($name, 2, "DbLog $name - WARNING - only ".($ceti-$nins_hist)." of $ceti events inserted into table $history"); - - my $bkey = 1; - - for my $line (@n2hist) { - $rowhref->{$bkey} = $line; - $bkey++; - } - } - } - - __DbLog_SBP_commitOnly ($name, $dbh, $history); - - if(defined $rowhref) { # nicht gespeicherte Datensätze ausgeben - Log3 ($name, 2, "DbLog $name - The following data are faulty and were not saved:"); - - DbLog_logHashContent ($name, $rowhref, 2); - } - } - - if (lc($DbLogType) =~ m(current)) { # insert current mit/ohne primary key - ($error, $sth_ic) = __DbLog_SBP_sthInsTable ( { table => $current, - dbh => $dbh, - model => $model, - usepk => $usepkc - } - ); - - if ($error) { # Eventliste zurückgeben wenn z.B. Disk I/O error bei SQLITE - Log3 ($name, 2, "DbLog $name - Error: $error"); - - $dbh->disconnect(); - delete $store->{dbh}; - - $ret = { - name => $name, - msg => $error, - ot => 0, - oper => $operation - }; - - __DbLog_SBP_sendToParent ($subprocess, $ret); - return; - } - - ($error, $sth_uc) = __DbLog_SBP_sthUpdTable ( { table => $current, # Statement Handle "Update" current erstellen - dbh => $dbh, - model => $model, - usepk => $usepkc, - pk => $pkc - } - ); - - if ($tl) { # Tracelevel setzen - $sth_uc->{TraceLevel} = "$tl|$tf"; - $sth_ic->{TraceLevel} = "$tl|$tf"; - } - else { - $sth_uc->{TraceLevel} = '0'; - $sth_ic->{TraceLevel} = '0'; - } - - $sth_uc->bind_param_array (1, [@timestamp]); - $sth_uc->bind_param_array (2, [@type]); - $sth_uc->bind_param_array (3, [@event]); - $sth_uc->bind_param_array (4, [@value]); - $sth_uc->bind_param_array (5, [@unit]); - $sth_uc->bind_param_array (6, [@device]); - $sth_uc->bind_param_array (7, [@reading]); - - $error = __DbLog_SBP_beginTransaction ($name, $dbh, $useta); - - undef @tuple_status; - - eval { - ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \@tuple_status } ); - my $nupd_cur = 0; - - no warnings 'uninitialized'; - - for my $tuple (0..$ceti-1) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - - next if($status); # $status ist "1" wenn update ok - - Log3 ($name, 5, "DbLog $name - Failed to update in $current - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"); - - push @timestamp_cur, $timestamp[$tuple]; - push @device_cur, $device[$tuple]; - push @type_cur, $type[$tuple]; - push @event_cur, $event[$tuple]; - push @reading_cur, $reading[$tuple]; - push @value_cur, $value[$tuple]; - push @unit_cur, $unit[$tuple]; - - $nupd_cur++; - } - - use warnings; - - if(!$nupd_cur) { - Log3 ($name, 4, "DbLog $name - $ceti of $ceti events updated in table $current".($usepkc ? " using PK on columns $pkc" : "")); - } - else { - Log3 ($name, 4, "DbLog $name - $nupd_cur of $ceti events not updated and try to insert into table $current".($usepkc ? " using PK on columns $pkc" : "")); - $doins = 1; - } - - if ($doins) { # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt - $sth_ic->bind_param_array (1, [@timestamp_cur]); - $sth_ic->bind_param_array (2, [@device_cur]); - $sth_ic->bind_param_array (3, [@type_cur]); - $sth_ic->bind_param_array (4, [@event_cur]); - $sth_ic->bind_param_array (5, [@reading_cur]); - $sth_ic->bind_param_array (6, [@value_cur]); - $sth_ic->bind_param_array (7, [@unit_cur]); - - undef @tuple_status; - - ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \@tuple_status } ); - my $nins_cur = 0; - - for my $tuple (0..$#device_cur) { - my $status = $tuple_status[$tuple]; - $status = 0 if($status eq "0E0"); - - next if($status); # $status ist "1" wenn insert ok - - Log3 ($name, 3, "DbLog $name - Insert into $current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"); - - $nins_cur++; - } - - if(!$nins_cur) { - Log3 ($name, 4, "DbLog $name - ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table $current ".($usepkc ? " using PK on columns $pkc" : "")); - } - else { - Log3 ($name, 4, "DbLog $name - ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table $current".($usepkc ? " using PK on columns $pkc" : "")); - } - } - - $error = __DbLog_SBP_commitOnly ($name, $dbh, $current); - }; - } + + if (lc($DbLogType) =~ m(current)) { + $error = __DbLog_SBP_onRun_LogCurrent ( { subprocess => $subprocess, + name => $name, + memc => $memc, + store => $store, + useta => $useta, + usepkc => $usepkc, + pkc => $pkc, + ceti => $ceti, + faref => $faref + } + ); } if ($operation eq 'importCachefile') { @@ -3145,6 +2792,471 @@ sub _DbLog_SBP_onRun_Log { return; } +################################################################# +# SubProcess - Log-Routine +# Array-Insert +################################################################# +sub _DbLog_SBP_onRun_LogArray { + my $paref = shift; + + my $subprocess = $paref->{subprocess}; + my $name = $paref->{name}; + my $memc = $paref->{memc}; + my $store = $paref->{store}; # Datenspeicher + my $logstore = $paref->{logstore}; # temporärer Logdatenspeicher + my $useta = $paref->{useta}; + my $bst = $paref->{bst}; + + my $DbLogType = $memc->{DbLogType}; # Log-Ziele + my $nsupk = $memc->{nsupk}; # No Support PK 0|1 + my $tl = $memc->{tl}; # traceLevel + my $tf = $memc->{tf}; # traceFlag + my $operation = $memc->{operation} // 'unknown'; # aktuell angeforderte Operation (log, etc.) + my $cdata = $memc->{cdata}; # Log Daten, z.B.: 3399 => 2022-11-29 09:33:32|SolCast|SOLARFORECAST||nextCycletime|09:33:47| + my $index = $memc->{cdataindex}; # aktueller Cache-Index + + my $dbh = $store->{dbh}; + my $dbconn = $store->{dbparams}{dbconn}; + my $model = $store->{dbparams}{model}; + my $history = $store->{dbparams}{history}; + my $current = $store->{dbparams}{current}; + + my $error = q{}; + my $rowlback = {}; # Hashreferenz Eventliste für Rückgabe wenn Fehler + my $nins_hist = 0; + + my $ret; + + if ($tl) { # Tracelevel setzen + $dbh->{TraceLevel} = "$tl|$tf"; + } + else { + $dbh->{TraceLevel} = '0'; + } + + __DbLog_SBP_logLogmodes ($paref); + + my ($usepkh,$usepkc,$pkh,$pkc); + + if (!$nsupk) { # check ob PK verwendet wird, @usepkx?Anzahl der Felder im PK:0 wenn kein PK, $pkx?Namen der Felder:none wenn kein PK + ($usepkh,$usepkc,$pkh,$pkc) = DbLog_checkUsePK ( { name => $name, + dbh => $dbh, + dbconn => $dbconn, + history => $history, + current => $current + } + ); + } + else { + Log3 ($name, 5, "DbLog $name - Primary Key usage suppressed by attribute noSupportPK"); + } + + my $ln = scalar keys %{$logstore}; + + if ($ln) { # temporär gespeicherte Daten hinzufügen + for my $index (sort {$a<=>$b} keys %{$logstore}) { + Log3 ($name, 4, "DbLog $name - add stored data: $index -> ".$logstore->{$index}); + + $cdata->{$index} = delete $logstore->{$index}; + } + + undef %{$logstore}; + + Log3 ($name, 4, "DbLog $name - logstore deleted - $ln stored datasets added for processing"); + } + + my $faref = __DbLog_SBP_fieldArrays ($name, $cdata); + my $ceti = scalar keys %{$cdata}; + + my ($st,$sth_ih,$sth_ic,$sth_uc,$sqlins,$ins_hist); + my ($tuples, $rows); + my @tuple_status; + + my @timestamp = @{$faref->{timestamp}}; + my @device = @{$faref->{device}}; + my @type = @{$faref->{type}}; + my @event = @{$faref->{event}}; + my @reading = @{$faref->{reading}}; + my @value = @{$faref->{value}}; + my @unit = @{$faref->{unit}}; + + $st = [gettimeofday]; # SQL-Startzeit + + if (lc($DbLogType) =~ m(history)) { # insert history mit/ohne primary key + ($error, $sth_ih) = __DbLog_SBP_sthInsTable ( { table => $history, + dbh => $dbh, + model => $model, + usepk => $usepkh + } + ); + + if ($error) { # Eventliste zurückgeben wenn z.B. Disk I/O Error bei SQLITE + Log3 ($name, 2, "DbLog $name - Error: $error"); + + $dbh->disconnect(); + delete $store->{dbh}; + + $ret = { + name => $name, + msg => $error, + ot => 0, + oper => $operation, + rowlback => $cdata + }; + + __DbLog_SBP_sendToParent ($subprocess, $ret); + return; + } + + if ($tl) { # Tracelevel setzen + $sth_ih->{TraceLevel} = "$tl|$tf"; + } + else { + $sth_ih->{TraceLevel} = '0'; + } + + $sth_ih->bind_param_array (1, [@timestamp]); + $sth_ih->bind_param_array (2, [@device]); + $sth_ih->bind_param_array (3, [@type]); + $sth_ih->bind_param_array (4, [@event]); + $sth_ih->bind_param_array (5, [@reading]); + $sth_ih->bind_param_array (6, [@value]); + $sth_ih->bind_param_array (7, [@unit]); + + my @n2hist; + my $rowhref; + + $error = __DbLog_SBP_beginTransaction ($name, $dbh, $useta); + + eval { + ($tuples, $rows) = $sth_ih->execute_array( { ArrayTupleStatus => \@tuple_status } ); + }; + + if ($@) { + $error = $@; + $nins_hist = $ceti; + + Log3 ($name, 2, "DbLog $name - Error table $history - $error"); + + if($useta) { + $rowlback = $cdata; # nicht gespeicherte Datensätze nur zurück geben wenn Transaktion ein + __DbLog_SBP_rollbackOnly ($name, $dbh, $history); + + Log3 ($name, 4, "DbLog $name - Transaction is switched on. Transferred data is returned to the cache."); + } + }; + + no warnings 'uninitialized'; + + for my $tuple (0..$ceti-1) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + + next if($status); # $status ist "1" wenn insert ok + + Log3 ($name, 4, "DbLog $name - Insert into $history rejected".($usepkh ? " (possible PK violation) " : " ")."- TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple]"); + + $event[$tuple] =~ s/\|/_ESC_/gxs; # escape Pipe "|" + $reading[$tuple] =~ s/\|/_ESC_/gxs; + $value[$tuple] =~ s/\|/_ESC_/gxs; + $unit[$tuple] =~ s/\|/_ESC_/gxs; + + my $nlh = $timestamp[$tuple]."|".$device[$tuple]."|".$type[$tuple]."|".$event[$tuple]."|".$reading[$tuple]."|".$value[$tuple]."|".$unit[$tuple]; + + push @n2hist, $nlh; + + $nins_hist++; + } + + use warnings; + + if(!$nins_hist) { + Log3 ($name, 4, "DbLog $name - $ceti of $ceti events inserted into table $history".($usepkh ? " using PK on columns $pkh" : "")); + } + else { + if($usepkh) { + Log3 ($name, 3, "DbLog $name - INFO - ".($ceti-$nins_hist)." of $ceti events inserted into table history due to PK on columns $pkh"); + } + else { + Log3 ($name, 2, "DbLog $name - WARNING - only ".($ceti-$nins_hist)." of $ceti events inserted into table $history"); + + my $bkey = 1; + + for my $line (@n2hist) { + $rowhref->{$bkey} = $line; + $bkey++; + } + } + } + + __DbLog_SBP_commitOnly ($name, $dbh, $history) if(!$error); + + if(defined $rowhref) { # nicht gespeicherte Datensätze ausgeben + Log3 ($name, 2, "DbLog $name - The following data are faulty and were not saved:"); + + DbLog_logHashContent ($name, $rowhref, 2); + } + } + + if ($operation eq 'importCachefile') { + return ($error, $nins_hist, $rowlback); + } + + if (lc($DbLogType) =~ m(current)) { + $error = __DbLog_SBP_onRun_LogCurrent ( { subprocess => $subprocess, + name => $name, + memc => $memc, + store => $store, + useta => $useta, + usepkc => $usepkc, + pkc => $pkc, + ceti => $ceti, + faref => $faref + } + ); + } + + my $rt = tv_interval($st); # SQL-Laufzeit ermitteln + my $brt = tv_interval($bst); # Background-Laufzeit ermitteln + my $ot = $rt.",".$brt; + + $ret = { + name => $name, + msg => $error, + ot => $ot, + oper => $operation, + rowlback => $rowlback + }; + + __DbLog_SBP_sendToParent ($subprocess, $ret); + +return; +} + +################################################################# +# SubProcess - Log-Routine Insert/Update current Tabelle +# Array-Insert wird auch bei Bulk verwendet weil im Bulk-Mode +# die nicht upgedateten Sätze nicht identifiziert werden können +################################################################# +sub __DbLog_SBP_onRun_LogCurrent { + my $paref = shift; + + my $subprocess = $paref->{subprocess}; + my $name = $paref->{name}; + my $memc = $paref->{memc}; + my $store = $paref->{store}; # Datenspeicher + my $useta = $paref->{useta}; + my $usepkc = $paref->{usepkc}; + my $pkc = $paref->{pkc}; + my $ceti = $paref->{ceti}; + my $faref = $paref->{faref}; + + my $tl = $memc->{tl}; # traceLevel + my $tf = $memc->{tf}; # traceFlag + my $operation = $memc->{operation} // 'unknown'; # aktuell angeforderte Operation (log, etc.) + my $cdata = $memc->{cdata}; # Log Daten, z.B.: 3399 => 2022-11-29 09:33:32|SolCast|SOLARFORECAST||nextCycletime|09:33:47| + + my $dbh = $store->{dbh}; + my $model = $store->{dbparams}{model}; + my $current = $store->{dbparams}{current}; + + my $error = q{}; + my $doins = 0; # Hilfsvariable, wenn "1" sollen inserts in Tabelle current erfolgen (updates schlugen fehl) + + my $ret; + + my @timestamp = @{$faref->{timestamp}}; + my @device = @{$faref->{device}}; + my @type = @{$faref->{type}}; + my @event = @{$faref->{event}}; + my @reading = @{$faref->{reading}}; + my @value = @{$faref->{value}}; + my @unit = @{$faref->{unit}}; + + my (@timestamp_cur,@device_cur,@type_cur,@event_cur,@reading_cur,@value_cur,@unit_cur); + my ($tuples,$rows,$sth_ic,$sth_uc); + my @tuple_status; + + ($error, $sth_ic) = __DbLog_SBP_sthInsTable ( { table => $current, + dbh => $dbh, + model => $model, + usepk => $usepkc + } + ); + + return $error if ($error); + + ($error, $sth_uc) = __DbLog_SBP_sthUpdTable ( { table => $current, # Statement Handle "Update" current erstellen + dbh => $dbh, + model => $model, + usepk => $usepkc, + pk => $pkc + } + ); + + return $error if ($error); + + if ($tl) { # Tracelevel setzen + $sth_uc->{TraceLevel} = "$tl|$tf"; + $sth_ic->{TraceLevel} = "$tl|$tf"; + } + else { + $sth_uc->{TraceLevel} = '0'; + $sth_ic->{TraceLevel} = '0'; + } + + $sth_uc->bind_param_array (1, [@timestamp]); + $sth_uc->bind_param_array (2, [@type]); + $sth_uc->bind_param_array (3, [@event]); + $sth_uc->bind_param_array (4, [@value]); + $sth_uc->bind_param_array (5, [@unit]); + $sth_uc->bind_param_array (6, [@device]); + $sth_uc->bind_param_array (7, [@reading]); + + $error = __DbLog_SBP_beginTransaction ($name, $dbh, $useta); + + eval { ($tuples, $rows) = $sth_uc->execute_array( { ArrayTupleStatus => \@tuple_status } ); + }; + + my $nupd_cur = 0; + + for my $tuple (0..$ceti-1) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + + next if($status); # $status ist "1" wenn update ok + + Log3 ($name, 5, "DbLog $name - Failed to update in $current - TS: $timestamp[$tuple], Device: $device[$tuple], Reading: $reading[$tuple], Status = $status"); + + push @timestamp_cur, $timestamp[$tuple]; + push @device_cur, $device[$tuple]; + push @type_cur, $type[$tuple]; + push @event_cur, $event[$tuple]; + push @reading_cur, $reading[$tuple]; + push @value_cur, $value[$tuple]; + push @unit_cur, $unit[$tuple]; + + $nupd_cur++; + } + + if(!$nupd_cur) { + Log3 ($name, 4, "DbLog $name - $ceti of $ceti events updated in table $current".($usepkc ? " using PK on columns $pkc" : "")); + } + else { + Log3 ($name, 4, "DbLog $name - $nupd_cur of $ceti events not updated in table $current. Try to insert ".($usepkc ? " using PK on columns $pkc " : " ")."..."); + $doins = 1; + } + + if ($doins) { # events die nicht in Tabelle current updated wurden, werden in current neu eingefügt + $sth_ic->bind_param_array (1, [@timestamp_cur]); + $sth_ic->bind_param_array (2, [@device_cur]); + $sth_ic->bind_param_array (3, [@type_cur]); + $sth_ic->bind_param_array (4, [@event_cur]); + $sth_ic->bind_param_array (5, [@reading_cur]); + $sth_ic->bind_param_array (6, [@value_cur]); + $sth_ic->bind_param_array (7, [@unit_cur]); + + undef @tuple_status; + + eval { ($tuples, $rows) = $sth_ic->execute_array( { ArrayTupleStatus => \@tuple_status } ); + }; + + my $nins_cur = 0; + + for my $tuple (0..$#device_cur) { + my $status = $tuple_status[$tuple]; + $status = 0 if($status eq "0E0"); + + next if($status); # $status ist "1" wenn insert ok + + Log3 ($name, 3, "DbLog $name - Insert into $current rejected - TS: $timestamp[$tuple], Device: $device_cur[$tuple], Reading: $reading_cur[$tuple], Status = $status"); + + $nins_cur++; + } + + if(!$nins_cur) { + Log3 ($name, 4, "DbLog $name - ".($#device_cur+1)." of ".($#device_cur+1)." events inserted into table $current ".($usepkc ? " using PK on columns $pkc" : "")); + } + else { + Log3 ($name, 4, "DbLog $name - ".($#device_cur+1-$nins_cur)." of ".($#device_cur+1)." events inserted into table $current".($usepkc ? " using PK on columns $pkc" : "")); + } + } + + $error = __DbLog_SBP_commitOnly ($name, $dbh, $current); + +return; +} + +################################################################# +# Aufteilung der Logdaten auf Arrays für jedes +# Datenbankfeld (für Array-Insert) +################################################################# +sub __DbLog_SBP_fieldArrays { + my $name = shift; + my $cdata = shift; # Referenz zu Log Daten Hash + + my (@timestamp,@device,@type,@event,@reading,@value,@unit); + + no warnings 'uninitialized'; + + for my $key (sort {$a<=>$b} keys %{$cdata}) { + my $row = $cdata->{$key}; + my @a = split "\\|", $row; + s/_ESC_/\|/gxs for @a; # escaped Pipe back to "|" + + push @timestamp, $a[0]; + push @device, $a[1]; + push @type, $a[2]; + push @event, $a[3]; + push @reading, $a[4]; + push @value, $a[5]; + push @unit, $a[6]; + + Log3 ($name, 5, "DbLog $name - processing $key -> TS: $a[0], Dev: $a[1], Type: $a[2], Event: $a[3], Reading: $a[4], Val: $a[5], Unit: $a[6]"); + } + + use warnings; + + my $faref = { + timestamp => \@timestamp, + device => \@device, + type => \@type, + event => \@event, + reading => \@reading, + value => \@value, + unit => \@unit + }; + +return $faref; +} + +################################################################# +# Ausgabe Logging Modes +################################################################# +sub __DbLog_SBP_logLogmodes { + my $paref = shift; + + my $store = $paref->{store}; # Datenspeicher + my $memc = $paref->{memc}; + + my $name = $paref->{name}; + my $useta = $paref->{useta}; + my $dbh = $store->{dbh}; + my $bi = $memc->{bi}; # Bulk-Insert 0|1 + my $DbLogType = $memc->{DbLogType}; # Log-Ziele + my $operation = $memc->{operation} // 'unknown'; # aktuell angeforderte Operation (log, etc.) + + my $ac = $dbh->{AutoCommit} ? "ON" : "OFF"; + my $tm = $useta ? "ON" : "OFF"; + + Log3 ($name, 4, "DbLog $name - Operation: $operation"); + Log3 ($name, 5, "DbLog $name - DbLogType: $DbLogType"); + Log3 ($name, 4, "DbLog $name - AutoCommit: $ac, Transaction: $tm"); + Log3 ($name, 4, "DbLog $name - Insert mode: ".($bi ? "Bulk" : "Array")); + +return; +} + ################################################################# # SubProcess - Count-Routine ################################################################# @@ -3404,18 +3516,18 @@ sub _DbLog_SBP_onRun_importCachefile { $memc->{DbLogType} = 'history'; # nur history-Insert ! $memc->{bi} = 0; # Array-Insert ! + + ($error, $nins_hist, $rowlback) = _DbLog_SBP_onRun_LogArray ( { subprocess => $subprocess, + name => $name, + memc => $memc, + store => $store, + logstore => $logstore, + useta => 0, # keine Transaktion ! + bst => $bst + } + ); - ($error, $nins_hist, $rowlback) = _DbLog_SBP_onRun_Log ( { subprocess => $subprocess, - name => $name, - memc => $memc, - store => $store, - logstore => $logstore, - useta => 0, # keine Transaktion ! - bst => $bst - } - ); - - if (!$error && $nins_hist) { + if (!$error && $nins_hist && keys %{$rowlback}) { Log3 ($name, 2, "DbLog $name - WARNING - $nins_hist datasets from $infile were not imported:"); for my $index (sort {$a<=>$b} keys %{$rowlback}) { @@ -3426,7 +3538,22 @@ sub _DbLog_SBP_onRun_importCachefile { } my $improws = 'unknown'; - $improws = $crows - $nins_hist if(!$error); + + if (!$error) { + $improws = $crows - $nins_hist; + + my @parts = split "/", $infile; + $infile = pop @parts; + my $dir = (join "/", @parts).'/'; + + unless (rename ($dir.$infile, $dir."impdone_".$infile)) { + $error = "cachefile $dir$infile couldn't be renamed after import: ".$!; + Log3 ($name, 2, "DbLog $name - ERROR - $error"); + } + else { + Log3 ($name, 3, "DbLog $name - cachefile $dir$infile renamed to: ".$dir."impdone_".$infile); + } + } my $rt = tv_interval($st); # SQL-Laufzeit ermitteln my $brt = tv_interval($bst); # Background-Laufzeit ermitteln @@ -4317,7 +4444,11 @@ sub DbLog_SBP_sendCommand { my $memc; - $memc->{verbose} = AttrVal ($name, 'verbose', 3); + $memc->{nsupk} = AttrVal ($name, 'noSupportPK', 0); + $memc->{tl} = AttrVal ($name, 'traceLevel', 0); + $memc->{tf} = AttrVal ($name, 'traceFlag', 'SQL'); + $memc->{bi} = AttrVal ($name, 'bulkInsert', 0); + $memc->{verbose} = AttrVal ($name, 'verbose', 3); $memc->{operation} = $oper; $memc->{arguments} = $arg;