响应式(Reactive) PostgreSQL 客户端

响应式 PostgreSQL 客户端是一款以可扩展性和低开销为目标而专门为 PostgreSQL 数据库设计的 客户端。

客户端是响应式和非阻塞的,可以仅仅使用一条线程来处理大量的数据库连接。

  • 事件驱动

  • 轻量级

  • 内置连接池

  • 预查询缓存

  • 基于 PostgreSQL 的 NOTIFY/LISTEN 机制实现的发布/订阅

  • 批处理和游标

  • 支持原生流式操作

  • 命令管道(pipeline)

  • RxJava API

  • 支持内存直接映射到对象,避免了不必要的复制

  • 支持 Java 8 Date and Time

  • SSL/TLS

  • Unix domain socket

  • 支持 HTTP/1.x, SOCKS4a 或 SOCKS5 代理

用法

dependencies 里添加如下依赖来引入响应式PostgreSQL客户端:

  • Maven(在您的 pom.xml 文件里):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-pg-client</artifactId>
 <version>4.2.7</version>
</dependency>
  • Gradle(在您的 build.gradle 文件里):

dependencies {
 compile 'io.vertx:vertx-pg-client:4.2.7'
}

开始使用

如下是一种最为简单的连接、查询、关闭连接的方式

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the client pool
SqlClient client = PgPool.client(connectOptions, poolOptions);

// A simple query
client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }

  // Now close the pool
  client.close();
});

连接PostgreSQL

大多数时间,您将使用连接池连接到 PostgreSQL:

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
SqlClient client = PgPool.client(connectOptions, poolOptions);

池化PostgreSQL客户端使用连接池去执行数据库操作, 所有操作都会遵循从池里拿到连接、执行、释放连接到池里这三个步骤。

您可以传入一个连接池到正在运行的Vert.x实例里:

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
SqlClient client = PgPool.client(vertx, connectOptions, poolOptions);

如果不再需要客户端,您需要将其释放:

client.close();

当您想要在同一条连接上执行多个操作时,您需要从连接池中获取 connection 连接。

您可以很方便地从连接池里拿到一条连接:

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
PgPool pool = PgPool.pool(vertx, connectOptions, poolOptions);

// Get a connection from the pool
pool.getConnection().compose(conn -> {
  System.out.println("Got a connection from the pool");

  // All operations execute on the same connection
  return conn
    .query("SELECT * FROM users WHERE id='julien'")
    .execute()
    .compose(res -> conn
      .query("SELECT * FROM users WHERE id='emad'")
      .execute())
    .onComplete(ar -> {
      // Release the connection to the pool
      conn.close();
    });
}).onComplete(ar -> {
  if (ar.succeeded()) {

    System.out.println("Done");
  } else {
    System.out.println("Something went wrong " + ar.cause().getMessage());
  }
});

为了连接可以重用,一旦当前连接上的操作已经完成,您需要关闭并释放连接到连接池里。

连接池与池化的客户端

PgPool 允许您创建连接池或池化客户端

SqlClient pooledClient = PgPool.client(vertx, connectOptions, poolOptions);

// 流水线操作(Pipelined)
Future<RowSet<Row>> res1 = pooledClient.query(sql).execute();

// 连接池
PgPool pool = PgPool.pool(vertx, connectOptions, poolOptions);

// 不是流水线操作
Future<RowSet<Row>> res2 = pool.query(sql).execute();
  • 连接池操作并非流水线操作(pipelined),只有连接客户端是流水线操作

  • 池化的客户端操作是流水线操作

可共享的连接池

您可以在多个 Verticle 间或同一 Verticle 的多个实例间共享一个连接池。这样的连接池应该在 Verticle 外面创建, 否则这个连接池将在创建它的 Verticle 被取消部署时关闭

PgPool pool = PgPool.pool(database, new PoolOptions().setMaxSize(maxSize));
vertx.deployVerticle(() -> new AbstractVerticle() {
  @Override
  public void start() throws Exception {
    // 使用连接池
  }
}, new DeploymentOptions().setInstances(4));

您也可以用以下方式在每个 Verticle 中创建可共享的连接池:

vertx.deployVerticle(() -> new AbstractVerticle() {
  PgPool pool;
  @Override
  public void start() {
    // 创建一个可共享的连接池
    // 或获取已有的可共享连接池,并创建对原连接池的借用
    // 当 verticle 被取消部署时,借用会被自动释放
    pool = PgPool.pool(database, new PoolOptions()
      .setMaxSize(maxSize)
      .setShared(true)
      .setName("my-pool"));
  }
}, new DeploymentOptions().setInstances(4));

第一次创建可共享的连接池时,会创建新连接池所需的资源。之后再调用该创建方法时,会复用之前的连接池,并创建 对原有连接池的借用。当所有的借用都被关闭时,该连接池的资源也会被释放。

默认情况下,客户端需要创建一个 TCP 连接时,会复用当前的 event-loop 。 这个可共享的 HTTP 客户端会 以一种安全的模式,在使用它的 verticle 中随机选中一个 verticle,并使用它的 event-loop。

您可以手动设置一个客户端可以使用的 event-loop 的数量

PgPool pool = PgPool.pool(database, new PoolOptions()
  .setMaxSize(maxSize)
  .setShared(true)
  .setName("my-pool")
  .setEventLoopSize(4));

Unix domain sockets

某些情况下您希望通过Unix domain socket类型的连接来提升性能,我们通过Vert.x本机传输支持了这种方式。

首先确保您已经在classpath下添加了 netty-transport-native 这个必须的依赖,同时开启了Unix domain socket功能(pg)选项。

PgConnectOptions connectOptions = new PgConnectOptions()
  .setHost("/var/run/postgresql")
  .setPort(5432)
  .setDatabase("the-db");

// Pool options
PoolOptions poolOptions = new PoolOptions()
  .setMaxSize(5);

// Create the pooled client
PgPool client = PgPool.pool(connectOptions, poolOptions);

// Create the pooled client with a vertx instance
// Make sure the vertx instance has enabled native transports
PgPool client2 = PgPool.pool(vertx, connectOptions, poolOptions);

更多详情可以在这里找到 Vert.x 文档

重连

您可以配置客户端在建立连接失败的时候的重试策略

options
  .setReconnectAttempts(2)
  .setReconnectInterval(1000);

配置

有如下几种配置客户端的可选方案。

data object

通过指定 PgConnectOptions 数据对象是一种简单的客户端的配置方式。

PgConnectOptions connectOptions = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret");

// Pool Options
PoolOptions poolOptions = new PoolOptions().setMaxSize(5);

// Create the pool from the data object
PgPool pool = PgPool.pool(vertx, connectOptions, poolOptions);

pool.getConnection(ar -> {
  // Handling your connection
});

您也可以使用 setPropertiesaddProperty 方法配置通用属性。注意 setProperties 将覆盖默认的客户端属性。

例如,您可以通过添加 search_path 属性来配置一个默认的 schema。

PgConnectOptions connectOptions = new PgConnectOptions();

// Set the default schema
Map<String, String> props = new HashMap<>();
props.put("search_path", "myschema");
connectOptions.setProperties(props);

关于可用属性的更多信息可以在这里找到 PostgreSQL Manuals

连接uri

除了使用 PgConnectionOptions 对象,我们也提供了另一种基于URI的可选配置方案:

String connectionUri = "postgresql://dbuser:secretpassword@database.server.com:3211/mydb";

// Create the pool from the connection URI
PgPool pool = PgPool.pool(connectionUri);

// Create the connection from the connection URI
PgConnection.connect(vertx, connectionUri, res -> {
  // Handling your connection
});

关于连接uri字符串格式的更多信息可以在这里找到 PostgreSQL 手册

当前版本的客户端支持在连接uri里使用如下参数

  • host

  • hostaddr

  • port

  • user

  • password

  • dbname

  • sslmode

  • properties including(application_name, fallback_application_name, search_path)

注意
通过URI配置的属性将会覆盖默认的配置属性。

环境变量

您也可以使用环境变量来设置连接的属性值,以此来避免硬编码数据库连接信息。 您可以参考 官方文档来了解更多详情。 目前支持下列这些配置参数:

  • PGHOST

  • PGHOSTADDR

  • PGPORT

  • PGDATABASE

  • PGUSER

  • PGPASSWORD

  • PGSSLMODE

如果您没有在连接时指定连接对象或者URI字符串,此时将会使用环境变量。

$ PGUSER=user \
 PGHOST=the-host \
 PGPASSWORD=secret \
 PGDATABASE=the-db \
 PGPORT=5432 \
 PGSSLMODE=DISABLE
PgPool pool = PgPool.pool();

// Create the connection from the environment variables
PgConnection.connect(vertx, res -> {
  // Handling your connection
});

SASL SCRAM-SHA-256 鉴权机制。

为了使用 sasl SCRAM-SHA-256鉴权,需要在 dependencies 里添加如下依赖:

  • Maven(在您的 pom.xml 文件里):

<dependency>
 <groupId>com.ongres.scram</groupId>
 <artifactId>client</artifactId>
 <version>2.1</version>
</dependency>
  • Gradle(在您的 build.gradle 文件里):

dependencies {
 compile 'com.ongres.scram:client:2.1'
}
注意
SCRAM-SHA-256-PLUS(在Postgresql 11中加入)当前版本客户端暂不支持。

执行查询

当您不需要事务或者只是执行一个单次查询操作,您可以直接在连接池里执行查询; 连接池会使用某一条连接执行并给您返回结果。 下边是如何执行一个简单的查询的例子:

client
  .query("SELECT * FROM users WHERE id='julien'")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> result = ar.result();
    System.out.println("Got " + result.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

预查询

执行预查询也是一样的操作。

SQL字符通过位置引用实际的参数,并使用数据库的语法 `$1`, `$2`, etc…​

client
  .preparedQuery("SELECT * FROM users WHERE id=$1")
  .execute(Tuple.of("julien"), ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println("Got " + rows.size() + " rows ");
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

查询相关的方法为 SELECT 类型的操作提供了异步的 RowSet 实例

client
  .preparedQuery("SELECT first_name, last_name FROM users")
  .execute(ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("User " + row.getString(0) + " " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

或者 UPDATE/INSERT 类型的查询:

client
  .preparedQuery("INSERT INTO users (first_name, last_name) VALUES ($1, $2)")
  .execute(Tuple.of("Julien", "Viet"),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

Row对象(Row)可以让您通过索引位置获取相应的数据

System.out.println("User " + row.getString(0) + " " + row.getString(1));

或者通过名称

System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

客户端在此处没有做特殊处理,无论您的SQL文本时什么,列名都将使用数据库表中的名称标识。

您也可以直接访问得到多种类型

String firstName = row.getString("first_name");
Boolean male = row.getBoolean("male");
Integer age = row.getInteger("age");

您可以使用缓存过的预处理语句去执行一次性的预查询:

connectOptions.setCachePreparedStatements(true);
client
  .preparedQuery("SELECT * FROM users WHERE id = $1")
  .execute(Tuple.of("julien"), ar -> {
    if (ar.succeeded()) {
      RowSet<Row> rows = ar.result();
      System.out.println("Got " + rows.size() + " rows ");
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

您也可以创建 PreparedStatement 并自主地管理它的生命周期。

sqlConnection
  .prepare("SELECT * FROM users WHERE id = $1", ar -> {
    if (ar.succeeded()) {
      PreparedStatement preparedStatement = ar.result();
      preparedStatement.query()
        .execute(Tuple.of("julien"), ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();
            System.out.println("Got " + rows.size() + " rows ");
            preparedStatement.close();
          } else {
            System.out.println("Failure: " + ar2.cause().getMessage());
          }
        });
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

批处理

您可以在预查询中执行批处理操作

List<Tuple> batch = new ArrayList<>();
batch.add(Tuple.of("julien", "Julien Viet"));
batch.add(Tuple.of("emad", "Emad Alblueshi"));

// Execute the prepared batch
client
  .preparedQuery("INSERT INTO USERS (id, name) VALUES ($1, $2)")
  .executeBatch(batch, res -> {
  if (res.succeeded()) {

    // Process rows
    RowSet<Row> rows = res.result();
  } else {
    System.out.println("Batch failed " + res.cause());
  }
});

Returning 子句

您可以使用 'RETURNING' 从查询里拿到生成的key:

client
  .preparedQuery("INSERT INTO color (color_name) VALUES ($1), ($2), ($3) RETURNING color_id")
  .execute(Tuple.of("white", "red", "blue"))
  .onSuccess(rows -> {
    for (Row row : rows) {
      System.out.println("generated key: " + row.getInteger("color_id"));
    }
});

只要 SQL 语句中存在 RETURNING 子句,就可以生效:

client
  .query("DELETE FROM color RETURNING color_name")
  .execute()
  .onSuccess(rows -> {
    for (Row row : rows) {
      System.out.println("deleted color: " + row.getString("color_name"));
    }
  });

带有 RETURNING 语句的批量查询创建了一个 RowSet , 这个RowSet包含了该批量查询中的每一个元素。

client
  .preparedQuery("INSERT INTO color (color_name) VALUES ($1) RETURNING color_id")
  .executeBatch(Arrays.asList(Tuple.of("white"), Tuple.of("red"), Tuple.of("blue")))
  .onSuccess(res -> {
    for (RowSet<Row> rows = res;rows.next() != null;rows = rows.next()) {
      Integer colorId = rows.iterator().next().getInteger("color_id");
      System.out.println("generated key: " + colorId);
    }
  });

使用连接

获取一条连接

当您要执行查询(无事务)操作时,您可以创建一条或者从连接池里拿到一条连接。 请注意在从拿到连接到将连接释放回连接池这之间的连接状态,服务端可能由于某些原因比如空闲时间超时,而关闭这条连接。

pool
  .getConnection()
  .compose(connection ->
    connection
      .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
      .executeBatch(Arrays.asList(
        Tuple.of("Julien", "Viet"),
        Tuple.of("Emad", "Alblueshi")
      ))
      .compose(res -> connection
        // Do something with rows
        .query("SELECT COUNT(*) FROM Users")
        .execute()
        .map(rows -> rows.iterator().next().getInteger(0)))
      // Return the connection to the pool
      .eventually(v -> connection.close())
  ).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

也可以通过连接对象创建预查询:

connection
  .prepare("SELECT * FROM users WHERE first_name LIKE $1")
  .compose(pq ->
    pq.query()
      .execute(Tuple.of("Julien"))
      .eventually(v -> pq.close())
  ).onSuccess(rows -> {
  // All rows
});

简化的连接API

当您使用连接池时,您可以调用 withConnection 并以当前连接要执行的操作作为参数。

这样会从连接池里拿到一条连接,并使用当前连接执行目标操作。

这种方式需要返回一个future对象来表示操作结果。

当这个future操作完成后,当前连接会被释放会连接池同时您也可能拿到最终的执行结果。

pool.withConnection(connection ->
  connection
    .preparedQuery("INSERT INTO Users (first_name,last_name) VALUES ($1, $2)")
    .executeBatch(Arrays.asList(
      Tuple.of("Julien", "Viet"),
      Tuple.of("Emad", "Alblueshi")
    ))
    .compose(res -> connection
      // Do something with rows
      .query("SELECT COUNT(*) FROM Users")
      .execute()
      .map(rows -> rows.iterator().next().getInteger(0)))
).onSuccess(count -> {
  System.out.println("Insert users, now the number of users is " + count);
});

使用事务

连接中使用事务操作

您可以使用SQL语法 BEGIN/COMMIT/ROLLBACK 来执行事务操作,同时您必须使用 SqlConnection 并自己管理当前连接。

或者您也可以使用 SqlConnection 的事务API:

pool.getConnection()
  // Transaction must use a connection
  .onSuccess(conn -> {
  // Begin the transaction
  conn.begin()
    .compose(tx -> conn
      // Various statements
      .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
      .execute()
      .compose(res2 -> conn
        .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
        .execute())
      // Commit the transaction
      .compose(res3 -> tx.commit()))
    // Return the connection to the pool
    .eventually(v -> conn.close())
    .onSuccess(v -> System.out.println("Transaction succeeded"))
    .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));
});

当数据库服务端返回当前事务已失败(比如常见的 current transaction is aborted, commands ignored until end of transaction block) ,事务已回滚和 completion 方法的返回值future返回了 TransactionRollbackException 异常时:

tx.completion()
  .onFailure(err -> {
  System.out.println("Transaction failed => rolled back");
});

简化版事务API

当您使用连接池时,您可以调用 withTransaction 方法 并传递待执行的事务操作作为参数。

这将会从连接池里拿到一条连接,开启事务并调用待执行操作,配合客户端一起执行该事务范围内 的所有操作。

待执行操作需要返回一个future来表示可能产生的结果:

  • 当future成功时,客户端提交该事务

  • 当future失败时,客户端回滚该事务

事务操作完成后,连接会被释放回连接池,并且可以获取到最终的操作结果。

pool.withTransaction(client -> client
  .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
  .execute()
  .flatMap(res -> client
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
    .execute()
    // Map to a message result
    .map("Users inserted")))
  .onSuccess(v -> System.out.println("Transaction succeeded"))
  .onFailure(err -> System.out.println("Transaction failed: " + err.getMessage()));

游标和流式操作

默认情况下预查询操作会拉去所有的行记录,您可以使用 游标 来控制您想要读取的行数:

connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();

    // Cursors require to run within a transaction
    connection.begin(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();

        // Create a cursor
        Cursor cursor = pq.cursor(Tuple.of("julien"));

        // Read 50 rows
        cursor.read(50, ar2 -> {
          if (ar2.succeeded()) {
            RowSet<Row> rows = ar2.result();

            // Check for more ?
            if (cursor.hasMore()) {
              // Repeat the process...
            } else {
              // No more rows - commit the transaction
              tx.commit();
            }
          }
        });
      }
    });
  }
});

游标释放时需要同时执行关闭操作:

cursor.read(50, ar2 -> {
  if (ar2.succeeded()) {
    // Close the cursor
    cursor.close();
  }
});

stream API也可以用于游标,尤其是在Rx版的客户端,可能更为方便。

connection.prepare("SELECT * FROM users WHERE first_name LIKE $1", ar0 -> {
  if (ar0.succeeded()) {
    PreparedStatement pq = ar0.result();

    // Streams require to run within a transaction
    connection.begin(ar1 -> {
      if (ar1.succeeded()) {
        Transaction tx = ar1.result();

        // Fetch 50 rows at a time
        RowStream<Row> stream = pq.createStream(50, Tuple.of("julien"));

        // Use the stream
        stream.exceptionHandler(err -> {
          System.out.println("Error: " + err.getMessage());
        });
        stream.endHandler(v -> {
          // Close the stream to release the resources in the database
          stream.close(closed -> {
            tx.commit(committed -> {
              System.out.println("End of stream");
            });
          });
        });
        stream.handler(row -> {
          System.out.println("User: " + row.getString("last_name"));
        });
      }
    });
  }
});

上边的stream会批量读取 50 行并同时将其转换为流,当这些行记录被传递给处理器时, 会以此类推地读取下一批的 50 行记录。

stream支持重启或暂停,已经加载到的行记录将会被保留在内存里直到被传递给处理器,此时 游标也将终止遍历。

注意
PostreSQL会在事务结束后销毁游标,因而游标API应该在事务内使用, 否则您将可能收到 34000 PostgreSQL错误码。

跟踪查询

当Vert.x启用tracing功能时,SQL客户端可以跟踪查询的执行情况。

客户端会上报下列这些 client spans:

  • Query 操作名称

  • tags

  • db.user :数据库用户名

  • db.instance :数据库实例

  • db.statement :SQL语句

  • db.typesql

默认的 tracing 策略时 PROPAGATE,客户端 在一个活跃trace里只创建一个span。

您可以通过 setTracingPolicy 方法来调整tracing策略, 例如您可以设置为 ALWAYS, 客户端将始终上报span:

options.setTracingPolicy(TracingPolicy.ALWAYS);

PostgreSQL 类型映射

当前版本客户端支持下列的PostgreSQL类型

  • BOOLEAN (java.lang.Boolean)

  • INT2 (java.lang.Short)

  • INT4 (java.lang.Integer)

  • INT8 (java.lang.Long)

  • FLOAT4 (java.lang.Float)

  • FLOAT8 (java.lang.Double)

  • CHAR (java.lang.String)

  • VARCHAR (java.lang.String)

  • TEXT (java.lang.String)

  • ENUM (java.lang.String)

  • NAME (java.lang.String)

  • SERIAL2 (java.lang.Short)

  • SERIAL4 (java.lang.Integer)

  • SERIAL8 (java.lang.Long)

  • NUMERIC (io.vertx.sqlclient.data.Numeric)

  • UUID (java.util.UUID)

  • DATE (java.time.LocalDate)

  • TIME (java.time.LocalTime)

  • TIMETZ (java.time.OffsetTime)

  • TIMESTAMP (java.time.LocalDateTime)

  • TIMESTAMPTZ (java.time.OffsetDateTime)

  • INTERVAL (io.vertx.pgclient.data.Interval)

  • BYTEA (io.vertx.core.buffer.Buffer)

  • JSON (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)

  • JSONB (io.vertx.core.json.JsonObject, io.vertx.core.json.JsonArray, Number, Boolean, String, io.vertx.sqlclient.Tuple#JSON_NULL)

  • POINT (io.vertx.pgclient.data.Point)

  • LINE (io.vertx.pgclient.data.Line)

  • LSEG (io.vertx.pgclient.data.LineSegment)

  • BOX (io.vertx.pgclient.data.Box)

  • PATH (io.vertx.pgclient.data.Path)

  • POLYGON (io.vertx.pgclient.data.Polygon)

  • CIRCLE (io.vertx.pgclient.data.Circle)

  • TSVECTOR (java.lang.String)

  • TSQUERY (java.lang.String)

  • INET (io.vertx.pgclient.data.Inet)

  • MONEY (io.vertx.pgclient.data.Money)

Tuple(元组)在解码时使用上述类型映射关系存储解码出的值,并且在合法的条件下可以动态转换为实际类型的值:

pool
  .query("SELECT 1::BIGINT \"VAL\"")
  .execute(ar -> {
  RowSet<Row> rowSet = ar.result();
  Row row = rowSet.iterator().next();

  // Stored as java.lang.Long
  Object value = row.getValue(0);

  // Convert to java.lang.Integer
  Integer intValue = row.getInteger(0);
});

Tuple(元组)编码时使用上述关系作为类型间的映射关系,除非类型为数字,在这种情况下将会使用 java.lang.Number

pool
  .query("SELECT 1::BIGINT \"VAL\"")
  .execute(ar -> {
  RowSet<Row> rowSet = ar.result();
  Row row = rowSet.iterator().next();

  // Stored as java.lang.Long
  Object value = row.getValue(0);

  // Convert to java.lang.Integer
  Integer intValue = row.getInteger(0);
});

上述类型的数组形式也是支持的。

JSON

PostgreSQL的 JSONJSONB 用下列的java类型表示:

  • String

  • Number

  • Boolean

  • io.vertx.core.json.JsonObject

  • io.vertx.core.json.JsonArray

  • io.vertx.sqlclient.Tuple#JSON_NULL for representing the JSON null literal

Tuple tuple = Tuple.of(
  Tuple.JSON_NULL,
  new JsonObject().put("foo", "bar"),
  3);

// Retrieving json
Object value = tuple.getValue(0); // Expect JSON_NULL

//
value = tuple.get(JsonObject.class, 1); // Expect JSON object

//
value = tuple.get(Integer.class, 2); // Expect 3
value = tuple.getInteger(2); // Expect 3

数字类型(Numeric)

java的 Numeric 用来表示PostgreSQL的 NUMERIC 类型。

Numeric numeric = row.get(Numeric.class, 0);
if (numeric.isNaN()) {
  // Handle NaN
} else {
  BigDecimal value = numeric.bigDecimalValue();
}

数组

数组可以用在 TupleRow

Tuple tuple = Tuple.of(new String[]{ "a", "tuple", "with", "arrays" });

// Add a string array to the tuple
tuple.addArrayOfString(new String[]{"another", "array"});

// Get the first array of string
String[] array = tuple.getArrayOfStrings(0);

日期/时间类型(Date/Time)的最值

PostgreSQL定义了几个特殊的值用来表示这些最值。

相应类型的最大/最小值由这些常量 特殊值 表示。

  • OffsetDateTime.MAX/OffsetDateTime.MIN

  • LocalDateTime.MAX/LocalDateTime.MIN

  • LocalDate.MAX/LocalDate.MIN

client
  .query("SELECT 'infinity'::DATE \"LocalDate\"")
  .execute(ar -> {
    if (ar.succeeded()) {
      Row row = ar.result().iterator().next();
      System.out.println(row.getLocalDate("LocalDate").equals(LocalDate.MAX));
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

自定义类型

发送和从Postgres接收的自定义类型都由字符串来表示。

您可以读取PostgreSQL并以字符串的形式得到自定义的类型值

client
  .preparedQuery("SELECT address, (address).city FROM address_book WHERE id=$1")
  .execute(Tuple.of(3),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("Full Address " + row.getString(0) + ", City " + row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

您也可以向PostgreSQL写入字符串

client
  .preparedQuery("INSERT INTO address_book (id, address) VALUES ($1, $2)")
  .execute(Tuple.of(3, "('Anytown', 'Second Ave', false)"),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    System.out.println(rows.rowCount());
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

文本检索使用的是java的 String

client
  .preparedQuery("SELECT to_tsvector( $1 ) @@ to_tsquery( $2 )")
  .execute(Tuple.of("fat cats ate fat rats", "fat & rat"),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("Match : " + row.getBoolean(0));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

tsvectortsquery 可以使用java的 String 类型来从数据库中获取

client
  .preparedQuery("SELECT to_tsvector( $1 ), to_tsquery( $2 )")
  .execute(Tuple.of("fat cats ate fat rats", "fat & rat"),  ar -> {
  if (ar.succeeded()) {
    RowSet<Row> rows = ar.result();
    for (Row row : rows) {
      System.out.println("Vector : " + row.getString(0) + ", query : "+row.getString(1));
    }
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

枚举类型

PostgreSQL的 枚举类型 被映射为java的字符串。

client
  .preparedQuery("INSERT INTO colors VALUES ($2)")
  .execute(Tuple.of("red"),  res -> {
    // ...
  });

使用Java枚举类型

您可以将Java的 枚举类型 映射为下列这些列类型:

  • Strings (VARCHAR, TEXT)

  • PosgreSQL enumerated types

  • Numbers (INT2, INT4, INT8)

client
  .preparedQuery("INSERT INTO colors VALUES ($1)")
  .execute(Tuple.of(Color.red))
  .flatMap(res ->
    client
      .preparedQuery("SELECT color FROM colors")
      .execute()
  ).onComplete(res -> {
    if (res.succeeded()) {
      RowSet<Row> rows = res.result();
      for (Row row : rows) {
        System.out.println(row.get(Color.class, "color"));
      }
    }
});

String and PostgreSQL enumerated types 对应Java枚举类的 name() 方法的返回值。

Numbers类型对应Java枚举类的 ordinal() 方法的返回值。

Collector式查询

您可以将Java collector与查询API结合使用:

Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
  row -> row.getLong("id"),
  row -> row.getString("last_name"));

// Run the query with the collector
client.query("SELECT * FROM users")
  .collecting(collector)
  .execute(ar -> {
  if (ar.succeeded()) {
    SqlResult<Map<Long, String>> result = ar.result();

    // Get the map created by the collector
    Map<Long, String> map = result.value();
    System.out.println("Got " + map);
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

collector 式查询的结果集处理过程中不能再拿到 Row 的引用,因为 pg 客户端在处理 collector 时,只会用一个 row 处理整个集合。

Java的 Collectors 类提供了很多很有趣的预定义的 collector,比如您可以很容易 从 row 集合里得到一个字符串:

Collector<Row, ?, String> collector = Collectors.mapping(
  row -> row.getString("last_name"),
  Collectors.joining(",", "(", ")")
);

// Run the query with the collector
client.query("SELECT * FROM users").collecting(collector).execute(ar -> {
    if (ar.succeeded()) {
      SqlResult<String> result = ar.result();

      // Get the string created by the collector
      String list = result.value();
      System.out.println("Got " + list);
    } else {
      System.out.println("Failure: " + ar.cause().getMessage());
    }
  });

发布/订阅

PostgreSQL支持发布/订阅式的通信方式。

您可以设置一个 notificationHandler 用于 接收PostgreSQL的通知消息:

connection.notificationHandler(notification -> {
  System.out.println("Received " + notification.getPayload() + " on channel " + notification.getChannel());
});

connection
  .query("LISTEN some-channel")
  .execute(ar -> {
  System.out.println("Subscribed to channel");
});

PgSubscriber (PgSubscriber) 是一种用作 处理单条连接上的订阅的通道(channel)管理器:

PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
);

// You can set the channel before connect
subscriber.channel("channel1").handler(payload -> {
  System.out.println("Received " + payload);
});

subscriber.connect(ar -> {
  if (ar.succeeded()) {

    // Or you can set the channel after connect
    subscriber.channel("channel2").handler(payload -> {
      System.out.println("Received " + payload);
    });
  }
});

channel(通道)方法的参数即通道名称(接收端)需要和PostgreSQL发送通知时的通道名称保持一致。 注意这里和SQL中的通道名称的形式不同,在 PgSubscriber 内部会把待提交的通道名称预处理为带引号的形式:

PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
);

subscriber.connect(ar -> {
    if (ar.succeeded()) {
      // Complex channel name - name in PostgreSQL requires a quoted ID
      subscriber.channel("Complex.Channel.Name").handler(payload -> {
        System.out.println("Received " + payload);
      });
      subscriber.channel("Complex.Channel.Name").subscribeHandler(subscribed -> {
        subscriber.actualConnection()
          .query("NOTIFY \"Complex.Channel.Name\", 'msg'")
          .execute(notified -> {
            System.out.println("Notified \"Complex.Channel.Name\"");
          });
      });

      // PostgreSQL simple ID's are forced lower-case
      subscriber.channel("simple_channel").handler(payload -> {
          System.out.println("Received " + payload);
      });
      subscriber.channel("simple_channel").subscribeHandler(subscribed -> {
        // The following simple channel identifier is forced to lower case
        subscriber.actualConnection()
          .query("NOTIFY Simple_CHANNEL, 'msg'")
          .execute(notified -> {
            System.out.println("Notified simple_channel");
          });
      });

      // The following channel name is longer than the current
      // (NAMEDATALEN = 64) - 1 == 63 character limit and will be truncated
      subscriber.channel("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaabbbbb")
        .handler(payload -> {
        System.out.println("Received " + payload);
      });
    }
  });

您可以自定义一个方法来实现重连,该方法的参数为 retries (重试次数), 返回值为 amountOfTime(重试间隔):

  • amountOfTime < 0 : 不重试,并关闭订阅

  • amountOfTime = 0 : 立即重试

  • amountOfTime > 0 : 在 amountOfTime 毫秒之后发起重试

PgSubscriber subscriber = PgSubscriber.subscriber(vertx, new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
);

// Reconnect at most 10 times after 100 ms each
subscriber.reconnectPolicy(retries -> {
  if (retries < 10) {
    return 100L;
  } else {
    return -1L;
  }
});

默认的策略是不重连。

取消请求

PostgreSQL 支持取消正在运行的请求. 您可以使用 cancelRequest 正在运行的请求. 取消请求的操作在执行时将和PostgreSQL服务端新建一条连接、执行取消请求、关闭这条连接。

connection
  .query("SELECT pg_sleep(20)")
  .execute(ar -> {
  if (ar.succeeded()) {
    // imagine this is a long query and is still running
    System.out.println("Query success");
  } else {
    // the server will abort the current query after cancelling request
    System.out.println("Failed to query due to " + ar.cause().getMessage());
  }
});
connection.cancelRequest(ar -> {
  if (ar.succeeded()) {
    System.out.println("Cancelling request has been sent");
  } else {
    System.out.println("Failed to send cancelling request");
  }
});

取消请求可能不会起作用——如果请求到达时,服务端已经处理完了当前查询请求,此时取消操作不会起作用。反之,取消请求得以执行,目标命令执行提前终止并返回一条错误消息。

更多详细信息可以在这里找到 official documentation

使用 SSL/TLS

为客户端连接添加SSL的操作,您可以参考Vert.x的 NetClientPgConnectOptions 配置操作。 当前版本客户端支持全部的PostgreSql SSL模式配置,您可以通过 sslmode 配置它们。客户端默认不启用SSL模式。 ssl 参数仅作为一种设置 sslmode 的快捷方式。 setSsl(true) 等价于 setSslMode(VERIFY_CA)setSsl(false) 等价于 setSslMode(DISABLE)

PgConnectOptions options = new PgConnectOptions()
  .setPort(5432)
  .setHost("the-host")
  .setDatabase("the-db")
  .setUser("user")
  .setPassword("secret")
  .setSslMode(SslMode.VERIFY_CA)
  .setPemTrustOptions(new PemTrustOptions().addCertPath("/path/to/cert.pem"));

PgConnection.connect(vertx, options, res -> {
  if (res.succeeded()) {
    // Connected with SSL
  } else {
    System.out.println("Could not connect " + res.cause());
  }
});

更多详细信息可以在这里找到 Vert.x documentation

使用代理

您可以配置客户端使用HTTP/1.x 连接,SOCKS4a 或 SOCKS5 代理。

更多信息可以在这里找到 Vert.x documentation

高级连接池配置

数据库服务负载均衡

您可以使用包含多个数据库服务的列表来配置连接池而不是单个数据库服务。

PgPool pool = PgPool.pool(Arrays.asList(server1, server2, server3), options);

当一个连接创建时,连接池使用(round-robin)轮询调度算法做负载均衡以选择不同的数据库服务

注意
负载均衡是在创建连接时提供的,而不是在从连接池中获取连接时提供

连接初始化

您可以使用 connectHandler 方法在连接创建后和连接释放回连接池之前来与数据库连接交互

pool.connectHandler(conn -> {
  conn.query(sql).execute().onSuccess(res -> {
    //  将连接释放回连接池,以被该应用程序复用
    conn.close();
  });
});

连接完成后,您应该释放该连接以通知连接池该数据库连接可以被使用

RxJava 3 API

客户端提供了原生 API 的响应式版本 以下示例使用RxJava3

简单的查询示例

Single<RowSet<Row>> single = pool.query("SELECT * FROM users WHERE id='julien'").rxExecute();

// Execute the query
single.subscribe(result -> {
  System.out.println("Got " + result.size() + " rows ");
}, err -> {
  System.out.println("Failure: " + err.getMessage());
});

连接

简化版的连接API可以让您很容易地使用connection对象,withConnection 方法会从连接池里获取到一条连接:

Maybe<RowSet<Row>> maybe = pool.withConnection(conn ->
  conn
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
    .rxExecute()
    .flatMap(result -> conn
      .query("SELECT * FROM Users")
      .rxExecute())
    .toMaybe());

maybe.subscribe(rows -> {
  // Success
}, err -> {
  // Failed
});

事务

简化版的事务API可以让您很容易地编写异步事务处理流,withTransaction 方法会为您启动和提交事务:

Completable completable = pool.withTransaction(conn ->
  conn
    .query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')")
    .rxExecute()
    .flatMap(result -> conn
      .query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')")
      .rxExecute())
    .toMaybe())
  .ignoreElement();

completable.subscribe(() -> {
  // Transaction succeeded
}, err -> {
  // Transaction failed
});

流式操作

RxJava 支持 ObservableFlowable 类型,这些类型可以从 PreparedQuery 产生的 RowStream 里获取到:

Observable<Row> observable = pool.rxGetConnection().flatMapObservable(conn -> conn
  .rxBegin()
  .flatMapObservable(tx ->
    conn
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapObservable(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toObservable();
      })
      .doAfterTerminate(tx::commit)));

// Then subscribe
observable.subscribe(row -> {
  System.out.println("User: " + row.getString("last_name"));
}, err -> {
  System.out.println("Error: " + err.getMessage());
}, () -> {
  System.out.println("End of stream");
});

下边是使用 Flowable 的例子:

Flowable<Row> flowable = pool.rxGetConnection().flatMapPublisher(conn -> conn
  .rxBegin()
  .flatMapPublisher(tx ->
    conn
      .rxPrepare("SELECT * FROM users WHERE first_name LIKE $1")
      .flatMapPublisher(preparedQuery -> {
        // Fetch 50 rows at a time
        RowStream<Row> stream = preparedQuery.createStream(50, Tuple.of("julien"));
        return stream.toFlowable();
      })
      .doAfterTerminate(tx::commit)));

// Then subscribe
flowable.subscribe(new Subscriber<Row>() {

  private Subscription sub;

  @Override
  public void onSubscribe(Subscription subscription) {
    sub = subscription;
    subscription.request(1);
  }

  @Override
  public void onNext(Row row) {
    sub.request(1);
    System.out.println("User: " + row.getString("last_name"));
  }

  @Override
  public void onError(Throwable err) {
    System.out.println("Error: " + err.getMessage());
  }

  @Override
  public void onComplete() {
    System.out.println("End of stream");
  }
});