提交 d161fb17 authored 作者: 李纤's avatar 李纤

修改minio连接

上级 359abc74
......@@ -93,6 +93,7 @@
<version>0.2.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-reload4j</artifactId>-->
......
package org.apache.seatunnel.datasource.plugin.s3;
import io.minio.MinioClient;
import io.minio.errors.MinioException;
......@@ -20,16 +21,14 @@ public class S3ClientService {
String provider,
String username,
String password,
String bucket,
Integer port)
throws MinioException {
this.ENDPOINT = endpoint;
this.PROVIDER = provider;
this.USERNAME = username;
this.PASSWORD = password;
this.BUCKET = bucket;
this.PORT = port;
setMinioClient(endpoint, provider, username, password, bucket, port);
setMinioClient(endpoint, provider, username, password, port);
}
public MinioClient getMinioClient() {
......@@ -41,12 +40,11 @@ public class S3ClientService {
String provider,
String username,
String password,
String bucket,
Integer port)
throws MinioException {
minioClient =
new MinioClient.Builder()
.endpoint(endpoint, port, true)
.endpoint(endpoint, port, false)
.credentials(username, password)
.build();
}
......
......@@ -17,6 +17,8 @@
package org.apache.seatunnel.datasource.plugin.s3;
import com.amazonaws.services.dynamodbv2.xspec.S;
import io.minio.ListBucketsArgs;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
......@@ -42,6 +44,7 @@ import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
......@@ -72,6 +75,7 @@ public class S3DatasourceChannel implements DataSourceChannel {
Map<String, String> requestParams,
String database,
Map<String, String> options) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<String> tab = new ArrayList<>();
try {
......@@ -113,9 +117,12 @@ public class S3DatasourceChannel implements DataSourceChannel {
// datasource");
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<String> db = new ArrayList<>();
try {
......@@ -148,7 +155,7 @@ public class S3DatasourceChannel implements DataSourceChannel {
throw new SeaTunnelException("requestParmas 为空!");
}
try {
S3ClientService s3Client = createS3Client(requestParams);
createS3Client(requestParams);
return true;
} catch (Exception ex) {
throw new DataSourcePluginException(
......@@ -185,16 +192,17 @@ public class S3DatasourceChannel implements DataSourceChannel {
"getTableFields is not supported for S3 datasource");
}
private S3ClientService createS3Client(Map<String, String> requestParams) {
String endpoint = requestParams.get("fs.s3a.endpoint").split(":")[0] + "";
Integer port = Integer.valueOf(requestParams.get("fs.s3a.endpoint").split(":")[1]);
public S3ClientService createS3Client(Map<String, String> requestParams) {
int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":");
String endpoint = requestParams.get("fs.s3a.endpoint")+ "";
Integer port = Integer.valueOf(requestParams.get("fs.s3a.endpoint").substring(i+1).replace("/",""));
String provider = requestParams.get("fs.s3a.aws.credentials.provider") + "";
String username = requestParams.get("access_key") + "";
String password = requestParams.get("secret_key") + "";
String bucket = requestParams.get("bucket") + "";
// String bucket = requestParams.get("bucket") + "";
try {
s3ClientService =
new S3ClientService(endpoint, provider, username, password, bucket, port);
new S3ClientService(endpoint, provider, username, password, port);
return s3ClientService;
} catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!");
......
......@@ -3,6 +3,7 @@ package org.apache.seatunnel.datasource.plugin.s3;
import org.apache.hadoop.conf.Configuration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
......@@ -14,8 +15,9 @@ class HadoopS3AConfigurationTest {
map.put("groupName", "测试s3适配器1");
map.put("adapterId", "1714559639070314496");
map.put("bucket", "hdfs://192.168.1.174:9001/s3atests3");
map.put("fs.s3a.endpoint", "http://192.168.1.174:9001");
map.put("bucket", "ces");
map.put("port", "9090");
map.put("fs.s3a.endpoint", "http://192.168.1.174:9090/");
map.put("fs.s3a.connection.ssl.enabled", "false");
map.put(
"fs.s3a.aws.credentials.provider",
......@@ -24,6 +26,10 @@ class HadoopS3AConfigurationTest {
map.put("access_key", "zyminio");
map.put("secret_key", "zysoft123");
Configuration configuration = HadoopS3AConfiguration.getConfiguration(map);
// List<String> databases = new S3DatasourceChannel().getDatabases("S3", map);
// System.out.println(databases+"-----------------------");
List<String> tables = new S3DatasourceChannel().getTables("S3", map, "backup", null);
System.out.println(tables+"-----------------------");
System.out.println(configuration);
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论