响应式 MySQL 客户端
响应式 MySQL 客户端具有简单易懂的 API,专注于可扩展性和低开销。
特性
-
事件驱动
-
轻量级
-
内置连接池
-
预处理查询缓存
-
支持游标
-
流式行处理
-
RxJava API
-
支持内存直接映射到对象,避免了不必要的复制
-
完整的数据类型支持
-
支持存储过程
-
支持 TLS/SSL
-
MySQL 实用程序命令支持
-
支持 MySQL 和 MariaDB
-
丰富的字符排序(collation)和字符集支持
-
Unix 域套接字
使用方法
使用响应式 MySQL 客户端,需要将以下依赖项添加到项目构建工具的 依赖 配置中:
-
Maven (在您的
pom.xml
):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mysql-client</artifactId>
<version>4.2.7</version>
</dependency>
-
Gradle (在您的
build.gradle
文件中):
dependencies {
compile 'io.vertx:vertx-mysql-client:4.2.7'
}
开始
以下是最简单的连接,查询和断开连接方法
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建客户端池
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
// 一个简单的查询
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
// 现在关闭客户端池
client.close();
});
连接到 MySQL
大多数时间,您将使用连接池连接到 MySQL:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
带连接池的客户端使用连接池,任何操作都将借用连接池中的连接来执行该操作, 并将连接释放回连接池中。
如果您使用 Vert.x 运行,您可以将 Vertx 实例传递给它:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
当您不再需要连接池时,您需要释放它:
pool.close();
当您需要在同一连接上执行多个操作时,您需要使用
connection
客户端 。
您可以轻松地从连接池中获取一个:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
// 从连接池获得连接
client.getConnection().compose(conn -> {
System.out.println("Got a connection from the pool");
// 所有操作都在同一连接上执行
return conn
.query("SELECT * FROM users WHERE id='julien'")
.execute()
.compose(res -> conn
.query("SELECT * FROM users WHERE id='emad'")
.execute())
.onComplete(ar -> {
// 释放连接池的连接
conn.close();
});
}).onComplete(ar -> {
if (ar.succeeded()) {
System.out.println("Done");
} else {
System.out.println("Something went wrong " + ar.cause().getMessage());
}
});
连接完成后,您必须关闭它以释放到连接池中,以便可以重复使用。
可共享的连接池
您可以在多个 Verticle 间或同一 Verticle 的多个实例间共享一个连接池。这样的连接池应该在 Verticle 外面创建, 否则这个连接池将在创建它的 Verticle 被取消部署时关闭
MySQLPool pool = MySQLPool.pool(database, new PoolOptions().setMaxSize(maxSize));
vertx.deployVerticle(() -> new AbstractVerticle() {
@Override
public void start() throws Exception {
// 使用连接池
}
}, new DeploymentOptions().setInstances(4));
您也可以用以下方式在每个 Verticle 中创建可共享的连接池:
vertx.deployVerticle(() -> new AbstractVerticle() {
MySQLPool pool;
@Override
public void start() {
// 创建一个可共享的连接池
// 或获取已有的可共享连接池,并创建对原连接池的借用
// 当 verticle 被取消部署时,借用会被自动释放
pool = MySQLPool.pool(database, new PoolOptions()
.setMaxSize(maxSize)
.setShared(true)
.setName("my-pool"));
}
}, new DeploymentOptions().setInstances(4));
第一次创建可共享的连接池时,会创建新连接池所需的资源。之后再调用该创建方法时,会复用之前的连接池,并创建 对原有连接池的借用。当所有的借用都被关闭时,该连接池的资源也会被释放。
默认情况下,客户端需要创建一个 TCP 连接时,会复用当前的 event-loop 。 这个可共享的 HTTP 客户端会 以一种安全的模式,在使用它的 verticle 中随机选中一个 verticle,并使用它的 event-loop。
您可以手动设置一个客户端可以使用的 event-loop 的数量
MySQLPool pool = MySQLPool.pool(database, new PoolOptions()
.setMaxSize(maxSize)
.setShared(true)
.setName("my-pool")
.setEventLoopSize(4));
Unix 域套接字
有时为了简单,安全或性能原因,需要通过 Unix 域套接字 进行连接。
由于 JVM 不支持域套接字,因此首先必须向项目添加本地传输(native transport)扩展。
-
Maven (在您的
pom.xml
):
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>${netty.version}</version>
<classifier>linux-x86_64</classifier>
</dependency>
-
Gradle (在您的
build.gradle
文件中):
dependencies {
compile 'io.netty:netty-transport-native-epoll:${netty.version}:linux-x86_64'
}
注意
|
ARM64 的原生 epoll 支持也可以与分类器(classifier) linux-aarch64 一起添加。
|
注意
|
如果您的团队中有 Mac 用户,在 osx-x86_64 上添加 netty-transport-native-kqueue 分类器(classifier)。
|
然后通过 MySQLConnectOptions#setHost
设置域套接字的路径:
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setHost("/var/run/mysqld/mysqld.sock")
.setDatabase("the-db");
// 连接池选项
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
// 使用vertx实例创建带连接池的客户端
// 确保vertx实例已启用native transports
// vertxOptions.setPreferNativeTransport(true);
MySQLPool client2 = MySQLPool.pool(vertx, connectOptions, poolOptions);
有关 native transport 的详细信息,请参阅 Vert.x 文档 。
配置
有几个选项供您配置客户端。
数据对象
配置客户端的简单方法就是指定 MySQLConnectOptions
数据对象。
MySQLConnectOptions connectOptions = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret");
// 连接池选项
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
// 从数据对象创建连接池
MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);
pool.getConnection(ar -> {
// 处理您的连接
});
字符序(collations)和字符集(character sets)
响应式 MySQL 客户端支持配置字符序或字符集,并将它们映射到一个相关的 java.nio.charset.Charset
。
您可以为数据库连接指定字符集,例如
MySQLConnectOptions connectOptions = new MySQLConnectOptions();
// 将连接的字符集设置为utf8而不是默认的字符集utf8mb4
connectOptions.setCharset("utf8");
响应式 MySQL 客户端的默认字符集是 utf8mb4
。字符串值,如密码和错误消息等,总是使用 UTF-8
字符集解码。
characterEncoding
选项用于设置字符串(例如查询字符串和参数值)使用的 Java 字符集,默认使用 UTF-8
字符集;如果设置为 null
,则客户端将使用 Java 的默认字符集。
您还可以为连接指定字符序,例如
MySQLConnectOptions connectOptions = new MySQLConnectOptions();
// 将连接的字符序设置为 utf8_general_ci 来代替默认字符序 utf8mb4_general_ci
// 设置字符序将覆盖charset选项
connectOptions.setCharset("gbk");
connectOptions.setCollation("utf8_general_ci");
请注意,在数据对象上设置字符序将覆盖 charset 和 characterEncoding 选项。
您可以执行 SQL SHOW COLLATION;
或 SHOW CHARACTER SET;
获取服务器支持的字符序和字符集。
有关 MySQL 字符集和字符序的更多信息,请参阅 MySQL 参考手册。
连接属性
还可以使用 setProperties
或 addProperty
方法配置连接属性。注意 setProperties
将覆盖客户端的默认属性。
MySQLConnectOptions connectOptions = new MySQLConnectOptions();
// 添加连接属性
connectOptions.addProperty("_java_version", "1.8.0_212");
// 覆盖属性
Map<String, String> attributes = new HashMap<>();
attributes.put("_client_name", "myapp");
attributes.put("_client_version", "1.0.0");
connectOptions.setProperties(attributes);
有关连接属性的更多信息,请参阅 MySQL 参考手册。
配置 useAffectedRows
您可以 useAffectedRows
选项以决定是否在连接到服务器时设置标志 CLIENT_FOUND_ROWS
。如果指定了 CLIENT_FOUND_ROWS
标志,则受影响的行计数(返回的)是查找到的行数,而不是受影响的行数。
更多有关信息,请参阅 MySQL 参考手册
连接 URI
除了使用 MySQLConnectOptions
数据对象进行配置外,我们还为您提供了另外一种使用连接URI进行配置的方法:
String connectionUri = "mysql://dbuser:secretpassword@database.server.com:3211/mydb";
// 从连接URI创建连接池
MySQLPool pool = MySQLPool.pool(connectionUri);
// 从连接URI创建连接
MySQLConnection.connect(vertx, connectionUri, res -> {
// 处理您的连接
});
有关连接字符串格式的有关更多信息,请参阅 MySQL 参考手册。
目前,客户端支持以下的连接 uri 参数关键字(不区分大小写):
-
host
-
port
-
user
-
password
-
schema
-
socket
-
useAffectedRows
运行查询
当您不需要事务或运行单个查询时,您可以直接在连接池上运行查询。连接池将使用其中一个连接来运行查询并将结果返回给您。
这是运行简单查询的方法:
client
.query("SELECT * FROM users WHERE id='julien'")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> result = ar.result();
System.out.println("Got " + result.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
预处理查询
您可以对预处理查询执行相同的操作。
SQL字符串可以使用数据库语法 `?` 按位置引用参数
client
.preparedQuery("SELECT * FROM users WHERE id=?")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
查询方法提供异步 RowSet
实例,它适用于 SELECT 查询。
client
.preparedQuery("SELECT first_name, last_name FROM users")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
for (Row row : rows) {
System.out.println("User " + row.getString(0) + " " + row.getString(1));
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
或 UPDATE/INSERT 查询:
client
.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
.execute(Tuple.of("Julien", "Viet"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println(rows.rowCount());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
Row
使您可以按索引访问数据
System.out.println("User " + row.getString(0) + " " + row.getString(1));
或按名字
System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));
客户端在这里不会使用任何魔术,并且无论您的SQL文本如何,列名都将用表中的名称标识。
您可以访问多样的类型
String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");
您可以使用缓存的预处理语句执行一次性的预处理查询:
connectOptions.setCachePreparedStatements(true);
client
.preparedQuery("SELECT * FROM users WHERE id = ?")
.execute(Tuple.of("julien"), ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
System.out.println("Got " + rows.size() + " rows ");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您可以创建一个 PreparedStatement
并自己管理生命周期。
sqlConnection
.prepare("SELECT * FROM users WHERE id = ?", ar -> {
if (ar.succeeded()) {
PreparedStatement preparedStatement = ar.result();
preparedStatement.query()
.execute(Tuple.of("julien"), ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
System.out.println("Got " + rows.size() + " rows ");
preparedStatement.close();
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
批处理
您可以执行预处理的批处理
List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
// 执行预处理的批处理
client
.preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
.executeBatch(batch, res -> {
if (res.succeeded()) {
// 处理行
RowSet<Row> rows = res.result();
} else {
System.out.println("Batch failed " + res.cause());
}
});
MySQL LAST_INSERT_ID
往表中插入一条记录后,可以获得自增值。
client
.query("INSERT INTO test(val) VALUES ('v1')")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rows = ar.result();
long lastInsertId = rows.property(MySQLClient.LAST_INSERTED_ID);
System.out.println("Last inserted id is: " + lastInsertId);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
更多有关信息,请参阅 如何获取最近一条插入记录的唯一ID。
使用连接
获取连接
当需要执行顺序查询(无事务)时,可以创建一个新连接或从连接池中借用一个。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。
pool
.getConnection()
.compose(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// 对行执行一些操作
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
// 将连接返回到连接池中
.eventually(v -> connection.close())
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
可以创建预处理查询语句:
connection
.prepare("SELECT * FROM users WHERE first_name LIKE ?")
.compose(pq ->
pq.query()
.execute(Tuple.of("Julien"))
.eventually(v -> pq.close())
).onSuccess(rows -> {
// 所有的行
});
简单连接 API
当您创建了一个连接池, 您可以调用 withConnection
并传入一个使用连接进行处理的函数。
它从连接池中借用一个连接,并使用该连接调用函数。
该函数必须返回一个任意结果的 Future。
Future 完成后, 连接将归还至连接池,并提供全部的结果。
pool.withConnection(connection ->
connection
.preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
.executeBatch(Arrays.asList(
Tuple.of("Julien", "Viet"),
Tuple.of("Emad", "Alblueshi")
))
.compose(res -> connection
// Do something with rows
.query("SELECT COUNT(*) FROM Users")
.execute()
.map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
System.out.println("Insert users, now the number of users is " + count);
});
使用事务
带事务的连接
您可以使用SQL BEGIN
/COMMIT
/ROLLBACK
执行事务,如果您必须这么做,就必须使用 SqlConnection
自己管理事务。
或者您使用 SqlConnection
的事务API:
pool.getConnection()
// 事务必须使用一个连接
.onSuccess(conn -> {
// 开始事务
conn.begin()
.compose(tx -> conn
// 各种语句
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.compose(res2 -> conn
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute())
// 提交事务
.compose(res3 -> tx.commit()))
// 将连接返回到连接池
.eventually(v -> conn.close())
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});
当数据库服务器报告当前事务失败时(例如,臭名昭著的 current transaction is aborted, commands ignored until end of transaction block),
事务被回滚,此时 completion
Future 会失败,
并返回 TransactionRollbackException
异常:
tx.completion()
.onFailure(err -> {
System.out.println("Transaction failed => rolled back");
});
简单事务 API
当您创建了一个连接池, 您可以调用 withConnection
并传入一个使用连接进行处理的函数。
它从连接池中借用一个连接,开始事务,并且,在此事务范围内所有执行操作的客户端调用该函数。
该函数必须返回一个任意结果的Future。
-
当Future成功,客户端提交这个事务
-
当Future失败,客户端回滚这个事务
事务完成后, 连接将返回到连接池中,并提供全部的结果。
pool.withTransaction(client -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
.execute()
.flatMap(res -> client
.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
.execute()
// 映射一个消息结果
.map("Users inserted")))
.onSuccess(v -> System.out.println("Transaction succeeded"))
.onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
游标和流
默认情况下,执行预处理查询将获取所有行,您可以使用 Cursor
控制想读取的行数:
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// 创建游标
Cursor cursor = pq.cursor(Tuple.of(18));
// 读取50行
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
RowSet<Row> rows = ar2.result();
// 检查更多 ?
if (cursor.hasMore()) {
// 重复这个过程...
} else {
// 没有更多行-关闭游标
cursor.close();
}
}
});
}
});
游标提前释放时应将其关闭:
cursor.read(50, ar2 -> {
if (ar2.succeeded()) {
// 关闭游标
cursor.close();
}
});
游标还可以使用流式API,这可以更加方便,尤其是在Rx化的版本中。
connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
if (ar1.succeeded()) {
PreparedStatement pq = ar1.result();
// 一次获取50行
RowStream<Row> stream = pq.createStream(50, Tuple.of(18));
// 使用流
stream.exceptionHandler(err -> {
System.out.println("Error: " + err.getMessage());
});
stream.endHandler(v -> {
System.out.println("End of stream");
});
stream.handler(row -> {
System.out.println("User: " + row.getString("last_name"));
});
}
});
当这些行已传递给处理程序时,该流将批量读取 50
行并将其流化。
然后读取新一批的 50
行数据,依此类推。
流可以恢复或暂停,已加载的行将保留在内存中,直到被送达,游标将停止迭代。
追踪查询
当Vert.x启用了跟踪时,SQL客户端可以跟踪查询执行。
客户端报告以下 client 跨度(spans):
-
Query
operation name -
tags
-
db.user
: the database username -
db.instance
: the database instance -
db.statement
: the SQL query -
db.type
: sql
默认的跟踪策略是 PROPAGATE
,客户端仅当涉及活动跟踪时才创建跨度(span)。
您可以使用 setTracingPolicy
更改客户策略,例如 您可以设置 ALWAYS
总是报告跨度(span):
options.setTracingPolicy(TracingPolicy.ALWAYS);
MySQL 类型映射
当前客户端支持以下 MySQL 类型
-
BOOL,BOOLEAN (
java.lang.Byte
) -
TINYINT (
java.lang.Byte
) -
TINYINT UNSIGNED(
java.lang.Short
) -
SMALLINT (
java.lang.Short
) -
SMALLINT UNSIGNED(
java.lang.Integer
) -
MEDIUMINT (
java.lang.Integer
) -
MEDIUMINT UNSIGNED(
java.lang.Integer
) -
INT,INTEGER (
java.lang.Integer
) -
INTEGER UNSIGNED(
java.lang.Long
) -
BIGINT (
java.lang.Long
) -
BIGINT UNSIGNED(
io.vertx.sqlclient.data.Numeric
) -
FLOAT (
java.lang.Float
) -
FLOAT UNSIGNED(
java.lang.Float
) -
DOUBLE (
java.lang.Double
) -
DOUBLE UNSIGNED(
java.lang.Double
) -
BIT (
java.lang.Long
) -
NUMERIC (
io.vertx.sqlclient.data.Numeric
) -
NUMERIC UNSIGNED(
io.vertx.sqlclient.data.Numeric
) -
DATE (
java.time.LocalDate
) -
DATETIME (
java.time.LocalDateTime
) -
TIME (
java.time.Duration
) -
TIMESTAMP (
java.time.LocalDateTime
) -
YEAR (
java.lang.Short
) -
CHAR (
java.lang.String
) -
VARCHAR (
java.lang.String
) -
BINARY (
io.vertx.core.buffer.Buffer
) -
VARBINARY (
io.vertx.core.buffer.Buffer
) -
TINYBLOB (
io.vertx.core.buffer.Buffer
) -
TINYTEXT (
java.lang.String
) -
BLOB (
io.vertx.core.buffer.Buffer
) -
TEXT (
java.lang.String
) -
MEDIUMBLOB (
io.vertx.core.buffer.Buffer
) -
MEDIUMTEXT (
java.lang.String
) -
LONGBLOB (
io.vertx.core.buffer.Buffer
) -
LONGTEXT (
java.lang.String
) -
ENUM (
java.lang.String
) -
SET (
java.lang.String
) -
JSON (
io.vertx.core.json.JsonObject
,io.vertx.core.json.JsonArray
,Number
,Boolean
,String
,io.vertx.sqlclient.Tuple#JSON_NULL
) -
GEOMETRY(
io.vertx.mysqlclient.data.spatial.*
)
元组解码在存储值时使用上述类型
请注意:在Java中,没有无符号数字值的具体表示形式,因此客户端会将无符号值转换为相关的Java类型。
隐式类型转换
当执行预处理语句时,响应式 MySQL 客户端支持隐式类型转换。
假设您的表中有一个 TIME
列,下面的两个示例都是有效的。
client
.preparedQuery("SELECT * FROM students WHERE updated_time = ?")
.execute(Tuple.of(LocalTime.of(19, 10, 25)), ar -> {
// 处理结果
});
// 这个也适用于隐式类型转换
client
.preparedQuery("SELECT * FROM students WHERE updated_time = ?")
.execute(Tuple.of("19:10:25"), ar -> {
// 处理结果
});
MySQL 数据类型编码是根据参数值推断的。下面是具体的类型映射:
参数值 | MySQL 类型编码 |
---|---|
null |
MYSQL_TYPE_NULL |
java.lang.Byte |
MYSQL_TYPE_TINY |
java.lang.Boolean |
MYSQL_TYPE_TINY |
java.lang.Short |
MYSQL_TYPE_SHORT |
java.lang.Integer |
MYSQL_TYPE_LONG |
java.lang.Long |
MYSQL_TYPE_LONGLONG |
java.lang.Double |
MYSQL_TYPE_DOUBLE |
java.lang.Float |
MYSQL_TYPE_FLOAT |
java.time.LocalDate |
MYSQL_TYPE_DATE |
java.time.Duration |
MYSQL_TYPE_TIME |
java.time.LocalTime |
MYSQL_TYPE_TIME |
io.vertx.core.buffer.Buffer |
MYSQL_TYPE_BLOB |
java.time.LocalDateTime |
MYSQL_TYPE_DATETIME |
io.vertx.mysqlclient.data.spatial.* |
MYSQL_TYPE_BLOB |
default |
MYSQL_TYPE_STRING |
处理布尔值
在 MySQL 中 BOOLEAN
和 BOOL
数据类型是 TINYINT(1)
的同义词。零值视为 false,非零值视为 true。
BOOLEAN
数据类型值以 java.lang.Byte
类型存储在 Row
或 Tuple
中,调用 Row#getValue
可以获取到 java.lang.Byte
类型的值,
也可以调用 Row#getBoolean
获取 java.lang.Boolean
类型的值。
client
.query("SELECT graduated FROM students WHERE id = 0")
.execute(ar -> {
if (ar.succeeded()) {
RowSet<Row> rowSet = ar.result();
for (Row row : rowSet) {
int pos = row.getColumnIndex("graduated");
Byte value = row.get(Byte.class, pos);
Boolean graduated = row.getBoolean("graduated");
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
当您想要使用一个 BOOLEAN
参数值执行预处理语句时,只要简单地在参数列表中添加 java.lang.Boolean
值即可。
client
.preparedQuery("UPDATE students SET graduated = ? WHERE id = 0")
.execute(Tuple.of(true), ar -> {
if (ar.succeeded()) {
System.out.println("Updated with the boolean value");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
处理 JSON
MySQL JSON
数据类型由以下Java类型表示:
-
String
-
Number
-
Boolean
-
io.vertx.core.json.JsonObject
-
io.vertx.core.json.JsonArray
-
io.vertx.sqlclient.Tuple#JSON_NULL
表示 JSON null 字面量
Tuple tuple = Tuple.of(
Tuple.JSON_NULL,
new JsonObject().put("foo", "bar"),
3);
// 获取 json
Object value = tuple.getValue(0); // 期望得到 JSON_NULL
//
value = tuple.get(JsonObject.class, 1); // 期望得到 JSON object
//
value = tuple.get(Integer.class, 2); // 期望得到 3
value = tuple.getInteger(2); // 期望得到 3
处理 BIT
BIT
是 java.lang.Long
类型的映射, 但是Java没有无符号数值的概念, 因此,如果您要插入或更新一条记录为 BIT(64)
的最大值 , 可以将参数设置为 -1L
。
处理 TIME
MySQL TIME
数据类型可用于表示一天中的时间或范围为 -838:59:59
到 838:59:59
的时间间隔。在响应式MySQL客户端, TIME
数据类型自然的被映射为 java.time.Duration
,您也可以调用 Row#getLocalTime
获取到 java.time.LocalTime
类型的值。
处理 NUMERIC
Numeric
Java类型用于表示MySQL的 NUMERIC
类型。
Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
// 处理 NaN
} else {
BigDecimal value = numeric.bigDecimalValue();
}
处理 ENUM
MySQL支持ENUM数据类型,客户端将这些类型检索为String数据类型。
您可以像这样将Java枚举编码为String:
client
.preparedQuery("INSERT INTO colors VALUES (?)")
.execute(Tuple.of(Color.red), res -> {
// ...
});
您可以将 ENUM 类型的列读取为 Java 的枚举,如以下代码:
client
.preparedQuery("SELECT color FROM colors")
.execute()
.onComplete(res -> {
if (res.succeeded()) {
RowSet<Row> rows = res.result();
for (Row row : rows) {
System.out.println(row.get(Color.class, "color"));
}
}
});
处理 GEOMETRY
MYSQL 还支持 GEOMETRY
数据类型,下面是一些示例展示您可以使用 Well-Known Text(WKT)格式或 Well-Known Binary(WKB)格式处理几何数据,数据被解码为MySQL TEXT 或 BLOB 数据类型。有很多很棒的第三方库可以处理这种格式的数据。
您可以以 WKT 格式获取空间数据:
client
.query("SELECT ST_AsText(g) FROM geom;")
.execute(ar -> {
if (ar.succeeded()) {
// 以WKT格式获取空间数据
RowSet<Row> result = ar.result();
for (Row row : result) {
String wktString = row.getString(0);
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
或者,您可以以WKB格式获取空间数据:
client
.query("SELECT ST_AsBinary(g) FROM geom;")
.execute(ar -> {
if (ar.succeeded()) {
// 以WKB格式获取空间数据
RowSet<Row> result = ar.result();
for (Row row : result) {
Buffer wkbValue = row.getBuffer(0);
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
在响应式 MySQL 客户端中,我们还提供了一种处理几何数据类型的简单方法。
您可以将几何数据作为Vert.x数据对象检索:
client
.query("SELECT g FROM geom;")
.execute(ar -> {
if (ar.succeeded()) {
// 以Vert.x数据对象获取空间数据
RowSet<Row> result = ar.result();
for (Row row : result) {
Point point = row.get(Point.class, 0);
System.out.println("Point x: " + point.getX());
System.out.println("Point y: " + point.getY());
}
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
您也可以将WKB描述用作预处理语句参数。
Point point = new Point(0, 1.5, 1.5);
// 以 WKB 描述发送
client
.preparedQuery("INSERT INTO geom VALUES (ST_GeomFromWKB(?))")
.execute(Tuple.of(point), ar -> {
if (ar.succeeded()) {
System.out.println("Success");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
集合类查询
您可以将查询API与Java集合类结合使用:
Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
row -> row.getLong("id"),
row -> row.getString("last_name"));
// 运行查询使用集合类
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<Map<Long, String>> result = ar.result();
// 获取用集合类创建的map
Map<Long, String> map = result.value();
System.out.println("Got " + map);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
集合类处理不能保留 Row
的引用,因为只有一个 Row 对象用于处理整个集合。
Java Collectors
提供了许多有趣的预定义集合类,例如,
您可以直接用 Row 中的集合轻松拼接成一个字符串:
Collector<Row, ?, String> collector = Collectors.mapping(
row -> row.getString("last_name"),
Collectors.joining(",", "(", ")")
);
// 运行查询使用集合类
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
if (ar.succeeded()) {
SqlResult<String> result = ar.result();
// 获取用集合类创建的String
String list = result.value();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
MySQL 存储过程
您可以在查询中运行存储过程。结果将按照 MySQL 协议 从服务器获取,无需任何魔法。
client.query("CREATE PROCEDURE multi() BEGIN\n" +
" SELECT 1;\n" +
" SELECT 1;\n" +
" INSERT INTO ins VALUES (1);\n" +
" INSERT INTO ins VALUES (2);\n" +
"END;").execute(ar1 -> {
if (ar1.succeeded()) {
// 创建存储过程成功
client
.query("CALL multi();")
.execute(ar2 -> {
if (ar2.succeeded()) {
// 处理结果
RowSet<Row> result1 = ar2.result();
Row row1 = result1.iterator().next();
System.out.println("First result: " + row1.getInteger(0));
RowSet<Row> result2 = result1.next();
Row row2 = result2.iterator().next();
System.out.println("Second result: " + row2.getInteger(0));
RowSet<Row> result3 = result2.next();
System.out.println("Affected rows: " + result3.rowCount());
} else {
System.out.println("Failure: " + ar2.cause().getMessage());
}
});
} else {
System.out.println("Failure: " + ar1.cause().getMessage());
}
});
Note: 目前尚不支持绑定OUT参数的预处理语句。
MySQL 导入本地文件
该客户端支持处理 LOCAL INFILE 请求, 如果要将数据从本地文件加载到服务器,则可以使用语句
LOAD DATA LOCAL INFILE '<filename>' INTO TABLE <table>;
。 更多有关信息,请参阅 MySQL 参考手册.
认证
默认身份验证插件
该客户端支持指定在连接开始时使用缺省的身份验证插件。 当前支持以下插件:
-
mysql_native_password
-
caching_sha2_password
-
mysql_clear_password
MySQLConnectOptions options = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setAuthenticationPlugin(MySQLAuthenticationPlugin.MYSQL_NATIVE_PASSWORD); // 设定默认身份验证插件
MySQL 8 中引入的新身份验证方法
MySQL 8.0 引入了一种名为 caching_sha2_password
的身份验证方法,它是默认的身份验证方法。
为了使用此新的身份验证方法连接到服务器,您需要使用安全连接(例如 启用 TLS/SSL)或使用 RSA 密钥对交换加密密码,以避免密码泄漏。RSA 密钥对在通信过程中会自动交换,但服务器 RSA 公钥可能会在这个过程中被黑客攻击,因为它通过不安全的连接传输。
因此,如果您使用不安全的连接,并且希望避免暴露服务器 RSA 公钥的风险,可以这样设置服务器 RSA 公钥:
MySQLConnectOptions options1 = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setServerRsaPublicKeyPath("tls/files/public_key.pem"); // 配置公钥路径
MySQLConnectOptions options2 = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setServerRsaPublicKeyValue(Buffer.buffer("-----BEGIN PUBLIC KEY-----\n" +
"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3yvG5s0qrV7jxVlp0sMj\n" +
"xP0a6BuLKCMjb0o88hDsJ3xz7PpHNKazuEAfPxiRFVAV3edqfSiXoQw+lJf4haEG\n" +
"HQe12Nfhs+UhcAeTKXRlZP/JNmI+BGoBduQ1rCId9bKYbXn4pvyS/a1ft7SwFkhx\n" +
"aogCur7iIB0WUWvwkQ0fEj/Mlhw93lLVyx7hcGFq4FOAKFYr3A0xrHP1IdgnD8QZ\n" +
"0fUbgGLWWLOossKrbUP5HWko1ghLPIbfmU6o890oj1ZWQewj1Rs9Er92/UDj/JXx\n" +
"7ha1P+ZOgPBlV037KDQMS6cUh9vTablEHsMLhDZanymXzzjBkL+wH/b9cdL16LkQ\n" +
"5QIDAQAB\n" +
"-----END PUBLIC KEY-----\n")); // 配置公钥缓冲
有关 caching_sha2_password
身份验证方法的更多信息,请参见 MySQL 参考手册.
使用 SSL/TLS
配置客户端使用SSL连接, 您可以像 Vert.x NetClient
一样配置
MySQLConnectOptions
。
响应式 MySQL 客户端 支持所有 SSL 模式 ,而且您能够配置 sslmode
. 客户端默认设置为 DISABLED
SSL 模式。
ssl
参数仅作为设置 sslmode
的快捷方式存在。setSsl(true)
等价于 setSslMode(VERIFY_CA)
, setSsl(false)
等价于 setSslMode(DISABLED)
MySQLConnectOptions options = new MySQLConnectOptions()
.setPort(3306)
.setHost("the-host")
.setDatabase("the-db")
.setUser("user")
.setPassword("secret")
.setSslMode(SslMode.VERIFY_CA)
.setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem"));
MySQLConnection.connect(vertx, options, res -> {
if (res.succeeded()) {
// 用SSL连接
} else {
System.out.println("Could not connect " + res.cause());
}
});
更多详细信息,请参阅 Vert.x 文档.
MySQL 实用程序命令
有时您希望使用 MySQL 实用程序命令,我们为此提供支持。 有关更多信息,请参阅 MySQL 实用程序命令.
COM_PING
您可以使用 COM_PING
命令检查服务器是否处于活动状态。如果服务器响应 PING,将通知处理程序,否则将永远不会调用处理程序。
connection.ping(ar -> {
System.out.println("The server has responded to the PING");
});
COM_RESET_CONNECTION
您可以使用 COM_RESET_CONNECTION
命令重置会话状态,这将重置连接状态,如:
- user variables
- temporary tables
- prepared statements
connection.resetConnection(ar -> {
if (ar.succeeded()) {
System.out.println("Connection has been reset now");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_CHANGE_USER
您可以更改当前连接的用户,这将执行re-authentication并重置连接状态,如 COM_RESET_CONNECTION
。
MySQLAuthOptions authenticationOptions = new MySQLAuthOptions()
.setUser("newuser")
.setPassword("newpassword")
.setDatabase("newdatabase");
connection.changeUser(authenticationOptions, ar -> {
if (ar.succeeded()) {
System.out.println("User of current connection has been changed.");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_INIT_DB
您可以使用 COM_INIT_DB
命令更改连接的默认schema。
connection.specifySchema("newschema", ar -> {
if (ar.succeeded()) {
System.out.println("Default schema changed to newschema");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_STATISTICS
您可以使用 COM_STATISTICS
命令获取某些内部状态变量的可读字符串。
connection.getInternalStatistics(ar -> {
if (ar.succeeded()) {
System.out.println("Statistics: " + ar.result());
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_DEBUG
您可以使用 COM_DEBUG
命令将调试信息转储到MySQL服务器的STDOUT。
connection.debug(ar -> {
if (ar.succeeded()) {
System.out.println("Debug info dumped to server's STDOUT");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
COM_SET_OPTION
您可以使用 COM_SET_OPTION
命令设置当前连接的选项。目前只能设置 CLIENT_MULTI_STATEMENTS
。
例如,您可以使用此命令禁用 CLIENT_MULTI_STATEMENTS
。
connection.setOption(MySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF, ar -> {
if (ar.succeeded()) {
System.out.println("CLIENT_MULTI_STATEMENTS is off now");
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
MySQL 和 MariaDB 版本支持情况
MySQL | MariaDB | ||
---|---|---|---|
版本 |
是否支持 |
版本 |
是否支持 |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
|
✔ |
已知问题:
-
重置连接实用程序命令在 MySQL 5.5、5.6 和 MariaDB 10.1 中不可用
-
MariaDB 10.2 和 10.3 不支持更改用户实用程序命令
陷阱和最佳实践
以下是使用响应式 MySQL 客户端时避免常见陷阱的一些最佳实践。
预处理语句的计数限制
有时您可能会遇到臭名昭著的错误 Can’t create more than max_prepared_stmt_count statements (current value: 16382)
,这是因为服务器已达到预处理语句的总数限制。
您可以调整服务器系统变量 max_prepared_stmt_count
,但它具有上限值,因此您无法以这种方式摆脱错误。
缓解这种情况的最佳方法是启用预处理语句缓存,因此可以重用具有相同 SQL 字符串的预处理语句,并且客户端不必为每个请求创建全新的预处理语句。 执行语句后,预处理语句将自动关闭。 这样,虽然无法完全消除错误,但达到极限的机会可以大大减少。
还可以通过接口 SqlConnection#prepare
创建 PreparedStatement
对象来手动管理预处理语句的生命周期,以便可以选择何时释放语句句柄,甚至使用 SQL syntax prepared statement.
高级连接池配置
数据库服务负载均衡
您可以使用包含多个数据库服务的列表来配置连接池而不是单个数据库服务。
MySQLPool pool = MySQLPool.pool(Arrays.asList(server1, server2, server3), options);
当一个连接创建时,连接池使用(round-robin)轮询调度算法做负载均衡以选择不同的数据库服务
注意
|
负载均衡是在创建连接时提供的,而不是在从连接池中获取连接时提供 |
连接初始化
您可以使用 connectHandler
方法在连接创建后和连接释放回连接池之前来与数据库连接交互
pool.connectHandler(conn -> {
conn.query(sql).execute().onSuccess(res -> {
// 将连接释放回连接池,以被该应用程序复用
conn.close();
});
});
连接完成后,您应该释放该连接以通知连接池该数据库连接可以被使用