提交 e1c8749b authored 作者: 李纤's avatar 李纤

修改minio连接

上级 661d10cf
......@@ -17,6 +17,8 @@
package org.apache.seatunnel.datasource.plugin.s3;
import com.alibaba.fastjson2.JSON;
import com.amazonaws.services.dynamodbv2.xspec.S;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
......@@ -45,6 +47,7 @@ import lombok.NonNull;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
......@@ -233,11 +236,7 @@ public class S3DatasourceChannel implements DataSourceChannel {
@NonNull String table) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<TableField> tableFields = new ArrayList<>();
String all = getObject(minioClient, table, database);
TableField tableField = new TableField();
tableField.setName(all);
tableFields.add(tableField);
List<TableField> tableFields = getObject(minioClient, table, database, requestParams);
return tableFields;
// throw new UnsupportedOperationException(
// "getTableFields is not supported for S3 datasource");
......@@ -254,11 +253,7 @@ public class S3DatasourceChannel implements DataSourceChannel {
Map<String, List<TableField>> stringList = new HashMap<>();
tables.forEach(
tab -> {
List<TableField> tableFields = new ArrayList<>();
String all = getObject(minioClient, tab, database);
TableField tableField = new TableField();
tableField.setName(all);
tableFields.add(tableField);
List<TableField> tableFields = getObject(minioClient, tab, database, requestParams);
stringList.put(tab, tableFields);
});
return stringList;
......@@ -266,8 +261,8 @@ public class S3DatasourceChannel implements DataSourceChannel {
// "getTableFields is not supported for S3 datasource");
}
private String getObject(MinioClient minioClient, String table, String database) {
StringBuilder all = new StringBuilder();
private List<TableField> getObject(MinioClient minioClient, String table, String database, Map<String, String> requestParams) {
List<TableField> all = new ArrayList<>();
try {
GetObjectResponse minioClientObject =
minioClient.getObject(
......@@ -276,8 +271,21 @@ public class S3DatasourceChannel implements DataSourceChannel {
new BufferedReader(new InputStreamReader(minioClientObject));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
all.append(line);
if (table.endsWith("csv")) {
String delimiter = requestParams.get("delimiter");
String[] split = line.split(delimiter);
Class<TableField> tableFieldClass = TableField.class;
Field[] declaredFields = tableFieldClass.getDeclaredFields();
HashMap<String, Object> map = new HashMap<>();
for (int i = 0; i < split.length; i++) {
String name = declaredFields[i].getName();
map.put(name, split[i]);
}
TableField tableField = JSON.parseObject(JSON.toJSONString(map), TableField.class);
all.add(tableField);
}
}
System.out.println(all+"==================================================================");
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
......@@ -289,7 +297,7 @@ public class S3DatasourceChannel implements DataSourceChannel {
| XmlParserException e) {
throw new RuntimeException(e);
}
return all.toString();
return all;
}
public S3ClientService createS3Client(Map<String, String> requestParams) {
......
......@@ -19,6 +19,7 @@ class HadoopS3AConfigurationTest {
map.put("adapterId", "1714559639070314496");
map.put("bucket", "ces");
map.put("port", "9090");
map.put("delimiter", ",");
map.put("fs.s3a.endpoint", "http://192.168.1.174:9090/");
map.put("fs.s3a.connection.ssl.enabled", "false");
map.put(
......@@ -33,10 +34,7 @@ class HadoopS3AConfigurationTest {
// List<String> tables = new S3DatasourceChannel().getTables("S3", map, "backup",
// null);
// System.out.println(tables+"-----------------------");
List<TableField> tableField =
new S3DatasourceChannel()
.getTableFields(
"S3", map, "backup", "BackUp/20231129/20231129111134141.sql");
List<TableField> tableField = new S3DatasourceChannel().getTableFields("S3", map, "ces", "aa/bb/cc/ss.csv");
System.out.println(tableField + "-----------------------");
System.out.println(configuration);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论