Commit 1ff4ada4 authored by linpeiqin's avatar linpeiqin

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	application-webadmin/src/main/java/com/yice/webadmin/app/service/impl/DatasetVersionServiceImpl.java
parents f060228d a6c3090d
...@@ -45,7 +45,6 @@ import org.springframework.web.multipart.MultipartFile; ...@@ -45,7 +45,6 @@ import org.springframework.web.multipart.MultipartFile;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.*; import java.util.*;
...@@ -335,13 +334,8 @@ public class DatasetVersionController { ...@@ -335,13 +334,8 @@ public class DatasetVersionController {
errorMessage = "数据验证失败,导入文件不能为空!"; errorMessage = "数据验证失败,导入文件不能为空!";
return ResponseResult.error(ErrorCodeEnum.ARGUMENT_NULL_EXIST, errorMessage); return ResponseResult.error(ErrorCodeEnum.ARGUMENT_NULL_EXIST, errorMessage);
} }
this.saveMongoDB(importFile,versionId);
DatasetVersion datasetVersion = this.datasetVersionService.getById(versionId); DatasetVersion datasetVersion = this.datasetVersionService.getById(versionId);
String versionName = datasetVersion.getVersionName() + "_V" + datasetVersion.getDatasetVersion();
//先存储文件
String fullName = this.saveDatasetFile(importFile, versionName, versionId);
//再存储数据集配置文件
datasetVersionService.saveDatasetInfo(versionName);
datasetVersion.setFileUrl(fullName);
datasetVersion.setInputStatus(DatasetConstant.INPUT_STATUS); datasetVersion.setInputStatus(DatasetConstant.INPUT_STATUS);
datasetVersion.setDataVolume(Long.valueOf(JSON.parseArray(new String(importFile.getBytes(), StandardCharsets.UTF_8)).size())); datasetVersion.setDataVolume(Long.valueOf(JSON.parseArray(new String(importFile.getBytes(), StandardCharsets.UTF_8)).size()));
this.datasetVersionService.updateById(datasetVersion); this.datasetVersionService.updateById(datasetVersion);
...@@ -349,29 +343,19 @@ public class DatasetVersionController { ...@@ -349,29 +343,19 @@ public class DatasetVersionController {
} }
/** /**
* 保存导入文件 * 写入到mongodb中
* *
* @param importFile 导入的文件。 * @param importFile 导入的文件。
* @return 保存的本地文件名。 * @return 保存的本地文件名。
*/ */
private String saveDatasetFile(MultipartFile importFile, String versionName, Long versionId) throws IOException { private void saveMongoDB(MultipartFile importFile, Long versionId) throws IOException {
String fullName = pythonConfig.getDatasetFileBaseDir() + versionName + ".json";
try { try {
byte[] bytes = importFile.getBytes(); byte[] bytes = importFile.getBytes();
Path path = Paths.get(fullName);
// 如果没有files文件夹,则创建
if (!Files.isWritable(path)) {
Files.createDirectories(Paths.get(pythonConfig.getDatasetFileBaseDir()));
}
// 文件写入指定路径、应该是追加到文件里面
Files.write(path, bytes);
// 写入到mongodb中
datasetVersionService.writeDatasetFile(bytes,importFile.getOriginalFilename(),versionId); datasetVersionService.writeDatasetFile(bytes,importFile.getOriginalFilename(),versionId);
} catch (IOException e) { } catch (IOException e) {
log.error("Failed to write imported file [" + importFile.getOriginalFilename() + " ].", e); log.error("Failed to save mongo db imported file [" + importFile.getOriginalFilename() + " ].", e);
throw e; throw e;
} }
return fullName;
} }
@GetMapping("/export") @GetMapping("/export")
......
...@@ -23,6 +23,19 @@ public interface DatasetCleanService extends IBaseService<DatasetClean, Long> { ...@@ -23,6 +23,19 @@ public interface DatasetCleanService extends IBaseService<DatasetClean, Long> {
*/ */
DatasetClean saveNew(DatasetClean datasetClean); DatasetClean saveNew(DatasetClean datasetClean);
/**
* 第二个方法:将数据列表追加到文件
* @param filePath 文件地址
* @param dataList 数据集列表
*/
void appendDataListToFile(String filePath, List<DatasetData> dataList,Integer pageNum);
/**
* 解析文件去掉多余的数据,比如文件里面最后一个,多加了一个",",缺少符号[]
* @param filePath 文件地址
*/
void readJsonAppendSymbol(String filePath);
/** /**
* 重新清洗任务 * 重新清洗任务
* @param cleanId 清洗任务id * @param cleanId 清洗任务id
......
...@@ -315,7 +315,8 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp ...@@ -315,7 +315,8 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* 解析文件去掉多余的数据,比如文件里面最后一个,多加了一个",",缺少符号[] * 解析文件去掉多余的数据,比如文件里面最后一个,多加了一个",",缺少符号[]
* @param filePath 文件地址 * @param filePath 文件地址
*/ */
private void readJsonAppendSymbol(String filePath) { @Override
public void readJsonAppendSymbol(String filePath) {
try (FileReader fileReader = new FileReader(filePath); try (FileReader fileReader = new FileReader(filePath);
BufferedReader bufferedReader = new BufferedReader(fileReader)) { BufferedReader bufferedReader = new BufferedReader(fileReader)) {
...@@ -357,7 +358,8 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp ...@@ -357,7 +358,8 @@ public class DatasetCleanServiceImpl extends BaseService<DatasetClean, Long> imp
* @param filePath 文件地址 * @param filePath 文件地址
* @param dataList 数据集列表 * @param dataList 数据集列表
*/ */
private void appendDataListToFile(String filePath, List<DatasetData> dataList,Integer pageNum) { @Override
public void appendDataListToFile(String filePath, List<DatasetData> dataList,Integer pageNum) {
FileWriter fileWriter = null; FileWriter fileWriter = null;
try { try {
//为第一页的情况下 //为第一页的情况下
......
...@@ -14,6 +14,7 @@ import com.yice.common.core.base.dao.BaseDaoMapper; ...@@ -14,6 +14,7 @@ import com.yice.common.core.base.dao.BaseDaoMapper;
import com.yice.common.core.base.service.BaseService; import com.yice.common.core.base.service.BaseService;
import com.yice.common.core.constant.ErrorCodeEnum; import com.yice.common.core.constant.ErrorCodeEnum;
import com.yice.common.core.object.CallResult; import com.yice.common.core.object.CallResult;
import com.yice.common.core.object.MyPageParam;
import com.yice.common.core.object.MyRelationParam; import com.yice.common.core.object.MyRelationParam;
import com.yice.common.core.object.ResponseResult; import com.yice.common.core.object.ResponseResult;
import com.yice.common.core.util.MyModelUtil; import com.yice.common.core.util.MyModelUtil;
...@@ -120,9 +121,9 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -120,9 +121,9 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
this.buildDefaultValue(datasetVersion); this.buildDefaultValue(datasetVersion);
//进行存储、写入以及更新配置等操作 //进行存储、写入以及更新配置等操作
DatasetVersion dataset = dealWithWriteAndSave(datasetVersion.getDatasetId(), DatasetVersion dataset = dealWithWriteAndSave(datasetVersion.getDatasetId(),
datasetVersion.getHisVersion(), datasetVersion.getVersionId(), datasetVersion.getHisVersion(),datasetVersion.getVersionId(),
datasetVersion.getVersionName()); datasetVersion.getVersionName());
if (null != dataset) { if(null != dataset) {
datasetVersion.setDataVolume(dataset.getDataVolume()); datasetVersion.setDataVolume(dataset.getDataVolume());
datasetVersion.setFileUrl(dataset.getFileUrl()); datasetVersion.setFileUrl(dataset.getFileUrl());
datasetVersion.setInputStatus(dataset.getInputStatus()); datasetVersion.setInputStatus(dataset.getInputStatus());
...@@ -136,10 +137,9 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -136,10 +137,9 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
/** /**
* 进行存储、写入以及更新配置等操作 * 进行存储、写入以及更新配置等操作
* * @param datasetId 数据集标识
* @param datasetId 数据集标识 * @param hisVersion 历史标识
* @param hisVersion 历史标识 * @param versionId 版本标识
* @param versionId 版本标识
* @param datasetName 数据集名称 * @param datasetName 数据集名称
*/ */
private DatasetVersion dealWithWriteAndSave(Long datasetId, Integer hisVersion, private DatasetVersion dealWithWriteAndSave(Long datasetId, Integer hisVersion,
...@@ -172,7 +172,6 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -172,7 +172,6 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
/** /**
* 存储到python数据集中并进行更新操作 * 存储到python数据集中并进行更新操作
*
* @param versionName 版本名称 * @param versionName 版本名称
* @throws IOException 异常操作 * @throws IOException 异常操作
*/ */
...@@ -198,7 +197,6 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -198,7 +197,6 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
/** /**
* 截取最后一个斜杠后的部分 * 截取最后一个斜杠后的部分
*
* @param fileName 文件名称 * @param fileName 文件名称
* @return 返回截取后的字符串 * @return 返回截取后的字符串
*/ */
...@@ -212,15 +210,14 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -212,15 +210,14 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
/** /**
* 文件目录进行复制并写入到Mongodb数据库中 * 文件目录进行复制并写入到Mongodb数据库中
* * @param fullName 新文件名称
* @param fullName 新文件名称 * @param versionId 版本标识
* @param versionId 版本标识
* @param originFileName 原始文件名 * @param originFileName 原始文件名
* @return 字节编码 * @return 字节编码
*/ */
private byte[] writeAndSaveFileDetail(String fullName, String originFileName, Long versionId) throws IOException { private byte[] writeAndSaveFileDetail(String fullName,String originFileName,Long versionId) throws IOException{
byte[] bytes = null; byte[] bytes = null;
try { try{
bytes = readFileToBytes(originFileName); bytes = readFileToBytes(originFileName);
Path path = Paths.get(fullName); Path path = Paths.get(fullName);
// 如果没有files文件夹,则创建 // 如果没有files文件夹,则创建
...@@ -230,7 +227,7 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -230,7 +227,7 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
// 文件写入指定路径 // 文件写入指定路径
Files.write(path, bytes); Files.write(path, bytes);
// 写入到mongodb中 // 写入到mongodb中
writeDatasetFile(bytes, fullName, versionId); writeDatasetFile(bytes,fullName,versionId);
} catch (IOException e) { } catch (IOException e) {
log.error("Failed to write imported file [" + fullName + " ].", e); log.error("Failed to write imported file [" + fullName + " ].", e);
throw e; throw e;
...@@ -240,7 +237,6 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -240,7 +237,6 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
/** /**
* 读取地址内容并转成bytes数据 * 读取地址内容并转成bytes数据
*
* @param filePath 文件地址 * @param filePath 文件地址
* @return byte数组 * @return byte数组
* @throws IOException 异常 * @throws IOException 异常
...@@ -255,10 +251,9 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -255,10 +251,9 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
/** /**
* 写入到mongodb中 * 写入到mongodb中
* * @param bytes 字节
* @param bytes 字节
* @param originalFilename 导入文件名称 * @param originalFilename 导入文件名称
* @param versionId 版本标识 * @param versionId 版本标识
*/ */
@Override @Override
public void writeDatasetFile(byte[] bytes, String originalFilename, Long versionId) { public void writeDatasetFile(byte[] bytes, String originalFilename, Long versionId) {
...@@ -269,7 +264,7 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -269,7 +264,7 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
datasetDataService.delete(versionId); datasetDataService.delete(versionId);
//保存到mongodb中 //保存到mongodb中
datasetDataService.save(new DatasetData(null, versionId, result, new Date(), null)); datasetDataService.save(new DatasetData(null, versionId, result, new Date(), null));
} catch (Exception ex) { }catch (Exception ex) {
log.error("Failed to write mongodb database [" + originalFilename + " ].", ex); log.error("Failed to write mongodb database [" + originalFilename + " ].", ex);
} }
} }
...@@ -307,7 +302,7 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -307,7 +302,7 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
/** /**
* 更新数据对象。 * 更新数据对象。
* *
* @param datasetVersion 更新的对象。 * @param datasetVersion 更新的对象。
* @return 成功返回true,否则false。 * @return 成功返回true,否则false。
*/ */
@Transactional(rollbackFor = Exception.class) @Transactional(rollbackFor = Exception.class)
...@@ -316,6 +311,72 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long> ...@@ -316,6 +311,72 @@ public class DatasetVersionServiceImpl extends BaseService<DatasetVersion, Long>
return datasetVersionMapper.updateById(datasetVersion) > 0; return datasetVersionMapper.updateById(datasetVersion) > 0;
} }
/**
* 写入json格式路径
* @param datasetVersion
* @throws IOException 异常
*/
public boolean saveDatasetJsonPath(DatasetVersion datasetVersion) throws IOException{
//导入时不需要写入到json文件中
String versionName = datasetVersion.getVersionName();
//先存储文件
String fullName = pythonConfig.getDatasetFileBaseDir() + versionName + ".json";
this.doDealTaskHandler(datasetVersion.getVersionId(), versionName, fullName);
//再存储数据集配置文件
this.saveDatasetInfo(versionName);
DatasetVersion filter = new DatasetVersion();
filter.setVersionId(datasetVersion.getVersionId());
filter.setFileUrl(fullName);
filter.setReleaseStatus(DatasetConstant.STATUS_PUBLISHED);
return this.updateById(filter);
}
/**
* ==============================
* ===总数据清洗过程===
* 1、分页处理每页10000条数据
* 2、更新Mongodb数据库中的数据
* 3、更新json存储地址的数据集数据
* 4、更新版本数据集状态
* ==============================
* 处理数据集
* @param datasetId 清洗任务id
* @param versionName 数据集名称
* @return 清洗列表
*/
private void doDealTaskHandler(Long datasetId, String versionName,String fileUrl) {
try {
Integer index = 0;
Long count = datasetDataService.count(datasetId);
if (count > 0) {
int pageSize = DatasetConstant.MAX_SIZE;
int totalPages = (int) Math.ceil((double) count / pageSize);
MyPageParam param;
for (int i = 1; i <= totalPages; i++) {
param = new MyPageParam();
param.setPageNum(i);
param.setPageSize(pageSize);
List<DatasetData> dataList = datasetDataService.list(datasetId, param);
//写入到数据集中
if(CollUtil.isNotEmpty(dataList)) {
datasetCleanService.appendDataListToFile(fileUrl ,dataList, i);
}
}
//解析文件去掉多余的数据,比如文件里面最后一个,多加了一个",",缺少符号[]
if(index > 0){
datasetCleanService.readJsonAppendSymbol(fileUrl);
}
//删除为空的数据集数据
this.datasetDataService.deleteByData(datasetId);
}
} catch (Exception ex) {
log.error("deal with task handler is error:" , ex);
}
}
/** /**
* 删除指定数据。 * 删除指定数据。
* *
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment