mirror of
https://repository.entgra.net/community/device-mgt-core.git
synced 2025-10-06 02:01:45 +00:00
Fix inconsistencies in dynamic and random task execution
This commit is contained in:
parent
c8cf4b86ca
commit
4045e39a76
@ -18,9 +18,6 @@
|
||||
package io.entgra.device.mgt.core.device.mgt.core.operation.timeout.task.impl;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import io.entgra.device.mgt.core.device.mgt.common.exceptions.DeviceManagementException;
|
||||
import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.Activity;
|
||||
import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.ActivityStatus;
|
||||
@ -29,75 +26,85 @@ import io.entgra.device.mgt.core.device.mgt.common.operation.mgt.OperationManage
|
||||
import io.entgra.device.mgt.core.device.mgt.core.config.operation.timeout.OperationTimeout;
|
||||
import io.entgra.device.mgt.core.device.mgt.core.dto.DeviceType;
|
||||
import io.entgra.device.mgt.core.device.mgt.core.internal.DeviceManagementDataHolder;
|
||||
import io.entgra.device.mgt.core.device.mgt.core.task.impl.DynamicPartitionedScheduleTask;
|
||||
import io.entgra.device.mgt.core.device.mgt.core.task.impl.RandomlyAssignedScheduleTask;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class OperationTimeoutTask extends DynamicPartitionedScheduleTask {
|
||||
public class OperationTimeoutTask extends RandomlyAssignedScheduleTask {
|
||||
|
||||
private static final Log log = LogFactory.getLog(OperationTimeoutTask.class);
|
||||
public static final String OPERATION_TIMEOUT_TASK = "OPERATION_TIMEOUT_TASK";
|
||||
private Map<String, String> properties;
|
||||
|
||||
@Override
|
||||
public final void setProperties(Map<String, String> properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public final String getProperty(String name) {
|
||||
if (properties == null) {
|
||||
return null;
|
||||
}
|
||||
return properties.get(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setup() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void executeDynamicTask() {
|
||||
if (isQualifiedToExecuteTask()) { // this task will run only in one node when the deployment has multiple nodes
|
||||
String operationTimeoutTaskConfigStr = getProperty(
|
||||
OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG);
|
||||
Gson gson = new Gson();
|
||||
OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class);
|
||||
try {
|
||||
long timeMillis = System.currentTimeMillis() - (long) operationTimeoutConfig.getTimeout();
|
||||
List<String> deviceTypes = new ArrayList<>();
|
||||
if (operationTimeoutConfig.getDeviceTypes().size() == 1 &&
|
||||
"ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) {
|
||||
try {
|
||||
List<DeviceType> deviceTypeList = DeviceManagementDataHolder.getInstance()
|
||||
.getDeviceManagementProvider().getDeviceTypes();
|
||||
for (DeviceType deviceType : deviceTypeList) {
|
||||
deviceTypes.add(deviceType.getName());
|
||||
}
|
||||
} catch (DeviceManagementException e) {
|
||||
log.error("Error occurred while reading device types", e);
|
||||
}
|
||||
} else {
|
||||
deviceTypes = operationTimeoutConfig.getDeviceTypes();
|
||||
}
|
||||
List<Activity> activities = DeviceManagementDataHolder.getInstance().getOperationManager()
|
||||
.getActivities(deviceTypes, operationTimeoutConfig.getCode(), timeMillis,
|
||||
operationTimeoutConfig.getInitialStatus());
|
||||
for (Activity activity : activities) {
|
||||
for (ActivityStatus activityStatus : activity.getActivityStatus()) {
|
||||
String operationId = activity.getActivityId().replace("ACTIVITY_", "");
|
||||
Operation operation = DeviceManagementDataHolder.getInstance().getOperationManager()
|
||||
.getOperation(Integer.parseInt(operationId));
|
||||
operation.setStatus(Operation.Status.valueOf(operationTimeoutConfig.getNextStatus()));
|
||||
DeviceManagementDataHolder.getInstance().getOperationManager()
|
||||
.updateOperation(activityStatus.getDeviceIdentifier(), operation);
|
||||
}
|
||||
}
|
||||
public String getTaskName() {
|
||||
return OPERATION_TIMEOUT_TASK;
|
||||
}
|
||||
|
||||
} catch (OperationManagementException e) {
|
||||
String msg = "Error occurred while retrieving operations.";
|
||||
log.error(msg, e);
|
||||
@Override
|
||||
protected void executeRandomlyAssignedTask() {
|
||||
// this task will run only in one node when the deployment has multiple nodes
|
||||
String operationTimeoutTaskConfigStr = getProperty(
|
||||
OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG);
|
||||
Gson gson = new Gson();
|
||||
OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class);
|
||||
try {
|
||||
long timeMillis = System.currentTimeMillis() - (long) operationTimeoutConfig.getTimeout();
|
||||
List<String> deviceTypes = new ArrayList<>();
|
||||
if (operationTimeoutConfig.getDeviceTypes().size() == 1 &&
|
||||
"ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) {
|
||||
try {
|
||||
List<DeviceType> deviceTypeList = DeviceManagementDataHolder.getInstance()
|
||||
.getDeviceManagementProvider().getDeviceTypes();
|
||||
for (DeviceType deviceType : deviceTypeList) {
|
||||
deviceTypes.add(deviceType.getName());
|
||||
}
|
||||
} catch (DeviceManagementException e) {
|
||||
log.error("Error occurred while reading device types", e);
|
||||
}
|
||||
} else {
|
||||
deviceTypes = operationTimeoutConfig.getDeviceTypes();
|
||||
}
|
||||
List<Activity> activities = DeviceManagementDataHolder.getInstance().getOperationManager()
|
||||
.getActivities(deviceTypes, operationTimeoutConfig.getCode(), timeMillis,
|
||||
operationTimeoutConfig.getInitialStatus());
|
||||
for (Activity activity : activities) {
|
||||
for (ActivityStatus activityStatus : activity.getActivityStatus()) {
|
||||
String operationId = activity.getActivityId().replace("ACTIVITY_", "");
|
||||
Operation operation = DeviceManagementDataHolder.getInstance().getOperationManager()
|
||||
.getOperation(Integer.parseInt(operationId));
|
||||
operation.setStatus(Operation.Status.valueOf(operationTimeoutConfig.getNextStatus()));
|
||||
DeviceManagementDataHolder.getInstance().getOperationManager()
|
||||
.updateOperation(activityStatus.getDeviceIdentifier(), operation);
|
||||
}
|
||||
}
|
||||
|
||||
} catch (OperationManagementException e) {
|
||||
String msg = "Error occurred while retrieving operations.";
|
||||
log.error(msg, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private boolean isQualifiedToExecuteTask() {
|
||||
if (isDynamicTaskEligible()) {
|
||||
try {
|
||||
return DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask();
|
||||
} catch (HeartBeatManagementException e) {
|
||||
log.error("Error while checking is qualified to execute task", e);
|
||||
}
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@ -23,16 +23,16 @@ import org.apache.commons.logging.LogFactory;
|
||||
import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalException;
|
||||
import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalService;
|
||||
import io.entgra.device.mgt.core.device.mgt.core.archival.ArchivalServiceImpl;
|
||||
import org.wso2.carbon.ntask.core.Task;
|
||||
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ArchivalTask implements Task {
|
||||
public class ArchivalTask extends RandomlyAssignedScheduleTask {
|
||||
|
||||
private static final Log log = LogFactory.getLog(ArchivalTask.class);
|
||||
private static final String TASK_NAME = "DATA_ARCHIVAL_TASK";
|
||||
|
||||
private ArchivalService archivalService;
|
||||
|
||||
@ -42,12 +42,12 @@ public class ArchivalTask implements Task {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
protected void setup() {
|
||||
this.archivalService = new ArchivalServiceImpl();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute() {
|
||||
protected void executeRandomlyAssignedTask() {
|
||||
log.info("Executing ArchivalTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
|
||||
long startTime = System.currentTimeMillis();
|
||||
try {
|
||||
@ -60,6 +60,11 @@ public class ArchivalTask implements Task {
|
||||
log.info("ArchivalTask completed. Total execution time: " + getDurationBreakdown(difference));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTaskName() {
|
||||
return TASK_NAME;
|
||||
}
|
||||
|
||||
private String getDurationBreakdown(long millis) {
|
||||
if (millis < 0) {
|
||||
throw new IllegalArgumentException("Duration must be greater than zero!");
|
||||
@ -74,4 +79,5 @@ public class ArchivalTask implements Task {
|
||||
|
||||
return (days + " Days " + hours + " Hours " + minutes + " Minutes " + seconds + " Seconds");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -29,9 +29,10 @@ import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
public class ArchivedDataDeletionTask implements Task {
|
||||
public class ArchivedDataDeletionTask extends RandomlyAssignedScheduleTask {
|
||||
|
||||
private static Log log = LogFactory.getLog(ArchivedDataDeletionTask.class);
|
||||
private static final Log log = LogFactory.getLog(ArchivedDataDeletionTask.class);
|
||||
private static final String TASK_NAME = "ARCHIVED_DATA_CLEANUP_TASK";
|
||||
|
||||
private ArchivalService archivalService;
|
||||
|
||||
@ -41,12 +42,12 @@ public class ArchivedDataDeletionTask implements Task {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
public void setup() {
|
||||
this.archivalService = new ArchivalServiceImpl();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute() {
|
||||
protected void executeRandomlyAssignedTask() {
|
||||
log.info("Executing DataDeletionTask at " + new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(new Date()));
|
||||
long startTime = System.nanoTime();
|
||||
try {
|
||||
@ -58,4 +59,10 @@ public class ArchivedDataDeletionTask implements Task {
|
||||
long difference = (endTime - startTime) / (1000000 * 1000);
|
||||
log.info("DataDeletionTask completed. Total execution time: " + difference + " seconds");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getTaskName() {
|
||||
return TASK_NAME;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -109,4 +109,5 @@ public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask {
|
||||
protected void setup() {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -53,7 +53,7 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
|
||||
public final void init() {
|
||||
try {
|
||||
boolean dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled();
|
||||
if(dynamicTaskEnabled){
|
||||
if (dynamicTaskEnabled) {
|
||||
taskContext = new DynamicTaskContext();
|
||||
taskContext.setPartitioningEnabled(true);
|
||||
} else {
|
||||
@ -75,9 +75,9 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
|
||||
String localHashIndex = getProperty(TaskMgtConstants.Task.LOCAL_HASH_INDEX);
|
||||
// These tasks are not dynamically scheduled. They are added via a config so scheduled in each node
|
||||
// during the server startup
|
||||
if (localHashIndex == null ) {
|
||||
if (localHashIndex == null) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Executing startup scheduled task (" + getTaskName() + ") with class: " +
|
||||
log.debug("Executing startup scheduled task (" + getTaskName() + ") with class: " +
|
||||
this.getClass().getName());
|
||||
}
|
||||
executeDynamicTask();
|
||||
@ -116,7 +116,7 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
|
||||
|
||||
private void updateContext() throws HeartBeatManagementException {
|
||||
ServerCtxInfo ctxInfo = DeviceManagementDataHolder.getInstance().getHeartBeatService().getServerCtxInfo();
|
||||
if(ctxInfo != null) {
|
||||
if (ctxInfo != null) {
|
||||
populateContext(ctxInfo);
|
||||
} else {
|
||||
log.info("Dynamic Task Context not present. Tasks will run on regular worker/manager mode.");
|
||||
@ -127,10 +127,10 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
|
||||
taskContext.setActiveServerCount(ctxInfo.getActiveServerCount());
|
||||
taskContext.setServerHashIndex(ctxInfo.getLocalServerHashIdx());
|
||||
|
||||
if(log.isDebugEnabled()){
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Initiating execution of dynamic task for server : " + taskContext.getServerHashIndex() +
|
||||
" where active server count is : " + taskContext.getActiveServerCount() +
|
||||
" partitioning task enabled : " + taskContext.isPartitioningEnabled());
|
||||
" where active server count is : " + taskContext.getActiveServerCount() +
|
||||
" partitioning task enabled : " + taskContext.isPartitioningEnabled());
|
||||
}
|
||||
}
|
||||
|
||||
@ -142,7 +142,8 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
|
||||
return taskContext;
|
||||
}
|
||||
|
||||
public static boolean isDynamicTaskEligible(){
|
||||
@Deprecated
|
||||
public static boolean isDynamicTaskEligible() {
|
||||
return taskContext != null && taskContext.isPartitioningEnabled();
|
||||
}
|
||||
|
||||
|
||||
@ -38,7 +38,7 @@ public abstract class RandomlyAssignedScheduleTask implements Task {
|
||||
try {
|
||||
dynamicTaskEnabled = DeviceManagementDataHolder.getInstance().getHeartBeatService().isTaskPartitioningEnabled();
|
||||
} catch (HeartBeatManagementException e) {
|
||||
log.error("Error Instantiating Variables necessary for Randomly Assigned Task Scheduling." , e);
|
||||
log.error("Error Instantiating Variables necessary for Randomly Assigned Task Scheduling.", e);
|
||||
}
|
||||
//This is done so that sub class extending this abstract class is forced to specify a task name.
|
||||
taskName = getTaskName();
|
||||
@ -48,18 +48,20 @@ public abstract class RandomlyAssignedScheduleTask implements Task {
|
||||
@Override
|
||||
public final void execute() {
|
||||
refreshContext();
|
||||
executeRandomlyAssignedTask();
|
||||
if (isQualifiedToExecuteTask()) {
|
||||
executeRandomlyAssignedTask();
|
||||
}
|
||||
}
|
||||
|
||||
public void refreshContext(){
|
||||
if(dynamicTaskEnabled) {
|
||||
public void refreshContext() {
|
||||
if (dynamicTaskEnabled) {
|
||||
try {
|
||||
qualifiedToExecuteTask = DeviceManagementDataHolder.getInstance().getHeartBeatService().isQualifiedToExecuteTask();
|
||||
log.info("## NODE Qualified to execute Randomly Assigned Task : " + taskName);
|
||||
DeviceManagementDataHolder.getInstance().getHeartBeatService().updateTaskExecutionAcknowledgement(taskName);
|
||||
} catch (HeartBeatManagementException e) {
|
||||
log.error("Error refreshing Variables necessary for Randomly Assigned Scheduled Task. " +
|
||||
"Dynamic Tasks will not function.", e);
|
||||
"Dynamic Tasks will not function.", e);
|
||||
}
|
||||
} else {
|
||||
qualifiedToExecuteTask = true;
|
||||
@ -75,4 +77,5 @@ public abstract class RandomlyAssignedScheduleTask implements Task {
|
||||
}
|
||||
|
||||
public abstract String getTaskName();
|
||||
|
||||
}
|
||||
|
||||
@ -64,7 +64,7 @@ public class DelegationTask extends DynamicPartitionedScheduleTask {
|
||||
try {
|
||||
devices = new ArrayList<>();
|
||||
toBeNotified = new ArrayList<>();
|
||||
if (isDynamicTaskEligible()) {
|
||||
if(getTaskContext() != null && getTaskContext().isPartitioningEnabled()){
|
||||
devices.addAll(service.getAllocatedDevices(deviceType,
|
||||
getTaskContext().getActiveServerCount(),
|
||||
getTaskContext().getServerHashIndex()));
|
||||
|
||||
@ -105,7 +105,7 @@ public class MonitoringTask extends DynamicPartitionedScheduleTask {
|
||||
PolicyManagementDataHolder.getInstance().getDeviceManagementService()
|
||||
.getPolicyMonitoringManager(deviceType);
|
||||
List<Device> devices;
|
||||
if(isDynamicTaskEligible()){
|
||||
if (getTaskContext() != null && getTaskContext().isPartitioningEnabled()) {
|
||||
devices = deviceManagementProviderService
|
||||
.getAllocatedDevices(deviceType, getTaskContext().getActiveServerCount(),
|
||||
getTaskContext().getServerHashIndex());
|
||||
|
||||
@ -49,5 +49,6 @@ public class TaskMgtConstants {
|
||||
public static final String TENANT_ID_PROP = "__TENANT_ID_PROP__";
|
||||
public static final String LOCAL_HASH_INDEX = "__LOCAL_HASH_INDEX__";
|
||||
public static final String LOCAL_TASK_NAME = "__LOCAL_TASK_NAME__";
|
||||
public static final String DYNAMIC_TASK_ID = "__DYNAMIC_TASK_ID__";
|
||||
}
|
||||
}
|
||||
|
||||
@ -22,6 +22,7 @@ import io.entgra.device.mgt.core.task.mgt.common.exception.TaskNotFoundException
|
||||
import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public interface TaskManagementService {
|
||||
|
||||
@ -37,7 +38,9 @@ public interface TaskManagementService {
|
||||
|
||||
List<DynamicTask> getAllDynamicTasks() throws TaskManagementException;
|
||||
|
||||
DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementException;
|
||||
Map<Integer, List<DynamicTask>> getDynamicTasksForAllTenants() throws TaskManagementException;
|
||||
|
||||
DynamicTask getDynamicTask(int dynamicTaskId) throws TaskManagementException;
|
||||
|
||||
List<DynamicTask> getActiveDynamicTasks() throws TaskManagementException;
|
||||
}
|
||||
|
||||
@ -27,16 +27,18 @@ import java.util.List;
|
||||
*/
|
||||
public interface DynamicTaskDAO {
|
||||
|
||||
int addTask(DynamicTask dynamicTask) throws TaskManagementDAOException;
|
||||
int addTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException;
|
||||
|
||||
boolean updateDynamicTask(DynamicTask dynamicTask) throws TaskManagementDAOException;
|
||||
boolean updateDynamicTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException;
|
||||
|
||||
void deleteDynamicTask(int dynamicTaskId) throws TaskManagementDAOException;
|
||||
void deleteDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException;
|
||||
|
||||
DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementDAOException;
|
||||
DynamicTask getDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException;
|
||||
|
||||
List<DynamicTask> getAllDynamicTasks() throws TaskManagementDAOException;
|
||||
|
||||
List<DynamicTask> getActiveDynamicTasks() throws TaskManagementDAOException;
|
||||
List<DynamicTask> getAllDynamicTasks(int tenantId) throws TaskManagementDAOException;
|
||||
|
||||
List<DynamicTask> getActiveDynamicTasks(int tenantId) throws TaskManagementDAOException;
|
||||
|
||||
}
|
||||
|
||||
@ -26,10 +26,11 @@ import java.util.Map;
|
||||
*/
|
||||
public interface DynamicTaskPropDAO {
|
||||
|
||||
void addTaskProperties(int dynamicTaskId, Map<String, String> properties) throws TaskManagementDAOException;
|
||||
void addTaskProperties(int dynamicTaskId, Map<String, String> properties, int tenantId)
|
||||
throws TaskManagementDAOException;
|
||||
|
||||
Map<String, String> getDynamicTaskProps(int dynamicTaskId) throws TaskManagementDAOException;
|
||||
Map<String, String> getDynamicTaskProps(int dynamicTaskId, int tenantId) throws TaskManagementDAOException;
|
||||
|
||||
void updateDynamicTaskProps(int dynamicTaskId, Map<String, String> properties)
|
||||
void updateDynamicTaskProps(int dynamicTaskId, Map<String, String> properties, int tenantId)
|
||||
throws TaskManagementDAOException;
|
||||
}
|
||||
|
||||
@ -103,18 +103,22 @@ public class TaskManagementDAOFactory {
|
||||
conn.setAutoCommit(false);
|
||||
currentConnection.set(conn);
|
||||
} catch (SQLException e) {
|
||||
throw new TransactionManagementException("Error occurred while retrieving config.datasource connection", e);
|
||||
throw new TransactionManagementException("Error occurred while retrieving datasource connection", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void openConnection() throws SQLException {
|
||||
public static void openConnection() throws TransactionManagementException {
|
||||
Connection conn = currentConnection.get();
|
||||
if (conn != null) {
|
||||
throw new IllegalTransactionStateException("A transaction is already active within the context of " +
|
||||
"this particular thread. Therefore, calling 'beginTransaction/openConnection' while another " +
|
||||
"transaction is already active is a sign of improper transaction handling");
|
||||
}
|
||||
conn = dataSource.getConnection();
|
||||
try {
|
||||
conn = dataSource.getConnection();
|
||||
} catch (SQLException e) {
|
||||
throw new TransactionManagementException("Error occurred while retrieving datasource connection", e);
|
||||
}
|
||||
currentConnection.set(conn);
|
||||
}
|
||||
|
||||
|
||||
@ -17,16 +17,19 @@
|
||||
*/
|
||||
package io.entgra.device.mgt.core.task.mgt.core.dao.impl;
|
||||
|
||||
import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.dao.util.TaskManagementDAOUtil;
|
||||
import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask;
|
||||
import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementDAOException;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskDAO;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.dao.util.TaskManagementDAOUtil;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
||||
|
||||
import java.sql.*;
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.List;
|
||||
|
||||
|
||||
@ -34,9 +37,9 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
|
||||
private static final Log log = LogFactory.getLog(DynamicTaskDAOImpl.class);
|
||||
|
||||
@Override
|
||||
public int addTask(DynamicTask dynamicTask) throws TaskManagementDAOException {
|
||||
public int addTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException {
|
||||
PreparedStatement stmt = null;
|
||||
ResultSet rs = null;
|
||||
ResultSet rs;
|
||||
int taskId = -1;
|
||||
try {
|
||||
Connection conn = TaskManagementDAOFactory.getConnection();
|
||||
@ -48,13 +51,14 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
|
||||
stmt.setString(2, dynamicTask.getName());
|
||||
stmt.setBoolean(3, dynamicTask.isEnabled());
|
||||
stmt.setString(4, dynamicTask.getTaskClassName());
|
||||
stmt.setInt(5, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId());
|
||||
stmt.setInt(5, tenantId);
|
||||
stmt.executeUpdate();
|
||||
|
||||
rs = stmt.getGeneratedKeys();
|
||||
if (rs.next()) {
|
||||
taskId = rs.getInt(1);
|
||||
}
|
||||
dynamicTask.setDynamicTaskId(taskId);
|
||||
return taskId;
|
||||
} catch (SQLException e) {
|
||||
String msg = "Error occurred while inserting task '" + dynamicTask.getName() + "'";
|
||||
@ -66,16 +70,17 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean updateDynamicTask(DynamicTask dynamicTask) throws TaskManagementDAOException {
|
||||
public boolean updateDynamicTask(DynamicTask dynamicTask, int tenantId) throws TaskManagementDAOException {
|
||||
PreparedStatement stmt = null;
|
||||
int rows;
|
||||
try {
|
||||
Connection conn = TaskManagementDAOFactory.getConnection();
|
||||
String sql = "UPDATE DYNAMIC_TASK SET CRON = ?,IS_ENABLED = ? WHERE DYNAMIC_TASK_ID = ?";
|
||||
String sql = "UPDATE DYNAMIC_TASK SET CRON = ?,IS_ENABLED = ? WHERE DYNAMIC_TASK_ID = ? AND TENANT_ID = ?";
|
||||
stmt = conn.prepareStatement(sql);
|
||||
stmt.setString(1, dynamicTask.getCronExpression());
|
||||
stmt.setBoolean(2, dynamicTask.isEnabled());
|
||||
stmt.setInt(3, dynamicTask.getDynamicTaskId());
|
||||
stmt.setInt(4, tenantId);
|
||||
rows = stmt.executeUpdate();
|
||||
return (rows > 0);
|
||||
} catch (SQLException e) {
|
||||
@ -87,9 +92,8 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void deleteDynamicTask(int dynamicTaskId) throws TaskManagementDAOException {
|
||||
public void deleteDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Request received in DAO Layer to delete dynamic task with the id: " + dynamicTaskId);
|
||||
}
|
||||
@ -98,7 +102,7 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
|
||||
Connection conn = TaskManagementDAOFactory.getConnection();
|
||||
try (PreparedStatement stmt = conn.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS)) {
|
||||
stmt.setInt(1, dynamicTaskId);
|
||||
stmt.setInt(2, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId());
|
||||
stmt.setInt(2, tenantId);
|
||||
stmt.executeUpdate();
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
@ -110,7 +114,7 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
|
||||
}
|
||||
|
||||
@Override
|
||||
public DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementDAOException {
|
||||
public DynamicTask getDynamicTask(int dynamicTaskId, int tenantId) throws TaskManagementDAOException {
|
||||
DynamicTask dynamicTask = null;
|
||||
try {
|
||||
Connection conn = TaskManagementDAOFactory.getConnection();
|
||||
@ -118,7 +122,7 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
|
||||
|
||||
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
|
||||
stmt.setInt(1, dynamicTaskId);
|
||||
stmt.setInt(2, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId());
|
||||
stmt.setInt(2, tenantId);
|
||||
try (ResultSet rs = stmt.executeQuery()) {
|
||||
if (rs.next()) {
|
||||
dynamicTask = TaskManagementDAOUtil.loadDynamicTask(rs);
|
||||
@ -155,13 +159,14 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DynamicTask> getActiveDynamicTasks() throws TaskManagementDAOException {
|
||||
List<DynamicTask> dynamicTasks = null;
|
||||
public List<DynamicTask> getAllDynamicTasks(int tenantId) throws TaskManagementDAOException {
|
||||
List<DynamicTask> dynamicTasks;
|
||||
try {
|
||||
Connection conn = TaskManagementDAOFactory.getConnection();
|
||||
String sql = "SELECT * FROM DYNAMIC_TASK WHERE IS_ENABLED = 'true' ";
|
||||
String sql = "SELECT * FROM DYNAMIC_TASK WHERE TENANT_ID = ?";
|
||||
|
||||
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
|
||||
stmt.setInt(1, tenantId);
|
||||
try (ResultSet rs = stmt.executeQuery()) {
|
||||
dynamicTasks = TaskManagementDAOUtil.loadDynamicTasks(rs);
|
||||
}
|
||||
@ -173,4 +178,26 @@ public class DynamicTaskDAOImpl implements DynamicTaskDAO {
|
||||
}
|
||||
return dynamicTasks;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DynamicTask> getActiveDynamicTasks(int tenantId) throws TaskManagementDAOException {
|
||||
List<DynamicTask> dynamicTasks;
|
||||
try {
|
||||
Connection conn = TaskManagementDAOFactory.getConnection();
|
||||
String sql = "SELECT * FROM DYNAMIC_TASK WHERE IS_ENABLED = 'true' AND TENANT_ID = ?";
|
||||
|
||||
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
|
||||
stmt.setInt(1, tenantId);
|
||||
try (ResultSet rs = stmt.executeQuery()) {
|
||||
dynamicTasks = TaskManagementDAOUtil.loadDynamicTasks(rs);
|
||||
}
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
String msg = "Error occurred while getting all dynamic task data ";
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementDAOException(msg, e);
|
||||
}
|
||||
return dynamicTasks;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -38,9 +38,9 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
|
||||
private static final Log log = LogFactory.getLog(DynamicTaskPropDAOImpl.class);
|
||||
|
||||
@Override
|
||||
public void addTaskProperties(int taskId, Map<String, String> properties)
|
||||
public void addTaskProperties(int taskId, Map<String, String> properties, int tenantId)
|
||||
throws TaskManagementDAOException {
|
||||
Connection conn = null;
|
||||
Connection conn;
|
||||
PreparedStatement stmt = null;
|
||||
try {
|
||||
conn = TaskManagementDAOFactory.getConnection();
|
||||
@ -51,7 +51,7 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
|
||||
stmt.setInt(1, taskId);
|
||||
stmt.setString(2, propertyKey);
|
||||
stmt.setString(3, properties.get(propertyKey));
|
||||
stmt.setInt(4, PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId());
|
||||
stmt.setInt(4, tenantId);
|
||||
stmt.addBatch();
|
||||
}
|
||||
stmt.executeBatch();
|
||||
@ -64,17 +64,17 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public Map<String, String> getDynamicTaskProps(int dynamicTaskId) throws TaskManagementDAOException {
|
||||
Connection conn = null;
|
||||
public Map<String, String> getDynamicTaskProps(int dynamicTaskId, int tenantId) throws TaskManagementDAOException {
|
||||
Connection conn;
|
||||
PreparedStatement stmt = null;
|
||||
ResultSet resultSet = null;
|
||||
Map<String, String> properties;
|
||||
try {
|
||||
conn = TaskManagementDAOFactory.getConnection();
|
||||
stmt = conn.prepareStatement(
|
||||
"SELECT * FROM DYNAMIC_TASK_PROPERTIES WHERE DYNAMIC_TASK_ID = ?");
|
||||
"SELECT * FROM DYNAMIC_TASK_PROPERTIES WHERE DYNAMIC_TASK_ID = ? AND TENANT_ID = ?");
|
||||
stmt.setInt(1, dynamicTaskId);
|
||||
stmt.setInt(2, tenantId);
|
||||
resultSet = stmt.executeQuery();
|
||||
properties = new HashMap<>();
|
||||
while (resultSet.next()) {
|
||||
@ -92,7 +92,7 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateDynamicTaskProps(int dynamicTaskId, Map<String, String> properties)
|
||||
public void updateDynamicTaskProps(int dynamicTaskId, Map<String, String> properties, int tenantId)
|
||||
throws TaskManagementDAOException {
|
||||
if (properties.isEmpty()) {
|
||||
if (log.isDebugEnabled()) {
|
||||
@ -105,12 +105,13 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
|
||||
try {
|
||||
conn = TaskManagementDAOFactory.getConnection();
|
||||
stmt = conn.prepareStatement("UPDATE DYNAMIC_TASK_PROPERTIES SET PROPERTY_VALUE = ? " +
|
||||
"WHERE DYNAMIC_TASK_ID = ? AND PROPERTY_NAME = ?");
|
||||
"WHERE DYNAMIC_TASK_ID = ? AND PROPERTY_NAME = ? AND TENANT_ID = ?");
|
||||
|
||||
for (Map.Entry<String, String> entry : properties.entrySet()) {
|
||||
stmt.setString(1, entry.getValue());
|
||||
stmt.setInt(2, dynamicTaskId);
|
||||
stmt.setString(3, entry.getKey());
|
||||
stmt.setInt(4, tenantId);
|
||||
stmt.addBatch();
|
||||
}
|
||||
stmt.executeBatch();
|
||||
@ -121,4 +122,5 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
|
||||
TaskManagementDAOUtil.cleanupResources(stmt, null);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -17,10 +17,6 @@
|
||||
*/
|
||||
package io.entgra.device.mgt.core.task.mgt.core.service;
|
||||
|
||||
import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskPropDAO;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.internal.TaskManagerDataHolder;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.util.TaskManagementUtil;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
|
||||
import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask;
|
||||
import io.entgra.device.mgt.core.task.mgt.common.constant.TaskMgtConstants;
|
||||
@ -30,14 +26,21 @@ import io.entgra.device.mgt.core.task.mgt.common.exception.TaskNotFoundException
|
||||
import io.entgra.device.mgt.core.task.mgt.common.exception.TransactionManagementException;
|
||||
import io.entgra.device.mgt.core.task.mgt.common.spi.TaskManagementService;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskDAO;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.dao.DynamicTaskPropDAO;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.dao.common.TaskManagementDAOFactory;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.internal.TaskManagerDataHolder;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.util.TaskManagementUtil;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
||||
import org.wso2.carbon.ntask.common.TaskException;
|
||||
import org.wso2.carbon.ntask.core.TaskInfo;
|
||||
import org.wso2.carbon.ntask.core.TaskManager;
|
||||
import org.wso2.carbon.ntask.core.service.TaskService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -78,42 +81,43 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
|
||||
@Override
|
||||
public void createTask(DynamicTask dynamicTask) throws TaskManagementException {
|
||||
String taskId;
|
||||
String nTaskName;
|
||||
int dynamicTaskId;
|
||||
int serverHashIdx;
|
||||
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
|
||||
try {
|
||||
// add into the dynamic task tables
|
||||
TaskManagementDAOFactory.beginTransaction();
|
||||
int dynamicTaskId = dynamicTaskDAO.addTask(dynamicTask);
|
||||
|
||||
Map<String, String> taskProperties = dynamicTask.getProperties();
|
||||
dynamicTaskPropDAO.addTaskProperties(dynamicTaskId, taskProperties);
|
||||
|
||||
// add into the ntask core
|
||||
taskId = TaskManagementUtil.generateTaskId(dynamicTaskId);
|
||||
dynamicTaskId = dynamicTaskDAO.addTask(dynamicTask, tenantId);
|
||||
dynamicTaskPropDAO.addTaskProperties(dynamicTaskId, dynamicTask.getProperties(), tenantId);
|
||||
|
||||
try {
|
||||
int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
|
||||
serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
|
||||
.getServerCtxInfo().getLocalServerHashIdx();
|
||||
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
|
||||
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, taskId);
|
||||
nTaskName = TaskManagementUtil.generateNTaskName(dynamicTaskId, serverHashIdx);
|
||||
} catch (HeartBeatManagementException e) {
|
||||
String msg = "Unexpected exception when getting server hash index.";
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementException(msg, e);
|
||||
}
|
||||
|
||||
if (!isTaskExists(taskId)) {
|
||||
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
|
||||
triggerInfo.setCronExpression(dynamicTask.getCronExpression());
|
||||
TaskInfo taskInfo = new TaskInfo(taskId, dynamicTask.getTaskClassName(), taskProperties, triggerInfo);
|
||||
taskManager.registerTask(taskInfo);
|
||||
taskManager.scheduleTask(taskId);
|
||||
if (!dynamicTask.isEnabled()) {
|
||||
taskManager.pauseTask(taskId);
|
||||
}
|
||||
} else {
|
||||
String msg = "Task '" + taskId + "' is already exists in the ntask core "
|
||||
+ "Hence not creating another task for the same name.";
|
||||
log.error(msg);
|
||||
if (isTaskExists(nTaskName)) {
|
||||
String msg = "Task '" + nTaskName + "' is already exists in the ntask core. "
|
||||
+ "Hence removing existing task from nTask before adding new one.";
|
||||
log.warn(msg);
|
||||
taskManager.deleteTask(nTaskName);
|
||||
}
|
||||
|
||||
// add into the ntask core
|
||||
Map<String, String> taskProperties = TaskManagementUtil
|
||||
.populateNTaskProperties(dynamicTask, nTaskName, serverHashIdx);
|
||||
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
|
||||
triggerInfo.setCronExpression(dynamicTask.getCronExpression());
|
||||
TaskInfo taskInfo = new TaskInfo(nTaskName, dynamicTask.getTaskClassName(), taskProperties, triggerInfo);
|
||||
taskManager.registerTask(taskInfo);
|
||||
taskManager.scheduleTask(nTaskName);
|
||||
if (!dynamicTask.isEnabled()) {
|
||||
taskManager.pauseTask(nTaskName);
|
||||
}
|
||||
|
||||
TaskManagementDAOFactory.commitTransaction();
|
||||
@ -137,19 +141,20 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateTask(int dynamicTaskId, DynamicTask dynamicTask) throws TaskManagementException
|
||||
, TaskNotFoundException {
|
||||
public void updateTask(int dynamicTaskId, DynamicTask dynamicTask)
|
||||
throws TaskManagementException, TaskNotFoundException {
|
||||
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
|
||||
try {
|
||||
//Update dynamic task table
|
||||
TaskManagementDAOFactory.beginTransaction();
|
||||
DynamicTask existingTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId);
|
||||
DynamicTask existingTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId);
|
||||
|
||||
if (existingTask != null) {
|
||||
existingTask.setEnabled(dynamicTask.isEnabled());
|
||||
existingTask.setCronExpression(dynamicTask.getCronExpression());
|
||||
dynamicTaskDAO.updateDynamicTask(existingTask);
|
||||
dynamicTaskDAO.updateDynamicTask(existingTask, tenantId);
|
||||
if (!dynamicTask.getProperties().isEmpty()) {
|
||||
dynamicTaskPropDAO.updateDynamicTaskProps(dynamicTaskId, dynamicTask.getProperties());
|
||||
dynamicTaskPropDAO.updateDynamicTaskProps(dynamicTaskId, dynamicTask.getProperties(), tenantId);
|
||||
}
|
||||
} else {
|
||||
String msg = "Task '" + dynamicTaskId + "' is not exists in the dynamic task table.";
|
||||
@ -158,12 +163,14 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
}
|
||||
|
||||
// Update task in the ntask core
|
||||
String taskId = TaskManagementUtil.generateTaskId(existingTask.getDynamicTaskId());
|
||||
if (isTaskExists(taskId)) {
|
||||
TaskInfo taskInfo = taskManager.getTask(taskId);
|
||||
if (!dynamicTask.getProperties().isEmpty()) {
|
||||
taskInfo.setProperties(dynamicTask.getProperties());
|
||||
}
|
||||
String nTaskName = TaskManagementUtil.generateNTaskName(existingTask.getDynamicTaskId());
|
||||
if (isTaskExists(nTaskName)) {
|
||||
TaskInfo taskInfo = taskManager.getTask(nTaskName);
|
||||
|
||||
Map<String, String> taskProperties = TaskManagementUtil
|
||||
.populateNTaskProperties(dynamicTask, nTaskName);
|
||||
taskInfo.setProperties(taskProperties);
|
||||
|
||||
TaskInfo.TriggerInfo triggerInfo;
|
||||
if (taskInfo.getTriggerInfo() == null) {
|
||||
triggerInfo = new TaskInfo.TriggerInfo();
|
||||
@ -173,9 +180,9 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
triggerInfo.setCronExpression(dynamicTask.getCronExpression());
|
||||
taskInfo.setTriggerInfo(triggerInfo);
|
||||
taskManager.registerTask(taskInfo);
|
||||
taskManager.rescheduleTask(taskId);
|
||||
taskManager.rescheduleTask(nTaskName);
|
||||
} else {
|
||||
String msg = "Task '" + taskId + "' is not exists in the n task core "
|
||||
String msg = "Task '" + nTaskName + "' is not exists in the n task core "
|
||||
+ "Hence cannot update the task.";
|
||||
log.error(msg);
|
||||
}
|
||||
@ -200,16 +207,17 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void toggleTask(int dynamicTaskId, boolean isEnabled) throws TaskManagementException
|
||||
, TaskNotFoundException {
|
||||
public void toggleTask(int dynamicTaskId, boolean isEnabled)
|
||||
throws TaskManagementException, TaskNotFoundException {
|
||||
|
||||
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
|
||||
try {
|
||||
//update dynamic task table
|
||||
TaskManagementDAOFactory.beginTransaction();
|
||||
DynamicTask existingTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId);
|
||||
DynamicTask existingTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId);
|
||||
if (existingTask != null) {
|
||||
existingTask.setEnabled(isEnabled);
|
||||
dynamicTaskDAO.updateDynamicTask(existingTask);
|
||||
dynamicTaskDAO.updateDynamicTask(existingTask, tenantId);
|
||||
} else {
|
||||
String msg = "Task '" + dynamicTaskId + "' is not exists.";
|
||||
log.error(msg);
|
||||
@ -217,15 +225,15 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
}
|
||||
|
||||
// Update task in the ntask core
|
||||
String taskId = TaskManagementUtil.generateTaskId(existingTask.getDynamicTaskId());
|
||||
if (isTaskExists(taskId)) {
|
||||
String taskName = TaskManagementUtil.generateNTaskName(existingTask.getDynamicTaskId());
|
||||
if (isTaskExists(taskName)) {
|
||||
if (isEnabled) {
|
||||
taskManager.resumeTask(taskId);
|
||||
taskManager.resumeTask(taskName);
|
||||
} else {
|
||||
taskManager.pauseTask(taskId);
|
||||
taskManager.pauseTask(taskName);
|
||||
}
|
||||
} else {
|
||||
String msg = "Task '" + taskId + "' is not exists in the ntask core "
|
||||
String msg = "Task '" + taskName + "' is not exists in the ntask core "
|
||||
+ "Hence cannot toggle the task in the ntask.";
|
||||
log.error(msg);
|
||||
}
|
||||
@ -251,22 +259,23 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
@Override
|
||||
public void deleteTask(int dynamicTaskId) throws TaskManagementException, TaskNotFoundException {
|
||||
// delete task from dynamic task table
|
||||
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
|
||||
try {
|
||||
TaskManagementDAOFactory.beginTransaction();
|
||||
DynamicTask existingTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId);
|
||||
DynamicTask existingTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId);
|
||||
if (existingTask != null) {
|
||||
dynamicTaskDAO.deleteDynamicTask(dynamicTaskId);
|
||||
dynamicTaskDAO.deleteDynamicTask(dynamicTaskId, tenantId);
|
||||
} else {
|
||||
String msg = "Task '" + dynamicTaskId + "' is not exists.";
|
||||
log.error(msg);
|
||||
throw new TaskNotFoundException(msg);
|
||||
}
|
||||
|
||||
String taskId = TaskManagementUtil.generateTaskId(existingTask.getDynamicTaskId());
|
||||
if (isTaskExists(taskId)) {
|
||||
taskManager.deleteTask(taskId);
|
||||
String taskName = TaskManagementUtil.generateNTaskName(existingTask.getDynamicTaskId());
|
||||
if (isTaskExists(taskName)) {
|
||||
taskManager.deleteTask(taskName);
|
||||
} else {
|
||||
String msg = "Task '" + taskId + "' is not exists in the ntask core "
|
||||
String msg = "Task '" + taskName + "' is not exists in the ntask core "
|
||||
+ "Hence cannot delete from the ntask core.";
|
||||
log.error(msg);
|
||||
}
|
||||
@ -292,22 +301,21 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
|
||||
@Override
|
||||
public List<DynamicTask> getAllDynamicTasks() throws TaskManagementException {
|
||||
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
|
||||
List<DynamicTask> dynamicTasks;
|
||||
try {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Fetching the details of all dynamic tasks");
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Fetching the details of all dynamic tasks");
|
||||
}
|
||||
TaskManagementDAOFactory.beginTransaction();
|
||||
dynamicTasks = dynamicTaskDAO.getAllDynamicTasks();
|
||||
TaskManagementDAOFactory.openConnection();
|
||||
dynamicTasks = dynamicTaskDAO.getAllDynamicTasks(tenantId);
|
||||
if (dynamicTasks != null) {
|
||||
for (DynamicTask dynamicTask : dynamicTasks) {
|
||||
dynamicTask.setProperties(dynamicTaskPropDAO
|
||||
.getDynamicTaskProps(dynamicTask.getDynamicTaskId()));
|
||||
.getDynamicTaskProps(dynamicTask.getDynamicTaskId(), tenantId));
|
||||
}
|
||||
}
|
||||
TaskManagementDAOFactory.commitTransaction();
|
||||
} catch (TaskManagementDAOException e) {
|
||||
TaskManagementDAOFactory.rollbackTransaction();
|
||||
String msg = "Error occurred while fetching all dynamic tasks";
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementException(msg, e);
|
||||
@ -322,20 +330,63 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public DynamicTask getDynamicTaskById(int dynamicTaskId) throws TaskManagementException {
|
||||
public Map<Integer, List<DynamicTask>> getDynamicTasksForAllTenants() throws TaskManagementException {
|
||||
List<DynamicTask> dynamicTasks;
|
||||
try {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Fetching the details of dynamic tasks for all tenants");
|
||||
}
|
||||
TaskManagementDAOFactory.openConnection();
|
||||
dynamicTasks = dynamicTaskDAO.getAllDynamicTasks();
|
||||
if (dynamicTasks != null) {
|
||||
for (DynamicTask dynamicTask : dynamicTasks) {
|
||||
dynamicTask.setProperties(dynamicTaskPropDAO
|
||||
.getDynamicTaskProps(dynamicTask.getDynamicTaskId(), dynamicTask.getTenantId()));
|
||||
}
|
||||
}
|
||||
} catch (TaskManagementDAOException e) {
|
||||
String msg = "Error occurred while fetching all dynamic tasks";
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementException(msg, e);
|
||||
} catch (TransactionManagementException e) {
|
||||
String msg = "Failed to start/open transaction to get all dynamic tasks";
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementException(msg, e);
|
||||
} finally {
|
||||
TaskManagementDAOFactory.closeConnection();
|
||||
}
|
||||
Map<Integer, List<DynamicTask>> tenantedDynamicTasks = new HashMap<>();
|
||||
List<DynamicTask> dts;
|
||||
if (dynamicTasks != null) {
|
||||
for (DynamicTask dt : dynamicTasks) {
|
||||
if (tenantedDynamicTasks.containsKey(dt.getTenantId())) {
|
||||
dts = tenantedDynamicTasks.get(dt.getTenantId());
|
||||
} else {
|
||||
dts = new ArrayList<>();
|
||||
}
|
||||
dts.add(dt);
|
||||
tenantedDynamicTasks.put(dt.getTenantId(), dts);
|
||||
}
|
||||
}
|
||||
return tenantedDynamicTasks;
|
||||
}
|
||||
|
||||
@Override
|
||||
public DynamicTask getDynamicTask(int dynamicTaskId) throws TaskManagementException {
|
||||
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
|
||||
DynamicTask dynamicTask;
|
||||
try {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Fetching the details of dynamic task '" + dynamicTaskId + "'");
|
||||
}
|
||||
TaskManagementDAOFactory.beginTransaction();
|
||||
dynamicTask = dynamicTaskDAO.getDynamicTaskById(dynamicTaskId);
|
||||
TaskManagementDAOFactory.openConnection();
|
||||
dynamicTask = dynamicTaskDAO.getDynamicTask(dynamicTaskId, tenantId);
|
||||
if (dynamicTask != null) {
|
||||
dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId()));
|
||||
dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId(),
|
||||
tenantId));
|
||||
}
|
||||
TaskManagementDAOFactory.commitTransaction();
|
||||
} catch (TaskManagementDAOException e) {
|
||||
TaskManagementDAOFactory.rollbackTransaction();
|
||||
String msg = "Error occurred while fetching dynamic task '" + dynamicTaskId + "'";
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementException(msg, e);
|
||||
@ -351,21 +402,21 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
|
||||
@Override
|
||||
public List<DynamicTask> getActiveDynamicTasks() throws TaskManagementException {
|
||||
int tenantId = PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId();
|
||||
List<DynamicTask> dynamicTasks;
|
||||
try {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Fetching the details of all active dynamic tasks");
|
||||
}
|
||||
TaskManagementDAOFactory.beginTransaction();
|
||||
dynamicTasks = dynamicTaskDAO.getActiveDynamicTasks();
|
||||
TaskManagementDAOFactory.openConnection();
|
||||
dynamicTasks = dynamicTaskDAO.getActiveDynamicTasks(tenantId);
|
||||
if (dynamicTasks != null) {
|
||||
for (DynamicTask dynamicTask : dynamicTasks) {
|
||||
dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId()));
|
||||
dynamicTask.setProperties(dynamicTaskPropDAO.getDynamicTaskProps(dynamicTask.getDynamicTaskId(),
|
||||
tenantId));
|
||||
}
|
||||
}
|
||||
TaskManagementDAOFactory.commitTransaction();
|
||||
} catch (TaskManagementDAOException e) {
|
||||
TaskManagementDAOFactory.rollbackTransaction();
|
||||
String msg = "Error occurred while fetching all active dynamic tasks";
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementException(msg, e);
|
||||
@ -380,18 +431,19 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
}
|
||||
|
||||
// check whether task exist in the ntask core
|
||||
private boolean isTaskExists(String taskId) throws TaskManagementException, TaskException {
|
||||
if (StringUtils.isEmpty(taskId)) {
|
||||
String msg = "Task ID must not be null or empty.";
|
||||
private boolean isTaskExists(String taskName) throws TaskManagementException, TaskException {
|
||||
if (StringUtils.isEmpty(taskName)) {
|
||||
String msg = "Task Name must not be null or empty.";
|
||||
log.error(msg);
|
||||
throw new TaskManagementException(msg);
|
||||
}
|
||||
List<TaskInfo> tasks = taskManager.getAllTasks();
|
||||
for (TaskInfo t : tasks) {
|
||||
if (taskId.equals(t.getName())) {
|
||||
if (taskName.equals(t.getName())) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -17,20 +17,21 @@
|
||||
*/
|
||||
package io.entgra.device.mgt.core.task.mgt.core.util;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import io.entgra.device.mgt.core.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
|
||||
import io.entgra.device.mgt.core.task.mgt.common.bean.DynamicTask;
|
||||
import io.entgra.device.mgt.core.task.mgt.common.constant.TaskMgtConstants;
|
||||
import io.entgra.device.mgt.core.task.mgt.common.exception.TaskManagementException;
|
||||
import io.entgra.device.mgt.core.task.mgt.core.internal.TaskManagerDataHolder;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.w3c.dom.Document;
|
||||
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
||||
|
||||
import javax.xml.XMLConstants;
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@ -55,11 +56,11 @@ public class TaskManagementUtil {
|
||||
}
|
||||
}
|
||||
|
||||
public static String generateTaskId(int dynamicTaskId) throws TaskManagementException {
|
||||
public static String generateNTaskName(int dynamicTaskId) throws TaskManagementException {
|
||||
try {
|
||||
int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
|
||||
.getServerCtxInfo().getLocalServerHashIdx();
|
||||
return generateTaskId(dynamicTaskId, serverHashIdx);
|
||||
return generateNTaskName(dynamicTaskId, serverHashIdx);
|
||||
} catch (HeartBeatManagementException e) {
|
||||
String msg = "Failed to generate task id for a dynamic task " + dynamicTaskId;
|
||||
log.error(msg, e);
|
||||
@ -67,18 +68,33 @@ public class TaskManagementUtil {
|
||||
}
|
||||
}
|
||||
|
||||
public static String generateTaskId(int dynamicTaskId, int serverHashIdx) {
|
||||
public static String generateNTaskName(int dynamicTaskId, int serverHashIdx) {
|
||||
return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId
|
||||
+ TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx;
|
||||
}
|
||||
|
||||
public static String generateTaskPropsMD5(Map<String, String> taskProperties) {
|
||||
taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP);
|
||||
taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX);
|
||||
taskProperties.remove(TaskMgtConstants.Task.LOCAL_TASK_NAME);
|
||||
Gson gson = new Gson();
|
||||
String json = gson.toJson(taskProperties);
|
||||
return DigestUtils.md5Hex(json);
|
||||
public static Map<String, String> populateNTaskProperties(DynamicTask dynamicTask,
|
||||
String nTaskName) throws TaskManagementException {
|
||||
try {
|
||||
int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
|
||||
.getServerCtxInfo().getLocalServerHashIdx();
|
||||
return populateNTaskProperties(dynamicTask, nTaskName, serverHashIdx);
|
||||
} catch (HeartBeatManagementException e) {
|
||||
String msg = "Failed to populate nTask properties a dynamic task " + dynamicTask.getDynamicTaskId();
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
public static Map<String, String> populateNTaskProperties(DynamicTask dynamicTask,
|
||||
String nTaskName, int serverHashIdx) {
|
||||
Map<String, String> taskProperties = new HashMap<>();
|
||||
taskProperties.put(TaskMgtConstants.Task.DYNAMIC_TASK_ID, String.valueOf(dynamicTask.getDynamicTaskId()));
|
||||
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, nTaskName);
|
||||
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
|
||||
taskProperties.put(TaskMgtConstants.Task.TENANT_ID_PROP,
|
||||
String.valueOf(PrivilegedCarbonContext.getThreadLocalCarbonContext().getTenantId()));
|
||||
return taskProperties;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -38,7 +38,6 @@ import org.wso2.carbon.user.core.service.RealmService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@ -69,47 +68,38 @@ public class IoTSStartupHandler implements ServerStartupObserver {
|
||||
|
||||
private void compareTasks() {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Comparing Tasks from carbon nTask manager and entgra task manager");
|
||||
log.debug("Comparing Tasks from carbon nTask manager and Entgra task manager.");
|
||||
}
|
||||
TaskService nTaskService = TaskWatcherDataHolder.getInstance().getnTaskService();
|
||||
if (nTaskService == null) {
|
||||
String msg = "Unable to load TaskService from the carbon nTask core";
|
||||
String msg = "Unable to load TaskService from the carbon nTask core.";
|
||||
log.error(msg);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
List<DynamicTask> dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService()
|
||||
.getAllDynamicTasks();
|
||||
Map<Integer, List<DynamicTask>> tenantedDynamicTasks = TaskWatcherDataHolder.getInstance()
|
||||
.getTaskManagementService().getDynamicTasksForAllTenants();
|
||||
|
||||
scheduleMissingTasks(nTaskService, dynamicTasks);
|
||||
deleteObsoleteTasks(nTaskService, dynamicTasks);
|
||||
scheduleMissingTasks(nTaskService, tenantedDynamicTasks);
|
||||
deleteObsoleteTasks(nTaskService, tenantedDynamicTasks);
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Task Comparison Completed and all tasks in current node are updated");
|
||||
log.debug("Task Comparison Completed and all tasks in current node are updated.");
|
||||
}
|
||||
} catch (TaskException e) {
|
||||
String msg = "Error occurred while accessing carbon nTask manager.";
|
||||
log.error(msg, e);
|
||||
} catch (TaskManagementException e) {
|
||||
String msg = "Error occurred while retrieving all active tasks from entgra task manager";
|
||||
String msg = "Error occurred while retrieving all active tasks from Entgra task manager.";
|
||||
log.error(msg, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private static void scheduleMissingTasks(TaskService nTaskService, List<DynamicTask> dynamicTasks)
|
||||
private static void scheduleMissingTasks(TaskService nTaskService, Map<Integer,
|
||||
List<DynamicTask>> tenantedDynamicTasks)
|
||||
throws TaskException, TaskManagementException {
|
||||
Map<Integer, List<DynamicTask>> tenantedDynamicTasks = new HashMap<>();
|
||||
List<DynamicTask> dts;
|
||||
for (DynamicTask dt : dynamicTasks) {
|
||||
if (tenantedDynamicTasks.containsKey(dt.getTenantId())) {
|
||||
dts = tenantedDynamicTasks.get(dt.getTenantId());
|
||||
} else {
|
||||
dts = new ArrayList<>();
|
||||
}
|
||||
dts.add(dt);
|
||||
tenantedDynamicTasks.put(dt.getTenantId(), dts);
|
||||
}
|
||||
|
||||
TaskManager taskManager;
|
||||
for (Integer tenantId : tenantedDynamicTasks.keySet()) {
|
||||
if (tenantId == -1) {
|
||||
@ -126,36 +116,56 @@ public class IoTSStartupHandler implements ServerStartupObserver {
|
||||
List<TaskInfo> tasks = taskManager.getAllTasks();
|
||||
// add or update task into nTask core
|
||||
for (DynamicTask dt : tenantedDynamicTasks.get(tenantId)) {
|
||||
String generatedTaskId = TaskManagementUtil.generateTaskId(dt.getDynamicTaskId());
|
||||
int serverHashIdx;
|
||||
try {
|
||||
serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService()
|
||||
.getServerCtxInfo().getLocalServerHashIdx();
|
||||
} catch (HeartBeatManagementException e) {
|
||||
String msg = "Failed to get server hash index for dynamic task " + dt.getDynamicTaskId();
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementException(msg, e);
|
||||
}
|
||||
|
||||
String nTaskName = TaskManagementUtil.generateNTaskName(dt.getDynamicTaskId(), serverHashIdx);
|
||||
boolean isExist = false;
|
||||
for (TaskInfo taskInfo : tasks) {
|
||||
if (taskInfo.getName().equals(generatedTaskId)) {
|
||||
isExist = true;
|
||||
if (taskInfo.getName().equals(nTaskName)) {
|
||||
|
||||
TaskInfo.TriggerInfo triggerInfo = taskInfo.getTriggerInfo();
|
||||
String dynamicTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(dt.getProperties());
|
||||
String existingTaskPropMD5 = TaskManagementUtil.generateTaskPropsMD5(taskInfo.getProperties());
|
||||
if (!triggerInfo.getCronExpression().equals(dt.getCronExpression())
|
||||
|| !dynamicTaskPropMD5.equals(existingTaskPropMD5)) {
|
||||
if (taskInfo.getProperties() == null) {
|
||||
String msg = "Task properties not found for task " + nTaskName
|
||||
+ ". Therefore deleting the nTask schedule.";
|
||||
log.warn(msg);
|
||||
taskManager.deleteTask(nTaskName);
|
||||
break;
|
||||
}
|
||||
|
||||
isExist = true;
|
||||
if (!triggerInfo.getCronExpression().equals(dt.getCronExpression())) {
|
||||
triggerInfo.setCronExpression(dt.getCronExpression());
|
||||
taskInfo.setTriggerInfo(triggerInfo);
|
||||
taskInfo.setProperties(populateTaskProperties(tenantId, generatedTaskId, dt.getProperties()));
|
||||
taskInfo.setProperties(TaskManagementUtil
|
||||
.populateNTaskProperties(dt, taskInfo.getName(), serverHashIdx));
|
||||
taskManager.registerTask(taskInfo);
|
||||
taskManager.rescheduleTask(generatedTaskId);
|
||||
taskManager.rescheduleTask(nTaskName);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Task - '" + generatedTaskId + "' updated according to the dynamic task table");
|
||||
log.debug("Task - '" + nTaskName
|
||||
+ "' updated according to the dynamic task table.");
|
||||
}
|
||||
}
|
||||
if (dt.isEnabled()
|
||||
&& taskManager.getTaskState(generatedTaskId) == TaskManager.TaskState.PAUSED) {
|
||||
taskManager.resumeTask(generatedTaskId);
|
||||
&& taskManager.getTaskState(nTaskName) == TaskManager.TaskState.PAUSED) {
|
||||
taskManager.resumeTask(nTaskName);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Task - '" + generatedTaskId + "' enabled according to the dynamic task table");
|
||||
log.debug("Task - '" + nTaskName
|
||||
+ "' enabled according to the dynamic task table.");
|
||||
}
|
||||
} else if (!dt.isEnabled()
|
||||
&& taskManager.getTaskState(generatedTaskId) != TaskManager.TaskState.PAUSED) {
|
||||
taskManager.pauseTask(generatedTaskId);
|
||||
&& taskManager.getTaskState(nTaskName) != TaskManager.TaskState.PAUSED) {
|
||||
taskManager.pauseTask(nTaskName);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Task - '" + generatedTaskId + "' disabled according to the dynamic task table");
|
||||
log.debug("Task - '" + nTaskName
|
||||
+ "' disabled according to the dynamic task table.");
|
||||
}
|
||||
}
|
||||
break;
|
||||
@ -164,12 +174,12 @@ public class IoTSStartupHandler implements ServerStartupObserver {
|
||||
if (!isExist) {
|
||||
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
|
||||
triggerInfo.setCronExpression(dt.getCronExpression());
|
||||
TaskInfo taskInfo = new TaskInfo(generatedTaskId, dt.getTaskClassName(),
|
||||
populateTaskProperties(tenantId, generatedTaskId, dt.getProperties()), triggerInfo);
|
||||
TaskInfo taskInfo = new TaskInfo(nTaskName, dt.getTaskClassName(), TaskManagementUtil
|
||||
.populateNTaskProperties(dt, nTaskName, serverHashIdx), triggerInfo);
|
||||
taskManager.registerTask(taskInfo);
|
||||
taskManager.scheduleTask(generatedTaskId);
|
||||
taskManager.scheduleTask(nTaskName);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("New task -'" + generatedTaskId + "' created according to the dynamic task table");
|
||||
log.debug("New task -'" + nTaskName + "' created according to the dynamic task table.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -177,24 +187,8 @@ public class IoTSStartupHandler implements ServerStartupObserver {
|
||||
}
|
||||
}
|
||||
|
||||
private static Map<String, String> populateTaskProperties(int tenantId, String generatedTaskId,
|
||||
Map<String, String> taskProperties)
|
||||
throws TaskManagementException {
|
||||
try {
|
||||
int serverHashIdx = TaskWatcherDataHolder.getInstance().getHeartBeatService()
|
||||
.getServerCtxInfo().getLocalServerHashIdx();
|
||||
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
|
||||
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, generatedTaskId);
|
||||
taskProperties.put(TaskMgtConstants.Task.TENANT_ID_PROP, String.valueOf(tenantId));
|
||||
return taskProperties;
|
||||
} catch (HeartBeatManagementException e) {
|
||||
String msg = "Unexpected exception when getting server hash index.";
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementException(msg, e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void deleteObsoleteTasks(TaskService nTaskService, List<DynamicTask> dynamicTasks)
|
||||
private static void deleteObsoleteTasks(TaskService nTaskService,
|
||||
Map<Integer, List<DynamicTask>> tenantedDynamicTasks)
|
||||
throws TaskManagementException, TaskException {
|
||||
|
||||
List<Tenant> tenants = new ArrayList<>();
|
||||
@ -224,6 +218,13 @@ public class IoTSStartupHandler implements ServerStartupObserver {
|
||||
}
|
||||
|
||||
for (Tenant tenant : tenants) {
|
||||
if (tenantedDynamicTasks.get(tenant.getId()) == null) {
|
||||
if (log.isTraceEnabled()) {
|
||||
log.trace("Dynamic tasks not running for tenant: [" + tenant.getId() + "] "
|
||||
+ tenant.getDomain());
|
||||
}
|
||||
continue;
|
||||
}
|
||||
PrivilegedCarbonContext.startTenantFlow();
|
||||
PrivilegedCarbonContext.getThreadLocalCarbonContext().setTenantId(tenant.getId(), true);
|
||||
if (!nTaskService.getRegisteredTaskTypes().contains(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
|
||||
@ -234,10 +235,10 @@ public class IoTSStartupHandler implements ServerStartupObserver {
|
||||
// Remove deleted items from the nTask core
|
||||
for (TaskInfo taskInfo : tasks) {
|
||||
boolean isExist = false;
|
||||
for (DynamicTask dt : dynamicTasks) {
|
||||
for (DynamicTask dt : tenantedDynamicTasks.get(tenant.getId())) {
|
||||
for (int hid : hashIds) {
|
||||
if (tenant.getId() == dt.getTenantId() &&
|
||||
taskInfo.getName().equals(TaskManagementUtil.generateTaskId(dt.getDynamicTaskId(), hid))) {
|
||||
taskInfo.getName().equals(TaskManagementUtil.generateNTaskName(dt.getDynamicTaskId(), hid))) {
|
||||
isExist = true;
|
||||
break;
|
||||
}
|
||||
|
||||
@ -782,9 +782,9 @@ CREATE TABLE IF NOT EXISTS DM_EXT_PERMISSION_MAPPING (
|
||||
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
|
||||
DYNAMIC_TASK_ID INTEGER AUTO_INCREMENT NOT NULL,
|
||||
NAME VARCHAR(300) DEFAULT NULL ,
|
||||
CRON VARCHAR(8000) DEFAULT NULL,
|
||||
CRON VARCHAR(100) DEFAULT NULL,
|
||||
IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL,
|
||||
TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL,
|
||||
TENANT_ID INTEGER DEFAULT 0,
|
||||
PRIMARY KEY (DYNAMIC_TASK_ID)
|
||||
);
|
||||
@ -792,8 +792,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
|
||||
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES (
|
||||
DYNAMIC_TASK_ID INTEGER NOT NULL,
|
||||
PROPERTY_NAME VARCHAR(100) DEFAULT 0,
|
||||
PROPERTY_VALUE VARCHAR(100) DEFAULT NULL,
|
||||
TENANT_ID VARCHAR(100),
|
||||
PROPERTY_VALUE VARCHAR(8000) DEFAULT NULL,
|
||||
TENANT_ID INTEGER DEFAULT 0,
|
||||
PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID),
|
||||
CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES
|
||||
DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE
|
||||
|
||||
@ -852,10 +852,10 @@ CREATE TABLE DM_GEOFENCE_EVENT_MAPPING (
|
||||
IF NOT EXISTS (SELECT * FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'[DBO].[DYNAMIC_TASK]') AND TYPE IN (N'U'))
|
||||
CREATE TABLE DYNAMIC_TASK (
|
||||
DYNAMIC_TASK_ID INTEGER IDENTITY(1,1) NOT NULL,
|
||||
NAME VARCHAR(255) DEFAULT NULL ,
|
||||
CRON VARCHAR(8000) DEFAULT NULL,
|
||||
NAME VARCHAR(300) DEFAULT NULL ,
|
||||
CRON VARCHAR(100) DEFAULT NULL,
|
||||
IS_ENABLED BIT NOT NULL DEFAULT 0,
|
||||
TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL,
|
||||
TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL,
|
||||
TENANT_ID INTEGER DEFAULT 0,
|
||||
PRIMARY KEY (DYNAMIC_TASK_ID)
|
||||
);
|
||||
@ -864,8 +864,8 @@ IF NOT EXISTS (SELECT * FROM SYS.OBJECTS WHERE OBJECT_ID = OBJECT_ID(N'[DBO].[D
|
||||
CREATE TABLE DYNAMIC_TASK_PROPERTIES (
|
||||
DYNAMIC_TASK_ID INTEGER NOT NULL,
|
||||
PROPERTY_NAME VARCHAR(100) DEFAULT 0,
|
||||
PROPERTY_VALUE VARCHAR(100) DEFAULT NULL,
|
||||
TENANT_ID VARCHAR(100),
|
||||
PROPERTY_VALUE VARCHAR(8000) DEFAULT NULL,
|
||||
TENANT_ID INTEGER DEFAULT 0,
|
||||
PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID),
|
||||
CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES
|
||||
DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE
|
||||
|
||||
@ -853,9 +853,9 @@ CREATE TABLE IF NOT EXISTS DM_EXT_PERMISSION_MAPPING (
|
||||
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
|
||||
DYNAMIC_TASK_ID INTEGER AUTO_INCREMENT NOT NULL,
|
||||
NAME VARCHAR(300) DEFAULT NULL ,
|
||||
CRON VARCHAR(8000) DEFAULT NULL,
|
||||
CRON VARCHAR(100) DEFAULT NULL,
|
||||
IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL,
|
||||
TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL,
|
||||
TENANT_ID INTEGER DEFAULT 0,
|
||||
PRIMARY KEY (DYNAMIC_TASK_ID)
|
||||
) ENGINE=InnoDB;
|
||||
@ -863,8 +863,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
|
||||
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES (
|
||||
DYNAMIC_TASK_ID INTEGER NOT NULL,
|
||||
PROPERTY_NAME VARCHAR(100) DEFAULT 0,
|
||||
PROPERTY_VALUE VARCHAR(100) DEFAULT NULL,
|
||||
TENANT_ID VARCHAR(100),
|
||||
PROPERTY_VALUE TEXT DEFAULT NULL,
|
||||
TENANT_ID INTEGER DEFAULT 0,
|
||||
PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID),
|
||||
CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES
|
||||
DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE
|
||||
|
||||
@ -1127,9 +1127,9 @@ CREATE TABLE DM_GEOFENCE (
|
||||
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
|
||||
DYNAMIC_TASK_ID NUMBER(10) NOT NULL,
|
||||
NAME VARCHAR2(300) DEFAULT NULL ,
|
||||
CRON VARCHAR2(8000) DEFAULT NULL,
|
||||
CRON VARCHAR2(100) DEFAULT NULL,
|
||||
IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
TASK_CLASS_NAME VARCHAR2(8000) DEFAULT NULL,
|
||||
TASK_CLASS_NAME VARCHAR2(1000) DEFAULT NULL,
|
||||
TENANT_ID INTEGER DEFAULT 0,
|
||||
CONSTRAINT PK_DYNAMIC_TASK PRIMARY KEY (DYNAMIC_TASK_ID)
|
||||
) ENGINE=InnoDB;
|
||||
@ -1137,8 +1137,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
|
||||
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES (
|
||||
DYNAMIC_TASK_ID INTEGER NOT NULL,
|
||||
PROPERTY_NAME VARCHAR2(100) DEFAULT 0,
|
||||
PROPERTY_VALUE VARCHAR2(100) DEFAULT NULL,
|
||||
TENANT_ID VARCHAR2(100),
|
||||
PROPERTY_VALUE VARCHAR2(8000) DEFAULT NULL,
|
||||
TENANT_ID INTEGER DEFAULT 0,
|
||||
CONSTRAINT PK_DYNAMIC_TASK_PROPERTIES PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID),
|
||||
CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES
|
||||
DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE
|
||||
|
||||
@ -772,9 +772,9 @@ CREATE TABLE IF NOT EXISTS DM_GEOFENCE (
|
||||
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
|
||||
DYNAMIC_TASK_ID INTEGER DEFAULT NEXTVAL ('DYNAMIC_TASK_seq') NOT NULL,
|
||||
NAME VARCHAR(300) DEFAULT NULL ,
|
||||
CRON VARCHAR(8000) DEFAULT NULL,
|
||||
CRON VARCHAR(100) DEFAULT NULL,
|
||||
IS_ENABLED BOOLEAN NOT NULL DEFAULT FALSE,
|
||||
TASK_CLASS_NAME VARCHAR(8000) DEFAULT NULL,
|
||||
TASK_CLASS_NAME VARCHAR(1000) DEFAULT NULL,
|
||||
TENANT_ID INTEGER DEFAULT 0,
|
||||
PRIMARY KEY (DYNAMIC_TASK_ID)
|
||||
) ENGINE=InnoDB;
|
||||
@ -782,8 +782,8 @@ CREATE TABLE IF NOT EXISTS DYNAMIC_TASK (
|
||||
CREATE TABLE IF NOT EXISTS DYNAMIC_TASK_PROPERTIES (
|
||||
DYNAMIC_TASK_ID INTEGER NOT NULL,
|
||||
PROPERTY_NAME VARCHAR(100) DEFAULT 0,
|
||||
PROPERTY_VALUE VARCHAR(100) DEFAULT NULL,
|
||||
TENANT_ID VARCHAR(100),
|
||||
PROPERTY_VALUE TEXT DEFAULT NULL,
|
||||
TENANT_ID INTEGER DEFAULT 0,
|
||||
PRIMARY KEY (DYNAMIC_TASK_ID, PROPERTY_NAME, TENANT_ID),
|
||||
CONSTRAINT FK_DYNAMIC_TASK_TASK_PROPERTIES FOREIGN KEY (DYNAMIC_TASK_ID) REFERENCES
|
||||
DYNAMIC_TASK (DYNAMIC_TASK_ID) ON DELETE CASCADE ON UPDATE CASCADE
|
||||
|
||||
Loading…
Reference in New Issue
Block a user