mirror of
https://repository.entgra.net/community/device-mgt-core.git
synced 2025-10-06 02:01:45 +00:00
notify cluster formation changed implementation
This commit is contained in:
parent
f19f48d050
commit
615da6303f
@ -66,4 +66,9 @@ public class TestHeartBeatManagementService implements HeartBeatManagementServic
|
||||
@Override public Map<Integer, ServerContext> getActiveServers() throws HeartBeatManagementException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconC
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconUtils;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.config.datasource.DataSourceConfig;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.InvalidConfigurationStateException;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifier;
|
||||
import org.w3c.dom.Document;
|
||||
import org.wso2.carbon.utils.CarbonUtils;
|
||||
|
||||
@ -29,8 +30,10 @@ import javax.xml.bind.JAXBContext;
|
||||
import javax.xml.bind.JAXBException;
|
||||
import javax.xml.bind.Unmarshaller;
|
||||
import javax.xml.bind.annotation.XmlElement;
|
||||
import javax.xml.bind.annotation.XmlElementWrapper;
|
||||
import javax.xml.bind.annotation.XmlRootElement;
|
||||
import java.io.File;
|
||||
import java.util.List;
|
||||
|
||||
@XmlRootElement(name = "HeartBeatBeaconConfig")
|
||||
public class HeartBeatBeaconConfig {
|
||||
@ -50,6 +53,7 @@ public class HeartBeatBeaconConfig {
|
||||
private static final String SERVER_UUID_FILE_LOCATION =
|
||||
CarbonUtils.getCarbonConfigDirPath() + File.separator + "server-credentials.properties";
|
||||
|
||||
private List<String> notifiers;
|
||||
private HeartBeatBeaconConfig() {
|
||||
}
|
||||
|
||||
@ -135,4 +139,13 @@ public class HeartBeatBeaconConfig {
|
||||
}
|
||||
}
|
||||
|
||||
@XmlElementWrapper(name = "ClusterFormationChangedNotifiers", required = true)
|
||||
@XmlElement(name = "Notifier", required = true)
|
||||
public List<String> getNotifiers() {
|
||||
return notifiers;
|
||||
}
|
||||
|
||||
public void setNotifiers(List<String> notifiers) {
|
||||
this.notifiers = notifiers;
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.HeartBeatBeaconU
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.config.HeartBeatBeaconConfig;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.config.datasource.DataSourceConfig;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.dao.HeartBeatBeaconDAOFactory;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifierRepository;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementServiceImpl;
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -29,6 +30,8 @@ import org.apache.commons.logging.LogFactory;
|
||||
import org.osgi.service.component.ComponentContext;
|
||||
import org.wso2.carbon.ndatasource.core.DataSourceService;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* @scr.component name="io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.heartbeatBeaconComponent"
|
||||
* immediate="true"
|
||||
@ -62,6 +65,17 @@ public class HeartBeatBeaconComponent {
|
||||
HeartBeatExecutor.setUpNotifiers(HeartBeatBeaconUtils.getServerDetails());
|
||||
}
|
||||
|
||||
ClusterFormationChangedNotifierRepository clusterFormationChangedNotifierRepository
|
||||
= new ClusterFormationChangedNotifierRepository();
|
||||
List<String> notifiers = HeartBeatBeaconConfig.getInstance().getNotifiers();
|
||||
if (notifiers != null && notifiers.size() > 0) {
|
||||
for (String notifier : notifiers) {
|
||||
clusterFormationChangedNotifierRepository.addNotifier(notifier);
|
||||
}
|
||||
}
|
||||
HeartBeatBeaconDataHolder.getInstance().setClusterFormationChangedNotifierRepository(
|
||||
clusterFormationChangedNotifierRepository);
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Heart Beat Notifier bundle has been successfully initialized");
|
||||
}
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
|
||||
package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.internal;
|
||||
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.ClusterFormationChangedNotifierRepository;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service.HeartBeatManagementService;
|
||||
|
||||
public class HeartBeatBeaconDataHolder {
|
||||
@ -27,6 +28,7 @@ public class HeartBeatBeaconDataHolder {
|
||||
|
||||
private static HeartBeatBeaconDataHolder thisInstance = new HeartBeatBeaconDataHolder();
|
||||
|
||||
private ClusterFormationChangedNotifierRepository clusterFormationChangedNotifierRepository;
|
||||
private HeartBeatBeaconDataHolder() {}
|
||||
|
||||
public static HeartBeatBeaconDataHolder getInstance() {
|
||||
@ -48,4 +50,12 @@ public class HeartBeatBeaconDataHolder {
|
||||
public void setLocalServerUUID(String localServerUUID) {
|
||||
this.localServerUUID = localServerUUID;
|
||||
}
|
||||
|
||||
public ClusterFormationChangedNotifierRepository getClusterFormationChangedNotifierRepository() {
|
||||
return clusterFormationChangedNotifierRepository;
|
||||
}
|
||||
|
||||
public void setClusterFormationChangedNotifierRepository(ClusterFormationChangedNotifierRepository clusterFormationChangedNotifierRepository) {
|
||||
this.clusterFormationChangedNotifierRepository = clusterFormationChangedNotifierRepository;
|
||||
}
|
||||
}
|
||||
|
||||
@ -69,6 +69,7 @@ public class HeartBeatExecutor {
|
||||
try {
|
||||
recordHeartBeat(designatedUUID);
|
||||
electDynamicTaskExecutionCandidate(cumilativeTimeOut);
|
||||
notifyClusterFormationChanged(cumilativeTimeOut);
|
||||
} catch (Exception e) {
|
||||
log.error("Error while executing record heart beat task. This will result in schedule operation malfunction.", e);
|
||||
}
|
||||
@ -98,5 +99,8 @@ public class HeartBeatExecutor {
|
||||
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().electCandidate(cumilativeTimeOut);
|
||||
}
|
||||
|
||||
static void notifyClusterFormationChanged(int cumilativeTimeOut) throws HeartBeatManagementException {
|
||||
HeartBeatBeaconDataHolder.getInstance().getHeartBeatManagementService().notifyClusterFormationChanged(cumilativeTimeOut);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -0,0 +1,25 @@
|
||||
/*
|
||||
* Copyright (c) 2018 - 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
|
||||
*
|
||||
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service;
|
||||
|
||||
public interface ClusterFormationChangedNotifier {
|
||||
|
||||
String getType();
|
||||
|
||||
void notifyClusterFormationChanged(int hashIndex, int activeServerCount);
|
||||
}
|
||||
@ -0,0 +1,64 @@
|
||||
/*
|
||||
* Copyright (c) 2018 - 2023, Entgra (Pvt) Ltd. (http://www.entgra.io) All Rights Reserved.
|
||||
*
|
||||
* Entgra (Pvt) Ltd. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.service;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
public class ClusterFormationChangedNotifierRepository {
|
||||
|
||||
private Map<String, ClusterFormationChangedNotifier> notifiers;
|
||||
private static final Log log = LogFactory.getLog(ClusterFormationChangedNotifierRepository.class);
|
||||
|
||||
public ClusterFormationChangedNotifierRepository() {
|
||||
this.notifiers = new ConcurrentHashMap<>();
|
||||
}
|
||||
|
||||
public void addNotifier(ClusterFormationChangedNotifier notifier) {
|
||||
notifiers.put(notifier.getType(), notifier);
|
||||
}
|
||||
|
||||
public void addNotifier(String className) {
|
||||
try {
|
||||
if (!StringUtils.isEmpty(className)) {
|
||||
Class<?> clz = Class.forName(className);
|
||||
ClusterFormationChangedNotifier notifier = (ClusterFormationChangedNotifier) clz.newInstance();
|
||||
notifiers.put(notifier.getType(), notifier);
|
||||
}
|
||||
} catch (ClassNotFoundException e) {
|
||||
log.error("Provided ClusterFormationChangedNotifier implementation '" + className + "' cannot be found", e);
|
||||
} catch (InstantiationException e) {
|
||||
log.error("Error occurred while instantiating ClusterFormationChangedNotifier implementation '" +
|
||||
className + "'", e);
|
||||
} catch (IllegalAccessException e) {
|
||||
log.error("Error occurred while adding ClusterFormationChangedNotifier implementation '" + className + "'", e);
|
||||
}
|
||||
}
|
||||
|
||||
public ClusterFormationChangedNotifier getNotifier(String type) {
|
||||
return notifiers.get(type);
|
||||
}
|
||||
|
||||
public Map<String, ClusterFormationChangedNotifier> getNotifiers() {
|
||||
return notifiers;
|
||||
}
|
||||
}
|
||||
@ -36,6 +36,7 @@ public interface HeartBeatManagementService {
|
||||
boolean recordHeartBeat(HeartBeatEvent event) throws HeartBeatManagementException;
|
||||
|
||||
void electCandidate(int elapsedTimeInSeconds) throws HeartBeatManagementException;
|
||||
void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException;
|
||||
|
||||
boolean updateTaskExecutionAcknowledgement(String newTask) throws HeartBeatManagementException;
|
||||
|
||||
|
||||
@ -48,6 +48,9 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
||||
|
||||
private final HeartBeatDAO heartBeatDAO;
|
||||
|
||||
private static int lastActiveCount = -1;
|
||||
private static int lastHashIndex = -1;
|
||||
|
||||
public HeartBeatManagementServiceImpl() {
|
||||
this.heartBeatDAO = HeartBeatBeaconDAOFactory.getHeartBeatDAO();
|
||||
}
|
||||
@ -254,6 +257,46 @@ public class HeartBeatManagementServiceImpl implements HeartBeatManagementServic
|
||||
throw new HeartBeatManagementException(msg);
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException {
|
||||
if (HeartBeatBeaconConfig.getInstance().isEnabled()) {
|
||||
try {
|
||||
Map<String, ServerContext> servers = heartBeatDAO.getActiveServerDetails(elapsedTimeInSeconds);
|
||||
if (servers != null && !servers.isEmpty()) {
|
||||
String serverUUID = HeartBeatBeaconDataHolder.getInstance().getLocalServerUUID();
|
||||
ServerContext serverContext = servers.get(serverUUID);
|
||||
|
||||
// cluster change can be identified, either by changing hash index or changing active server count
|
||||
if ((lastHashIndex != serverContext.getIndex()) || (lastActiveCount != servers.size())) {
|
||||
lastHashIndex = serverContext.getIndex();
|
||||
lastActiveCount = servers.size();
|
||||
|
||||
ClusterFormationChangedNotifierRepository repository = HeartBeatBeaconDataHolder.getInstance().getClusterFormationChangedNotifierRepository();
|
||||
Map<String, ClusterFormationChangedNotifier> notifiers = repository.getNotifiers();
|
||||
for (String type : notifiers.keySet()) {
|
||||
ClusterFormationChangedNotifier notifier = notifiers.get(type);
|
||||
Runnable r = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
notifier.notifyClusterFormationChanged(lastHashIndex, lastActiveCount);
|
||||
}
|
||||
};
|
||||
new Thread(r).start();
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (HeartBeatDAOException e) {
|
||||
String msg = "Error occurred while notifyClusterFormationChanged.";
|
||||
log.error(msg, e);
|
||||
throw new HeartBeatManagementException(msg, e);
|
||||
}
|
||||
} else {
|
||||
String msg = "Heart Beat Configuration Disabled. Error while notifyClusterFormationChanged.";
|
||||
log.error(msg);
|
||||
throw new HeartBeatManagementException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private void electCandidate(Map<String, ServerContext> servers) throws HeartBeatDAOException {
|
||||
String electedCandidate = getRandomElement(servers.keySet());
|
||||
|
||||
@ -66,4 +66,9 @@ public class TestHeartBeatManagementService implements HeartBeatManagementServic
|
||||
@Override public Map<Integer, ServerContext> getActiveServers() throws HeartBeatManagementException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void notifyClusterFormationChanged(int elapsedTimeInSeconds) throws HeartBeatManagementException {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -49,4 +49,7 @@
|
||||
<NotifierFrequencyInSeconds>300</NotifierFrequencyInSeconds>
|
||||
<TimeSkewInSeconds>5</TimeSkewInSeconds>
|
||||
<ServerTimeOutIntervalInSeconds>600</ServerTimeOutIntervalInSeconds>
|
||||
<ClusterFormationChangedNotifiers>
|
||||
<Notifier></Notifier>
|
||||
</ClusterFormationChangedNotifiers>
|
||||
</HeartBeatBeaconConfig>
|
||||
|
||||
@ -51,11 +51,21 @@
|
||||
<NotifierFrequencyInSeconds>{{heart_beat_beacon_conf.notifier_frequency_in_seconds}}</NotifierFrequencyInSeconds>
|
||||
<TimeSkewInSeconds>{{heart_beat_beacon_conf.time_skew_in_seconds}}</TimeSkewInSeconds>
|
||||
<ServerTimeOutIntervalInSeconds>{{heart_beat_beacon_conf.sever_timeout_interval_in_seconds}}</ServerTimeOutIntervalInSeconds>
|
||||
{% if heart_beat_beacon_conf.cluster_formation_changed_configs.cluster_formation_changed_notifiers is defined %}
|
||||
<ClusterFormationChangedNotifiers>
|
||||
{%- for cluster_formation_changed_notifier in heart_beat_beacon_conf.cluster_formation_changed_configs.cluster_formation_changed_notifiers -%}
|
||||
<Notifier>{{cluster_formation_changed_notifier}}</Notifier>
|
||||
{% endfor %}
|
||||
</ClusterFormationChangedNotifiers>
|
||||
{% endif %}
|
||||
{% else %}
|
||||
<Enable>false</Enable>
|
||||
<NotifierInitialDelayInSeconds>30</NotifierInitialDelayInSeconds>
|
||||
<NotifierFrequencyInSeconds>300</NotifierFrequencyInSeconds>
|
||||
<TimeSkewInSeconds>5</TimeSkewInSeconds>
|
||||
<ServerTimeOutIntervalInSeconds>600</ServerTimeOutIntervalInSeconds>
|
||||
<ClusterFormationChangedNotifiers>
|
||||
<Notifier></Notifier>
|
||||
</ClusterFormationChangedNotifiers>
|
||||
{% endif %}
|
||||
</HeartBeatBeaconConfig>
|
||||
|
||||
Loading…
Reference in New Issue
Block a user