提交 c7a8f3b2 authored 作者: 李纤's avatar 李纤

修改minio连接

上级 00c4231a
...@@ -86,6 +86,13 @@ ...@@ -86,6 +86,13 @@
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>io.quarkiverse.minio</groupId>
<artifactId>minio-client</artifactId>
<version>0.2.0</version>
</dependency>
<!-- <dependency>--> <!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>--> <!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-reload4j</artifactId>--> <!-- <artifactId>slf4j-reload4j</artifactId>-->
......
package org.apache.seatunnel.datasource.plugin.s3;
import com.amazonaws.services.dynamodbv2.xspec.S;
import io.minio.MinioClient;
import io.minio.errors.MinioException;
public class S3ClientService {
private String ENDPOINT;
private String PROVIDER;
private String USERNAME;
private String PASSWORD;
private String BUCKET;
private Integer PORT;
private final String clientId = "Client" + (int) (Math.random() * 100000000);
private MinioClient minioClient;
private String TOPIC;
public S3ClientService(
String endpoint,
String provider,
String username,
String password,
String bucket,
Integer port) throws MinioException {
this.ENDPOINT = endpoint;
this.PROVIDER = provider;
this.USERNAME = username;
this.PASSWORD = password;
this.BUCKET = bucket;
this.PORT = port;
setMinioClient(endpoint, provider, username, password, bucket, port);
}
public MinioClient getMinioClient() {
return minioClient;
}
public void setMinioClient(
String endpoint, String provider,
String username, String password, String bucket, Integer port)
throws MinioException {
minioClient = new MinioClient.Builder()
.endpoint(endpoint, port, true)
.credentials(username, password)
.build();
}
//
// /** 关闭MQTT连接 */
// public void close() throws MqttException {
// mqttClient.close();
// mqttClient.disconnect();
// }
}
...@@ -17,7 +17,10 @@ ...@@ -17,7 +17,10 @@
package org.apache.seatunnel.datasource.plugin.s3; package org.apache.seatunnel.datasource.plugin.s3;
import io.minio.MinioClient;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.datasource.plugin.api.model.TableField;
...@@ -69,14 +72,24 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -69,14 +72,24 @@ public class S3DatasourceChannel implements DataSourceChannel {
@Override @Override
public boolean checkDataSourceConnectivity( public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) { @NonNull String pluginName, @NonNull Map<String, String> requestParams) {
Configuration conf = HadoopS3AConfiguration.getConfiguration(requestParams); if (requestParams.isEmpty()) {
try (FileSystem fs = FileSystem.get(conf)) { throw new SeaTunnelException("requestParmas 为空!");
fs.listStatus(new Path("/")); }
try {
S3ClientService s3Client = createS3Client(requestParams);
return true; return true;
} catch (IOException e) { } catch (Exception ex) {
throw new DataSourcePluginException( throw new DataSourcePluginException(
String.format("check s3 connectivity failed, config is: %s", requestParams), e); "check s3 connectivity failed, " + ex.getMessage(), ex);
} }
// try (FileSystem fs = FileSystem.get(conf)) {
//// fs.listStatus(new Path("/"));
//
// return true;
// } catch (IOException e) {
// throw new DataSourcePluginException(
// String.format("check s3 connectivity failed, config is: %s", requestParams), e);
// }
} }
@Override @Override
...@@ -98,4 +111,20 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -98,4 +111,20 @@ public class S3DatasourceChannel implements DataSourceChannel {
throw new UnsupportedOperationException( throw new UnsupportedOperationException(
"getTableFields is not supported for S3 datasource"); "getTableFields is not supported for S3 datasource");
} }
private S3ClientService createS3Client(Map<String, String> requestParams) {
String endpoint = requestParams.get("fs.s3a.endpoint").split(":")[0] + "";
Integer port = Integer.valueOf(requestParams.get("fs.s3a.endpoint").split(":")[1]);
String provider = requestParams.get("fs.s3a.aws.credentials.provider") + "";
String username = requestParams.get("access_key") + "";
String password = requestParams.get("secret_key") + "";
String bucket = requestParams.get("bucket") + "";
try {
S3ClientService s3ClientService = new S3ClientService(endpoint, provider, username, password, bucket, port);
return s3ClientService;
} catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!");
}
}
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论