mirror of
https://repository.entgra.net/community/device-mgt-plugins.git
synced 2025-09-16 23:42:15 +00:00
mqtt qos property included for input and set default 0
Co-authored-by: Amalka Subasinghe <amalka@entgra.io> Co-committed-by: Amalka Subasinghe <amalka@entgra.io>
This commit is contained in:
parent
7876d87c51
commit
ff9abb7563
@ -124,6 +124,15 @@ public class MQTTEventAdapterFactory extends InputEventAdapterFactory {
|
|||||||
clientId.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID_HINT));
|
clientId.setHint(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_CONF_CLIENTID_HINT));
|
||||||
propertyList.add(clientId);
|
propertyList.add(clientId);
|
||||||
|
|
||||||
|
// set qos
|
||||||
|
Property qosProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS);
|
||||||
|
qosProperty.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS));
|
||||||
|
qosProperty.setRequired(false);
|
||||||
|
qosProperty.setOptions(new String[]{"0", "1", "2"});
|
||||||
|
qosProperty.setDefaultValue("0");
|
||||||
|
|
||||||
|
propertyList.add(qosProperty);
|
||||||
|
|
||||||
return propertyList;
|
return propertyList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -56,6 +56,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
|
|||||||
|
|
||||||
private MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration;
|
private MQTTBrokerConnectionConfiguration mqttBrokerConnectionConfiguration;
|
||||||
private String topic;
|
private String topic;
|
||||||
|
private int qos;
|
||||||
private String topicStructure;
|
private String topicStructure;
|
||||||
private String tenantDomain;
|
private String tenantDomain;
|
||||||
private volatile boolean connectionSucceeded = false;
|
private volatile boolean connectionSucceeded = false;
|
||||||
@ -79,6 +80,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
|
|||||||
int keepAlive = mqttBrokerConnectionConfiguration.getKeepAlive();
|
int keepAlive = mqttBrokerConnectionConfiguration.getKeepAlive();
|
||||||
this.topicStructure = new String(topic);
|
this.topicStructure = new String(topic);
|
||||||
this.topic = PropertyUtils.replacePlaceholders(topic);
|
this.topic = PropertyUtils.replacePlaceholders(topic);
|
||||||
|
this.qos = mqttBrokerConnectionConfiguration.getQos();
|
||||||
this.eventAdapterListener = inputEventAdapterListener;
|
this.eventAdapterListener = inputEventAdapterListener;
|
||||||
this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
this.tenantDomain = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantDomain();
|
||||||
|
|
||||||
@ -158,7 +160,7 @@ public class MQTTAdapterListener implements MqttCallback, Runnable {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
mqttClient.subscribe(topic);
|
mqttClient.subscribe(topic, qos);
|
||||||
log.info("mqtt receiver subscribed to topic: " + topic);
|
log.info("mqtt receiver subscribed to topic: " + topic);
|
||||||
} catch (MqttException e) {
|
} catch (MqttException e) {
|
||||||
log.error("Failed to subscribe to topic: " + topic + ", Retrying.....");
|
log.error("Failed to subscribe to topic: " + topic + ", Retrying.....");
|
||||||
|
|||||||
@ -32,6 +32,7 @@ public class MQTTBrokerConnectionConfiguration {
|
|||||||
private String brokerScopes = null;
|
private String brokerScopes = null;
|
||||||
private boolean cleanSession = true;
|
private boolean cleanSession = true;
|
||||||
private int keepAlive;
|
private int keepAlive;
|
||||||
|
private int qos;
|
||||||
private String brokerUrl;
|
private String brokerUrl;
|
||||||
private String dcrUrl;
|
private String dcrUrl;
|
||||||
private String contentValidatorType;
|
private String contentValidatorType;
|
||||||
@ -83,6 +84,14 @@ public class MQTTBrokerConnectionConfiguration {
|
|||||||
return adapterName;
|
return adapterName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getQos() {
|
||||||
|
return qos;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setQos(int qos) {
|
||||||
|
this.qos = qos;
|
||||||
|
}
|
||||||
|
|
||||||
public MQTTBrokerConnectionConfiguration(InputEventAdapterConfiguration eventAdapterConfiguration,
|
public MQTTBrokerConnectionConfiguration(InputEventAdapterConfiguration eventAdapterConfiguration,
|
||||||
Map<String, String> globalProperties) throws InputEventAdapterException {
|
Map<String, String> globalProperties) throws InputEventAdapterException {
|
||||||
|
|
||||||
@ -131,6 +140,15 @@ public class MQTTBrokerConnectionConfiguration {
|
|||||||
} else {
|
} else {
|
||||||
keepAlive = MQTTEventAdapterConstants.ADAPTER_CONF_DEFAULT_KEEP_ALIVE;
|
keepAlive = MQTTEventAdapterConstants.ADAPTER_CONF_DEFAULT_KEEP_ALIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
String qosVal = globalProperties.get(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS);
|
||||||
|
if (qosVal != null && !qosVal.isEmpty()) {
|
||||||
|
this.qos = Integer.parseInt(qosVal);
|
||||||
|
} else {
|
||||||
|
qosVal = eventAdapterConfiguration.getProperties().get(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS);
|
||||||
|
this.qos = Integer.parseInt(qosVal);
|
||||||
|
}
|
||||||
|
|
||||||
this.contentTransformerType = eventAdapterConfiguration.getProperties()
|
this.contentTransformerType = eventAdapterConfiguration.getProperties()
|
||||||
.get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_TRANSFORMER_TYPE);
|
.get(MQTTEventAdapterConstants.ADAPTER_CONF_CONTENT_TRANSFORMER_TYPE);
|
||||||
}
|
}
|
||||||
|
|||||||
@ -47,6 +47,7 @@ public class MQTTEventAdapterConstants {
|
|||||||
public static final String ADAPTER_CONF_CLEAN_SESSION_HINT = "cleanSession.hint";
|
public static final String ADAPTER_CONF_CLEAN_SESSION_HINT = "cleanSession.hint";
|
||||||
public static final String ADAPTER_CONF_KEEP_ALIVE = "keepAlive";
|
public static final String ADAPTER_CONF_KEEP_ALIVE = "keepAlive";
|
||||||
public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 20000;
|
public static final int ADAPTER_CONF_DEFAULT_KEEP_ALIVE = 20000;
|
||||||
|
public static final String ADAPTER_MESSAGE_QOS = "qos";
|
||||||
|
|
||||||
public static final int INITIAL_RECONNECTION_DURATION = 4000;
|
public static final int INITIAL_RECONNECTION_DURATION = 4000;
|
||||||
public static final int RECONNECTION_PROGRESS_FACTOR = 2;
|
public static final int RECONNECTION_PROGRESS_FACTOR = 2;
|
||||||
|
|||||||
@ -18,6 +18,7 @@
|
|||||||
|
|
||||||
topic=Topic
|
topic=Topic
|
||||||
topic.hint=Topic subscribed
|
topic.hint=Topic subscribed
|
||||||
|
qos=Quality of Service
|
||||||
clientId=Client Id
|
clientId=Client Id
|
||||||
clientId.hint=client identifier is used by the server to identify a client when it reconnects, It used for durable subscriptions or reliable delivery of messages is required.
|
clientId.hint=client identifier is used by the server to identify a client when it reconnects, It used for durable subscriptions or reliable delivery of messages is required.
|
||||||
url=Broker Url (Not required), If it is not provided then it will connect to the default broker.
|
url=Broker Url (Not required), If it is not provided then it will connect to the default broker.
|
||||||
|
|||||||
@ -92,7 +92,7 @@ public class MQTTEventAdapterFactory extends OutputEventAdapterFactory {
|
|||||||
qos.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS));
|
qos.setDisplayName(resourceBundle.getString(MQTTEventAdapterConstants.ADAPTER_MESSAGE_QOS));
|
||||||
qos.setRequired(false);
|
qos.setRequired(false);
|
||||||
qos.setOptions(new String[]{"0", "1", "2"});
|
qos.setOptions(new String[]{"0", "1", "2"});
|
||||||
qos.setDefaultValue("2");
|
qos.setDefaultValue("0");
|
||||||
|
|
||||||
// set topic
|
// set topic
|
||||||
Property topicProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC);
|
Property topicProperty = new Property(MQTTEventAdapterConstants.ADAPTER_MESSAGE_TOPIC);
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user