Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
S
seatunnel-web
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
宋勇
seatunnel-web
Commits
d087e089
提交
d087e089
authored
4月 14, 2024
作者:
宋勇
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
修改access 链接用户名称密码
上级
ee1e5f57
隐藏空白字符变更
内嵌
并排
正在显示
3 个修改的文件
包含
107 行增加
和
56 行删除
+107
-56
pom.xml
...atunnel-datasource-plugins/datasource-jdbc-access/pom.xml
+6
-0
AccessJdbcDataSourceChannel.java
...ource/plugin/access/jdbc/AccessJdbcDataSourceChannel.java
+71
-56
FilieInfo.java
...he/seatunnel/datasource/plugin/access/jdbc/FilieInfo.java
+30
-0
没有找到文件。
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-access/pom.xml
浏览文件 @
d087e089
...
...
@@ -67,6 +67,12 @@
<artifactId>
httpclient
</artifactId>
<version>
4.5.14
</version>
</dependency>
<dependency>
<groupId>
cn.hutool
</groupId>
<artifactId>
hutool-all
</artifactId>
<version>
5.8.21
</version>
</dependency>
</dependencies>
</project>
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-access/src/main/java/org/apache/seatunnel/datasource/plugin/access/jdbc/AccessJdbcDataSourceChannel.java
浏览文件 @
d087e089
...
...
@@ -33,7 +33,9 @@ import java.io.InputStream;
import
java.net.HttpURLConnection
;
import
java.net.URL
;
import
java.sql.*
;
import
java.time.LocalDateTime
;
import
java.util.*
;
import
java.util.Date
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.function.Function
;
import
java.util.stream.Collectors
;
...
...
@@ -43,7 +45,13 @@ import static com.google.common.base.Preconditions.checkNotNull;
public
class
AccessJdbcDataSourceChannel
implements
DataSourceChannelExt
{
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"char"
,
"json"
);
private
ConcurrentHashMap
<
String
,
Connection
>
concurrentHashMap
=
new
ConcurrentHashMap
<>(
1
);
private
LocalDateTime
fileDate
;
public
LocalDateTime
getFileId
(){
return
fileDate
;
}
private
ConcurrentHashMap
<
String
,
FilieInfo
>
concurrentHashMap
=
new
ConcurrentHashMap
<>(
1
);
public
static
class
Holder
{
private
static
final
AccessJdbcDataSourceChannel
INSTANCE
=
...
...
@@ -253,69 +261,78 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
if
(
StringUtils
.
isBlank
(
substring
))
{
throw
new
RuntimeException
(
"没找到文件名称!"
);
}
Connection
conn
=
null
;
String
database
=
""
;
Connection
connection
=
concurrentHashMap
.
get
(
url
);
if
(
connection
!=
null
)
{
return
connection
;
}
String
[]
split
=
substring
.
split
(
"\\."
);
String
prefix
=
split
[
0
];
String
suffix
=
""
;
if
(
split
.
length
>
1
)
{
suffix
=
"."
+
split
[
1
];
}
FilieInfo
filieInfo
=
concurrentHashMap
.
get
(
url
);
if
(
filieInfo
==
null
||
filieInfo
.
getDateTime
().
plusMinutes
(
1
).
isBefore
(
LocalDateTime
.
now
()))
{
// 定义 SQL 语句执行对象
Statement
state
=
null
;
// 定义结果集对象
ResultSet
rs
=
null
;
Connection
conn
=
null
;
String
[]
split
=
substring
.
split
(
"\\."
);
String
prefix
=
split
[
0
];
String
suffix
=
""
;
if
(
split
.
length
>
1
)
{
suffix
=
"."
+
split
[
1
];
}
File
path
=
new
File
(
"/tmp"
);
if
(!
path
.
exists
())
{
path
.
mkdir
();
}
// 定义 SQL 语句执行对象
Statement
state
=
null
;
// 定义结果集对象
ResultSet
rs
=
null
;
FileOutputStream
outputStream
=
null
;
InputStream
in
=
null
;
HttpURLConnection
con
=
null
;
File
file
=
null
;
try
{
file
=
File
.
createTempFile
(
prefix
,
suffix
,
path
);
outputStream
=
new
FileOutputStream
(
file
);
URL
urldownl
=
new
URL
(
url
);
con
=
(
HttpURLConnection
)
urldownl
.
openConnection
();
con
.
setRequestMethod
(
"GET"
);
in
=
con
.
getInputStream
();
byte
[]
buffer
=
new
byte
[
1024
];
int
bytesRead
;
while
((
bytesRead
=
in
.
read
(
buffer
))
!=
-
1
)
{
outputStream
.
write
(
buffer
,
0
,
bytesRead
);
File
path
=
new
File
(
"/tmp"
);
if
(!
path
.
exists
())
{
path
.
mkdir
();
}
outputStream
.
flush
();
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
"读取url 文件失败!"
+
e
)
;
throw
new
RuntimeException
(
"读取url 文件失败!"
)
;
}
finally
{
FileOutputStream
outputStream
=
null
;
InputStream
in
=
null
;
HttpURLConnection
con
=
null
;
File
file
=
null
;
try
{
if
(
in
!=
null
)
{
in
.
close
();
file
=
File
.
createTempFile
(
prefix
,
suffix
,
path
);
outputStream
=
new
FileOutputStream
(
file
);
URL
urldownl
=
new
URL
(
url
);
con
=
(
HttpURLConnection
)
urldownl
.
openConnection
();
con
.
setRequestMethod
(
"GET"
);
in
=
con
.
getInputStream
();
byte
[]
buffer
=
new
byte
[
1024
];
int
bytesRead
;
while
((
bytesRead
=
in
.
read
(
buffer
))
!=
-
1
)
{
outputStream
.
write
(
buffer
,
0
,
bytesRead
);
}
if
(
con
!=
null
)
{
con
.
disconnect
();
}
if
(
outputStream
!=
null
)
{
outputStream
.
close
();
outputStream
.
flush
();
}
catch
(
Exception
e
)
{
System
.
out
.
println
(
"读取url 文件失败!"
+
e
);
throw
new
RuntimeException
(
"读取url 文件失败!"
);
}
finally
{
try
{
if
(
in
!=
null
)
{
in
.
close
();
}
if
(
con
!=
null
)
{
con
.
disconnect
();
}
if
(
outputStream
!=
null
)
{
outputStream
.
close
();
}
}
catch
(
Exception
ee
)
{
System
.
out
.
println
(
"关闭流错误!"
+
ee
);
}
}
catch
(
Exception
ee
)
{
System
.
out
.
println
(
"关闭流错误!"
+
ee
);
}
}
String
database
=
path
+
"/"
+
file
.
getName
();
database
=
database
.
replace
(
"\\"
,
"/"
);
database
=
path
+
"/"
+
file
.
getName
();
database
=
database
.
replace
(
"\\"
,
"/"
);
FilieInfo
newFile
=
new
FilieInfo
();
newFile
.
setDatabase
(
database
);
newFile
.
setDateTime
(
LocalDateTime
.
now
());
concurrentHashMap
.
put
(
url
,
newFile
);
}
else
{
FilieInfo
filieInfo1
=
concurrentHashMap
.
get
(
url
);
database
=
filieInfo1
.
getDatabase
();
}
// JDBC连接字符串
// 在application.properties或application.yml文件中配置数据库连接信息。你需要指定驱动类名为net.ucanaccess.jdbc.UcanaccessDriver,并提供Access数据库的URL。例如,URL可以是jdbc:ucanaccess://D:/Access2003/database/db_test.mdb;openExclusive=false;ignoreCase=true。[1]
...
...
@@ -328,8 +345,6 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
conn
=
DriverManager
.
getConnection
(
connectionString
,
user
,
pwd
);
System
.
out
.
println
(
"Connected to the database successfully"
);
concurrentHashMap
.
put
(
url
,
conn
);
return
conn
;
}
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-access/src/main/java/org/apache/seatunnel/datasource/plugin/access/jdbc/FilieInfo.java
0 → 100644
浏览文件 @
d087e089
package
org
.
apache
.
seatunnel
.
datasource
.
plugin
.
access
.
jdbc
;
import
java.time.LocalDateTime
;
/**
* @description:TODO
* @author: sy
* @create: 2024/4/14 22:53
*/
public
class
FilieInfo
{
private
String
database
;
private
LocalDateTime
dateTime
;
public
String
getDatabase
()
{
return
database
;
}
public
void
setDatabase
(
String
database
)
{
this
.
database
=
database
;
}
public
LocalDateTime
getDateTime
()
{
return
dateTime
;
}
public
void
setDateTime
(
LocalDateTime
dateTime
)
{
this
.
dateTime
=
dateTime
;
}
}
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论