提交 cfdde250 authored 作者: 李纤's avatar 李纤

修改oracle

上级 8ac4fdfc
......@@ -29,6 +29,18 @@
</properties>
<dependencies>
<!-- Oracle驱动包、支持字符集的依赖包-->
<!-- Oracle 驱动包 -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>12.2.0.1</version>
</dependency>
<!-- Additional library required to support Internationalization -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
......@@ -50,13 +62,6 @@
<scope>provided</scope>
</dependency>
<!-- driver -->
<dependency>
<groupId>com.oracle.database.jdbc</groupId>
<artifactId>ojdbc8</artifactId>
<version>${oracle-jdbc.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
......@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannelExt;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
......@@ -84,11 +83,11 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
List<String> dbNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams);
PreparedStatement statement =
connection.prepareStatement("SELECT table_name FROM user_tables;");
connection.prepareStatement("SELECT table_name FROM user_tables");
ResultSet re = statement.executeQuery()) {
// filter system databases
while (re.next()) {
String dbName = re.getString("database");
String dbName = re.getString(1);
if (StringUtils.isNotBlank(dbName)
&& !OracleDataSourceConfig.ORACLE_SYSTEM_DATABASES.contains(dbName)) {
dbNames.add(dbName);
......@@ -125,8 +124,8 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
"SELECT "
+ " t.TABLE_NAME,\n"
+ " t.COLUMN_NAME,\n"
+ " t.COMMENTS,\n"
+ " t1.DATA_TYPE,\n"
+ " t.COMMENTS REMARKS,\n"
+ " t1.DATA_TYPE TYPE_NAME,\n"
+ " t1.DATA_LENGTH VAR_LEN,\n"
+ " t1.DATA_PRECISION LEN,\n"
+ " DATA_SCALE ,\n"
......@@ -159,7 +158,8 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
tableField.setPrimaryKey(true);
}
if (typeList.contains(tableField.getType().toLowerCase())) {
if (resultSet.getString("TYPE_NAME") != null
&& typeList.contains(resultSet.getString("TYPE_NAME").toLowerCase())) {
tableField.setLen(resultSet.getString("VAR_LEN"));
} else {
tableField.setLen(resultSet.getString("LEN"));
......@@ -175,7 +175,11 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
tableField.setType(resultSet.getString("TYPE_NAME"));
tableField.setComment(resultSet.getString("REMARKS"));
Object nullable = resultSet.getObject("IS_NULLABLE");
tableField.setNullable(Boolean.TRUE.toString().equals(nullable.toString()));
if (Objects.nonNull(nullable)) {
tableField.setNullable(Boolean.TRUE.toString().equals(nullable.toString()));
} else {
tableField.setNullable(Boolean.TRUE);
}
tableFields.add(tableField);
}
......@@ -191,7 +195,13 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
return null;
HashMap<String, List<TableField>> fields = new HashMap<>();
for (String table : tables) {
List<TableField> tableFields =
getTableFields(pluginName, requestParams, database, table);
fields.put(table, tableFields);
}
return fields;
}
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
......@@ -212,12 +222,22 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
throws SQLException, ClassNotFoundException {
checkNotNull(requestParams.get(OracleOptionRule.DRIVER.key()));
checkNotNull(requestParams.get(OracleOptionRule.URL.key()), "Jdbc url cannot be null");
String url =
JdbcUtils.replaceDatabase(
requestParams.get(OracleOptionRule.URL.key()), databaseName);
String url = requestParams.get(OracleOptionRule.URL.key());
if (requestParams.containsKey(OracleOptionRule.USER.key())) {
String username = requestParams.get(OracleOptionRule.USER.key());
String password = requestParams.get(OracleOptionRule.PASSWORD.key());
String driver = requestParams.get(OracleOptionRule.DRIVER.key());
if (StringUtils.isEmpty(driver)) {
driver = "oracle.jdbc.driver.OracleDriver";
}
Class Clazz = Class.forName(driver);
Driver driverInstance = null;
try {
driverInstance = (Driver) Clazz.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(e);
}
DriverManager.registerDriver(driverInstance);
return DriverManager.getConnection(url, username, password);
}
return DriverManager.getConnection(url);
......@@ -232,16 +252,16 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
Map<String, String> options) {
List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams, database)) {
StringBuilder sql = new StringBuilder("sselect * from all_tables where 1=1 ");
StringBuilder sql = new StringBuilder("select * from all_tables where 1=1 ");
if (StringUtils.isNotBlank(database)) {
sql.append(" and OWNER=\"" + database + "\"");
sql.append(" and OWNER=\'" + database + "\'");
}
if (StringUtils.isNotBlank(tableName)) {
sql.append(" and table_name=\"" + tableName + "\"");
sql.append(" and table_name=\'" + tableName + "\'");
}
Statement statement = connection.createStatement();
......@@ -251,23 +271,23 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
while (resultSet.next()) {
Map tables = new HashMap();
// ResultSetMetaData metaData = resultSet.getMetaData();
// 获取表列数
int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) {
tables.put(metaData.getColumnName(i), resultSet.getObject(i));
}
// // ResultSetMetaData metaData = resultSet.getMetaData();
// // 获取表列数
// int columnCount = metaData.getColumnCount();
// for (int i = 1; i <= columnCount; i++) {
// tables.put(metaData.getColumnName(i), resultSet.getObject(i));
// }
String tableNameold = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableNameold)) {
tables.put("TABLE_NAME", tableNameold);
}
String tableComment = resultSet.getString("TABLE_COMMENT");
if (StringUtils.isNotBlank(tableComment)) {
tables.put("TABLE_COMMENT", tableComment);
}
// String tableComment = resultSet.getString("TABLE_COMMENT");
//
// if (StringUtils.isNotBlank(tableComment)) {
// tables.put("TABLE_COMMENT", tableComment);
// }
tableNames.add(tables);
}
return tableNames;
......@@ -286,7 +306,7 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
try (Connection connection = getConnection(requestParams, database)) {
String sql =
"sselect * from all_tables where table_name='"
"select * from all_tables where table_name='"
+ tableName
+ "' and OWNER ='"
+ database
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论