响应式 MSSQL 客户端
Reactive MSSQL 客户端是微软 SQL Server 的客户端, 具有简单直接专注于可扩展性和低开销的 API。
The client is reactive and non-blocking, allowing to handle many database connections with a single thread.
特性
-
事件驱动
-
轻量级
-
内置连接池
-
支持内存直接映射到对象,避免了不必要的复制
-
Java 8 Date 和 Time 支持
-
RxJava API
-
SSL/TLS
-
游标
-
行流(Row streaming)
暂不支持
-
Prepared 查询缓存
用法
要使用响应式 MSSQL 客户端,请将以下依赖项添加到构建描述文件中的 dependencies 部分:
-
Maven (in your
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mssql-client</artifactId>
<version>4.4.0</version>
</dependency>
-
Gradle (in your
build.gradle
file):
dependencies {
compile 'io.vertx:vertx-mssql-client:4.4.0'
}
由此开始
这是最简单的连接、查询和断开连接方法:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setPort(1433)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池参数
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建客户端池
MSSQLPool client = MSSQLPool.pool(connectOptions, poolOptions);
// 一个简单的查询
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// 现在关闭池
client.close();
});
连接 SQL Server
大多数情况下,您将使用池连接到 MSSQL:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setPort(1433)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 池参数
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建池化客户端
MSSQLPool client = MSSQLPool.pool(connectOptions, poolOptions);
池化客户端使用连接池,任何操作都会从池中借用连接, 随后执行操作,并最终执行完之后将其释放到池中。
如果您使用 Vert.x 运行,您可以将它传递给您的 Vertx 实例:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setPort(1433)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建池化客户端
MSSQLPool client = MSSQLPool.pool(vertx, connectOptions, poolOptions);
当您不再需要它时,您需要释放池:
pool.close();
当您需要在同一个连接上执行多个操作时,需要使用客户端
连接
。
您可以轻松地从池中获取一个连接:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setPort(1433)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建池化客户端
MSSQLPool client = MSSQLPool.pool(vertx, connectOptions, poolOptions);
// 从池中获取连接
client.getConnection().compose(conn -> {
System.out.println("Got a connection from the pool");
// 所有操作都在同一个连接上执行
return conn
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.compose(res -> conn
.query("SELECT * FROM users WHERE id='emad'")
.execute())
.onComplete(ar -> {
// 释放连接并将其归还给池
conn.close();
});
}).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Done");
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
完成连接后,您必须关闭它以将其释放到池中,以便可以重复使用。
配置
数据对象
一种配置客户端的简单方法是指定一个 MSSQLConnectOptions
数据对象。
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setPort(1433)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 池参数
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// 根据数据对象创建池
MSSQLPool pool = MSSQLPool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
// 处理您的连接
});
连接 URI
一种替代用 MSSQLConnectOptions
数据对象配置客户端的方案,便是您可使用连接 URI:
String connectionUri = "sqlserver://dbuser:secretpassword@database.server.com:1433/mydb";
// 从连接 URI 创建池
MSSQLPool pool = MSSQLPool.pool(connectionUri);
// 从连接 URI 创建连接
MSSQLConnection.connect(vertx, connectionUri, res -> {
// 处理您的连接
});
连接 URI 格式由客户端以惯用方式定义:
sqlserver://[user[:[password]]@]host[:port][/database][?<key1>=<value1>[&<key2>=<value2>]]
当前,客户端在连接 uri 中支持以下参数关键字(key不区分大小写):
-
host
-
port
-
user
-
password
-
database
注意
|
通过 URI 配置的参数将会覆盖默认的配置参数。 |
执行查询
当您不需要事务或者只是执行一个单次查询操作,您可以直接在连接池里执行查询; 连接池会使用某一条连接执行并给您返回结果。
下边是如何执行一个简单的查询的例子:
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
预查询
执行预查询也是一样的操作。
SQL字符通过位置引用实际的参数,并使用数据库的语法 `@p1`, `@p2`, etc…
client
.preparedQuery("SELECT * FROM users WHERE id=@p1")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
查询相关的方法为 SELECT 类型的操作提供了异步的 RowSet
实例
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
或者 UPDATE/INSERT 类型的查询:
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (@p1, @p2)")
.execute(Tuple.of("Julien", "Viet"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Row对象(Row
)可以让您通过索引位置获取相应的数据
System.out.println("User " + row.getString(0) + " " + row.getString(1));
小心
|
Column indexes start at 0, not at 1. |
Alternatively, data can be retrieved by name:
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
客户端在此处没有做特殊处理,无论您的SQL文本时什么,列名都将使用数据库表中的名称标识。
您也可以直接访问得到多种类型
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
您可以使用缓存过的预处理语句去执行一次性的预查询:
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = @p1")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您也可以创建 PreparedStatement
并自主地管理它的生命周期。
sqlConnection
.prepare("SELECT * FROM users WHERE id = @p1", ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"), ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
批处理
您可以在预查询中执行批处理操作
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// Execute the prepared batch
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES (@p1, @p2)")
.executeBatch(batch, res -> {
if (res.succeeded()) {
// Process rows
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
使用自增列
您可以在插入新数据后使用 OUTPUT
子句来获取自增列的值
client
.preparedQuery("INSERT INTO movies (title) OUTPUT INSERTED.id VALUES (@p1)")
.execute(Tuple.of("The Man Who Knew Too Much"), res -> {
if (res.succeeded()) {
Row row = res.result().iterator().next();
System.out.println(row.getLong("id"));
}
});
使用连接
获取一条连接
当您要执行查询(无事务)操作时,您可以创建一条或者从连接池里拿到一条连接。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。
pool
.getConnection()
.compose(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (@p1, @p2)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
// Return the connection to the pool
.eventually(v -> connection.close())
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
也可以通过连接对象创建预查询:
connection
.prepare("SELECT * FROM users WHERE first_name LIKE @p1")
.compose(pq ->
pq.query()
.execute(Tuple.of("Julien"))
.eventually(v -> pq.close())
).onSuccess(rows -> {
// All rows
});
简化的连接API
当您使用连接池时,您可以调用 withConnection
并以当前连接要执行的操作作为参数。
这样会从连接池里拿到一条连接,并使用当前连接执行目标操作。
这种方式需要返回一个future对象来表示操作结果。
当这个future操作完成后,当前连接会被释放会连接池同时您也可能拿到最终的执行结果。
pool.withConnection(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (@p1, @p2)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
使用事务
连接中使用事务操作
您可以使用SQL语法 BEGIN
/COMMIT
/ROLLBACK
来执行事务操作,同时您必须使用
SqlConnection
并自己管理当前连接。
或者您也可以使用 SqlConnection
的事务API:
pool.getConnection()
// Transaction must use a connection
.onSuccess(conn -> {
// Begin the transaction
conn.begin()
.compose(tx -> conn
// Various statements
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.compose(res2 -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Andy','Guibert')")
.execute())
// Commit the transaction
.compose(res3 -> tx.commit()))
// Return the connection to the pool
.eventually(v -> conn.close())
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});
当数据库服务端返回当前事务已失败(比如常见的 current transaction is aborted, commands ignored until end of transaction block)
,事务已回滚和 completion
方法的返回值future返回了
TransactionRollbackException
异常时:
tx.completion()
.onFailure(err -> {
System.out.println("Transaction failed => rolled back");
});
简化版事务API
当您使用连接池时,您可以调用 withTransaction
方法
并传递待执行的事务操作作为参数。
这将会从连接池里拿到一条连接,开启事务并调用待执行操作,配合客户端一起执行该事务范围内 的所有操作。
待执行操作需要返回一个future来表示可能产生的结果:
-
当future成功时,客户端提交该事务
-
当future失败时,客户端回滚该事务
事务操作完成后,连接会被释放回连接池,并且可以获取到最终的操作结果。
pool.withTransaction(client -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.flatMap(res -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
// Map to a message result
.map("Users inserted")))
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
游标和流式操作
默认情况下,预查询操作会拉去所有的行记录,您可以使用
游标
来控制您想要读取的行数:
connection.prepare("SELECT * FROM users WHERE age > @p1", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Create a cursor
Cursor cursor = pq.cursor(Tuple.of(18));
// Read 50 rows
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// Check for more ?
if (cursor.hasMore()) {
// Repeat the process...
} else {
// No more rows - close the cursor
cursor.close();
}
}
});
}
});
游标释放时需要同时执行关闭操作:
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
// Close the cursor
cursor.close();
}
});
stream API也可以用于游标,尤其是在Rx版的客户端,可能更为方便。
connection.prepare("SELECT * FROM users WHERE age > @p1", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Fetch 50 rows at a time
RowStream<Row> stream = pq.createStream(50, Tuple.of(18));
// Use the stream
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
System.out.println("End of stream");
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
上边的stream会批量读取 50
行并同时将其转换为流,当这些行记录被传递给处理器时,
会以此类推地读取下一批的 50
行记录。
stream支持重启或暂停,已经加载到的行记录将会被保留在内存里直到被传递给处理器,此时 游标也将终止遍历。
查询追踪
当Vert.x启用tracing功能时,SQL客户端可以跟踪查询的执行情况。
客户端会上报下列这些 client spans:
-
Query
操作名称 -
tags
-
db.user
:数据库用户名 -
db.instance
:数据库实例 -
db.statement
:SQL语句 -
db.type
:sql
默认的 tracing 策略时 PROPAGATE
,客户端
在一个活跃trace里只创建一个span。
您可以通过 setTracingPolicy
方法来调整tracing策略,
例如您可以设置为 ALWAYS
,
客户端将始终上报span:
options.setTracingPolicy(TracingPolicy.ALWAYS);
支持的数据类型
当前,客户端支持以下 SQL Server 类型:
-
TINYINT(
java.lang.Short
) -
SMALLINT(
java.lang.Short
) -
INT(
java.lang.Integer
) -
BIGINT(
java.lang.Long
) -
BIT(
java.lang.Boolean
) -
REAL(
java.lang.Float
) -
DOUBLE(
java.lang.Double
) -
NUMERIC/DECIMAL(
BigDecimal
) -
CHAR/VARCHAR(
java.lang.String
) -
NCHAR/NVARCHAR(
java.lang.String
) -
DATE(
java.time.LocalDate
) -
TIME(
java.time.LocalTime
) -
SMALLDATETIME(
java.time.LocalDateTime
) -
DATETIME(
java.time.LocalDateTime
) -
DATETIME2(
java.time.LocalDateTime
) -
DATETIMEOFFSET(
java.time.OffsetDateTime
) -
BINARY/VARBINARY(
io.vertx.core.buffer.Buffer
) -
MONEY (
BigDecimal
) -
SMALLMONEY (
BigDecimal
) -
GUID (
UUID
)
元组(Tuple)解码在存储值时使用上述类型。
使用 Java 的 枚举
类型
SQL Server 没有 ENUM
数据类型,但客户端可以将检索到的字符串/数字数据类型映射到枚举。
您可以像这样对 Java 枚举进行编码:
client
.preparedQuery("INSERT INTO colors VALUES (@p1)")
.execute(Tuple.of(Color.red), res -> {
// ...
});
您可以像这样解码 Java 枚举:
client
.preparedQuery("SELECT color FROM colors")
.execute()
.onComplete(res -> {
if (res.succeeded()) {
RowSet<Row> rows = res.result();
for (Row row : rows) {
System.out.println(row.get(Color.class, "color"));
}
}
});
处理空 NULL
如果您使用 addXXX
方法之一修改 Tuple
,则 null
值被透明处理。
客户端可以在执行准备好的查询时推断正确的 SQL 类型:
Tuple tuple = Tuple.tuple()
.addInteger(17)
.addString("The Man Who Knew Too Much")
.addString(null);
client
.preparedQuery("INSERT INTO movies (id, title, plot) VALUES (@p1, @p2, @p3)")
.execute(tuple, res -> {
// ...
});
否则,您应该使用 NullValue
常量和 NullValue.of
方法之一以显式声明类型:
Tuple tuple = Tuple.of(17, "The Man Who Knew Too Much", NullValue.String);
client
.preparedQuery("INSERT INTO movies (id, title, plot) VALUES (@p1, @p2, @p3)")
.execute(tuple, res -> {
// ...
});
收集器查询
您可以通过查询 API 使用 Java 收集器:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// 使用收集器运行查询
client.query("SELECT * FROM users")
.collecting(collector)
.execute(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// 获取收集器创建的映射(map)对象
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
收集器处理时不得保留对 Row
的引用,
因为有一行用于处理整个集合。
Java Collectors
提供了许多有趣的预定义收集器,例如您可以
create 直接从行集轻松创建字符串:
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// 使用收集器运行查询
client.query("SELECT * FROM users")
.collecting(collector)
.execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// 获取收集器创建的字符串
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
信息消息
SQL Server 可以将信息消息作为对查询响应的一部分发送给客户端。
默认情况下,它们以 WARN
级别写入日志。
您可以在连接上设置一个处理器将它们捕获并对做些有用的事。
connection.infoHandler(info -> {
System.out.println("Received info " + info.getSeverity() + "" + info.getMessage());
});
使用 SSL/TLS
加密等级协商
当一个数据库连接建立时,客户端和服务端需要协商加密等级。
协商的加密等级取决于客户端的 MSSQLConnectOptions
配置和服务端和配置:
-
不加密: 如果客户端的
ssl
配置设置为false
,并且服务端不支持加密 -
只加密登录报文: 如果客户端的
ssl
配置设置为false
, 并且服务端支持加密 -
加密整个连接通道: 如果客户端的
ssl
配置设置为true
, 并且服务端要求加密
注意
|
如果客户端的 |
配置
为了设置客户端的 ssl
项配置, 使用 setSsl
方法。
默认情况下, ssl
配置为 false
。
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions().setSsl(true);
当 ssl
配置为 false
时,客户端信任所有的服务端密钥。
否则,客户端会验证主机名。
如果客户端的 ssl
配置为 true
, 并且服务端使用了自签名的密钥, 可以禁用主机名验证:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setSsl(true)
.setTrustAll(true);
客户端也可以选择配置信任所有服务端的密钥,使用 TrustOptions
进行配置。
例如,如果服务端密钥放置在一个 PEM 格式的文件中时,可以使用 PemTrustOptions
进行如下配置:
MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
.setSsl(true)
.setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/server-cert.pem"));
对于 Vert.x 中更高级的 SSL 支持,请参考 Vert.x Core 文档。
可共享的连接池
您可以在多个 Verticle 间或同一 Verticle 的多个实例间共享一个连接池。这样的连接池应该在 Verticle 外面创建, 否则这个连接池将在创建它的 Verticle 被取消部署时关闭
MSSQLPool pool = MSSQLPool.pool(database, new PoolOptions().setMaxSize(maxSize));
vertx.deployVerticle(() -> new AbstractVerticle() {
@Override
public void start() throws Exception {
// 使用连接池
}
}, new DeploymentOptions().setInstances(4));
您也可以用以下方式在每个 Verticle 中创建可共享的连接池:
vertx.deployVerticle(() -> new AbstractVerticle() {
MSSQLPool pool;
@Override
public void start() {
// 创建一个可共享的连接池
// 或获取已有的可共享连接池,并创建对原连接池的借用
// 当 verticle 被取消部署时,借用会被自动释放
pool = MSSQLPool.pool(database, new PoolOptions()
.setMaxSize(maxSize)
.setShared(true)
.setName("my-pool"));
}
}, new DeploymentOptions().setInstances(4));
第一次创建可共享的连接池时,会创建新连接池所需的资源。之后再调用该创建方法时,会复用之前的连接池,并创建 对原有连接池的借用。当所有的借用都被关闭时,该连接池的资源也会被释放。
默认情况下,客户端需要创建一个 TCP 连接时,会复用当前的 event-loop 。 这个可共享的 HTTP 客户端会 以一种安全的模式,在使用它的 verticle 中随机选中一个 verticle,并使用它的 event-loop。
您可以手动设置一个客户端可以使用的 event-loop 的数量
MSSQLPool pool = MSSQLPool.pool(database, new PoolOptions()
.setMaxSize(maxSize)
.setShared(true)
.setName("my-pool")
.setEventLoopSize(4));
高级连接池配置
数据库服务负载均衡
您可以使用包含多个数据库服务的列表来配置连接池而不是单个数据库服务。
MSSQLPool pool = MSSQLPool.pool(Arrays.asList(server1, server2, server3), options);
当一个连接创建时,连接池使用(round-robin)轮询调度算法做负载均衡以选择不同的数据库服务
注意
|
负载均衡是在创建连接时提供的,而不是在从连接池中获取连接时提供 |
连接初始化
您可以使用 connectHandler
方法在连接创建后和连接释放回连接池之前来与数据库连接交互
pool.connectHandler(conn -> {
conn.query(sql).execute().onSuccess(res -> {
// 将连接释放回连接池,以被该应用程序复用
conn.close();
});
});
连接完成后,您应该释放该连接以通知连接池该数据库连接可以被使用