mirror of
https://repository.entgra.net/community/device-mgt-core.git
synced 2025-10-06 02:01:45 +00:00
Enhance dynamic task execution functionality
This commit is contained in:
commit
dcbcfe23fe
@ -103,6 +103,7 @@
|
||||
org.wso2.carbon.ndatasource.core,
|
||||
org.wso2.carbon.ntask.core.*,
|
||||
org.wso2.carbon.ntask.common,
|
||||
io.entgra.task.mgt.common.*,
|
||||
org.apache.commons.collections;version="${commons-collections.version.range}",
|
||||
org.wso2.carbon.email.sender.*,
|
||||
io.swagger.annotations.*;resolution:=optional,
|
||||
@ -347,6 +348,10 @@
|
||||
<artifactId>okhttp</artifactId>
|
||||
<scope>compile</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.wso2.carbon.devicemgt</groupId>
|
||||
<artifactId>io.entgra.task.mgt.common</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
|
||||
@ -32,31 +32,10 @@ import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class OperationTimeoutTask extends DynamicPartitionedScheduleTask {
|
||||
|
||||
private static final Log log = LogFactory.getLog(OperationTimeoutTask.class);
|
||||
private OperationTimeout operationTimeoutConfig;
|
||||
|
||||
@Override
|
||||
public void setProperties(Map<String, String> properties) {
|
||||
super.setProperties(properties);
|
||||
String operationTimeoutTaskConfigStr = properties
|
||||
.get(OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG);
|
||||
Gson gson = new Gson();
|
||||
operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getProperty(String name) {
|
||||
return super.getProperty(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshContext() {
|
||||
super.refreshContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setup() {
|
||||
@ -65,12 +44,15 @@ public class OperationTimeoutTask extends DynamicPartitionedScheduleTask {
|
||||
|
||||
@Override
|
||||
protected void executeDynamicTask() {
|
||||
String operationTimeoutTaskConfigStr = getProperty(
|
||||
OperationTimeoutTaskManagerServiceImpl.OPERATION_TIMEOUT_TASK_CONFIG);
|
||||
Gson gson = new Gson();
|
||||
OperationTimeout operationTimeoutConfig = gson.fromJson(operationTimeoutTaskConfigStr, OperationTimeout.class);
|
||||
try {
|
||||
|
||||
long timeMillis = System.currentTimeMillis() - operationTimeoutConfig.getTimeout() * 60 * 1000;
|
||||
List<String> deviceTypes = new ArrayList<>();
|
||||
if (operationTimeoutConfig.getDeviceTypes().size() == 1 &&
|
||||
"ALL".equals(operationTimeoutConfig.getDeviceTypes().get( 0))) {
|
||||
"ALL".equals(operationTimeoutConfig.getDeviceTypes().get(0))) {
|
||||
try {
|
||||
List<DeviceType> deviceTypeList = DeviceManagementDataHolder.getInstance()
|
||||
.getDeviceManagementProvider().getDeviceTypes();
|
||||
|
||||
@ -37,7 +37,6 @@ import org.wso2.carbon.device.mgt.core.task.impl.DynamicPartitionedScheduleTask;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* This implements the Task service which monitors the device activity periodically & update the device-status if
|
||||
@ -47,19 +46,8 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask {
|
||||
|
||||
private static final Log log = LogFactory.getLog(DeviceStatusMonitoringTask.class);
|
||||
private String deviceType;
|
||||
private DeviceStatusTaskPluginConfig deviceStatusTaskPluginConfig;
|
||||
private int deviceTypeId = -1;
|
||||
|
||||
@Override
|
||||
public void setProperties(Map<String, String> properties) {
|
||||
super.setProperties(properties);
|
||||
deviceType = properties.get(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE);
|
||||
deviceTypeId = Integer.parseInt(properties.get(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE_ID));
|
||||
String deviceStatusTaskConfigStr = properties.get(DeviceStatusTaskManagerServiceImpl.DEVICE_STATUS_TASK_CONFIG);
|
||||
Gson gson = new Gson();
|
||||
deviceStatusTaskPluginConfig = gson.fromJson(deviceStatusTaskConfigStr, DeviceStatusTaskPluginConfig.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setup() {
|
||||
}
|
||||
@ -92,6 +80,11 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask {
|
||||
|
||||
@Override
|
||||
public void executeDynamicTask() {
|
||||
deviceType = getProperty(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE);
|
||||
deviceTypeId = Integer.parseInt(getProperty(DeviceStatusTaskManagerServiceImpl.DEVICE_TYPE_ID));
|
||||
String deviceStatusTaskConfigStr = getProperty(DeviceStatusTaskManagerServiceImpl.DEVICE_STATUS_TASK_CONFIG);
|
||||
Gson gson = new Gson();
|
||||
DeviceStatusTaskPluginConfig deviceStatusTaskPluginConfig = gson.fromJson(deviceStatusTaskConfigStr, DeviceStatusTaskPluginConfig.class);
|
||||
try {
|
||||
List<EnrolmentInfo> enrolmentInfoTobeUpdated = new ArrayList<>();
|
||||
List<DeviceMonitoringData> allDevicesForMonitoring = getAllDevicesForMonitoring();
|
||||
@ -102,10 +95,10 @@ public class DeviceStatusMonitoringTask extends DynamicPartitionedScheduleTask {
|
||||
|
||||
EnrolmentInfo enrolmentInfo = monitoringData.getDevice().getEnrolmentInfo();
|
||||
EnrolmentInfo.Status status = null;
|
||||
if (lastUpdatedTime >= this.deviceStatusTaskPluginConfig
|
||||
if (lastUpdatedTime >= deviceStatusTaskPluginConfig
|
||||
.getIdleTimeToMarkInactive()) {
|
||||
status = EnrolmentInfo.Status.INACTIVE;
|
||||
} else if (lastUpdatedTime >= this.deviceStatusTaskPluginConfig
|
||||
} else if (lastUpdatedTime >= deviceStatusTaskPluginConfig
|
||||
.getIdleTimeToMarkUnreachable()) {
|
||||
status = EnrolmentInfo.Status.UNREACHABLE;
|
||||
}
|
||||
|
||||
@ -48,7 +48,6 @@ import org.wso2.carbon.device.mgt.core.task.DeviceMgtTaskException;
|
||||
import org.wso2.carbon.device.mgt.core.task.DeviceTaskManager;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask {
|
||||
|
||||
@ -56,14 +55,9 @@ public class DeviceDetailsRetrieverTask extends DynamicPartitionedScheduleTask {
|
||||
private String deviceType;
|
||||
private DeviceManagementProviderService deviceManagementProviderService;
|
||||
|
||||
@Override
|
||||
public void setProperties(Map<String, String> map) {
|
||||
super.setProperties(map);
|
||||
deviceType = map.get("DEVICE_TYPE");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void executeDynamicTask() {
|
||||
deviceType = getProperty("DEVICE_TYPE");
|
||||
deviceManagementProviderService = DeviceManagementDataHolder.getInstance()
|
||||
.getDeviceManagementProvider();
|
||||
OperationMonitoringTaskConfig operationMonitoringTaskConfig = deviceManagementProviderService
|
||||
|
||||
@ -19,10 +19,11 @@
|
||||
package org.wso2.carbon.device.mgt.core.task.impl;
|
||||
|
||||
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
|
||||
import org.wso2.carbon.device.mgt.common.DynamicTaskContext;
|
||||
import org.wso2.carbon.device.mgt.common.ServerCtxInfo;
|
||||
import org.wso2.carbon.device.mgt.core.internal.DeviceManagementDataHolder;
|
||||
import org.wso2.carbon.ntask.core.Task;
|
||||
|
||||
@ -37,11 +38,11 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
|
||||
private Map<String, String> properties;
|
||||
|
||||
@Override
|
||||
public void setProperties(Map<String, String> properties) {
|
||||
public final void setProperties(Map<String, String> properties) {
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
public String getProperty(String name) {
|
||||
public final String getProperty(String name) {
|
||||
if (properties == null) {
|
||||
return null;
|
||||
}
|
||||
@ -62,7 +63,7 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
|
||||
}
|
||||
}
|
||||
} catch (HeartBeatManagementException e) {
|
||||
log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function." , e);
|
||||
log.error("Error Instantiating Variables necessary for Dynamic Task Scheduling. Dynamic Tasks will not function.", e);
|
||||
}
|
||||
setup();
|
||||
}
|
||||
@ -70,11 +71,40 @@ public abstract class DynamicPartitionedScheduleTask implements Task {
|
||||
@Override
|
||||
public final void execute() {
|
||||
refreshContext();
|
||||
if (taskContext != null && taskContext.isPartitioningEnabled()) {
|
||||
String localHashIndex = getProperty(TaskMgtConstants.Task.LOCAL_HASH_INDEX);
|
||||
// These tasks are not dynamically scheduled. They are added via a config so scheduled in each node
|
||||
// during the server startup
|
||||
if (localHashIndex == null ) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Executing startup scheduled task (" + getTaskName() + ")");
|
||||
}
|
||||
executeDynamicTask();
|
||||
return;
|
||||
}
|
||||
if (localHashIndex.equals(String.valueOf(taskContext.getServerHashIndex()))) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Executing dynamically scheduled task (" + getTaskName() +
|
||||
") for current server hash index: " + localHashIndex);
|
||||
}
|
||||
executeDynamicTask();
|
||||
} else {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Ignoring execution of task (" + getTaskName() +
|
||||
") not belonging to current serer hash index: " + localHashIndex);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
executeDynamicTask();
|
||||
}
|
||||
}
|
||||
|
||||
public void refreshContext(){
|
||||
if(taskContext != null && taskContext.isPartitioningEnabled()) {
|
||||
public String getTaskName() {
|
||||
return getProperty(TaskMgtConstants.Task.LOCAL_TASK_NAME);
|
||||
}
|
||||
|
||||
public void refreshContext() {
|
||||
if (taskContext != null && taskContext.isPartitioningEnabled()) {
|
||||
try {
|
||||
updateContext();
|
||||
} catch (HeartBeatManagementException e) {
|
||||
|
||||
@ -17,7 +17,7 @@
|
||||
*/
|
||||
package io.entgra.task.mgt.common.constant;
|
||||
|
||||
public class TaskMgtConstant {
|
||||
public class TaskMgtConstants {
|
||||
public static final class DataSourceProperties {
|
||||
private DataSourceProperties() {
|
||||
throw new AssertionError();
|
||||
@ -46,7 +46,8 @@ public class TaskMgtConstant {
|
||||
public static final String NAME_SEPARATOR = "_";
|
||||
public static final String PROPERTY_KEY_COLUMN_NAME = "PROPERTY_NAME";
|
||||
public static final String PROPERTY_VALUE_COLUMN_NAME = "PROPERTY_VALUE";
|
||||
public static final String __TENANT_ID_PROP__ = "__TENANT_ID_PROP__";
|
||||
|
||||
public static final String TENANT_ID_PROP = "__TENANT_ID_PROP__";
|
||||
public static final String LOCAL_HASH_INDEX = "__LOCAL_HASH_INDEX__";
|
||||
public static final String LOCAL_TASK_NAME = "__LOCAL_TASK_NAME__";
|
||||
}
|
||||
}
|
||||
@ -63,10 +63,12 @@
|
||||
javax.naming,
|
||||
io.entgra.task.mgt.common.*,
|
||||
org.wso2.carbon.utils.*,
|
||||
org.wso2.carbon.ntask.*,
|
||||
org.wso2.carbon.device.mgt.core.*,
|
||||
org.wso2.carbon.ntask.core.*,
|
||||
org.wso2.carbon.ntask.common,
|
||||
org.wso2.carbon.device.mgt.common.*,
|
||||
org.wso2.carbon.context,
|
||||
org.apache.commons.codec.binary;version="${commons-codec.wso2.osgi.version.range}",
|
||||
org.apache.commons.codec.digest;version="${commons-codec.wso2.osgi.version.range}",
|
||||
io.entgra.server.bootup.heartbeat.beacon.dto,
|
||||
io.entgra.server.bootup.heartbeat.beacon.exception,
|
||||
io.entgra.server.bootup.heartbeat.beacon.service,
|
||||
@ -113,8 +115,8 @@
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.wso2.carbon.devicemgt</groupId>
|
||||
<artifactId>org.wso2.carbon.device.mgt.core</artifactId>
|
||||
<groupId>commons-codec.wso2</groupId>
|
||||
<artifactId>commons-codec</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
@ -127,6 +129,13 @@
|
||||
<artifactId>io.entgra.server.bootup.heartbeat.beacon</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
|
||||
<!--nTask dependencies-->
|
||||
<dependency>
|
||||
<groupId>org.wso2.carbon.commons</groupId>
|
||||
<artifactId>org.wso2.carbon.ntask.core</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@ -17,7 +17,7 @@
|
||||
*/
|
||||
package io.entgra.task.mgt.core.config;
|
||||
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
|
||||
import io.entgra.task.mgt.common.exception.TaskManagementException;
|
||||
import io.entgra.task.mgt.core.util.TaskManagementUtil;
|
||||
import org.w3c.dom.Document;
|
||||
@ -38,7 +38,7 @@ public class TaskConfigurationManager {
|
||||
|
||||
private static final String TASK_MGT_CONFIG_PATH =
|
||||
CarbonUtils.getCarbonConfigDirPath() + File.separator +
|
||||
TaskMgtConstant.DataSourceProperties.TASK_CONFIG_XML_NAME;
|
||||
TaskMgtConstants.DataSourceProperties.TASK_CONFIG_XML_NAME;
|
||||
|
||||
public static TaskConfigurationManager getInstance() {
|
||||
if (taskConfigurationManager == null) {
|
||||
|
||||
@ -17,7 +17,7 @@
|
||||
*/
|
||||
package io.entgra.task.mgt.core.dao.common;
|
||||
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
|
||||
import io.entgra.task.mgt.common.exception.IllegalTransactionStateException;
|
||||
import io.entgra.task.mgt.common.exception.TransactionManagementException;
|
||||
import io.entgra.task.mgt.common.exception.UnsupportedDatabaseEngineException;
|
||||
@ -25,9 +25,9 @@ import io.entgra.task.mgt.core.config.datasource.DataSourceConfig;
|
||||
import io.entgra.task.mgt.core.config.datasource.JNDILookupDefinition;
|
||||
import io.entgra.task.mgt.core.dao.DynamicTaskDAO;
|
||||
import io.entgra.task.mgt.core.dao.DynamicTaskPropDAO;
|
||||
import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil;
|
||||
import io.entgra.task.mgt.core.dao.impl.DynamicTaskDAOImpl;
|
||||
import io.entgra.task.mgt.core.dao.impl.DynamicTaskPropDAOImpl;
|
||||
import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
@ -48,8 +48,8 @@ public class TaskManagementDAOFactory {
|
||||
public static DynamicTaskDAO getDynamicTaskDAO() {
|
||||
if (databaseEngine != null) {
|
||||
switch (databaseEngine) {
|
||||
case TaskMgtConstant.DataBaseTypes.DB_TYPE_H2:
|
||||
case TaskMgtConstant.DataBaseTypes.DB_TYPE_MYSQL:
|
||||
case TaskMgtConstants.DataBaseTypes.DB_TYPE_H2:
|
||||
case TaskMgtConstants.DataBaseTypes.DB_TYPE_MYSQL:
|
||||
return new DynamicTaskDAOImpl();
|
||||
default:
|
||||
throw new UnsupportedDatabaseEngineException("Unsupported database engine : " + databaseEngine);
|
||||
@ -61,8 +61,8 @@ public class TaskManagementDAOFactory {
|
||||
public static DynamicTaskPropDAO getDynamicTaskPropDAO() {
|
||||
if (databaseEngine != null) {
|
||||
switch (databaseEngine) {
|
||||
case TaskMgtConstant.DataBaseTypes.DB_TYPE_H2:
|
||||
case TaskMgtConstant.DataBaseTypes.DB_TYPE_MYSQL:
|
||||
case TaskMgtConstants.DataBaseTypes.DB_TYPE_H2:
|
||||
case TaskMgtConstants.DataBaseTypes.DB_TYPE_MYSQL:
|
||||
return new DynamicTaskPropDAOImpl();
|
||||
default:
|
||||
throw new UnsupportedDatabaseEngineException("Unsupported database engine : " + databaseEngine);
|
||||
|
||||
@ -17,15 +17,14 @@
|
||||
*/
|
||||
package io.entgra.task.mgt.core.dao.impl;
|
||||
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
|
||||
import io.entgra.task.mgt.common.exception.TaskManagementDAOException;
|
||||
import io.entgra.task.mgt.core.dao.DynamicTaskPropDAO;
|
||||
import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil;
|
||||
import io.entgra.task.mgt.core.dao.common.TaskManagementDAOFactory;
|
||||
import io.entgra.task.mgt.core.dao.util.TaskManagementDAOUtil;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.wso2.carbon.context.PrivilegedCarbonContext;
|
||||
import org.wso2.carbon.device.mgt.core.dao.util.DeviceManagementDAOUtil;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
@ -79,8 +78,8 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
|
||||
resultSet = stmt.executeQuery();
|
||||
properties = new HashMap<>();
|
||||
while (resultSet.next()) {
|
||||
properties.put(resultSet.getString(TaskMgtConstant.Task.PROPERTY_KEY_COLUMN_NAME)
|
||||
, resultSet.getString(TaskMgtConstant.Task.PROPERTY_VALUE_COLUMN_NAME));
|
||||
properties.put(resultSet.getString(TaskMgtConstants.Task.PROPERTY_KEY_COLUMN_NAME)
|
||||
, resultSet.getString(TaskMgtConstants.Task.PROPERTY_VALUE_COLUMN_NAME));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
String msg = "Error occurred while fetching task properties of : '" + dynamicTaskId + "'";
|
||||
@ -119,7 +118,7 @@ public class DynamicTaskPropDAOImpl implements DynamicTaskPropDAO {
|
||||
throw new TaskManagementDAOException
|
||||
("Error occurred while updating device properties to database.", e);
|
||||
} finally {
|
||||
DeviceManagementDAOUtil.cleanupResources(stmt, null);
|
||||
TaskManagementDAOUtil.cleanupResources(stmt, null);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,7 +20,6 @@ package io.entgra.task.mgt.core.dao.util;
|
||||
import io.entgra.task.mgt.common.bean.DynamicTask;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.wso2.carbon.device.mgt.common.Device;
|
||||
|
||||
import javax.naming.InitialContext;
|
||||
import javax.sql.DataSource;
|
||||
|
||||
@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2023, Entgra Pvt Ltd. (http://www.wso2.org) All Rights Reserved.
|
||||
* Copyright (c) 2023, Entgra Pvt Ltd. (https://entgra.io) All Rights Reserved.
|
||||
*
|
||||
* Entgra Pvt Ltd. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
@ -17,11 +17,12 @@
|
||||
*/
|
||||
package io.entgra.task.mgt.core.service;
|
||||
|
||||
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
|
||||
import io.entgra.task.mgt.common.bean.DynamicTask;
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
|
||||
import io.entgra.task.mgt.common.exception.TaskNotFoundException;
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
|
||||
import io.entgra.task.mgt.common.exception.TaskManagementDAOException;
|
||||
import io.entgra.task.mgt.common.exception.TaskManagementException;
|
||||
import io.entgra.task.mgt.common.exception.TaskNotFoundException;
|
||||
import io.entgra.task.mgt.common.exception.TransactionManagementException;
|
||||
import io.entgra.task.mgt.common.spi.TaskManagementService;
|
||||
import io.entgra.task.mgt.core.dao.DynamicTaskDAO;
|
||||
@ -62,13 +63,13 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
throw new TaskManagementException(msg);
|
||||
}
|
||||
if (!nTaskService.getRegisteredTaskTypes().contains(
|
||||
TaskMgtConstant.Task.DYNAMIC_TASK_TYPE)) {
|
||||
TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
|
||||
try {
|
||||
nTaskService.registerTaskType(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE);
|
||||
this.taskManager = nTaskService.getTaskManager(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE);
|
||||
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
|
||||
this.taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
|
||||
} catch (TaskException e) {
|
||||
String msg = "Error occurred while registering task type ["
|
||||
+ TaskMgtConstant.Task.DYNAMIC_TASK_TYPE
|
||||
+ TaskMgtConstants.Task.DYNAMIC_TASK_TYPE
|
||||
+ "], hence unable to schedule the task.";
|
||||
log.error(msg);
|
||||
throw new TaskManagementException(msg, e);
|
||||
@ -90,6 +91,17 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
// add into the ntask core
|
||||
taskId = TaskManagementUtil.generateTaskId(dynamicTaskId);
|
||||
|
||||
try {
|
||||
int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
|
||||
.getServerCtxInfo().getLocalServerHashIdx();
|
||||
taskProperties.put(TaskMgtConstants.Task.LOCAL_HASH_INDEX, String.valueOf(serverHashIdx));
|
||||
taskProperties.put(TaskMgtConstants.Task.LOCAL_TASK_NAME, taskId);
|
||||
} catch (HeartBeatManagementException e) {
|
||||
String msg = "Unexpected exception when getting server hash index.";
|
||||
log.error(msg, e);
|
||||
throw new TaskManagementException(msg, e);
|
||||
}
|
||||
|
||||
if (!isTaskExists(taskId)) {
|
||||
TaskInfo.TriggerInfo triggerInfo = new TaskInfo.TriggerInfo();
|
||||
triggerInfo.setCronExpression(dynamicTask.getCronExpression());
|
||||
@ -101,7 +113,7 @@ public class TaskManagementServiceImpl implements TaskManagementService {
|
||||
}
|
||||
} else {
|
||||
String msg = "Task '" + taskId + "' is already exists in the ntask core "
|
||||
+ "Hence cannot create another task for the same.";
|
||||
+ "Hence not creating another task for the same name.";
|
||||
log.error(msg);
|
||||
}
|
||||
|
||||
|
||||
@ -19,7 +19,7 @@ package io.entgra.task.mgt.core.util;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import io.entgra.server.bootup.heartbeat.beacon.exception.HeartBeatManagementException;
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
|
||||
import io.entgra.task.mgt.common.exception.TaskManagementException;
|
||||
import io.entgra.task.mgt.core.internal.TaskManagerDataHolder;
|
||||
import org.apache.commons.codec.digest.DigestUtils;
|
||||
@ -31,7 +31,6 @@ import javax.xml.XMLConstants;
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
@ -60,8 +59,8 @@ public class TaskManagementUtil {
|
||||
try {
|
||||
int serverHashIdx = TaskManagerDataHolder.getInstance().getHeartBeatService()
|
||||
.getServerCtxInfo().getLocalServerHashIdx();
|
||||
return TaskMgtConstant.Task.DYNAMIC_TASK_TYPE + TaskMgtConstant.Task.NAME_SEPARATOR + dynamicTaskId
|
||||
+ TaskMgtConstant.Task.NAME_SEPARATOR + serverHashIdx;
|
||||
return TaskMgtConstants.Task.DYNAMIC_TASK_TYPE + TaskMgtConstants.Task.NAME_SEPARATOR + dynamicTaskId
|
||||
+ TaskMgtConstants.Task.NAME_SEPARATOR + serverHashIdx;
|
||||
} catch (HeartBeatManagementException e) {
|
||||
String msg = "Failed to generate task id for a dynamic task " + dynamicTaskId;
|
||||
log.error(msg, e);
|
||||
@ -70,11 +69,12 @@ public class TaskManagementUtil {
|
||||
}
|
||||
|
||||
public static String generateTaskPropsMD5(Map<String, String> taskProperties) throws TaskManagementException {
|
||||
if (taskProperties.containsKey(TaskMgtConstant.Task.__TENANT_ID_PROP__)) {
|
||||
taskProperties.remove(TaskMgtConstant.Task.__TENANT_ID_PROP__);
|
||||
}
|
||||
taskProperties.remove(TaskMgtConstants.Task.TENANT_ID_PROP);
|
||||
taskProperties.remove(TaskMgtConstants.Task.LOCAL_HASH_INDEX);
|
||||
taskProperties.remove(TaskMgtConstants.Task.LOCAL_TASK_NAME);
|
||||
Gson gson = new Gson();
|
||||
String json = gson.toJson(taskProperties);
|
||||
return DigestUtils.md5Hex(json);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@ -20,7 +20,7 @@ package io.entgra.task.mgt.watcher;
|
||||
|
||||
|
||||
import io.entgra.task.mgt.common.bean.DynamicTask;
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstant;
|
||||
import io.entgra.task.mgt.common.constant.TaskMgtConstants;
|
||||
import io.entgra.task.mgt.common.exception.TaskManagementException;
|
||||
import io.entgra.task.mgt.core.util.TaskManagementUtil;
|
||||
import io.entgra.task.mgt.watcher.internal.TaskWatcherDataHolder;
|
||||
@ -64,10 +64,10 @@ public class IoTSStartupHandler implements ServerStartupObserver {
|
||||
}
|
||||
try {
|
||||
if (!nTaskService.getRegisteredTaskTypes().contains(
|
||||
TaskMgtConstant.Task.DYNAMIC_TASK_TYPE)) {
|
||||
nTaskService.registerTaskType(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE);
|
||||
TaskMgtConstants.Task.DYNAMIC_TASK_TYPE)) {
|
||||
nTaskService.registerTaskType(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
|
||||
}
|
||||
taskManager = nTaskService.getTaskManager(TaskMgtConstant.Task.DYNAMIC_TASK_TYPE);
|
||||
taskManager = nTaskService.getTaskManager(TaskMgtConstants.Task.DYNAMIC_TASK_TYPE);
|
||||
|
||||
List<DynamicTask> dynamicTasks = TaskWatcherDataHolder.getInstance().getTaskManagementService()
|
||||
.getAllDynamicTasks();
|
||||
|
||||
Loading…
Reference in New Issue
Block a user