提交 897d6a7f authored 作者: 宋勇's avatar 宋勇

修改s3类型上的bucket 赋值

上级 e0fadca7
...@@ -16,7 +16,12 @@ public class CSVClientService { ...@@ -16,7 +16,12 @@ public class CSVClientService {
private MinioClient minioClient; private MinioClient minioClient;
public CSVClientService( public CSVClientService(
String endpoint, String provider, String username, String password, Integer port, String backet) String endpoint,
String provider,
String username,
String password,
Integer port,
String backet)
throws MinioException { throws MinioException {
this.ENDPOINT = endpoint; this.ENDPOINT = endpoint;
this.PROVIDER = provider; this.PROVIDER = provider;
......
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.apache.seatunnel.datasource.plugin.csv; package org.apache.seatunnel.datasource.plugin.csv;
import org.apache.hadoop.conf.Configuration;
import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
...@@ -201,7 +200,6 @@ public class CSVDatasourceChannel implements DataSourceChannel { ...@@ -201,7 +200,6 @@ public class CSVDatasourceChannel implements DataSourceChannel {
} }
try { try {
createS3Client(requestParams); createS3Client(requestParams);
return true; return true;
} catch (Exception ex) { } catch (Exception ex) {
...@@ -309,7 +307,8 @@ public class CSVDatasourceChannel implements DataSourceChannel { ...@@ -309,7 +307,8 @@ public class CSVDatasourceChannel implements DataSourceChannel {
String password = requestParams.get("secret_key"); String password = requestParams.get("secret_key");
String bucket = requestParams.get("bucket"); String bucket = requestParams.get("bucket");
try { try {
s3ClientService = new CSVClientService(endpoint, provider, username, password, port, bucket); s3ClientService =
new CSVClientService(endpoint, provider, username, password, port, bucket);
return s3ClientService; return s3ClientService;
} catch (Exception e) { } catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!"); throw new SeaTunnelException("创建Mqtt客户端错误!");
......
...@@ -16,7 +16,12 @@ public class ExcelClientService { ...@@ -16,7 +16,12 @@ public class ExcelClientService {
private MinioClient minioClient; private MinioClient minioClient;
public ExcelClientService( public ExcelClientService(
String endpoint, String provider, String username, String password, Integer port, String bucket) String endpoint,
String provider,
String username,
String password,
Integer port,
String bucket)
throws MinioException { throws MinioException {
this.ENDPOINT = endpoint; this.ENDPOINT = endpoint;
this.PROVIDER = provider; this.PROVIDER = provider;
......
...@@ -306,7 +306,8 @@ public class ExcelDatasourceChannel implements DataSourceChannel { ...@@ -306,7 +306,8 @@ public class ExcelDatasourceChannel implements DataSourceChannel {
String password = requestParams.get("secret_key"); String password = requestParams.get("secret_key");
String bucket = requestParams.get("bucket"); String bucket = requestParams.get("bucket");
try { try {
s3ClientService = new ExcelClientService(endpoint, provider, username, password, port, bucket); s3ClientService =
new ExcelClientService(endpoint, provider, username, password, port, bucket);
return s3ClientService; return s3ClientService;
} catch (Exception e) { } catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!"); throw new SeaTunnelException("创建Mqtt客户端错误!");
......
...@@ -16,14 +16,19 @@ public class S3ClientService { ...@@ -16,14 +16,19 @@ public class S3ClientService {
private MinioClient minioClient; private MinioClient minioClient;
public S3ClientService( public S3ClientService(
String endpoint, String provider, String username, String password, Integer port,String bucket) String endpoint,
String provider,
String username,
String password,
Integer port,
String bucket)
throws MinioException { throws MinioException {
this.ENDPOINT = endpoint; this.ENDPOINT = endpoint;
this.PROVIDER = provider; this.PROVIDER = provider;
this.USERNAME = username; this.USERNAME = username;
this.PASSWORD = password; this.PASSWORD = password;
this.PORT = port; this.PORT = port;
this.BUCKET=bucket; this.BUCKET = bucket;
setMinioClient(endpoint, provider, username, password, port); setMinioClient(endpoint, provider, username, password, port);
} }
......
...@@ -317,7 +317,8 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -317,7 +317,8 @@ public class S3DatasourceChannel implements DataSourceChannel {
String password = requestParams.get("secret_key"); String password = requestParams.get("secret_key");
String bucket = requestParams.get("bucket"); String bucket = requestParams.get("bucket");
try { try {
s3ClientService = new S3ClientService(endpoint, provider, username, password, port, bucket); s3ClientService =
new S3ClientService(endpoint, provider, username, password, port, bucket);
return s3ClientService; return s3ClientService;
} catch (Exception e) { } catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!"); throw new SeaTunnelException("创建Mqtt客户端错误!");
......
...@@ -16,14 +16,19 @@ public class XMLClientService { ...@@ -16,14 +16,19 @@ public class XMLClientService {
private MinioClient minioClient; private MinioClient minioClient;
public XMLClientService( public XMLClientService(
String endpoint, String provider, String username, String password, Integer port,String bucket) String endpoint,
String provider,
String username,
String password,
Integer port,
String bucket)
throws MinioException { throws MinioException {
this.ENDPOINT = endpoint; this.ENDPOINT = endpoint;
this.PROVIDER = provider; this.PROVIDER = provider;
this.USERNAME = username; this.USERNAME = username;
this.PASSWORD = password; this.PASSWORD = password;
this.PORT = port; this.PORT = port;
this.BUCKET=bucket; this.BUCKET = bucket;
setMinioClient(endpoint, provider, username, password, port); setMinioClient(endpoint, provider, username, password, port);
} }
......
...@@ -297,16 +297,17 @@ public class XMLDatasourceChannel implements DataSourceChannel { ...@@ -297,16 +297,17 @@ public class XMLDatasourceChannel implements DataSourceChannel {
public XMLClientService createS3Client(Map<String, String> requestParams) { public XMLClientService createS3Client(Map<String, String> requestParams) {
int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":"); int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":");
String endpoint = requestParams.get("fs.s3a.endpoint") ; String endpoint = requestParams.get("fs.s3a.endpoint");
Integer port = Integer port =
Integer.valueOf( Integer.valueOf(
requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", "")); requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", ""));
String provider = requestParams.get("fs.s3a.aws.credentials.provider") ; String provider = requestParams.get("fs.s3a.aws.credentials.provider");
String username = requestParams.get("access_key") ; String username = requestParams.get("access_key");
String password = requestParams.get("secret_key") ; String password = requestParams.get("secret_key");
String bucket = requestParams.get("bucket") ; String bucket = requestParams.get("bucket");
try { try {
s3ClientService = new XMLClientService(endpoint, provider, username, password, port,bucket); s3ClientService =
new XMLClientService(endpoint, provider, username, password, port, bucket);
return s3ClientService; return s3ClientService;
} catch (Exception e) { } catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!"); throw new SeaTunnelException("创建Mqtt客户端错误!");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论