提交 359abc74 authored 作者: 李纤's avatar 李纤

修改minio连接

上级 4ed9e844
...@@ -32,15 +32,10 @@ public class S3ClientService { ...@@ -32,15 +32,10 @@ public class S3ClientService {
setMinioClient(endpoint, provider, username, password, bucket, port); setMinioClient(endpoint, provider, username, password, bucket, port);
} }
/** public MinioClient getMinioClient() {
* @param endpoint return minioClient;
* @param provider }
* @param username
* @param password
* @param bucket
* @param port
* @throws MinioException
*/
public void setMinioClient( public void setMinioClient(
String endpoint, String endpoint,
String provider, String provider,
......
...@@ -23,12 +23,31 @@ import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; ...@@ -23,12 +23,31 @@ import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import io.minio.BucketExistsArgs;
import io.minio.ListObjectsArgs;
import io.minio.MinioClient;
import io.minio.Result;
import io.minio.errors.ErrorResponseException;
import io.minio.errors.InsufficientDataException;
import io.minio.errors.InternalException;
import io.minio.errors.InvalidResponseException;
import io.minio.errors.MinioException;
import io.minio.errors.ServerException;
import io.minio.errors.XmlParserException;
import io.minio.messages.Bucket;
import io.minio.messages.Item;
import lombok.NonNull; import lombok.NonNull;
import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
public class S3DatasourceChannel implements DataSourceChannel { public class S3DatasourceChannel implements DataSourceChannel {
private S3ClientService s3ClientService;
public static class Holder { public static class Holder {
private static final S3DatasourceChannel INSTANCE = new S3DatasourceChannel(); private static final S3DatasourceChannel INSTANCE = new S3DatasourceChannel();
} }
...@@ -53,13 +72,73 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -53,13 +72,73 @@ public class S3DatasourceChannel implements DataSourceChannel {
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
Map<String, String> options) { Map<String, String> options) {
throw new UnsupportedOperationException("getTables is not supported for S3 datasource"); MinioClient minioClient = s3ClientService.getMinioClient();
List<String> tab = new ArrayList<>();
try {
boolean b =
minioClient.bucketExists(BucketExistsArgs.builder().bucket(database).build());
if (!b) {
return tab;
// throw new MinioException("桶不存在");
}
Iterable<Result<Item>> results =
minioClient.listObjects(ListObjectsArgs.builder().bucket(database).build());
results.forEach(
x -> {
try {
boolean dir = x.get().isDir();
if (!dir) {
String s = x.get().objectName();
tab.add(s);
}
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
| InvalidKeyException
| InvalidResponseException
| IOException
| NoSuchAlgorithmException
| ServerException
| XmlParserException e) {
throw new RuntimeException(e);
}
});
return tab;
} catch (InvalidKeyException | IOException | NoSuchAlgorithmException | MinioException e) {
throw new RuntimeException(e);
}
// throw new UnsupportedOperationException("getTables is not supported for S3
// datasource");
} }
@Override @Override
public List<String> getDatabases( public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) { @NonNull String pluginName, @NonNull Map<String, String> requestParams) {
throw new UnsupportedOperationException("getDatabases is not supported for S3 datasource"); MinioClient minioClient = s3ClientService.getMinioClient();
List<String> db = new ArrayList<>();
try {
List<Bucket> buckets = minioClient.listBuckets();
buckets.forEach(
x -> {
String name = x.name();
db.add(name);
});
return db;
} catch (ServerException
| ErrorResponseException
| InsufficientDataException
| IOException
| NoSuchAlgorithmException
| InvalidKeyException
| InvalidResponseException
| XmlParserException
| InternalException e) {
throw new RuntimeException(e);
}
// throw new UnsupportedOperationException("getDatabases is not supported for S3
// datasource");
} }
@Override @Override
...@@ -114,9 +193,8 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -114,9 +193,8 @@ public class S3DatasourceChannel implements DataSourceChannel {
String password = requestParams.get("secret_key") + ""; String password = requestParams.get("secret_key") + "";
String bucket = requestParams.get("bucket") + ""; String bucket = requestParams.get("bucket") + "";
try { try {
S3ClientService s3ClientService = s3ClientService =
new S3ClientService(endpoint, provider, username, password, bucket, port); new S3ClientService(endpoint, provider, username, password, bucket, port);
return s3ClientService; return s3ClientService;
} catch (Exception e) { } catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!"); throw new SeaTunnelException("创建Mqtt客户端错误!");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论