响应式(Reactive) PostgreSQL 客户端
响应式PostgreSQL客户端是一款以可扩展性和低开销为目标而专门为PostgreSQL数据库设计的 客户端。
客户端是响应式和非阻塞的,可以仅仅使用一条线程来处理大量的数据库连接。
-
事件驱动
-
轻量级
-
内置连接池
-
预查询缓存
-
基于PostgreSQL的`NOTIFY/LISTEN`机制实现的发布/订阅
-
批处理和游标
-
支持原生流式操作
-
命令管道(pipeline)
-
RxJava 1 and RxJava 2
-
使用直接内存存储对象,避免了不必要的复制
-
支持Java 8 Date and Time
-
SSL/TLS
-
Unix domain socket
-
支持HTTP/1.x, SOCKS4a 或 SOCKS5 代理
用法
在 dependencies 里添加如下依赖来引入响应式PostgreSQL客户端:
-
Maven(在您的
pom.xml
文件里):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-pg-client</artifactId>
<version>4.0.3</version>
</dependency>
-
Gradle(在您的
build.gradle
文件里):
dependencies {
compile 'io.vertx:vertx-pg-client:4.0.3'
}
开始使用
如下是一种最为简单的连接、查询、关闭连接的方式
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the client pool
PgPool client = PgPool.pool(connectOptions, poolOptions);
// A simple query
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// Now close the pool
client.close();
});
连接PostgreSQL
大多数时间,您将使用连接池连接到 PostgreSQL:
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
PgPool client = PgPool.pool(connectOptions, poolOptions);
池化PostgreSQL客户端使用连接池去执行数据库操作, 所有操作都会遵循从池里拿到连接、执行、释放连接到池里这三个步骤。
您可以传入一个连接池到正在运行的Vert.x实例里:
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
PgPool client = PgPool.pool(vertx, connectOptions, poolOptions);
如果不再需要连接池,您需要将其释放:
pool.close();
当您想要在同一条连接上执行多个操作时,您需要使用客户端
connection
。
您可以很方便地从连接池里拿到一条连接:
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
PgPool client = PgPool.pool(vertx, connectOptions, poolOptions);
// Get a connection from the pool
client.getConnection().compose(conn -> {
System.out.println("Got a connection from the pool");
// All operations execute on the same connection
return conn
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.compose(res -> conn
.query("SELECT * FROM users WHERE id='emad'")
.execute())
.onComplete(ar -> {
// Release the connection to the pool
conn.close();
});
}).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Done");
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
为了连接可以重用,一旦当前连接上的操作已经完成,您需要关闭并释放连接到连接池里。
某些情况下您希望通过Unix domain socket类型的连接来提升性能,我们通过Vert.x本机传输支持了这种方式。
首先确保您已经在classpath下添加了 netty-transport-native
这个必须的依赖,同时开启了Unix domain socket功能(pg)选项。
PgConnectOptions connectOptions = new PgConnectOptions()
.setHost("/var/run/postgresql")
.setPort(5432)
.setDatabase("the-db");
// Pool options
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// Create the pooled client
PgPool client = PgPool.pool(connectOptions, poolOptions);
// Create the pooled client with a vertx instance
// Make sure the vertx instance has enabled native transports
PgPool client2 = PgPool.pool(vertx, connectOptions, poolOptions);
更多详情可以在这里找到 Vert.x 文档。
配置
有如下几种配置客户端的可选方案。
data object
通过指定 PgConnectOptions
数据对象是一种简单的客户端的配置方式。
PgConnectOptions connectOptions = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// Create the pool from the data object
PgPool pool = PgPool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
// Handling your connection
});
您也可以使用 setProperties`或 `addProperty`方法配置通用属性。注意 `setProperties
将覆盖默认的客户端属性。
例如,您可以通过添加 search_path
属性来配置一个默认的schema。
PgConnectOptions connectOptions = new PgConnectOptions();
// Set the default schema
Map<String, String> props = new HashMap<>();
props.put("search_path", "myschema");
connectOptions.setProperties(props);
关于可用属性的更多信息可以在这里找到 PostgreSQL Manuals。
连接uri
除了使用 PgConnectionOptions
对象,我们也提供了另一种基于URI的可选配置方案:
String connectionUri = "postgresql://dbuser:secretpassword@database.server.com:3211/mydb";
// Create the pool from the connection URI
PgPool pool = PgPool.pool(connectionUri);
// Create the connection from the connection URI
PgConnection.connect(vertx, connectionUri, res -> {
// Handling your connection
});
关于连接uri字符串格式的更多信息可以在这里找到 PostgreSQL 手册。
当前版本的客户端支持在连接uri里使用如下参数
-
host
-
hostaddr
-
port
-
user
-
password
-
dbname
-
sslmode
-
properties including(application_name, fallback_application_name, search_path)
注意
|
通过URI配置的属性将会覆盖默认的配置属性。 |
环境变量
您也可以使用环境变量来设置连接的属性值,以此来避免硬编码数据库连接信息。 您可以参考 官方文档来了解更多详情。 目前支持下列这些配置参数:
-
PGHOST
-
PGHOSTADDR
-
PGPORT
-
PGDATABASE
-
PGUSER
-
PGPASSWORD
-
PGSSLMODE
如果您没有在连接时指定连接对象或者URI字符串,此时将会使用环境变量。
$ PGUSER=user \
PGHOST=the-host \
PGPASSWORD=secret \
PGDATABASE=the-db \
PGPORT=5432 \
PGSSLMODE=DISABLE
PgPool pool = PgPool.pool();
// Create the connection from the environment variables
PgConnection.connect(vertx, res -> {
// Handling your connection
});
SASL SCRAM-SHA-256 鉴权机制。
为了使用 sasl SCRAM-SHA-256鉴权,需要在 dependencies 里添加如下依赖:
-
Maven(在您的
pom.xml
文件里):
<dependency>
<groupId>com.ongres.scram</groupId>
<artifactId>client</artifactId>
<version>2.1</version>
</dependency>
-
Gradle(在您的
build.gradle
文件里):
dependencies {
compile 'com.ongres.scram:client:2.1'
}
注意
|
SCRAM-SHA-256-PLUS(在Postgresql 11中加入)当前版本客户端暂不支持。 |
执行查询
当您不需要事务或者只是执行一个单次查询操作,您可以直接在连接池里执行查询; 连接池会使用某一条连接执行并给您返回结果。 下边是如何执行一个简单的查询的例子:
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
预查询
执行预查询也是一样的操作。
SQL字符通过位置引用实际的参数,并使用数据库的语法 `$1`, `$2`, etc…
client
.preparedQuery("SELECT * FROM users WHERE id=$1")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
查询相关的方法为 SELECT 类型的操作提供了异步的 RowSet
实例
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
或者 UPDATE/INSERT 类型的查询:
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES ($1, $2)")
.execute(Tuple.of("Julien", "Viet"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Row对象(Row
)可以让您通过索引位置获取相应的数据
System.out.println("User " + row.getString(0) + " " + row.getString(1));
或者通过名称
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
客户端在此处没有做特殊处理,无论您的SQL文本时什么,列名都将使用数据库表中的名称标识。
您也可以直接访问得到多种类型
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
您可以使用缓存过的预处理语句去执行一次性的预查询:
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = $1")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您也可以创建 PreparedStatement
并自主地管理它的生命周期。
sqlConnection
.prepare("SELECT * FROM users WHERE id = $1", ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"), ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
批处理
您可以在预查询中执行批处理操作
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// Execute the prepared batch
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES ($1, $2)")
.executeBatch(batch, res -> {
if (res.succeeded()) {
// Process rows
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
您可以使用 'RETURNING' 从查询里拿到生成的key:
client
.preparedQuery("INSERT INTO color (color_name) VALUES ($1), ($2), ($3) RETURNING color_id")
.execute(Tuple.of("white", "red", "blue"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
for (Row row : rows) {
System.out.println("generated key: " + row.getInteger("color_id"));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
带有 RETURNING
语句的批量查询创建了一个 RowSet
,
这个RowSet包含了该批量查询中的每一个元素。
client
.preparedQuery("INSERT INTO color (color_name) VALUES ($1) RETURNING color_id")
.executeBatch(Arrays.asList(Tuple.of("white"), Tuple.of("red"), Tuple.of("blue")),ar -> {
if (ar.succeeded()) {
for (RowSet<Row> rows = ar.result();rows.next() != null;rows = rows.next()) {
Integer colorId = rows.iterator().next().getInteger("color_id");
System.out.println("generated key: " + colorId);
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
使用连接
获取一条连接
当您要执行查询(无事务)操作时,您可以创建一条或者从连接池里拿到一条连接。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。
pool
.getConnection()
.compose(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
// Return the connection to the pool
.eventually(v -> connection.close())
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
也可以通过连接对象创建预查询:
connection
.prepare("SELECT * FROM users WHERE first_name LIKE $1")
.compose(pq ->
pq.query()
.execute(Tuple.of("Julien"))
.eventually(v -> pq.close())
).onSuccess(rows -> {
// All rows
});
简化的连接API
当您使用连接池时,您可以调用 withConnection
并以当前连接要执行的操作作为参数。
这样会从连接池里拿到一条连接,并使用当前连接执行目标操作。
这种方式需要返回一个future对象来表示操作结果。
当这个future操作完成后,当前连接会被释放会连接池同时您也可能拿到最终的执行结果。
pool.withConnection(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
使用事务
连接中使用事务操作
您可以使用SQL语法 BEGIN
/COMMIT
/ROLLBACK
来执行事务操作,同时您必须使用
SqlConnection
并自己管理当前连接。
或者您也可以使用 SqlConnection
的事务API:
pool.getConnection()
// Transaction must use a connection
.onSuccess(conn -> {
// Begin the transaction
conn.begin()
.compose(tx -> conn
// Various statements
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.compose(res2 -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute())
// Commit the transaction
.compose(res3 -> tx.commit()))
// Return the connection to the pool
.eventually(v -> conn.close())
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});
当数据库服务端返回当前事务已失败(比如常见的 current transaction is aborted, commands ignored until end of transaction block)
,事务已回滚和 completion
方法的返回值future返回了
TransactionRollbackException
异常时:
tx.completion()
.onFailure(err -> {
System.out.println("Transaction failed => rolled back");
});
简化版事务API
当您使用连接池时,您可以调用 withTransaction
方法
并传递待执行的事务操作作为参数。
这将会从连接池里拿到一条连接,开启事务并调用待执行操作,配合客户端一起执行该事务范围内 的所有操作。
待执行操作需要返回一个future来表示可能产生的结果:
-
当future成功时,客户端提交该事务
-
当future失败时,客户端回滚该事务
事务操作完成后,连接会被释放回连接池,并且可以获取到最终的操作结果。
pool.withTransaction(client -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.flatMap(res -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute()
// Map to a message result
.map("Users inserted")))
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
游标和流式操作
默认情况下预查询操作会拉去所有的行记录,您可以使用
游标
来控制您想要读取的行数:
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
if (ar0.succeeded()) {
PreparedStatement pq = ar0.result();
// Cursors require to run within a transaction
connection.begin(ar1 -> {
if (ar1.succeeded()) {
Transaction tx = ar1.result();
// Create a cursor
Cursor cursor = pq.cursor(Tuple.of("julien"));
// Read 50 rows
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// Check for more ?
if (cursor.hasMore()) {
// Repeat the process...
} else {
// No more rows - commit the transaction
tx.commit();
}
}
});
}
});
}
});
游标释放时需要同时执行关闭操作:
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
// Close the cursor
cursor.close();
}
});
stream API也可以用于游标,尤其是在Rx版的客户端,可能更为方便。
connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
if (ar0.succeeded()) {
PreparedStatement pq = ar0.result();
// Streams require to run within a transaction
connection.begin(ar1 -> {
if (ar1.succeeded()) {
Transaction tx = ar1.result();
// Fetch 50 rows at a time
RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));
// Use the stream
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
tx.commit();
System.out.println("End of stream");
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
}
});
上边的stream会批量读取 50
行并同时将其转换为流,当这些行记录被传递给处理器时,
会以此类推地读取下一批的 50
行记录。
stream支持重启或暂停,已经加载到的行记录将会被保留在内存里直到被传递给处理器,此时 游标也将终止遍历。
注意
|
PostreSQL会在事务结束后销毁游标,因而游标API应该在事务内使用,
否则您将可能收到 34000 PostgreSQL错误码。
|
跟踪查询
当Vert.x启用tracing功能时,SQL客户端可以跟踪查询的执行情况。
客户端会上报下列这些 client spans:
-
Query
操作名称 -
tags
-
db.user
:数据库用户名 -
db.instance
:数据库实例 -
db.statement
:SQL语句 -
db.type
:sql
默认的 tracing 策略时 PROPAGATE
,客户端
在一个活跃trace里只创建一个span。
您可以通过 setTracingPolicy
方法来调整tracing策略,
例如您可以设置为 ALWAYS
,
客户端将始终上报span:
options.setTracingPolicy(TracingPolicy.ALWAYS);
PostgreSQL 类型映射
当前版本客户端支持下列的PostgreSQL类型
-
BOOLEAN (
java.lang.Boolean
) -
INT2 (
java.lang.Short
) -
INT4 (
java.lang.Integer
) -
INT8 (
java.lang.Long
) -
FLOAT4 (
java.lang.Float
) -
FLOAT8 (
java.lang.Double
) -
CHAR (
java.lang.String
) -
VARCHAR (
java.lang.String
) -
TEXT (
java.lang.String
) -
ENUM (
java.lang.String
) -
NAME (
java.lang.String
) -
SERIAL2 (
java.lang.Short
) -
SERIAL4 (
java.lang.Integer
) -
SERIAL8 (
java.lang.Long
) -
NUMERIC (
io.vertx.sqlclient.data.Numeric
) -
UUID (
java.util.UUID
) -
DATE (
java.time.LocalDate
) -
TIME (
java.time.LocalTime
) -
TIMETZ (
java.time.OffsetTime
) -
TIMESTAMP (
java.time.LocalDateTime
) -
TIMESTAMPTZ (
java.time.OffsetDateTime
) -
INTERVAL (
io.vertx.pgclient.data.Interval
) -
BYTEA (
io.vertx.core.buffer.Buffer
) -
JSON (
io.vertx.core.json.JsonObject
,io.vertx.core.json.JsonArray
,Number
,Boolean
,String
,io.vertx.sqlclient.Tuple#JSON_NULL
) -
JSONB (
io.vertx.core.json.JsonObject
,io.vertx.core.json.JsonArray
,Number
,Boolean
,String
,io.vertx.sqlclient.Tuple#JSON_NULL
) -
POINT (
io.vertx.pgclient.data.Point
) -
LINE (
io.vertx.pgclient.data.Line
) -
LSEG (
io.vertx.pgclient.data.LineSegment
) -
BOX (
io.vertx.pgclient.data.Box
) -
PATH (
io.vertx.pgclient.data.Path
) -
POLYGON (
io.vertx.pgclient.data.Polygon
) -
CIRCLE (
io.vertx.pgclient.data.Circle
) -
TSVECTOR (
java.lang.String
) -
TSQUERY (
java.lang.String
)
Tuple(元组)在解码时使用上述类型映射关系存储解码出的值,并且在合法的条件下可以动态转换为实际类型的值:
pool
.query("SELECT 1::BIGINT \"VAL\"")
.execute(ar -> {
RowSet<Row> rowSet = ar.result();
Row row = rowSet.iterator().next();
// Stored as java.lang.Long
Object value = row.getValue(0);
// Convert to java.lang.Integer
Integer intValue = row.getInteger(0);
});
Tuple(元组)编码时使用上述关系作为类型间的映射关系,除非类型为数字,在这种情况下将会使用 java.lang.Number
:
pool
.query("SELECT 1::BIGINT \"VAL\"")
.execute(ar -> {
RowSet<Row> rowSet = ar.result();
Row row = rowSet.iterator().next();
// Stored as java.lang.Long
Object value = row.getValue(0);
// Convert to java.lang.Integer
Integer intValue = row.getInteger(0);
});
上述类型的数组形式也是支持的。
JSON
PostgreSQL的 JSON
和 JSONB
用下列的java类型表示:
-
String
-
Number
-
Boolean
-
io.vertx.core.json.JsonObject
-
io.vertx.core.json.JsonArray
-
io.vertx.sqlclient.Tuple#JSON_NULL
for representing the JSON null literal
Tuple tuple = Tuple.of(
Tuple.JSON_NULL,
new JsonObject().put("foo", "bar"),
3);
// Retrieving json
Object value = tuple.getValue(0); // Expect JSON_NULL
//
value = tuple.get(JsonObject.class, 1); // Expect JSON object
//
value = tuple.get(Integer.class, 2); // Expect 3
value = tuple.getInteger(2); // Expect 3
数字类型(Numeric)
java的 Numeric
用来表示PostgreSQL的 NUMERIC
类型。
Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
// Handle NaN
} else {
BigDecimal value = numeric.bigDecimalValue();
}
日期/时间类型(Date/Time)的最值
PostgreSQL定义了几个特殊的值用来表示这些最值。
相应类型的最大/最小值由这些常量 特殊值 表示。
-
OffsetDateTime.MAX
/OffsetDateTime.MIN` -
LocalDateTime.MAX
/LocalDateTime.MIN` -
LocalDate.MAX
/LocalDate.MIN`
client
.query("SELECT 'infinity'::DATE \"LocalDate\"")
.execute(ar -> {
if (ar.succeeded()) {
Row row = ar.result().iterator().next();
System.out.println(row.getLocalDate("LocalDate").equals(LocalDate.MAX));
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
自定义类型
发送和从Postgres接收的自定义类型都由字符串来表示。
您可以读取PostgreSQL并以字符串的形式得到自定义的类型值
client
.preparedQuery("SELECT address, (address).city FROM address_book WHERE id=$1")
.execute(Tuple.of(3), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("Full Address " + row.getString(0) + ", City " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您也可以向PostgreSQL写入字符串
client
.preparedQuery("INSERT INTO address_book (id, address) VALUES ($1, $2)")
.execute(Tuple.of(3, "('Anytown', 'Second Ave', false)"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
文本检索
文本检索使用的是java的 String
client
.preparedQuery("SELECT to_tsvector( $1 ) @@ to_tsquery( $2 )")
.execute(Tuple.of("fat cats ate fat rats", "fat & rat"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("Match : " + row.getBoolean(0));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
tsvector
和 tsquery
可以使用java的 String
类型来从数据库中获取
client
.preparedQuery("SELECT to_tsvector( $1 ), to_tsquery( $2 )")
.execute(Tuple.of("fat cats ate fat rats", "fat & rat"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("Vector : " + row.getString(0) + ", query : "+row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
枚举类型
PostgreSQL的 枚举类型 被映射为java的字符串。
client
.preparedQuery("INSERT INTO colors VALUES ($2)")
.execute(Tuple.of("red"), res -> {
// ...
});
使用Java枚举类型
您可以将Java的 枚举类型 映射为下列这些列类型:
-
Strings (VARCHAR, TEXT)
-
PosgreSQL enumerated types
-
Numbers (INT2, INT4, INT8)
client
.preparedQuery("INSERT INTO colors VALUES ($1)")
.execute(Tuple.of(Color.red))
.flatMap(res ->
client
.preparedQuery("SELECT color FROM colors")
.execute()
).onComplete(res -> {
if (res.succeeded()) {
RowSet<Row> rows = res.result();
for (Row row : rows) {
System.out.println(row.get(Color.class, "color"));
}
}
});
String and PostgreSQL enumerated types 对应Java枚举类的 name()
方法的返回值。
Numbers类型对应Java枚举类的 ordinal()
方法的返回值。
Collector式查询
您可以将Java collector与查询API结合使用:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// Run the query with the collector
client.query("SELECT * FROM users")
.collecting(collector)
.execute(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// Get the map created by the collector
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
collector式查询的结果集处理过程中不能再拿到 Row
的引用,因为
pg客户端在处理collector时,只会用一个row用于处理整个集合。
Java的 Collectors
类提供了很多很有趣的预定义的collector,比如您可以很容易
从row集合里得到一个字符串:
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// Get the string created by the collector
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
发布/订阅
PostgreSQL支持发布/订阅式的通信方式。
您可以设置一个 notificationHandler
用于
接收PostgreSQL的通知消息:
connection.notificationHandler(notification -> {
System.out.println("Received " + notification.getPayload() + " on channel " + notification.getChannel());
});
connection
.query("LISTEN some-channel")
.execute(ar -> {
System.out.println("Subscribed to channel");
});
PgSubscriber (PgSubscriber
) 是一种用作
处理单条连接上的订阅的通道(channel)管理器:
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
);
// You can set the channel before connect
subscriber.channel("channel1").handler(payload -> {
System.out.println("Received " + payload);
});
subscriber.connect(ar -> {
if (ar.succeeded()) {
// Or you can set the channel after connect
subscriber.channel("channel2").handler(payload -> {
System.out.println("Received " + payload);
});
}
});
channel(通道)方法的参数即通道名称(接收端)需要和PostgreSQL发送通知时的通道名称保持一致。
注意这里和SQL中的通道名称的形式不同,在 PgSubscriber
内部会把待提交的通道名称预处理为带引号的形式:
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
);
subscriber.connect(ar -> {
if (ar.succeeded()) {
// Complex channel name - name in PostgreSQL requires a quoted ID
subscriber.channel("Complex.Channel.Name").handler(payload -> {
System.out.println("Received " + payload);
});
subscriber.channel("Complex.Channel.Name").subscribeHandler(subscribed -> {
subscriber.actualConnection()
.query("NOTIFY \"Complex.Channel.Name\", 'msg'")
.execute(notified -> {
System.out.println("Notified \"Complex.Channel.Name\"");
});
});
// PostgreSQL simple ID's are forced lower-case
subscriber.channel("simple_channel").handler(payload -> {
System.out.println("Received " + payload);
});
subscriber.channel("simple_channel").subscribeHandler(subscribed -> {
// The following simple channel identifier is forced to lower case
subscriber.actualConnection()
.query("NOTIFY Simple_CHANNEL, 'msg'")
.execute(notified -> {
System.out.println("Notified simple_channel");
});
});
// The following channel name is longer than the current
// (NAMEDATALEN = 64) - 1 == 63 character limit and will be truncated
subscriber.channel("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbb")
.handler(payload -> {
System.out.println("Received " + payload);
});
}
});
您可以自定义一个方法来实现重连,该方法的参数为 retries
(重试次数),
返回值为 amountOfTime
(重试间隔):
-
当
amountOfTime < 0
: 不重试,并关闭订阅 -
当
amountOfTime = 0
: 立即重试 -
当
amountOfTime > 0
: 在amountOfTime
毫秒之后发起重试
PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
);
// Reconnect at most 10 times after 100 ms each
subscriber.reconnectPolicy(retries -> {
if (retries < 10) {
return 100L;
} else {
return -1L;
}
});
默认的策略是不重连。
取消请求
PostgreSQL 支持取消正在运行的请求. 您可以使用 cancelRequest
正在运行的请求. 取消请求的操作在执行时将和PostgreSQL服务端新建一条连接、执行取消请求、关闭这条连接。
connection
.query("SELECT pg_sleep(20)")
.execute(ar -> {
if (ar.succeeded()) {
// imagine this is a long query and is still running
System.out.println("Query success");
} else {
// the server will abort the current query after cancelling request
System.out.println("Failed to query due to " + ar.cause().getMessage());
}
});
connection.cancelRequest(ar -> {
if (ar.succeeded()) {
System.out.println("Cancelling request has been sent");
} else {
System.out.println("Failed to send cancelling request");
}
});
取消请求可能不会起作用——如果请求到达时,服务端已经处理完了当前查询请求,此时取消操作不会起作用。反之,取消请求得以执行,目标命令执行提前终止并返回一条错误消息。
更多详细信息可以在这里找到 official documentation。
使用 SSL/TLS
为客户端连接添加SSL的操作,您可以参考Vert.x的
NetClient
的 PgConnectOptions
配置操作。
当前版本客户端支持全部的PostgreSql SSL模式配置,您可以通过 sslmode
配置它们。客户端默认不启用SSL模式。
ssl
参数仅作为一种设置 sslmode
的快捷方式。 setSsl(true)
等价于 setSslMode(VERIFY_CA)
,setSsl(false)
等价于 setSslMode(DISABLE)
。
PgConnectOptions options = new PgConnectOptions()
.setPort(5432)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setSslMode(SslMode.VERIFY_CA)
.setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem"));
PgConnection.connect(vertx, options, res -> {
if (res.succeeded()) {
// Connected with SSL
} else {
System.out.println("Could not connect " + res.cause());
}
});
更多详细信息可以在这里找到 Vert.x documentation。
使用代理
您可以配置客户端使用HTTP/1.x 连接,SOCKS4a 或 SOCKS5 代理。
更多信息可以在这里找到 Vert.x documentation。
RxJava 2 API
Rx风格的API支持 RxJava 1 and RxJava 2,下边的例子使用的是 RxJava 2。
Single<RowSet<Row>> single = pool.query("SELECT * FROM users WHERE id='julien'").rxExecute();
// Execute the query
single.subscribe(result -> {
System.out.println("Got " + result.size() + " rows ");
}, err -> {
System.out.println("Failure: " + err.getMessage());
});
连接
简化版的连接API可以让您很容易地使用connection对象,withConnection
方法会从连接池里获取到一条连接:
Maybe<RowSet<Row>> maybe = pool.rxWithConnection((Function<SqlConnection, Maybe<RowSet<Row>>>) conn ->
conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.rxExecute()
.flatMap(result -> conn
.query("SELECT * FROM Users")
.rxExecute())
.toMaybe());
maybe.subscribe(rows -> {
// Success
}, err -> {
// Failed
});
事务
简化版的事务API可以让您很容易地编写异步事务处理流,withTransaction
方法会为您启动和提交事务:
Completable completable = pool.rxWithTransaction((Function<SqlConnection, Maybe<RowSet<Row>>>) conn ->
conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.rxExecute()
.flatMap(result -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.rxExecute())
.toMaybe())
.ignoreElement();
completable.subscribe(() -> {
// Transaction succeeded
}, err -> {
// Transaction failed
});
流式操作
RxJava 2 支持 Observable
和 Flowable
类型,这些类型可以从 PreparedQuery
产生的 RowStream
里获取到:
Observable<Row> observable = pool.rxGetConnection().flatMapObservable(conn -> conn
.rxBegin()
.flatMapObservable(tx ->
conn
.rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
.flatMapObservable(preparedQuery -> {
// Fetch 50 rows at a time
RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
return stream.toObservable();
})
.doAfterTerminate(tx::commit)));
// Then subscribe
observable.subscribe(row -> {
System.out.println("User: " + row.getString("last_name"));
}, err -> {
System.out.println("Error: " + err.getMessage());
}, () -> {
System.out.println("End of stream");
});
下边是使用 Flowable
的例子:
Flowable<Row> flowable = pool.rxGetConnection().flatMapPublisher(conn -> conn
.rxBegin()
.flatMapPublisher(tx ->
conn
.rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
.flatMapPublisher(preparedQuery -> {
// Fetch 50 rows at a time
RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
return stream.toFlowable();
})
.doAfterTerminate(tx::commit)));
// Then subscribe
flowable.subscribe(new Subscriber<Row>() {
private Subscription sub;
@Override
public void onSubscribe(Subscription subscription) {
sub = subscription;
subscription.request(1);
}
@Override
public void onNext(Row row) {
sub.request(1);
System.out.println("User: " + row.getString("last_name"));
}
@Override
public void onError(Throwable err) {
System.out.println("Error: " + err.getMessage());
}
@Override
public void onComplete() {
System.out.println("End of stream");
}
});