响应式 MySQL 客户端

响应式 MySQL 客户端具有简单易懂的 API,专注于可扩展性和低开销。

特性

  • 事件驱动

  • 轻量级

  • 内置连接池

  • 预处理查询缓存

  • 支持游标

  • 流式行处理

  • RxJava API

  • 支持内存直接映射到对象,避免了不必要的复制

  • 完整的数据类型支持

  • 支持存储过程

  • 支持 TLS/SSL

  • 查询管线/流水线

  • MySQL 实用程序命令支持

  • 支持 MySQL 和 MariaDB

  • 丰富的字符排序(collation)和字符集支持

  • Unix 域套接字

使用方法

使用响应式 MySQL 客户端,需要将以下依赖项添加到项目构建工具的 依赖 配置中:

  • Maven (在您的 pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-mysql-client</artifactId>
 <version>4.3.8</version>
</dependency>
  • Gradle (在您的 build.gradle 文件中):

dependencies {
 compile 'io.vertx:vertx-mysql-client:4.3.8'
}

开始

以下是最简单的连接,查询和断开连接方法

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池选项
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// 创建客户端池
SqlClient client = MySQLPool.client(connectOptions, poolOptions);

// 一个简单的查询
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }

  // 现在关闭客户端池
  client.close();
});

连接到 MySQL

大多数时间,您将使用连接池连接到 MySQL:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池选项
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);

带连接池的客户端使用连接池,任何操作都将借用连接池中的连接来执行该操作, 并将连接释放回连接池中。

如果您使用 Vert.x 运行,您可以将 Vertx 实例传递给它:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池选项
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);

当您不再需要连接池时,您需要释放它:

pool.close();

当您需要在同一连接上执行多个操作时,您需要使用 connection 客户端 。

您可以轻松地从连接池中获取一个:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池选项
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// 创建带连接池的客户端
MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);

// 从连接池获得连接
pool.getConnection().compose(conn -> {
  System.out.println("Got a connection from the pool");

  // 所有操作都在同一连接上执行
  return conn
    .query("SELECT * FROM users WHERE id='julien'")
    .execute()
    .compose(res -> conn
      .query("SELECT * FROM users WHERE id='emad'")
      .execute())
    .onComplete(ar -> {
      // 释放连接池的连接
      conn.close();
    });
}).onComplete(ar -> {
  if (ar.succeeded()) {

    System.out.println("Done");
  } else {
    System.out.println("Something went wrong " + ar.cause().getMessage());
  }
});

连接完成后,您必须关闭它以释放到连接池中,以便可以重复使用。

命令管线/流水线

在某些场景中,将命令管线化(Command Pipelining)可以提高数据库访问性能。

您可将客户端配置为使用管线

MySQLPool pool = MySQLPool.pool(vertx, connectOptions.setPipeliningLimit(16), poolOptions);

默认管线上限为 1 也就是禁用管线。

小心

当您使用了不支持管线化的代理时,请不要开启管线化。 否则, 代理可能会突然关闭客户端连接。

连接池以及池化的客户端

MySQLPool 允许您创建连接池或池化的客户端

connectOptions.setPipeliningLimit(64);
SqlClient client = MySQLPool.client(vertx, connectOptions, poolOptions);

// 管线/流水线化
Future<RowSet<Row>> res1 = client.query(sql).execute();

// 连接池
MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);

// 非管线化
Future<RowSet<Row>> res2 = pool.query(sql).execute();
  • 连接池操作并非管线/流水线化操作(pipelined),只有从连接池中获取的连接是流水线操作

  • 池化的客户端操作是管线/流水线操作,您无法从池化的客户端获取连接

可共享的连接池

您可以在多个 Verticle 间或同一 Verticle 的多个实例间共享一个连接池。这样的连接池应该在 Verticle 外面创建, 否则这个连接池将在创建它的 Verticle 被取消部署时关闭

MySQLPool pool = MySQLPool.pool(database, new PoolOptions().setMaxSize(maxSize));
vertx.deployVerticle(() -> new AbstractVerticle() {
  @Override
  public void start() throws Exception {
    // 使用连接池
  }
}, new DeploymentOptions().setInstances(4));

您也可以用以下方式在每个 Verticle 中创建可共享的连接池:

vertx.deployVerticle(() -> new AbstractVerticle() {
  MySQLPool pool;
  @Override
  public void start() {
    // 创建一个可共享的连接池
    // 或获取已有的可共享连接池,并创建对原连接池的借用
    // 当 verticle 被取消部署时,借用会被自动释放
    pool = MySQLPool.pool(database, new PoolOptions()
      .setMaxSize(maxSize)
      .setShared(true)
      .setName("my-pool"));
  }
}, new DeploymentOptions().setInstances(4));

第一次创建可共享的连接池时,会创建新连接池所需的资源。之后再调用该创建方法时,会复用之前的连接池,并创建 对原有连接池的借用。当所有的借用都被关闭时,该连接池的资源也会被释放。

默认情况下,客户端需要创建一个 TCP 连接时,会复用当前的 event-loop 。 这个可共享的 HTTP 客户端会 以一种安全的模式,在使用它的 verticle 中随机选中一个 verticle,并使用它的 event-loop。

您可以手动设置一个客户端可以使用的 event-loop 的数量

MySQLPool pool = MySQLPool.pool(database, new PoolOptions()
  .setMaxSize(maxSize)
  .setShared(true)
  .setName("my-pool")
  .setEventLoopSize(4));

Unix 域套接字

有时为了简单,安全或性能原因,需要通过 Unix 域套接字 进行连接。

由于 JVM 不支持域套接字,因此首先必须向项目添加本地传输(native transport)扩展。

  • Maven (在您的 pom.xml):

<dependency>
 <groupId>io.netty</groupId>
 <artifactId>netty-transport-native-epoll</artifactId>
 <version>${netty.version}</version>
 <classifier>linux-x86_64</classifier>
</dependency>
  • Gradle (在您的 build.gradle 文件中):

dependencies {
 compile 'io.netty:netty-transport-native-epoll:${netty.version}:linux-x86_64'
}
注意
ARM64 的原生 epoll 支持也可以与分类器(classifier) linux-aarch64 一起添加。
注意
如果您的团队中有 Mac 用户,在 osx-x86_64 上添加 netty-transport-native-kqueue 分类器(classifier)。

然后通过 MySQLConnectOptions#setHost 设置域套接字的路径:

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setHost("/var/run/mysqld/mysqld.sock")
  .setDatabase("the-db");

// 连接池选项
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// 创建带连接池的客户端
MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);

// 使用vertx实例创建带连接池的客户端
// 确保vertx实例已启用native transports
// vertxOptions.setPreferNativeTransport(true);
MySQLPool client2 = MySQLPool.pool(vertx, connectOptions, poolOptions);

有关 native transport 的详细信息,请参阅 Vert.x 文档

配置

有几个选项供您配置客户端。

数据对象

配置客户端的简单方法就是指定 MySQLConnectOptions 数据对象。

MySQLConnectOptions connectOptions = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池选项
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);

// 从数据对象创建连接池
MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);

pool.getConnection(ar -> {
  // 处理您的连接
});

字符序(collations)和字符集(character sets)

响应式 MySQL 客户端支持配置字符序或字符集,并将它们映射到一个相关的 java.nio.charset.Charset 。 您可以为数据库连接指定字符集,例如

MySQLConnectOptions connectOptions = new MySQLConnectOptions();

// 将连接的字符集设置为utf8而不是默认的字符集utf8mb4
connectOptions.setCharset("utf8");

响应式 MySQL 客户端的默认字符集是 utf8mb4 。字符串值,如密码和错误消息等,总是使用 UTF-8 字符集解码。

characterEncoding 选项用于设置字符串(例如查询字符串和参数值)使用的 Java 字符集,默认使用 UTF-8 字符集;如果设置为 null ,则客户端将使用 Java 的默认字符集。

您还可以为连接指定字符序,例如

MySQLConnectOptions connectOptions = new MySQLConnectOptions();

// 将连接的字符序设置为 utf8_general_ci 来代替默认字符序 utf8mb4_general_ci
// 设置字符序将覆盖charset选项
connectOptions.setCharset("gbk");
connectOptions.setCollation("utf8_general_ci");

请注意,在数据对象上设置字符序将覆盖 charsetcharacterEncoding 选项。

您可以执行 SQL SHOW COLLATION;SHOW CHARACTER SET; 获取服务器支持的字符序和字符集。

有关 MySQL 字符集和字符序的更多信息,请参阅 MySQL 参考手册

连接属性

还可以使用 setPropertiesaddProperty 方法配置连接属性。注意 setProperties 将覆盖客户端的默认属性。

MySQLConnectOptions connectOptions = new MySQLConnectOptions();

// 添加连接属性
connectOptions.addProperty("_java_version", "1.8.0_212");

// 覆盖属性
Map<String, String> attributes = new HashMap<>();
attributes.put("_client_name", "myapp");
attributes.put("_client_version", "1.0.0");
connectOptions.setProperties(attributes);

有关连接属性的更多信息,请参阅 MySQL 参考手册

配置 useAffectedRows

您可以 useAffectedRows 选项以决定是否在连接到服务器时设置标志 CLIENT_FOUND_ROWS。如果指定了 CLIENT_FOUND_ROWS 标志,则受影响的行计数(返回的)是查找到的行数,而不是受影响的行数。

更多有关信息,请参阅 MySQL 参考手册

连接 URI

除了使用 MySQLConnectOptions 数据对象进行配置外,我们还为您提供了另外一种使用连接URI进行配置的方法:

String connectionUri = "mysql://dbuser:secretpassword@database.server.com:3306/mydb";

// 从连接URI创建连接池
MySQLPool pool = MySQLPool.pool(connectionUri);

// 从连接URI创建连接
MySQLConnection.connect(vertx, connectionUri, res -> {
  // 处理您的连接
});

有关连接字符串格式的有关更多信息,请参阅 MySQL 参考手册

目前,客户端支持以下的连接 uri 参数关键字(不区分大小写):

  • host

  • port

  • user

  • password

  • schema

  • socket

  • useAffectedRows

注意
通过 URI 配置的参数将会覆盖默认的配置参数。

连接重试

您可以将客户端配置为在连接无法建立时重试。

options
  .setReconnectAttempts(2)
  .setReconnectInterval(1000);

运行查询

当您不需要事务或运行单个查询时,您可以直接在连接池上运行查询。连接池将使用其中一个连接来运行查询并将结果返回给您。

这是运行简单查询的方法:

client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

预处理查询

您可以对预处理查询执行相同的操作。

SQL字符串可以使用数据库语法 `?` 按位置引用参数

client
  .preparedQuery("SELECT * FROM users WHERE id=?")
  .execute(Tuple.of("julien"), ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

查询方法提供异步 RowSet 实例,它适用于 SELECT 查询。

client
  .preparedQuery("SELECT first_name, last_name FROM users")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

UPDATE/INSERT 查询:

client
  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)")
  .execute(Tuple.of("Julien", "Viet"), ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Row 使您可以按索引访问数据

System.out.println("User " + row.getString(0) + " " + row.getString(1));

或按名字

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

客户端在这里不会使用任何魔术,并且无论您的SQL文本如何,列名都将用表中的名称标识。

您可以访问多样的类型

String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

您可以使用缓存的预处理语句执行一次性的预处理查询:

connectOptions.setCachePreparedStatements(true);
client
  .preparedQuery("SELECT * FROM users WHERE id = ?")
  .execute(Tuple.of("julien"), ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Got " + rows.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

您可以创建一个 PreparedStatement 并自己管理生命周期。

sqlConnection
  .prepare("SELECT * FROM users WHERE id = ?", ar -> {
    if (ar.succeeded()) {
      PreparedStatement preparedStatement = ar.result();
      preparedStatement.query()
        .execute(Tuple.of("julien"), ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            System.out.println("Got " + rows.size() + " rows ");
            preparedStatement.close();
          } else {
            System.out.println("Failure: " + ar2.cause().getMessage());
          }
        });
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

批处理

您可以执行预处理的批处理

List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));

// 执行预处理的批处理
client
  .preparedQuery("INSERT INTO USERS (id, name) VALUES (?, ?)")
  .executeBatch(batch, res -> {
  if (res.succeeded()) {

    // 处理行
    RowSet<Row> rows = res.result();
  } else {
    System.out.println("Batch failed " + res.cause());
  }
});

MySQL LAST_INSERT_ID

往表中插入一条记录后,可以获得自增值。

client
  .query("INSERT INTO test(val) VALUES ('v1')")
  .execute(ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      long lastInsertId = rows.property(MySQLClient.LAST_INSERTED_ID);
      System.out.println("Last inserted id is: " + lastInsertId);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

更多有关信息,请参阅 如何获取最近一条插入记录的唯一ID

使用连接

获取连接

当需要执行顺序查询(无事务)时,可以创建一个新连接或从连接池中借用一个。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。

pool
  .getConnection()
  .compose(connection ->
    connection
      .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
      .executeBatch(Arrays.asList(
        Tuple.of("Julien", "Viet"),
        Tuple.of("Emad", "Alblueshi")
      ))
      .compose(res -> connection
        // 对行执行一些操作
        .query("SELECT COUNT(*) FROM Users")
        .execute()
        .map(rows -> rows.iterator().next().getInteger(0)))
      // 将连接返回到连接池中
      .eventually(v -> connection.close())
  ).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

可以创建预处理查询语句:

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE ?")
  .compose(pq ->
    pq.query()
      .execute(Tuple.of("Julien"))
      .eventually(v -> pq.close())
  ).onSuccess(rows -> {
  // 所有的行
});

简单连接 API

当您创建了一个连接池, 您可以调用 withConnection 并传入一个使用连接进行处理的函数。

它从连接池中借用一个连接,并使用该连接调用函数。

该函数必须返回一个任意结果的 Future。

Future 完成后, 连接将归还至连接池,并提供全部的结果。

pool.withConnection(connection ->
  connection
    .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
    .executeBatch(Arrays.asList(
      Tuple.of("Julien", "Viet"),
      Tuple.of("Emad", "Alblueshi")
    ))
    .compose(res -> connection
      // Do something with rows
      .query("SELECT COUNT(*) FROM Users")
      .execute()
      .map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

使用事务

带事务的连接

您可以使用SQL BEGIN/COMMIT/ROLLBACK 执行事务,如果您必须这么做,就必须使用 SqlConnection 自己管理事务。

或者您使用 SqlConnection 的事务API:

pool.getConnection()
  // 事务必须使用一个连接
  .onSuccess(conn -> {
    // 开始事务
    conn.begin()
      .compose(tx -> conn
        // 各种语句
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
        .execute()
        .compose(res2 -> conn
          .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
          .execute())
        // 提交事务
        .compose(res3 -> tx.commit()))
      // 将连接返回到连接池
      .eventually(v -> conn.close())
      .onSuccess(v -> System.out.println("Transaction succeeded"))
      .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
  });

当数据库服务器报告当前事务失败时(例如,臭名昭著的 current transaction is aborted, commands ignored until end of transaction block), 事务被回滚,此时 completion Future 会失败, 并返回 TransactionRollbackException 异常:

tx.completion()
  .onFailure(err -> {
    System.out.println("Transaction failed => rolled back");
  });

简单事务 API

当您创建了一个连接池, 您可以调用 withConnection 并传入一个使用连接进行处理的函数。

它从连接池中借用一个连接,开始事务,并且,在此事务范围内所有执行操作的客户端调用该函数。

该函数必须返回一个任意结果的Future。

  • 当Future成功,客户端提交这个事务

  • 当Future失败,客户端回滚这个事务

事务完成后, 连接将返回到连接池中,并提供全部的结果。

pool.withTransaction(client -> client
  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  .execute()
  .flatMap(res -> client
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
    .execute()
    // 映射一个消息结果
    .map("Users inserted")))
  .onSuccess(v -> System.out.println("Transaction succeeded"))
  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

游标和流

小心

当您在写数据时,如果您是通过 ProxySQL 连接的数据库,则无法使用以下功能。 如果尝试使用,将看到以下错误消息:

RECEIVED AN UNKNOWN COMMAND: 28 -- PLEASE REPORT A BUG

这是因为代理不处理从游标中获取行所需的 COM_STMT_FETCH 命令类型。

默认情况下,执行预处理查询将获取所有行,您可以使用 Cursor 控制想读取的行数:

connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
  if (ar1.succeeded()) {
    PreparedStatement pq = ar1.result();

    // 创建游标
    Cursor cursor = pq.cursor(Tuple.of(18));

    // 读取50行
    cursor.read(50, ar2 -> {
      if (ar2.succeeded()) {
        RowSet<Row> rows = ar2.result();

        // 检查更多 ?
        if (cursor.hasMore()) {
          // 重复这个过程...
        } else {
          // 没有更多行-关闭游标
          cursor.close();
        }
      }
    });
  }
});

游标提前释放时应将其关闭:

cursor.read(50, ar2 -> {
  if (ar2.succeeded()) {
    // 关闭游标
    cursor.close();
  }
});

游标还可以使用流式API,这可以更加方便,尤其是在Rx化的版本中。

connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
  if (ar1.succeeded()) {
    PreparedStatement pq = ar1.result();

    // 一次获取50行
    RowStream<Row> stream = pq.createStream(50, Tuple.of(18));

    // 使用流
    stream.exceptionHandler(err -> {
      System.out.println("Error: " + err.getMessage());
    });
    stream.endHandler(v -> {
      System.out.println("End of stream");
    });
    stream.handler(row -> {
      System.out.println("User: " + row.getString("last_name"));
    });
  }
});

当这些行已传递给处理程序时,该流将批量读取 50 行并将其流化。 然后读取新一批的 50 行数据,依此类推。

流可以恢复或暂停,已加载的行将保留在内存中,直到被送达,游标将停止迭代。

追踪查询

当Vert.x启用了跟踪时,SQL客户端可以跟踪查询执行。

客户端报告以下 client 跨度(spans):

  • Query operation name

  • tags

  • db.user: the database username

  • db.instance: the database instance

  • db.statement: the SQL query

  • db.type: sql

默认的跟踪策略是 PROPAGATE ,客户端仅当涉及活动跟踪时才创建跨度(span)。

您可以使用 setTracingPolicy 更改客户策略,例如 您可以设置 ALWAYS 总是报告跨度(span):

options.setTracingPolicy(TracingPolicy.ALWAYS);

MySQL 类型映射

当前客户端支持以下 MySQL 类型

  • BOOL,BOOLEAN (java.lang.Byte)

  • TINYINT (java.lang.Byte)

  • TINYINT UNSIGNED(java.lang.Short)

  • SMALLINT (java.lang.Short)

  • SMALLINT UNSIGNED(java.lang.Integer)

  • MEDIUMINT (java.lang.Integer)

  • MEDIUMINT UNSIGNED(java.lang.Integer)

  • INT,INTEGER (java.lang.Integer)

  • INTEGER UNSIGNED(java.lang.Long)

  • BIGINT (java.lang.Long)

  • BIGINT UNSIGNED(io.vertx.sqlclient.data.Numeric)

  • FLOAT (java.lang.Float)

  • FLOAT UNSIGNED(java.lang.Float)

  • DOUBLE (java.lang.Double)

  • DOUBLE UNSIGNED(java.lang.Double)

  • BIT (java.lang.Long)

  • NUMERIC (io.vertx.sqlclient.data.Numeric)

  • NUMERIC UNSIGNED(io.vertx.sqlclient.data.Numeric)

  • DATE (java.time.LocalDate)

  • DATETIME (java.time.LocalDateTime)

  • TIME (java.time.Duration)

  • TIMESTAMP (java.time.LocalDateTime)

  • YEAR (java.lang.Short)

  • CHAR (java.lang.String)

  • VARCHAR (java.lang.String)

  • BINARY (io.vertx.core.buffer.Buffer)

  • VARBINARY (io.vertx.core.buffer.Buffer)

  • TINYBLOB (io.vertx.core.buffer.Buffer)

  • TINYTEXT (java.lang.String)

  • BLOB (io.vertx.core.buffer.Buffer)

  • TEXT (java.lang.String)

  • MEDIUMBLOB (io.vertx.core.buffer.Buffer)

  • MEDIUMTEXT (java.lang.String)

  • LONGBLOB (io.vertx.core.buffer.Buffer)

  • LONGTEXT (java.lang.String)

  • ENUM (java.lang.String)

  • SET (java.lang.String)

  • JSON (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)

  • GEOMETRY(io.vertx.mysqlclient.data.spatial.*)

元组解码在存储值时使用上述类型

请注意:在Java中,没有无符号数字值的具体表示形式,因此客户端会将无符号值转换为相关的Java类型。

隐式类型转换

当执行预处理语句时,响应式 MySQL 客户端支持隐式类型转换。 假设您的表中有一个 TIME 列,下面的两个示例都是有效的。

client
  .preparedQuery("SELECT * FROM students WHERE updated_time = ?")
  .execute(Tuple.of(LocalTime.of(19, 10, 25)), ar -> {
  // 处理结果
});
// 这个也适用于隐式类型转换
client
  .preparedQuery("SELECT * FROM students WHERE updated_time = ?")
  .execute(Tuple.of("19:10:25"), ar -> {
  // 处理结果
});

MySQL 数据类型编码是根据参数值推断的。下面是具体的类型映射:

参数值 MySQL 类型编码

null

MYSQL_TYPE_NULL

java.lang.Byte

MYSQL_TYPE_TINY

java.lang.Boolean

MYSQL_TYPE_TINY

java.lang.Short

MYSQL_TYPE_SHORT

java.lang.Integer

MYSQL_TYPE_LONG

java.lang.Long

MYSQL_TYPE_LONGLONG

java.lang.Double

MYSQL_TYPE_DOUBLE

java.lang.Float

MYSQL_TYPE_FLOAT

java.time.LocalDate

MYSQL_TYPE_DATE

java.time.Duration

MYSQL_TYPE_TIME

java.time.LocalTime

MYSQL_TYPE_TIME

io.vertx.core.buffer.Buffer

MYSQL_TYPE_BLOB

java.time.LocalDateTime

MYSQL_TYPE_DATETIME

io.vertx.mysqlclient.data.spatial.*

MYSQL_TYPE_BLOB

default

MYSQL_TYPE_STRING

处理布尔值

在 MySQL 中 BOOLEANBOOL 数据类型是 TINYINT(1) 的同义词。零值视为 false,非零值视为 true。 BOOLEAN 数据类型值以 java.lang.Byte 类型存储在 RowTuple 中,调用 Row#getValue 可以获取到 java.lang.Byte 类型的值, 也可以调用 Row#getBoolean 获取 java.lang.Boolean 类型的值。

client
  .query("SELECT graduated FROM students WHERE id = 0")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rowSet = ar.result();
    for (Row row : rowSet) {
      int pos = row.getColumnIndex("graduated");
      Byte value = row.get(Byte.class, pos);
      Boolean graduated = row.getBoolean("graduated");
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

当您想要使用一个 BOOLEAN 参数值执行预处理语句时,只要简单地在参数列表中添加 java.lang.Boolean 值即可。

client
  .preparedQuery("UPDATE students SET graduated = ? WHERE id = 0")
  .execute(Tuple.of(true), ar -> {
  if (ar.succeeded()) {
    System.out.println("Updated with the boolean value");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

处理 JSON

MySQL JSON 数据类型由以下Java类型表示:

  • String

  • Number

  • Boolean

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • io.vertx.sqlclient.Tuple#JSON_NULL 表示 JSON null 字面量

Tuple tuple = Tuple.of(
  Tuple.JSON_NULL,
  new JsonObject().put("foo", "bar"),
  3);

// 获取 json
Object value = tuple.getValue(0); // 期望得到 JSON_NULL

//
value = tuple.get(JsonObject.class, 1); // 期望得到 JSON object

//
value = tuple.get(Integer.class, 2); // 期望得到 3
value = tuple.getInteger(2); // 期望得到 3

处理 BIT

BITjava.lang.Long 类型的映射, 但是Java没有无符号数值的概念, 因此,如果您要插入或更新一条记录为 BIT(64) 的最大值 , 可以将参数设置为 -1L

处理 TIME

MySQL TIME 数据类型可用于表示一天中的时间或范围为 -838:59:59838:59:59 的时间间隔。在响应式MySQL客户端, TIME 数据类型自然的被映射为 java.time.Duration,您也可以调用 Row#getLocalTime 获取到 java.time.LocalTime 类型的值。

处理 NUMERIC

Numeric Java类型用于表示MySQL的 NUMERIC 类型。

Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
  // 处理 NaN
} else {
  BigDecimal value = numeric.bigDecimalValue();
}

处理 ENUM

MySQL支持ENUM数据类型,客户端将这些类型检索为String数据类型。

您可以像这样将Java枚举编码为String:

client
  .preparedQuery("INSERT INTO colors VALUES (?)")
  .execute(Tuple.of(Color.red),  res -> {
    // ...
  });

您可以将 ENUM 类型的列读取为 Java 的枚举,如以下代码:

client
  .preparedQuery("SELECT color FROM colors")
  .execute()
  .onComplete(res -> {
  if (res.succeeded()) {
    RowSet<Row> rows = res.result();
    for (Row row : rows) {
      System.out.println(row.get(Color.class, "color"));
    }
  }
});

处理 GEOMETRY

MYSQL 还支持 GEOMETRY 数据类型,下面是一些示例展示您可以使用 Well-Known Text(WKT)格式或 Well-Known Binary(WKB)格式处理几何数据,数据被解码为MySQL TEXT 或 BLOB 数据类型。有很多很棒的第三方库可以处理这种格式的数据。

您可以以 WKT 格式获取空间数据:

client
  .query("SELECT ST_AsText(g) FROM geom;")
  .execute(ar -> {
  if (ar.succeeded()) {
    // 以WKT格式获取空间数据
    RowSet<Row> result = ar.result();
    for (Row row : result) {
      String wktString = row.getString(0);
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

或者,您可以以WKB格式获取空间数据:

client
  .query("SELECT ST_AsBinary(g) FROM geom;")
  .execute(ar -> {
  if (ar.succeeded()) {
    // 以WKB格式获取空间数据
    RowSet<Row> result = ar.result();
    for (Row row : result) {
      Buffer wkbValue = row.getBuffer(0);
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

在响应式 MySQL 客户端中,我们还提供了一种处理几何数据类型的简单方法。

您可以将几何数据作为Vert.x数据对象检索:

client
  .query("SELECT g FROM geom;")
  .execute(ar -> {
  if (ar.succeeded()) {
    // 以Vert.x数据对象获取空间数据
    RowSet<Row> result = ar.result();
    for (Row row : result) {
      Point point = row.get(Point.class, 0);
      System.out.println("Point x: " + point.getX());
      System.out.println("Point y: " + point.getY());
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

您也可以将WKB描述用作预处理语句参数。

Point point = new Point(0, 1.5, 1.5);
// 以 WKB 描述发送
client
  .preparedQuery("INSERT INTO geom VALUES (ST_GeomFromWKB(?))")
  .execute(Tuple.of(point), ar -> {
  if (ar.succeeded()) {
    System.out.println("Success");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

集合类查询

您可以将查询API与Java集合类结合使用:

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));

// 运行查询使用集合类
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
    if (ar.succeeded()) {
      SqlResult<Map<Long, String>> result = ar.result();

      // 获取用集合类创建的map
      Map<Long, String> map = result.value();
      System.out.println("Got " + map);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

集合类处理不能保留 Row 的引用,因为只有一个 Row 对象用于处理整个集合。

Java Collectors 提供了许多有趣的预定义集合类,例如, 您可以直接用 Row 中的集合轻松拼接成一个字符串:

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
);

// 运行查询使用集合类
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();

      // 获取用集合类创建的String
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

MySQL 存储过程

您可以在查询中运行存储过程。结果将按照 MySQL 协议 从服务器获取,无需任何魔法。

client.query("CREATE PROCEDURE multi() BEGIN\n" +
  "  SELECT 1;\n" +
  "  SELECT 1;\n" +
  "  INSERT INTO ins VALUES (1);\n" +
  "  INSERT INTO ins VALUES (2);\n" +
  "END;").execute(ar1 -> {
  if (ar1.succeeded()) {
    // 创建存储过程成功
    client
      .query("CALL multi();")
      .execute(ar2 -> {
      if (ar2.succeeded()) {
        // 处理结果
        RowSet<Row> result1 = ar2.result();
        Row row1 = result1.iterator().next();
        System.out.println("First result: " + row1.getInteger(0));

        RowSet<Row> result2 = result1.next();
        Row row2 = result2.iterator().next();
        System.out.println("Second result: " + row2.getInteger(0));

        RowSet<Row> result3 = result2.next();
        System.out.println("Affected rows: " + result3.rowCount());
      } else {
        System.out.println("Failure: " + ar2.cause().getMessage());
      }
    });
  } else {
    System.out.println("Failure: " + ar1.cause().getMessage());
  }
});

Note: 目前尚不支持绑定OUT参数的预处理语句。

MySQL 导入本地文件

该客户端支持处理 LOCAL INFILE 请求, 如果要将数据从本地文件加载到服务器,则可以使用语句 LOAD DATA LOCAL INFILE '<filename>' INTO TABLE <table>;。 更多有关信息,请参阅 MySQL 参考手册.

认证

默认身份验证插件

该客户端支持指定在连接开始时使用缺省的身份验证插件。 当前支持以下插件:

  • mysql_native_password

  • caching_sha2_password

  • mysql_clear_password

MySQLConnectOptions options = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setAuthenticationPlugin(MySQLAuthenticationPlugin.MYSQL_NATIVE_PASSWORD); // 设定默认身份验证插件

MySQL 8 中引入的新身份验证方法

MySQL 8.0 引入了一种名为 caching_sha2_password 的身份验证方法,它是默认的身份验证方法。 为了使用此新的身份验证方法连接到服务器,您需要使用安全连接(例如 启用 TLS/SSL)或使用 RSA 密钥对交换加密密码,以避免密码泄漏。RSA 密钥对在通信过程中会自动交换,但服务器 RSA 公钥可能会在这个过程中被黑客攻击,因为它通过不安全的连接传输。 因此,如果您使用不安全的连接,并且希望避免暴露服务器 RSA 公钥的风险,可以这样设置服务器 RSA 公钥:

MySQLConnectOptions options1 = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setServerRsaPublicKeyPath("tls/files/public_key.pem"); // 配置公钥路径

MySQLConnectOptions options2 = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setServerRsaPublicKeyValue(Buffer.buffer("-----BEGIN PUBLIC KEY-----\n" +
    "MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA3yvG5s0qrV7jxVlp0sMj\n" +
    "xP0a6BuLKCMjb0o88hDsJ3xz7PpHNKazuEAfPxiRFVAV3edqfSiXoQw+lJf4haEG\n" +
    "HQe12Nfhs+UhcAeTKXRlZP/JNmI+BGoBduQ1rCId9bKYbXn4pvyS/a1ft7SwFkhx\n" +
    "aogCur7iIB0WUWvwkQ0fEj/Mlhw93lLVyx7hcGFq4FOAKFYr3A0xrHP1IdgnD8QZ\n" +
    "0fUbgGLWWLOossKrbUP5HWko1ghLPIbfmU6o890oj1ZWQewj1Rs9Er92/UDj/JXx\n" +
    "7ha1P+ZOgPBlV037KDQMS6cUh9vTablEHsMLhDZanymXzzjBkL+wH/b9cdL16LkQ\n" +
    "5QIDAQAB\n" +
    "-----END PUBLIC KEY-----\n")); // 配置公钥缓冲

有关 caching_sha2_password 身份验证方法的更多信息,请参见 MySQL 参考手册.

使用 SSL/TLS

配置客户端使用SSL连接, 您可以像 Vert.x NetClient 一样配置 MySQLConnectOptions 。 响应式 MySQL 客户端 支持所有 SSL 模式 ,而且您能够配置 sslmode. 客户端默认设置为 DISABLED SSL 模式。 ssl 参数仅作为设置 sslmode 的快捷方式存在。setSsl(true) 等价于 setSslMode(VERIFY_CA)setSsl(false) 等价于 setSslMode(DISABLED)

MySQLConnectOptions options = new MySQLConnectOptions()
  .setPort(3306)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setSslMode(SslMode.VERIFY_CA)
  .setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem"));

MySQLConnection.connect(vertx, options, res -> {
  if (res.succeeded()) {
    // 用SSL连接
  } else {
    System.out.println("Could not connect " + res.cause());
  }
});

更多详细信息,请参阅 Vert.x 文档.

MySQL 实用程序命令

有时您希望使用 MySQL 实用程序命令,我们为此提供支持。 有关更多信息,请参阅 MySQL 实用程序命令.

COM_PING

您可以使用 COM_PING 命令检查服务器是否处于活动状态。如果服务器响应 PING,将通知处理程序,否则将永远不会调用处理程序。

connection.ping(ar -> {
  System.out.println("The server has responded to the PING");
});

COM_RESET_CONNECTION

您可以使用 COM_RESET_CONNECTION 命令重置会话状态,这将重置连接状态,如: - user variables - temporary tables - prepared statements

connection.resetConnection(ar -> {
  if (ar.succeeded()) {
    System.out.println("Connection has been reset now");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

COM_CHANGE_USER

您可以更改当前连接的用户,这将执行re-authentication并重置连接状态,如 COM_RESET_CONNECTION

MySQLAuthOptions authenticationOptions = new MySQLAuthOptions()
  .setUser("newuser")
  .setPassword("newpassword")
  .setDatabase("newdatabase");
connection.changeUser(authenticationOptions, ar -> {
  if (ar.succeeded()) {
    System.out.println("User of current connection has been changed.");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

COM_INIT_DB

您可以使用 COM_INIT_DB 命令更改连接的默认schema。

connection.specifySchema("newschema", ar -> {
  if (ar.succeeded()) {
    System.out.println("Default schema changed to newschema");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

COM_STATISTICS

您可以使用 COM_STATISTICS 命令获取某些内部状态变量的可读字符串。

connection.getInternalStatistics(ar -> {
  if (ar.succeeded()) {
    System.out.println("Statistics: " + ar.result());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

COM_DEBUG

您可以使用 COM_DEBUG 命令将调试信息转储到MySQL服务器的STDOUT。

connection.debug(ar -> {
  if (ar.succeeded()) {
    System.out.println("Debug info dumped to server's STDOUT");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

COM_SET_OPTION

您可以使用 COM_SET_OPTION 命令设置当前连接的选项。目前只能设置 CLIENT_MULTI_STATEMENTS

例如,您可以使用此命令禁用 CLIENT_MULTI_STATEMENTS

connection.setOption(MySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF, ar -> {
  if (ar.succeeded()) {
    System.out.println("CLIENT_MULTI_STATEMENTS is off now");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

MySQL 和 MariaDB 版本支持情况

MySQL MariaDB

版本

是否支持

版本

是否支持

5.5

10.1

5.6

10.2

5.7

10.3

8.0

10.4

已知问题:

  • 重置连接实用程序命令在 MySQL 5.5、5.6 和 MariaDB 10.1 中不可用

  • MariaDB 10.2 和 10.3 不支持更改用户实用程序命令

陷阱和最佳实践

以下是使用响应式 MySQL 客户端时避免常见陷阱的一些最佳实践。

预处理语句的计数限制

有时您可能会遇到臭名昭著的错误 Can’t create more than max_prepared_stmt_count statements (current value: 16382) ,这是因为服务器已达到预处理语句的总数限制。

您可以调整服务器系统变量 max_prepared_stmt_count ,但它具有上限值,因此您无法以这种方式摆脱错误。

缓解这种情况的最佳方法是启用预处理语句缓存,因此可以重用具有相同 SQL 字符串的预处理语句,并且客户端不必为每个请求创建全新的预处理语句。 执行语句后,预处理语句将自动关闭。 这样,虽然无法完全消除错误,但达到极限的机会可以大大减少。

还可以通过接口 SqlConnection#prepare 创建 PreparedStatement 对象来手动管理预处理语句的生命周期,以便可以选择何时释放语句句柄,甚至使用 SQL syntax prepared statement.

揭开批量预处理神秘面纱

当您要将数据批量插入到数据库中时,您可以使用 PreparedQuery#executeBatch ,它提供了一个简单的 API 来处理此问题。 请记住,MySQL 原生不支持批处理协议,因此 API 只是语法糖,一个接一个执行预处理语句,这意味着比执行一个预处理语句插入多条带有值列表的记录,需要更多的网络往返耗时。

棘手的日期和时间数据类型

处理 MYSQL 日期和时间数据类型(尤其是时区)是棘手的,因此响应式 MySQL 客户端不会为这些值进行魔法转换。

  • MySQL DATETIME 数据类型不包含时区信息,因此无论当前会话中的时区是什么,您获取的内容都与您设置的内容相同

  • MySQL TIMESTAMP 数据类型包含时区信息,因此当您设置或获取值时,服务器总是通过当前会话中设置的时区来转换该值。

高级连接池配置

数据库服务负载均衡

您可以使用包含多个数据库服务的列表来配置连接池而不是单个数据库服务。

MySQLPool pool = MySQLPool.pool(Arrays.asList(server1, server2, server3), options);

当一个连接创建时,连接池使用(round-robin)轮询调度算法做负载均衡以选择不同的数据库服务

注意
负载均衡是在创建连接时提供的,而不是在从连接池中获取连接时提供

连接初始化

您可以使用 connectHandler 方法在连接创建后和连接释放回连接池之前来与数据库连接交互

pool.connectHandler(conn -> {
  conn.query(sql).execute().onSuccess(res -> {
    //  将连接释放回连接池,以被该应用程序复用
    conn.close();
  });
});

连接完成后,您应该释放该连接以通知连接池该数据库连接可以被使用