mirror of
https://repository.entgra.net/community/device-mgt-plugins.git
synced 2025-09-16 23:42:15 +00:00
Merge branch 'master' of https://github.com/wso2/carbon-device-mgt-plugins
This commit is contained in:
commit
4a63b8c52b
@ -75,7 +75,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|||||||
agentManager.updateAgentStatus("Connected to MQTT Queue");
|
agentManager.updateAgentStatus("Connected to MQTT Queue");
|
||||||
} catch (TransportHandlerException e) {
|
} catch (TransportHandlerException e) {
|
||||||
log.warn(AgentConstants.LOG_APPENDER + "Connection to MQTT Broker at: " + mqttBrokerEndPoint +
|
log.warn(AgentConstants.LOG_APPENDER + "Connection to MQTT Broker at: " + mqttBrokerEndPoint +
|
||||||
" failed.\n Will retry in " + timeoutInterval + " milli-seconds.");
|
" failed.\n Will retry in " + timeoutInterval + " milli-seconds.");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@ -115,63 +115,63 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
receivedMessage = message.toString();
|
receivedMessage = message.toString();
|
||||||
if(!receivedMessage.contains("policyDefinition")){
|
if (!receivedMessage.contains("policyDefinition")) {
|
||||||
receivedMessage = AgentUtilOperations.extractMessageFromPayload(receivedMessage);
|
receivedMessage = AgentUtilOperations.extractMessageFromPayload(receivedMessage);
|
||||||
|
|
||||||
String[] controlSignal = receivedMessage.split(":");
|
String[] controlSignal = receivedMessage.split(":");
|
||||||
// message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY")
|
// message- "<SIGNAL_TYPE>:<SIGNAL_MODE>" format.(ex: "BULB:ON", "TEMPERATURE", "HUMIDITY")
|
||||||
|
|
||||||
try {
|
try {
|
||||||
switch (controlSignal[0].toUpperCase()) {
|
switch (controlSignal[0].toUpperCase()) {
|
||||||
case AgentConstants.BULB_CONTROL:
|
case AgentConstants.BULB_CONTROL:
|
||||||
boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON);
|
boolean stateToSwitch = controlSignal[1].equals(AgentConstants.CONTROL_ON);
|
||||||
agentManager.changeAlarmStatus(stateToSwitch);
|
agentManager.changeAlarmStatus(stateToSwitch);
|
||||||
log.info(
|
log.info(
|
||||||
AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'");
|
AgentConstants.LOG_APPENDER + "Bulb was switched to state: '" + controlSignal[1] + "'");
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case AgentConstants.TEMPERATURE_CONTROL:
|
case AgentConstants.TEMPERATURE_CONTROL:
|
||||||
int currentTemperature = agentManager.getTemperature();
|
int currentTemperature = agentManager.getTemperature();
|
||||||
|
|
||||||
String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'";
|
String replyTemperature = "Current temperature was read as: '" + currentTemperature + "C'";
|
||||||
log.info(AgentConstants.LOG_APPENDER + replyTemperature);
|
log.info(AgentConstants.LOG_APPENDER + replyTemperature);
|
||||||
|
|
||||||
String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID);
|
String tempPublishTopic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID);
|
||||||
|
|
||||||
replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature;
|
replyMessage = AgentConstants.TEMPERATURE_CONTROL + ":" + currentTemperature;
|
||||||
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
|
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
|
||||||
publishToQueue(tempPublishTopic, securePayLoad);
|
publishToQueue(tempPublishTopic, securePayLoad);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case AgentConstants.HUMIDITY_CONTROL:
|
case AgentConstants.HUMIDITY_CONTROL:
|
||||||
int currentHumidity = agentManager.getHumidity();
|
int currentHumidity = agentManager.getHumidity();
|
||||||
|
|
||||||
String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'";
|
String replyHumidity = "Current humidity was read as: '" + currentHumidity + "%'";
|
||||||
log.info(AgentConstants.LOG_APPENDER + replyHumidity);
|
log.info(AgentConstants.LOG_APPENDER + replyHumidity);
|
||||||
|
|
||||||
String humidPublishTopic = String.format(
|
String humidPublishTopic = String.format(
|
||||||
AgentConstants.MQTT_PUBLISH_TOPIC,tenantDomain, deviceID);
|
AgentConstants.MQTT_PUBLISH_TOPIC, tenantDomain, deviceID);
|
||||||
|
|
||||||
replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity;
|
replyMessage = AgentConstants.HUMIDITY_CONTROL + ":" + currentHumidity;
|
||||||
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
|
securePayLoad = AgentUtilOperations.prepareSecurePayLoad(replyMessage);
|
||||||
publishToQueue(humidPublishTopic, securePayLoad);
|
publishToQueue(humidPublishTopic, securePayLoad);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case AgentConstants.POLICY_REVOKE:
|
case AgentConstants.POLICY_REVOKE:
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] +
|
log.warn(AgentConstants.LOG_APPENDER + "'" + controlSignal[0] +
|
||||||
"' is invalid and not-supported for this device-type");
|
"' is invalid and not-supported for this device-type");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} catch (AgentCoreOperationException e) {
|
} catch (AgentCoreOperationException e) {
|
||||||
log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e);
|
log.warn(AgentConstants.LOG_APPENDER + "Preparing Secure payload failed", e);
|
||||||
} catch (TransportHandlerException e) {
|
} catch (TransportHandlerException e) {
|
||||||
log.error(AgentConstants.LOG_APPENDER +
|
log.error(AgentConstants.LOG_APPENDER +
|
||||||
"MQTT - Publishing, reply message to the MQTT Queue at: " +
|
"MQTT - Publishing, reply message to the MQTT Queue at: " +
|
||||||
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + " failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
JSONObject jsonMessage = new JSONObject(receivedMessage);
|
JSONObject jsonMessage = new JSONObject(receivedMessage);
|
||||||
@ -208,13 +208,13 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|||||||
pushMessage.setRetained(false);
|
pushMessage.setRetained(false);
|
||||||
|
|
||||||
String topic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC,
|
String topic = String.format(AgentConstants.MQTT_PUBLISH_TOPIC,
|
||||||
agentManager.getAgentConfigs().getTenantDomain(),
|
agentManager.getAgentConfigs().getTenantDomain(),
|
||||||
agentManager.getAgentConfigs().getDeviceId());
|
agentManager.getAgentConfigs().getDeviceId());
|
||||||
|
|
||||||
publishToQueue(topic, pushMessage);
|
publishToQueue(topic, pushMessage);
|
||||||
log.info(AgentConstants.LOG_APPENDER + "Message: '" + message + "' published to MQTT Queue at [" +
|
log.info(AgentConstants.LOG_APPENDER + "Message: '" + message + "' published to MQTT Queue at [" +
|
||||||
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + "] under topic [" +
|
agentManager.getAgentConfigs().getMqttBrokerEndpoint() + "] under topic [" +
|
||||||
topic + "]");
|
topic + "]");
|
||||||
|
|
||||||
} catch (TransportHandlerException e) {
|
} catch (TransportHandlerException e) {
|
||||||
log.warn(AgentConstants.LOG_APPENDER + "Data Publish attempt to topic - [" +
|
log.warn(AgentConstants.LOG_APPENDER + "Data Publish attempt to topic - [" +
|
||||||
@ -226,7 +226,7 @@ public class FireAlarmMQTTCommunicator extends MQTTTransportHandler {
|
|||||||
};
|
};
|
||||||
|
|
||||||
dataPushServiceHandler = service.scheduleAtFixedRate(pushDataRunnable, publishInterval, publishInterval,
|
dataPushServiceHandler = service.scheduleAtFixedRate(pushDataRunnable, publishInterval, publishInterval,
|
||||||
TimeUnit.SECONDS);
|
TimeUnit.SECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -125,8 +125,8 @@ public class AgentManager {
|
|||||||
agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator);
|
agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator);
|
||||||
|
|
||||||
} catch (TransportHandlerException e) {
|
} catch (TransportHandlerException e) {
|
||||||
log.error("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() +
|
log.info("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() +
|
||||||
", provided in the configuration file is invalid.");
|
", provided in the configuration file is invalid. XMPP is not configured.");
|
||||||
}
|
}
|
||||||
String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(),
|
String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(),
|
||||||
agentConfigs.getDeviceId());
|
agentConfigs.getDeviceId());
|
||||||
|
|||||||
@ -68,32 +68,27 @@ public class SidhdhiQuery implements Runnable {
|
|||||||
|
|
||||||
//Start the execution plan with pre-defined or previously persisted Siddhi query
|
//Start the execution plan with pre-defined or previously persisted Siddhi query
|
||||||
File f = new File(sidhdhiQueryPath);
|
File f = new File(sidhdhiQueryPath);
|
||||||
|
|
||||||
if (!f.exists()) {
|
|
||||||
AgentUtilOperations.writeToFile("", sidhdhiQueryPath);
|
|
||||||
}
|
|
||||||
|
|
||||||
StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke();
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
if (f.exists()) {
|
||||||
//Check if there is new policy update available
|
//AgentUtilOperations.writeToFile("", sidhdhiQueryPath);
|
||||||
if (AgentManager.isUpdated()) {
|
StartExecutionPlan startExecutionPlan = new StartExecutionPlan().invoke();
|
||||||
System.out.print("### Policy Update Detected!");
|
//Check if there is new policy update available
|
||||||
//Restart execution plan with new query
|
if (AgentManager.isUpdated()) {
|
||||||
restartSiddhi();
|
System.out.print("### Policy Update Detected!");
|
||||||
startExecutionPlan = new StartExecutionPlan().invoke();
|
//Restart execution plan with new query
|
||||||
}
|
restartSiddhi();
|
||||||
InputHandler inputHandler = startExecutionPlan.getInputHandler();
|
startExecutionPlan = new StartExecutionPlan().invoke();
|
||||||
|
}
|
||||||
//Sending events to Siddhi
|
InputHandler inputHandler = startExecutionPlan.getInputHandler();
|
||||||
try {
|
//Sending events to Siddhi
|
||||||
int humidityReading = AgentManager.getInstance().getTemperature();
|
try {
|
||||||
inputHandler.send(new Object[]{"FIRE_1", humidityReading});
|
int humidityReading = AgentManager.getInstance().getTemperature();
|
||||||
Thread.sleep(3000);
|
inputHandler.send(new Object[]{"FIRE_1", humidityReading});
|
||||||
} catch (InterruptedException e) {
|
Thread.sleep(3000);
|
||||||
e.printStackTrace();
|
} catch (InterruptedException e) {
|
||||||
break;
|
e.printStackTrace();
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -117,7 +112,9 @@ public class SidhdhiQuery implements Runnable {
|
|||||||
public static String readFile(String path, Charset encoding) {
|
public static String readFile(String path, Charset encoding) {
|
||||||
byte[] encoded = new byte[0];
|
byte[] encoded = new byte[0];
|
||||||
try {
|
try {
|
||||||
encoded = Files.readAllBytes(Paths.get(path));
|
if (new File(sidhdhiQueryPath).exists()) {
|
||||||
|
encoded = Files.readAllBytes(Paths.get(path));
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
log.error("Error reading Sidhdhi query from file.");
|
log.error("Error reading Sidhdhi query from file.");
|
||||||
}
|
}
|
||||||
@ -172,7 +169,7 @@ public class SidhdhiQuery implements Runnable {
|
|||||||
siddhiManager.addCallback("bulbOnStream", new StreamCallback() {
|
siddhiManager.addCallback("bulbOnStream", new StreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void receive(Event[] events) {
|
public void receive(Event[] events) {
|
||||||
System.out.println("Bulb on Event Fired!");
|
// System.out.println("Bulb on Event Fired!");
|
||||||
if (events.length > 0) {
|
if (events.length > 0) {
|
||||||
if (!AgentManager.getInstance().isAlarmOn()) {
|
if (!AgentManager.getInstance().isAlarmOn()) {
|
||||||
AgentManager.getInstance().changeAlarmStatus(true);
|
AgentManager.getInstance().changeAlarmStatus(true);
|
||||||
@ -185,7 +182,7 @@ public class SidhdhiQuery implements Runnable {
|
|||||||
siddhiManager.addCallback("bulbOffStream", new StreamCallback() {
|
siddhiManager.addCallback("bulbOffStream", new StreamCallback() {
|
||||||
@Override
|
@Override
|
||||||
public void receive(Event[] inEvents) {
|
public void receive(Event[] inEvents) {
|
||||||
System.out.println("Bulb off Event Fired");
|
// System.out.println("Bulb off Event Fired");
|
||||||
if (AgentManager.getInstance().isAlarmOn()) {
|
if (AgentManager.getInstance().isAlarmOn()) {
|
||||||
AgentManager.getInstance().changeAlarmStatus(false);
|
AgentManager.getInstance().changeAlarmStatus(false);
|
||||||
System.out.println("#### Performed HTTP call! OFF.");
|
System.out.println("#### Performed HTTP call! OFF.");
|
||||||
@ -193,12 +190,10 @@ public class SidhdhiQuery implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
//Retrieving InputHandler to push events into Siddhi
|
//Retrieving InputHandler to push events into Siddhi
|
||||||
inputHandler = siddhiManager.getInputHandler("fireAlarmEventStream");
|
inputHandler = siddhiManager.getInputHandler("fireAlarmEventStream");
|
||||||
|
|
||||||
//Starting event processing
|
//Starting event processing
|
||||||
System.out.println("Execution Plan Started!");
|
// System.out.println("Execution Plan Started!");
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -114,8 +114,8 @@ public class AgentManager {
|
|||||||
agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator);
|
agentCommunicator.put(AgentConstants.XMPP_PROTOCOL, xmppCommunicator);
|
||||||
|
|
||||||
} catch (TransportHandlerException e) {
|
} catch (TransportHandlerException e) {
|
||||||
log.error("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() +
|
log.info("XMPP Endpoint String - " + agentConfigs.getXmppServerEndpoint() +
|
||||||
", provided in the configuration file is invalid.");
|
", provided in the configuration file is invalid. XMPP is not configured.");
|
||||||
}
|
}
|
||||||
String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(),
|
String mqttTopic = String.format(AgentConstants.MQTT_SUBSCRIBE_TOPIC, agentConfigs.getTenantDomain(),
|
||||||
agentConfigs.getDeviceId());
|
agentConfigs.getDeviceId());
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user