提交 661d10cf authored 作者: 李纤's avatar 李纤

修改minio连接

上级 248dd73b
...@@ -93,7 +93,6 @@ ...@@ -93,7 +93,6 @@
<version>0.2.0</version> <version>0.2.0</version>
</dependency> </dependency>
<!-- <dependency>--> <!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>--> <!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-reload4j</artifactId>--> <!-- <artifactId>slf4j-reload4j</artifactId>-->
......
package org.apache.seatunnel.datasource.plugin.s3; package org.apache.seatunnel.datasource.plugin.s3;
import io.minio.MinioClient; import io.minio.MinioClient;
import io.minio.errors.MinioException; import io.minio.errors.MinioException;
...@@ -17,11 +16,7 @@ public class S3ClientService { ...@@ -17,11 +16,7 @@ public class S3ClientService {
private MinioClient minioClient; private MinioClient minioClient;
public S3ClientService( public S3ClientService(
String endpoint, String endpoint, String provider, String username, String password, Integer port)
String provider,
String username,
String password,
Integer port)
throws MinioException { throws MinioException {
this.ENDPOINT = endpoint; this.ENDPOINT = endpoint;
this.PROVIDER = provider; this.PROVIDER = provider;
...@@ -36,11 +31,7 @@ public class S3ClientService { ...@@ -36,11 +31,7 @@ public class S3ClientService {
} }
public void setMinioClient( public void setMinioClient(
String endpoint, String endpoint, String provider, String username, String password, Integer port)
String provider,
String username,
String password,
Integer port)
throws MinioException { throws MinioException {
minioClient = minioClient =
new MinioClient.Builder() new MinioClient.Builder()
......
...@@ -17,16 +17,17 @@ ...@@ -17,16 +17,17 @@
package org.apache.seatunnel.datasource.plugin.s3; package org.apache.seatunnel.datasource.plugin.s3;
import com.amazonaws.services.dynamodbv2.xspec.S;
import io.minio.ListBucketsArgs;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import io.minio.BucketExistsArgs; import io.minio.BucketExistsArgs;
import io.minio.GetObjectArgs;
import io.minio.GetObjectResponse;
import io.minio.ListObjectsArgs; import io.minio.ListObjectsArgs;
import io.minio.MinioClient; import io.minio.MinioClient;
import io.minio.Result; import io.minio.Result;
...@@ -41,7 +42,9 @@ import io.minio.messages.Bucket; ...@@ -41,7 +42,9 @@ import io.minio.messages.Bucket;
import io.minio.messages.Item; import io.minio.messages.Item;
import lombok.NonNull; import lombok.NonNull;
import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.security.InvalidKeyException; import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException; import java.security.NoSuchAlgorithmException;
import java.util.ArrayList; import java.util.ArrayList;
...@@ -94,10 +97,13 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -94,10 +97,13 @@ public class S3DatasourceChannel implements DataSourceChannel {
String s = x.get().objectName(); String s = x.get().objectName();
if (!dir) { if (!dir) {
tab.add(s); tab.add(s);
} else {
getAllFile(
tab,
getFileByDir(minioClient, database, s),
minioClient,
database);
} }
// else{
// getAllFile(tab, getFileByDir(minioClient, database,s), minioClient, database);
// }
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
...@@ -120,7 +126,11 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -120,7 +126,11 @@ public class S3DatasourceChannel implements DataSourceChannel {
// datasource"); // datasource");
} }
private void getAllFile(List<String> tab, Iterable<Result<Item>> results, MinioClient minioClient, String database) { private void getAllFile(
List<String> tab,
Iterable<Result<Item>> results,
MinioClient minioClient,
String database) {
results.forEach( results.forEach(
x -> { x -> {
try { try {
...@@ -128,33 +138,38 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -128,33 +138,38 @@ public class S3DatasourceChannel implements DataSourceChannel {
String s = x.get().objectName(); String s = x.get().objectName();
if (!dir) { if (!dir) {
tab.add(s); tab.add(s);
}else{ } else {
this.getAllFile(tab, getFileByDir(minioClient, database, s), minioClient, database); this.getAllFile(
tab,
getFileByDir(minioClient, database, s),
minioClient,
database);
} }
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| ServerException | ServerException
| XmlParserException e) { | XmlParserException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
}}); }
});
} }
private Iterable<Result<Item>> getFileByDir(@NonNull MinioClient minioClient,@NonNull String bucket, String dir) { private Iterable<Result<Item>> getFileByDir(
@NonNull MinioClient minioClient, @NonNull String bucket, String dir) {
Iterable<Result<Item>> results; Iterable<Result<Item>> results;
if (StringUtils.isNotEmpty(dir)) { if (StringUtils.isEmpty(dir)) {
results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucket).build()); results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucket).build());
} } else {
else{
results = results =
minioClient.listObjects(ListObjectsArgs.builder().bucket(bucket).prefix(dir).build()); minioClient.listObjects(
ListObjectsArgs.builder().bucket(bucket).prefix(dir).build());
} }
return results; return results;
} }
@Override @Override
...@@ -216,8 +231,16 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -216,8 +231,16 @@ public class S3DatasourceChannel implements DataSourceChannel {
@NonNull Map<String, String> requestParams, @NonNull Map<String, String> requestParams,
@NonNull String database, @NonNull String database,
@NonNull String table) { @NonNull String table) {
throw new UnsupportedOperationException( createS3Client(requestParams);
"getTableFields is not supported for S3 datasource"); MinioClient minioClient = s3ClientService.getMinioClient();
List<TableField> tableFields = new ArrayList<>();
String all = getObject(minioClient, table, database);
TableField tableField = new TableField();
tableField.setName(all);
tableFields.add(tableField);
return tableFields;
// throw new UnsupportedOperationException(
// "getTableFields is not supported for S3 datasource");
} }
@Override @Override
...@@ -226,21 +249,61 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -226,21 +249,61 @@ public class S3DatasourceChannel implements DataSourceChannel {
@NonNull Map<String, String> requestParams, @NonNull Map<String, String> requestParams,
@NonNull String database, @NonNull String database,
@NonNull List<String> tables) { @NonNull List<String> tables) {
throw new UnsupportedOperationException( createS3Client(requestParams);
"getTableFields is not supported for S3 datasource"); MinioClient minioClient = s3ClientService.getMinioClient();
Map<String, List<TableField>> stringList = new HashMap<>();
tables.forEach(
tab -> {
List<TableField> tableFields = new ArrayList<>();
String all = getObject(minioClient, tab, database);
TableField tableField = new TableField();
tableField.setName(all);
tableFields.add(tableField);
stringList.put(tab, tableFields);
});
return stringList;
// throw new UnsupportedOperationException(
// "getTableFields is not supported for S3 datasource");
}
private String getObject(MinioClient minioClient, String table, String database) {
StringBuilder all = new StringBuilder();
try {
GetObjectResponse minioClientObject =
minioClient.getObject(
GetObjectArgs.builder().object(table).bucket(database).build());
BufferedReader bufferedReader =
new BufferedReader(new InputStreamReader(minioClientObject));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
all.append(line);
}
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
| InvalidKeyException
| InvalidResponseException
| IOException
| NoSuchAlgorithmException
| ServerException
| XmlParserException e) {
throw new RuntimeException(e);
}
return all.toString();
} }
public S3ClientService createS3Client(Map<String, String> requestParams) { public S3ClientService createS3Client(Map<String, String> requestParams) {
int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":"); int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":");
String endpoint = requestParams.get("fs.s3a.endpoint")+ ""; String endpoint = requestParams.get("fs.s3a.endpoint") + "";
Integer port = Integer.valueOf(requestParams.get("fs.s3a.endpoint").substring(i+1).replace("/","")); Integer port =
Integer.valueOf(
requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", ""));
String provider = requestParams.get("fs.s3a.aws.credentials.provider") + ""; String provider = requestParams.get("fs.s3a.aws.credentials.provider") + "";
String username = requestParams.get("access_key") + ""; String username = requestParams.get("access_key") + "";
String password = requestParams.get("secret_key") + ""; String password = requestParams.get("secret_key") + "";
// String bucket = requestParams.get("bucket") + ""; // String bucket = requestParams.get("bucket") + "";
try { try {
s3ClientService = s3ClientService = new S3ClientService(endpoint, provider, username, password, port);
new S3ClientService(endpoint, provider, username, password, port);
return s3ClientService; return s3ClientService;
} catch (Exception e) { } catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!"); throw new SeaTunnelException("创建Mqtt客户端错误!");
......
package org.apache.seatunnel.datasource.plugin.s3; package org.apache.seatunnel.datasource.plugin.s3;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import java.util.HashMap; import java.util.HashMap;
...@@ -26,10 +28,16 @@ class HadoopS3AConfigurationTest { ...@@ -26,10 +28,16 @@ class HadoopS3AConfigurationTest {
map.put("access_key", "zyminio"); map.put("access_key", "zyminio");
map.put("secret_key", "zysoft123"); map.put("secret_key", "zysoft123");
Configuration configuration = HadoopS3AConfiguration.getConfiguration(map); Configuration configuration = HadoopS3AConfiguration.getConfiguration(map);
// List<String> databases = new S3DatasourceChannel().getDatabases("S3", map); // List<String> databases = new S3DatasourceChannel().getDatabases("S3", map);
// System.out.println(databases+"-----------------------"); // System.out.println(databases+"-----------------------");
List<String> tables = new S3DatasourceChannel().getTables("S3", map, "backup", null); // List<String> tables = new S3DatasourceChannel().getTables("S3", map, "backup",
System.out.println(tables+"-----------------------"); // null);
// System.out.println(tables+"-----------------------");
List<TableField> tableField =
new S3DatasourceChannel()
.getTableFields(
"S3", map, "backup", "BackUp/20231129/20231129111134141.sql");
System.out.println(tableField + "-----------------------");
System.out.println(configuration); System.out.println(configuration);
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论