响应式 MySQL 客户端
响应式 MySQL 客户端具有简单易懂的 API,专注于可扩展性和低开销。
特性
-
事件驱动
-
轻量级
-
内置连接池
-
预处理查询缓存
-
支持游标
-
流式行处理
-
RxJava 1 和 RxJava 2
-
支持对象存储在直接内存,避免不必要的拷贝
-
完整的数据类型支持
-
支持存储过程
-
支持 TLS/SSL
-
MySQL 实用程序命令支持
-
支持 MySQL 和 MariaDB
-
丰富的字符排序(collation)和字符集支持
-
Unix 域套接字
使用方法
使用响应式 MySQL 客户端,需要将以下依赖项添加到项目构建工具的 依赖 配置中:
-
Maven (在您的
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mysql-client</artifactId>
<version>4.0.3</version>
</dependency>
-
Gradle (在您的
build.gradle
文件中):
dependencies {
compile 'io.vertx:vertx-mysql-client:4.0.3'
}
开始
以下是最简单的连接,查询和断开连接方法
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建客户端池
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
// 一个简单的查询
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// 现在关闭客户端池
client.close();
});
连接到 MySQL
大多数时间,您将使用连接池连接到 MySQL:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
带连接池的客户端使用连接池,任何操作都将借用连接池中的连接来执行该操作, 并将连接释放回连接池中。
如果您使用 Vert.x 运行,您可以将 Vertx 实例传递给它:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
当您不再需要连接池时,您需要释放它:
pool.close();
当您需要在同一连接上执行多个操作时,您需要使用
connection
客户端 。
您可以轻松地从连接池中获取一个:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
// 从连接池获得连接
client.getConnection().compose(conn -> {
System.out.println("Got a connection from the pool");
// 所有操作都在同一连接上执行
return conn
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.compose(res -> conn
.query("SELECT * FROM users WHERE id='emad'")
.execute())
.onComplete(ar -> {
// 释放连接池的连接
conn.close();
});
}).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Done");
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
连接完成后,您必须关闭它以释放到连接池中,以便可以重复使用。
Unix 域套接字
有时为了简单,安全或性能原因,需要通过 Unix 域套接字 进行连接。
由于 JVM 不支持域套接字,因此首先必须向项目添加本地传输(native transport)扩展。
-
Maven (在您的
pom.xml
):
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
-
Gradle (在您的
build.gradle
文件中):
dependencies {
compile 'io.netty:netty-transport-native-epoll:${netty.version}:linux-x86_64'
}
注意
|
ARM64 的原生 epoll 支持也可以与分类器(classifier) linux-aarch64 一起添加。
|
注意
|
如果您的团队中有 Mac 用户,在 osx-x86_64 上添加 netty-transport-native-kqueue 分类器(classifier)。
|
然后通过 MySQLConnectOptions#setHost
设置域套接字的路径:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setHost("/var/run/mysqld/mysqld.sock")
.setDatabase("the-db");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
// 使用vertx实例创建带连接池的客户端
// 确保vertx实例已启用native transports
// vertxOptions.setPreferNativeTransport(true);
MySQLPool client2 = MySQLPool.pool(vertx, connectOptions, poolOptions);
有关 native transport 的详细信息,请参阅 Vert.x 文档 。
配置
有几个选项供您配置客户端。
数据对象
配置客户端的简单方法就是指定 MySQLConnectOptions
数据对象。
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// 从数据对象创建连接池
MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
// 处理您的连接
});
字符序(collations)和字符集(character sets)
响应式 MySQL 客户端支持配置字符序或字符集,并将它们映射到一个相关的 java.nio.charset.Charset
。
您可以为数据库连接指定字符集,例如
MySQLConnectOptions connectOptions = new MySQLConnectOptions();
// 将连接的字符集设置为utf8而不是默认的字符集utf8mb4
connectOptions.setCharset("utf8");
响应式 MySQL 客户端的默认字符集是 utf8mb4
。字符串值,如密码和错误消息等,总是使用 UTF-8
字符集解码。
characterEncoding
选项用于设置字符串(例如查询字符串和参数值)使用的 Java 字符集,默认使用 UTF-8
字符集;如果设置为 null
,则客户端将使用 Java 的默认字符集。
您还可以为连接指定字符序,例如
MySQLConnectOptions connectOptions = new MySQLConnectOptions();
// 将连接的字符序设置为 utf8_general_ci 来代替默认字符序 utf8mb4_general_ci
// 设置字符序将覆盖charset选项
connectOptions.setCharset("gbk");
connectOptions.setCollation("utf8_general_ci");
请注意,在数据对象上设置字符序将覆盖 charset 和 characterEncoding 选项。
您可以执行 SQL SHOW COLLATION;
或 SHOW CHARACTER SET;
获取服务器支持的字符序和字符集。
有关 MySQL 字符集和字符序的更多信息,请参阅 MySQL 参考手册。
连接属性
还可以使用 setProperties
或 addProperty
方法配置连接属性。注意 setProperties
将覆盖客户端的默认属性。
MySQLConnectOptions connectOptions = new MySQLConnectOptions();
// 添加连接属性
connectOptions.addProperty("_java_version", "1.8.0_212");
// 覆盖属性
Map<String, String> attributes = new HashMap<>();
attributes.put("_client_name", "myapp");
attributes.put("_client_version", "1.0.0");
connectOptions.setProperties(attributes);
有关连接属性的更多信息,请参阅 MySQL 参考手册。
配置 useAffectedRows
您可以 useAffectedRows
选项以决定是否在连接到服务器时设置标志 CLIENT_FOUND_ROWS
。如果指定了 CLIENT_FOUND_ROWS
标志,则受影响的行计数(返回的)是查找到的行数,而不是受影响的行数。
更多有关信息,请参阅 MySQL 参考手册
连接 URI
除了使用 MySQLConnectOptions
数据对象进行配置外,我们还为您提供了另外一种使用连接URI进行配置的方法:
String connectionUri = "mysql://dbuser:secretpassword@database.server.com:3211/mydb";
// 从连接URI创建连接池
MySQLPool pool = MySQLPool.pool(connectionUri);
// 从连接URI创建连接
MySQLConnection.connect(vertx, connectionUri, res -> {
// 处理您的连接
});
有关连接字符串格式的有关更多信息,请参阅 MySQL 参考手册。
目前,客户端支持以下的连接 uri 参数关键字(不区分大小写):
-
host
-
port
-
user
-
password
-
schema
-
socket
-
useAffectedRows
运行查询
当您不需要事务或运行单个查询时,您可以直接在连接池上运行查询。连接池将使用其中一个连接来运行查询并将结果返回给您。
这是运行简单查询的方法:
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
预处理查询
您可以对预处理查询执行相同的操作。
SQL字符串可以使用数据库语法 `?` 按位置引用参数
client
.preparedQuery("SELECT * FROM users WHERE id=?")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
查询方法提供异步 RowSet
实例,它适用于 SELECT 查询。
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
或 UPDATE/INSERT 查询:
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
.execute(Tuple.of("Julien", "Viet"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Row
使您可以按索引访问数据
System.out.println("User " + row.getString(0) + " " + row.getString(1));
或按名字
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
客户端在这里不会使用任何魔术,并且无论您的SQL文本如何,列名都将用表中的名称标识。
您可以访问多样的类型
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
您可以使用缓存的预处理语句执行一次性的预处理查询:
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = ?")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您可以创建一个 PreparedStatement
并自己管理生命周期。
sqlConnection
.prepare("SELECT * FROM users WHERE id = ?", ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"), ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
批处理
您可以执行预处理的批处理
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// 执行预处理的批处理
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
.executeBatch(batch, res -> {
if (res.succeeded()) {
// 处理行
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
MySQL LAST_INSERT_ID
往表中插入一条记录后,可以获得自增值。
client
.query("INSERT INTO test(val) VALUES ('v1')")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
long lastInsertId = rows.property(MySQLClient.LAST_INSERTED_ID);
System.out.println("Last inserted id is: " + lastInsertId);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
更多有关信息,请参阅 如何获取最近一条插入记录的唯一ID。
使用连接
获取连接
当需要执行顺序查询(无事务)时,可以创建一个新连接或从连接池中借用一个。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。
pool
.getConnection()
.compose(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// 对行执行一些操作
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
// 将连接返回到连接池中
.eventually(v -> connection.close())
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
可以创建预处理查询语句:
connection
.prepare("SELECT * FROM users WHERE first_name LIKE ?")
.compose(pq ->
pq.query()
.execute(Tuple.of("Julien"))
.eventually(v -> pq.close())
).onSuccess(rows -> {
// 所有的行
});
简单连接 API
当您创建了一个连接池, 您可以调用 withConnection
并传入一个使用连接进行处理的函数。
它从连接池中借用一个连接,并使用该连接调用函数。
该函数必须返回一个任意结果的 Future。
Future 完成后, 连接将归还至连接池,并提供全部的结果。
pool.withConnection(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
使用事务
带事务的连接
您可以使用SQL BEGIN
/COMMIT
/ROLLBACK
执行事务,如果您必须这么做,就必须使用 SqlConnection
自己管理事务。
或者您使用 SqlConnection
的事务API:
pool.getConnection()
// 事务必须使用一个连接
.onSuccess(conn -> {
// 开始事务
conn.begin()
.compose(tx -> conn
// 各种语句
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.compose(res2 -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute())
// 提交事务
.compose(res3 -> tx.commit()))
// 将连接返回到连接池
.eventually(v -> conn.close())
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});
当数据库服务器报告当前事务失败时(例如,臭名昭著的 current transaction is aborted, commands ignored until end of transaction block),
事务被回滚,此时 completion
Future 会失败,
并返回 TransactionRollbackException
异常:
tx.completion()
.onFailure(err -> {
System.out.println("Transaction failed => rolled back");
});
简单事务 API
当您创建了一个连接池, 您可以调用 withConnection
并传入一个使用连接进行处理的函数。
它从连接池中借用一个连接,开始事务,并且,在此事务范围内所有执行操作的客户端调用该函数。
该函数必须返回一个任意结果的Future。
-
当Future成功,客户端提交这个事务
-
当Future失败,客户端回滚这个事务
事务完成后, 连接将返回到连接池中,并提供全部的结果。
pool.withTransaction(client -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.flatMap(res -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute()
// 映射一个消息结果
.map("Users inserted")))
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
游标和流
默认情况下,执行预处理查询将获取所有行,您可以使用 Cursor
控制想读取的行数:
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// 创建游标
Cursor cursor = pq.cursor(Tuple.of(18));
// 读取50行
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// 检查更多 ?
if (cursor.hasMore()) {
// 重复这个过程...
} else {
// 没有更多行-关闭游标
cursor.close();
}
}
});
}
});
游标提前释放时应将其关闭:
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
// 关闭游标
cursor.close();
}
});
游标还可以使用流式API,这可以更加方便,尤其是在Rx化的版本中。
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// 一次获取50行
RowStream<Row> stream = pq.createStream(50, Tuple.of(18));
// 使用流
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
System.out.println("End of stream");
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
当这些行已传递给处理程序时,该流将批量读取 50
行并将其流化。
然后读取新一批的 50
行数据,依此类推。
流可以恢复或暂停,已加载的行将保留在内存中,直到被送达,游标将停止迭代。
追踪查询
当Vert.x启用了跟踪时,SQL客户端可以跟踪查询执行。
客户端报告以下 client 跨度(spans):
-
Query
operation name -
tags
-
db.user
: the database username -
db.instance
: the database instance -
db.statement
: the SQL query -
db.type
: sql
默认的跟踪策略是 PROPAGATE
,客户端仅当涉及活动跟踪时才创建跨度(span)。
您可以使用 setTracingPolicy
更改客户策略,例如 您可以设置 ALWAYS
总是报告跨度(span):
options.setTracingPolicy(TracingPolicy.ALWAYS);
MySQL 类型映射
当前客户端支持以下 MySQL 类型
-
BOOL,BOOLEAN (
java.lang.Byte
) -
TINYINT (
java.lang.Byte
) -
TINYINT UNSIGNED(
java.lang.Short
) -
SMALLINT (
java.lang.Short
) -
SMALLINT UNSIGNED(
java.lang.Integer
) -
MEDIUMINT (
java.lang.Integer
) -
MEDIUMINT UNSIGNED(
java.lang.Integer
) -
INT,INTEGER (
java.lang.Integer
) -
INTEGER UNSIGNED(
java.lang.Long
) -
BIGINT (
java.lang.Long
) -
BIGINT UNSIGNED(
io.vertx.sqlclient.data.Numeric
) -
FLOAT (
java.lang.Float
) -
FLOAT UNSIGNED(
java.lang.Float
) -
DOUBLE (
java.lang.Double
) -
DOUBLE UNSIGNED(
java.lang.Double
) -
BIT (
java.lang.Long
) -
NUMERIC (
io.vertx.sqlclient.data.Numeric
) -
NUMERIC UNSIGNED(
io.vertx.sqlclient.data.Numeric
) -
DATE (
java.time.LocalDate
) -
DATETIME (
java.time.LocalDateTime
) -
TIME (
java.time.Duration
) -
TIMESTAMP (
java.time.LocalDateTime
) -
YEAR (
java.lang.Short
) -
CHAR (
java.lang.String
) -
VARCHAR (
java.lang.String
) -
BINARY (
io.vertx.core.buffer.Buffer
) -
VARBINARY (
io.vertx.core.buffer.Buffer
) -
TINYBLOB (
io.vertx.core.buffer.Buffer
) -
TINYTEXT (
java.lang.String
) -
BLOB (
io.vertx.core.buffer.Buffer
) -
TEXT (
java.lang.String
) -
MEDIUMBLOB (
io.vertx.core.buffer.Buffer
) -
MEDIUMTEXT (
java.lang.String
) -
LONGBLOB (
io.vertx.core.buffer.Buffer
) -
LONGTEXT (
java.lang.String
) -
ENUM (
java.lang.String
) -
SET (
java.lang.String
) -
JSON (
io.vertx.core.json.JsonObject
,io.vertx.core.json.JsonArray
,Number
,Boolean
,String
,io.vertx.sqlclient.Tuple#JSON_NULL
) -
GEOMETRY(
io.vertx.mysqlclient.data.spatial.*
)
元组解码在存储值时使用上述类型
请注意:在Java中,没有无符号数字值的具体表示形式,因此客户端会将无符号值转换为相关的Java类型。
隐式类型转换
当执行预处理语句时,响应式 MySQL 客户端支持隐式类型转换。
假设您的表中有一个 TIME
列,下面的两个示例都是有效的。
client
.preparedQuery("SELECT * FROM students WHERE updated_time = ?")
.execute(Tuple.of(LocalTime.of(19, 10, 25)), ar -> {
// 处理结果
});
// 这个也适用于隐式类型转换
client
.preparedQuery("SELECT * FROM students WHERE updated_time = ?")
.execute(Tuple.of("19:10:25"), ar -> {
// 处理结果
});
MySQL 数据类型编码是根据参数值推断的。下面是具体的类型映射:
参数值 | MySQL 类型编码 |
---|---|
null |
MYSQL_TYPE_NULL |
java.lang.Byte |
MYSQL_TYPE_TINY |
java.lang.Boolean |
MYSQL_TYPE_TINY |
java.lang.Short |
MYSQL_TYPE_SHORT |
java.lang.Integer |
MYSQL_TYPE_LONG |
java.lang.Long |
MYSQL_TYPE_LONGLONG |
java.lang.Double |
MYSQL_TYPE_DOUBLE |
java.lang.Float |
MYSQL_TYPE_FLOAT |
java.time.LocalDate |
MYSQL_TYPE_DATE |
java.time.Duration |
MYSQL_TYPE_TIME |
java.time.LocalTime |
MYSQL_TYPE_TIME |
io.vertx.core.buffer.Buffer |
MYSQL_TYPE_BLOB |
java.time.LocalDateTime |
MYSQL_TYPE_DATETIME |
io.vertx.mysqlclient.data.spatial.* |
MYSQL_TYPE_BLOB |
default |
MYSQL_TYPE_STRING |
处理布尔值
在 MySQL 中 BOOLEAN
和 BOOL
数据类型是 TINYINT(1)
的同义词。零值视为 false,非零值视为 true。
BOOLEAN
数据类型值以 java.lang.Byte
类型存储在 Row
或 Tuple
中,调用 Row#getValue
可以获取到 java.lang.Byte
类型的值,
也可以调用 Row#getBoolean
获取 java.lang.Boolean
类型的值。
client
.query("SELECT graduated FROM students WHERE id = 0")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rowSet = ar.result();
for (Row row : rowSet) {
int pos = row.getColumnIndex("graduated");
Byte value = row.get(Byte.class, pos);
Boolean graduated = row.getBoolean("graduated");
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
当您想要使用一个 BOOLEAN
参数值执行预处理语句时,只要简单地在参数列表中添加 java.lang.Boolean
值即可。
client
.preparedQuery("UPDATE students SET graduated = ? WHERE id = 0")
.execute(Tuple.of(true), ar -> {
if (ar.succeeded()) {
System.out.println("Updated with the boolean value");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
处理 JSON
MySQL JSON
数据类型由以下Java类型表示:
-
String
-
Number
-
Boolean
-
io.vertx.core.json.JsonObject
-
io.vertx.core.json.JsonArray
-
io.vertx.sqlclient.Tuple#JSON_NULL
表示 JSON null 字面量
Tuple tuple = Tuple.of(
Tuple.JSON_NULL,
new JsonObject().put("foo", "bar"),
3);
// 获取 json
Object value = tuple.getValue(0); // 期望得到 JSON_NULL
//
value = tuple.get(JsonObject.class, 1); // 期望得到 JSON object
//
value = tuple.get(Integer.class, 2); // 期望得到 3
value = tuple.getInteger(2); // 期望得到 3
处理 BIT
BIT
是 java.lang.Long
类型的映射, 但是Java没有无符号数值的概念, 因此,如果您要插入或更新一条记录为 BIT(64)
的最大值 , 可以将参数设置为 -1L
。
处理 TIME
MySQL TIME
数据类型可用于表示一天中的时间或范围为 -838:59:59
到 838:59:59
的时间间隔。在响应式MySQL客户端, TIME
数据类型自然的被映射为 java.time.Duration
,您也可以调用 Row#getLocalTime
获取到 java.time.LocalTime
类型的值。
处理 NUMERIC
Numeric
Java类型用于表示MySQL的 NUMERIC
类型。
Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
// 处理 NaN
} else {
BigDecimal value = numeric.bigDecimalValue();
}
处理 ENUM
MySQL支持ENUM数据类型,客户端将这些类型检索为String数据类型。
您可以像这样将Java枚举编码为String:
client
.preparedQuery("INSERT INTO colors VALUES (?)")
.execute(Tuple.of(Color.red), res -> {
// ...
});
您可以将 ENUM 类型的列读取为 Java 的枚举,如以下代码:
client
.preparedQuery("SELECT color FROM colors")
.execute()
.onComplete(res -> {
if (res.succeeded()) {
RowSet<Row> rows = res.result();
for (Row row : rows) {
System.out.println(row.get(Color.class, "color"));
}
}
});
处理 GEOMETRY
MYSQL 还支持 GEOMETRY
数据类型,下面是一些示例展示您可以使用 Well-Known Text(WKT)格式或 Well-Known Binary(WKB)格式处理几何数据,数据被解码为MySQL TEXT 或 BLOB 数据类型。有很多很棒的第三方库可以处理这种格式的数据。
您可以以 WKT 格式获取空间数据:
client
.query("SELECT ST_AsText(g) FROM geom;")
.execute(ar -> {
if (ar.succeeded()) {
// 以WKT格式获取空间数据
RowSet<Row> result = ar.result();
for (Row row : result) {
String wktString = row.getString(0);
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
或者,您可以以WKB格式获取空间数据:
client
.query("SELECT ST_AsBinary(g) FROM geom;")
.execute(ar -> {
if (ar.succeeded()) {
// 以WKB格式获取空间数据
RowSet<Row> result = ar.result();
for (Row row : result) {
Buffer wkbValue = row.getBuffer(0);
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
在响应式 MySQL 客户端中,我们还提供了一种处理几何数据类型的简单方法。
您可以将几何数据作为Vert.x数据对象检索:
client
.query("SELECT g FROM geom;")
.execute(ar -> {
if (ar.succeeded()) {
// 以Vert.x数据对象获取空间数据
RowSet<Row> result = ar.result();
for (Row row : result) {
Point point = row.get(Point.class, 0);
System.out.println("Point x: " + point.getX());
System.out.println("Point y: " + point.getY());
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您也可以将WKB描述用作预处理语句参数。
Point point = new Point(0, 1.5, 1.5);
// 以 WKB 描述发送
client
.preparedQuery("INSERT INTO geom VALUES (ST_GeomFromWKB(?))")
.execute(Tuple.of(point), ar -> {
if (ar.succeeded()) {
System.out.println("Success");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
集合类查询
您可以将查询API与Java集合类结合使用:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// 运行查询使用集合类
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// 获取用集合类创建的map
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
集合类处理不能保留 Row
的引用,因为只有一个 Row 对象用于处理整个集合。
Java Collectors
提供了许多有趣的预定义集合类,例如,
您可以直接用 Row 中的集合轻松拼接成一个字符串:
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// 运行查询使用集合类
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// 获取用集合类创建的String
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
MySQL 存储过程
您可以在查询中运行存储过程。结果将按照 MySQL 协议 从服务器获取,无需任何魔法。
client.query("CREATE PROCEDURE multi() BEGIN\n" +
" SELECT 1;\n" +
" SELECT 1;\n" +
" INSERT INTO ins VALUES (1);\n" +
" INSERT INTO ins VALUES (2);\n" +
"END;").execute(ar1 -> {
if (ar1.succeeded()) {
// 创建存储过程成功
client
.query("CALL multi();")
.execute(ar2 -> {
if (ar2.succeeded()) {
// 处理结果
RowSet<Row> result1 = ar2.result();
Row row1 = result1.iterator().next();
System.out.println("First result: " + row1.getInteger(0));
RowSet<Row> result2 = result1.next();
Row row2 = result2.iterator().next();
System.out.println("Second result: " + row2.getInteger(0));
RowSet<Row> result3 = result2.next();
System.out.println("Affected rows: " + result3.rowCount());
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar1.cause().getMessage());
}
});
Note: 目前尚不支持绑定OUT参数的预处理语句。
MySQL 导入本地文件
该客户端支持处理 LOCAL INFILE 请求, 如果要将数据从本地文件加载到服务器,则可以使用语句
LOAD DATA LOCAL INFILE '<filename>' INTO TABLE <table>;
。 更多有关信息,请参阅 MySQL 参考手册.
认证
默认身份验证插件
该客户端支持指定在连接开始时使用缺省的身份验证插件。 当前支持以下插件:
-
mysql_native_password
-
caching_sha2_password
-
mysql_clear_password
MySQLConnectOptions options = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setAuthenticationPlugin(MySQLAuthenticationPlugin.MYSQL_NATIVE_PASSWORD); // 设定默认身份验证插件
MySQL 8 中引入的新身份验证方法
MySQL 8.0 引入了一种名为 caching_sha2_password
的身份验证方法,它是默认的身份验证方法。
为了使用此新的身份验证方法连接到服务器,您需要使用安全连接(例如 启用 TLS/SSL)或使用 RSA 密钥对交换加密密码,以避免密码泄漏。RSA 密钥对在通信过程中会自动交换,但服务器 RSA 公钥可能会在这个过程中被黑客攻击,因为它通过不安全的连接传输。
因此,如果您使用不安全的连接,并且希望避免暴露服务器 RSA 公钥的风险,可以这样设置服务器 RSA 公钥:
MySQLConnectOptions options1 = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setServerRsaPublicKeyPath("tls/files/public_key.pem"); // 配置公钥路径
MySQLConnectOptions options2 = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setServerRsaPublicKeyValue(Buffer.buffer("-----BEGIN PUBLIC KEY-----\n" +
"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3yvG5s0qrV7jxVlp0sMj\n" +
"xP0a6BuLKCMjb0o88hDsJ3xz7PpHNKazuEAfPxiRFVAV3edqfSiXoQw+lJf4haEG\n" +
"HQe12Nfhs+UhcAeTKXRlZP/JNmI+BGoBduQ1rCId9bKYbXn4pvyS/a1ft7SwFkhx\n" +
"aogCur7iIB0WUWvwkQ0fEj/Mlhw93lLVyx7hcGFq4FOAKFYr3A0xrHP1IdgnD8QZ\n" +
"0fUbgGLWWLOossKrbUP5HWko1ghLPIbfmU6o890oj1ZWQewj1Rs9Er92/UDj/JXx\n" +
"7ha1P+ZOgPBlV037KDQMS6cUh9vTablEHsMLhDZanymXzzjBkL+wH/b9cdL16LkQ\n" +
"5QIDAQAB\n" +
"-----END PUBLIC KEY-----\n")); // 配置公钥缓冲
有关 caching_sha2_password
身份验证方法的更多信息,请参见 MySQL 参考手册.
使用 SSL/TLS
配置客户端使用SSL连接, 您可以像 Vert.x NetClient
一样配置
MySQLConnectOptions
。
响应式 MySQL 客户端 支持所有 SSL 模式 ,而且您能够配置 sslmode
. 客户端默认设置为 DISABLED
SSL 模式。
ssl
参数仅作为设置 sslmode
的快捷方式存在。setSsl(true)
等价于 setSslMode(VERIFY_CA)
, setSsl(false)
等价于 setSslMode(DISABLED)
MySQLConnectOptions options = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setSslMode(SslMode.VERIFY_CA)
.setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem"));
MySQLConnection.connect(vertx, options, res -> {
if (res.succeeded()) {
// 用SSL连接
} else {
System.out.println("Could not connect " + res.cause());
}
});
更多详细信息,请参阅 Vert.x 文档.
MySQL 实用程序命令
有时您希望使用 MySQL 实用程序命令,我们为此提供支持。有关更多信息,请参阅 MySQL 实用程序命令.
COM_PING
您可以使用 COM_PING
命令检查服务器是否处于活动状态。如果服务器响应 PING,将通知处理程序,否则将永远不会调用处理程序。
connection.ping(ar -> {
System.out.println("The server has responded to the PING");
});
COM_RESET_CONNECTION
您可以使用 COM_RESET_CONNECTION
命令重置会话状态,这将重置连接状态,如:
- user variables
- temporary tables
- prepared statements
connection.resetConnection(ar -> {
if (ar.succeeded()) {
System.out.println("Connection has been reset now");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_CHANGE_USER
您可以更改当前连接的用户,这将执行re-authentication并重置连接状态,如 COM_RESET_CONNECTION
。
MySQLAuthOptions authenticationOptions = new MySQLAuthOptions()
.setUser("newuser")
.setPassword("newpassword")
.setDatabase("newdatabase");
connection.changeUser(authenticationOptions, ar -> {
if (ar.succeeded()) {
System.out.println("User of current connection has been changed.");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_INIT_DB
您可以使用 COM_INIT_DB
命令更改连接的默认schema。
connection.specifySchema("newschema", ar -> {
if (ar.succeeded()) {
System.out.println("Default schema changed to newschema");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_STATISTICS
您可以使用 COM_STATISTICS
命令获取某些内部状态变量的可读字符串。
connection.getInternalStatistics(ar -> {
if (ar.succeeded()) {
System.out.println("Statistics: " + ar.result());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_DEBUG
您可以使用 COM_DEBUG
命令将调试信息转储到MySQL服务器的STDOUT。
connection.debug(ar -> {
if (ar.succeeded()) {
System.out.println("Debug info dumped to server's STDOUT");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_SET_OPTION
您可以使用 COM_SET_OPTION
命令设置当前连接的选项。目前只能设置 CLIENT_MULTI_STATEMENTS
。
例如,您可以使用此命令禁用 CLIENT_MULTI_STATEMENTS
。
connection.setOption(MySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF, ar -> {
if (ar.succeeded()) {
System.out.println("CLIENT_MULTI_STATEMENTS is off now");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
MySQL 和 MariaDB 版本支持情况
MySQL | MariaDB | ||
---|---|---|---|
版本 |
是否支持 |
版本 |
是否支持 |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
已知问题:
-
重置连接实用程序命令在 MySQL 5.5、5.6 和 MariaDB 10.1 中不可用
-
MariaDB 10.2 和 10.3 不支持更改用户实用程序命令
陷阱和最佳实践
以下是使用响应式 MySQL 客户端时避免常见陷阱的一些最佳实践。
预处理语句的计数限制
有时您可能会遇到臭名昭著的错误 Can’t create more than max_prepared_stmt_count statements (current value: 16382)
,这是因为服务器已达到预处理语句的总数限制。
您可以调整服务器系统变量 max_prepared_stmt_count
,但它具有上限值,因此您无法以这种方式摆脱错误。
缓解这种情况的最佳方法是启用预处理语句缓存,因此可以重用具有相同 SQL 字符串的预处理语句,并且客户端不必为每个请求创建全新的预处理语句。 执行语句后,预处理语句将自动关闭。 这样,虽然无法完全消除错误,但达到极限的机会可以大大减少。
还可以通过接口 SqlConnection#prepare
创建 PreparedStatement
对象来手动管理预处理语句的生命周期,以便可以选择何时释放语句句柄,甚至使用 SQL syntax prepared statement.