Reactive DB2 客户端

响应式的DB2客户端具有直接简单的API, 专注于可伸缩性和低开销。

该客户端是响应式的、非阻塞的,可以用单个线程处理多个数据库连接。

特性

  • 支持Linux、Unix,及Windows上的DB2

  • 对z/OS上的DB2提供有限支持

  • 事件驱动

  • 轻量级

  • 内置连接池

  • 预处理查询(Prepared query)缓存

  • 批处理及游标支持

  • 流式行处理

  • RxJava 1 与 RxJava 2 支持

  • 支持对象存储在直接内存,避免不必要的拷贝

  • 支持Java 8的日期和时间类型

  • 支持 SSL/TLS

  • 支持 HTTP/1.x 连接,SOCKS4a 及 SOCKS5 等代理

当前限制

  • 不支持存储过程

  • 不支持某些列类型(例如BLOB和CLOB)

使用方法

使用响应式 DB2 客户端,需要将以下依赖项添加到项目构建工具的 依赖 配置中:

  • Maven (在您的 pom.xml):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-db2-client</artifactId>
 <version>4.0.3</version>
</dependency>
  • Gradle (在您的 build.gradle 文件中):

dependencies {
 compile 'io.vertx:vertx-db2-client:4.0.3'
}

开始

以下是最简单的连接,查询和断开连接方法

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池选项
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// 创建客户端池
DB2Pool client = DB2Pool.pool(connectOptions, poolOptions);

// 简单查询
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }

  // 现在关闭客户端池
  client.close();
});

连接 DB2

大多数时候,您将使用连接池连接 DB2:

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池配置
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// 创建池化的客户端
DB2Pool client = DB2Pool.pool(connectOptions, poolOptions);

池化的客户端使用连接池,任何操作都将借用连接池中的连接来执行该操作, 并将连接释放回连接池中。

如果您使用 Vert.x 运行,您可以将 Vertx 实例传递给它:

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池配置
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);
// 创建池化的客户端
DB2Pool client = DB2Pool.pool(vertx, connectOptions, poolOptions);

当您不再需要连接池时,您需要将其释放:

pool.close();

当您需要在同一连接上执行多个操作时,您需要使用 connection 客户端。

您可以轻松地从连接池中获取一个:

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池配置
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// 创建池化的客户端
DB2Pool client = DB2Pool.pool(vertx, connectOptions, poolOptions);

// 从连接池获取一个连接
client.getConnection().compose(conn -> {
  System.out.println("Got a connection from the pool");

  // 以下所有操作都在同一个连接上执行
  return conn
    .query("SELECT * FROM users WHERE id='julien'")
    .execute()
    .compose(res -> conn
      .query("SELECT * FROM users WHERE id='emad'")
      .execute())
    .onComplete(ar -> {
      // 将连接释放回连接池
      conn.close();
    });
}).onComplete(ar -> {
  if (ar.succeeded()) {

    System.out.println("Done");
  } else {
    System.out.println("Something went wrong " + ar.cause().getMessage());
  }
});

连接使用完后,您必须关闭它以释放到连接池中,以便可以重复使用。

配置

有几个选项供您配置客户端。

数据对象

配置客户端的简单方法就是指定 DB2ConnectOptions 数据对象。

DB2ConnectOptions connectOptions = new DB2ConnectOptions()
  .setPort(50000)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// 连接池配置
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);

// 从数据对象创建连接池
DB2Pool pool = DB2Pool.pool(vertx, connectOptions, poolOptions);

pool.getConnection(ar -> {
  // 使用连接进行处理
});

您也可以使用 setPropertiesaddProperty 方法配置通用配置项。但请注意调用 setProperties 方法会覆盖默认的客户端配置。

连接 URI

除了使用 DB2ConnectOptions 数据对象进行配置外,我们还为您提供了另外一种使用连接URI进行配置的方法:

String connectionUri = "db2://dbuser:secretpassword@database.server.com:50000/mydb";

// 从连接URI创建连接池
DB2Pool pool = DB2Pool.pool(connectionUri);

// 从连接URI创建连接
DB2Connection.connect(vertx, connectionUri, res -> {
  // 使用连接进行处理
});

连接字符串的URI格式为:

db2://<USERNAME>:<PASSWORD>@<HOSTNAME>:<PORT>/<DBNAME>

目前,客户端支持以下的连接 uri 参数关键字:

  • host

  • port

  • user

  • password

  • dbname

Note: 连接URI中配置的配置项会覆盖默认配置。

连接重试

您可以将客户端配置为在连接无法建立时重试。

options
  .setReconnectAttempts(2)
  .setReconnectInterval(1000);

执行查询

当您不需要事务或者只是执行一个单次查询操作,您可以直接在连接池里执行查询; 连接池会使用某一条连接执行并给您返回结果。 下边是如何执行一个简单的查询的例子:

client
  .query("SELECT * FROM users WHERE id='andy'")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

预查询

执行预查询也是一样的操作。

SQL字符通过位置引用实际的参数,并使用数据库的语法 `?` ​

client
  .preparedQuery("SELECT * FROM users WHERE id=$1")
  .execute(Tuple.of("andy"), ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

查询相关的方法为 SELECT 类型的操作提供了异步的 RowSet 实例

client
  .preparedQuery("SELECT first_name, last_name FROM users")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

或者 UPDATE/INSERT 类型的查询:

client
  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES ($1, $2)")
  .execute(Tuple.of("Andy", "Guibert"),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Row对象(Row)可以让您通过索引位置获取相应的数据

System.out.println("User " + row.getString(0) + " " + row.getString(1));

或者通过名称

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

客户端在此处没有做特殊处理,无论您的SQL文本时什么,列名都将使用数据库表中的名称标识。

您也可以直接访问得到多种类型

String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

您可以使用缓存过的预处理语句去执行一次性的预查询:

connectOptions.setCachePreparedStatements(true);
client
  .preparedQuery("SELECT * FROM users WHERE id = ?")
  .execute(Tuple.of("julien"), ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Got " + rows.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

您也可以创建 PreparedStatement 并自主地管理它的生命周期。

sqlConnection
  .prepare("SELECT * FROM users WHERE id= ?", ar -> {
    if (ar.succeeded()) {
      PreparedStatement preparedStatement = ar.result();
      preparedStatement.query()
        .execute(Tuple.of("julien"), ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            System.out.println("Got " + rows.size() + " rows ");
            preparedStatement.close();
          } else {
            System.out.println("Failure: " + ar2.cause().getMessage());
          }
        });
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

批处理

您可以在预查询中执行批处理操作

List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julient Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));
batch.add(Tuple.of("andy", "Andy Guibert"));

// Execute the prepared batch
client
  .preparedQuery("INSERT INTO USERS (id, name) VALUES ($1, $2)")
  .executeBatch(batch, res -> {
  if (res.succeeded()) {

    // Process rows
    RowSet<Row> rows = res.result();
  } else {
    System.out.println("Batch failed " + res.cause());
  }
});

通过将查询包装在 SELECT <COLUMNS> FROM FINAL TABLE ( <SQL> ),可以获取生成的键,例如:

client
  .preparedQuery("SELECT color_id FROM FINAL TABLE ( INSERT INTO color (color_name) VALUES (?), (?), (?) )")
  .execute(Tuple.of("white", "red", "blue"), ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Inserted " + rows.rowCount() + " new rows.");
    for (Row row : rows) {
      System.out.println("generated key: " + row.getInteger("color_id"));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

使用连接

获取一条连接

当您要执行查询(无事务)操作时,您可以创建一条或者从连接池里拿到一条连接。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。

pool
  .getConnection()
  .compose(connection ->
    connection
      .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
      .executeBatch(Arrays.asList(
        Tuple.of("Julien", "Viet"),
        Tuple.of("Andy", "Guibert")
      ))
      .compose(res -> connection
        // Do something with rows
        .query("SELECT COUNT(*) FROM Users")
        .execute()
        .map(rows -> rows.iterator().next().getInteger(0)))
      // Return the connection to the pool
      .eventually(v -> connection.close())
  ).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

也可以通过连接对象创建预查询:

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE $1")
  .compose(pq ->
    pq.query()
      .execute(Tuple.of("Andy"))
      .eventually(v -> pq.close())
  ).onSuccess(rows -> {
  // All rows
});

简化的连接API

当您使用连接池时,您可以调用 withConnection 并以当前连接要执行的操作作为参数。

这样会从连接池里拿到一条连接,并使用当前连接执行目标操作。

这种方式需要返回一个future对象来表示操作结果。

当这个future操作完成后,当前连接会被释放会连接池同时您也可能拿到最终的执行结果。

pool.withConnection(connection ->
  connection
    .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES (?, ?)")
    .executeBatch(Arrays.asList(
      Tuple.of("Julien", "Viet"),
      Tuple.of("Andy", "Guibert")
    ))
    .compose(res -> connection
      // Do something with rows
      .query("SELECT COUNT(*) FROM Users")
      .execute()
      .map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

使用事务

连接中使用事务操作

您可以使用SQL语法 BEGIN/COMMIT/ROLLBACK 来执行事务操作,同时您必须使用 SqlConnection 并自己管理当前连接。

或者您也可以使用 SqlConnection 的事务API:

pool.getConnection()
  // Transaction must use a connection
  .onSuccess(conn -> {
    // Begin the transaction
    conn.begin()
      .compose(tx -> conn
        // Various statements
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
        .execute()
        .compose(res2 -> conn
          .query("INSERT INTO Users (first_name,last_name) VALUES ('Andy','Guibert')")
          .execute())
        // Commit the transaction
        .compose(res3 -> tx.commit()))
      // Return the connection to the pool
      .eventually(v -> conn.close())
      .onSuccess(v -> System.out.println("Transaction succeeded"))
      .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
  });

当数据库服务端返回当前事务已失败(比如常见的 current transaction is aborted, commands ignored until end of transaction block) ,事务已回滚和 completion 方法的返回值future返回了 TransactionRollbackException 异常时:

tx.completion()
  .onFailure(err -> {
    System.out.println("Transaction failed => rolled back");
  });

简化版事务API

当您使用连接池时,您可以调用 withTransaction 方法 并传递待执行的事务操作作为参数。

这将会从连接池里拿到一条连接,开启事务并调用待执行操作,配合客户端一起执行该事务范围内 的所有操作。

待执行操作需要返回一个future来表示可能产生的结果:

  • 当future成功时,客户端提交该事务

  • 当future失败时,客户端回滚该事务

事务操作完成后,连接会被释放回连接池,并且可以获取到最终的操作结果。

pool.withTransaction(client -> client
  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  .execute()
  .flatMap(res -> client
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Andy','Guibert')")
    .execute()
    // Map to a message result
    .map("Users inserted")))
  .onSuccess(v -> System.out.println("Transaction succeeded"))
  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

游标和流式操作

默认情况下预查询操作会拉去所有的行记录,您可以使用 游标 来控制您想要读取的行数:

connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();

    // Cursors require to run within a transaction
    connection.begin(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();

        // Create a cursor
        Cursor cursor = pq.cursor(Tuple.of("julien"));

        // Read 50 rows
        cursor.read(50, ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();

            // Check for more ?
            if (cursor.hasMore()) {
              // Repeat the process...
            } else {
              // No more rows - commit the transaction
              tx.commit();
            }
          }
        });
      }
    });
  }
});

游标释放时需要同时执行关闭操作:

cursor.read(50, ar2 -> {
  if (ar2.succeeded()) {
    // Close the cursor
    cursor.close();
  }
});

stream API也可以用于游标,尤其是在Rx版的客户端,可能更为方便。

connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();

    // Streams require to run within a transaction
    connection.begin(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();

        // Fetch 50 rows at a time
        RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));

        // Use the stream
        stream.exceptionHandler(err -> {
          System.out.println("Error: " + err.getMessage());
        });
        stream.endHandler(v -> {
          tx.commit();
          System.out.println("End of stream");
        });
        stream.handler(row -> {
          System.out.println("User: " + row.getString("last_name"));
        });
      }
    });
  }
});

上边的stream会批量读取 50 行并同时将其转换为流,当这些行记录被传递给处理器时, 会以此类推地读取下一批的 50 行记录。

stream支持重启或暂停,已经加载到的行记录将会被保留在内存里直到被传递给处理器,此时 游标也将终止遍历。

追踪查询

当Vert.x启用tracing功能时,SQL客户端可以跟踪查询的执行情况。

客户端会上报下列这些 client spans:

  • Query 操作名称

  • tags

  • db.user :数据库用户名

  • db.instance :数据库实例

  • db.statement :SQL语句

  • db.typesql

默认的 tracing 策略时 PROPAGATE,客户端 在一个活跃trace里只创建一个span。

您可以通过 setTracingPolicy 方法来调整tracing策略, 例如您可以设置为 ALWAYS, 客户端将始终上报span:

options.setTracingPolicy(TracingPolicy.ALWAYS);

DB2 类型映射

当前客户端支持以下 DB2 类型

  • BOOLEAN (java.lang.Boolean) (只针对DB2 LUW)

  • SMALLINT (java.lang.Short)

  • INTEGER (java.lang.Integer)

  • BIGINT (java.lang.Long)

  • REAL (java.lang.Float)

  • DOUBLE (java.lang.Double)

  • DECIMAL (io.vertx.sqlclient.data.Numeric)

  • CHAR (java.lang.String)

  • VARCHAR (java.lang.String)

  • ENUM (java.lang.String)

  • DATE (java.time.LocalDate)

  • TIME (java.time.LocalTime)

  • TIMESTAMP (java.time.LocalDateTime)

  • BINARY (byte[])

  • VARBINARY (byte[])

  • ROWID (io.vertx.db2client.impl.drda.DB2RowIdjava.sql.RowId) (只针对DB2 z/OS)

以下类型目前不支持:

  • XML

  • BLOB

  • CLOB

  • DBCLOB

  • GRAPHIC / VARGRAPHIC

有关进一步介绍DB2数据类型的文档,请参考以下资源:

元组解码在存储值时使用上述类型,并且在可能的情况下还对实际值进行即时转换:

pool
  .query("SELECT an_int_column FROM exampleTable")
  .execute(ar -> {
  RowSet<Row> rowSet = ar.result();
  Row row = rowSet.iterator().next();

  // INTEGER 类型字段读取出来是 java.lang.Integer
  Object value = row.getValue(0);

  // 转换为 java.lang.Long
  Long longValue = row.getLong(0);
});

使用 Java 枚举类型

您可以将Java的 枚举类型 映射为下面的列类型:

  • Strings (VARCHAR, TEXT)

  • Numbers (SMALLINT, INTEGER, BIGINT)

client.preparedQuery("SELECT day_name FROM FINAL TABLE ( INSERT INTO days (day_name) VALUES (?), (?), (?) )")
.execute(Tuple.of(Days.FRIDAY, Days.SATURDAY, Days.SUNDAY), ar -> {
 if (ar.succeeded()) {
  RowSet<Row> rows = ar.result();
  System.out.println("Inserted " + rows.rowCount() + " new rows");
  for (Row row : rows) {
	  System.out.println("Day: " + row.get(Days.class, "day_name"));
  }
 } else {
  System.out.println("Failure: " + ar.cause().getMessage());
 }
});
client.preparedQuery("SELECT day_num FROM FINAL TABLE ( INSERT INTO days (day_num) VALUES (?), (?), (?) )")
   .execute(Tuple.of(Days.FRIDAY.ordinal(), Days.SATURDAY.ordinal(), Days.SUNDAY.ordinal()), ar -> {
   	if (ar.succeeded()) {
   		RowSet<Row> rows = ar.result();
   		System.out.println("Inserted " + rows.rowCount() + " new rows");
   		for (Row row : rows) {
   			System.out.println("Day: " + row.get(Days.class, "day_num"));
   		}
   	} else {
   		System.out.println("Failure: " + ar.cause().getMessage());
   	}
   });

String类型使用Java枚举 name() 方法返回的名字进行匹配。

数值类型使用Java枚举 ordinal() 方法返回的序数进行匹配,row.get() 方法获取到的是整型值对应Java枚举序数的枚举值的 name() 值。

集合类查询

您可以将查询API与Java集合类结合使用:

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));

// 运行查询使用集合类
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute(ar -> {
  if (ar.succeeded()) {
    SqlResult<Map<Long, String>> result = ar.result();

    // 获取用集合类创建的map
    Map<Long, String> map = result.value();
    System.out.println("Got " + map);
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

集合类处理不能保留 Row 的引用,因为只有一个 Row 对象用于处理整个集合。

Java Collectors 提供了许多有趣的预定义集合类,例如, 您可以直接用 Row 中的集合轻松拼接成一个字符串:

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
);

// 运行查询使用集合类
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();

      // 获取用集合类创建的String
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

使用 SSL/TLS

配置客户端使用SSL连接, 您可以像 Vert.x NetClient 一样配置 DB2ConnectOptions

DB2ConnectOptions options = new DB2ConnectOptions()
  .setPort(50001)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setSsl(true)
  .setTrustStoreOptions(new JksOptions()
      .setPath("/path/to/keystore.p12")
      .setPassword("keystoreSecret"));

DB2Connection.connect(vertx, options, res -> {
  if (res.succeeded()) {
    // 使用SSL进行连接
  } else {
    System.out.println("Could not connect " + res.cause());
  }
});

更多详细信息,请参阅 Vert.x 文档.

使用代理

您可以配置客户端使用 HTTP/1.x 连接、SOCKS4a 或 SOCKS5 代理。

更多详细信息,请参阅 Vert.x 文档

Unresolved directive in index.adoc - include::override/rxjava2.adoc[]