Reactive DB2 客户端
响应式的DB2客户端具有直接简单的API, 专注于可伸缩性和低开销。
该客户端是响应式的、非阻塞的,可以用单个线程处理多个数据库连接。
特性
-
支持Linux、Unix,及Windows上的DB2
-
对z/OS上的DB2提供有限支持
-
事件驱动
-
轻量级
-
内置连接池
-
预处理查询(Prepared query)缓存
-
批处理及游标支持
-
流式行处理
-
RxJava 1 与 RxJava 2 支持
-
支持对象存储在直接内存,避免不必要的拷贝
-
支持Java 8的日期和时间类型
-
支持 SSL/TLS
-
支持 HTTP/1.x 连接,SOCKS4a 及 SOCKS5 等代理
当前限制
-
不支持存储过程
-
不支持某些列类型(例如BLOB和CLOB)
使用方法
使用响应式 DB2 客户端,需要将以下依赖项添加到项目构建工具的 依赖 配置中:
-
Maven (在您的
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-db2-client</artifactId>
<version>4.0.3</version>
</dependency>
-
Gradle (在您的
build.gradle
文件中):
dependencies {
compile 'io.vertx:vertx-db2-client:4.0.3'
}
开始
以下是最简单的连接,查询和断开连接方法
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
.setPort(50000)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建客户端池
DB2Pool client = DB2Pool.pool(connectOptions, poolOptions);
// 简单查询
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// 现在关闭客户端池
client.close();
});
连接 DB2
大多数时候,您将使用连接池连接 DB2:
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
.setPort(50000)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池配置
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建池化的客户端
DB2Pool client = DB2Pool.pool(connectOptions, poolOptions);
池化的客户端使用连接池,任何操作都将借用连接池中的连接来执行该操作, 并将连接释放回连接池中。
如果您使用 Vert.x 运行,您可以将 Vertx 实例传递给它:
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
.setPort(50000)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池配置
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建池化的客户端
DB2Pool client = DB2Pool.pool(vertx, connectOptions, poolOptions);
当您不再需要连接池时,您需要将其释放:
pool.close();
当您需要在同一连接上执行多个操作时,您需要使用
connection
客户端。
您可以轻松地从连接池中获取一个:
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
.setPort(50000)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池配置
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建池化的客户端
DB2Pool client = DB2Pool.pool(vertx, connectOptions, poolOptions);
// 从连接池获取一个连接
client.getConnection().compose(conn -> {
System.out.println("Got a connection from the pool");
// 以下所有操作都在同一个连接上执行
return conn
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.compose(res -> conn
.query("SELECT * FROM users WHERE id='emad'")
.execute())
.onComplete(ar -> {
// 将连接释放回连接池
conn.close();
});
}).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Done");
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
连接使用完后,您必须关闭它以释放到连接池中,以便可以重复使用。
配置
有几个选项供您配置客户端。
数据对象
配置客户端的简单方法就是指定 DB2ConnectOptions
数据对象。
DB2ConnectOptions connectOptions = new DB2ConnectOptions()
.setPort(50000)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池配置
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// 从数据对象创建连接池
DB2Pool pool = DB2Pool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
// 使用连接进行处理
});
您也可以使用 setProperties
及 addProperty
方法配置通用配置项。但请注意调用 setProperties
方法会覆盖默认的客户端配置。
连接 URI
除了使用 DB2ConnectOptions
数据对象进行配置外,我们还为您提供了另外一种使用连接URI进行配置的方法:
String connectionUri = "db2://dbuser:secretpassword@database.server.com:50000/mydb";
// 从连接URI创建连接池
DB2Pool pool = DB2Pool.pool(connectionUri);
// 从连接URI创建连接
DB2Connection.connect(vertx, connectionUri, res -> {
// 使用连接进行处理
});
连接字符串的URI格式为:
db2://<USERNAME>:<PASSWORD>@<HOSTNAME>:<PORT>/<DBNAME>
目前,客户端支持以下的连接 uri 参数关键字:
-
host
-
port
-
user
-
password
-
dbname
Note: 连接URI中配置的配置项会覆盖默认配置。
执行查询
当您不需要事务或者只是执行一个单次查询操作,您可以直接在连接池里执行查询; 连接池会使用某一条连接执行并给您返回结果。 下边是如何执行一个简单的查询的例子:
client
.query("SELECT * FROM users WHERE id='andy'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
预查询
执行预查询也是一样的操作。
SQL字符通过位置引用实际的参数,并使用数据库的语法 `?`
client
.preparedQuery("SELECT * FROM users WHERE id=$1")
.execute(Tuple.of("andy"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
查询相关的方法为 SELECT 类型的操作提供了异步的 RowSet
实例
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
或者 UPDATE/INSERT 类型的查询:
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES ($1, $2)")
.execute(Tuple.of("Andy", "Guibert"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Row对象(Row
)可以让您通过索引位置获取相应的数据
System.out.println("User " + row.getString(0) + " " + row.getString(1));
或者通过名称
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
客户端在此处没有做特殊处理,无论您的SQL文本时什么,列名都将使用数据库表中的名称标识。
您也可以直接访问得到多种类型
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
您可以使用缓存过的预处理语句去执行一次性的预查询:
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = ?")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您也可以创建 PreparedStatement
并自主地管理它的生命周期。
sqlConnection
.prepare("SELECT * FROM users WHERE id= ?", ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"), ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
批处理
您可以在预查询中执行批处理操作
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julient Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
batch.add(Tuple.of("andy", "Andy Guibert"));
// Execute the prepared batch
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES ($1, $2)")
.executeBatch(batch, res -> {
if (res.succeeded()) {
// Process rows
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
通过将查询包装在 SELECT <COLUMNS> FROM FINAL TABLE ( <SQL> )
,可以获取生成的键,例如:
client
.preparedQuery("SELECT color_id FROM FINAL TABLE ( INSERT INTO color (color_name) VALUES (?), (?), (?) )")
.execute(Tuple.of("white", "red", "blue"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Inserted " + rows.rowCount() + " new rows.");
for (Row row : rows) {
System.out.println("generated key: " + row.getInteger("color_id"));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
使用连接
获取一条连接
当您要执行查询(无事务)操作时,您可以创建一条或者从连接池里拿到一条连接。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。
pool
.getConnection()
.compose(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Andy", "Guibert")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
// Return the connection to the pool
.eventually(v -> connection.close())
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
也可以通过连接对象创建预查询:
connection
.prepare("SELECT * FROM users WHERE first_name LIKE $1")
.compose(pq ->
pq.query()
.execute(Tuple.of("Andy"))
.eventually(v -> pq.close())
).onSuccess(rows -> {
// All rows
});
简化的连接API
当您使用连接池时,您可以调用 withConnection
并以当前连接要执行的操作作为参数。
这样会从连接池里拿到一条连接,并使用当前连接执行目标操作。
这种方式需要返回一个future对象来表示操作结果。
当这个future操作完成后,当前连接会被释放会连接池同时您也可能拿到最终的执行结果。
pool.withConnection(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Andy", "Guibert")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
使用事务
连接中使用事务操作
您可以使用SQL语法 BEGIN
/COMMIT
/ROLLBACK
来执行事务操作,同时您必须使用
SqlConnection
并自己管理当前连接。
或者您也可以使用 SqlConnection
的事务API:
pool.getConnection()
// Transaction must use a connection
.onSuccess(conn -> {
// Begin the transaction
conn.begin()
.compose(tx -> conn
// Various statements
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.compose(res2 -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Andy','Guibert')")
.execute())
// Commit the transaction
.compose(res3 -> tx.commit()))
// Return the connection to the pool
.eventually(v -> conn.close())
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});
当数据库服务端返回当前事务已失败(比如常见的 current transaction is aborted, commands ignored until end of transaction block)
,事务已回滚和 completion
方法的返回值future返回了
TransactionRollbackException
异常时:
tx.completion()
.onFailure(err -> {
System.out.println("Transaction failed => rolled back");
});
简化版事务API
当您使用连接池时,您可以调用 withTransaction
方法
并传递待执行的事务操作作为参数。
这将会从连接池里拿到一条连接,开启事务并调用待执行操作,配合客户端一起执行该事务范围内 的所有操作。
待执行操作需要返回一个future来表示可能产生的结果:
-
当future成功时,客户端提交该事务
-
当future失败时,客户端回滚该事务
事务操作完成后,连接会被释放回连接池,并且可以获取到最终的操作结果。
pool.withTransaction(client -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.flatMap(res -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Andy','Guibert')")
.execute()
// Map to a message result
.map("Users inserted")))
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
游标和流式操作
默认情况下预查询操作会拉去所有的行记录,您可以使用
游标
来控制您想要读取的行数:
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
if (ar0.succeeded()) {
PreparedStatement pq = ar0.result();
// Cursors require to run within a transaction
connection.begin(ar1 -> {
if (ar1.succeeded()) {
Transaction tx = ar1.result();
// Create a cursor
Cursor cursor = pq.cursor(Tuple.of("julien"));
// Read 50 rows
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// Check for more ?
if (cursor.hasMore()) {
// Repeat the process...
} else {
// No more rows - commit the transaction
tx.commit();
}
}
});
}
});
}
});
游标释放时需要同时执行关闭操作:
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
// Close the cursor
cursor.close();
}
});
stream API也可以用于游标,尤其是在Rx版的客户端,可能更为方便。
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
if (ar0.succeeded()) {
PreparedStatement pq = ar0.result();
// Streams require to run within a transaction
connection.begin(ar1 -> {
if (ar1.succeeded()) {
Transaction tx = ar1.result();
// Fetch 50 rows at a time
RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));
// Use the stream
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
tx.commit();
System.out.println("End of stream");
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
}
});
上边的stream会批量读取 50
行并同时将其转换为流,当这些行记录被传递给处理器时,
会以此类推地读取下一批的 50
行记录。
stream支持重启或暂停,已经加载到的行记录将会被保留在内存里直到被传递给处理器,此时 游标也将终止遍历。
追踪查询
当Vert.x启用tracing功能时,SQL客户端可以跟踪查询的执行情况。
客户端会上报下列这些 client spans:
-
Query
操作名称 -
tags
-
db.user
:数据库用户名 -
db.instance
:数据库实例 -
db.statement
:SQL语句 -
db.type
:sql
默认的 tracing 策略时 PROPAGATE
,客户端
在一个活跃trace里只创建一个span。
您可以通过 setTracingPolicy
方法来调整tracing策略,
例如您可以设置为 ALWAYS
,
客户端将始终上报span:
options.setTracingPolicy(TracingPolicy.ALWAYS);
DB2 类型映射
当前客户端支持以下 DB2 类型
-
BOOLEAN (
java.lang.Boolean
) (只针对DB2 LUW) -
SMALLINT (
java.lang.Short
) -
INTEGER (
java.lang.Integer
) -
BIGINT (
java.lang.Long
) -
REAL (
java.lang.Float
) -
DOUBLE (
java.lang.Double
) -
DECIMAL (
io.vertx.sqlclient.data.Numeric
) -
CHAR (
java.lang.String
) -
VARCHAR (
java.lang.String
) -
ENUM (
java.lang.String
) -
DATE (
java.time.LocalDate
) -
TIME (
java.time.LocalTime
) -
TIMESTAMP (
java.time.LocalDateTime
) -
BINARY (
byte[]
) -
VARBINARY (
byte[]
) -
ROWID (
io.vertx.db2client.impl.drda.DB2RowId
或java.sql.RowId
) (只针对DB2 z/OS)
以下类型目前不支持:
-
XML
-
BLOB
-
CLOB
-
DBCLOB
-
GRAPHIC / VARGRAPHIC
有关进一步介绍DB2数据类型的文档,请参考以下资源:
元组解码在存储值时使用上述类型,并且在可能的情况下还对实际值进行即时转换:
pool
.query("SELECT an_int_column FROM exampleTable")
.execute(ar -> {
RowSet<Row> rowSet = ar.result();
Row row = rowSet.iterator().next();
// INTEGER 类型字段读取出来是 java.lang.Integer
Object value = row.getValue(0);
// 转换为 java.lang.Long
Long longValue = row.getLong(0);
});
使用 Java 枚举类型
您可以将Java的 枚举类型 映射为下面的列类型:
-
Strings (VARCHAR, TEXT)
-
Numbers (SMALLINT, INTEGER, BIGINT)
client.preparedQuery("SELECT day_name FROM FINAL TABLE ( INSERT INTO days (day_name) VALUES (?), (?), (?) )")
.execute(Tuple.of(Days.FRIDAY, Days.SATURDAY, Days.SUNDAY), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Inserted " + rows.rowCount() + " new rows");
for (Row row : rows) {
System.out.println("Day: " + row.get(Days.class, "day_name"));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
client.preparedQuery("SELECT day_num FROM FINAL TABLE ( INSERT INTO days (day_num) VALUES (?), (?), (?) )")
.execute(Tuple.of(Days.FRIDAY.ordinal(), Days.SATURDAY.ordinal(), Days.SUNDAY.ordinal()), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Inserted " + rows.rowCount() + " new rows");
for (Row row : rows) {
System.out.println("Day: " + row.get(Days.class, "day_num"));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
String类型使用Java枚举 name()
方法返回的名字进行匹配。
数值类型使用Java枚举 ordinal()
方法返回的序数进行匹配,row.get() 方法获取到的是整型值对应Java枚举序数的枚举值的 name()
值。
集合类查询
您可以将查询API与Java集合类结合使用:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// 运行查询使用集合类
client.query("SELECT * FROM users")
.collecting(collector)
.execute(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// 获取用集合类创建的map
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
集合类处理不能保留 Row
的引用,因为只有一个 Row 对象用于处理整个集合。
Java Collectors
提供了许多有趣的预定义集合类,例如,
您可以直接用 Row 中的集合轻松拼接成一个字符串:
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// 运行查询使用集合类
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// 获取用集合类创建的String
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
使用 SSL/TLS
配置客户端使用SSL连接, 您可以像 Vert.x NetClient
一样配置
DB2ConnectOptions
。
DB2ConnectOptions options = new DB2ConnectOptions()
.setPort(50001)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setSsl(true)
.setTrustStoreOptions(new JksOptions()
.setPath("/path/to/keystore.p12")
.setPassword("keystoreSecret"));
DB2Connection.connect(vertx, options, res -> {
if (res.succeeded()) {
// 使用SSL进行连接
} else {
System.out.println("Could not connect " + res.cause());
}
});
更多详细信息,请参阅 Vert.x 文档.
使用代理
您可以配置客户端使用 HTTP/1.x 连接、SOCKS4a 或 SOCKS5 代理。
更多详细信息,请参阅 Vert.x 文档。
Unresolved directive in index.adoc - include::override/rxjava2.adoc[]