Commit a3940966 authored by pengxin's avatar pengxin

清洗过程去掉写人json文件中以及更新配置。

parent 171bbcd5
......@@ -92,6 +92,11 @@ public class DatasetConstant {
*/
public static final Integer MARK = 1;
/**
* 未清洗
*/
public static final Integer NOT_CLEAN_PROGRESS = 3;
/**
* 清洗中
*/
......
......@@ -13,6 +13,7 @@ import com.yice.common.log.model.constant.SysOperationLogType;
import com.yice.webadmin.app.dto.DatasetCleanDto;
import com.yice.webadmin.app.model.DatasetClean;
import com.yice.webadmin.app.service.DatasetCleanService;
import com.yice.webadmin.app.service.DatasetDataService;
import com.yice.webadmin.app.vo.DatasetCleanVo;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
......@@ -35,6 +36,8 @@ public class DatasetCleanController {
@Autowired
private DatasetCleanService datasetCleanService;
@Autowired
private DatasetDataService datasetDataService;
/**
* 新增数据集清洗数据。
......@@ -71,6 +74,8 @@ public class DatasetCleanController {
}
DatasetClean datasetClean = MyModelUtil.copyTo(datasetCleanDto, DatasetClean.class);
datasetClean = datasetCleanService.addNew(datasetClean);
datasetDataService.doDatasetCleanHandler(datasetClean.getDatasetId(), datasetClean.getCleanId(),
datasetClean.getCleanConfigId());
return ResponseResult.success(datasetClean);
}
......
......@@ -85,6 +85,12 @@ public class DatasetClean extends BaseModel {
@TableField(exist = false)
private List<String> cleanMethod;
/**
* 清洗配置标识。
*/
@TableField(exist = false)
private Long cleanConfigId;
/**
* 清洗配置对象。
*/
......
......@@ -53,9 +53,10 @@ public interface DatasetCleanService extends IBaseService<DatasetClean, Long> {
* @param dataList 数据集列表
* @param cleanId 清洗标识
* @param datasetId 清洗集标识
* @param cleanConfigId 清洗配置标识
* @return 线程方法
*/
Future<Long> executeCleanTaskAsync(List<DatasetData> dataList, Long cleanId, Long datasetId);
Future<Long> executeCleanTaskAsync(List<DatasetData> dataList, Long cleanId, Long datasetId, Long cleanConfigId);
/**
* 保存清洗对象以及清洗配置对象。
......
......@@ -25,8 +25,9 @@ public interface DatasetDataService {
* 开始清洗工作
* @param datasetId 清洗数据集
* @param cleanId 清洗标识
* @param cleanConfigId 清洗配置标识
*/
void doDatasetCleanHandler(Long datasetId, Long cleanId);
void doDatasetCleanHandler(Long datasetId, Long cleanId, Long cleanConfigId);
/**
* 删除整个集合中的文档数据。
......
......@@ -98,16 +98,15 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
datasetClean.setCleanType(DatasetConstant.CLEAN_TYPE);
datasetCleanMapper.insert(this.buildDefaultValue(datasetClean));
Long cleanConfigId = idGenerator.nextLongId();
DatasetCleanConfig datasetCleanConfig = datasetClean.getConfig();
if(null != datasetCleanConfig) {
datasetCleanConfig.setCleanId(datasetClean.getCleanId());
datasetCleanConfig.setCleanConfigId(idGenerator.nextLongId());
datasetCleanConfig.setCleanConfigId(cleanConfigId);
MyModelUtil.fillCommonsForInsert(datasetCleanConfig);
datasetCleanConfigMapper.insert(datasetCleanConfig);
}
updateVersionStatus(datasetClean.getDatasetId(),DatasetConstant.CLEAN_PROGRESS);
datasetDataService.doDatasetCleanHandler(datasetClean.getDatasetId(), datasetClean.getCleanId());
datasetClean.setCleanConfigId(cleanConfigId);
return datasetClean;
}
......@@ -116,13 +115,15 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* @param dataList 数据集列表
* @param cleanId 清洗标识
* @param datasetId 清洗集标识
* @param cleanConfigId 清洗配置标识
* @return 线程方法
*/
@Transactional(rollbackFor = Exception.class)
@Override
public Future<Long> executeCleanTaskAsync(List<DatasetData> dataList, Long cleanId, Long datasetId) {
asyncDealWithDatasetSaveBatch(dataList, cleanId);
dealWithTaskHandler(datasetId,cleanId);
public Future<Long> executeCleanTaskAsync(List<DatasetData> dataList, Long cleanId,
Long datasetId, Long cleanConfigId) {
asyncDealWithDatasetSaveBatch(dataList, cleanId, cleanConfigId);
dealWithTaskHandler(datasetId,cleanConfigId);
updateCleanStatus(cleanId,DatasetConstant.CLEAN_FINISHED);
updateVersionStatus(datasetId,DatasetConstant.CLEAN_FINISHED);
return new AsyncResult<>(cleanId);
......@@ -157,14 +158,14 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* @param dataList 清洗列表
* @param cleanId 清洗标识
*/
private void asyncDealWithDatasetSaveBatch(List<DatasetData> dataList, Long cleanId) {
List<DatasetDataClean> cleans = asyncCleanTaskHandler(dataList, cleanId);
private void asyncDealWithDatasetSaveBatch(List<DatasetData> dataList, Long cleanId, Long cleanConfigId) {
List<DatasetDataClean> cleans = asyncCleanTaskHandler(dataList, cleanId, cleanConfigId);
this.datasetCleanConfigService.saveDatasetClean(cleans);
List<DatasetDataFilter> filters = asyncFilterTaskHandler(dataList, cleanId);
List<DatasetDataFilter> filters = asyncFilterTaskHandler(dataList, cleanId, cleanConfigId);
this.datasetCleanConfigService.saveDatasetFilter(filters);
List<DatasetDataDeduplicate> deduplicates = asyncDeduplicateTaskHandler(dataList, cleanId);
List<DatasetDataDeduplicate> deduplicates = asyncDeduplicateTaskHandler(dataList, cleanId, cleanConfigId);
this.datasetCleanConfigService.saveDatasetDeduplicate(deduplicates);
List<DatasetDataDesensitive> desensitives = asyncDesensitiveTaskHandler(dataList, cleanId);
List<DatasetDataDesensitive> desensitives = asyncDesensitiveTaskHandler(dataList, cleanId, cleanConfigId);
this.datasetCleanConfigService.saveDatasetDesensitive(desensitives);
}
......@@ -189,7 +190,7 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
public void restartCleanTask(Long cleanId) {
DatasetClean clean = this.datasetCleanMapper.selectById(cleanId);
if(null != clean){
datasetDataService.doDatasetCleanHandler(clean.getDatasetId(), cleanId);
datasetDataService.doDatasetCleanHandler(clean.getDatasetId(), cleanId, null);
updateCleanStatus(cleanId,DatasetConstant.CLEAN_PROGRESS);
updateVersionStatus(clean.getDatasetId(),DatasetConstant.CLEAN_PROGRESS);
}
......@@ -200,24 +201,18 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* ===总数据清洗过程===
* 1、分页处理每页10000条数据
* 2、更新Mongodb数据库中的数据
* 3、更新json存储地址的数据集数据
* 4、更新版本数据集状态
* 3、更新版本数据集状态
* ==============================
* 处理数据集
* @param cleanId 清洗任务id
* @param datasetId 清洗数据集标识
* @param cleanConfigId 清洗配置标识
* @return 清洗列表
*/
private void dealWithTaskHandler(Long datasetId, Long cleanId) {
private void dealWithTaskHandler(Long datasetId, Long cleanConfigId) {
try {
DatasetVersion datasetVersion = this.datasetVersionService.getById(datasetId);
datasetVersionService.saveDatasetInfo(datasetVersion.getVersionName());
clearFileDatasetData(datasetVersion.getFileUrl());
//记录是否有追加数据
Integer index = 0;
Long count = datasetDataService.count(datasetId);
if (count > 0) {
List<DatasetRule> rules = buildRulesList(cleanId);
List<DatasetRule> rules = buildRulesList(cleanConfigId);
int pageSize = DatasetConstant.MAX_SIZE;
int totalPages = (int) Math.ceil((double) count / pageSize);
MyPageParam param;
......@@ -227,21 +222,10 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
param.setPageSize(pageSize);
List<DatasetData> dataList = datasetDataService.list(datasetId, param);
//写入到数据集中
List<DatasetData> newDataList = dealWithDatasetNodeData(dataList, datasetId, rules);
if(CollUtil.isNotEmpty(newDataList)) {
appendDataListToFile(datasetVersion.getFileUrl() ,newDataList, i);
index ++;
if(CollUtil.isNotEmpty(dataList)) {
dealWithDatasetNodeData(dataList, datasetId, rules);
}
}
//解析文件去掉多余的数据,比如文件里面最后一个,多加了一个",",缺少符号[]
if(index > 0){
readJsonAppendSymbol(datasetVersion.getFileUrl());
}
//删除为空的数据集数据
this.datasetDataService.deleteByData(datasetId);
}
} catch (Exception ex) {
log.error("deal with task handler is error:" , ex);
......@@ -250,14 +234,11 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
/**
* 构建规则列表
* @param cleanId 清洗标识
* @param cleanConfigId 清洗配置标识
* @return 规则列表
*/
private List<DatasetRule> buildRulesList(Long cleanId) {
DatasetCleanConfig cleanConfig = new DatasetCleanConfig();
cleanConfig.setCleanId(cleanId);
DatasetCleanConfig datasetCleanConfig = datasetCleanConfigService.getOne(cleanConfig);
private List<DatasetRule> buildRulesList(Long cleanConfigId) {
DatasetCleanConfig datasetCleanConfig = datasetCleanConfigService.getById(cleanConfigId);
List<DatasetRule> rules = new ArrayList<>();
if(null != datasetCleanConfig) {
String[] nonEmptyJsonStrings = {datasetCleanConfig.getFilterConfig(),datasetCleanConfig.getDesensitiveConfig(),
......@@ -286,31 +267,6 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
return rules;
}
/**
* 第一个方法:清空文件
* @param filePath 文件地址
*/
public void clearFileDatasetData(String filePath) {
File file = new File(filePath);
if (file.exists()) {
FileWriter fileWriter = null;
try {
fileWriter = new FileWriter(file, true);
fileWriter.write("");
} catch (IOException e) {
log.error("clearFileDatasetData method is error:", e);
} finally {
if (fileWriter != null) {
try {
fileWriter.close();
} catch (IOException e) {
log.error("file write close is errot", e);
}
}
}
}
}
/**
* 解析文件去掉多余的数据,比如文件里面最后一个,多加了一个",",缺少符号[]
* @param filePath 文件地址
......@@ -464,17 +420,16 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* ===异常清洗===
* 处理数据集
* @param dataList 清洗数据集列表
* @param cleanId 清洗任务id
* @param cleanId 清洗标识
* @param cleanConfigId 清洗任务配置id
*
* @return 清洗列表
*/
private List<DatasetDataClean> asyncCleanTaskHandler(List<DatasetData> dataList, Long cleanId) {
private List<DatasetDataClean> asyncCleanTaskHandler(List<DatasetData> dataList, Long cleanId, Long cleanConfigId) {
List<DatasetDataClean> cleans = new ArrayList<>();
try {
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig cleanConfig = datasetCleanConfigService.getOne(filter);
if(null != cleanConfig && null == cleanConfig.getCleanConfig()) return cleans;
DatasetCleanConfig cleanConfig = datasetCleanConfigService.getById(cleanConfigId);
if(null == cleanConfig || null == cleanConfig.getCleanConfig()) return cleans;
if(CollUtil.isNotEmpty(dataList)) {
ObjectMapper objectMapper = new ObjectMapper();
......@@ -506,16 +461,16 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* ===隐私清洗===
* 处理数据集
* @param dataList 清洗数据集列表
* @param cleanId 清洗任务id
* @param cleanId 清洗标识
* @param cleanConfigId 清洗任务配置id
* @return 清洗列表
*/
private List<DatasetDataDesensitive> asyncDesensitiveTaskHandler(List<DatasetData> dataList, Long cleanId) {
private List<DatasetDataDesensitive> asyncDesensitiveTaskHandler(List<DatasetData> dataList, Long cleanId,
Long cleanConfigId) {
List<DatasetDataDesensitive> desensitives = new ArrayList<>();
try {
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig desenstiveCfg = datasetCleanConfigService.getOne(filter);
if(null != desenstiveCfg && null == desenstiveCfg.getDesensitiveConfig()) return desensitives;
DatasetCleanConfig desenstiveCfg = datasetCleanConfigService.getById(cleanConfigId);
if(null == desenstiveCfg || null == desenstiveCfg.getDesensitiveConfig()) return desensitives;
if(CollUtil.isNotEmpty(dataList)) {
ObjectMapper objectMapper = new ObjectMapper();
......@@ -548,15 +503,15 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* 处理数据集
* @param dataList 清洗数据集列表
* @param cleanId 清洗任务id
* @param cleanConfigId 清洗任务配置id
* @return 清洗列表
*/
private List<DatasetDataDeduplicate> asyncDeduplicateTaskHandler(List<DatasetData> dataList, Long cleanId) {
private List<DatasetDataDeduplicate> asyncDeduplicateTaskHandler(List<DatasetData> dataList, Long cleanId,
Long cleanConfigId) {
List<DatasetDataDeduplicate> deduplicates = new ArrayList<>();
try {
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig deduplicateConfig = datasetCleanConfigService.getOne(filter);
if(null != deduplicateConfig && null == deduplicateConfig.getDeduplicateConfig()) return deduplicates;
DatasetCleanConfig deduplicateConfig = datasetCleanConfigService.getById(cleanConfigId);
if(null == deduplicateConfig || null == deduplicateConfig.getDeduplicateConfig()) return deduplicates;
if(CollUtil.isNotEmpty(dataList)) {
ObjectMapper objectMapper = new ObjectMapper();
......@@ -589,15 +544,15 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* 处理数据集
* @param dataList 清洗数据集列表
* @param cleanId 清洗任务id
* @param cleanConfigId 清洗任务配置id
* @return 清洗列表
*/
private List<DatasetDataFilter> asyncFilterTaskHandler(List<DatasetData> dataList, Long cleanId) {
private List<DatasetDataFilter> asyncFilterTaskHandler(List<DatasetData> dataList, Long cleanId,
Long cleanConfigId) {
List<DatasetDataFilter> filters = new ArrayList<>();
try {
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig filterConfig = datasetCleanConfigService.getOne(filter);
if(null != filterConfig && null == filterConfig.getFilterConfig()) return filters;
DatasetCleanConfig filterConfig = datasetCleanConfigService.getById(cleanConfigId);
if(null == filterConfig || null == filterConfig.getFilterConfig()) return filters;
if(CollUtil.isNotEmpty(dataList)) {
ObjectMapper objectMapper = new ObjectMapper();
......
......@@ -22,8 +22,6 @@ import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/**
* 数据集版本数据操作服务类。
......@@ -89,16 +87,17 @@ public class DatasetDataServiceImpl implements DatasetDataService {
* 开始清洗工作
* @param datasetId 数据集对应的版本
* @param cleanId 清洗任务标识
* @param cleanConfigId 清洗配置标识
*/
@Async
@Override
public void doDatasetCleanHandler(Long datasetId, Long cleanId) {
public void doDatasetCleanHandler(Long datasetId, Long cleanId, Long cleanConfigId) {
MyPageParam param = new MyPageParam();
param.setPageNum(DatasetConstant.PAGE_NUM);
param.setPageSize(DatasetConstant.MAX_PAGE_SIZE);
List<DatasetData> dataList = this.list(datasetId, param);
if(CollUtil.isNotEmpty(dataList)) {
datasetCleanService.executeCleanTaskAsync(dataList, cleanId, datasetId);
datasetCleanService.executeCleanTaskAsync(dataList, cleanId, datasetId, cleanConfigId);
}
}
......
......@@ -100,7 +100,7 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
datasetVersion.setDatasetVersion(version);
datasetVersion.setVersionName(reDatasetManage.getDatasetName() + "_V" + version);
datasetVersion.setDatasetId(reDatasetManage.getDatasetId());
datasetVersion.setCleanStatus(0);
datasetVersion.setCleanStatus(DatasetConstant.NOT_CLEAN_PROGRESS);
datasetVersion.setDataVolume(0L);
datasetVersion.setEnhanceStatus(0);
datasetVersion.setInputStatus(0);
......@@ -326,7 +326,7 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
String versionName = datasetVersion.getVersionName();
//先存储文件
String fullName = pythonConfig.getDatasetFileBaseDir() + versionName + ".json";
this.doDealTaskHandler(datasetVersion.getVersionId(), versionName, fullName);
this.doDealTaskHandler(datasetVersion.getVersionId(), fullName);
//再存储数据集配置文件
this.saveDatasetInfo(versionName);
DatasetVersion filter = new DatasetVersion();
......@@ -346,10 +346,9 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
* ==============================
* 处理数据集
* @param datasetId 清洗任务id
* @param versionName 数据集名称
* @return 清洗列表
*/
private void doDealTaskHandler(Long datasetId, String versionName,String fileUrl) {
private void doDealTaskHandler(Long datasetId, String fileUrl) {
try {
Integer index = 0;
Long count = datasetDataService.count(datasetId);
......@@ -365,7 +364,8 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
//写入到数据集中
if(CollUtil.isNotEmpty(dataList)) {
datasetCleanService.appendDataListToFile(fileUrl ,dataList, i);
datasetCleanService.appendDataListToFile(fileUrl, dataList, i);
index ++;
}
}
......@@ -378,7 +378,7 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
this.datasetDataService.deleteByData(datasetId);
}
} catch (Exception ex) {
log.error("deal with task handler is error:" , ex);
log.error("do deal with task handler is error:" , ex);
}
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment