Commit 0634e753 authored by pengxin

Add data cleaning functionality.

parent 32843a49
package com.yice.webadmin.app.data;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import java.util.Date;
@Data
@ApiModel
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "dataset_data")
public class DatasetData {
/**
 * Document id.
 */
@Id
@ApiModelProperty(name = "_id", value = "document id")
private String id;
/**
 * Version id.
 */
@ApiModelProperty(name = "version_id", value = "version id")
@Field("version_id")
private Long versionId;
/**
 * Data in JSON format.
 */
@ApiModelProperty(name = "data", value = "JSON data")
@Field("data")
private String data;
/**
 * Creation time.
 */
@ApiModelProperty(name = "create_time", value = "creation time")
@Field("create_time")
private Date createTime;
/**
 * Mark status.
 */
@ApiModelProperty(name = "mark_status", value = "mark status")
@Field("mark_status")
private Integer markStatus;
}
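For orientation, a minimal sketch of how this entity could be persisted through Spring Data's MongoTemplate (not part of the commit; the mongoTemplate bean and all field values below are illustrative):

// Sketch only: assumes a configured MongoTemplate bean is in scope.
DatasetData row = new DatasetData();
row.setVersionId(1L);
row.setData("{\"input\":\"question\",\"output\":\"answer\"}");
row.setCreateTime(new Date());
row.setMarkStatus(0);
// Stored under the @Field names above, e.g. version_id, create_time.
mongoTemplate.save(row, "dataset_data");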
package com.yice.webadmin.app.data;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import java.util.Date;
@Data
@ApiModel
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "dataset_data_clean")
public class DatasetDataClean {
/**
 * Document id.
 */
@Id
@ApiModelProperty(name = "_id", value = "document id")
private String id;
/**
 * Clean task id.
 */
@ApiModelProperty(name = "clean_id", value = "clean task id")
@Field("clean_id")
private Long cleanId;
/**
 * Data before cleaning.
 */
@ApiModelProperty(name = "clean_before_data", value = "data before cleaning")
@Field("clean_before_data")
private String cleanBeforeData;
/**
 * Data after cleaning.
 */
@ApiModelProperty(name = "clean_after_data", value = "data after cleaning")
@Field("clean_after_data")
private String cleanAfterData;
/**
 * Creation time.
 */
@ApiModelProperty(name = "create_time", value = "creation time")
@Field("create_time")
private Date createTime;
}
@@ -7,6 +7,7 @@ import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import java.util.Date;
@@ -17,19 +18,33 @@ import java.util.Date;
@Document(collection = "dataset_data_deduplicate")
public class DatasetDataDeduplicate {
/**
 * Document id.
 */
@Id
@ApiModelProperty(name = "_id", value = "document id")
private String id;
/**
 * Clean task id.
 */
@ApiModelProperty(name = "clean_id", value = "clean task id")
@Field("clean_id")
private Long cleanId;
/**
 * Data before cleaning.
 */
@ApiModelProperty(name = "clean_before_data", value = "data before cleaning")
@Field("clean_before_data")
private String cleanBeforeData;
/**
 * Data after cleaning.
 */
@ApiModelProperty(name = "clean_after_data", value = "data after cleaning")
@Field("clean_after_data")
private String cleanAfterData;
/**
 * Creation time.
 */
@ApiModelProperty(name = "create_time", value = "creation time")
@Field("create_time")
private Date createTime;
}
package com.yice.webadmin.app.data;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import java.util.Date;
@Data
@ApiModel
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "dataset_data_desensitive")
public class DatasetDataDesensitive {
/**
 * Document id.
 */
@Id
@ApiModelProperty(name = "_id", value = "document id")
private String id;
/**
 * Clean task id.
 */
@ApiModelProperty(name = "clean_id", value = "clean task id")
@Field("clean_id")
private Long cleanId;
/**
 * Data before cleaning.
 */
@ApiModelProperty(name = "clean_before_data", value = "data before cleaning")
@Field("clean_before_data")
private String cleanBeforeData;
/**
 * Data after cleaning.
 */
@ApiModelProperty(name = "clean_after_data", value = "data after cleaning")
@Field("clean_after_data")
private String cleanAfterData;
/**
 * Creation time.
 */
@ApiModelProperty(name = "create_time", value = "creation time")
@Field("create_time")
private Date createTime;
}
package com.yice.webadmin.app.data;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
import org.springframework.data.mongodb.core.mapping.Field;
import java.util.Date;
@Data
@ApiModel
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "dataset_data_filter")
public class DatasetDataFilter {
/**
 * Document id.
 */
@Id
@ApiModelProperty(name = "_id", value = "document id")
private String id;
/**
 * Clean task id.
 */
@ApiModelProperty(name = "clean_id", value = "clean task id")
@Field("clean_id")
private Long cleanId;
/**
 * Filter content.
 */
@ApiModelProperty(name = "content", value = "filter content")
@Field("content")
private String content;
/**
 * Creation time.
 */
@ApiModelProperty(name = "create_time", value = "creation time")
@Field("create_time")
private Date createTime;
}
@@ -13,7 +13,6 @@ import com.yice.common.core.object.MyPageParam;
import com.yice.common.core.object.MyRelationParam;
import com.yice.common.core.util.MyModelUtil;
import com.yice.common.sequence.wrapper.IdGeneratorWrapper;
import com.yice.webadmin.app.config.PythonConfig;
import com.yice.webadmin.app.constant.DatasetConstant;
import com.yice.webadmin.app.dao.DatasetCleanConfigMapper;
import com.yice.webadmin.app.dao.DatasetCleanMapper;
@@ -33,12 +32,10 @@ import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -68,8 +65,6 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
private DatasetDataService datasetDataService;
@Autowired
private DatasetVersionService datasetVersionService;
@Autowired
private PythonConfig pythonConfig;
private final ConcurrentHashMap<Long, Future<?>> futures = new ConcurrentHashMap<>();
@@ -123,11 +118,22 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
datasetVersion.setCleanStatus(DatasetConstant.CLEAN_PROGRESS);
datasetVersionService.updateById(datasetVersion);
//TODO: unfinished
//doDatasetCleanHandler(datasetClean.getDatasetId(), datasetClean.getCleanId());
doDatasetCleanHandler(datasetClean.getDatasetId(), datasetClean.getCleanId());
return datasetClean;
}
@Async("taskExecutor")
public Future<Void> executeCleanTaskAsync(List<DatasetData> dataList, Long cleanId, Long datasetId) {
asyncDealWithDatasetSaveBatch(dataList, cleanId);
dealWithTaskHandler(datasetId,cleanId);
DatasetClean filter = new DatasetClean();
// Presumably the primary key must be set so updateById can match the record.
filter.setCleanId(cleanId);
filter.setCleanStatus(DatasetConstant.CLEAN_FINISHED);
filter.setFinishTime(new Date());
this.updateById(filter);
return new AsyncResult<>(null);
}
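A caveat on the @Async methods in this class (an assumption about Spring's proxying, not something the diff shows): when executeCleanTaskAsync is invoked from another method of the same bean, the call bypasses the async proxy and runs synchronously. A common workaround is a lazy self-reference:

// Sketch: the field name "self" is hypothetical and not in this commit.
@Lazy
@Autowired
private DatasetCleanServiceImpl self;
// Calling through the proxy keeps @Async("taskExecutor") in effect:
// self.executeCleanTaskAsync(dataList, cleanId, datasetId);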
/**
* Clean 100 samples
* @param datasetId the version id corresponding to the dataset
@@ -143,19 +149,6 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
}
}
@Async("taskExecutor")
public Future<Void> executeCleanTaskAsync(List<DatasetData> dataList, Long cleanId, Long datasetId) {
asyncDealWithDatasetSaveBatch(dataList, cleanId);
dealWithTaskHandler(datasetId,cleanId);
dealWithDatasetFileDataHandler(datasetId);
DatasetClean filter = new DatasetClean();
filter.setCleanStatus(DatasetConstant.CLEAN_FINISHED);
filter.setFinishTime(new Date());
this.updateById(filter);
return new AsyncResult<>(null);
}
/**
* Asynchronously clean only the first 100 records
* @param dataList the list of data to clean
@@ -206,18 +199,27 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
rules.add(config.getDesensitiveConfig());
rules.add(config.getDeduplicateConfig());
rules.add(config.getCleanConfig());
rules = rules.stream()
.filter(rule -> rule != null && !rule.isEmpty())
.collect(Collectors.toList());
rules = JsonNameExtractor.extractNames(rules);
}
DatasetVersion datasetVersion = this.datasetVersionService.getById(datasetId);
datasetVersionService.saveDatasetInfo(datasetVersion.getVersionName());
clearFileDatasetData(datasetVersion.getFileUrl());
Long count = datasetDataService.count(datasetId);
int pageSize = DatasetConstant.MAX_SIZE;
int totalPages = (int) Math.ceil((double) count / pageSize);
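// e.g. count = 250 and pageSize = 100 gives (int) Math.ceil(2.5) = 3 pages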
MyPageParam param = null;
MyPageParam param;
for (int i = 1; i <= totalPages; i++) {
param = new MyPageParam();
param.setPageNum(i);
param.setPageSize(pageSize);
dealWithDatasetNodeData(datasetDataService.list(datasetId, param), datasetId, rules);
List<DatasetData> dataList = datasetDataService.list(datasetId, param);
dealWithDatasetNodeData(dataList, datasetId, rules);
appendDataListToFile(datasetVersion.getFileUrl() ,dataList);
}
} catch (Exception ex) {
log.error("deal with task handler is error:" , ex);
@@ -225,42 +227,56 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
}
/**
* Write to the JSON config file
* @param datasetId the dataset id
* Method 1: clear the file contents
* @param filePath the file path
*/
private void dealWithDatasetFileDataHandler(Long datasetId) {
try {
DatasetVersion datasetVersion = this.datasetVersionService.getById(datasetId);
String versionName = datasetVersion.getVersionName();
// then save the dataset config file
datasetVersionService.saveDatasetInfo(versionName);
}catch (Exception ex){
log.error("deal with dataset node data:", ex);
public void clearFileDatasetData(String filePath) {
File file = new File(filePath);
if (file.exists()) {
FileWriter fileWriter = null;
try {
fileWriter = new FileWriter(file, false); // append must be false so the existing content is truncated
fileWriter.write("");
} catch (IOException e) {
log.error("clearFileDatasetData method is error:", e);
} finally {
if (fileWriter != null) {
try {
fileWriter.close();
} catch (IOException e) {
log.error("file write close is errot", e);
}
}
}
}
}
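As a design note, the truncation above can also be expressed in a single java.nio call; a minimal sketch, assuming java.nio.file.{Files, Paths, StandardOpenOption} are imported and the path is writable:

// Sketch: create the file if absent, otherwise truncate it to zero length.
Files.write(Paths.get(filePath), new byte[0],
        StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING);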
/**
* Save the imported file.
*
* @param importFile the imported file.
* @return the saved local file name.
* Method 2: append the data list to the file
* @param filePath the file path
* @param dataList the dataset list
*/
private String saveDatasetFile(MultipartFile importFile, String versionName, Long versionId) throws IOException {
String fullName = pythonConfig.getDatasetFileBaseDir() + versionName + ".json";
public void appendDataListToFile(String filePath, List<DatasetData> dataList) {
FileWriter fileWriter = null;
try {
byte[] bytes = importFile.getBytes();
Path path = Paths.get(fullName);
// Create the files directory if it does not exist
if (!Files.isWritable(path)) {
Files.createDirectories(Paths.get(pythonConfig.getDatasetFileBaseDir()));
fileWriter = new FileWriter(filePath, true);
// Iterate over the data list and write each record to the file
for (DatasetData data : dataList) {
fileWriter.write(data.getData());
fileWriter.write("\n");
}
// Write the bytes to the given path; this should append to the file
Files.write(path, bytes);
} catch (IOException e) {
log.error("Failed to write imported file [" + importFile.getOriginalFilename() + " ].", e);
throw e;
log.error("file write close is errot", e);
} finally {
// Close fileWriter if it is not null
if (fileWriter != null) {
try {
fileWriter.close();
} catch (IOException e) {
log.error("file write close is errot", e);
}
}
}
return fullName;
}
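A minimal try-with-resources equivalent of the method above (a sketch, not part of the commit; assumes java.io.BufferedWriter is imported):

public void appendDataListToFile(String filePath, List<DatasetData> dataList) {
    // The writer is closed automatically, even when a write fails.
    try (BufferedWriter writer = new BufferedWriter(new FileWriter(filePath, true))) {
        for (DatasetData data : dataList) {
            writer.write(data.getData());
            writer.newLine();
        }
    } catch (IOException e) {
        log.error("file write error", e);
    }
}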
/**
@@ -276,8 +292,8 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
for (DatasetData datasetData : dataList) {
JsonNode rootNode = objectMapper.readTree(datasetData.getData());
String data = rootNode.get(DatasetConstant.OUTPUT).textValue();
String output = DataCleanerUtil.buildCleanAfterData(datasetData.getData(),rules);
datasetData.setData(createNewDataNode(data, output));
String output = DataCleanerUtil.buildCleanAfterData(data, rules);
datasetData.setData(createNewDataNode(datasetData.getData(), output));
}
this.datasetDataService.updateBatch(dataList, datasetId);
}
@@ -291,27 +307,27 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* @param json the JSON object
* @param output the output value
* @return the new data JSON
* @throws JsonProcessingException
* @throws JsonProcessingException on JSON conversion failure
*/
private String createNewDataNode(String json, String output) throws JsonProcessingException{
String modifiedJson = null;
ObjectMapper mapper = new ObjectMapper();
JsonNode rootNode = mapper.readTree(json);
// Get the data node
JsonNode dataNode = rootNode.get(DatasetConstant.DATA);
if (dataNode != null && dataNode.isObject()) {
// Check whether the output field exists
if (rootNode.has(DatasetConstant.OUTPUT)) {
// Cast the data node to ObjectNode for modification
ObjectNode dataObjectNode = (ObjectNode) dataNode;
// Cast the root node to ObjectNode for modification
ObjectNode objectNode = (ObjectNode) rootNode;
// Replace the value of the output field
dataObjectNode.put(DatasetConstant.OUTPUT, output);
objectNode.put(DatasetConstant.OUTPUT, output);
// Serialize the modified JSON back to a string
modifiedJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(rootNode);
}
modifiedJson = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(objectNode);
log.debug(modifiedJson);
}
return modifiedJson;
}
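To illustrate what createNewDataNode produces, assuming DatasetConstant.DATA = "data" and DatasetConstant.OUTPUT = "output" (constants this diff does not show):

// Illustrative call; the input JSON and replacement value are made up.
String json = "{\"data\":{\"input\":\"q\"},\"output\":\"raw answer\"}";
String result = createNewDataNode(json, "cleaned answer");
// result (pretty-printed by the method):
// {
//   "data" : { "input" : "q" },
//   "output" : "cleaned answer"
// }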
@@ -326,14 +342,11 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
private List<DatasetDataClean> asyncCleanTaskHandler(List<DatasetData> dataList, Long cleanId) {
List<DatasetDataClean> cleans = new ArrayList<>();
try {
List<String> rules = new ArrayList<>();
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig cleanConfig = datasetCleanConfigService.getOne(filter);
if(null != cleanConfig) {
rules = JsonNameExtractor.extractNames(cleanConfig.getCleanConfig());
}
if(null == cleanConfig || null == cleanConfig.getCleanConfig()) return cleans;
List<String> rules = JsonNameExtractor.extractNames(cleanConfig.getCleanConfig());
if(CollUtil.isNotEmpty(dataList)) {
ObjectMapper objectMapper = new ObjectMapper();
for(DatasetData datasetData: dataList) {
@@ -363,20 +376,17 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
private List<DatasetDataDesensitive> asyncDesensitiveTaskHandler(List<DatasetData> dataList, Long cleanId) {
List<DatasetDataDesensitive> desensitives = new ArrayList<>();
try {
List<String> rules = new ArrayList<>();
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig cleanConfig = datasetCleanConfigService.getOne(filter);
if(null != cleanConfig) {
rules = JsonNameExtractor.extractNames(cleanConfig.getDesensitiveConfig());
}
if(null == cleanConfig || null == cleanConfig.getDesensitiveConfig()) return desensitives;
List<String> rules = JsonNameExtractor.extractNames(cleanConfig.getDesensitiveConfig());
ObjectMapper objectMapper = new ObjectMapper();
if(CollUtil.isNotEmpty(dataList)) {
for(DatasetData datasetData: dataList) {
DatasetDataDesensitive desensitive = new DatasetDataDesensitive();
JsonNode rootNode = objectMapper.readTree(datasetData.getData());
String data = rootNode.get(DatasetConstant.OUTPUT).textValue();
JsonNode node = objectMapper.readTree(datasetData.getData());
String data = node.get(DatasetConstant.OUTPUT).textValue();
desensitive.setCleanBeforeData(data);
desensitive.setCleanAfterData(DataCleanerUtil.buildCleanAfterData(data,rules));
desensitive.setCleanId(cleanId);
@@ -400,20 +410,17 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
private List<DatasetDataDeduplicate> asyncDeduplicateTaskHandler(List<DatasetData> dataList, Long cleanId) {
List<DatasetDataDeduplicate> deduplicates = new ArrayList<>();
try {
List<String> rules = new ArrayList<>();
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig deduplicateConfig = datasetCleanConfigService.getOne(filter);
if(null != deduplicateConfig) {
rules = JsonNameExtractor.extractNames(deduplicateConfig.getDeduplicateConfig());
}
if(null == deduplicateConfig || null == deduplicateConfig.getDeduplicateConfig()) return deduplicates;
List<String> rules = JsonNameExtractor.extractNames(deduplicateConfig.getDeduplicateConfig());
ObjectMapper objectMapper = new ObjectMapper();
if(CollUtil.isNotEmpty(dataList)) {
for(DatasetData datasetData: dataList) {
DatasetDataDeduplicate deduplicate = new DatasetDataDeduplicate();
JsonNode rootNode = objectMapper.readTree(datasetData.getData());
String data = rootNode.get(DatasetConstant.OUTPUT).textValue();
JsonNode jsonNode = objectMapper.readTree(datasetData.getData());
String data = jsonNode.get(DatasetConstant.OUTPUT).textValue();
deduplicate.setCleanBeforeData(data);
deduplicate.setCleanAfterData(DataCleanerUtil.buildCleanAfterData(data, rules));
deduplicate.setCleanId(cleanId);
@@ -437,14 +444,11 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
private List<DatasetDataFilter> asyncFilterTaskHandler(List<DatasetData> dataList, Long cleanId) {
List<DatasetDataFilter> filters = new ArrayList<>();
try {
List<String> rules = new ArrayList<>();
DatasetCleanConfig filter = new DatasetCleanConfig();
filter.setCleanId(cleanId);
DatasetCleanConfig filterConfig = datasetCleanConfigService.getOne(filter);
if(null != filterConfig) {
rules = JsonNameExtractor.extractNames(filterConfig.getFilterConfig());
}
if(null == filterConfig || null == filterConfig.getFilterConfig()) return filters;
List<String> rules = JsonNameExtractor.extractNames(filterConfig.getFilterConfig());
ObjectMapper objectMapper = new ObjectMapper();
if(CollUtil.isNotEmpty(dataList)) {
for(DatasetData datasetData: dataList) {
......
package com.yice.webadmin.app.service.impl;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.yice.common.core.object.MyPageParam;
@@ -13,6 +14,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
@@ -157,7 +159,22 @@ public class DatasetDataServiceImpl implements DatasetDataService {
*/
@Override
public void updateBatch(List<DatasetData> dataList, Long versionId) {
mongoTemplate.save(dataList, MongoConstant.COLLECT_NAME + versionId);
if(CollUtil.isNotEmpty(dataList)) {
for(DatasetData datasetData : dataList) {
// Parse the data field string into a BSON Document
Document dataDocument = Document.parse(datasetData.getData());
// Build the query criteria
Query query = new Query(Criteria.where(MongoConstant.ID).is(datasetData.getId()));
// Build the update operation
Update update = new Update();
update.set(MongoConstant.DATA, dataDocument);
// Execute the update
mongoTemplate.updateFirst(query, update, MongoConstant.COLLECT_NAME + versionId);
}
}
}
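Since updateBatch now issues one updateFirst per record, the per-record round trips can be batched with Spring Data's BulkOperations; a sketch under the same assumptions about MongoConstant (org.springframework.data.mongodb.core.BulkOperations assumed imported; not part of the commit):

// Sketch: collect the per-record updates into a single bulk call.
BulkOperations ops = mongoTemplate.bulkOps(BulkOperations.BulkMode.UNORDERED,
        MongoConstant.COLLECT_NAME + versionId);
for (DatasetData datasetData : dataList) {
    ops.updateOne(new Query(Criteria.where(MongoConstant.ID).is(datasetData.getId())),
            new Update().set(MongoConstant.DATA, Document.parse(datasetData.getData())));
}
ops.execute(); // one round trip instead of one per record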
/**
......