Vert.x JDBC 客户端
Verx.x JDBC 客户端允许您使用 Vert.x 应用程序中的异步 API 与任何符合 JDBC 协议的数据库进行交互。
JDBC 客户端 API 接口描述请参考 JDBCClient
。
如需使用 Vert.x JDBC Client,请先将以下依赖添加到您的构建描述中的 依赖 部分 :
-
Maven(在您的
pom.xml
文件中添加):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
<version>4.4.0</version>
</dependency>
-
Gradle(在您的
build.gradle
文件中添加):
compile 'io.vertx:vertx-jdbc-client:4.4.0'
使用 Sql 客户端 API
SQL 客户端是使用 SQL 数据库通信的 vert.x 响应式 API, 已经实现了 MySQL、PostgreSQL、MSSQL 和 IBM DB2 等多个流行数据的异步驱动程序。 但是,由于还有许多其他数据库没有异步驱动程序, 因此 JDBC 客户端实现了相同的 API,直接使用 JDBC 驱动,以实现数据库的异步支持。
创建一个数据库连接池
一切始于连接池,创建一个连接池是非常简单的。最简单的例子如下:
final JsonObject config = new JsonObject()
.put("jdbcUrl", "jdbc:h2:~/test")
.put("datasourceName", "pool-name")
.put("username", "sa")
.put("password", "")
.put("max_pool_size", 16);
JDBCPool pool = JDBCPool.pool(vertx, config);
在这种情况下,我们将重用 JDBC client 的 API 来创建池。 该配置为自由格式,用户需要查找对应JDBC连接池所需的属性。
对于类型安全的替代方案,存在第二种工厂方法。 第二种工厂方法可确保配置正确(因为其属性和类型已由编译器验证), 但当前仅适用于 Agroal 连接池。
JDBCPool pool = JDBCPool.pool(
vertx,
// configure the connection
new JDBCConnectOptions()
// H2 connection string
.setJdbcUrl("jdbc:h2:~/test")
// username
.setUser("sa")
// password
.setPassword(""),
// configure the pool
new PoolOptions()
.setMaxSize(16)
.setName("pool-name")
);
使用连接池
有了连接池后,就可以开始使用数据库了,连接池以两种模式运行:
-
托管连接模式
-
手动连接方式
在托管连接模式下工作时, 作为用户,您无需关心获取连接和将其返回到池中。 可以直接对该连接池运行查询,并且该连接池可确保在查询终止后获取并返回连接。
pool
.query("SELECT * FROM user")
.execute()
.onFailure(e -> {
// handle the failure
})
.onSuccess(rows -> {
for (Row row : rows) {
System.out.println(row.getString("FIRST_NAME"));
}
});
这也适用于准备好的sql语句:
pool
.preparedQuery("SELECT * FROM user WHERE emp_id > ?")
// the emp id to look up
.execute(Tuple.of(1000))
.onFailure(e -> {
// handle the failure
})
.onSuccess(rows -> {
for (Row row : rows) {
System.out.println(row.getString("FIRST_NAME"));
}
});
使用此模式非常方便,因为它使您可以专注于业务逻辑,而不是连接管理。 有时可能需要保留顺序和因果关系。 在这种情况下,我们需要在手动连接模式下执行查询:
pool
.getConnection()
.onFailure(e -> {
// failed to get a connection
})
.onSuccess(conn -> {
conn
.query("SELECT * FROM user")
.execute()
.onFailure(e -> {
// handle the failure
// very important! don't forget to return the connection
conn.close();
})
.onSuccess(rows -> {
for (Row row : rows) {
System.out.println(row.getString("FIRST_NAME"));
}
// very important! don't forget to return the connection
conn.close();
});
});
当然,预准备的sql语句也适用于此模式:
pool
.getConnection()
.onFailure(e -> {
// failed to get a connection
})
.onSuccess(conn -> {
conn
.preparedQuery("SELECT * FROM user WHERE emp_id > ?")
// the emp_id to look up
.execute(Tuple.of(1000))
.onFailure(e -> {
// handle the failure
// very important! don't forget to return the connection
conn.close();
})
.onSuccess(rows -> {
for (Row row : rows) {
System.out.println(row.getString("FIRST_NAME"));
}
// very important! don't forget to return the connection
conn.close();
});
});
检索生成的主键
自动生成主键是 JDBC 驱动都有的一项特性。 连接池允许您使用一个特殊的属性:
JDBCPool.GENERATED_KEYS
来检索生成的主键。例如:
String sql = "INSERT INTO insert_table (FNAME, LNAME) VALUES (?, ?)";
pool
.preparedQuery(sql)
.execute(Tuple.of("Paulo", "Lopes"))
.onSuccess(rows -> {
// the generated keys are returned as an extra row
Row lastInsertId = rows.property(JDBCPool.GENERATED_KEYS);
// just refer to the position as usual:
long newId = lastInsertId.getLong(0);
});
使用存储过程和函数
之前使用过 JDBC 的人知道,为了使用调用函数或存储过程, 用户必须
使用 CallableStatement
接口。 对于大部分数据库引擎来说,这种抽象是合二为一的,因为像
PostgreSQL
和 MySQL
这种流行的数据库并没有任何特殊的命令以区分普通的 SQL 语句和
可调用的函数与存储过程。
现有的 SQL 客户端 api 被设计成跟网络协议类似的样子,而不是适配 JDBC 规范,因此您找不到任何特殊的方法以处理可调用语句。选择这种设计 给 JDBC SQL 客户端引入了一些复杂度,因为我们需要适配常规的 SQL 调用以遵守 JDBC 规范的要求 并与客户端兼容。
简单 IN 参数映射
映射简单的 IN
参数是很简单的。考虑下面的存储过程:
create procedure new_customer(firstname varchar(50), lastname varchar(50))
modifies sql data
insert into customers values (default, firstname, lastname, current_timestamp)
为了使用 JDBC 客户端调用该存储过程,您需要编写以下代码:
String sql = "{call new_customer(?, ?)}";
pool
.preparedQuery(sql)
// by default the "IN" argument types are extracted from the
// type of the data in the tuple, as well as the order
.execute(Tuple.of("Paulo", "Lopes"))
.onFailure(e -> {
// handle the failure
})
.onSuccess(rows -> {
// ... success handler
});
复杂的 IN / OUT 参数映射
之前的例子展示了如何使用简单的查询。然而,使用简单的查询是有一些限制条件的。客户端会假定
所有的参数都是 IN
类型,并且参数类型也是相应传入参数的 Java 类型的对应类型,但这并不一定准确,
例如,当您传入了一个 null
参数时。
在这种情况下,JDBC 客户端可以使用一个叫做 SqlOutParam
的帮助类,它可以帮您显式地
指明参数的类型。不仅是参数是 IN
还是 OUT
,还是参数的类型。
考虑以下的存储过程:
create procedure customer_lastname(IN firstname varchar(50), OUT lastname varchar(50))
modifies sql data
select lastname into lastname from customers where firstname = firstname
这个存储过程会返回名字是某个值的所有客户的姓氏。所以我们既需要
映射 IN
参数也需要映射 OUT
参数。
String sql = "{call customer_lastname(?, ?)}";
pool
.preparedQuery(sql)
// by default the "IN" argument types are extracted from the
// type of the data in the tuple, as well as the order
//
// Note that we now also declare the output parameter and it's
// type. The type can be a "String", "int" or "JDBCType" constant
.execute(Tuple.of("John", SqlOutParam.OUT(JDBCType.VARCHAR)))
.onFailure(e -> {
// handle the failure
})
.onSuccess(rows -> {
// we can verify if there was a output received from the callable statement
if (rows.property(JDBCPool.OUTPUT)) {
// and then iterate the results
for (Row row : rows) {
System.out.println(row.getString(0));
}
}
});
有时,您需要将同一个变量映射为 IN
和 OUT
参数。同样地,您也可以使用
SqlOutParam
帮助类进行处理。
SqlOutParam param;
// map IN as "int" as well as "OUT" as VARCHAR
param = SqlOutParam.INOUT(123456, JDBCType.VARCHAR);
// or
param = SqlOutParam.INOUT(123456, "VARCHAR");
// and then just add to the tuple as usual:
Tuple.of(param);
旧版JDBC客户端API
现在已弃用Vert.x 3中创建的JDBC客户端API,而应使用新的SQL客户端API。
在Vert.x 4的生命周期内仍将支持该功能, 以允许将应用程序迁移到新的SQL Client API。
建立连接
创建客户端后,您可以通过 getConnection
以获取一个连接。
当池中有连接处于准备(ready)状态时,将在处理器(handler)中返回。
client.getConnection(res -> {
if (res.succeeded()) {
SQLConnection connection = res.result();
connection.query("SELECT * FROM some_table", res2 -> {
if (res2.succeeded()) {
ResultSet rs = res2.result();
// Do something with results
}
});
} else {
// Failed to get connection - deal with it
}
});
该连接是 SQLConnection
的一个实例,
它是一个通用接口,不仅由Vert.x JDBC客户端使用。
您可以在 常见的sql接口 文档中学习如何使用它。
简单的SQL操作
有时,您将需要运行单个SQL操作, 例如:单行查询或对一组行的更新,这些操作不需要成为事务的一部分,也不必依赖于上一个或下一个操作。
对于这些情况,客户端提供了无模板的API SQLOperations
,
该接口将为您执行以下步骤:
-
从连接池获取连接
-
执行您的动作
-
关闭并将连接返回到连接池
从 USERS
表中加载用户的示例如下:
client.query("SELECT * FROM USERS", ar -> {
if (ar.succeeded()) {
if (ar.succeeded()) {
ResultSet result = ar.result();
} else {
// Failed!
}
// NOTE that you don't need to worry about
// the connection management (e.g.: close)
}
});
您可以通过一个简单的 "shot" 方法调用来执行以下操作:
有关这些API的更多详细信息,请参阅 SQLOperations
接口。
创建一个客户端
有几种创建客户端的方法。让我们把它们都过一遍。
使用默认的共享数据源
在大多数情况下,您需要在不同的客户端实例之间共享数据源。
例如,您通过部署多个 Verticle 实例来扩展应用程序,并且希望每个 Verticle 实例共享相同的数据源,这样就不会有多个连接池。
您可以按照以下步骤进行操作:
SQLClient client = JDBCClient.createShared(vertx, config);
第一次调用 JDBCClient.createShared
将实际创建数据源,并使用指定的配置。
随后的调用将返回使用相同数据源的新客户端实例,因此将不会使用该配置。
指定数据源名称
您可以创建一个指定数据源名称的客户端,如下所示:
SQLClient client = JDBCClient.createShared(vertx, config, "MyDataSource");
如果使用相同的Vert.x实例并指定相同的数据源名称创建了不同的客户端, 则它们将共享相同的数据源。
第一次调用 JDBCClient.createShared
将实际创建数据源,并使用指定的配置。
后续调用将返回使用相同数据源的新客户端实例,因此将不使用该配置。
如果您希望不同的客户端组具有不同的数据源(例如,它们与不同的数据库进行交互),请使用这种创建方式。
使用非共享数据源创建客户端
在大多数情况下,您将需要在不同的客户端实例之间共享数据源。 但是,您可能想要创建一个不与任何其他客户端共享其数据源的客户端实例。
在这种情况下,
您可以使用 JDBCClient.create
.
SQLClient client = JDBCClient.create(vertx, config);
这等效于 JDBCClient.createShared
每次使用唯一的数据源名称进行调用。
关闭客户端
长时间保持客户端(例如,在您的verticle的生命周期内)是可以的, 但是一旦完成,就应该关闭它。
共享同一数据源的客户端将计算引用次数(reference counted)。 一旦最后一个引用的相同数据源被关闭,该数据源将被关闭。
SQL连接
与数据库的连接用表示 SQLConnection
。
自动提交
获取连接时,自动提交设置为true。 这意味着您执行的每个操作将有效地在其自己的事务中执行。
如果您希望在单个事务中执行多个操作,
则应使用 setAutoCommit
将自动提交设置为false
操作完成后,将调用处理程序:
connection.setAutoCommit(false, res -> {
if (res.succeeded()) {
// OK!
} else {
// Failed!
}
});
执行查询
执行查询使用 query
查询字符串是传递给实际数据库的原始SQL。
当执行查询时,
结果通过 ResultSet
返回。
connection.query("SELECT ID, FNAME, LNAME, SHOE_SIZE from PEOPLE", res -> {
if (res.succeeded()) {
// Get the result set
ResultSet resultSet = res.result();
} else {
// Failed!
}
});
ResultSet
表示查询的结果。
列名称列表可通过 getColumnNames
获取,
而实际结果通过 getResults
获取。
结果是 JsonArray
类型,每个元素对应一行查询结果
List<String> columnNames = resultSet.getColumnNames();
List<JsonArray> results = resultSet.getResults();
for (JsonArray row : results) {
String id = row.getString(0);
String fName = row.getString(1);
String lName = row.getString(2);
int shoeSize = row.getInteger(3);
}
您还可以使用 getRows
- 作为Json对象实例的列表来检索行
- 这可以使您使用的API稍微简单一些,但是请注意,SQL结果可以包含重复的列名 - 如果是这种情况
则应改用 getResults
使用Json对象作为返回类型,进行迭代的示例:
List<JsonObject> rows = resultSet.getRows();
for (JsonObject row : rows) {
String id = row.getString("ID");
String fName = row.getString("FNAME");
String lName = row.getString("LNAME");
int shoeSize = row.getInteger("SHOE_SIZE");
}
预备语句查询
要执行准备好的语句查询,
可以使用 queryWithParams
。
这将执行查询,其中包括参数占位符和一个 JsonArray
或参数值。
String query = "SELECT ID, FNAME, LNAME, SHOE_SIZE from PEOPLE WHERE LNAME=? AND SHOE_SIZE > ?";
JsonArray params = new JsonArray().add("Fox").add(9);
connection.queryWithParams(query, params, res -> {
if (res.succeeded()) {
// Get the result set
ResultSet resultSet = res.result();
} else {
// Failed!
}
});
执行INSERT,UPDATE或DELETE
要执行更新数据库的操作,请使用 update
.
更新字符串是原始SQL,无需更改实际数据库即可通过。
更新执行完成后,
通过回调返回 UpdateResult
的结果
更新内容的行数通过 getUpdated
获取,
如果更新生成了键,则可以使用 getKeys
获取
connection.update("INSERT INTO PEOPLE VALUES (null, 'john', 'smith', 9)", res -> {
if (res.succeeded()) {
UpdateResult result = res.result();
System.out.println("Updated no. of rows: " + result.getUpdated());
System.out.println("Generated keys: " + result.getKeys());
} else {
// Failed!
}
});
准备的语句更新
要执行准备好的语句更新,
可以使用 updateWithParams
。
这种方式的更新语句包含参数占位符, 和JsonArray参数列表或参数值进行更新
String update = "UPDATE PEOPLE SET SHOE_SIZE = 10 WHERE LNAME=?";
JsonArray params = new JsonArray().add("Fox");
connection.updateWithParams(update, params, res -> {
if (res.succeeded()) {
UpdateResult updateResult = res.result();
System.out.println("No. of rows updated: " + updateResult.getUpdated());
} else {
// Failed!
}
});
可调用语句
要执行可调用语句(SQL函数或SQL过程),可以使用
callWithParams
。
这将使用标准JDBC格式 {call func_proc_name()}
接受可调用语句,
还可以选择包含参数占位符,例如:{call func_proc_name(?,?)}
,
JsonArray
包含参数值,
最后是 JsonArray
包含输出类型,例如: [null,'VARCHAR']
。
请注意,输出类型的索引与params数组一样重要。 如果返回值是第二个参数,则输出数组必须包含一个空值作为第一个元素。
一个SQL函数使用return关键字返回一些输出,在这种情况下,可以这样调用它:
String func = "{ call one_hour_ago() }";
connection.call(func, res -> {
if (res.succeeded()) {
ResultSet result = res.result();
} else {
// Failed!
}
});
在使用Procedures时,仍然通过其参数从过程中返回值,如果不返回任何内容, 用法如下:
String func = "{ call new_customer(?, ?) }";
connection.callWithParams(func, new JsonArray().add("John").add("Doe"), null, res -> {
if (res.succeeded()) {
// Success!
} else {
// Failed!
}
});
但是,您还可以返回如下值:
String func = "{ call customer_lastname(?, ?) }";
connection.callWithParams(func, new JsonArray().add("John"), new JsonArray().addNull().add("VARCHAR"), res -> {
if (res.succeeded()) {
ResultSet result = res.result();
} else {
// Failed!
}
});
请注意,参数的索引与 ?
的索引匹配,
并且输出参数期望是描述您要接收的类型的字符串。
为避免歧义,实现应遵循以下规则:
-
当
IN
数组中的占位符为NOT NULL
时,它将被采用 -
当
IN
值为NULL
时,对OUT
进行检查 当OUT
值不为空时,它将被注册为输出参数 当OUT
也为空时,预期IN
值为NULL
值。
已注册的 OUT
参数将以数组形式出现在 output
属性下的结果集中。
批量操作
SQL通用接口还定义了如何执行批处理操作。批处理操作分为3种类型:
-
批量执行sql语句
batch
-
批量执行预处理sql语句
batchWithParams
-
批量执行可调用语句
batchCallableWithParams
批量执行sql语句示例如下:
List<String> batch = new ArrayList<>();
batch.add("INSERT INTO emp (NAME) VALUES ('JOE')");
batch.add("INSERT INTO emp (NAME) VALUES ('JANE')");
connection.batch(batch, res -> {
if (res.succeeded()) {
List<Integer> result = res.result();
} else {
// Failed!
}
});
执行一个预处理sql语句,并且使用一个参数列表,示例如下
List<JsonArray> batch = new ArrayList<>();
batch.add(new JsonArray().add("joe"));
batch.add(new JsonArray().add("jane"));
connection.batchWithParams("INSERT INTO emp (name) VALUES (?)", batch, res -> {
if (res.succeeded()) {
List<Integer> result = res.result();
} else {
// Failed!
}
});
执行其他操作
要执行任何其他数据库操作,例如 CREATE TABLE
您可以使用
execute
。
sql语句将会直接在数据库中执行。操作完成后调用处理程序
String sql = "CREATE TABLE PEOPLE (ID int generated by default as identity (start with 1 increment by 1) not null," +
"FNAME varchar(255), LNAME varchar(255), SHOE_SIZE int);";
connection.execute(sql, execute -> {
if (execute.succeeded()) {
System.out.println("Table created !");
} else {
// Failed!
}
});
多个ResultSet响应
在某些情况下,您的查询可能返回多个结果集,
在这种情况下,为了保持兼容性,当返回的结果集对象转换为纯json时,使用当前结果集的 next
链接到下一个结果集。
可以像这样简单地遍历所有结果集:
while (rs != null) {
// do something with the result set...
// next step
rs = rs.getNext();
}
流式返回
在处理大量数据返回时,建议不要使用刚刚描述的API,而是使用流式返回。 因为这样可以避免将整个响应内容填充到内存和JSON中,而是逐行处理数据,例如:
connection.queryStream("SELECT * FROM large_table", stream -> {
if (stream.succeeded()) {
stream.result().handler(row -> {
// do something with the row...
});
}
});
您可以完全控制流何时暂停,恢复和结束。 对于查询返回多个结果集的情况,您应该使用结果集结束事件来获取下一个结果(如果有)。 如果有更多数据,则流处理程序将接收新数据,否则将调用结束处理程序。
connection.queryStream("SELECT * FROM large_table; SELECT * FROM other_table", stream -> {
if (stream.succeeded()) {
SQLRowStream sqlRowStream = stream.result();
sqlRowStream
.resultSetClosedHandler(v -> {
// will ask to restart the stream with the new result set if any
sqlRowStream.moreResults();
})
.handler(row -> {
// do something with the row...
})
.endHandler(v -> {
// no more data available...
});
}
});
使用事务
要使用事务,请先使用 setAutoCommit
将自动提交设置为false。
提交/回滚完成后,将调用处理程序,并且下一个事务将自动启动。
connection.commit(res -> {
if (res.succeeded()) {
// Committed OK!
} else {
// Failed!
}
});
配置项
在创建或部署配置时,会将配置传递给客户端。
以下配置属性通常适用:
provider_class
-
实际用于管理数据库连接的类的类名。 默认情况下是,
io.vertx.ext.jdbc.spi.impl.C3P0DataSourceProvider
但是如果您要使用其他提供程序, 则可以覆盖此属性并提供您的实现。 row_stream_fetch_size
-
SQLRowStream
内部缓存的大小, 曾经用来提高性能。默认情况下等于128 datasourceName
-
数据源的名称。数据库连接池的度量工具会使用该名称上报。默认情况下会使用随机 生成的 UUID
假设正在使用C3P0实现(默认设置),则将应用以下额外的配置属性:
url
-
数据库的 JDBC 连接 URL
driver_class
-
JDBC 驱动程序的类
user
-
数据库的用户名
password
-
数据库的密码
max_pool_size
-
池的最大连接数 , 默认为
15
initial_pool_size
-
用于初始化池的连接数 , 默认为
3
min_pool_size
-
连接池的最小连接数
max_statements
-
PreparedStatement
缓存的数量,默认值为0
。 max_statements_per_connection
-
每个链接
PreparedStatement
缓存的数量 , 默认值为0
。 max_idle_time
-
空闲连接将关闭之前经过的秒数 ,默认值为
0
(永不过期)。
其他连接池提供程序是:
-
Hikari
-
Agroal
与 C3P0 类似,它们可以通过在 JSON 配置对象上传递配置值来进行配置。
对于特殊情况,您不想将应用程序部署为 fat jar ,而是使用 vert.x 发行版运行,
如果您没有写权限将 JDBC 驱动程序添加到 vert.x lib 目录,
并使用 -cp
命令行标志传递它,则建议使用 Agroal。
如果要配置任何其他C3P0属性,可以将文件 c3p0.properties
添加到类路径。
以下是配置服务的示例:
JsonObject config = new JsonObject()
.put("url", "jdbc:hsqldb:mem:test?shutdown=true")
.put("driver_class", "org.hsqldb.jdbcDriver")
.put("max_pool_size", 30);
SQLClient client = JDBCClient.createShared(vertx, config);
Hikari使用一组不同的属性:
-
jdbcUrl
JDBC的URL -
driverClassName
JDBC驱动的类名 -
maximumPoolSize
连接池的大小 -
username
数据库用户名(password
数据库密码)
有关更多详细信息, 请参阅 Hikari documentation 。 另请参阅 Agroal documentation 以配置 Agroal。
JDBC驱动程序
如果使用默认值 DataSourceProvider
(依赖于c3p0),则需要在类路径中复制 JDBC 驱动程序类。
如果您的应用程序打包为 fat jar ,请确保嵌入 jdbc 驱动程序。如果您的应用程序是通过 vertx
命令行启动的,请将JDBC驱动程序复制到 ${VERTX_HOME}/lib
。
使用不同的连接池时,行为可能会有所不同。
数据类型
由于 Vert.x 支持许多不同的 SQL 数据库,因此我们依赖 JDBC 4.2 specification 来把 Java 数据类型
最好地转化为 SQL 类型
,反之亦然
如果特定 SQL 数据库中能接受的数据类型存在限制,那么您可以通过重写您自己的实现类来覆盖默认的标准:
-
encoder
JDBCEncoder
用于将Java 输入数据类型
转化为SQL 数据类型
。默认的实现类是io.vertx.ext.jdbc.spi.impl.JDBCEncoderImpl
-
decoder
JDBCDecoder
用于将SQL 结果类型
转化为Java 数据类型
。默认的实现类是io.vertx.ext.jdbc.spi.impl.JDBCDecoderImpl
您可以在以下的 SPI 文件中包含您重写的实现类
META-INF/services/io.vertx.ext.jdbc.spi.JDBCEncoder
和 META-INF/services/io.vertx.ext.jdbc.spi.JDBCDecoder
将在运行时被加载,或者您也可以直接通过配置以下的两个属性来设置:
-
encoderCls
-
decoderCls
JsonObject options = new JsonObject().put("url", "your_jdbc_url")
.put("user", "your_database_user")
.put("password", "your_database_password")
.put("encoderCls", encoderClass.getName())
.put("decoderCls", decoderClass.getName());
return JDBCClient.createShared(vertx, options);
JsonObject extraOptions = new JsonObject()
.put("encoderCls", encoderClass.getName())
.put("decoderCls", decoderClass.getName());
JDBCConnectOptions options = new JDBCConnectOptions().setJdbcUrl("your_jdbc_url")
.setUser("your_database_user")
.setPassword("your_database_password");
PoolOptions poolOptions = new PoolOptions().setMaxSize(1);
DataSourceProvider provider = new AgroalCPDataSourceProvider(options, poolOptions).init(extraOptions);
return JDBCPool.pool(vertx, provider);