mirror of
https://repository.entgra.net/community/product-iots.git
synced 2025-09-16 23:32:19 +00:00
Added MQTT-Subscriber and XMPP-Connector for the virtual_firealarm
deviceType
This commit is contained in:
parent
33edc31644
commit
c13e5ac620
@ -48,7 +48,7 @@ public class ArduinoControllerService {
|
|||||||
public void setMqttArduinoSubscriber(MqttArduinoSubscriber mqttArduinoSubscriber) {
|
public void setMqttArduinoSubscriber(MqttArduinoSubscriber mqttArduinoSubscriber) {
|
||||||
ArduinoControllerService.mqttArduinoSubscriber = mqttArduinoSubscriber;
|
ArduinoControllerService.mqttArduinoSubscriber = mqttArduinoSubscriber;
|
||||||
try {
|
try {
|
||||||
mqttArduinoSubscriber.subscribe();
|
mqttArduinoSubscriber.connectAndSubscribe();
|
||||||
} catch (DeviceManagementException e) {
|
} catch (DeviceManagementException e) {
|
||||||
log.error(e.getErrorMessage());
|
log.error(e.getErrorMessage());
|
||||||
}
|
}
|
||||||
|
|||||||
@ -26,10 +26,12 @@ import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttSubscriber;
|
|||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
public class MqttArduinoSubscriber extends MqttSubscriber {
|
public class MqttArduinoSubscriber extends MqttSubscriber {
|
||||||
|
|
||||||
private static Log log = LogFactory.getLog(MqttArduinoSubscriber.class);
|
private static Log log = LogFactory.getLog(MqttArduinoSubscriber.class);
|
||||||
|
private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0,5);
|
||||||
private static final String subscribetopic =
|
private static final String subscribetopic =
|
||||||
"wso2" + File.separator + "iot" + File.separator + "+" + File.separator +
|
"wso2" + File.separator + "iot" + File.separator + "+" + File.separator +
|
||||||
ArduinoConstants.DEVICE_TYPE + File.separator + "#";
|
ArduinoConstants.DEVICE_TYPE + File.separator + "#";
|
||||||
@ -37,8 +39,7 @@ public class MqttArduinoSubscriber extends MqttSubscriber {
|
|||||||
|
|
||||||
//make it singleton
|
//make it singleton
|
||||||
private MqttArduinoSubscriber() {
|
private MqttArduinoSubscriber() {
|
||||||
|
super(iotServerSubscriber, ArduinoConstants.DEVICE_TYPE, MqttConfig.getInstance().getMqttQueueEndpoint(),
|
||||||
super("Subscriber", ArduinoConstants.DEVICE_TYPE, MqttConfig.getInstance().getMqttQueueEndpoint(),
|
|
||||||
subscribetopic);
|
subscribetopic);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -201,7 +201,7 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.eclipse.paho</groupId>
|
<groupId>org.eclipse.paho</groupId>
|
||||||
<artifactId>mqtt-client</artifactId>
|
<artifactId>mqtt-client</artifactId>
|
||||||
<version>${eclipse.paho.version}</version>
|
<version>${paho.mqtt.version}</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
@ -441,7 +441,6 @@
|
|||||||
<orbit.tomcat.version>7.0.52.wso2v5</orbit.tomcat.version>
|
<orbit.tomcat.version>7.0.52.wso2v5</orbit.tomcat.version>
|
||||||
<orbit.tomcat.jdbc.pooling.version>7.0.34.wso2v2</orbit.tomcat.jdbc.pooling.version>
|
<orbit.tomcat.jdbc.pooling.version>7.0.34.wso2v2</orbit.tomcat.jdbc.pooling.version>
|
||||||
|
|
||||||
<eclipse.paho.version>0.4.0</eclipse.paho.version>
|
|
||||||
<google.gson.version>2.2.4</google.gson.version>
|
<google.gson.version>2.2.4</google.gson.version>
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -22,4 +22,9 @@ public class VirtualFireAlarmConstants {
|
|||||||
public final static String DEVICE_PLUGIN_DEVICE_ID = "VIRTUAL_FIREALARM_DEVICE_ID";
|
public final static String DEVICE_PLUGIN_DEVICE_ID = "VIRTUAL_FIREALARM_DEVICE_ID";
|
||||||
public final static String STATE_ON = "ON";
|
public final static String STATE_ON = "ON";
|
||||||
public final static String STATE_OFF = "OFF";
|
public final static String STATE_OFF = "OFF";
|
||||||
|
|
||||||
|
public static final String URL_PREFIX = "http://";
|
||||||
|
public static final String BULB_CONTEXT = "/BULB/";
|
||||||
|
public static final String SONAR_CONTEXT = "/HUMIDITY/";
|
||||||
|
public static final String TEMPERATURE_CONTEXT = "/TEMPERATURE/";
|
||||||
}
|
}
|
||||||
|
|||||||
@ -49,7 +49,6 @@ public class VirtualFireAlarmManager implements DeviceManager {
|
|||||||
private static final Log log = LogFactory.getLog(VirtualFireAlarmManager.class);
|
private static final Log log = LogFactory.getLog(VirtualFireAlarmManager.class);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FeatureManager getFeatureManager() {
|
public FeatureManager getFeatureManager() {
|
||||||
return null;
|
return null;
|
||||||
|
|||||||
@ -25,11 +25,13 @@ import org.osgi.framework.ServiceRegistration;
|
|||||||
import org.osgi.service.component.ComponentContext;
|
import org.osgi.service.component.ComponentContext;
|
||||||
import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService;
|
import org.wso2.carbon.device.mgt.common.spi.DeviceManagementService;
|
||||||
import org.wso2.carbon.device.mgt.iot.common.service.DeviceTypeService;
|
import org.wso2.carbon.device.mgt.iot.common.service.DeviceTypeService;
|
||||||
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl.VirtualFireAlarmManagerService;
|
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl
|
||||||
|
.VirtualFireAlarmManagerService;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @scr.component name="org.wso2.carbon.device.mgt.iot.firealarm.internal.VirtualFirealarmManagementServiceComponent"
|
* @scr.component name="org.wso2.carbon.device.mgt.iot.firealarm.internal
|
||||||
|
* .VirtualFirealarmManagementServiceComponent"
|
||||||
* immediate="true"
|
* immediate="true"
|
||||||
* @scr.reference name="wso2.carbon.device.mgt.iot.common.DeviceTypeService"
|
* @scr.reference name="wso2.carbon.device.mgt.iot.common.DeviceTypeService"
|
||||||
* interface="org.wso2.carbon.device.mgt.iot.common.service.DeviceTypeService"
|
* interface="org.wso2.carbon.device.mgt.iot.common.service.DeviceTypeService"
|
||||||
@ -39,66 +41,69 @@ import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.impl.Virtu
|
|||||||
* unbind="unsetDeviceTypeService"
|
* unbind="unsetDeviceTypeService"
|
||||||
*/
|
*/
|
||||||
public class VirtualFirealarmManagementServiceComponent {
|
public class VirtualFirealarmManagementServiceComponent {
|
||||||
|
|
||||||
|
|
||||||
private ServiceRegistration firealarmServiceRegRef;
|
|
||||||
|
|
||||||
|
|
||||||
|
private ServiceRegistration firealarmServiceRegRef;
|
||||||
|
|
||||||
private static final Log log = LogFactory.getLog(VirtualFirealarmManagementServiceComponent.class);
|
private static final Log log = LogFactory.getLog(
|
||||||
protected void activate(ComponentContext ctx) {
|
VirtualFirealarmManagementServiceComponent.class);
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug("Activating Virtual Firealarm Device Management Service Component");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
BundleContext bundleContext = ctx.getBundleContext();
|
|
||||||
|
|
||||||
|
protected void activate(ComponentContext ctx) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Activating Virtual Firealarm Device Management Service Component");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
BundleContext bundleContext = ctx.getBundleContext();
|
||||||
|
firealarmServiceRegRef =
|
||||||
|
bundleContext.registerService(DeviceManagementService.class.getName(),
|
||||||
|
new VirtualFireAlarmManagerService(), null);
|
||||||
|
|
||||||
firealarmServiceRegRef =
|
if (log.isDebugEnabled()) {
|
||||||
bundleContext.registerService(DeviceManagementService.class.getName(),
|
log.debug(
|
||||||
new VirtualFireAlarmManagerService(),
|
"Virtual Firealarm Device Management Service Component has been " +
|
||||||
null);
|
"successfully activated");
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
log.error(
|
||||||
|
"Error occurred while activating Virtual Firealarm Device Management Service " +
|
||||||
|
"Component",
|
||||||
|
e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void deactivate(ComponentContext ctx) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("De-activating Virtual Firealarm Device Management Service Component");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
if (firealarmServiceRegRef != null) {
|
||||||
|
firealarmServiceRegRef.unregister();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug(
|
||||||
|
"Virtual Firealarm Device Management Service Component has been " +
|
||||||
|
"successfully de-activated");
|
||||||
|
}
|
||||||
|
} catch (Throwable e) {
|
||||||
|
log.error(
|
||||||
|
"Error occurred while de-activating Virtual Firealarm Device Management " +
|
||||||
|
"bundle",
|
||||||
|
e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
protected void setDeviceTypeService(DeviceTypeService deviceTypeService) {
|
||||||
log.debug("Virtual Firealarm Device Management Service Component has been successfully activated");
|
|
||||||
}
|
|
||||||
} catch (Throwable e) {
|
|
||||||
log.error("Error occurred while activating Virtual Firealarm Device Management Service Component", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void deactivate(ComponentContext ctx) {
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug("De-activating Virtual Firealarm Device Management Service Component");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
if (firealarmServiceRegRef != null) {
|
|
||||||
firealarmServiceRegRef.unregister();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug(
|
|
||||||
"Virtual Firealarm Device Management Service Component has been successfully de-activated");
|
|
||||||
}
|
|
||||||
} catch (Throwable e) {
|
|
||||||
log.error("Error occurred while de-activating Virtual Firealarm Device Management bundle", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void setDeviceTypeService(DeviceTypeService deviceTypeService) {
|
|
||||||
/* This is to avoid this component getting initialized before the
|
/* This is to avoid this component getting initialized before the
|
||||||
common registered */
|
common registered */
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug("Data source service set to mobile service component");
|
log.debug("Data source service set to mobile service component");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void unsetDeviceTypeService(DeviceTypeService deviceTypeService) {
|
protected void unsetDeviceTypeService(DeviceTypeService deviceTypeService) {
|
||||||
//do nothing
|
//do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -98,16 +98,6 @@
|
|||||||
<artifactId>org.wso2.carbon.device.mgt.analytics</artifactId>
|
<artifactId>org.wso2.carbon.device.mgt.analytics</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<!--Dependencies on XMPP Client Library-->
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.igniterealtime.smack.wso2</groupId>
|
|
||||||
<artifactId>smack</artifactId>
|
|
||||||
</dependency>
|
|
||||||
<dependency>
|
|
||||||
<groupId>org.igniterealtime.smack.wso2</groupId>
|
|
||||||
<artifactId>smackx</artifactId>
|
|
||||||
</dependency>
|
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -23,8 +23,6 @@ import org.apache.http.client.methods.HttpGet;
|
|||||||
import org.apache.http.concurrent.FutureCallback;
|
import org.apache.http.concurrent.FutureCallback;
|
||||||
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
||||||
import org.apache.http.impl.nio.client.HttpAsyncClients;
|
import org.apache.http.impl.nio.client.HttpAsyncClients;
|
||||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
||||||
import org.jivesoftware.smack.packet.Message;
|
|
||||||
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
||||||
import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException;
|
import org.wso2.carbon.device.mgt.analytics.exception.DataPublisherConfigurationException;
|
||||||
import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService;
|
import org.wso2.carbon.device.mgt.analytics.service.DeviceAnalyticsService;
|
||||||
@ -32,14 +30,15 @@ import org.wso2.carbon.device.mgt.common.DeviceIdentifier;
|
|||||||
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
||||||
import org.wso2.carbon.device.mgt.iot.common.DeviceController;
|
import org.wso2.carbon.device.mgt.iot.common.DeviceController;
|
||||||
import org.wso2.carbon.device.mgt.iot.common.DeviceValidator;
|
import org.wso2.carbon.device.mgt.iot.common.DeviceValidator;
|
||||||
import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttConfig;
|
|
||||||
import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig;
|
import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig;
|
||||||
import org.wso2.carbon.device.mgt.iot.common.exception.DeviceControllerException;
|
import org.wso2.carbon.device.mgt.iot.common.exception.DeviceControllerException;
|
||||||
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.DeviceJSON;
|
|
||||||
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants
|
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants
|
||||||
.VirtualFireAlarmConstants;
|
.VirtualFireAlarmConstants;
|
||||||
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt.MQTTClient;
|
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.DeviceJSON;
|
||||||
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp.XMPPClient;
|
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.VirtualFireAlarmMqttSubscriber;
|
||||||
|
|
||||||
|
|
||||||
|
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.VirtualFireAlarmXmppConnector;
|
||||||
import org.wso2.carbon.utils.CarbonUtils;
|
import org.wso2.carbon.utils.CarbonUtils;
|
||||||
|
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
@ -75,74 +74,49 @@ public class VirtualFireAlarmControllerService {
|
|||||||
@Context //injected response proxy supporting multiple thread
|
@Context //injected response proxy supporting multiple thread
|
||||||
private HttpServletResponse response;
|
private HttpServletResponse response;
|
||||||
private static final String TEMPERATURE_STREAM_DEFINITION = "org.wso2.iot.devices.temperature";
|
private static final String TEMPERATURE_STREAM_DEFINITION = "org.wso2.iot.devices.temperature";
|
||||||
|
|
||||||
private static final String URL_PREFIX = "http://";
|
|
||||||
private static final String BULB_CONTEXT = "/BULB/";
|
|
||||||
private static final String SONAR_CONTEXT = "/SONAR/";
|
|
||||||
private static final String TEMPERATURE_CONTEXT = "/TEMPERATURE/";
|
|
||||||
|
|
||||||
public static final String XMPP_PROTOCOL = "XMPP";
|
public static final String XMPP_PROTOCOL = "XMPP";
|
||||||
public static final String HTTP_PROTOCOL = "HTTP";
|
public static final String HTTP_PROTOCOL = "HTTP";
|
||||||
public static final String MQTT_PROTOCOL = "MQTT";
|
public static final String MQTT_PROTOCOL = "MQTT";
|
||||||
|
|
||||||
private static ConcurrentHashMap<String, String> deviceToIpMap = new ConcurrentHashMap<String, String>();
|
private static VirtualFireAlarmMqttSubscriber virtualFireAlarmMqttSubscriber;
|
||||||
private static XMPPClient xmppClient;
|
private static VirtualFireAlarmXmppConnector virtualFireAlarmXmppConnector;
|
||||||
private static MQTTClient mqttClient;
|
private static ConcurrentHashMap<String, String> deviceToIpMap =
|
||||||
private static final String mqttServerSubscribeTopic = "wso2/iot/+/" + VirtualFireAlarmConstants.DEVICE_TYPE + "/+/reply";
|
new ConcurrentHashMap<String, String>();
|
||||||
private static final String iotServerSubscriber = "IoT-Server";
|
|
||||||
|
|
||||||
static{
|
public void setVirtualFireAlarmXmppConnector(
|
||||||
String xmppServer = XmppConfig.getInstance().getXmppControlQueue().getServerURL();
|
VirtualFireAlarmXmppConnector virtualFireAlarmXmppConnector) {
|
||||||
int indexOfChar = xmppServer.lastIndexOf('/');
|
VirtualFireAlarmControllerService.virtualFireAlarmXmppConnector =
|
||||||
if (indexOfChar != -1) {
|
virtualFireAlarmXmppConnector;
|
||||||
xmppServer = xmppServer.substring((indexOfChar + 1), xmppServer.length());
|
virtualFireAlarmXmppConnector.initConnector();
|
||||||
}
|
virtualFireAlarmXmppConnector.connectAndLogin();
|
||||||
|
|
||||||
int xmppPort = Integer.parseInt(XmppConfig.getInstance().getSERVER_CONNECTION_PORT());
|
|
||||||
xmppClient = new XMPPClient(xmppServer, xmppPort) {
|
|
||||||
@Override
|
|
||||||
protected void processXMPPMessage(Message xmppMessage) {
|
|
||||||
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
String xmppUsername = XmppConfig.getInstance().getXmppUsername();
|
|
||||||
String xmppPassword = XmppConfig.getInstance().getXmppPassword();
|
|
||||||
|
|
||||||
try {
|
|
||||||
xmppClient.connectAndLogin(xmppUsername, xmppPassword, "iotServer");
|
|
||||||
} catch (DeviceManagementException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
|
|
||||||
xmppClient.setMessageFilterAndListener("");
|
|
||||||
|
|
||||||
String mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint();
|
|
||||||
mqttClient = new MQTTClient(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE, mqttEndpoint, mqttServerSubscribeTopic) {
|
|
||||||
@Override
|
|
||||||
protected void postMessageArrived(String topic, MqttMessage message) {
|
|
||||||
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
try {
|
|
||||||
mqttClient.connectAndSubscribe();
|
|
||||||
} catch (DeviceManagementException e) {
|
|
||||||
e.printStackTrace();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setVirtualFireAlarmMqttSubscriber(
|
||||||
|
VirtualFireAlarmMqttSubscriber virtualFireAlarmMqttSubscriber) {
|
||||||
|
VirtualFireAlarmControllerService.virtualFireAlarmMqttSubscriber =
|
||||||
|
virtualFireAlarmMqttSubscriber;
|
||||||
|
virtualFireAlarmMqttSubscriber.initConnector();
|
||||||
|
virtualFireAlarmMqttSubscriber.connectAndSubscribe();
|
||||||
|
}
|
||||||
|
|
||||||
|
public VirtualFireAlarmXmppConnector getVirtualFireAlarmXmppConnector() {
|
||||||
|
return virtualFireAlarmXmppConnector;
|
||||||
|
}
|
||||||
|
|
||||||
|
public VirtualFireAlarmMqttSubscriber getVirtualFireAlarmMqttSubscriber() {
|
||||||
|
return virtualFireAlarmMqttSubscriber;
|
||||||
|
}
|
||||||
|
|
||||||
@Path("/register/{owner}/{deviceId}/{ip}")
|
@Path("/register/{owner}/{deviceId}/{ip}")
|
||||||
@POST
|
@POST
|
||||||
public String registerDeviceIP(@PathParam("owner") String owner,
|
public String registerDeviceIP(@PathParam("owner") String owner,
|
||||||
@PathParam("deviceId") String deviceId,
|
@PathParam("deviceId") String deviceId,
|
||||||
@PathParam("ip") String deviceIP,
|
@PathParam("ip") String deviceIP,
|
||||||
@Context HttpServletResponse response) {
|
@Context HttpServletResponse response) {
|
||||||
String result;
|
String result;
|
||||||
|
|
||||||
log.info("Got register call from IP: " + deviceIP + " for Device ID: " + deviceId +
|
log.info("Got register call from IP: " + deviceIP + " for Device ID: " + deviceId +
|
||||||
" of owner: " + owner);
|
" of owner: " + owner);
|
||||||
|
|
||||||
deviceToIpMap.put(deviceId, deviceIP);
|
deviceToIpMap.put(deviceId, deviceIP);
|
||||||
|
|
||||||
@ -162,10 +136,10 @@ public class VirtualFireAlarmControllerService {
|
|||||||
@Path("/bulb/{state}")
|
@Path("/bulb/{state}")
|
||||||
@POST
|
@POST
|
||||||
public void switchBulb(@HeaderParam("owner") String owner,
|
public void switchBulb(@HeaderParam("owner") String owner,
|
||||||
@HeaderParam("deviceId") String deviceId,
|
@HeaderParam("deviceId") String deviceId,
|
||||||
@HeaderParam("protocol") String protocol,
|
@HeaderParam("protocol") String protocol,
|
||||||
@PathParam("state") String state,
|
@PathParam("state") String state,
|
||||||
@Context HttpServletResponse response) {
|
@Context HttpServletResponse response) {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
DeviceValidator deviceValidator = new DeviceValidator();
|
DeviceValidator deviceValidator = new DeviceValidator();
|
||||||
@ -196,27 +170,28 @@ public class VirtualFireAlarmControllerService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
String protocolString = protocol.toUpperCase();
|
String protocolString = protocol.toUpperCase();
|
||||||
String callUrlPattern = BULB_CONTEXT + switchToState;
|
String callUrlPattern = VirtualFireAlarmConstants.BULB_CONTEXT + switchToState;
|
||||||
|
|
||||||
log.info("Sending command: '" + callUrlPattern + "' to virtual-firealarm at: " + deviceIP + " " +
|
log.info("Sending command: '" + callUrlPattern + "' to virtual-firealarm at: " + deviceIP +
|
||||||
"via" + " " + protocolString);
|
" " +
|
||||||
|
"via" + " " + protocolString);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
switch (protocolString) {
|
switch (protocolString) {
|
||||||
case HTTP_PROTOCOL:
|
case HTTP_PROTOCOL:
|
||||||
sendCommandViaHTTP(deviceIP, 80, callUrlPattern, true);
|
sendCommandViaHTTP(deviceIP, 9090, callUrlPattern, true);
|
||||||
break;
|
break;
|
||||||
case MQTT_PROTOCOL:
|
case MQTT_PROTOCOL:
|
||||||
sendCommandViaMQTT(owner, deviceId, BULB_CONTEXT.replace("/", ""),
|
sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT.replace("/", ""),
|
||||||
switchToState);
|
switchToState);
|
||||||
break;
|
break;
|
||||||
case XMPP_PROTOCOL:
|
case XMPP_PROTOCOL:
|
||||||
// requestBulbChangeViaXMPP(switchToState, response);
|
// requestBulbChangeViaXMPP(switchToState, response);
|
||||||
sendCommandViaXMPP(owner, deviceId, BULB_CONTEXT, switchToState);
|
sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.BULB_CONTEXT, switchToState);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
if (protocolString == null) {
|
if (protocolString == null) {
|
||||||
sendCommandViaHTTP(deviceIP, 80, callUrlPattern, true);
|
sendCommandViaHTTP(deviceIP, 9090, callUrlPattern, true);
|
||||||
} else {
|
} else {
|
||||||
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
|
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
|
||||||
return;
|
return;
|
||||||
@ -225,7 +200,7 @@ public class VirtualFireAlarmControllerService {
|
|||||||
}
|
}
|
||||||
} catch (DeviceManagementException e) {
|
} catch (DeviceManagementException e) {
|
||||||
log.error("Failed to send command '" + callUrlPattern + "' to: " + deviceIP + " via" +
|
log.error("Failed to send command '" + callUrlPattern + "' to: " + deviceIP + " via" +
|
||||||
" " + protocol);
|
" " + protocol);
|
||||||
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
|
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -237,16 +212,16 @@ public class VirtualFireAlarmControllerService {
|
|||||||
@Path("/readsonar")
|
@Path("/readsonar")
|
||||||
@GET
|
@GET
|
||||||
public String requestSonarReading(@HeaderParam("owner") String owner,
|
public String requestSonarReading(@HeaderParam("owner") String owner,
|
||||||
@HeaderParam("deviceId") String deviceId,
|
@HeaderParam("deviceId") String deviceId,
|
||||||
@HeaderParam("protocol") String protocol,
|
@HeaderParam("protocol") String protocol,
|
||||||
@Context HttpServletResponse response) {
|
@Context HttpServletResponse response) {
|
||||||
String replyMsg = "";
|
String replyMsg = "";
|
||||||
|
|
||||||
DeviceValidator deviceValidator = new DeviceValidator();
|
DeviceValidator deviceValidator = new DeviceValidator();
|
||||||
try {
|
try {
|
||||||
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId,
|
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId,
|
||||||
VirtualFireAlarmConstants
|
VirtualFireAlarmConstants
|
||||||
.DEVICE_TYPE))) {
|
.DEVICE_TYPE))) {
|
||||||
response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode());
|
response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode());
|
||||||
return "Unauthorized Access";
|
return "Unauthorized Access";
|
||||||
}
|
}
|
||||||
@ -267,25 +242,24 @@ public class VirtualFireAlarmControllerService {
|
|||||||
try {
|
try {
|
||||||
switch (protocol) {
|
switch (protocol) {
|
||||||
case HTTP_PROTOCOL:
|
case HTTP_PROTOCOL:
|
||||||
log.info("Sending request to read sonar value at : " + deviceIp +
|
log.info("Sending request to read sonar value at : " + deviceIp + " via " + HTTP_PROTOCOL);
|
||||||
" via " + HTTP_PROTOCOL);
|
replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.SONAR_CONTEXT, false);
|
||||||
|
break;
|
||||||
|
|
||||||
replyMsg = sendCommandViaHTTP(deviceIp, 80, SONAR_CONTEXT, false);
|
case MQTT_PROTOCOL:
|
||||||
|
log.info("Sending request to read sonar value at : " + deviceIp + " via " + MQTT_PROTOCOL);
|
||||||
|
sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT.replace("/", ""), "");
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case XMPP_PROTOCOL:
|
case XMPP_PROTOCOL:
|
||||||
log.info("Sending request to read sonar value at : " + deviceIp +
|
log.info("Sending request to read sonar value at : " + deviceIp + " via " + XMPP_PROTOCOL);
|
||||||
" via " +
|
replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.SONAR_CONTEXT, ".");
|
||||||
XMPP_PROTOCOL);
|
|
||||||
replyMsg = sendCommandViaXMPP(owner, deviceId, SONAR_CONTEXT, ".");
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
if (protocol == null) {
|
if (protocol == null) {
|
||||||
log.info("Sending request to read sonar value at : " + deviceIp +
|
log.info("Sending request to read sonar value at : " + deviceIp + " via " + HTTP_PROTOCOL);
|
||||||
" via " + HTTP_PROTOCOL);
|
replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.SONAR_CONTEXT, false);
|
||||||
|
|
||||||
replyMsg = sendCommandViaHTTP(deviceIp, 80, SONAR_CONTEXT, false);
|
|
||||||
} else {
|
} else {
|
||||||
replyMsg = "Requested protocol '" + protocol + "' is not supported";
|
replyMsg = "Requested protocol '" + protocol + "' is not supported";
|
||||||
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
|
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
|
||||||
@ -308,16 +282,16 @@ public class VirtualFireAlarmControllerService {
|
|||||||
@Path("/readtemperature")
|
@Path("/readtemperature")
|
||||||
@GET
|
@GET
|
||||||
public String requestTemperature(@HeaderParam("owner") String owner,
|
public String requestTemperature(@HeaderParam("owner") String owner,
|
||||||
@HeaderParam("deviceId") String deviceId,
|
@HeaderParam("deviceId") String deviceId,
|
||||||
@HeaderParam("protocol") String protocol,
|
@HeaderParam("protocol") String protocol,
|
||||||
@Context HttpServletResponse response) {
|
@Context HttpServletResponse response) {
|
||||||
String replyMsg = "";
|
String replyMsg = "";
|
||||||
|
|
||||||
DeviceValidator deviceValidator = new DeviceValidator();
|
DeviceValidator deviceValidator = new DeviceValidator();
|
||||||
try {
|
try {
|
||||||
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId,
|
if (!deviceValidator.isExist(owner, SUPER_TENANT, new DeviceIdentifier(deviceId,
|
||||||
VirtualFireAlarmConstants
|
VirtualFireAlarmConstants
|
||||||
.DEVICE_TYPE))) {
|
.DEVICE_TYPE))) {
|
||||||
response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode());
|
response.setStatus(Response.Status.UNAUTHORIZED.getStatusCode());
|
||||||
return "Unauthorized Access";
|
return "Unauthorized Access";
|
||||||
}
|
}
|
||||||
@ -338,26 +312,25 @@ public class VirtualFireAlarmControllerService {
|
|||||||
try {
|
try {
|
||||||
switch (protocol) {
|
switch (protocol) {
|
||||||
case HTTP_PROTOCOL:
|
case HTTP_PROTOCOL:
|
||||||
log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp +
|
log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + HTTP_PROTOCOL);
|
||||||
" via " + HTTP_PROTOCOL);
|
replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, false);
|
||||||
|
break;
|
||||||
|
|
||||||
replyMsg = sendCommandViaHTTP(deviceIp, 80, TEMPERATURE_CONTEXT, false);
|
case MQTT_PROTOCOL:
|
||||||
|
log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + MQTT_PROTOCOL);
|
||||||
|
sendCommandViaMQTT(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT.replace("/", ""), "");
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case XMPP_PROTOCOL:
|
case XMPP_PROTOCOL:
|
||||||
log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp +
|
log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + XMPP_PROTOCOL);
|
||||||
" via " +
|
replyMsg = sendCommandViaXMPP(owner, deviceId, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, ".");
|
||||||
XMPP_PROTOCOL);
|
|
||||||
replyMsg = sendCommandViaXMPP(owner, deviceId, TEMPERATURE_CONTEXT, ".");
|
|
||||||
// replyMsg = requestTemperatureViaXMPP(response);
|
// replyMsg = requestTemperatureViaXMPP(response);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
default:
|
default:
|
||||||
if (protocol == null) {
|
if (protocol == null) {
|
||||||
log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp +
|
log.info("Sending request to read virtual-firealarm-temperature at : " + deviceIp + " via " + HTTP_PROTOCOL);
|
||||||
" via " + HTTP_PROTOCOL);
|
replyMsg = sendCommandViaHTTP(deviceIp, 9090, VirtualFireAlarmConstants.TEMPERATURE_CONTEXT, false);
|
||||||
|
|
||||||
replyMsg = sendCommandViaHTTP(deviceIp, 80, TEMPERATURE_CONTEXT, false);
|
|
||||||
} else {
|
} else {
|
||||||
replyMsg = "Requested protocol '" + protocol + "' is not supported";
|
replyMsg = "Requested protocol '" + protocol + "' is not supported";
|
||||||
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
|
response.setStatus(Response.Status.NOT_ACCEPTABLE.getStatusCode());
|
||||||
@ -439,12 +412,14 @@ public class VirtualFireAlarmControllerService {
|
|||||||
|
|
||||||
if (registeredIp == null) {
|
if (registeredIp == null) {
|
||||||
log.warn("Unregistered IP: Temperature Data Received from an un-registered IP " +
|
log.warn("Unregistered IP: Temperature Data Received from an un-registered IP " +
|
||||||
deviceIp + " for device ID - " + deviceId);
|
deviceIp + " for device ID - " + deviceId);
|
||||||
response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode());
|
response.setStatus(Response.Status.PRECONDITION_FAILED.getStatusCode());
|
||||||
return;
|
return;
|
||||||
} else if (!registeredIp.equals(deviceIp)) {
|
} else if (!registeredIp.equals(deviceIp)) {
|
||||||
log.warn("Conflicting IP: Received IP is " + deviceIp + ". Device with ID " +
|
log.warn("Conflicting IP: Received IP is " + deviceIp + ". Device with ID " +
|
||||||
deviceId + " is already registered under some other IP. Re-registration " + "required");
|
deviceId +
|
||||||
|
" is already registered under some other IP. Re-registration " +
|
||||||
|
"required");
|
||||||
response.setStatus(Response.Status.CONFLICT.getStatusCode());
|
response.setStatus(Response.Status.CONFLICT.getStatusCode());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -454,12 +429,13 @@ public class VirtualFireAlarmControllerService {
|
|||||||
ctx.setTenantDomain(SUPER_TENANT, true);
|
ctx.setTenantDomain(SUPER_TENANT, true);
|
||||||
DeviceAnalyticsService deviceAnalyticsService = (DeviceAnalyticsService) ctx
|
DeviceAnalyticsService deviceAnalyticsService = (DeviceAnalyticsService) ctx
|
||||||
.getOSGiService(DeviceAnalyticsService.class, null);
|
.getOSGiService(DeviceAnalyticsService.class, null);
|
||||||
Object metdaData[] = {dataMsg.owner, VirtualFireAlarmConstants.DEVICE_TYPE, dataMsg.deviceId,
|
Object metdaData[] =
|
||||||
System.currentTimeMillis()};
|
{dataMsg.owner, VirtualFireAlarmConstants.DEVICE_TYPE, dataMsg.deviceId,
|
||||||
|
System.currentTimeMillis()};
|
||||||
Object payloadData[] = {temperature};
|
Object payloadData[] = {temperature};
|
||||||
try {
|
try {
|
||||||
deviceAnalyticsService.publishEvent(TEMPERATURE_STREAM_DEFINITION, "1.0.0",
|
deviceAnalyticsService.publishEvent(TEMPERATURE_STREAM_DEFINITION, "1.0.0",
|
||||||
metdaData, new Object[0], payloadData);
|
metdaData, new Object[0], payloadData);
|
||||||
} catch (DataPublisherConfigurationException e) {
|
} catch (DataPublisherConfigurationException e) {
|
||||||
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
|
response.setStatus(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
|
||||||
|
|
||||||
@ -551,7 +527,7 @@ public class VirtualFireAlarmControllerService {
|
|||||||
|
|
||||||
|
|
||||||
private String sendCommandViaXMPP(String deviceOwner, String deviceId, String resource,
|
private String sendCommandViaXMPP(String deviceOwner, String deviceId, String resource,
|
||||||
String state) throws DeviceManagementException {
|
String state) throws DeviceManagementException {
|
||||||
|
|
||||||
String replyMsg = "";
|
String replyMsg = "";
|
||||||
String scriptArguments = "";
|
String scriptArguments = "";
|
||||||
@ -636,7 +612,7 @@ public class VirtualFireAlarmControllerService {
|
|||||||
|
|
||||||
|
|
||||||
private boolean sendCommandViaMQTT(String deviceOwner, String deviceId, String resource,
|
private boolean sendCommandViaMQTT(String deviceOwner, String deviceId, String resource,
|
||||||
String state) throws DeviceManagementException {
|
String state) throws DeviceManagementException {
|
||||||
|
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
DeviceController deviceController = new DeviceController();
|
DeviceController deviceController = new DeviceController();
|
||||||
@ -644,7 +620,7 @@ public class VirtualFireAlarmControllerService {
|
|||||||
try {
|
try {
|
||||||
result = deviceController.publishMqttControl(deviceOwner,
|
result = deviceController.publishMqttControl(deviceOwner,
|
||||||
VirtualFireAlarmConstants.DEVICE_TYPE,
|
VirtualFireAlarmConstants.DEVICE_TYPE,
|
||||||
deviceId, resource, state);
|
deviceId, resource, state);
|
||||||
} catch (DeviceControllerException e) {
|
} catch (DeviceControllerException e) {
|
||||||
String errorMsg = "Error whilst trying to publish to MQTT Queue";
|
String errorMsg = "Error whilst trying to publish to MQTT Queue";
|
||||||
log.error(errorMsg);
|
log.error(errorMsg);
|
||||||
@ -655,16 +631,16 @@ public class VirtualFireAlarmControllerService {
|
|||||||
|
|
||||||
|
|
||||||
private String sendCommandViaHTTP(final String deviceIp, int deviceServerPort,
|
private String sendCommandViaHTTP(final String deviceIp, int deviceServerPort,
|
||||||
String callUrlPattern,
|
String callUrlPattern,
|
||||||
boolean fireAndForgot)
|
boolean fireAndForgot)
|
||||||
throws DeviceManagementException {
|
throws DeviceManagementException {
|
||||||
|
|
||||||
if (deviceServerPort == 0) {
|
if (deviceServerPort == 0) {
|
||||||
deviceServerPort = 80;
|
deviceServerPort = 9090;
|
||||||
}
|
}
|
||||||
|
|
||||||
String responseMsg = "";
|
String responseMsg = "";
|
||||||
String urlString = URL_PREFIX + deviceIp + ":" + deviceServerPort + callUrlPattern;
|
String urlString = VirtualFireAlarmConstants.URL_PREFIX + deviceIp + ":" + deviceServerPort + callUrlPattern;
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
if (log.isDebugEnabled()) {
|
||||||
log.debug(urlString);
|
log.debug(urlString);
|
||||||
@ -740,7 +716,7 @@ public class VirtualFireAlarmControllerService {
|
|||||||
/* This methods creates and returns a http connection object */
|
/* This methods creates and returns a http connection object */
|
||||||
|
|
||||||
private HttpURLConnection getHttpConnection(String urlString) throws
|
private HttpURLConnection getHttpConnection(String urlString) throws
|
||||||
DeviceManagementException {
|
DeviceManagementException {
|
||||||
|
|
||||||
URL connectionUrl = null;
|
URL connectionUrl = null;
|
||||||
HttpURLConnection httpConnection = null;
|
HttpURLConnection httpConnection = null;
|
||||||
@ -803,5 +779,4 @@ public class VirtualFireAlarmControllerService {
|
|||||||
|
|
||||||
return completeResponse.toString();
|
return completeResponse.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,81 @@
|
|||||||
|
package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
||||||
|
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
||||||
|
import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttConfig;
|
||||||
|
import org.wso2.carbon.device.mgt.iot.common.controlqueue.mqtt.MqttSubscriber;
|
||||||
|
import org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.plugin.constants
|
||||||
|
.VirtualFireAlarmConstants;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
public class VirtualFireAlarmMqttSubscriber extends MqttSubscriber {
|
||||||
|
private static Log log = LogFactory.getLog(VirtualFireAlarmMqttSubscriber.class);
|
||||||
|
|
||||||
|
private static final String subscribeTopic =
|
||||||
|
"wso2" + File.separator + "iot" + File.separator + "+" + File.separator +
|
||||||
|
VirtualFireAlarmConstants.DEVICE_TYPE + File.separator + "+" + File.separator +
|
||||||
|
"reply";
|
||||||
|
private static final String iotServerSubscriber = UUID.randomUUID().toString().substring(0, 5);
|
||||||
|
private static String mqttEndpoint;
|
||||||
|
|
||||||
|
private VirtualFireAlarmMqttSubscriber() {
|
||||||
|
super(iotServerSubscriber, VirtualFireAlarmConstants.DEVICE_TYPE,
|
||||||
|
MqttConfig.getInstance().getMqttQueueEndpoint(), subscribeTopic);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initConnector() {
|
||||||
|
mqttEndpoint = MqttConfig.getInstance().getMqttQueueEndpoint();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void connectAndSubscribe() {
|
||||||
|
try {
|
||||||
|
super.connectAndSubscribe();
|
||||||
|
} catch (DeviceManagementException e) {
|
||||||
|
log.error("Subscription to MQTT Broker at: " + mqttEndpoint + " failed");
|
||||||
|
retryMQTTSubscription();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void postMessageArrived(String topic, MqttMessage message) {
|
||||||
|
log.info("Message " + message.toString() + " was received for topic: " + topic);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void retryMQTTSubscription() {
|
||||||
|
Thread retryToSubscribe = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
while (true) {
|
||||||
|
if (!isConnected()) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Subscriber re-trying to reach MQTT queue....");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
VirtualFireAlarmMqttSubscriber.super.connectAndSubscribe();
|
||||||
|
} catch (DeviceManagementException e1) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Attempt to re-connect to MQTT-Queue failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch (InterruptedException e1) {
|
||||||
|
log.error("MQTT: Thread S;eep Interrupt Exception");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
retryToSubscribe.setDaemon(true);
|
||||||
|
retryToSubscribe.start();
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,88 @@
|
|||||||
|
package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.jivesoftware.smack.packet.Message;
|
||||||
|
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
||||||
|
import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConfig;
|
||||||
|
import org.wso2.carbon.device.mgt.iot.common.controlqueue.xmpp.XmppConnector;
|
||||||
|
|
||||||
|
public class VirtualFireAlarmXmppConnector extends XmppConnector {
|
||||||
|
private static Log log = LogFactory.getLog(VirtualFireAlarmXmppConnector.class);
|
||||||
|
|
||||||
|
private static String xmppServerIP;
|
||||||
|
// private static int xmppServerPort;
|
||||||
|
private static String xmppAdminUsername;
|
||||||
|
private static String xmppAdminPassword;
|
||||||
|
private static String xmppAdminAccountJID;
|
||||||
|
|
||||||
|
private VirtualFireAlarmXmppConnector() {
|
||||||
|
super(XmppConfig.getInstance().getXmppServerIP(),
|
||||||
|
XmppConfig.getInstance().getSERVER_CONNECTION_PORT());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void initConnector() {
|
||||||
|
xmppServerIP = XmppConfig.getInstance().getXmppServerIP();
|
||||||
|
xmppAdminUsername = XmppConfig.getInstance().getXmppUsername();
|
||||||
|
xmppAdminPassword = XmppConfig.getInstance().getXmppPassword();
|
||||||
|
xmppAdminAccountJID = xmppAdminUsername + "@" + xmppServerIP;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void connectAndLogin() {
|
||||||
|
try {
|
||||||
|
super.connectAndLogin(xmppAdminUsername, xmppAdminPassword, null);
|
||||||
|
super.setMessageFilterOnReceiver(xmppAdminAccountJID);
|
||||||
|
} catch (DeviceManagementException e) {
|
||||||
|
log.error("Connect/Login attempt to XMPP Server at: " + xmppServerIP + " failed");
|
||||||
|
retryXMPPConnection();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void processXMPPMessage(Message xmppMessage) {
|
||||||
|
String from = xmppMessage.getFrom();
|
||||||
|
String message = xmppMessage.getBody();
|
||||||
|
log.info("Received XMPP message '" + message + "' from " + from);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void retryXMPPConnection() {
|
||||||
|
Thread retryToConnect = new Thread() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
if (!isConnected()) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Re-trying to reach XMPP Server....");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
VirtualFireAlarmXmppConnector.super.connectAndLogin(xmppAdminUsername,
|
||||||
|
xmppAdminPassword,
|
||||||
|
null);
|
||||||
|
VirtualFireAlarmXmppConnector.super.setMessageFilterOnReceiver(
|
||||||
|
xmppAdminAccountJID);
|
||||||
|
} catch (DeviceManagementException e1) {
|
||||||
|
if (log.isDebugEnabled()) {
|
||||||
|
log.debug("Attempt to re-connect to XMPP-Server failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
Thread.sleep(5000);
|
||||||
|
} catch (InterruptedException e1) {
|
||||||
|
log.error("XMPP: Thread Sleep Interrupt Exception");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
retryToConnect.setDaemon(true);
|
||||||
|
retryToConnect.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
@ -1,280 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
|
||||||
*
|
|
||||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
|
||||||
* Version 2.0 (the "License"); you may not use this file except
|
|
||||||
* in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.mqtt;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttCallback;
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttException;
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttMessage;
|
|
||||||
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
|
|
||||||
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class contains the Agent specific implementation for all the MQTT functionality. This
|
|
||||||
* includes connecting to a MQTT Broker & subscribing to the appropriate MQTT-topic, action plan
|
|
||||||
* upon losing connection or successfully delivering a message to the broker and processing
|
|
||||||
* incoming messages. Makes use of the 'Paho-MQTT' library provided by Eclipse Org.
|
|
||||||
* <p/>
|
|
||||||
* It is an abstract class with an abstract method 'postMessageArrived' allowing the user to have
|
|
||||||
* their own implementation of the actions to be taken upon receiving a message to the subscribed
|
|
||||||
* MQTT-Topic.
|
|
||||||
*/
|
|
||||||
public abstract class MQTTClient implements MqttCallback {
|
|
||||||
private static final Log log = LogFactory.getLog(MQTTClient.class);
|
|
||||||
|
|
||||||
private MqttClient client;
|
|
||||||
private String clientId;
|
|
||||||
private MqttConnectOptions options;
|
|
||||||
private String subscribeTopic;
|
|
||||||
private String clientWillTopic;
|
|
||||||
private String mqttBrokerEndPoint;
|
|
||||||
private int reConnectionInterval;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT
|
|
||||||
* Broker URL and the topic to subscribe.
|
|
||||||
*
|
|
||||||
* @param deviceOwner the owner of the device.
|
|
||||||
* @param deviceType the CDMF Device-Type of the device.
|
|
||||||
* @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint.
|
|
||||||
* @param subscribeTopic the MQTT topic to which the client is to be subscribed
|
|
||||||
*/
|
|
||||||
protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint,
|
|
||||||
String subscribeTopic) {
|
|
||||||
this.clientId = deviceOwner + ":" + deviceType;
|
|
||||||
this.subscribeTopic = subscribeTopic;
|
|
||||||
this.clientWillTopic = deviceType + File.separator + "disconnection";
|
|
||||||
this.mqttBrokerEndPoint = mqttBrokerEndPoint;
|
|
||||||
this.reConnectionInterval = 5000;
|
|
||||||
this.initSubscriber();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor for the MQTTClient which takes in the owner, type of the device and the MQTT
|
|
||||||
* Broker URL and the topic to subscribe. Additionally this constructor takes in the
|
|
||||||
* reconnection-time interval between successive attempts to connect to the broker.
|
|
||||||
*
|
|
||||||
* @param deviceOwner the owner of the device.
|
|
||||||
* @param deviceType the CDMF Device-Type of the device.
|
|
||||||
* @param mqttBrokerEndPoint the IP/URL of the MQTT broker endpoint.
|
|
||||||
* @param subscribeTopic the MQTT topic to which the client is to be subscribed
|
|
||||||
* @param reConnectionInterval time interval in SECONDS between successive attempts to connect
|
|
||||||
* to the broker.
|
|
||||||
*/
|
|
||||||
protected MQTTClient(String deviceOwner, String deviceType, String mqttBrokerEndPoint,
|
|
||||||
String subscribeTopic, int reConnectionInterval) {
|
|
||||||
this.clientId = deviceOwner + ":" + deviceType;
|
|
||||||
this.subscribeTopic = subscribeTopic;
|
|
||||||
this.clientWillTopic = deviceType + File.separator + "disconnection";
|
|
||||||
this.mqttBrokerEndPoint = mqttBrokerEndPoint;
|
|
||||||
this.reConnectionInterval = reConnectionInterval;
|
|
||||||
this.initSubscriber();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initializes the MQTT-Client.
|
|
||||||
* Creates a client using the given MQTT-broker endpoint and the clientId (which is
|
|
||||||
* constructed by a concatenation of [deviceOwner]:[deviceType]). Also sets the client's
|
|
||||||
* options parameter with the clientWillTopic (in-case of connection failure) and other info.
|
|
||||||
* Also sets the call-back this current class.
|
|
||||||
*/
|
|
||||||
private void initSubscriber() {
|
|
||||||
try {
|
|
||||||
client = new MqttClient(this.mqttBrokerEndPoint, clientId, null);
|
|
||||||
log.info("MQTT subscriber was created with ClientID : " + clientId);
|
|
||||||
} catch (MqttException ex) {
|
|
||||||
String errorMsg = "MQTT Client Error\n" + "\tReason: " + ex.getReasonCode() +
|
|
||||||
"\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " +
|
|
||||||
ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() +
|
|
||||||
"\n\tException: " + ex;
|
|
||||||
log.error(errorMsg);
|
|
||||||
}
|
|
||||||
|
|
||||||
options = new MqttConnectOptions();
|
|
||||||
options.setCleanSession(false);
|
|
||||||
options.setWill(clientWillTopic, "connection crashed".getBytes(StandardCharsets.UTF_8), 2,
|
|
||||||
true);
|
|
||||||
client.setCallback(this);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks whether the connection to the MQTT-Broker persists.
|
|
||||||
*
|
|
||||||
* @return true if the client is connected to the MQTT-Broker, else false.
|
|
||||||
*/
|
|
||||||
public boolean isConnected() {
|
|
||||||
return client.isConnected();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Connects to the MQTT-Broker and if successfully established connection, then tries to
|
|
||||||
* subscribe to the MQTT-Topic specific to the device. (The MQTT-Topic specific to the
|
|
||||||
* device is created is taken in as a constructor parameter of this class) .
|
|
||||||
*
|
|
||||||
* @throws DeviceManagementException in the event of 'Connecting to' or 'Subscribing to' the
|
|
||||||
* MQTT broker fails.
|
|
||||||
*/
|
|
||||||
public void connectAndSubscribe() throws DeviceManagementException {
|
|
||||||
try {
|
|
||||||
client.connect(options);
|
|
||||||
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug("Subscriber connected to queue at: " + this.mqttBrokerEndPoint);
|
|
||||||
}
|
|
||||||
} catch (MqttSecurityException ex) {
|
|
||||||
String errorMsg = "MQTT Security Exception when connecting to queue\n" + "\tReason: " +
|
|
||||||
" " +
|
|
||||||
ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() +
|
|
||||||
"\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " +
|
|
||||||
ex.getCause() + "\n\tException: " + ex; //throw
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug(errorMsg);
|
|
||||||
}
|
|
||||||
throw new DeviceManagementException(errorMsg, ex);
|
|
||||||
|
|
||||||
} catch (MqttException ex) {
|
|
||||||
String errorMsg = "MQTT Exception when connecting to queue\n" + "\tReason: " +
|
|
||||||
ex.getReasonCode() + "\n\tMessage: " + ex.getMessage() +
|
|
||||||
"\n\tLocalMsg: " + ex.getLocalizedMessage() + "\n\tCause: " +
|
|
||||||
ex.getCause() + "\n\tException: " + ex; //throw
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug(errorMsg);
|
|
||||||
}
|
|
||||||
throw new DeviceManagementException(errorMsg, ex);
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
client.subscribe(subscribeTopic, 0);
|
|
||||||
|
|
||||||
log.info("Subscriber - " + clientId + " subscribed to topic: " + subscribeTopic);
|
|
||||||
} catch (MqttException ex) {
|
|
||||||
String errorMsg = "MQTT Exception when trying to subscribe to topic: " +
|
|
||||||
subscribeTopic + "\n\tReason: " + ex.getReasonCode() +
|
|
||||||
"\n\tMessage: " + ex.getMessage() + "\n\tLocalMsg: " +
|
|
||||||
ex.getLocalizedMessage() + "\n\tCause: " + ex.getCause() +
|
|
||||||
"\n\tException: " + ex;
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug(errorMsg);
|
|
||||||
}
|
|
||||||
throw new DeviceManagementException(errorMsg, ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Callback method which is triggered once the MQTT client losers its connection to the broker.
|
|
||||||
* A scheduler thread is spawned to continuously re-attempt and connect to the broker and
|
|
||||||
* subscribe to the device's topic. This thread is scheduled to execute after every break
|
|
||||||
* equal to that of the 'reConnectionInterval' of the MQTTClient.
|
|
||||||
*
|
|
||||||
* @param throwable a Throwable Object containing the details as to why the failure occurred.
|
|
||||||
*/
|
|
||||||
public void connectionLost(Throwable throwable) {
|
|
||||||
log.warn("Lost Connection for client: " + this.clientId + " to " + this.mqttBrokerEndPoint + ".\nThis was due to - " + throwable.getMessage());
|
|
||||||
|
|
||||||
Runnable reSubscriber = new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
if (!isConnected()) {
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug("Subscriber reconnecting to queue........");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
connectAndSubscribe();
|
|
||||||
} catch (DeviceManagementException e) {
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug("Could not reconnect and subscribe to ControlQueue.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
|
|
||||||
service.scheduleAtFixedRate(reSubscriber, 0, this.reConnectionInterval, TimeUnit.SECONDS);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Callback method which is triggered upon receiving a MQTT Message from the broker. Spawns a
|
|
||||||
* new thread that executes any actions to be taken with the received message.
|
|
||||||
*
|
|
||||||
* @param topic the MQTT-Topic to which the received message was published to and the
|
|
||||||
* client was subscribed to.
|
|
||||||
* @param mqttMessage the actual MQTT-Message that was received from the broker.
|
|
||||||
*/
|
|
||||||
public void messageArrived(final String topic, final MqttMessage mqttMessage) {
|
|
||||||
Thread subscriberThread = new Thread() {
|
|
||||||
public void run() {
|
|
||||||
postMessageArrived(topic, mqttMessage);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
subscriberThread.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Callback method which gets triggered upon successful completion of a message delivery to
|
|
||||||
* the broker.
|
|
||||||
*
|
|
||||||
* @param iMqttDeliveryToken the MQTT-DeliveryToken which includes the details about the
|
|
||||||
* specific message delivery.
|
|
||||||
*/
|
|
||||||
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
||||||
String message = "";
|
|
||||||
try {
|
|
||||||
message = iMqttDeliveryToken.getMessage().toString();
|
|
||||||
} catch (MqttException e) {
|
|
||||||
log.error("Error occurred whilst trying to read the message from the MQTT delivery token.");
|
|
||||||
}
|
|
||||||
String topic = iMqttDeliveryToken.getTopics()[0];
|
|
||||||
String client = iMqttDeliveryToken.getClient().getClientId();
|
|
||||||
log.info("Message - '" + message + "' of client [" + client + "] for the topic (" + topic + ") was delivered successfully.");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is an abstract method used for post processing the received MQTT-message. This
|
|
||||||
* method will be implemented as per requirement at the time of creating an object of this
|
|
||||||
* class.
|
|
||||||
*
|
|
||||||
* @param topic The topic for which the message was received for.
|
|
||||||
* @param message The message received for the subscription to the above topic.
|
|
||||||
*/
|
|
||||||
protected abstract void postMessageArrived(String topic, MqttMessage message);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Gets the MQTTClient object.
|
|
||||||
*
|
|
||||||
* @return the MQTTClient object.
|
|
||||||
*/
|
|
||||||
public MqttClient getClient() {
|
|
||||||
return client;
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@ -1,241 +0,0 @@
|
|||||||
/*
|
|
||||||
* Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
|
||||||
*
|
|
||||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
|
||||||
* Version 2.0 (the "License"); you may not use this file except
|
|
||||||
* in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing,
|
|
||||||
* software distributed under the License is distributed on an
|
|
||||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
||||||
* KIND, either express or implied. See the License for the
|
|
||||||
* specific language governing permissions and limitations
|
|
||||||
* under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.xmpp;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.jivesoftware.smack.ConnectionConfiguration;
|
|
||||||
import org.jivesoftware.smack.PacketListener;
|
|
||||||
import org.jivesoftware.smack.SmackConfiguration;
|
|
||||||
import org.jivesoftware.smack.XMPPConnection;
|
|
||||||
import org.jivesoftware.smack.XMPPException;
|
|
||||||
import org.jivesoftware.smack.filter.AndFilter;
|
|
||||||
import org.jivesoftware.smack.filter.FromContainsFilter;
|
|
||||||
import org.jivesoftware.smack.filter.PacketFilter;
|
|
||||||
import org.jivesoftware.smack.filter.PacketTypeFilter;
|
|
||||||
import org.jivesoftware.smack.packet.Message;
|
|
||||||
import org.jivesoftware.smack.packet.Packet;
|
|
||||||
import org.wso2.carbon.device.mgt.common.DeviceManagementException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This class contains the Agent specific implementation for all the XMPP functionality. This
|
|
||||||
* includes connecting to a XMPP Server & Login using the device's XMPP-Account, Setting
|
|
||||||
* listeners and filters on incoming XMPP messages and Sending XMPP replies for control signals
|
|
||||||
* received. Makes use of the 'Smack-XMPP' library provided by jivesoftware/igniterealtime.
|
|
||||||
* <p/>
|
|
||||||
* It is an abstract class with an abstract method 'processXMPPMessage' allowing the user to have
|
|
||||||
* their own implementation of the actions to be taken upon receiving an appropriate XMPP message.
|
|
||||||
*/
|
|
||||||
public abstract class XMPPClient {
|
|
||||||
private static final Log log = LogFactory.getLog(XMPPClient.class);
|
|
||||||
|
|
||||||
private int replyTimeoutInterval = 500; // millis
|
|
||||||
private String server;
|
|
||||||
private int port;
|
|
||||||
private ConnectionConfiguration config;
|
|
||||||
private XMPPConnection connection;
|
|
||||||
private PacketFilter filter;
|
|
||||||
private PacketListener listener;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Constructor for XMPPClient passing server-IP and the XMPP-port.
|
|
||||||
*
|
|
||||||
* @param server the IP of the XMPP server.
|
|
||||||
* @param port the XMPP server's port to connect to. (default - 5222)
|
|
||||||
*/
|
|
||||||
public XMPPClient(String server, int port) {
|
|
||||||
this.server = server;
|
|
||||||
this.port = port;
|
|
||||||
initXMPPClient();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initializes the XMPP Client.
|
|
||||||
* Sets the time-out-limit whilst waiting for XMPP-replies from server. Creates the XMPP
|
|
||||||
* configurations to connect to the server and creates the XMPPConnection object used for
|
|
||||||
* connecting and Logging-In.
|
|
||||||
*/
|
|
||||||
private void initXMPPClient() {
|
|
||||||
log.info(String.format(
|
|
||||||
"Initializing connection to XMPP Server at %1$s via port %2$d......", server,
|
|
||||||
port));
|
|
||||||
SmackConfiguration.setPacketReplyTimeout(replyTimeoutInterval);
|
|
||||||
config = new ConnectionConfiguration(server, port);
|
|
||||||
// TODO:: Need to enable SASL-Authentication appropriately
|
|
||||||
config.setSASLAuthenticationEnabled(false);
|
|
||||||
config.setSecurityMode(ConnectionConfiguration.SecurityMode.disabled);
|
|
||||||
connection = new XMPPConnection(config);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Connects to the XMPP-Server and if successfully established connection, then tries to Log
|
|
||||||
* in using the device's XMPP Account credentials. (The XMPP-Account specific to the device is
|
|
||||||
* created in the XMPP server whilst downloading the Agent from the IoT Server) .
|
|
||||||
*
|
|
||||||
* @param username the username of the device's XMPP-Account.
|
|
||||||
* @param password the password of the device's XMPP-Account.
|
|
||||||
* @param resource the resource the resource, specific to the XMPP-Account to which the login
|
|
||||||
* is made to
|
|
||||||
* @throws DeviceManagementException in the event of 'Connecting to' or 'Logging into' the
|
|
||||||
* XMPP server fails.
|
|
||||||
*/
|
|
||||||
public void connectAndLogin(String username, String password, String resource)
|
|
||||||
throws DeviceManagementException {
|
|
||||||
try {
|
|
||||||
connection.connect();
|
|
||||||
log.info(String.format(
|
|
||||||
"Connection to XMPP Server at %1$s established successfully......", server));
|
|
||||||
|
|
||||||
} catch (XMPPException xmppExcepion) {
|
|
||||||
String errorMsg =
|
|
||||||
"Connection attempt to the XMPP Server at " + server + " via port " + port +
|
|
||||||
" failed.";
|
|
||||||
log.info(errorMsg);
|
|
||||||
throw new DeviceManagementException(errorMsg, xmppExcepion);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (connection.isConnected()) {
|
|
||||||
try {
|
|
||||||
if (resource == null) {
|
|
||||||
connection.login(username, password);
|
|
||||||
log.info(String.format("Logged into XMPP Server at %1$s as user %2$s......", server, username));
|
|
||||||
} else {
|
|
||||||
connection.login(username, password, resource);
|
|
||||||
log.info(String.format(
|
|
||||||
"Logged into XMPP Server at %1$s as user %2$s on resource %3$s......",
|
|
||||||
server, username, resource));
|
|
||||||
}
|
|
||||||
} catch (XMPPException xmppException) {
|
|
||||||
String errorMsg =
|
|
||||||
"Login attempt to the XMPP Server at " + server + " with username - " +
|
|
||||||
username + " failed.";
|
|
||||||
log.info(errorMsg);
|
|
||||||
throw new DeviceManagementException(errorMsg, xmppException);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets a filter on all the incoming XMPP-Messages for the JID (XMPP-Account ID) passed in.
|
|
||||||
* Also creates a listener for the incoming messages and connects the listener to the
|
|
||||||
* XMPPConnection alongside the set filter.
|
|
||||||
*
|
|
||||||
* @param senderJID the JID (XMPP-Account ID) to which the filter is to be set.
|
|
||||||
*/
|
|
||||||
public void setMessageFilterAndListener(String senderJID) {
|
|
||||||
filter = new AndFilter(new PacketTypeFilter(Message.class), new
|
|
||||||
FromContainsFilter(
|
|
||||||
senderJID));
|
|
||||||
listener = new PacketListener() {
|
|
||||||
@Override
|
|
||||||
public void processPacket(Packet packet) {
|
|
||||||
if (packet instanceof Message) {
|
|
||||||
final Message xmppMessage = (Message) packet;
|
|
||||||
Thread msgProcessThread = new Thread() {
|
|
||||||
public void run() {
|
|
||||||
processXMPPMessage(xmppMessage);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
msgProcessThread.start();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
connection.addPacketListener(listener, filter);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sends an XMPP message
|
|
||||||
*
|
|
||||||
* @param JID the JID (XMPP Account ID) to which the message is to be sent to.
|
|
||||||
* @param message the XMPP-Message that is to be sent.
|
|
||||||
*/
|
|
||||||
public void sendXMPPMessage(String JID, String message) {
|
|
||||||
sendXMPPMessage(JID, message, "Reply-From-Device");
|
|
||||||
if (log.isDebugEnabled()) {
|
|
||||||
log.debug("Message: " + message + " to XMPP JID [" + JID + "] sent successfully");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Overloaded method to send an XMPP message. Includes the subject to be mentioned in the
|
|
||||||
* message that is sent.
|
|
||||||
*
|
|
||||||
* @param JID the JID (XMPP Account ID) to which the message is to be sent to.
|
|
||||||
* @param message the XMPP-Message that is to be sent.
|
|
||||||
* @param subject the subject that the XMPP-Message would carry.
|
|
||||||
*/
|
|
||||||
public void sendXMPPMessage(String JID, String message, String subject) {
|
|
||||||
Message xmppMessage = new Message();
|
|
||||||
xmppMessage.setTo(JID);
|
|
||||||
xmppMessage.setSubject(subject);
|
|
||||||
xmppMessage.setBody(message);
|
|
||||||
xmppMessage.setType(Message.Type.chat);
|
|
||||||
connection.sendPacket(xmppMessage);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks whether the connection to the XMPP-Server persists.
|
|
||||||
*
|
|
||||||
* @return true if the client is connected to the XMPP-Server, else false.
|
|
||||||
*/
|
|
||||||
public boolean isConnected() {
|
|
||||||
return connection.isConnected();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Sets the client's time-out-limit whilst waiting for XMPP-replies from server.
|
|
||||||
*
|
|
||||||
* @param millis the time in millis to be set as the time-out-limit whilst waiting for a
|
|
||||||
* XMPP-reply.
|
|
||||||
*/
|
|
||||||
public void setReplyTimeoutInterval(int millis) {
|
|
||||||
this.replyTimeoutInterval = millis;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Disables default debugger provided by the XMPPConnection.
|
|
||||||
*/
|
|
||||||
public void disableDebugger() {
|
|
||||||
connection.DEBUG_ENABLED = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Closes the connection to the XMPP Server.
|
|
||||||
*/
|
|
||||||
public void closeConnection() {
|
|
||||||
if (connection != null && connection.isConnected()) {
|
|
||||||
connection.disconnect();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is an abstract method used for post processing the received XMPP-message. This
|
|
||||||
* method will be implemented as per requirement at the time of creating an object of this
|
|
||||||
* class.
|
|
||||||
*
|
|
||||||
* @param xmppMessage the xmpp message received by the listener.
|
|
||||||
*/
|
|
||||||
protected abstract void processXMPPMessage(Message xmppMessage);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@ -23,11 +23,12 @@
|
|||||||
http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd">
|
http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd">
|
||||||
|
|
||||||
|
|
||||||
<jaxrs:server id="FireAlarmController" address="/controller">
|
<jaxrs:server id="VirtualFireAlarmController" address="/controller">
|
||||||
<jaxrs:serviceBeans>
|
<jaxrs:serviceBeans>
|
||||||
<bean id="VirtualFireAlarmControllerService"
|
<bean id="VirtualFireAlarmControllerService"
|
||||||
class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.VirtualFireAlarmControllerService">
|
class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.VirtualFireAlarmControllerService">
|
||||||
<!--<property name="mqttFireAlarmSubscriber" ref="mqttSubscriber"/>-->
|
<property name="virtualFireAlarmMqttSubscriber" ref="mqttSubscriberBean"/>
|
||||||
|
<property name="virtualFireAlarmXmppConnector" ref="xmppConnectorBean"/>
|
||||||
</bean>
|
</bean>
|
||||||
</jaxrs:serviceBeans>
|
</jaxrs:serviceBeans>
|
||||||
<jaxrs:providers>
|
<jaxrs:providers>
|
||||||
@ -46,6 +47,12 @@
|
|||||||
</jaxrs:server>
|
</jaxrs:server>
|
||||||
|
|
||||||
|
|
||||||
|
<bean id="mqttSubscriberBean" class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.VirtualFireAlarmMqttSubscriber" >
|
||||||
|
|
||||||
|
</bean>
|
||||||
|
<bean id="xmppConnectorBean" class="org.wso2.carbon.device.mgt.iot.sample.virtual.firealarm.service.impl.util.VirtualFireAlarmXmppConnector" >
|
||||||
|
|
||||||
|
</bean>
|
||||||
<!--<bean id="mqttSubscriber" class="org.wso2.carbon.device.mgt.iot.firealarm.api.util.MQTTFirealarmSubscriber" >-->
|
<!--<bean id="mqttSubscriber" class="org.wso2.carbon.device.mgt.iot.firealarm.api.util.MQTTFirealarmSubscriber" >-->
|
||||||
<!-- -->
|
<!-- -->
|
||||||
<!--</bean>-->
|
<!--</bean>-->
|
||||||
|
|||||||
17
pom.xml
17
pom.xml
@ -804,6 +804,19 @@
|
|||||||
<artifactId>commons-httpclient</artifactId>
|
<artifactId>commons-httpclient</artifactId>
|
||||||
<version>${orbit.version.commons-httpclient}</version>
|
<version>${orbit.version.commons-httpclient}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<!--Dependencies on XMPP Client Library-->
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.igniterealtime.smack.wso2</groupId>
|
||||||
|
<artifactId>smack</artifactId>
|
||||||
|
<version>${smack.wso2.version}</version>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.igniterealtime.smack.wso2</groupId>
|
||||||
|
<artifactId>smackx</artifactId>
|
||||||
|
<version>${smackx.wso2.version}</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</dependencyManagement>
|
</dependencyManagement>
|
||||||
|
|
||||||
@ -927,6 +940,10 @@
|
|||||||
<commons-io.version>2.4</commons-io.version>
|
<commons-io.version>2.4</commons-io.version>
|
||||||
<jsr311-api.version>1.1.1</jsr311-api.version>
|
<jsr311-api.version>1.1.1</jsr311-api.version>
|
||||||
<commons-json.version>2.0.0.wso2v1</commons-json.version>
|
<commons-json.version>2.0.0.wso2v1</commons-json.version>
|
||||||
|
|
||||||
|
<!--XMPP/MQTT Version-->
|
||||||
|
<smack.wso2.version>3.0.4.wso2v1</smack.wso2.version>
|
||||||
|
<smackx.wso2.version>3.0.4.wso2v1</smackx.wso2.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<scm>
|
<scm>
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user