Work around memory leak in tuyapi >5.1.x

Work around memory leak in tuyapi >5.1.x
This commit is contained in:
tsightler
2019-07-29 08:03:06 -04:00
parent fb79927020
commit a554cd3f58
2 changed files with 185 additions and 152 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "tuya-mqtt",
"version": "2.0.1",
"version": "2.0.2",
"description": "",
"homepage": "https://github.com/TheAgentK/tuya-mqtt#readme",
"main": "tuya-mqtt.js",
@@ -14,8 +14,8 @@
"license": "ISC",
"dependencies": {
"color-convert": "^1.9.3",
"debug": "^3.2.6",
"mqtt": "^2.18.8",
"debug": "^4.1.1",
"mqtt": "^3.0.0",
"tuyapi": "^5.1.1"
},
"repository": {

View File

@@ -1,3 +1,4 @@
'use strict'
const mqtt = require('mqtt');
const TuyaDevice = require('./tuya-device');
const debug = require('debug')('TuyAPI:mqtt');
@@ -6,6 +7,14 @@ const debugTuya = require('debug')('TuyAPI:mqtt:device');
const debugError = require('debug')('TuyAPI:mqtt:error');
var cleanup = require('./cleanup').Cleanup(onExit);
var CONFIG = undefined;
var mqtt_client = undefined;
// Gloabal variable to track all registered Tuya devices
// Used to disconnect/reconnect devices every 60 minutes
// due to memory leak in tuyapi >5.1.x
const tuyaDevices = new Array();
function bmap(istate) {
return istate ? 'ON' : "OFF";
}
@@ -14,56 +23,7 @@ function boolToString(istate) {
return istate ? 'true' : "false";
}
var connected = undefined;
var CONFIG = undefined;
try {
CONFIG = require("./config");
} catch (e) {
console.error("Configuration file not found")
debugError(e)
process.exit(1)
}
if (typeof CONFIG.qos == "undefined") {
CONFIG.qos = 2;
}
if (typeof CONFIG.retain == "undefined") {
CONFIG.retain = false;
}
const mqtt_client = mqtt.connect({
host: CONFIG.host,
port: CONFIG.port,
username: CONFIG.mqtt_user,
password: CONFIG.mqtt_pass,
});
mqtt_client.on('connect', function (err) {
debug("Verbindung mit MQTT-Server hergestellt");
connected = true;
var topic = CONFIG.topic + '#';
mqtt_client.subscribe(topic, {
retain: CONFIG.retain,
qos: CONFIG.qos
});
});
mqtt_client.on("reconnect", function (error) {
if (connected) {
debug("Verbindung mit MQTT-Server wurde unterbrochen. Erneuter Verbindungsversuch!");
} else {
debug("Verbindung mit MQTT-Server konnte nicht herrgestellt werden.");
}
connected = false;
});
mqtt_client.on("error", function (error) {
debug("Verbindung mit MQTT-Server konnte nicht herrgestellt werden.", error);
connected = false;
});
/**
/*
* execute function on topic message
*/
@@ -112,18 +72,34 @@ function getDeviceFromTopic(_topic) {
var topic = _topic.split("/");
if (checkTopicNotation(_topic)) {
return {
var options = {
id: topic[2],
key: topic[3],
ip: topic[4],
type: topic[1]
key: topic[3]
};
if (ip !== "discover") {
options.ip = topic[4]
if (type == "ver3.3") {
options.version = "3.3"
} else if (type == "ver3.1") {
options.version = "3.1"
} else {
options.type = topic[1]
};
};
return options;
} else {
return {
id: topic[1],
key: topic[2],
ip: topic[3]
};
var options = {
id: topic[1],
key: topic[2]
};
if (topic[3] !== "discover") {
options.ip = topic[3]
};
return options;
}
}
@@ -165,62 +141,6 @@ function getCommandFromTopic(_topic, _message) {
return command;
}
mqtt_client.on('message', function (topic, message) {
try {
message = message.toString();
var action = getActionFromTopic(topic);
var options = getDeviceFromTopic(topic);
debug("receive settings", JSON.stringify({
topic: topic,
action: action,
message: message,
options: options
}));
if (options.ip == "discover") {
delete options.ip;
} else if (options.type == "ver3.3") {
delete options.type;
options.version = "3.3";
} else if (options.type == "ver3.1") {
delete options.type;
options.version = "3.1";
}
var device = new TuyaDevice(options);
device.then(function (params) {
var device = params.device;
switch (action) {
case "command":
var command = getCommandFromTopic(topic, message);
debug("receive command", command);
if (command == "toggle") {
device.switch(command).then((data) => {
debug("set device status completed", data);
});
} else {
device.set(command).then((data) => {
debug("set device status completed", data);
});
}
break;
case "color":
var color = message.toLowerCase();
debugColor("set color: ", color);
device.setColor(color).then((data) => {
debug("set device color completed", data);
});
break;
}
}).catch((err) => {
debugError(err);
});
} catch (e) {
debugError(e);
}
});
/**
* Publish current TuyaDevice state to MQTT-Topic
* @param {TuyaDevice} device
@@ -332,43 +252,156 @@ TuyaDevice.onAll('data', function (data) {
}
});
/**
* MQTT connection tester
*/
function MQTT_Tester() {
this.interval = null;
function mqttConnectionTest() {
if (mqtt_client.connected != connected) {
connected = mqtt_client.connected;
if (connected) {
debug('MQTT-Server verbunden.');
} else {
debug('MQTT-Server nicht verbunden.');
}
}
}
this.destroy = function () {
clearInterval(this.interval);
this.interval = undefined;
}
this.connect = function () {
this.interval = setInterval(mqttConnectionTest, 1500);
mqttConnectionTest();
}
var constructor = (function (that) {
that.connect.call(that);
})(this);
}
var tester = new MQTT_Tester();
/**
* Function call on script exit
*/
function onExit() {
TuyaDevice.disconnectAll();
if (tester) tester.destroy();
};
/**
* Function to check if devices has previously been created
* Used for memory leak hack for tuyapi >5.1.x
*/
function existingTuyaDevice(device) {
var existing = false;
tuyaDevices.forEach(tuyaDev => {
if (tuyaDev.hasOwnProperty("options")) {
if (tuyaDev.options.id === device.options.id) {
existing = true;
};
};
});
return existing;
}
// Simple sleep to pause in async functions
function sleep(sec) {
return new Promise(res => setTimeout(res, sec*1000));
}
// Main code loop
const main = async() => {
try {
CONFIG = require("./config");
} catch (e) {
console.error("Configuration file not found")
debugError(e)
process.exit(1)
}
if (typeof CONFIG.qos == "undefined") {
CONFIG.qos = 2;
}
if (typeof CONFIG.retain == "undefined") {
CONFIG.retain = false;
}
mqtt_client = mqtt.connect({
host: CONFIG.host,
port: CONFIG.port,
username: CONFIG.mqtt_user,
password: CONFIG.mqtt_pass,
});
mqtt_client.on('connect', function (err) {
debug("Connection established to MQTT server");
var topic = CONFIG.topic + '#';
mqtt_client.subscribe(topic, {
retain: CONFIG.retain,
qos: CONFIG.qos
});
});
mqtt_client.on("reconnect", function (error) {
if (mqtt_client.connected) {
debug("Connection to MQTT server lost. Attempting to reconnect...");
} else {
debug("Unable to connect to MQTT server");
}
});
mqtt_client.on("error", function (error) {
debug("Unable to connect to MQTT server", error);
});
mqtt_client.on('message', function (topic, message) {
try {
message = message.toString();
var action = getActionFromTopic(topic);
var options = getDeviceFromTopic(topic);
debug("receive settings", JSON.stringify({
topic: topic,
action: action,
message: message,
options: options
}));
var device = new TuyaDevice(options);
device.then(function (params) {
var device = params.device;
// If new device add to registered device list
// Used only for reconnecting devices due to tuyapi 5.1.1 memory leak
if (!existingTuyaDevice(device)) {
tuyaDevices.push(device);
}
switch (action) {
case "command":
var command = getCommandFromTopic(topic, message);
debug("receive command", command);
if (command == "toggle") {
device.switch(command).then((data) => {
debug("set device status completed", data);
});
} else {
device.set(command).then((data) => {
debug("set device status completed", data);
});
}
break;
case "color":
var color = message.toLowerCase();
debugColor("set color: ", color);
device.setColor(color).then((data) => {
debug("set device color completed", data);
});
break;
}
}).catch((err) => {
debugError(err);
});
} catch (e) {
debugError(e);
}
});
}
// Call the main code
main()
// Hack for memory leak in Tuyapi > 5.1.x
// Disconnect and reconnect all devices every 60 minutes
setInterval(async function() {
tuyaDevices.forEach(tuyaDev => {
var device = new TuyaDevice(tuyaDev.options);
device.then(function (params) {
device = params.device;
device.disconnect();
});
});
await sleep(1);
tuyaDevices.forEach(tuyaDev => {
var device = new TuyaDevice(tuyaDev.options);
device.then(function (params) {
device = params.device;
device.connect();
});
});
}, 3600000);