mirror of
https://repository.entgra.net/community/device-mgt-core.git
synced 2025-10-06 02:01:45 +00:00
Archival/Purging Task Improvement
This commit is contained in:
parent
0aec365dce
commit
4c067f0f28
@ -22,7 +22,9 @@ package org.wso2.carbon.device.mgt.core.archival;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.wso2.carbon.device.mgt.common.TransactionManagementException;
|
||||
import org.wso2.carbon.device.mgt.core.archival.beans.*;
|
||||
import org.wso2.carbon.device.mgt.core.archival.dao.*;
|
||||
import org.wso2.carbon.device.mgt.core.config.DeviceConfigurationManager;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
@ -36,7 +38,9 @@ public class ArchivalServiceImpl implements ArchivalService {
|
||||
private ArchivalDAO archivalDAO;
|
||||
private DataDeletionDAO dataDeletionDAO;
|
||||
|
||||
private static int ITERATION_COUNT = 10000;
|
||||
private static int ITERATION_COUNT =
|
||||
DeviceConfigurationManager.getInstance().getDeviceManagementConfig().getArchivalConfiguration()
|
||||
.getArchivalTaskConfiguration().getBatchSize();
|
||||
|
||||
private String[] NOT_IN_PROGRESS_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED"};
|
||||
private String[] NOT_PENDING_OPS = new String[]{"COMPLETED", "ERROR", "REPEATED", "IN_PROGRESS"};
|
||||
@ -49,96 +53,173 @@ public class ArchivalServiceImpl implements ArchivalService {
|
||||
|
||||
@Override
|
||||
public void archiveTransactionalRecords() throws ArchivalException {
|
||||
List<Integer> allOperations;
|
||||
List<Integer> pendingAndIPOperations;
|
||||
try {
|
||||
ArchivalSourceDAOFactory.openConnection();
|
||||
ArchivalDestinationDAOFactory.openConnection();
|
||||
|
||||
List<Integer> allOperations = archivalDAO.getAllOperations();
|
||||
List<Integer> pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations();
|
||||
|
||||
log.info(allOperations.size() + " All Operations. " + pendingAndIPOperations.size() +
|
||||
" P&IP Operations");
|
||||
//Get the diff of operations
|
||||
Set<Integer> setA = new HashSet<>(allOperations);
|
||||
Set<Integer> setB = new HashSet<>(pendingAndIPOperations);
|
||||
setA.removeAll(setB);
|
||||
|
||||
List<Integer> candidates = new ArrayList<>();
|
||||
candidates.addAll(setA);
|
||||
|
||||
int total = candidates.size();
|
||||
int batches = calculateNumberOfBatches(total);
|
||||
int batchSize = ITERATION_COUNT;
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(total + " Operations ready for archiving. " + batches + " iterations to be done.");
|
||||
log.debug("Fetching All Operations");
|
||||
}
|
||||
allOperations = archivalDAO.getAllOperations();
|
||||
|
||||
beginTransactions();
|
||||
for (int i = 1; i <= batches; i++) {
|
||||
int startIdx = batchSize * (i - 1);
|
||||
int endIdx = batchSize * i;
|
||||
if (i == batches) {
|
||||
endIdx = startIdx + (total % batchSize);
|
||||
}
|
||||
if(log.isDebugEnabled()) {
|
||||
log.debug("\n\n############ Iterating over batch " + i + "[" +
|
||||
startIdx + "," + endIdx + "] #######");
|
||||
}
|
||||
List<Integer> subList = candidates.subList(startIdx, endIdx);
|
||||
prepareTempTable(subList);
|
||||
|
||||
//Purge the largest table, DM_DEVICE_OPERATION_RESPONSE
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Purging operation responses");
|
||||
}
|
||||
archivalDAO.moveOperationResponses();
|
||||
|
||||
//Purge the notifications table, DM_NOTIFICATION
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Purging notifications");
|
||||
}
|
||||
archivalDAO.moveNotifications();
|
||||
|
||||
//Purge the command operations table, DM_COMMAND_OPERATION
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Purging command operations");
|
||||
}
|
||||
archivalDAO.moveCommandOperations();
|
||||
|
||||
//Purge the profile operation table, DM_PROFILE_OPERATION
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Purging profile operations");
|
||||
}
|
||||
archivalDAO.moveProfileOperations();
|
||||
|
||||
//Purge the config operation table, DM_CONFIG_OPERATION
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Purging config operations");
|
||||
}
|
||||
archivalDAO.moveConfigOperations();
|
||||
|
||||
//Purge the enrolment mappings table, DM_ENROLMENT_OP_MAPPING
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Purging enrolment mappings");
|
||||
}
|
||||
archivalDAO.moveEnrolmentMappings();
|
||||
|
||||
//Finally, purge the operations table, DM_OPERATION
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Purging operations");
|
||||
}
|
||||
archivalDAO.moveOperations();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Fetching All Pending Operations");
|
||||
}
|
||||
commitTransactions();
|
||||
pendingAndIPOperations = archivalDAO.getPendingAndInProgressOperations();
|
||||
|
||||
} catch (ArchivalDAOException e) {
|
||||
rollbackTransactions();
|
||||
throw new ArchivalException("An error occurred while data archival", e);
|
||||
// rollbackTransactions();
|
||||
String msg = "Rollback the get all operations and get all pending operations";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalException(msg, e);
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalException("An error occurred while connecting to the archival database.", e);
|
||||
String msg = "An error occurred while connecting to the archival database";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalException(msg, e);
|
||||
} finally {
|
||||
ArchivalSourceDAOFactory.closeConnection();
|
||||
ArchivalDestinationDAOFactory.closeConnection();
|
||||
}
|
||||
|
||||
log.info(allOperations.size() + " All Operations. " + pendingAndIPOperations.size() +
|
||||
" P&IP Operations");
|
||||
//Get the diff of operations
|
||||
Set<Integer> setA = new HashSet<>(allOperations);
|
||||
Set<Integer> setB = new HashSet<>(pendingAndIPOperations);
|
||||
setA.removeAll(setB);
|
||||
|
||||
List<Integer> candidates = new ArrayList<>();
|
||||
candidates.addAll(setA);
|
||||
|
||||
int total = candidates.size();
|
||||
int batches = calculateNumberOfBatches(total);
|
||||
int batchSize = ITERATION_COUNT;
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(total + " Operations ready for archiving. " + batches + " iterations to be done.");
|
||||
log.debug(batchSize + " is the batch size");
|
||||
}
|
||||
|
||||
for (int i = 1; i <= batches; i++) {
|
||||
int startIdx = batchSize * (i - 1);
|
||||
int endIdx = batchSize * i;
|
||||
if (i == batches) {
|
||||
endIdx = startIdx + (total % batchSize);
|
||||
}
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("\n\n############ Iterating over batch " + i + "[" +
|
||||
startIdx + "," + endIdx + "] #######");
|
||||
}
|
||||
List<Integer> subList = candidates.subList(startIdx, endIdx);
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("SubList size is: " + subList.size());
|
||||
if (subList.size() > 0) {
|
||||
log.debug("First Element is: " + subList.get(0));
|
||||
log.debug("Last Element is: " + subList.get(subList.size() - 1));
|
||||
}
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
for (Integer val : subList) {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Sub List Element: " + val);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
beginTransactions();
|
||||
prepareTempTable(subList);
|
||||
commitTransactions();
|
||||
} catch (Exception e) {
|
||||
rollbackTransactions();
|
||||
String msg = "Error occurred while preparing the operations.";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalException(msg, e);
|
||||
} finally {
|
||||
ArchivalSourceDAOFactory.closeConnection();
|
||||
ArchivalDestinationDAOFactory.closeConnection();
|
||||
}
|
||||
|
||||
List<ArchiveOperationResponse> operationResponses = null;
|
||||
List<ArchiveNotification> notification = null;
|
||||
List<ArchiveCommandOperation> commandOperations = null;
|
||||
List<ArchiveProfileOperation> profileOperations = null;
|
||||
List<ArchiveEnrolmentOperationMap> enrollmentMapping = null;
|
||||
List<ArchiveOperation> operations = null;
|
||||
|
||||
try {
|
||||
openConnection();
|
||||
operationResponses = archivalDAO.selectOperationResponses();
|
||||
notification = archivalDAO.selectNotifications();
|
||||
commandOperations = archivalDAO.selectCommandOperations();
|
||||
profileOperations = archivalDAO.selectProfileOperations();
|
||||
enrollmentMapping = archivalDAO.selectEnrolmentMappings();
|
||||
operations = archivalDAO.selectOperations();
|
||||
|
||||
} catch (Exception e) {
|
||||
String msg = "Error occurred while retrieving data.";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalException(msg, e);
|
||||
} finally {
|
||||
closeConnection();
|
||||
}
|
||||
|
||||
try {
|
||||
beginTransactions();
|
||||
|
||||
//Purge the largest table, DM_DEVICE_OPERATION_RESPONSE
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Archiving operation responses");
|
||||
}
|
||||
archivalDAO.moveOperationResponses(operationResponses);
|
||||
|
||||
//Purge the notifications table, DM_NOTIFICATION
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Archiving notifications");
|
||||
}
|
||||
archivalDAO.moveNotifications(notification);
|
||||
|
||||
//Purge the command operations table, DM_COMMAND_OPERATION
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Archiving command operations");
|
||||
}
|
||||
archivalDAO.moveCommandOperations(commandOperations);
|
||||
|
||||
//Purge the profile operation table, DM_PROFILE_OPERATION
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Archiving profile operations");
|
||||
}
|
||||
archivalDAO.moveProfileOperations(profileOperations);
|
||||
|
||||
//Purge the enrolment mappings table, DM_ENROLMENT_OP_MAPPING
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Archiving enrolment mappings");
|
||||
}
|
||||
archivalDAO.moveEnrolmentMappings(enrollmentMapping);
|
||||
|
||||
//Finally, purge the operations table, DM_OPERATION
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Archiving operations");
|
||||
}
|
||||
archivalDAO.moveOperations(operations);
|
||||
commitTransactions();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("End of Iteration : " + i);
|
||||
}
|
||||
} catch (ArchivalDAOException e) {
|
||||
rollbackTransactions();
|
||||
String msg = "Error occurred while trying to archive data to the six tables";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalException(msg, e);
|
||||
} finally {
|
||||
ArchivalSourceDAOFactory.closeConnection();
|
||||
ArchivalDestinationDAOFactory.closeConnection();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private void prepareTempTable(List<Integer> subList) throws ArchivalDAOException {
|
||||
@ -147,6 +228,9 @@ public class ArchivalServiceImpl implements ArchivalService {
|
||||
log.debug("## Truncating the temporary table");
|
||||
}
|
||||
archivalDAO.truncateOperationIDsForArchival();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("## Inserting into the temporary table");
|
||||
}
|
||||
archivalDAO.copyOperationIDsForArchival(subList);
|
||||
}
|
||||
|
||||
@ -160,6 +244,28 @@ public class ArchivalServiceImpl implements ArchivalService {
|
||||
}
|
||||
}
|
||||
|
||||
private void openConnection() throws ArchivalException {
|
||||
try {
|
||||
ArchivalSourceDAOFactory.openConnection();
|
||||
} catch (SQLException e) {
|
||||
String msg = "An error occurred during opening connection";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalException(msg, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void closeConnection() throws ArchivalException {
|
||||
try {
|
||||
ArchivalSourceDAOFactory.closeConnection();
|
||||
} catch (Exception e) {
|
||||
String msg = "An error occurred during opening connection";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalException(msg, e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void commitTransactions() {
|
||||
ArchivalSourceDAOFactory.commitTransaction();
|
||||
ArchivalDestinationDAOFactory.commitTransaction();
|
||||
|
||||
@ -0,0 +1,44 @@
|
||||
/*
|
||||
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.wso2.carbon.device.mgt.core.archival.beans;
|
||||
|
||||
public class ArchiveCommandOperation {
|
||||
|
||||
|
||||
private int operationId;
|
||||
private int enabled;
|
||||
|
||||
public int getOperationId() {
|
||||
return operationId;
|
||||
}
|
||||
|
||||
public void setOperationId(int operationId) {
|
||||
this.operationId = operationId;
|
||||
}
|
||||
|
||||
public int getEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(int enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.wso2.carbon.device.mgt.core.archival.beans;
|
||||
|
||||
public class ArchiveEnrolmentOperationMap {
|
||||
|
||||
|
||||
private int id;
|
||||
private int enrolmentId;
|
||||
private int operationId;
|
||||
private String status;
|
||||
private int createdTimestamp;
|
||||
private int updatedTimestamp;
|
||||
|
||||
public int getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(int id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public int getEnrolmentId() {
|
||||
return enrolmentId;
|
||||
}
|
||||
|
||||
public void setEnrolmentId(int enrolmentId) {
|
||||
this.enrolmentId = enrolmentId;
|
||||
}
|
||||
|
||||
public int getOperationId() {
|
||||
return operationId;
|
||||
}
|
||||
|
||||
public void setOperationId(int operationId) {
|
||||
this.operationId = operationId;
|
||||
}
|
||||
|
||||
public String getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(String status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public int getCreatedTimestamp() {
|
||||
return createdTimestamp;
|
||||
}
|
||||
|
||||
public void setCreatedTimestamp(int createdTimestamp) {
|
||||
this.createdTimestamp = createdTimestamp;
|
||||
}
|
||||
|
||||
public int getUpdatedTimestamp() {
|
||||
return updatedTimestamp;
|
||||
}
|
||||
|
||||
public void setUpdatedTimestamp(int updatedTimestamp) {
|
||||
this.updatedTimestamp = updatedTimestamp;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,80 @@
|
||||
/*
|
||||
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.wso2.carbon.device.mgt.core.archival.beans;
|
||||
|
||||
public class ArchiveNotification {
|
||||
|
||||
|
||||
private int notificationId;
|
||||
private int deviceId;
|
||||
private int operationId;
|
||||
private int tenantId;
|
||||
private String status;
|
||||
private String description;
|
||||
|
||||
public int getNotificationId() {
|
||||
return notificationId;
|
||||
}
|
||||
|
||||
public void setNotificationId(int notificationId) {
|
||||
this.notificationId = notificationId;
|
||||
}
|
||||
|
||||
public int getDeviceId() {
|
||||
return deviceId;
|
||||
}
|
||||
|
||||
public void setDeviceId(int deviceId) {
|
||||
this.deviceId = deviceId;
|
||||
}
|
||||
|
||||
public int getOperationId() {
|
||||
return operationId;
|
||||
}
|
||||
|
||||
public void setOperationId(int operationId) {
|
||||
this.operationId = operationId;
|
||||
}
|
||||
|
||||
public int getTenantId() {
|
||||
return tenantId;
|
||||
}
|
||||
|
||||
public void setTenantId(int tenantId) {
|
||||
this.tenantId = tenantId;
|
||||
}
|
||||
|
||||
public String getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public void setStatus(String status) {
|
||||
this.status = status;
|
||||
}
|
||||
|
||||
public String getDescription() {
|
||||
return description;
|
||||
}
|
||||
|
||||
public void setDescription(String description) {
|
||||
this.description = description;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.wso2.carbon.device.mgt.core.archival.beans;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
|
||||
public class ArchiveOperation {
|
||||
|
||||
private int id;
|
||||
private String type;
|
||||
private Timestamp createdTimeStamp;
|
||||
private Timestamp recievedTimeStamp;
|
||||
private String operationCode;
|
||||
|
||||
public int getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(int id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public String getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(String type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public Timestamp getCreatedTimeStamp() {
|
||||
return createdTimeStamp;
|
||||
}
|
||||
|
||||
public void setCreatedTimeStamp(Timestamp createdTimeStamp) {
|
||||
this.createdTimeStamp = createdTimeStamp;
|
||||
}
|
||||
|
||||
public Timestamp getRecievedTimeStamp() {
|
||||
return recievedTimeStamp;
|
||||
}
|
||||
|
||||
public void setRecievedTimeStamp(Timestamp recievedTimeStamp) {
|
||||
this.recievedTimeStamp = recievedTimeStamp;
|
||||
}
|
||||
|
||||
public String getOperationCode() {
|
||||
return operationCode;
|
||||
}
|
||||
|
||||
public void setOperationCode(String operationCode) {
|
||||
this.operationCode = operationCode;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,83 @@
|
||||
/*
|
||||
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.wso2.carbon.device.mgt.core.archival.beans;
|
||||
|
||||
import java.sql.Timestamp;
|
||||
|
||||
public class ArchiveOperationResponse {
|
||||
|
||||
|
||||
private int id;
|
||||
private int enrolmentId;
|
||||
private int operationId;
|
||||
private int enOpMapId;
|
||||
private Object operationResponse;
|
||||
private Timestamp receivedTimeStamp;
|
||||
|
||||
|
||||
public int getId() {
|
||||
return id;
|
||||
}
|
||||
|
||||
public void setId(int id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
public int getEnrolmentId() {
|
||||
return enrolmentId;
|
||||
}
|
||||
|
||||
public void setEnrolmentId(int enrolmentId) {
|
||||
this.enrolmentId = enrolmentId;
|
||||
}
|
||||
|
||||
public int getOperationId() {
|
||||
return operationId;
|
||||
}
|
||||
|
||||
public void setOperationId(int operationId) {
|
||||
this.operationId = operationId;
|
||||
}
|
||||
|
||||
public int getEnOpMapId() {
|
||||
return enOpMapId;
|
||||
}
|
||||
|
||||
public void setEnOpMapId(int enOpMapId) {
|
||||
this.enOpMapId = enOpMapId;
|
||||
}
|
||||
|
||||
public Object getOperationResponse() {
|
||||
return operationResponse;
|
||||
}
|
||||
|
||||
public void setOperationResponse(Object operationResponse) {
|
||||
this.operationResponse = operationResponse;
|
||||
}
|
||||
|
||||
public Timestamp getReceivedTimeStamp() {
|
||||
return receivedTimeStamp;
|
||||
}
|
||||
|
||||
public void setReceivedTimeStamp(Timestamp receivedTimeStamp) {
|
||||
this.receivedTimeStamp = receivedTimeStamp;
|
||||
}
|
||||
}
|
||||
|
||||
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* Copyright (c) 2018, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
|
||||
*
|
||||
* WSO2 Inc. licenses this file to you under the Apache License,
|
||||
* Version 2.0 (the "License"); you may not use this file except
|
||||
* in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
|
||||
package org.wso2.carbon.device.mgt.core.archival.beans;
|
||||
|
||||
public class ArchiveProfileOperation {
|
||||
|
||||
|
||||
private int operationId;
|
||||
private int enabled;
|
||||
private Object operationDetails;
|
||||
|
||||
public int getOperationId() {
|
||||
return operationId;
|
||||
}
|
||||
|
||||
public void setOperationId(int operationId) {
|
||||
this.operationId = operationId;
|
||||
}
|
||||
|
||||
public int getEnabled() {
|
||||
return enabled;
|
||||
}
|
||||
|
||||
public void setEnabled(int enabled) {
|
||||
this.enabled = enabled;
|
||||
}
|
||||
|
||||
public Object getOperationDetails() {
|
||||
return operationDetails;
|
||||
}
|
||||
|
||||
public void setOperationDetails(Object operationDetails) {
|
||||
this.operationDetails = operationDetails;
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,6 +18,9 @@
|
||||
|
||||
package org.wso2.carbon.device.mgt.core.archival.dao;
|
||||
|
||||
import org.wso2.carbon.device.mgt.core.archival.beans.*;
|
||||
|
||||
import java.sql.ResultSet;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@ -33,19 +36,29 @@ public interface ArchivalDAO {
|
||||
|
||||
void copyOperationIDsForArchival(List<Integer> operationIds) throws ArchivalDAOException;
|
||||
|
||||
void moveOperationResponses() throws ArchivalDAOException;
|
||||
List<ArchiveOperationResponse> selectOperationResponses() throws ArchivalDAOException;
|
||||
|
||||
void moveNotifications() throws ArchivalDAOException;
|
||||
void moveOperationResponses(List<ArchiveOperationResponse> rs) throws ArchivalDAOException;
|
||||
|
||||
void moveCommandOperations() throws ArchivalDAOException;
|
||||
List<ArchiveNotification> selectNotifications() throws ArchivalDAOException;
|
||||
|
||||
void moveProfileOperations() throws ArchivalDAOException;
|
||||
void moveNotifications(List<ArchiveNotification> rs) throws ArchivalDAOException;
|
||||
|
||||
void moveConfigOperations() throws ArchivalDAOException;
|
||||
List<ArchiveCommandOperation> selectCommandOperations() throws ArchivalDAOException;
|
||||
|
||||
void moveEnrolmentMappings() throws ArchivalDAOException;
|
||||
void moveCommandOperations(List<ArchiveCommandOperation> rs) throws ArchivalDAOException;
|
||||
|
||||
void moveOperations() throws ArchivalDAOException;
|
||||
List<ArchiveProfileOperation> selectProfileOperations() throws ArchivalDAOException;
|
||||
|
||||
void moveProfileOperations(List<ArchiveProfileOperation> rs) throws ArchivalDAOException;
|
||||
|
||||
List<ArchiveEnrolmentOperationMap> selectEnrolmentMappings() throws ArchivalDAOException;
|
||||
|
||||
void moveEnrolmentMappings(List<ArchiveEnrolmentOperationMap> rs) throws ArchivalDAOException;
|
||||
|
||||
List<ArchiveOperation> selectOperations() throws ArchivalDAOException;
|
||||
|
||||
void moveOperations(List<ArchiveOperation> rs) throws ArchivalDAOException;
|
||||
|
||||
void truncateOperationIDsForArchival() throws ArchivalDAOException;
|
||||
|
||||
|
||||
@ -57,4 +57,14 @@ public class ArchivalDAOUtil {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void cleanupResultSet(ResultSet rs) {
|
||||
if (rs != null) {
|
||||
try {
|
||||
rs.close();
|
||||
} catch (SQLException e) {
|
||||
log.warn("Error occurred while closing the result set", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -20,11 +20,11 @@ package org.wso2.carbon.device.mgt.core.archival.dao.impl;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.wso2.carbon.device.mgt.core.archival.beans.*;
|
||||
import org.wso2.carbon.device.mgt.core.archival.dao.*;
|
||||
|
||||
import java.sql.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
|
||||
public class ArchivalDAOImpl implements ArchivalDAO {
|
||||
|
||||
@ -56,20 +56,25 @@ public class ArchivalDAOImpl implements ArchivalDAO {
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
String sql = "SELECT DISTINCT OPERATION_ID FROM DM_ENROLMENT_OP_MAPPING " +
|
||||
"WHERE CREATED_TIMESTAMP BETWEEN DATE(TIMESTAMPADD(DAY, " +
|
||||
this.retentionPeriod + ", NOW())) AND NOW()";
|
||||
"WHERE CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod + " DAY)";
|
||||
stmt = this.createMemoryEfficientStatement(conn);
|
||||
rs = stmt.executeQuery(sql);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Selected Operation Ids from Enrolment OP Mapping");
|
||||
}
|
||||
while (rs.next()) {
|
||||
operationIds.add(rs.getInt("OPERATION_ID"));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("An error occurred while getting a list operation Ids to archive", e);
|
||||
String msg = "An error occurred while getting a list operation Ids to archive";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
}
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(operationIds.size() + " operations found for the archival");
|
||||
log.debug(operationIds.size() + "[" + operationIds.get(0) + "," + operationIds.get(batchSize - 1) + "]");
|
||||
}
|
||||
return operationIds;
|
||||
}
|
||||
@ -82,21 +87,26 @@ public class ArchivalDAOImpl implements ArchivalDAO {
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
String sql = "SELECT DISTINCT OPERATION_ID " +
|
||||
" FROM DM_ENROLMENT_OP_MAPPING WHERE STATUS IN('PENDING', 'IN_PROGRESS') " +
|
||||
" AND CREATED_TIMESTAMP BETWEEN DATE(TIMESTAMPADD(DAY, " + this.retentionPeriod +", NOW())) " +
|
||||
"AND NOW()";
|
||||
" FROM DM_ENROLMENT_OP_MAPPING WHERE STATUS='PENDING' OR STATUS='IN_PROGRESS' " +
|
||||
" AND CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL " + this.retentionPeriod + " DAY)";
|
||||
stmt = this.createMemoryEfficientStatement(conn);
|
||||
rs = stmt.executeQuery(sql);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Selected Pending or In Progress Operation IDs");
|
||||
}
|
||||
while (rs.next()) {
|
||||
operationIds.add(rs.getInt("OPERATION_ID"));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("An error occurred while getting a list operation Ids to archive", e);
|
||||
String msg = "An error occurred while getting a list pending or in progress operation Ids to archive";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
}
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(operationIds.size() + " PENDING and IN_PROFRESS operations found for the archival");
|
||||
log.debug(operationIds.size() + " operations found for the archival");
|
||||
log.debug(operationIds.size() + "[" + operationIds.get(0) + "," + operationIds.get(batchSize - 1) + "]");
|
||||
}
|
||||
return operationIds;
|
||||
}
|
||||
@ -123,44 +133,88 @@ public class ArchivalDAOImpl implements ArchivalDAO {
|
||||
log.debug(count + " Records copied to the temporary table.");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error while copying operation Ids for archival", e);
|
||||
String msg = "Error while copying operation Ids for archival";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveOperationResponses() throws ArchivalDAOException {
|
||||
public List<ArchiveOperationResponse> selectOperationResponses() throws ArchivalDAOException {
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
ResultSet rs = null;
|
||||
|
||||
List<ArchiveOperationResponse> operationResponses = new ArrayList<>();
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
String sql = "SELECT * FROM DM_DEVICE_OPERATION_RESPONSE WHERE OPERATION_ID IN " +
|
||||
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
String sql = "SELECT \n" +
|
||||
" o.ID,\n" +
|
||||
" o.ENROLMENT_ID,\n" +
|
||||
" o.OPERATION_ID,\n" +
|
||||
" o.EN_OP_MAP_ID,\n" +
|
||||
" o.OPERATION_RESPONSE,\n" +
|
||||
" o.RECEIVED_TIMESTAMP\n" +
|
||||
"FROM\n" +
|
||||
" DM_DEVICE_OPERATION_RESPONSE o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;";
|
||||
stmt = this.createMemoryEfficientStatement(conn);
|
||||
rs = stmt.executeQuery(sql);
|
||||
|
||||
while (rs.next()) {
|
||||
ArchiveOperationResponse rep = new ArchiveOperationResponse();
|
||||
rep.setId(rs.getInt("ID"));
|
||||
rep.setEnrolmentId(rs.getInt("ENROLMENT_ID"));
|
||||
rep.setOperationId(rs.getInt("OPERATION_ID"));
|
||||
rep.setOperationResponse(rs.getBytes("OPERATION_RESPONSE"));
|
||||
rep.setReceivedTimeStamp(rs.getTimestamp("RECEIVED_TIMESTAMP"));
|
||||
operationResponses.add(rep);
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Selecting done for the Operation Response");
|
||||
}
|
||||
|
||||
} catch (SQLException e) {
|
||||
String msg = "Error occurred while archiving the operation responses";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
}
|
||||
|
||||
return operationResponses;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveOperationResponses(List<ArchiveOperationResponse> archiveOperationResponse) throws ArchivalDAOException {
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
|
||||
|
||||
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||
sql = "INSERT INTO DM_DEVICE_OPERATION_RESPONSE_ARCH VALUES(?, ?, ?, ?, ?,?,?)";
|
||||
String sql = "INSERT INTO DM_DEVICE_OPERATION_RESPONSE_ARCH VALUES(?, ?, ?, ?, ?,?)";
|
||||
stmt2 = conn2.prepareStatement(sql);
|
||||
|
||||
int count = 0;
|
||||
while (rs.next()) {
|
||||
stmt2.setInt(1, rs.getInt("ID"));
|
||||
stmt2.setInt(2, rs.getInt("ENROLMENT_ID"));
|
||||
stmt2.setInt(3, rs.getInt("OPERATION_ID"));
|
||||
stmt2.setInt(4, rs.getInt("EN_OP_MAP_ID"));
|
||||
stmt2.setBytes(5, rs.getBytes("OPERATION_RESPONSE"));
|
||||
stmt2.setTimestamp(6, rs.getTimestamp("RECEIVED_TIMESTAMP"));
|
||||
stmt2.setTimestamp(7, this.currentTimestamp);
|
||||
for (ArchiveOperationResponse rs : archiveOperationResponse) {
|
||||
stmt2.setInt(1, rs.getId());
|
||||
stmt2.setInt(2, rs.getEnrolmentId());
|
||||
stmt2.setInt(3, rs.getOperationId());
|
||||
stmt2.setBytes(4, (byte[]) rs.getOperationResponse());
|
||||
stmt2.setTimestamp(5, rs.getReceivedTimeStamp());
|
||||
stmt2.setTimestamp(6, this.currentTimestamp);
|
||||
stmt2.addBatch();
|
||||
|
||||
if (++count % batchSize == 0) {
|
||||
stmt2.executeBatch();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Executing batch " + count);
|
||||
log.debug("Executing Operation Responses batch " + count);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -169,259 +223,382 @@ public class ArchivalDAOImpl implements ArchivalDAO {
|
||||
log.debug(count + " [OPERATION_RESPONSES] Records copied to the archival table. Starting deletion");
|
||||
}
|
||||
//try the deletion now
|
||||
sql = "DELETE FROM DM_DEVICE_OPERATION_RESPONSE WHERE OPERATION_ID IN (" +
|
||||
" SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
sql = "DELETE o.* FROM DM_DEVICE_OPERATION_RESPONSE o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" +
|
||||
"WHERE\n" +
|
||||
" o.OPERATION_ID = da.ID;";
|
||||
stmt3 = conn.createStatement();
|
||||
int affected = stmt3.executeUpdate(sql);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(affected + " Rows deleted");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while moving operations ", e);
|
||||
String msg = "Error occurred while archiving the operation responses";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveNotifications() throws ArchivalDAOException {
|
||||
public List<ArchiveNotification> selectNotifications() throws ArchivalDAOException {
|
||||
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
ResultSet rs = null;
|
||||
List<ArchiveNotification> notifications = new ArrayList<>();
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
String sql = "SELECT * FROM DM_NOTIFICATION WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
String sql = "SELECT \n" +
|
||||
" o.NOTIFICATION_ID,\n" +
|
||||
" o.DEVICE_ID,\n" +
|
||||
" o.OPERATION_ID,\n" +
|
||||
" o.TENANT_ID,\n" +
|
||||
" o.STATUS,\n" +
|
||||
" o.DESCRIPTION\n" +
|
||||
"FROM\n" +
|
||||
" DM_NOTIFICATION o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;";
|
||||
stmt = this.createMemoryEfficientStatement(conn);
|
||||
rs = stmt.executeQuery(sql);
|
||||
|
||||
// ArchivalDestinationDAOFactory.beginTransaction();
|
||||
while (rs.next()) {
|
||||
|
||||
ArchiveNotification note = new ArchiveNotification();
|
||||
note.setNotificationId(rs.getInt("NOTIFICATION_ID"));
|
||||
note.setDeviceId(rs.getInt("DEVICE_ID"));
|
||||
note.setOperationId(rs.getInt("OPERATION_ID"));
|
||||
note.setTenantId(rs.getInt("TENANT_ID"));
|
||||
note.setStatus(rs.getString("STATUS"));
|
||||
note.setDescription(rs.getString("DESCRIPTION"));
|
||||
notifications.add(note);
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Selecting done for the Notification");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
String msg = "Error occurred while archiving the notifications";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
}
|
||||
return notifications;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void moveNotifications(List<ArchiveNotification> archiveNotifications) throws ArchivalDAOException {
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||
|
||||
sql = "INSERT INTO DM_NOTIFICATION_ARCH VALUES(?, ?, ?, ?, ?, ?, ?)";
|
||||
String sql = "INSERT INTO DM_NOTIFICATION_ARCH VALUES(?, ?, ?, ?, ?, ?, ?)";
|
||||
stmt2 = conn2.prepareStatement(sql);
|
||||
|
||||
int count = 0;
|
||||
while (rs.next()) {
|
||||
stmt2.setInt(1, rs.getInt("NOTIFICATION_ID"));
|
||||
stmt2.setInt(2, rs.getInt("DEVICE_ID"));
|
||||
stmt2.setInt(3, rs.getInt("OPERATION_ID"));
|
||||
stmt2.setInt(4, rs.getInt("TENANT_ID"));
|
||||
stmt2.setString(5, rs.getString("STATUS"));
|
||||
stmt2.setString(6, rs.getString("DESCRIPTION"));
|
||||
// while (rs.next()) {
|
||||
for (ArchiveNotification rs : archiveNotifications) {
|
||||
stmt2.setInt(1, rs.getNotificationId());
|
||||
stmt2.setInt(2, rs.getDeviceId());
|
||||
stmt2.setInt(3, rs.getOperationId());
|
||||
stmt2.setInt(4, rs.getTenantId());
|
||||
stmt2.setString(5, rs.getStatus());
|
||||
stmt2.setString(6, rs.getDescription());
|
||||
stmt2.setTimestamp(7, this.currentTimestamp);
|
||||
stmt2.addBatch();
|
||||
|
||||
if (++count % batchSize == 0) {
|
||||
stmt2.executeBatch();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Executing Notifications batch " + count);
|
||||
}
|
||||
}
|
||||
}
|
||||
stmt2.executeBatch();
|
||||
// ArchivalDestinationDAOFactory.commitTransaction();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(count + " [NOTIFICATIONS] Records copied to the archival table. Starting deletion");
|
||||
}
|
||||
sql = "DELETE FROM DM_NOTIFICATION" +
|
||||
" WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
sql = "DELETE o.* FROM DM_NOTIFICATION o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" +
|
||||
"WHERE\n" +
|
||||
" o.OPERATION_ID = da.ID;";
|
||||
stmt3 = conn.createStatement();
|
||||
int affected = stmt3.executeUpdate(sql);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(affected + " Rows deleted");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while moving notifications ", e);
|
||||
String msg = "Error occurred while archiving the notifications";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveCommandOperations() throws ArchivalDAOException {
|
||||
public List<ArchiveCommandOperation> selectCommandOperations() throws ArchivalDAOException {
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
ResultSet rs = null;
|
||||
|
||||
List<ArchiveCommandOperation> commandOperations = new ArrayList<>();
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
String sql = "SELECT * FROM DM_COMMAND_OPERATION WHERE OPERATION_ID IN " +
|
||||
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
String sql = "SELECT \n" +
|
||||
" *\n" +
|
||||
"FROM\n" +
|
||||
" DM_COMMAND_OPERATION o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;";
|
||||
stmt = this.createMemoryEfficientStatement(conn);
|
||||
rs = stmt.executeQuery(sql);
|
||||
|
||||
while (rs.next()) {
|
||||
ArchiveCommandOperation op = new ArchiveCommandOperation();
|
||||
op.setOperationId(rs.getInt("OPERATION_ID"));
|
||||
op.setEnabled(rs.getInt("ENABLED"));
|
||||
|
||||
commandOperations.add(op);
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Selecting done for the Command Operation");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
String msg = "Error occurred while archiving the command operation";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
}
|
||||
return commandOperations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveCommandOperations(List<ArchiveCommandOperation> commandOperations) throws ArchivalDAOException {
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||
|
||||
sql = "INSERT INTO DM_COMMAND_OPERATION_ARCH VALUES(?,?,?)";
|
||||
String sql = "INSERT INTO DM_COMMAND_OPERATION_ARCH VALUES(?,?,?)";
|
||||
stmt2 = conn2.prepareStatement(sql);
|
||||
|
||||
int count = 0;
|
||||
while (rs.next()) {
|
||||
stmt2.setInt(1, rs.getInt("OPERATION_ID"));
|
||||
stmt2.setInt(2, rs.getInt("ENABLED"));
|
||||
for (ArchiveCommandOperation rs : commandOperations) {
|
||||
stmt2.setInt(1, rs.getOperationId());
|
||||
stmt2.setInt(2, rs.getEnabled());
|
||||
stmt2.setTimestamp(3, this.currentTimestamp);
|
||||
stmt2.addBatch();
|
||||
|
||||
if (++count % batchSize == 0) {
|
||||
stmt2.executeBatch();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Executing Command Operations batch " + count);
|
||||
}
|
||||
}
|
||||
}
|
||||
stmt2.executeBatch();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(count + " [COMMAND_OPERATION] Records copied to the archival table. Starting deletion");
|
||||
}
|
||||
sql = "DELETE FROM DM_COMMAND_OPERATION" +
|
||||
" WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
sql = "DELETE o.* FROM DM_COMMAND_OPERATION o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" +
|
||||
"WHERE\n" +
|
||||
" o.OPERATION_ID = da.ID;";
|
||||
stmt3 = conn.createStatement();
|
||||
int affected = stmt3.executeUpdate(sql);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(affected + " Rows deleted");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while moving command operations", e);
|
||||
String msg = "Error occurred while archiving the command operation";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveProfileOperations() throws ArchivalDAOException {
|
||||
public List<ArchiveProfileOperation> selectProfileOperations() throws ArchivalDAOException {
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
ResultSet rs = null;
|
||||
List<ArchiveProfileOperation> profileOperations = new ArrayList<>();
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
String sql = "SELECT * FROM DM_PROFILE_OPERATION WHERE OPERATION_ID IN " +
|
||||
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
String sql = "SELECT \n" +
|
||||
" *\n" +
|
||||
"FROM\n" +
|
||||
" DM_PROFILE_OPERATION o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;";
|
||||
stmt = this.createMemoryEfficientStatement(conn);
|
||||
rs = stmt.executeQuery(sql);
|
||||
|
||||
while (rs.next()) {
|
||||
ArchiveProfileOperation op = new ArchiveProfileOperation();
|
||||
|
||||
op.setOperationId(rs.getInt("OPERATION_ID"));
|
||||
op.setEnabled(rs.getInt("ENABLED"));
|
||||
op.setOperationDetails(rs.getBytes("OPERATION_DETAILS"));
|
||||
profileOperations.add(op);
|
||||
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Selecting done for the Profile Operation");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
String msg = "Error occurred while archiving the profile operation";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
}
|
||||
return profileOperations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveProfileOperations(List<ArchiveProfileOperation> profileOperations) throws ArchivalDAOException {
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
|
||||
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||
|
||||
sql = "INSERT INTO DM_PROFILE_OPERATION_ARCH VALUES(?, ?, ?, ?)";
|
||||
String sql = "INSERT INTO DM_PROFILE_OPERATION_ARCH VALUES(?, ?, ?, ?)";
|
||||
stmt2 = conn2.prepareStatement(sql);
|
||||
|
||||
int count = 0;
|
||||
while (rs.next()) {
|
||||
stmt2.setInt(1, rs.getInt("OPERATION_ID"));
|
||||
stmt2.setInt(2, rs.getInt("ENABLED"));
|
||||
stmt2.setBytes(3, rs.getBytes("OPERATION_DETAILS"));
|
||||
stmt2.setTimestamp(4,this.currentTimestamp );
|
||||
for (ArchiveProfileOperation rs : profileOperations) {
|
||||
stmt2.setInt(1, rs.getOperationId());
|
||||
stmt2.setInt(2, rs.getEnabled());
|
||||
stmt2.setBytes(3, (byte[]) rs.getOperationDetails());
|
||||
stmt2.setTimestamp(4, this.currentTimestamp);
|
||||
stmt2.addBatch();
|
||||
|
||||
if (++count % batchSize == 0) {
|
||||
stmt2.executeBatch();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Executing Profile Operations batch " + count);
|
||||
}
|
||||
}
|
||||
}
|
||||
stmt2.executeBatch();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(count + " [PROFILE_OPERATION] Records copied to the archival table. Starting deletion");
|
||||
}
|
||||
sql = "DELETE FROM DM_PROFILE_OPERATION" +
|
||||
" WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
sql = "DELETE o.* FROM DM_PROFILE_OPERATION o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" +
|
||||
"WHERE\n" +
|
||||
" o.OPERATION_ID = da.ID;";
|
||||
stmt3 = conn.createStatement();
|
||||
int affected = stmt3.executeUpdate(sql);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(affected + " Rows deleted");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while moving profile operations", e);
|
||||
String msg = "Error occurred while archiving the profile operation";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveConfigOperations() throws ArchivalDAOException {
|
||||
public List<ArchiveEnrolmentOperationMap> selectEnrolmentMappings() throws ArchivalDAOException {
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
ResultSet rs = null;
|
||||
List<ArchiveEnrolmentOperationMap> operationMaps = new ArrayList<>();
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
String sql = "SELECT * FROM DM_CONFIG_OPERATION WHERE OPERATION_ID IN " +
|
||||
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
String sql = "SELECT \n" +
|
||||
" o.ID,\n" +
|
||||
" o.ENROLMENT_ID,\n" +
|
||||
" o.OPERATION_ID,\n" +
|
||||
" o.STATUS,\n" +
|
||||
" o.CREATED_TIMESTAMP,\n" +
|
||||
" o.UPDATED_TIMESTAMP\n" +
|
||||
"FROM\n" +
|
||||
" DM_ENROLMENT_OP_MAPPING o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID;";
|
||||
stmt = this.createMemoryEfficientStatement(conn);
|
||||
rs = stmt.executeQuery(sql);
|
||||
|
||||
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||
|
||||
sql = "INSERT INTO DM_CONFIG_OPERATION_ARCH VALUES(?, ?, ?, ?)";
|
||||
stmt2 = conn2.prepareStatement(sql);
|
||||
|
||||
int count = 0;
|
||||
while (rs.next()) {
|
||||
stmt2.setInt(1, rs.getInt("OPERATION_ID"));
|
||||
stmt2.setBytes(2, rs.getBytes("OPERATION_CONFIG"));
|
||||
stmt2.setInt(3, rs.getInt("ENABLED"));
|
||||
stmt2.setTimestamp(4,this.currentTimestamp );
|
||||
stmt2.addBatch();
|
||||
|
||||
if (++count % batchSize == 0) {
|
||||
stmt2.executeBatch();
|
||||
}
|
||||
ArchiveEnrolmentOperationMap eom = new ArchiveEnrolmentOperationMap();
|
||||
eom.setId(rs.getInt("ID"));
|
||||
eom.setEnrolmentId(rs.getInt("ENROLMENT_ID"));
|
||||
eom.setOperationId(rs.getInt("OPERATION_ID"));
|
||||
eom.setStatus(rs.getString("STATUS"));
|
||||
eom.setCreatedTimestamp(rs.getInt("CREATED_TIMESTAMP"));
|
||||
eom.setUpdatedTimestamp(rs.getInt("UPDATED_TIMESTAMP"));
|
||||
operationMaps.add(eom);
|
||||
}
|
||||
stmt2.executeBatch();
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(count + " [CONFIG_OPERATION] Records copied to the archival table. Starting deletion");
|
||||
}
|
||||
sql = "DELETE FROM DM_CONFIG_OPERATION" +
|
||||
" WHERE OPERATION_ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
stmt3 = conn.createStatement();
|
||||
int affected = stmt3.executeUpdate(sql);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(affected + " Rows deleted");
|
||||
log.debug("Selecting done for the Enrolment OP Mapping");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while moving config operations", e);
|
||||
String msg = "Error occurred while archiving the enrolment op mappings";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||
}
|
||||
|
||||
return operationMaps;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveEnrolmentMappings() throws ArchivalDAOException {
|
||||
public void moveEnrolmentMappings(List<ArchiveEnrolmentOperationMap> operationMaps) throws ArchivalDAOException {
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
String sql = "SELECT * FROM DM_ENROLMENT_OP_MAPPING WHERE OPERATION_ID IN " +
|
||||
"(SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
stmt = this.createMemoryEfficientStatement(conn);
|
||||
rs = stmt.executeQuery(sql);
|
||||
|
||||
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||
|
||||
sql = "INSERT INTO DM_ENROLMENT_OP_MAPPING_ARCH VALUES(?, ?, ?, ?, ?, ?, ?,?)";
|
||||
String sql = "INSERT INTO DM_ENROLMENT_OP_MAPPING_ARCH VALUES(?, ?, ?, ?, ?, ?, ?)";
|
||||
stmt2 = conn2.prepareStatement(sql);
|
||||
|
||||
int count = 0;
|
||||
while (rs.next()) {
|
||||
stmt2.setInt(1, rs.getInt("ID"));
|
||||
stmt2.setInt(2, rs.getInt("ENROLMENT_ID"));
|
||||
stmt2.setInt(3, rs.getInt("OPERATION_ID"));
|
||||
stmt2.setString(4, rs.getString("STATUS"));
|
||||
stmt2.setString(5, rs.getString("PUSH_NOTIFICATION_STATUS"));
|
||||
stmt2.setInt(6, rs.getInt("CREATED_TIMESTAMP"));
|
||||
stmt2.setInt(7, rs.getInt("UPDATED_TIMESTAMP"));
|
||||
stmt2.setTimestamp(8, this.currentTimestamp);
|
||||
for (ArchiveEnrolmentOperationMap rs : operationMaps) {
|
||||
stmt2.setInt(1, rs.getId());
|
||||
stmt2.setInt(2, rs.getEnrolmentId());
|
||||
stmt2.setInt(3, rs.getOperationId());
|
||||
stmt2.setString(4, rs.getStatus());
|
||||
stmt2.setInt(5, rs.getCreatedTimestamp());
|
||||
stmt2.setInt(6, rs.getUpdatedTimestamp());
|
||||
stmt2.setTimestamp(7, this.currentTimestamp);
|
||||
stmt2.addBatch();
|
||||
|
||||
if (++count % batchSize == 0) {
|
||||
stmt2.executeBatch();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Executing batch " + count);
|
||||
log.debug("Executing Enrolment Mappings batch " + count);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -429,67 +606,119 @@ public class ArchivalDAOImpl implements ArchivalDAO {
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(count + " [ENROLMENT_OP_MAPPING] Records copied to the archival table. Starting deletion");
|
||||
}
|
||||
sql = "DELETE FROM DM_ENROLMENT_OP_MAPPING WHERE OPERATION_ID IN (" +
|
||||
"SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
sql = "DELETE o.* FROM DM_ENROLMENT_OP_MAPPING o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.OPERATION_ID = da.ID \n" +
|
||||
"WHERE\n" +
|
||||
" o.OPERATION_ID = da.ID;";
|
||||
stmt3 = conn.createStatement();
|
||||
int affected = stmt3.executeUpdate(sql);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(affected + " Rows deleted");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while moving enrolment mappings", e);
|
||||
String msg = "Error occurred while archiving the enrolment op mappings";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveOperations() throws ArchivalDAOException {
|
||||
public List<ArchiveOperation> selectOperations() throws ArchivalDAOException {
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
ResultSet rs = null;
|
||||
List<ArchiveOperation> operations = new ArrayList<>();
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
String sql = "SELECT * FROM DM_OPERATION WHERE ID IN (SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
String sql = "SELECT \n" +
|
||||
" o.ID,\n" +
|
||||
" o.TYPE,\n" +
|
||||
" o.CREATED_TIMESTAMP,\n" +
|
||||
" o.RECEIVED_TIMESTAMP,\n" +
|
||||
" o.OPERATION_CODE\n" +
|
||||
"FROM\n" +
|
||||
" DM_OPERATION o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.ID = da.ID;";
|
||||
stmt = this.createMemoryEfficientStatement(conn);
|
||||
rs = stmt.executeQuery(sql);
|
||||
|
||||
while (rs.next()) {
|
||||
|
||||
ArchiveOperation op = new ArchiveOperation();
|
||||
op.setId(rs.getInt("ID"));
|
||||
op.setType(rs.getString("TYPE"));
|
||||
op.setCreatedTimeStamp(rs.getTimestamp("CREATED_TIMESTAMP"));
|
||||
op.setRecievedTimeStamp(rs.getTimestamp("RECEIVED_TIMESTAMP"));
|
||||
op.setOperationCode(rs.getString("OPERATION_CODE"));
|
||||
|
||||
operations.add(op);
|
||||
|
||||
}
|
||||
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Selecting done for the Operation");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
String msg = "Error occurred while archiving the operations";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
}
|
||||
return operations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void moveOperations(List<ArchiveOperation> operations) throws ArchivalDAOException {
|
||||
Statement stmt = null;
|
||||
PreparedStatement stmt2 = null;
|
||||
Statement stmt3 = null;
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
Connection conn2 = ArchivalDestinationDAOFactory.getConnection();
|
||||
sql = "INSERT INTO DM_OPERATION_ARCH VALUES(?, ?, ?, ?, ?, ?)";
|
||||
String sql = "INSERT INTO DM_OPERATION_ARCH VALUES(?, ?, ?, ?, ?, ?)";
|
||||
stmt2 = conn2.prepareStatement(sql);
|
||||
|
||||
int count = 0;
|
||||
while (rs.next()) {
|
||||
stmt2.setInt(1, rs.getInt("ID"));
|
||||
stmt2.setString(2, rs.getString("TYPE"));
|
||||
stmt2.setTimestamp(3, rs.getTimestamp("CREATED_TIMESTAMP"));
|
||||
stmt2.setTimestamp(4, rs.getTimestamp("RECEIVED_TIMESTAMP"));
|
||||
stmt2.setString(5, rs.getString("OPERATION_CODE"));
|
||||
for (ArchiveOperation rs : operations) {
|
||||
stmt2.setInt(1, rs.getId());
|
||||
stmt2.setString(2, rs.getType());
|
||||
stmt2.setTimestamp(3, rs.getCreatedTimeStamp());
|
||||
stmt2.setTimestamp(4, rs.getRecievedTimeStamp());
|
||||
stmt2.setString(5, rs.getOperationCode());
|
||||
stmt2.setTimestamp(6, this.currentTimestamp);
|
||||
stmt2.addBatch();
|
||||
|
||||
if (++count % batchSize == 0) {
|
||||
stmt2.executeBatch();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("Final Execution of Operations batch " + count);
|
||||
}
|
||||
}
|
||||
}
|
||||
stmt2.executeBatch();
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(count + " [OPERATIONS] Records copied to the archival table. Starting deletion");
|
||||
}
|
||||
sql = "DELETE FROM DM_OPERATION WHERE ID IN (" +
|
||||
"SELECT ID FROM DM_ARCHIVED_OPERATIONS)";
|
||||
sql = "DELETE o.* FROM DM_OPERATION o\n" +
|
||||
" INNER JOIN\n" +
|
||||
" DM_ARCHIVED_OPERATIONS da ON o.ID = da.ID \n" +
|
||||
"WHERE\n" +
|
||||
" o.ID = da.ID;";
|
||||
stmt3 = conn.createStatement();
|
||||
int affected = stmt3.executeUpdate(sql);
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(affected + " Rows deleted");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while moving operations", e);
|
||||
String msg = "Error occurred while archiving the operations";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt, rs);
|
||||
ArchivalDAOUtil.cleanupResources(stmt2);
|
||||
ArchivalDAOUtil.cleanupResources(stmt3);
|
||||
}
|
||||
@ -503,11 +732,13 @@ public class ArchivalDAOImpl implements ArchivalDAO {
|
||||
conn.setAutoCommit(false);
|
||||
String sql = "TRUNCATE DM_ARCHIVED_OPERATIONS";
|
||||
stmt = conn.prepareStatement(sql);
|
||||
stmt.addBatch();
|
||||
stmt.executeBatch();
|
||||
stmt.executeUpdate();
|
||||
|
||||
conn.commit();
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while truncating operation Ids", e);
|
||||
String msg = "Error occurred while truncating operation Ids";
|
||||
log.error(msg, e);
|
||||
throw new ArchivalDAOException(msg, e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt);
|
||||
}
|
||||
@ -519,4 +750,59 @@ public class ArchivalDAOImpl implements ArchivalDAO {
|
||||
return stmt;
|
||||
}
|
||||
|
||||
private String buildWhereClause(String[] statuses) {
|
||||
StringBuilder whereClause = new StringBuilder("WHERE ");
|
||||
for (int i = 0; i < statuses.length; i++) {
|
||||
whereClause.append("STATUS ='");
|
||||
whereClause.append(statuses[i]);
|
||||
whereClause.append("' ");
|
||||
if (i != (statuses.length - 1))
|
||||
whereClause.append(" OR ");
|
||||
}
|
||||
return whereClause.toString();
|
||||
}
|
||||
|
||||
private void copyOperationIDsForArchival() throws ArchivalDAOException {
|
||||
PreparedStatement stmt = null;
|
||||
Statement createStmt = null;
|
||||
try {
|
||||
Connection conn = ArchivalSourceDAOFactory.getConnection();
|
||||
// conn.setAutoCommit(false);
|
||||
// String sql = "INSERT INTO DM_ARCHIVED_OPERATIONS(ID,CREATED_TIMESTAMP)" +
|
||||
// " SELECT DISTINCT op.ID as OPERATION_ID, NOW()" +
|
||||
// " FROM DM_ENROLMENT_OP_MAPPING AS opm" +
|
||||
// " LEFT JOIN DM_OPERATION AS op ON opm.OPERATION_ID = op.ID" +
|
||||
// " WHERE opm.STATUS='ERROR' OR opm.STATUS='COMPLETED'" +
|
||||
// " AND op.RECEIVED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL ? DAY);";
|
||||
// stmt = conn.prepareStatement(sql);
|
||||
// stmt.setInt(1, this.retentionPeriod);
|
||||
// stmt.addBatch();
|
||||
// stmt.executeBatch();
|
||||
// conn.commit();
|
||||
|
||||
//Create the temporary table first
|
||||
// String sql = "CREATE TEMPORARY TABLE DM_ARCHIVED_OPERATIONS (ID INTEGER NOT NULL," +
|
||||
// " CREATED_TIMESTAMP TIMESTAMP NOT NULL, PRIMARY KEY (ID))" ;
|
||||
// createStmt = conn.createStatement();
|
||||
// createStmt.execute(sql);
|
||||
// if(log.isDebugEnabled()) {
|
||||
// log.debug("Temporary table DM_ARCHIVED_OPERATIONS has been created ");
|
||||
// }
|
||||
//Copy eligible operations into DM_ARCHIVED_OPERATIONS
|
||||
String sql = "INSERT INTO DM_ARCHIVED_OPERATIONS(ID,CREATED_TIMESTAMP)" +
|
||||
" SELECT DISTINCT OPERATION_ID, NOW()" +
|
||||
" FROM DM_ENROLMENT_OP_MAPPING" +
|
||||
" WHERE STATUS='ERROR' OR STATUS='COMPLETED' OR STATUS='REPEATED'" +
|
||||
" AND CREATED_TIMESTAMP < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||
stmt = conn.prepareStatement(sql);
|
||||
stmt.setInt(1, this.retentionPeriod);
|
||||
int affected = stmt.executeUpdate();
|
||||
log.info(affected + " Eligible operations found for archival");
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while copying operation Ids for archival", e);
|
||||
} finally {
|
||||
ArchivalDAOUtil.cleanupResources(stmt);
|
||||
ArchivalDAOUtil.cleanupResources(createStmt);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -21,10 +21,11 @@ package org.wso2.carbon.device.mgt.core.archival.dao.impl;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.wso2.carbon.device.mgt.core.archival.dao.DataDeletionDAO;
|
||||
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDAOException;
|
||||
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDAOUtil;
|
||||
import org.wso2.carbon.device.mgt.core.archival.dao.ArchivalDestinationDAOFactory;
|
||||
import org.wso2.carbon.device.mgt.core.archival.dao.DataDeletionDAO;
|
||||
import org.wso2.carbon.device.mgt.core.task.impl.ArchivedDataDeletionTask;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
@ -52,8 +53,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
|
||||
"WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||
stmt = conn.prepareStatement(sql);
|
||||
stmt.setInt(1, this.retentionPeriod);
|
||||
stmt.addBatch();
|
||||
stmt.executeBatch();
|
||||
stmt.executeUpdate();
|
||||
conn.commit();
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while deleting operation responses", e);
|
||||
@ -72,8 +72,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
|
||||
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||
stmt = conn.prepareStatement(sql);
|
||||
stmt.setInt(1, this.retentionPeriod);
|
||||
stmt.addBatch();
|
||||
stmt.executeBatch();
|
||||
stmt.executeUpdate();
|
||||
conn.commit();
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while deleting notifications", e);
|
||||
@ -92,8 +91,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
|
||||
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||
stmt = conn.prepareStatement(sql);
|
||||
stmt.setInt(1, this.retentionPeriod);
|
||||
stmt.addBatch();
|
||||
stmt.executeBatch();
|
||||
stmt.executeUpdate();
|
||||
conn.commit();
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while deleting command operations", e);
|
||||
@ -112,8 +110,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
|
||||
" WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||
stmt = conn.prepareStatement(sql);
|
||||
stmt.setInt(1, this.retentionPeriod);
|
||||
stmt.addBatch();
|
||||
stmt.executeBatch();
|
||||
stmt.executeUpdate();
|
||||
conn.commit();
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while deleting profile operations", e);
|
||||
@ -131,8 +128,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
|
||||
String sql = "DELETE FROM DM_ENROLMENT_OP_MAPPING_ARCH WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||
stmt = conn.prepareStatement(sql);
|
||||
stmt.setInt(1, this.retentionPeriod);
|
||||
stmt.addBatch();
|
||||
stmt.executeBatch();
|
||||
stmt.executeUpdate();
|
||||
conn.commit();
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while deleting enrolment mappings", e);
|
||||
@ -150,8 +146,7 @@ public class DataDeletionDAOImpl implements DataDeletionDAO {
|
||||
String sql = "DELETE FROM DM_OPERATION_ARCH WHERE ARCHIVED_AT < DATE_SUB(NOW(), INTERVAL ? DAY)";
|
||||
stmt = conn.prepareStatement(sql);
|
||||
stmt.setInt(1, this.retentionPeriod);
|
||||
stmt.addBatch();
|
||||
stmt.executeBatch();
|
||||
stmt.executeUpdate();
|
||||
conn.commit();
|
||||
} catch (SQLException e) {
|
||||
throw new ArchivalDAOException("Error occurred while deleting operations", e);
|
||||
|
||||
@ -55,7 +55,7 @@ public class ArchivedDataDeletionTask implements Task {
|
||||
log.error("An error occurred while executing DataDeletionTask", e);
|
||||
}
|
||||
long endTime = System.nanoTime();
|
||||
long difference = (endTime - startTime) / 1000000 * 1000;
|
||||
long difference = (endTime - startTime) / (1000000 * 1000);
|
||||
log.info("DataDeletionTask completed. Total execution time: " + difference + " seconds");
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user