提交 e0fadca7 authored 作者: 宋勇's avatar 宋勇

修改s3类型上的bucket 赋值

上级 d18b47a1
...@@ -16,13 +16,14 @@ public class CSVClientService { ...@@ -16,13 +16,14 @@ public class CSVClientService {
private MinioClient minioClient; private MinioClient minioClient;
public CSVClientService( public CSVClientService(
String endpoint, String provider, String username, String password, Integer port) String endpoint, String provider, String username, String password, Integer port, String backet)
throws MinioException { throws MinioException {
this.ENDPOINT = endpoint; this.ENDPOINT = endpoint;
this.PROVIDER = provider; this.PROVIDER = provider;
this.USERNAME = username; this.USERNAME = username;
this.PASSWORD = password; this.PASSWORD = password;
this.PORT = port; this.PORT = port;
this.BUCKET = backet;
setMinioClient(endpoint, provider, username, password, port); setMinioClient(endpoint, provider, username, password, port);
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
package org.apache.seatunnel.datasource.plugin.csv; package org.apache.seatunnel.datasource.plugin.csv;
import org.apache.hadoop.conf.Configuration;
import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
...@@ -96,14 +97,14 @@ public class CSVDatasourceChannel implements DataSourceChannel { ...@@ -96,14 +97,14 @@ public class CSVDatasourceChannel implements DataSourceChannel {
database); database);
} }
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| ServerException | ServerException
| XmlParserException e) { | XmlParserException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
...@@ -137,14 +138,14 @@ public class CSVDatasourceChannel implements DataSourceChannel { ...@@ -137,14 +138,14 @@ public class CSVDatasourceChannel implements DataSourceChannel {
database); database);
} }
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| ServerException | ServerException
| XmlParserException e) { | XmlParserException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
...@@ -178,14 +179,14 @@ public class CSVDatasourceChannel implements DataSourceChannel { ...@@ -178,14 +179,14 @@ public class CSVDatasourceChannel implements DataSourceChannel {
}); });
return db; return db;
} catch (ServerException } catch (ServerException
| ErrorResponseException | ErrorResponseException
| InsufficientDataException | InsufficientDataException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| XmlParserException | XmlParserException
| InternalException e) { | InternalException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// throw new UnsupportedOperationException("getDatabases is not supported for S3 // throw new UnsupportedOperationException("getDatabases is not supported for S3
...@@ -199,6 +200,8 @@ public class CSVDatasourceChannel implements DataSourceChannel { ...@@ -199,6 +200,8 @@ public class CSVDatasourceChannel implements DataSourceChannel {
throw new SeaTunnelException("requestParmas 为空!"); throw new SeaTunnelException("requestParmas 为空!");
} }
try { try {
createS3Client(requestParams); createS3Client(requestParams);
return true; return true;
} catch (Exception ex) { } catch (Exception ex) {
...@@ -282,14 +285,14 @@ public class CSVDatasourceChannel implements DataSourceChannel { ...@@ -282,14 +285,14 @@ public class CSVDatasourceChannel implements DataSourceChannel {
System.out.println( System.out.println(
all + "=================================================================="); all + "==================================================================");
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| ServerException | ServerException
| XmlParserException e) { | XmlParserException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return all; return all;
...@@ -297,16 +300,16 @@ public class CSVDatasourceChannel implements DataSourceChannel { ...@@ -297,16 +300,16 @@ public class CSVDatasourceChannel implements DataSourceChannel {
public CSVClientService createS3Client(Map<String, String> requestParams) { public CSVClientService createS3Client(Map<String, String> requestParams) {
int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":"); int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":");
String endpoint = requestParams.get("fs.s3a.endpoint") + ""; String endpoint = requestParams.get("fs.s3a.endpoint");
Integer port = Integer port =
Integer.valueOf( Integer.valueOf(
requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", "")); requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", ""));
String provider = requestParams.get("fs.s3a.aws.credentials.provider") + ""; String provider = requestParams.get("fs.s3a.aws.credentials.provider");
String username = requestParams.get("access_key") + ""; String username = requestParams.get("access_key");
String password = requestParams.get("secret_key") + ""; String password = requestParams.get("secret_key");
// String bucket = requestParams.get("bucket") + ""; String bucket = requestParams.get("bucket");
try { try {
s3ClientService = new CSVClientService(endpoint, provider, username, password, port); s3ClientService = new CSVClientService(endpoint, provider, username, password, port, bucket);
return s3ClientService; return s3ClientService;
} catch (Exception e) { } catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!"); throw new SeaTunnelException("创建Mqtt客户端错误!");
......
...@@ -16,13 +16,14 @@ public class ExcelClientService { ...@@ -16,13 +16,14 @@ public class ExcelClientService {
private MinioClient minioClient; private MinioClient minioClient;
public ExcelClientService( public ExcelClientService(
String endpoint, String provider, String username, String password, Integer port) String endpoint, String provider, String username, String password, Integer port, String bucket)
throws MinioException { throws MinioException {
this.ENDPOINT = endpoint; this.ENDPOINT = endpoint;
this.PROVIDER = provider; this.PROVIDER = provider;
this.USERNAME = username; this.USERNAME = username;
this.PASSWORD = password; this.PASSWORD = password;
this.PORT = port; this.PORT = port;
this.BUCKET = bucket;
setMinioClient(endpoint, provider, username, password, port); setMinioClient(endpoint, provider, username, password, port);
} }
......
...@@ -96,14 +96,14 @@ public class ExcelDatasourceChannel implements DataSourceChannel { ...@@ -96,14 +96,14 @@ public class ExcelDatasourceChannel implements DataSourceChannel {
database); database);
} }
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| ServerException | ServerException
| XmlParserException e) { | XmlParserException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
...@@ -137,14 +137,14 @@ public class ExcelDatasourceChannel implements DataSourceChannel { ...@@ -137,14 +137,14 @@ public class ExcelDatasourceChannel implements DataSourceChannel {
database); database);
} }
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| ServerException | ServerException
| XmlParserException e) { | XmlParserException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
...@@ -178,14 +178,14 @@ public class ExcelDatasourceChannel implements DataSourceChannel { ...@@ -178,14 +178,14 @@ public class ExcelDatasourceChannel implements DataSourceChannel {
}); });
return db; return db;
} catch (ServerException } catch (ServerException
| ErrorResponseException | ErrorResponseException
| InsufficientDataException | InsufficientDataException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| XmlParserException | XmlParserException
| InternalException e) { | InternalException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// throw new UnsupportedOperationException("getDatabases is not supported for S3 // throw new UnsupportedOperationException("getDatabases is not supported for S3
...@@ -282,14 +282,14 @@ public class ExcelDatasourceChannel implements DataSourceChannel { ...@@ -282,14 +282,14 @@ public class ExcelDatasourceChannel implements DataSourceChannel {
System.out.println( System.out.println(
all + "=================================================================="); all + "==================================================================");
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| ServerException | ServerException
| XmlParserException e) { | XmlParserException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return all; return all;
...@@ -297,16 +297,16 @@ public class ExcelDatasourceChannel implements DataSourceChannel { ...@@ -297,16 +297,16 @@ public class ExcelDatasourceChannel implements DataSourceChannel {
public ExcelClientService createS3Client(Map<String, String> requestParams) { public ExcelClientService createS3Client(Map<String, String> requestParams) {
int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":"); int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":");
String endpoint = requestParams.get("fs.s3a.endpoint") + ""; String endpoint = requestParams.get("fs.s3a.endpoint");
Integer port = Integer port =
Integer.valueOf( Integer.valueOf(
requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", "")); requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", ""));
String provider = requestParams.get("fs.s3a.aws.credentials.provider") + ""; String provider = requestParams.get("fs.s3a.aws.credentials.provider");
String username = requestParams.get("access_key") + ""; String username = requestParams.get("access_key");
String password = requestParams.get("secret_key") + ""; String password = requestParams.get("secret_key");
// String bucket = requestParams.get("bucket") + ""; String bucket = requestParams.get("bucket");
try { try {
s3ClientService = new ExcelClientService(endpoint, provider, username, password, port); s3ClientService = new ExcelClientService(endpoint, provider, username, password, port, bucket);
return s3ClientService; return s3ClientService;
} catch (Exception e) { } catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!"); throw new SeaTunnelException("创建Mqtt客户端错误!");
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
</parent> </parent>
<artifactId>datasource-jdbc-Access</artifactId> <artifactId>datasource-jdbc-access</artifactId>
<properties> <properties>
<mysql-connector.version>8.0.28</mysql-connector.version> <mysql-connector.version>8.0.28</mysql-connector.version>
......
...@@ -16,13 +16,14 @@ public class S3ClientService { ...@@ -16,13 +16,14 @@ public class S3ClientService {
private MinioClient minioClient; private MinioClient minioClient;
public S3ClientService( public S3ClientService(
String endpoint, String provider, String username, String password, Integer port) String endpoint, String provider, String username, String password, Integer port,String bucket)
throws MinioException { throws MinioException {
this.ENDPOINT = endpoint; this.ENDPOINT = endpoint;
this.PROVIDER = provider; this.PROVIDER = provider;
this.USERNAME = username; this.USERNAME = username;
this.PASSWORD = password; this.PASSWORD = password;
this.PORT = port; this.PORT = port;
this.BUCKET=bucket;
setMinioClient(endpoint, provider, username, password, port); setMinioClient(endpoint, provider, username, password, port);
} }
......
...@@ -107,14 +107,14 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -107,14 +107,14 @@ public class S3DatasourceChannel implements DataSourceChannel {
database); database);
} }
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| ServerException | ServerException
| XmlParserException e) { | XmlParserException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
...@@ -148,14 +148,14 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -148,14 +148,14 @@ public class S3DatasourceChannel implements DataSourceChannel {
database); database);
} }
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| ServerException | ServerException
| XmlParserException e) { | XmlParserException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
}); });
...@@ -189,14 +189,14 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -189,14 +189,14 @@ public class S3DatasourceChannel implements DataSourceChannel {
}); });
return db; return db;
} catch (ServerException } catch (ServerException
| ErrorResponseException | ErrorResponseException
| InsufficientDataException | InsufficientDataException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| XmlParserException | XmlParserException
| InternalException e) { | InternalException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
// throw new UnsupportedOperationException("getDatabases is not supported for S3 // throw new UnsupportedOperationException("getDatabases is not supported for S3
...@@ -293,14 +293,14 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -293,14 +293,14 @@ public class S3DatasourceChannel implements DataSourceChannel {
System.out.println( System.out.println(
all + "=================================================================="); all + "==================================================================");
} catch (ErrorResponseException } catch (ErrorResponseException
| InsufficientDataException | InsufficientDataException
| InternalException | InternalException
| InvalidKeyException | InvalidKeyException
| InvalidResponseException | InvalidResponseException
| IOException | IOException
| NoSuchAlgorithmException | NoSuchAlgorithmException
| ServerException | ServerException
| XmlParserException e) { | XmlParserException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
return all; return all;
...@@ -308,16 +308,16 @@ public class S3DatasourceChannel implements DataSourceChannel { ...@@ -308,16 +308,16 @@ public class S3DatasourceChannel implements DataSourceChannel {
public S3ClientService createS3Client(Map<String, String> requestParams) { public S3ClientService createS3Client(Map<String, String> requestParams) {
int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":"); int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":");
String endpoint = requestParams.get("fs.s3a.endpoint") + ""; String endpoint = requestParams.get("fs.s3a.endpoint");
Integer port = Integer port =
Integer.valueOf( Integer.valueOf(
requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", "")); requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", ""));
String provider = requestParams.get("fs.s3a.aws.credentials.provider") + ""; String provider = requestParams.get("fs.s3a.aws.credentials.provider");
String username = requestParams.get("access_key") + ""; String username = requestParams.get("access_key");
String password = requestParams.get("secret_key") + ""; String password = requestParams.get("secret_key");
// String bucket = requestParams.get("bucket") + ""; String bucket = requestParams.get("bucket");
try { try {
s3ClientService = new S3ClientService(endpoint, provider, username, password, port); s3ClientService = new S3ClientService(endpoint, provider, username, password, port, bucket);
return s3ClientService; return s3ClientService;
} catch (Exception e) { } catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!"); throw new SeaTunnelException("创建Mqtt客户端错误!");
......
...@@ -16,13 +16,14 @@ public class XMLClientService { ...@@ -16,13 +16,14 @@ public class XMLClientService {
private MinioClient minioClient; private MinioClient minioClient;
public XMLClientService( public XMLClientService(
String endpoint, String provider, String username, String password, Integer port) String endpoint, String provider, String username, String password, Integer port,String bucket)
throws MinioException { throws MinioException {
this.ENDPOINT = endpoint; this.ENDPOINT = endpoint;
this.PROVIDER = provider; this.PROVIDER = provider;
this.USERNAME = username; this.USERNAME = username;
this.PASSWORD = password; this.PASSWORD = password;
this.PORT = port; this.PORT = port;
this.BUCKET=bucket;
setMinioClient(endpoint, provider, username, password, port); setMinioClient(endpoint, provider, username, password, port);
} }
......
...@@ -297,16 +297,16 @@ public class XMLDatasourceChannel implements DataSourceChannel { ...@@ -297,16 +297,16 @@ public class XMLDatasourceChannel implements DataSourceChannel {
public XMLClientService createS3Client(Map<String, String> requestParams) { public XMLClientService createS3Client(Map<String, String> requestParams) {
int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":"); int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":");
String endpoint = requestParams.get("fs.s3a.endpoint") + ""; String endpoint = requestParams.get("fs.s3a.endpoint") ;
Integer port = Integer port =
Integer.valueOf( Integer.valueOf(
requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", "")); requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", ""));
String provider = requestParams.get("fs.s3a.aws.credentials.provider") + ""; String provider = requestParams.get("fs.s3a.aws.credentials.provider") ;
String username = requestParams.get("access_key") + ""; String username = requestParams.get("access_key") ;
String password = requestParams.get("secret_key") + ""; String password = requestParams.get("secret_key") ;
// String bucket = requestParams.get("bucket") + ""; String bucket = requestParams.get("bucket") ;
try { try {
s3ClientService = new XMLClientService(endpoint, provider, username, password, port); s3ClientService = new XMLClientService(endpoint, provider, username, password, port,bucket);
return s3ClientService; return s3ClientService;
} catch (Exception e) { } catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!"); throw new SeaTunnelException("创建Mqtt客户端错误!");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论