响应式的 Oracle 数据库客户端
响应式的 Oracle 数据库客户端是一个以可扩展性和低开销为目标而专门为 Oracle 数据库设计的 客户端。
特性
-
事件驱动
-
内置连接池
-
Java 8 Date 和 Time
暂不支持
-
RxJava API
-
预查询缓存
-
指针
-
行串流
-
存储过程
警告
|
该模块处于 tech preview 阶段 |
使用
在您构建描述文件的 dependencies 里添加如下依赖来引入响应式 Oracle 客户端:
-
Maven (在您的
pom.xml
文件中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-oracle-client</artifactId>
<version>4.2.7</version>
</dependency>
-
Gradle (在您的
build.gradle
文件中):
dependencies {
compile 'io.vertx:vertx-oracle-client:4.2.7'
}
开始使用
如下是一种最为简单的连接、查询、关闭连接的方式
OracleConnectOptions connectOptions = new OracleConnectOptions()
.setPort(1521)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the client pool
OraclePool client = OraclePool.pool(connectOptions, poolOptions);
// A simple query
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// Now close the pool
client.close();
});
连接 Oracle 数据库
大多数情况下,您将使用连接池连接到 Oracle 数据库:
OracleConnectOptions connectOptions = new OracleConnectOptions()
.setPort(1521)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
OraclePool client = OraclePool.pool(connectOptions, poolOptions);
池化的 Oracle 数据库客户端使用连接池去执行数据库操作, 所有操作都遵循从连接池里拿到连接、执行、释放连接到池里这三个步骤。
您可以传入一个连接池到正在运行的 Vert.x 实例里:
OracleConnectOptions connectOptions = new OracleConnectOptions()
.setPort(1521)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
OraclePool client = OraclePool.pool(vertx, connectOptions, poolOptions);
如果不再需要连接池,您需要将其释放:
pool.close();
当您想要在同一条连接上执行多个操作时,您需要使用一个客户端的
connection
.
您可以很方便地从连接池里获取到一条数据库连接:
OracleConnectOptions connectOptions = new OracleConnectOptions()
.setPort(1521)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
OraclePool client = OraclePool.pool(vertx, connectOptions, poolOptions);
// Get a connection from the pool
client.getConnection().compose(conn -> {
System.out.println("Got a connection from the pool");
// All operations execute on the same connection
return conn
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.compose(res -> conn
.query("SELECT * FROM users WHERE id='emad'")
.execute())
.onComplete(ar -> {
// Release the connection to the pool
conn.close();
});
}).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Done");
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
为了连接可以重用,一旦当前连接上的操作已经完成,您需要将数据库连接关闭并释放到连接池里。
配置
您有如下几种配置客户端的可选方案。
Data Object
通过指定 OracleConnectOptions
数据对象是一种简单的客户端的配置方式。
OracleConnectOptions connectOptions = new OracleConnectOptions()
.setPort(1521)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// Create the pool from the data object
OraclePool pool = OraclePool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
// Handling your connection
});
连接 URI
除了使用 OracleConnectOptions
数据对象来配置连接,我们也支持使用连接 URI :
String connectionUri = "oracle:thin:scott/tiger@myhost:1521:orcl";
// 数据库连接池设置
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// 使用连接 URI 创建连接池
OraclePool pool = OraclePool.pool(connectionUri, poolOptions);
执行查询
当您不需要事务或者只是执行一个单次查询操作,您可以直接在连接池里执行查询; 连接池会使用某一条连接执行并给您返回结果。
下边是如何执行一个简单的查询的例子:
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
预查询
执行预查询也是一样的操作。
SQL字符通过位置引用实际的参数,并使用数据库的语法 `?`
client
.preparedQuery("SELECT * FROM users WHERE id=?")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
查询相关的方法为 SELECT 类型的操作提供了异步的 RowSet
实例
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
或者 UPDATE/INSERT 类型的查询:
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
.execute(Tuple.of("Julien", "Viet"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Row对象(Row
)可以让您通过索引位置获取相应的数据
System.out.println("User " + row.getString(0) + " " + row.getString(1));
或者通过名称
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
客户端在此处没有做特殊处理,无论您的SQL文本时什么,列名都将使用数据库表中的名称标识。
您也可以直接访问得到多种类型
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
您可以使用缓存过的预处理语句去执行一次性的预查询:
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = ?")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您也可以创建 PreparedStatement
并自主地管理它的生命周期。
sqlConnection
.prepare("SELECT * FROM users WHERE id = ?", ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"), ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
批处理
您可以在预查询中执行批处理操作
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// Execute the prepared batch
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
.executeBatch(batch, res -> {
if (res.succeeded()) {
// Process rows
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
检索生成的键值
当执行 INSERT
查询时,您可以检索生成的键的值。
生成的键值会以 Row
类型实例的形式返回。
您可以通过调用 SqlResult.property(kind)
方法并设置参数为 OracleClient.GENERATED_KEYS
对的方式来获取该实例。
您可以通过列名检索键值:
String sql = "INSERT INTO EntityWithIdentity (name, position) VALUES (?, ?)";
// 通过名称获取列的数据
OraclePrepareOptions options = new OraclePrepareOptions()
.setAutoGeneratedKeysIndexes(new JsonArray().add("ID"));
client.preparedQuery(sql, options).execute(Tuple.of("john", 3), ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
Row generated = result.property(OracleClient.GENERATED_KEYS);
Long id = generated.getLong("ID");
}
});
或者您也可以通过列索引的方式来获取:
String sql = "INSERT INTO EntityWithIdentity (name, position) VALUES (?, ?)";
// 通过索引获取列的数据
OraclePrepareOptions options = new OraclePrepareOptions()
.setAutoGeneratedKeysIndexes(new JsonArray().add("1"));
client.preparedQuery(sql, options).execute(Tuple.of("john", 3), ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
Row generated = result.property(OracleClient.GENERATED_KEYS);
Long id = generated.getLong("ID");
}
});
使用连接
获取一条连接
当您要执行查询(无事务)操作时,您可以创建一条或者从连接池里拿到一条连接。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。
pool
.getConnection()
.compose(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
// Return the connection to the pool
.eventually(v -> connection.close())
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
也可以通过连接对象创建预查询:
connection
.prepare("SELECT * FROM users WHERE first_name LIKE ?")
.compose(pq ->
pq.query()
.execute(Tuple.of("Julien"))
.eventually(v -> pq.close())
).onSuccess(rows -> {
// All rows
});
简化的连接API
当您使用连接池时,您可以调用 withConnection
并以当前连接要执行的操作作为参数。
这样会从连接池里拿到一条连接,并使用当前连接执行目标操作。
这种方式需要返回一个future对象来表示操作结果。
当这个future操作完成后,当前连接会被释放会连接池同时您也可能拿到最终的执行结果。
pool.withConnection(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
使用事务
连接中使用事务操作
您可以使用SQL语法 BEGIN
/COMMIT
/ROLLBACK
来执行事务操作,同时您必须使用
SqlConnection
并自己管理当前连接。
或者您也可以使用 SqlConnection
的事务API:
pool.getConnection()
// Transaction must use a connection
.onSuccess(conn -> {
// Begin the transaction
conn.begin()
.compose(tx -> conn
// Various statements
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.compose(res2 -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute())
// Commit the transaction
.compose(res3 -> tx.commit()))
// Return the connection to the pool
.eventually(v -> conn.close())
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});
当数据库服务端返回当前事务已失败(比如常见的 current transaction is aborted, commands ignored until end of transaction block)
,事务已回滚和 completion
方法的返回值future返回了
TransactionRollbackException
异常时:
tx.completion()
.onFailure(err -> {
System.out.println("Transaction failed => rolled back");
});
简化版事务API
当您使用连接池时,您可以调用 withTransaction
方法
并传递待执行的事务操作作为参数。
这将会从连接池里拿到一条连接,开启事务并调用待执行操作,配合客户端一起执行该事务范围内 的所有操作。
待执行操作需要返回一个future来表示可能产生的结果:
-
当future成功时,客户端提交该事务
-
当future失败时,客户端回滚该事务
事务操作完成后,连接会被释放回连接池,并且可以获取到最终的操作结果。
pool.withTransaction(client -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.flatMap(res -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute()
// Map to a message result
.map("Users inserted")))
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
游标和流式操作
默认情况下预查询操作会拉去所有的行记录,您可以使用
游标
来控制您想要读取的行数:
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Create a cursor
Cursor cursor = pq.cursor(Tuple.of(18));
// Read 50 rows
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// Check for more ?
if (cursor.hasMore()) {
// Repeat the process...
} else {
// No more rows - close the cursor
cursor.close();
}
}
});
}
});
游标释放时需要同时执行关闭操作:
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
// Close the cursor
cursor.close();
}
});
stream API也可以用于游标,尤其是在Rx版的客户端,可能更为方便。
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// Fetch 50 rows at a time
RowStream<Row> stream = pq.createStream(50, Tuple.of(18));
// Use the stream
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
System.out.println("End of stream");
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
上边的stream会批量读取 50
行并同时将其转换为流,当这些行记录被传递给处理器时,
会以此类推地读取下一批的 50
行记录。
stream支持重启或暂停,已经加载到的行记录将会被保留在内存里直到被传递给处理器,此时 游标也将终止遍历。
支持的数据类型
目前,客户端支持以下Oracle数据类型:
-
CHAR/VARCHAR2(
java.lang.String
) -
NCHAR/NVARCHAR2(
java.lang.String
) -
NUMBER(
BigDecimal
) -
FLOAT(
java.lang.Double
) -
DATE(
java.time.LocalDate
) -
TIMESTAMP(
java.time.LocalDateTime
)
Tuple(元组)解码在存储值时使用上述类型。
跟踪查询
当Vert.x启用tracing功能时,SQL客户端可以跟踪查询的执行情况。
客户端会上报下列这些 client spans:
-
Query
操作名称 -
tags
-
db.user
:数据库用户名 -
db.instance
:数据库实例 -
db.statement
:SQL语句 -
db.type
:sql
默认的 tracing 策略时 PROPAGATE
,客户端
在一个活跃trace里只创建一个span。
您可以通过 setTracingPolicy
方法来调整tracing策略,
例如您可以设置为 ALWAYS
,
客户端将始终上报span:
options.setTracingPolicy(TracingPolicy.ALWAYS);
Collector 式查询
您可以将 Java 的 collector 与查询 API 结合使用:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// Get the map created by the collector
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
collector 式查询获取的结果集处理过程中不能再拿到 Row
的引用,因为
oracle 数据库客户端在处理 collector 时,只会用一个 row 用于处理整个集合。
Java 的 Collectors
类提供了很多有趣的预定义的 collector,比如您可以很容易
从 row 集合里得到一个字符串:
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});