响应式 MSSQL 客户端

Reactive MSSQL 客户端是微软 SQL Server 的客户端, 具有简单直接专注于可扩展性和低开销的 API。

特性

  • 事件驱动

  • 轻量级

  • 内置连接池

  • 支持内存直接映射到对象,避免了不必要的复制

  • Java 8 Date 和 Time 支持

  • RxJava API

  • SSL/TLS

暂不支持

  • Prepared 查询缓存

  • 游标

  • 行流(Row streaming)

  • 不支持一些 数据类型

用法

要使用反应式 MSSQL 客户端,请将以下依赖项添加到构建描述文件中的 dependencies 部分:

  • Maven (in your pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-mssql-client</artifactId>
 <version>4.1.8</version>
</dependency>
  • Gradle (in your build.gradle file):

dependencies {
 compile 'io.vertx:vertx-mssql-client:4.1.8'
}

由此开始

这是最简单的连接、查询和断开连接方法:

MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
  .setPort(1433)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池参数
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// 创建客户端池
MSSQLPool client = MSSQLPool.pool(connectOptions, poolOptions);

// 一个简单的查询
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }

  // 现在关闭池
  client.close();
});

连接 SQL Server

大多数情况下,您将使用池连接到 MSSQL:

MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
  .setPort(1433)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 池参数
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// 创建池化客户端
MSSQLPool client = MSSQLPool.pool(connectOptions, poolOptions);

池化客户端使用连接池,任何操作都会从池中借用连接, 随后执行操作,并最终执行完之后将其释放到池中。

如果您使用 Vert.x 运行,您可以将它传递给您的 Vertx 实例:

MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
  .setPort(1433)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 池选项
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建池化客户端
MSSQLPool client = MSSQLPool.pool(vertx, connectOptions, poolOptions);

当您不再需要它时,您需要释放池:

pool.close();

当您需要在同一个连接上执行多个操作时,需要使用客户端 连接

您可以轻松地从池中获取一个:

MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
  .setPort(1433)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 池选项
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// 创建池化客户端
MSSQLPool client = MSSQLPool.pool(vertx, connectOptions, poolOptions);

// 从池中获取连接
client.getConnection().compose(conn -> {
  System.out.println("Got a connection from the pool");

  // 所有操作都在同一个连接上执行
  return conn
    .query("SELECT * FROM users WHERE id='julien'")
    .execute()
    .compose(res -> conn
      .query("SELECT * FROM users WHERE id='emad'")
      .execute())
    .onComplete(ar -> {
      // 释放连接并将其归还给池
      conn.close();
    });
}).onComplete(ar -> {
  if (ar.succeeded()) {

    System.out.println("Done");
  } else {
    System.out.println("Something went wrong " + ar.cause().getMessage());
  }
});

完成连接后,您必须关闭它以将其释放到池中,以便可以重复使用。

配置

数据对象

一种配置客户端的简单方法是指定一个 MSSQLConnectOptions 数据对象。

MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
  .setPort(1433)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 池参数
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);

// 根据数据对象创建池
MSSQLPool pool = MSSQLPool.pool(vertx, connectOptions, poolOptions);

pool.getConnection(ar -> {
  // 处理您的连接
});

连接 URI

一种替代用 MSSQLConnectOptions 数据对象配置客户端的方案,便是您可使用连接 URI:

String connectionUri = "sqlserver://dbuser:secretpassword@database.server.com:3211/mydb";

// 从连接 URI 创建池
MSSQLPool pool = MSSQLPool.pool(connectionUri);

// 从连接 URI 创建连接
MSSQLConnection.connect(vertx, connectionUri, res -> {
  // 处理您的连接
});

连接 URI 格式由客户端以惯用方式定义: sqlserver://[user[:[password]]@]host[:port][/database][?attribute1=value1&attribute2=value2…​]

当前,客户端在连接 uri 中支持以下参数关键字(key不区分大小写):

  • host

  • port

  • user

  • password

  • database

连接重试

您可以配置客户端连接失败后重试。

options
  .setReconnectAttempts(2)
  .setReconnectInterval(1000);

执行查询

当您不需要事务或者只是执行一个单次查询操作,您可以直接在连接池里执行查询; 连接池会使用某一条连接执行并给您返回结果。

下边是如何执行一个简单的查询的例子:

client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

预查询

执行预查询也是一样的操作。

SQL字符通过位置引用实际的参数,并使用数据库的语法 `@p1`, `@p2`, etc…​

client
  .preparedQuery("SELECT * FROM users WHERE id=@p1")
  .execute(Tuple.of("julien"), ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

查询相关的方法为 SELECT 类型的操作提供了异步的 RowSet 实例

client
  .preparedQuery("SELECT first_name, last_name FROM users")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

或者 UPDATE/INSERT 类型的查询:

client
  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES (@p1, @p2)")
  .execute(Tuple.of("Julien", "Viet"), ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Row对象(Row)可以让您通过索引位置获取相应的数据

System.out.println("User " + row.getString(0) + " " + row.getString(1));

或者通过名称

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

客户端在此处没有做特殊处理,无论您的SQL文本时什么,列名都将使用数据库表中的名称标识。

您也可以直接访问得到多种类型

String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

您可以使用缓存过的预处理语句去执行一次性的预查询:

connectOptions.setCachePreparedStatements(true);
client
  .preparedQuery("SELECT * FROM users WHERE id = @p1")
  .execute(Tuple.of("julien"), ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Got " + rows.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

您也可以创建 PreparedStatement 并自主地管理它的生命周期。

sqlConnection
  .prepare("SELECT * FROM users WHERE id = @p1", ar -> {
    if (ar.succeeded()) {
      PreparedStatement preparedStatement = ar.result();
      preparedStatement.query()
        .execute(Tuple.of("julien"), ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            System.out.println("Got " + rows.size() + " rows ");
            preparedStatement.close();
          } else {
            System.out.println("Failure: " + ar2.cause().getMessage());
          }
        });
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

批处理

您可以在预查询中执行批处理操作

List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));

// Execute the prepared batch
client
  .preparedQuery("INSERT INTO USERS (id, name) VALUES (@p1, @p2)")
  .executeBatch(batch, res -> {
  if (res.succeeded()) {

    // Process rows
    RowSet<Row> rows = res.result();
  } else {
    System.out.println("Batch failed " + res.cause());
  }
});

使用自增列

您可以在插入新数据后使用 OUTPUT 子句来获取自增列的值

client
  .preparedQuery("INSERT INTO movies (title) OUTPUT INSERTED.id VALUES (@p1)")
  .execute(Tuple.of("The Man Who Knew Too Much"), res -> {
    if (res.succeeded()) {
      Row row = res.result().iterator().next();
      System.out.println(row.getLong("id"));
    }
  });

使用连接

获取一条连接

当您要执行查询(无事务)操作时,您可以创建一条或者从连接池里拿到一条连接。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。

pool
  .getConnection()
  .compose(connection ->
    connection
      .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (@p1, @p2)")
      .executeBatch(Arrays.asList(
        Tuple.of("Julien", "Viet"),
        Tuple.of("Emad", "Alblueshi")
      ))
      .compose(res -> connection
        // Do something with rows
        .query("SELECT COUNT(*) FROM Users")
        .execute()
        .map(rows -> rows.iterator().next().getInteger(0)))
      // Return the connection to the pool
      .eventually(v -> connection.close())
  ).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

也可以通过连接对象创建预查询:

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE @p1")
  .compose(pq ->
    pq.query()
      .execute(Tuple.of("Julien"))
      .eventually(v -> pq.close())
  ).onSuccess(rows -> {
  // All rows
});

简化的连接API

当您使用连接池时,您可以调用 withConnection 并以当前连接要执行的操作作为参数。

这样会从连接池里拿到一条连接,并使用当前连接执行目标操作。

这种方式需要返回一个future对象来表示操作结果。

当这个future操作完成后,当前连接会被释放会连接池同时您也可能拿到最终的执行结果。

pool.withConnection(connection ->
  connection
    .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (@p1, @p2)")
    .executeBatch(Arrays.asList(
      Tuple.of("Julien", "Viet"),
      Tuple.of("Emad", "Alblueshi")
    ))
    .compose(res -> connection
      // Do something with rows
      .query("SELECT COUNT(*) FROM Users")
      .execute()
      .map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

使用事务

连接中使用事务操作

您可以使用SQL语法 BEGIN/COMMIT/ROLLBACK 来执行事务操作,同时您必须使用 SqlConnection 并自己管理当前连接。

或者您也可以使用 SqlConnection 的事务API:

pool.getConnection()
  // Transaction must use a connection
  .onSuccess(conn -> {
    // Begin the transaction
    conn.begin()
      .compose(tx -> conn
        // Various statements
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
        .execute()
        .compose(res2 -> conn
          .query("INSERT INTO Users (first_name,last_name) VALUES ('Andy','Guibert')")
          .execute())
        // Commit the transaction
        .compose(res3 -> tx.commit()))
      // Return the connection to the pool
      .eventually(v -> conn.close())
      .onSuccess(v -> System.out.println("Transaction succeeded"))
      .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
  });

当数据库服务端返回当前事务已失败(比如常见的 current transaction is aborted, commands ignored until end of transaction block) ,事务已回滚和 completion 方法的返回值future返回了 TransactionRollbackException 异常时:

tx.completion()
  .onFailure(err -> {
    System.out.println("Transaction failed => rolled back");
  });

简化版事务API

当您使用连接池时,您可以调用 withTransaction 方法 并传递待执行的事务操作作为参数。

这将会从连接池里拿到一条连接,开启事务并调用待执行操作,配合客户端一起执行该事务范围内 的所有操作。

待执行操作需要返回一个future来表示可能产生的结果:

  • 当future成功时,客户端提交该事务

  • 当future失败时,客户端回滚该事务

事务操作完成后,连接会被释放回连接池,并且可以获取到最终的操作结果。

pool.withTransaction(client -> client
  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  .execute()
  .flatMap(res -> client
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
    .execute()
    // Map to a message result
    .map("Users inserted")))
  .onSuccess(v -> System.out.println("Transaction succeeded"))
  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

查询追踪

当Vert.x启用tracing功能时,SQL客户端可以跟踪查询的执行情况。

客户端会上报下列这些 client spans:

  • Query 操作名称

  • tags

  • db.user :数据库用户名

  • db.instance :数据库实例

  • db.statement :SQL语句

  • db.typesql

默认的 tracing 策略时 PROPAGATE,客户端 在一个活跃trace里只创建一个span。

您可以通过 setTracingPolicy 方法来调整tracing策略, 例如您可以设置为 ALWAYS, 客户端将始终上报span:

options.setTracingPolicy(TracingPolicy.ALWAYS);

支持的数据类型

当前,客户端支持以下 SQL Server 类型:

  • TINYINT(java.lang.Short)

  • SMALLINT(java.lang.Short)

  • INT(java.lang.Integer)

  • BIGINT(java.lang.Long)

  • BIT(java.lang.Boolean)

  • REAL(java.lang.Float)

  • DOUBLE(java.lang.Double)

  • NUMERIC/DECIMAL(BigDecimal)

  • CHAR/VARCHAR(java.lang.String)

  • NCHAR/NVARCHAR(java.lang.String)

  • DATE(java.time.LocalDate)

  • TIME(java.time.LocalTime)

  • DATETIME2(java.time.LocalDateTime)

  • DATETIMEOFFSET(java.time.OffsetDateTime)

  • BINARY/VARBINARY(io.vertx.core.buffer.Buffer)

元组(Tuple)解码在存储值时使用上述类型。

使用 Java 的 枚举 类型

SQL Server 没有 ENUM 数据类型,但客户端可以将检索到的字符串/数字数据类型映射到枚举。

您可以像这样对 Java 枚举进行编码:

client
  .preparedQuery("INSERT INTO colors VALUES (@p1)")
  .execute(Tuple.of(Color.red),  res -> {
    // ...
  });

您可以像这样解码 Java 枚举:

client
  .preparedQuery("SELECT color FROM colors")
  .execute()
  .onComplete(res -> {
    if (res.succeeded()) {
      RowSet<Row> rows = res.result();
      for (Row row : rows) {
        System.out.println(row.get(Color.class, "color"));
      }
    }
  });

处理空 NULL

如果您使用 addXXX 方法之一修改 Tuple,则 null 值被透明处理。

客户端可以在执行准备好的查询时推断正确的 SQL 类型:

Tuple tuple = Tuple.tuple()
  .addInteger(17)
  .addString("The Man Who Knew Too Much")
  .addString(null);
client
  .preparedQuery("INSERT INTO movies (id, title, plot) VALUES (@p1, @p2, @p3)")
  .execute(tuple, res -> {
    // ...
  });

否则,您应该使用 NullValue 常量和 NullValue.of 方法之一以显式声明类型:

Tuple tuple = Tuple.of(17, "The Man Who Knew Too Much", NullValue.String);
client
  .preparedQuery("INSERT INTO movies (id, title, plot) VALUES (@p1, @p2, @p3)")
  .execute(tuple, res -> {
    // ...
  });

收集器查询

您可以通过查询 API 使用 Java 收集器:

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));

// 使用收集器运行查询
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute(ar -> {
    if (ar.succeeded()) {
      SqlResult<Map<Long, String>> result = ar.result();

      // 获取收集器创建的映射(map)对象
      Map<Long, String> map = result.value();
      System.out.println("Got " + map);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

收集器处理时不得保留对 Row 的引用, 因为有一行用于处理整个集合。

Java Collectors 提供了许多有趣的预定义收集器,例如您可以 create 直接从行集轻松创建字符串:

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
);

// 使用收集器运行查询
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute(ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();

      // 获取收集器创建的字符串
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

使用 SSL/TLS

加密等级协商

当一个数据库连接建立时,客户端和服务端需要协商加密等级。

协商的加密等级取决于客户端的 MSSQLConnectOptions 配置和服务端和配置:

  • 不加密: 如果客户端的 ssl 配置设置为 false ,并且服务端不支持加密

  • 只加密登录报文: 如果客户端的 ssl 配置设置为 false , 并且服务端支持加密

  • 加密整个连接通道: 如果客户端的 ssl 配置设置为 true , 并且服务端要求加密

注意

如果客户端的 ssl 配置设置为 true ,并且服务端不支持加密时,加密等级协商会失败。 在这种情况下,客户端会终止连接。

配置

为了设置客户端的 ssl 项配置, 使用 setSsl 方法。 默认情况下, ssl 配置为 false

MSSQLConnectOptions connectOptions = new MSSQLConnectOptions().setSsl(true);

ssl 配置为 false 时,客户端信任所有的服务端密钥。 否则,客户端会验证主机名。

如果客户端的 ssl 配置为 true, 并且服务端使用了自签名的密钥, 可以禁用主机名验证:

MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
  .setSsl(true)
  .setTrustAll(true);

客户端也可以选择配置信任所有服务端的密钥,使用 TrustOptions 进行配置。 例如,如果服务端密钥放置在一个 PEM 格式的文件中时,可以使用 PemTrustOptions 进行如下配置:

MSSQLConnectOptions connectOptions = new MSSQLConnectOptions()
  .setSsl(true)
  .setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/server-cert.pem"));

对于 Vert.x 中更高级的 SSL 支持,请参考 Vert.x Core 文档

高级连接池配置

数据库服务负载均衡

您可以使用包含多个数据库服务的列表来配置连接池而不是单个数据库服务。

MSSQLPool pool = MSSQLPool.pool(Arrays.asList(server1, server2, server3), options);

当一个连接创建时,连接池使用(round-robin)轮询调度算法做负载均衡以选择不同的数据库服务

注意
负载均衡是在创建连接时提供的,而不是在从连接池中获取连接时提供

连接初始化

您可以使用 connectHandler 方法在连接创建后和连接释放回连接池之前来与数据库连接交互

pool.connectHandler(conn -> {
  conn.query(sql).execute().onSuccess(res -> {
    //  将连接释放回连接池,以被该应用程序复用
    conn.close();
  });
});

连接完成后,您应该释放该连接以通知连接池该数据库连接可以被使用