From a554cd3f5800c8105c1b1e9e70ab81e5b689b1cf Mon Sep 17 00:00:00 2001 From: tsightler Date: Mon, 29 Jul 2019 08:03:06 -0400 Subject: [PATCH] Work around memory leak in tuyapi >5.1.x Work around memory leak in tuyapi >5.1.x --- package.json | 6 +- tuya-mqtt.js | 331 ++++++++++++++++++++++++++++----------------------- 2 files changed, 185 insertions(+), 152 deletions(-) diff --git a/package.json b/package.json index e454d1d..ec905fb 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "tuya-mqtt", - "version": "2.0.1", + "version": "2.0.2", "description": "", "homepage": "https://github.com/TheAgentK/tuya-mqtt#readme", "main": "tuya-mqtt.js", @@ -14,8 +14,8 @@ "license": "ISC", "dependencies": { "color-convert": "^1.9.3", - "debug": "^3.2.6", - "mqtt": "^2.18.8", + "debug": "^4.1.1", + "mqtt": "^3.0.0", "tuyapi": "^5.1.1" }, "repository": { diff --git a/tuya-mqtt.js b/tuya-mqtt.js index 6b90765..1ee8c17 100644 --- a/tuya-mqtt.js +++ b/tuya-mqtt.js @@ -1,3 +1,4 @@ +'use strict' const mqtt = require('mqtt'); const TuyaDevice = require('./tuya-device'); const debug = require('debug')('TuyAPI:mqtt'); @@ -6,6 +7,14 @@ const debugTuya = require('debug')('TuyAPI:mqtt:device'); const debugError = require('debug')('TuyAPI:mqtt:error'); var cleanup = require('./cleanup').Cleanup(onExit); +var CONFIG = undefined; +var mqtt_client = undefined; + +// Gloabal variable to track all registered Tuya devices +// Used to disconnect/reconnect devices every 60 minutes +// due to memory leak in tuyapi >5.1.x +const tuyaDevices = new Array(); + function bmap(istate) { return istate ? 'ON' : "OFF"; } @@ -14,56 +23,7 @@ function boolToString(istate) { return istate ? 'true' : "false"; } -var connected = undefined; -var CONFIG = undefined; - -try { - CONFIG = require("./config"); -} catch (e) { - console.error("Configuration file not found") - debugError(e) - process.exit(1) -} - -if (typeof CONFIG.qos == "undefined") { - CONFIG.qos = 2; -} -if (typeof CONFIG.retain == "undefined") { - CONFIG.retain = false; -} - -const mqtt_client = mqtt.connect({ - host: CONFIG.host, - port: CONFIG.port, - username: CONFIG.mqtt_user, - password: CONFIG.mqtt_pass, -}); - -mqtt_client.on('connect', function (err) { - debug("Verbindung mit MQTT-Server hergestellt"); - connected = true; - var topic = CONFIG.topic + '#'; - mqtt_client.subscribe(topic, { - retain: CONFIG.retain, - qos: CONFIG.qos - }); -}); - -mqtt_client.on("reconnect", function (error) { - if (connected) { - debug("Verbindung mit MQTT-Server wurde unterbrochen. Erneuter Verbindungsversuch!"); - } else { - debug("Verbindung mit MQTT-Server konnte nicht herrgestellt werden."); - } - connected = false; -}); - -mqtt_client.on("error", function (error) { - debug("Verbindung mit MQTT-Server konnte nicht herrgestellt werden.", error); - connected = false; -}); - -/** +/* * execute function on topic message */ @@ -112,18 +72,34 @@ function getDeviceFromTopic(_topic) { var topic = _topic.split("/"); if (checkTopicNotation(_topic)) { - return { + var options = { id: topic[2], - key: topic[3], - ip: topic[4], - type: topic[1] + key: topic[3] }; + + if (ip !== "discover") { + options.ip = topic[4] + if (type == "ver3.3") { + options.version = "3.3" + } else if (type == "ver3.1") { + options.version = "3.1" + } else { + options.type = topic[1] + }; + }; + + return options; } else { - return { - id: topic[1], - key: topic[2], - ip: topic[3] - }; + var options = { + id: topic[1], + key: topic[2] + }; + + if (topic[3] !== "discover") { + options.ip = topic[3] + }; + + return options; } } @@ -165,62 +141,6 @@ function getCommandFromTopic(_topic, _message) { return command; } -mqtt_client.on('message', function (topic, message) { - try { - message = message.toString(); - var action = getActionFromTopic(topic); - var options = getDeviceFromTopic(topic); - - debug("receive settings", JSON.stringify({ - topic: topic, - action: action, - message: message, - options: options - })); - if (options.ip == "discover") { - delete options.ip; - } else if (options.type == "ver3.3") { - delete options.type; - options.version = "3.3"; - } else if (options.type == "ver3.1") { - delete options.type; - options.version = "3.1"; - } - var device = new TuyaDevice(options); - device.then(function (params) { - var device = params.device; - - switch (action) { - case "command": - var command = getCommandFromTopic(topic, message); - debug("receive command", command); - if (command == "toggle") { - device.switch(command).then((data) => { - debug("set device status completed", data); - }); - } else { - device.set(command).then((data) => { - debug("set device status completed", data); - }); - } - break; - case "color": - var color = message.toLowerCase(); - debugColor("set color: ", color); - device.setColor(color).then((data) => { - debug("set device color completed", data); - }); - break; - } - - }).catch((err) => { - debugError(err); - }); - } catch (e) { - debugError(e); - } -}); - /** * Publish current TuyaDevice state to MQTT-Topic * @param {TuyaDevice} device @@ -332,43 +252,156 @@ TuyaDevice.onAll('data', function (data) { } }); -/** - * MQTT connection tester - */ -function MQTT_Tester() { - this.interval = null; - - function mqttConnectionTest() { - if (mqtt_client.connected != connected) { - connected = mqtt_client.connected; - if (connected) { - debug('MQTT-Server verbunden.'); - } else { - debug('MQTT-Server nicht verbunden.'); - } - } - } - - this.destroy = function () { - clearInterval(this.interval); - this.interval = undefined; - } - - this.connect = function () { - this.interval = setInterval(mqttConnectionTest, 1500); - mqttConnectionTest(); - } - - var constructor = (function (that) { - that.connect.call(that); - })(this); -} -var tester = new MQTT_Tester(); - /** * Function call on script exit */ function onExit() { TuyaDevice.disconnectAll(); - if (tester) tester.destroy(); }; + +/** + * Function to check if devices has previously been created + * Used for memory leak hack for tuyapi >5.1.x + */ +function existingTuyaDevice(device) { + var existing = false; + tuyaDevices.forEach(tuyaDev => { + if (tuyaDev.hasOwnProperty("options")) { + if (tuyaDev.options.id === device.options.id) { + existing = true; + }; + }; + }); + return existing; +} + +// Simple sleep to pause in async functions +function sleep(sec) { + return new Promise(res => setTimeout(res, sec*1000)); +} + +// Main code loop +const main = async() => { + + try { + CONFIG = require("./config"); + } catch (e) { + console.error("Configuration file not found") + debugError(e) + process.exit(1) + } + + if (typeof CONFIG.qos == "undefined") { + CONFIG.qos = 2; + } + if (typeof CONFIG.retain == "undefined") { + CONFIG.retain = false; + } + + mqtt_client = mqtt.connect({ + host: CONFIG.host, + port: CONFIG.port, + username: CONFIG.mqtt_user, + password: CONFIG.mqtt_pass, + }); + + mqtt_client.on('connect', function (err) { + debug("Connection established to MQTT server"); + var topic = CONFIG.topic + '#'; + mqtt_client.subscribe(topic, { + retain: CONFIG.retain, + qos: CONFIG.qos + }); + }); + + mqtt_client.on("reconnect", function (error) { + if (mqtt_client.connected) { + debug("Connection to MQTT server lost. Attempting to reconnect..."); + } else { + debug("Unable to connect to MQTT server"); + } + }); + + mqtt_client.on("error", function (error) { + debug("Unable to connect to MQTT server", error); + }); + + mqtt_client.on('message', function (topic, message) { + try { + message = message.toString(); + var action = getActionFromTopic(topic); + var options = getDeviceFromTopic(topic); + + debug("receive settings", JSON.stringify({ + topic: topic, + action: action, + message: message, + options: options + })); + + var device = new TuyaDevice(options); + + device.then(function (params) { + var device = params.device; + + // If new device add to registered device list + // Used only for reconnecting devices due to tuyapi 5.1.1 memory leak + if (!existingTuyaDevice(device)) { + tuyaDevices.push(device); + } + + switch (action) { + case "command": + var command = getCommandFromTopic(topic, message); + debug("receive command", command); + if (command == "toggle") { + device.switch(command).then((data) => { + debug("set device status completed", data); + }); + } else { + device.set(command).then((data) => { + debug("set device status completed", data); + }); + } + break; + case "color": + var color = message.toLowerCase(); + debugColor("set color: ", color); + device.setColor(color).then((data) => { + debug("set device color completed", data); + }); + break; + } + + }).catch((err) => { + debugError(err); + }); + } catch (e) { + debugError(e); + } + }); + +} + +// Call the main code +main() + +// Hack for memory leak in Tuyapi > 5.1.x +// Disconnect and reconnect all devices every 60 minutes +setInterval(async function() { + tuyaDevices.forEach(tuyaDev => { + var device = new TuyaDevice(tuyaDev.options); + device.then(function (params) { + device = params.device; + device.disconnect(); + }); + }); + await sleep(1); + tuyaDevices.forEach(tuyaDev => { + var device = new TuyaDevice(tuyaDev.options); + device.then(function (params) { + device = params.device; + device.connect(); + }); + }); +}, 3600000);