Vert.x Cassandra 客户端

Vert.x 客户端可以访问 Apache Cassandra 服务。

开始

要使用本模块,请在Maven的pom文件中添加如下 依赖

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-cassandra-client</artifactId>
 <version>4.4.0</version>
</dependency>

或者,如果您使用gradle:

compile 'io.vertx:vertx-cassandra-client:4.4.0'

创建客户端

客户端选项

Cassandra是一个分布式的系统,它可以有很多节点。 要连接Cassandra,在创建 CassandraClientOptions 对象时, 您需要指定集群当中的一些结点的地址

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.address", 9142)
  .addContactPoint("node2.address", 9142)
  .addContactPoint("node3.address", 9142);
CassandraClient client = CassandraClient.create(vertx, options);

默认情况下,Vert.x Cassandra客户端连接的是本地机器的 9042 端口,并且不绑定任何 keyspace。但是您可以用以下选项来同时设置 ContactPoint 和 keyspace, 也可以设置二者之一:

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("localhost", 9142)
  .setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.create(vertx, options);
提示
CassandraClientOptions 提供了一个 com.datastax.driver.core.Cluster.Builder 对象用来达到调优的目的。

共享客户端

如果您部署了多个Verticle实例,或者有多个同时和同一数据库交互的Verticle,我们建议创建一个共享的客户端:

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.address", 9142)
  .addContactPoint("node2.address", 9142)
  .addContactPoint("node3.address", 9142)
  .setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.createShared(vertx, "sharedClientName", options);

相同名称的共享客户端在底层使用的是同一个 com.datastax.driver.core.Session

客户端生命周期

客户端创建之后,直到执行第一个查询前,该客户端不会被连接。

提示
如果相同名称的客户端已经存在并已经执行了一次查询,那么新的共享客户端在被创建之后则可以连接。

在verticle内创建的客户端会在verticle被取消部署的时候停止。 换句话说,您不需要在vertical的 stop 方法中调用 close

在其他情况下,您必须手动关闭客户端。

注意
当一个共享客户端被关闭,如果存在同名客户端仍旧在运行,那么数据库的会话不会被关闭。

使用API

客户端API由 CassandraClient 提供。

查询

您有3种不同的方式来获取查询结果。

Streaming

当您需要以迭代的方式来处理查询结果(例如,您想处理结果集中的每一个元素),那么 Streaming API 是再合适不过了。 特别是处理大量数据记录时,这样是非常高效的。

为了给您一些使用这些 API 的灵感和思路,我们建议您参考如下示例:

cassandraClient.queryStream("SELECT my_string_col FROM my_keyspace.my_table where my_key = 'my_value'", queryStream -> {
  if (queryStream.succeeded()) {
    CassandraRowStream stream = queryStream.result();

    // 当队列准备好接收buffer的时候恢复stream
    response.drainHandler(v -> stream.resume());

    stream.handler(row -> {
      String value = row.getString("my_string_col");
      response.write(value);

      // 当buffer队列满时,暂停stream
      if (response.writeQueueFull()) {
        stream.pause();
      }
    });

    // 在stream末尾结束请求。
    stream.endHandler(end -> response.end());

  } else {
    queryStream.cause().printStackTrace();
    // 如果无法执行该查询,则响应服务器内部错误。
    response
      .setStatusCode(500)
      .end("Unable to execute the query");
  }
});

在这个示例当中,我们执行查询,并通过HTTP来流式地处理查询结果。

获取 Bulk

这个API应该在您需要同时处理所有结果行的时候来使用。

cassandraClient.executeWithFullFetch("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'", executeWithFullFetch -> {
  if (executeWithFullFetch.succeeded()) {
    List<Row> rows = executeWithFullFetch.result();
    for (Row row : rows) {
      // 在此处处理每一行
    }
  } else {
    System.out.println("Unable to execute the query");
    executeWithFullFetch.cause().printStackTrace();
  }
});
小心
只能在内存足以容纳整个数据块时获取 bulk 。

Collector 查询

您可以结合查询API来使用java Collector:

cassandraClient.execute("SELECT * FROM users", listCollector, ar -> {
  if (ar.succeeded()) {
    // 获取collector创建的字符串。
    String list = ar.result();
    System.out.println("Got " + list);
  } else {
    System.out.println("Failure: " + ar.cause().getMessage());
  }
});

低级别获取

相比于 stream 和bulk fetch,这个 API 更加底层,并对负载提供了更强大控制。

cassandraClient.execute("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'", execute -> {
  if (execute.succeeded()) {
    ResultSet resultSet = execute.result();

    if (resultSet.remaining() != 0) {
      Row row = resultSet.one();
      System.out.println("One row successfully fetched");
    } else if (!resultSet.hasMorePages()) {
      System.out.println("No pages to fetch");
    } else {
      resultSet.fetchNextPage().onComplete(fetchMoreResults -> {
        if (fetchMoreResults.succeeded()) {
          int availableWithoutFetching = resultSet.remaining();
          System.out.println("Now we have " + availableWithoutFetching + " rows fetched, but not consumed!");
        } else {
          System.out.println("Unable to fetch more results");
          fetchMoreResults.cause().printStackTrace();
        }
      });
    }
  } else {
    System.out.println("Unable to execute the query");
    execute.cause().printStackTrace();
  }
});

Prepared queries

为了安全和高效,对于将被多次使用的查询来讲,使用prepared statement是一个比较好的做法。

您可以预备一个查询:

cassandraClient.prepare("SELECT * FROM my_keyspace.my_table where my_key = ? ", preparedStatementResult -> {
  if (preparedStatementResult.succeeded()) {
    System.out.println("The query has successfully been prepared");
    PreparedStatement preparedStatement = preparedStatementResult.result();
    // 现在您可以用这个 PreparedStatement 来执行下一次查询。
  } else {
    System.out.println("Unable to prepare the query");
    preparedStatementResult.cause().printStackTrace();
  }
});

然后,在接下来所有的查询中使用 PreparedStatement

cassandraClient.execute(preparedStatement.bind("my_value"), done -> {
  ResultSet results = done.result();
  // 处理查询结果
});

// Bulk fetching API
cassandraClient.executeWithFullFetch(preparedStatement.bind("my_value"), done -> {
  List<Row> results = done.result();
  // 处理查询结果
});

// Streaming API
cassandraClient.queryStream(preparedStatement.bind("my_value"), done -> {
  CassandraRowStream results = done.result();
  // 处理查询结果
});

批处理

考虑到您可能一次执行多个查询,您可以用 BatchStatement 达到批处理效果:

BatchStatement batchStatement = BatchStatement.newInstance(BatchType.LOGGED)
  .add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Pavel')"))
  .add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Thomas')"))
  .add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Julien')"));

cassandraClient.execute(batchStatement, result -> {
  if (result.succeeded()) {
    System.out.println("The given batch executed successfully");
  } else {
    System.out.println("Unable to execute the batch");
    result.cause().printStackTrace();
  }
});

追踪查询

当 Vert.x 开启了追踪(tracing)时,Cassandra 客户端也可以追踪执行的查询。

Cassandra 客户端会上报以下的 客户端 跨度(span):

  • Query 操作名

  • 标签

    • peer.address: 连接 Cassandra 的驱动已知的节点列表,格式类似 [127_0_0_1:9042,localhost:9042,myhost_mydomain:9042]

    • span.kind: client

    • db.instance: keyspace 的名称

    • db.statement: CQL 的查询语句

    • db.type: cassandra

默认的追踪策略是 PROPAGATE ,客户端只会在参与活动跟踪时创建一个跨度。

您可以使用 setTracingPolicy 方法来修改客户端的追踪策略。 例如,您可以设置追踪策略为 ALWAYS ,这样 Cassandra 客户端就会一直上报跨度:

CassandraClientOptions options = new CassandraClientOptions()
  .setTracingPolicy(TracingPolicy.ALWAYS);

RxJava 3 API

Cassandra 客户端为原来的API提供了一个Rx版本。

创建Rx版客户端

想要创建Rx版 Cassandra 客户端,您需要引入 CassandraClient 类。 然后用 create 方法获取一个实例:

CassandraClientOptions options = new CassandraClientOptions()
  .addContactPoint("node1.corp.int", 7000)
  .addContactPoint("node2.corp.int", 7000)
  .addContactPoint("node3.corp.int", 7000);
CassandraClient cassandraClient = CassandraClient.createShared(vertx, options);

查询

在本节,我们会回顾之前的一些 Rx-API 用例。

Streaming

一个 CassandraRowStream 可以转换成 Flowable ,这样会方便您处理大容量数据集合:

cassandraClient.rxQueryStream("SELECT my_key FROM my_keyspace.my_table where my_key = my_value")
  // 将stream转换成Flowable
  .flatMapPublisher(CassandraRowStream::toFlowable)
  .subscribe(row -> {
    // 处理单行
  }, t -> {
    // 处理失败
  }, () -> {
    // stream 末尾
  });

获取 Bulk

当您的结果集数据量很少,您可以一次性获取所有结果;

cassandraClient.rxExecuteWithFullFetch("SELECT my_key FROM my_keyspace.my_table where my_key = my_value")
  .subscribe(rows -> {
    // 处理结果集
  }, throwable -> {
    // 处理失败
  });