Commit bf71dab9 authored by pengxin's avatar pengxin

进行异步操作。

parent ccb4b969
...@@ -97,7 +97,7 @@ public class DatasetCleanController { ...@@ -97,7 +97,7 @@ public class DatasetCleanController {
* @return 应答结果对象,包含新增对象主键Id。 * @return 应答结果对象,包含新增对象主键Id。
*/ */
@OperationLog(type = SysOperationLogType.RESTART_CLEAN) @OperationLog(type = SysOperationLogType.RESTART_CLEAN)
@GetMapping("/restartClean") @PostMapping("/restartClean")
public ResponseResult<Void> restartClean(@RequestParam Long cleanId) { public ResponseResult<Void> restartClean(@RequestParam Long cleanId) {
if (MyCommonUtil.existBlankArgument(cleanId)) { if (MyCommonUtil.existBlankArgument(cleanId)) {
return ResponseResult.error(ErrorCodeEnum.ARGUMENT_NULL_EXIST); return ResponseResult.error(ErrorCodeEnum.ARGUMENT_NULL_EXIST);
......
package com.yice.webadmin.app.service; package com.yice.webadmin.app.service;
import com.yice.common.core.base.service.IBaseService; import com.yice.common.core.base.service.IBaseService;
import com.yice.webadmin.app.data.DatasetData;
import com.yice.webadmin.app.model.DatasetClean; import com.yice.webadmin.app.model.DatasetClean;
import java.util.List; import java.util.List;
import java.util.concurrent.Future;
/** /**
* 数据集导出数据操作服务接口。 * 数据集导出数据操作服务接口。
...@@ -33,6 +35,15 @@ public interface DatasetCleanService extends IBaseService<DatasetClean, Long> { ...@@ -33,6 +35,15 @@ public interface DatasetCleanService extends IBaseService<DatasetClean, Long> {
*/ */
void stopCleanTask(Long cleanId); void stopCleanTask(Long cleanId);
/**
* 线程方法
* @param dataList 数据集列表
* @param cleanId 清洗标识
* @param datasetId 清洗集标识
* @return 线程方法
*/
Future<Long> executeCleanTaskAsync(List<DatasetData> dataList, Long cleanId, Long datasetId);
/** /**
* 保存清洗对象以及清洗配置对象。 * 保存清洗对象以及清洗配置对象。
* *
......
...@@ -21,6 +21,13 @@ public interface DatasetDataService { ...@@ -21,6 +21,13 @@ public interface DatasetDataService {
*/ */
void save(DatasetData datasetData); void save(DatasetData datasetData);
/**
* 开始清洗工作
* @param datasetId 清洗数据集
* @param cleanId 清洗标识
*/
void doDatasetCleanHandler(Long datasetId, Long cleanId);
/** /**
* 删除整个集合中的文档数据。 * 删除整个集合中的文档数据。
* *
......
...@@ -29,7 +29,6 @@ import com.yice.webadmin.app.util.JsonNameExtractor; ...@@ -29,7 +29,6 @@ import com.yice.webadmin.app.util.JsonNameExtractor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult; import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
...@@ -111,26 +110,10 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp ...@@ -111,26 +110,10 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
} }
updateVersionStatus(datasetClean.getDatasetId(),DatasetConstant.CLEAN_PROGRESS); updateVersionStatus(datasetClean.getDatasetId(),DatasetConstant.CLEAN_PROGRESS);
doDatasetCleanHandler(datasetClean.getDatasetId(), datasetClean.getCleanId()); datasetDataService.doDatasetCleanHandler(datasetClean.getDatasetId(), datasetClean.getCleanId());
return datasetClean; return datasetClean;
} }
/**
* 清洗100个样本
* @param datasetId 数据集对应的版本
*/
@Async("taskExecutor")
public void doDatasetCleanHandler(Long datasetId, Long cleanId) {
MyPageParam param = new MyPageParam();
param.setPageNum(DatasetConstant.PAGE_NUM);
param.setPageSize(DatasetConstant.MAX_PAGE_SIZE);
List<DatasetData> dataList = datasetDataService.list(datasetId, param);
if(CollUtil.isNotEmpty(dataList)) {
Future<Void> future = executeCleanTaskAsync(dataList, cleanId, datasetId);
futures.put(cleanId, future);
}
}
/** /**
* 线程方法 * 线程方法
* @param dataList 数据集列表 * @param dataList 数据集列表
...@@ -138,12 +121,14 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp ...@@ -138,12 +121,14 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* @param datasetId 清洗集标识 * @param datasetId 清洗集标识
* @return 线程方法 * @return 线程方法
*/ */
public Future<Void> executeCleanTaskAsync(List<DatasetData> dataList, Long cleanId, Long datasetId) { @Transactional(rollbackFor = Exception.class)
@Override
public Future<Long> executeCleanTaskAsync(List<DatasetData> dataList, Long cleanId, Long datasetId) {
asyncDealWithDatasetSaveBatch(dataList, cleanId); asyncDealWithDatasetSaveBatch(dataList, cleanId);
dealWithTaskHandler(datasetId,cleanId); dealWithTaskHandler(datasetId,cleanId);
updateCleanStatus(cleanId,DatasetConstant.CLEAN_FINISHED); updateCleanStatus(cleanId,DatasetConstant.CLEAN_FINISHED);
updateVersionStatus(datasetId,DatasetConstant.CLEAN_FINISHED); updateVersionStatus(datasetId,DatasetConstant.CLEAN_FINISHED);
return new AsyncResult<>(null); return new AsyncResult<>(cleanId);
} }
/** /**
...@@ -211,7 +196,7 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp ...@@ -211,7 +196,7 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
public void restartCleanTask(Long cleanId) { public void restartCleanTask(Long cleanId) {
DatasetClean clean = this.datasetCleanMapper.selectById(cleanId); DatasetClean clean = this.datasetCleanMapper.selectById(cleanId);
if(null != clean){ if(null != clean){
doDatasetCleanHandler(clean.getDatasetId(), cleanId); datasetDataService.doDatasetCleanHandler(clean.getDatasetId(), cleanId);
updateCleanStatus(cleanId,DatasetConstant.CLEAN_PROGRESS); updateCleanStatus(cleanId,DatasetConstant.CLEAN_PROGRESS);
updateVersionStatus(clean.getDatasetId(),DatasetConstant.CLEAN_PROGRESS); updateVersionStatus(clean.getDatasetId(),DatasetConstant.CLEAN_PROGRESS);
} }
......
...@@ -7,6 +7,7 @@ import com.yice.common.core.object.MyPageParam; ...@@ -7,6 +7,7 @@ import com.yice.common.core.object.MyPageParam;
import com.yice.webadmin.app.constant.DatasetConstant; import com.yice.webadmin.app.constant.DatasetConstant;
import com.yice.webadmin.app.constant.MongoConstant; import com.yice.webadmin.app.constant.MongoConstant;
import com.yice.webadmin.app.data.DatasetData; import com.yice.webadmin.app.data.DatasetData;
import com.yice.webadmin.app.service.DatasetCleanService;
import com.yice.webadmin.app.service.DatasetDataService; import com.yice.webadmin.app.service.DatasetDataService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
...@@ -16,10 +17,13 @@ import org.springframework.data.mongodb.core.MongoTemplate; ...@@ -16,10 +17,13 @@ import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria; import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update; import org.springframework.data.mongodb.core.query.Update;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/** /**
* 数据集版本数据操作服务类。 * 数据集版本数据操作服务类。
...@@ -34,6 +38,14 @@ public class DatasetDataServiceImpl implements DatasetDataService { ...@@ -34,6 +38,14 @@ public class DatasetDataServiceImpl implements DatasetDataService {
@Autowired @Autowired
private MongoTemplate mongoTemplate; private MongoTemplate mongoTemplate;
@Autowired
private DatasetCleanService datasetCleanService;
/**
* 线程集合
*/
private final ConcurrentHashMap<Long, Future<?>> futures = new ConcurrentHashMap<>();
/** /**
* 保存新增对象。 * 保存新增对象。
* *
...@@ -78,6 +90,24 @@ public class DatasetDataServiceImpl implements DatasetDataService { ...@@ -78,6 +90,24 @@ public class DatasetDataServiceImpl implements DatasetDataService {
MongoConstant.COLLECT_NAME + versionId); MongoConstant.COLLECT_NAME + versionId);
} }
/**
* 开始清洗工作
* @param datasetId 数据集对应的版本
* @param cleanId 清洗任务标识
*/
@Async
@Override
public void doDatasetCleanHandler(Long datasetId, Long cleanId) {
MyPageParam param = new MyPageParam();
param.setPageNum(DatasetConstant.PAGE_NUM);
param.setPageSize(DatasetConstant.MAX_PAGE_SIZE);
List<DatasetData> dataList = this.list(datasetId, param);
if(CollUtil.isNotEmpty(dataList)) {
Future<Long> future = datasetCleanService.executeCleanTaskAsync(dataList, cleanId, datasetId);
futures.put(cleanId, future);
}
}
/** /**
* 根据条件查询数据集数据。 * 根据条件查询数据集数据。
* *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment