Commit 32843a49 authored by pengxin

Add new rules for data cleaning.

parent 56c625a3
...@@ -65,6 +65,11 @@
<artifactId>core</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>com.github.houbb</groupId>
<artifactId>opencc4j</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies>
<build>
...
package com.yice.webadmin.app.constant;
/**
* 数据清洗类常量类
*/
public class DatasetCleanConstant {
/**
* 移除不可见字符
*/
public static final String REMOVE_INVISIBLE_CHARACTER = "remove_invisible_character";
/**
* 规范化空格
*/
public static final String REPLACE_UNIFORM_WHITESPACE = "replace_uniform_whitespace";
/**
* 去除乱码
*/
public static final String REMOVE_NON_MEANING_CHARACTERS = "remove_non_meaning_characters";
/**
* 繁体转简体
*/
public static final String REPLACE_TRADITIONAL_CHINESE_TO_SIMPLIFIED = "replace_traditional_chinese_to_simplified";
/**
* 去除网页标识符
*/
public static final String REMOVE_WEB_IDENTIFIERS = "remove_web_identifiers";
/**
* 去除表情
*/
public static final String REMOVE_EMOJI = "remove_emoji";
/**
* 去除Email
*/
public static final String REPLACE_EMAILS = "replace_emails";
/**
* 去除IP地址
*/
public static final String REPLACE_IP = "replace_ip";
/**
* 去除数字
*/
public static final String REPLACE_IDENTIFIER = "replace_identifier";
}
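These keys are plain string identifiers. As far as this commit shows, they are expected to appear as the name values inside the JSON strings held by DatasetCleanConfig (cleanConfig, deduplicateConfig, filterConfig, desensitiveConfig), which JsonNameExtractor (extended later in this change) turns back into a rule list. A hedged sketch of such a config value; the exact persisted format is an assumption:
// Assumed shape of a config column such as DatasetCleanConfig.cleanConfig:
// a JSON array whose "name" entries are the constants defined above.
String cleanConfig = "[{\"name\":\"remove_invisible_character\"},{\"name\":\"replace_uniform_whitespace\"}]";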
...@@ -22,6 +22,11 @@ public class DatasetConstant {
*/
public static final Integer UNMARK = 0;
/**
* 默认单次写入10000条数据
*/
public static final Integer MAX_SIZE = 10000;
/**
* 已完成状态
*/
...@@ -57,6 +62,11 @@ public class DatasetConstant {
*/
public static final String OUTPUT = "output";
/**
* data数据
*/
public static final String DATA = "data";
/**
* 已标记
*/
...@@ -65,7 +75,17 @@ public class DatasetConstant {
/**
* 清洗中
*/
public static final Integer CLEAN_PROGRESS = 0;
/**
* 清洗完成
*/
public static final Integer CLEAN_FINISHED = 1;
/**
* 文本数据清洗
*/
public static final Integer CLEAN_TYPE = 1;
/**
* 分页个数
...
...@@ -10,10 +10,8 @@ import com.yice.common.core.util.MyModelUtil;
import com.yice.common.core.util.MyPageUtil;
import com.yice.common.log.annotation.OperationLog;
import com.yice.common.log.model.constant.SysOperationLogType;
import com.yice.webadmin.app.dto.DatasetCleanDto;
import com.yice.webadmin.app.model.DatasetClean;
import com.yice.webadmin.app.service.DatasetCleanService;
import com.yice.webadmin.app.vo.DatasetCleanVo;
import io.swagger.annotations.Api;
...@@ -63,21 +61,35 @@ public class DatasetCleanController {
* @param datasetCleanDto 新增对象。
* @return 应答结果对象,包含新增对象主键Id。
*/
@ApiOperationSupport(ignoreParameters = {"datasetCleanDto.cleanId"})
@OperationLog(type = SysOperationLogType.ADD_ALL)
@PostMapping("/startClean")
public ResponseResult<DatasetClean> startClean(@MyRequestBody DatasetCleanDto datasetCleanDto) {
String errorMessage = MyCommonUtil.getModelValidationError(datasetCleanDto, false);
if (errorMessage != null) {
return ResponseResult.error(ErrorCodeEnum.DATA_VALIDATED_FAILED, errorMessage);
}
DatasetClean datasetClean = MyModelUtil.copyTo(datasetCleanDto, DatasetClean.class);
datasetClean = datasetCleanService.addNew(datasetClean);
return ResponseResult.success(datasetClean);
}
/**
* 停止数据集清洗数据。
*
* @param cleanId 清洗任务id。
* @return 应答结果对象。
*/
@OperationLog(type = SysOperationLogType.DELETE)
@PostMapping("/stopClean")
public ResponseResult<Void> stopClean(@RequestParam Long cleanId) {
if (MyCommonUtil.existBlankArgument(cleanId)) {
return ResponseResult.error(ErrorCodeEnum.ARGUMENT_NULL_EXIST);
}
datasetCleanService.stopCleanTask(cleanId);
return ResponseResult.success();
}
/**
* 更新数据集清洗数据。
*
...
...@@ -356,7 +356,7 @@ public class DatasetVersionController {
return ResponseResult.error(ErrorCodeEnum.ARGUMENT_NULL_EXIST, errorMessage);
}
DatasetVersion datasetVersion = this.datasetVersionService.getById(versionId);
String versionName = datasetVersion.getVersionName() + "_V" + datasetVersion.getDatasetVersion();
//先存储文件
String fullName = this.saveDatasetFile(importFile, versionName, versionId);
//再存储数据集配置文件
...
...@@ -24,8 +24,11 @@ public class DatasetDataDeduplicate {
@ApiModelProperty(name = "clean_id",value = "清洗任务标识id")
private Long cleanId;
@ApiModelProperty(name = "clean_before_data",value = "清洗前数据")
private String cleanBeforeData;
@ApiModelProperty(name = "clean_after_data",value="清洗后数据")
private String cleanAfterData;
@ApiModelProperty(name = "create_time",value="创建时间")
private Date createTime;
...
...@@ -24,8 +24,11 @@ public class DatasetDataDesensitive {
@ApiModelProperty(name = "clean_id",value = "清洗任务标识id")
private Long cleanId;
@ApiModelProperty(name = "clean_before_data",value = "清洗前数据")
private String cleanBeforeData;
@ApiModelProperty(name = "clean_after_data",value="清洗后数据")
private String cleanAfterData;
@ApiModelProperty(name = "create_time",value="创建时间")
private Date createTime;
...
package com.yice.webadmin.app.dto;
import com.yice.webadmin.app.model.DatasetCleanConfig;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
...@@ -36,6 +37,12 @@ public class DatasetCleanDto {
@ApiModelProperty(value = "清洗数据集名称")
private String datasetName;
/**
* 清洗配置对象。
*/
@ApiModelProperty(value = "清洗配置对象")
private DatasetCleanConfig config;
/**
* 清洗方式。
*/
...
...@@ -29,10 +29,16 @@ public class DatasetDataDeduplicateDto {
private Long cleanId;
/**
* 清洗前数据
*/
@ApiModelProperty(value = "清洗前数据")
private String cleanBeforeData;
/**
* 清洗后数据。
*/
@ApiModelProperty(value="清洗后数据")
private String cleanAfterData;
/**
* 创建时间。
...
...@@ -29,10 +29,16 @@ public class DatasetDataDesensitiveDto {
private Long cleanId;
/**
* 清洗前数据
*/
@ApiModelProperty(value = "清洗前数据")
private String cleanBeforeData;
/**
* 清洗后数据。
*/
@ApiModelProperty(value="清洗后数据")
private String cleanAfterData;
/**
* 创建时间。
...
...@@ -80,6 +80,12 @@ public class DatasetClean extends BaseModel {
@TableField(exist = false)
private List<String> cleanMethod;
/**
* 清洗配置对象。
*/
@TableField(exist = false)
private DatasetCleanConfig config;
/**
* 创建人名称字典。
*/
...
...@@ -2,7 +2,6 @@ package com.yice.webadmin.app.service;
import com.yice.common.core.base.service.IBaseService;
import com.yice.webadmin.app.model.DatasetClean;
import java.util.List;
...@@ -22,14 +21,19 @@ public interface DatasetCleanService extends IBaseService<DatasetClean, Long> {
*/
DatasetClean saveNew(DatasetClean datasetClean);
/**
* 停止清洗任务
* @param cleanId 清洗任务id
*/
void stopCleanTask(Long cleanId);
/**
* 保存清洗对象以及清洗配置对象。
*
* @param datasetClean 新增对象。
* @return 返回新增对象。
*/
DatasetClean addNew(DatasetClean datasetClean);
/**
* 利用数据库的insertList语法,批量插入对象列表。
...
...@@ -78,6 +78,15 @@ public interface DatasetDataService {
*/
void update(DatasetData datasetData);
/**
* 批量处理数据集列表。
*
* @param dataList 批量处理数据集列表。
* @param versionId 版本标识
*/
void updateBatch(List<DatasetData> dataList, Long versionId);
/**
* 删除指定数据。
*
...
...@@ -264,7 +264,8 @@ public class DatasetCleanConfigServiceImpl extends BaseService<DatasetCleanConfi
List<Document> documents = new ArrayList<>();
if(CollUtil.isNotEmpty(deduplicates)) {
for(DatasetDataDeduplicate deduplicate : deduplicates) {
Document document = new Document(MongoConstant.CLEAN_BEFORE_DATA, deduplicate.getCleanBeforeData())
.append(MongoConstant.CLEAN_AFTER_DATA, deduplicate.getCleanAfterData())
.append(MongoConstant.CLEAN_ID, deduplicate.getCleanId())
.append(MongoConstant.CREATE_TIME, new Date());
documents.add(document);
...@@ -283,9 +284,10 @@ public class DatasetCleanConfigServiceImpl extends BaseService<DatasetCleanConfi
public void saveDatasetDesensitive(List<DatasetDataDesensitive> desensitives) {
List<Document> documents = new ArrayList<>();
if(CollUtil.isNotEmpty(desensitives)) {
for(DatasetDataDesensitive desensitive : desensitives) {
Document document = new Document(MongoConstant.CLEAN_BEFORE_DATA, desensitive.getCleanBeforeData())
.append(MongoConstant.CLEAN_AFTER_DATA, desensitive.getCleanAfterData())
.append(MongoConstant.CLEAN_ID, desensitive.getCleanId())
.append(MongoConstant.CREATE_TIME, new Date());
documents.add(document);
}
...
...@@ -2,6 +2,10 @@ package com.yice.webadmin.app.service.impl;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.pagehelper.Page;
import com.yice.common.core.base.dao.BaseDaoMapper;
import com.yice.common.core.base.service.BaseService;
...@@ -9,11 +13,11 @@ import com.yice.common.core.object.MyPageParam;
import com.yice.common.core.object.MyRelationParam;
import com.yice.common.core.util.MyModelUtil;
import com.yice.common.sequence.wrapper.IdGeneratorWrapper;
import com.yice.webadmin.app.config.PythonConfig;
import com.yice.webadmin.app.constant.DatasetConstant;
import com.yice.webadmin.app.dao.DatasetCleanConfigMapper;
import com.yice.webadmin.app.dao.DatasetCleanMapper;
import com.yice.webadmin.app.data.*;
import com.yice.webadmin.app.model.DatasetClean;
import com.yice.webadmin.app.model.DatasetCleanConfig;
import com.yice.webadmin.app.model.DatasetVersion;
...@@ -21,14 +25,25 @@ import com.yice.webadmin.app.service.DatasetCleanConfigService;
import com.yice.webadmin.app.service.DatasetCleanService;
import com.yice.webadmin.app.service.DatasetDataService;
import com.yice.webadmin.app.service.DatasetVersionService;
import com.yice.webadmin.app.util.DataCleanerUtil;
import com.yice.webadmin.app.util.JsonNameExtractor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
...@@ -53,6 +68,10 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
private DatasetDataService datasetDataService;
@Autowired
private DatasetVersionService datasetVersionService;
@Autowired
private PythonConfig pythonConfig;
private final ConcurrentHashMap<Long, Future<?>> futures = new ConcurrentHashMap<>();
/**
* 返回当前Service的主表Mapper对象。
...@@ -81,23 +100,31 @@ ...
* 保存清洗对象以及清洗配置对象。
*
* @param datasetClean 新增对象。
* @return 返回新增对象。
*/
@Transactional(rollbackFor = Exception.class)
@Override
public DatasetClean addNew(DatasetClean datasetClean) {
datasetClean.setCleanStatus(DatasetConstant.CLEAN_PROGRESS);
datasetClean.setStartTime(new Date());
datasetClean.setCleanType(DatasetConstant.CLEAN_TYPE);
datasetCleanMapper.insert(this.buildDefaultValue(datasetClean));
DatasetCleanConfig datasetCleanConfig = datasetClean.getConfig();
if(null != datasetCleanConfig) {
datasetCleanConfig.setCleanId(datasetClean.getCleanId());
datasetCleanConfig.setCleanConfigId(idGenerator.nextLongId());
MyModelUtil.fillCommonsForInsert(datasetCleanConfig);
datasetCleanConfigMapper.insert(datasetCleanConfig);
}
DatasetVersion datasetVersion = new DatasetVersion();
datasetVersion.setVersionId(datasetClean.getDatasetId());
datasetVersion.setCleanStatus(DatasetConstant.CLEAN_PROGRESS);
datasetVersionService.updateById(datasetVersion);
//TODO:未完成
//doDatasetCleanHandler(datasetClean.getDatasetId(), datasetClean.getCleanId());
return datasetClean;
}
...@@ -111,22 +138,332 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
param.setPageSize(DatasetConstant.MAX_PAGE_SIZE);
List<DatasetData> dataList = datasetDataService.list(datasetId, param);
if(CollUtil.isNotEmpty(dataList)) {
Future<Void> future = executeCleanTaskAsync(dataList, cleanId, datasetId);
futures.put(cleanId, future);
}
}
@Async("taskExecutor")
public Future<Void> executeCleanTaskAsync(List<DatasetData> dataList, Long cleanId, Long datasetId) {
asyncDealWithDatasetSaveBatch(dataList, cleanId);
dealWithTaskHandler(datasetId,cleanId);
dealWithDatasetFileDataHandler(datasetId);
DatasetClean filter = new DatasetClean();
filter.setCleanId(cleanId);
filter.setCleanStatus(DatasetConstant.CLEAN_FINISHED);
filter.setFinishTime(new Date());
this.updateById(filter);
return new AsyncResult<>(null);
}
/**
* 只异步处理前100条数据清洗数据
* @param dataList 清洗列表
* @param cleanId 清洗标识
*/
private void asyncDealWithDatasetSaveBatch(List<DatasetData> dataList, Long cleanId) {
List<DatasetDataClean> cleans = asyncCleanTaskHandler(dataList, cleanId);
this.datasetCleanConfigService.saveDatasetClean(cleans);
List<DatasetDataFilter> filters = asyncFilterTaskHandler(dataList, cleanId);
this.datasetCleanConfigService.saveDatasetFilter(filters);
List<DatasetDataDeduplicate> deduplicates = asyncDeduplicateTaskHandler(dataList, cleanId);
this.datasetCleanConfigService.saveDatasetDeduplicate(deduplicates);
List<DatasetDataDesensitive> desensitives = asyncDesensitiveTaskHandler(dataList, cleanId);
this.datasetCleanConfigService.saveDatasetDesensitive(desensitives);
}
/**
* 停止清洗任务
* @param cleanId 清洗任务id
*/
@Override
public void stopCleanTask(Long cleanId) {
Future<?> future = futures.remove(cleanId);
if (future != null && !future.isDone()) {
future.cancel(true);
}
}
/**
* ==============================
* ===总数据清洗过程===
* 1、分页处理每页10000条数据
* 2、更新Mongodb数据库中的数据
* 3、更新json存储地址的数据集数据
* ==============================
* 处理数据集
* @param datasetId 数据集标识
* @param cleanId 清洗任务id
*/
private void dealWithTaskHandler(Long datasetId, Long cleanId) {
try {
List<String> rules = new ArrayList<>();
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig config = datasetCleanConfigService.getOne(filter);
if(null != config) {
rules.add(config.getFilterConfig());
rules.add(config.getDesensitiveConfig());
rules.add(config.getDeduplicateConfig());
rules.add(config.getCleanConfig());
rules = JsonNameExtractor.extractNames(rules);
}
Long count = datasetDataService.count(datasetId);
int pageSize = DatasetConstant.MAX_SIZE;
int totalPages = (int) Math.ceil((double) count / pageSize);
MyPageParam param = null;
for (int i = 1; i <= totalPages; i++) {
param = new MyPageParam();
param.setPageNum(i);
param.setPageSize(pageSize);
dealWithDatasetNodeData(datasetDataService.list(datasetId, param), datasetId, rules);
}
} catch (Exception ex) {
log.error("deal with task handler is error:" , ex);
} }
}
/**
* 写入到json中的配置文件中
* @param datasetId 数据集Id
*/
private void dealWithDatasetFileDataHandler(Long datasetId) {
try {
DatasetVersion datasetVersion = this.datasetVersionService.getById(datasetId);
String versionName = datasetVersion.getVersionName();
//再存储数据集配置文件
datasetVersionService.saveDatasetInfo(versionName);
}catch (Exception ex){
log.error("deal with dataset node data:", ex);
}
}
/**
* 保存导入文件。
*
* @param importFile 导入的文件。
* @return 保存的本地文件名。
*/
private String saveDatasetFile(MultipartFile importFile, String versionName, Long versionId) throws IOException {
String fullName = pythonConfig.getDatasetFileBaseDir() + versionName + ".json";
try {
byte[] bytes = importFile.getBytes();
Path path = Paths.get(fullName);
// 如果没有files文件夹,则创建
if (!Files.isWritable(path)) {
Files.createDirectories(Paths.get(pythonConfig.getDatasetFileBaseDir()));
}
// 文件写入指定路径、应该是追加到文件里面
Files.write(path, bytes);
} catch (IOException e) {
log.error("Failed to write imported file [" + importFile.getOriginalFilename() + " ].", e);
throw e;
}
return fullName;
}
/**
* 处理data节点中的
* @param dataList 数据集列表
* @param datasetId 数据集标识
* @param rules 规则列表
*/
private void dealWithDatasetNodeData(List<DatasetData> dataList, Long datasetId, List<String> rules) {
try {
if(CollUtil.isNotEmpty(dataList)) {
ObjectMapper objectMapper = new ObjectMapper();
for (DatasetData datasetData : dataList) {
JsonNode rootNode = objectMapper.readTree(datasetData.getData());
String data = rootNode.get(DatasetConstant.OUTPUT).textValue();
String output = DataCleanerUtil.buildCleanAfterData(data, rules);
datasetData.setData(createNewDataNode(datasetData.getData(), output));
}
this.datasetDataService.updateBatch(dataList, datasetId);
}
}catch (JsonProcessingException ex){
log.error("deal with dataset node data:", ex);
}
}
/**
* 创建新的数据节点
* @param json Json对象
* @param output 输出对象
* @return 新的data数据
* @throws JsonProcessingException
*/
private String createNewDataNode(String json, String output) throws JsonProcessingException{
String modifiedJson = null;
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(json);
// 获取data节点
JsonNode dataNode = rootNode.get(DatasetConstant.DATA);
if (dataNode != null && dataNode.isObject()) {
// 转换data节点为ObjectNode以便修改
ObjectNode dataObjectNode = (ObjectNode) dataNode;
// 替换output字段的值
dataObjectNode.put(DatasetConstant.OUTPUT, output);
// 将修改后的JSON转换回字符串
modifiedJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(rootNode);
}
return modifiedJson;
}
/**
* ===异常清洗===
* 处理数据集
* @param dataList 清洗数据集列表
* @param cleanId 清洗任务id
*
* @return 清洗列表
*/
private List<DatasetDataClean> asyncCleanTaskHandler(List<DatasetData> dataList, Long cleanId) {
List<DatasetDataClean> cleans = new ArrayList<>();
try {
List<String> rules = new ArrayList<>();
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig cleanConfig = datasetCleanConfigService.getOne(filter);
if(null != cleanConfig) {
rules = JsonNameExtractor.extractNames(cleanConfig.getCleanConfig());
}
if(CollUtil.isNotEmpty(dataList)) {
ObjectMapper objectMapper = new ObjectMapper();
for(DatasetData datasetData: dataList) {
DatasetDataClean dataClean = new DatasetDataClean();
JsonNode rootNode = objectMapper.readTree(datasetData.getData());
String data = rootNode.get(DatasetConstant.OUTPUT).textValue();
dataClean.setCleanBeforeData(data);
dataClean.setCleanAfterData(DataCleanerUtil.buildCleanAfterData(data,rules));
dataClean.setCleanId(cleanId);
dataClean.setCreateTime(new Date());
cleans.add(dataClean);
}
}
} catch (Exception ex) {
log.error("Async clean task handler is error:" , ex);
}
return cleans;
}
/**
* ===隐私清洗===
* 处理数据集
* @param dataList 清洗数据集列表
* @param cleanId 清洗任务id
* @return 清洗列表
*/
private List<DatasetDataDesensitive> asyncDesensitiveTaskHandler(List<DatasetData> dataList, Long cleanId) {
List<DatasetDataDesensitive> desensitives = new ArrayList<>();
try {
List<String> rules = new ArrayList<>();
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig cleanConfig = datasetCleanConfigService.getOne(filter);
if(null != cleanConfig) {
rules = JsonNameExtractor.extractNames(cleanConfig.getDesensitiveConfig());
}
ObjectMapper objectMapper = new ObjectMapper();
if(CollUtil.isNotEmpty(dataList)) {
for(DatasetData datasetData: dataList) {
DatasetDataDesensitive desensitive = new DatasetDataDesensitive();
JsonNode rootNode = objectMapper.readTree(datasetData.getData());
String data = rootNode.get(DatasetConstant.OUTPUT).textValue();
desensitive.setCleanBeforeData(data);
desensitive.setCleanAfterData(DataCleanerUtil.buildCleanAfterData(data,rules));
desensitive.setCleanId(cleanId);
desensitive.setCreateTime(new Date());
desensitives.add(desensitive);
}
}
} catch (Exception ex) {
log.error("Async desensitive task handler is error:" , ex);
}
return desensitives;
}
/**
* ===去重清洗===
* 处理数据集
* @param dataList 清洗数据集列表
* @param cleanId 清洗任务id
* @return 清洗列表
*/
private List<DatasetDataDeduplicate> asyncDeduplicateTaskHandler(List<DatasetData> dataList, Long cleanId) {
List<DatasetDataDeduplicate> deduplicates = new ArrayList<>();
try {
List<String> rules = new ArrayList<>();
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig deduplicateConfig = datasetCleanConfigService.getOne(filter);
if(null != deduplicateConfig) {
rules = JsonNameExtractor.extractNames(deduplicateConfig.getDeduplicateConfig());
}
ObjectMapper objectMapper = new ObjectMapper();
if(CollUtil.isNotEmpty(dataList)) {
for(DatasetData datasetData: dataList) {
DatasetDataDeduplicate deduplicate = new DatasetDataDeduplicate();
JsonNode rootNode = objectMapper.readTree(datasetData.getData());
String data = rootNode.get(DatasetConstant.OUTPUT).textValue();
deduplicate.setCleanBeforeData(data);
deduplicate.setCleanAfterData(DataCleanerUtil.buildCleanAfterData(data, rules));
deduplicate.setCleanId(cleanId);
deduplicate.setCreateTime(new Date());
deduplicates.add(deduplicate);
}
}
} catch (Exception ex) {
log.error("Async deduplicate task handler is error:" , ex);
}
return deduplicates;
}
/**
* ===过滤清洗===
* 处理数据集
* @param dataList 清洗数据集列表
* @param cleanId 清洗任务id
* @return 清洗列表
*/
private List<DatasetDataFilter> asyncFilterTaskHandler(List<DatasetData> dataList, Long cleanId) {
List<DatasetDataFilter> filters = new ArrayList<>();
try {
List<String> rules = new ArrayList<>();
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig filterConfig = datasetCleanConfigService.getOne(filter);
if(null != filterConfig) {
rules = JsonNameExtractor.extractNames(filterConfig.getFilterConfig());
}
ObjectMapper objectMapper = new ObjectMapper();
if(CollUtil.isNotEmpty(dataList)) {
for(DatasetData datasetData: dataList) {
DatasetDataFilter dataFilter = new DatasetDataFilter();
dataFilter.setCleanId(cleanId);
dataFilter.setCreateTime(new Date());
//TODO
JsonNode rootNode = objectMapper.readTree(datasetData.getData());
String data = rootNode.get(DatasetConstant.OUTPUT).textValue();
dataFilter.setContent(data);
filters.add(dataFilter);
}
}
} catch (Exception ex) {
log.error("Async filter task handler is error:" , ex);
}
return filters;
}
/**
* 利用数据库的insertList语法,批量插入对象列表。
*
...@@ -204,7 +541,7 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
/**
* 构建数据集方法
* @param datasetCleanList 数据清洗列表
*/
private void buildDatasetMethod(List<DatasetClean> datasetCleanList){
List<String> jsonStrings = new ArrayList<>();
...@@ -213,14 +550,14 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(datasetClean.getCleanId());
DatasetCleanConfig cleanConfig = datasetCleanConfigService.getOne(filter);
if(null != cleanConfig) {
jsonStrings.add(cleanConfig.getCleanConfig());
jsonStrings.add(cleanConfig.getDeduplicateConfig());
jsonStrings.add(cleanConfig.getFilterConfig());
jsonStrings.add(cleanConfig.getDesensitiveConfig());
List<String> nonEmptyJsonStrings = jsonStrings.stream()
.filter(s -> s != null && !s.isEmpty()).collect(Collectors.toList());
datasetClean.setCleanMethod(JsonNameExtractor.extractNames(nonEmptyJsonStrings));
}
}
...
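For orientation, createNewDataNode in the service above only rewrites the output field nested under the data node, so the per-record JSON is assumed to look roughly like the sketch below (the input field is illustrative; sibling fields are preserved because the rest of the tree is written back unchanged):
// Assumed layout of DatasetData.getData() as consumed by createNewDataNode:
String record = "{\"data\": {\"input\": \"示例问题\", \"output\": \"待清洗的回答文本\"}}";
// createNewDataNode(record, cleanedOutput) replaces only data.output and returns the
// pretty-printed document; DatasetConstant.DATA and DatasetConstant.OUTPUT supply the field names.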
...@@ -148,6 +148,18 @@ public class DatasetDataServiceImpl implements DatasetDataService {
MongoConstant.COLLECT_NAME + datasetData.getVersionId());
}
/**
* 批量更新数据对象。
*
* @param dataList 更新的数据对象列表。
* @param versionId 版本标识。
*/
@Override
public void updateBatch(List<DatasetData> dataList, Long versionId) {
// MongoTemplate.save只接受单个文档,这里逐条写入对应集合
dataList.forEach(datasetData ->
mongoTemplate.save(datasetData, MongoConstant.COLLECT_NAME + versionId));
}
/**
* 删除指定数据。
*
...
package com.yice.webadmin.app.util;
import com.github.houbb.opencc4j.util.ZhConverterUtil;
import com.yice.webadmin.app.constant.DatasetCleanConstant;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j
public class DataCleanerUtil {
/**
* 定义清洗后的数据
*
* @param data 清洗数据
* @param rules 清洗规则
* @return 返回清洗后的数据
*/
public static String buildCleanAfterData(String data, List<String> rules) {
for (String rule : rules) {
switch (rule) {
case DatasetCleanConstant.REMOVE_INVISIBLE_CHARACTER:
data = data.replaceAll("[\\p{C}]", "&nbsp;");
break;
case DatasetCleanConstant.REPLACE_UNIFORM_WHITESPACE:
data = data.replaceAll("[\\p{Z}\\u2000-\\u200A\\u2028\\u2029\\u3000]", " ");
break;
case DatasetCleanConstant.REMOVE_NON_MEANING_CHARACTERS:
data = data.replaceAll("[\\p{Cs}\\p{Co}\\p{Cn}]", "");
break;
case DatasetCleanConstant.REPLACE_TRADITIONAL_CHINESE_TO_SIMPLIFIED:
data = ZhConverterUtil.toSimple(data);
break;
case DatasetCleanConstant.REMOVE_WEB_IDENTIFIERS:
data = data.replaceAll("<[^>]*>", "");
break;
case DatasetCleanConstant.REMOVE_EMOJI:
data = data.replaceAll("[\\ud83c[\\udffb-\\udfff]|\\ud83d[\\udc00-\\ude4f]|\\ud83d[\\ude80-\\udeff]|\\ud83e[\\udd10-\\uddff]]", "");
break;
case DatasetCleanConstant.REPLACE_EMAILS:
data = data.replaceAll("\\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\\.[A-Z|a-z]{2,}\\b", "EMAIL");
break;
case DatasetCleanConstant.REPLACE_IP:
data = data.replaceAll("\\b(25[0-5]\\.|2[0-4][0-9]\\.|[01]?[0-9][0-9]?\\.){3}(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\\b", "IP_ADDRESS");
break;
case DatasetCleanConstant.REPLACE_IDENTIFIER:
data = data.replaceAll("\\d+", "PI:KEY");
break;
}
}
return data;
}
}
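A minimal usage sketch (not part of the commit) showing how the rule keys map onto the replacements above; the sample text and the chosen rules are illustrative:
import com.yice.webadmin.app.constant.DatasetCleanConstant;
import com.yice.webadmin.app.util.DataCleanerUtil;
import java.util.Arrays;
import java.util.List;

public class DataCleanerUtilExample {
    public static void main(String[] args) {
        List<String> rules = Arrays.asList(
                DatasetCleanConstant.REPLACE_TRADITIONAL_CHINESE_TO_SIMPLIFIED,
                DatasetCleanConstant.REPLACE_EMAILS,
                DatasetCleanConstant.REPLACE_IDENTIFIER);
        String raw = "聯繫方式:test@example.com,電話 13800000000";
        // Rules are applied in list order: traditional characters are converted to simplified,
        // the e-mail is replaced with "EMAIL", and the remaining digit run becomes "PI:KEY".
        System.out.println(DataCleanerUtil.buildCleanAfterData(raw, rules));
    }
}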
...@@ -38,4 +38,28 @@ public class JsonNameExtractor {
return names;
}
/**
* 拼接名字
* @param rule json名字
* @return 名字列表
*/
public static List<String> extractNames(String rule) {
ObjectMapper mapper = new ObjectMapper();
List<String> names = new ArrayList<>();
try {
JsonNode rootNode = mapper.readTree(rule);
if (rootNode.isArray()) {
for (JsonNode jsonNode : rootNode) {
JsonNode nameNode = jsonNode.get("name");
if (nameNode != null && nameNode.isTextual()) {
names.add(nameNode.asText());
}
}
}
} catch (IOException e) {
log.error("extract name method overload is error", e);
}
return names;
}
}
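For reference, a hedged sketch of how the new single-string overload is expected to be fed (the config value is illustrative); on malformed JSON it logs the error and returns an empty list rather than throwing:
String desensitiveConfig = "[{\"name\":\"replace_emails\"},{\"name\":\"replace_ip\"}]";
List<String> rules = JsonNameExtractor.extractNames(desensitiveConfig);
// rules -> [replace_emails, replace_ip], ready to hand to DataCleanerUtil.buildCleanAfterData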
package com.yice.webadmin.app.vo;
import com.yice.webadmin.app.model.DatasetCleanConfig;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
...@@ -42,6 +43,12 @@ public class DatasetCleanVo {
@ApiModelProperty(value = "清洗数据集名称")
private String datasetName;
/**
* 清洗配置对象。
*/
@ApiModelProperty(value = "清洗配置对象")
private DatasetCleanConfig config;
/**
* 开始时间。
*/
...
...@@ -29,10 +29,16 @@ public class DatasetDataDeduplicateVo {
private Long cleanId;
/**
* 清洗前数据
*/
@ApiModelProperty(value = "清洗前数据")
private String cleanBeforeData;
/**
* 清洗后数据。
*/
@ApiModelProperty(value="清洗后数据")
private String cleanAfterData;
/**
* 创建时间。
...
...@@ -29,10 +29,16 @@ public class DatasetDataDesensitiveVo {
private Long cleanId;
/**
* 清洗前数据
*/
@ApiModelProperty(value = "清洗前数据")
private String cleanBeforeData;
/**
* 清洗后数据。
*/
@ApiModelProperty(value="清洗后数据")
private String cleanAfterData;
/**
* 创建时间。
...