Vert.x Cassandra 客户端
Vert.x 客户端可以访问 Apache Cassandra 服务。
开始
要使用本模块,请在Maven的pom文件中添加如下 依赖 :
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-cassandra-client</artifactId>
<version>4.0.3</version>
</dependency>
或者,如果您使用gradle:
compile 'io.vertx:vertx-cassandra-client:4.0.3'
警告
|
Cassandra客户端与Vert.x Dropwizard Metrics库不兼容。 他们都在使用 Dropwizard Metrics库的不同主版本并且 Datastax Java driver 将不会升级到最新版本,因为它舍弃了对java7的兼容性。 下一个driver的主版本(4.x)将会使用一个较新的 Dropwizard Metrics 版本。 |
创建客户端
客户端选项
Cassandra是一个分布式的系统,它可以有很多节点。
要连接Cassandra,在创建 CassandraClientOptions
对象时, 您需要指定集群当中的一些结点的地址
CassandraClientOptions options = new CassandraClientOptions()
.addContactPoint("node1.address", 9142)
.addContactPoint("node2.address", 9142)
.addContactPoint("node3.address", 9142);
CassandraClient client = CassandraClient.create(vertx, options);
默认情况下,Vert.x Cassandra客户端连接的是本地机器的 9042
端口,并且不绑定任何 keyspace。但是您可以用以下选项来同时设置 ContactPoint 和 keyspace,
也可以设置二者之一:
CassandraClientOptions options = new CassandraClientOptions()
.addContactPoint("localhost", 9142)
.setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.create(vertx, options);
提示
|
CassandraClientOptions 提供了一个 com.datastax.driver.core.Cluster.Builder 对象用来达到调优的目的。
|
共享客户端
如果您部署了多个Verticle实例,或者有多个同时和同一数据库交互的Verticle,我们建议创建一个共享的客户端:
CassandraClientOptions options = new CassandraClientOptions()
.addContactPoint("node1.address", 9142)
.addContactPoint("node2.address", 9142)
.addContactPoint("node3.address", 9142)
.setKeyspace("my_keyspace");
CassandraClient client = CassandraClient.createShared(vertx, "sharedClientName", options);
相同名称的共享客户端在底层使用的是同一个 com.datastax.driver.core.Session
。
客户端生命周期
客户端创建之后,直到执行第一个查询前,该客户端不会被连接。
提示
|
如果相同名称的客户端已经存在并已经执行了一次查询,那么新的共享客户端在被创建之后则可以连接。 |
在verticle内创建的客户端会在verticle被取消部署的时候停止。
换句话说,您不需要在vertical的 stop
方法中调用 close
。
在其他情况下,您必须手动关闭客户端。
注意
|
当一个共享客户端被关闭,如果存在同名客户端仍旧在运行,那么数据库的会话不会被关闭。 |
使用API
客户端API由 CassandraClient
提供。
查询
您有3种不同的方式来获取查询结果。
Streaming
当您需要以迭代的方式来处理查询结果(例如,您想处理结果集重的每一个元素),那么Streaming API是再合适不过了。 特别是处理大量数据记录时,这样是非常高效的。
为了给您一些使用这些API的灵感和思路,我们建议您参考如下示例:
cassandraClient.queryStream("SELECT my_string_col FROM my_keyspace.my_table where my_key = 'my_value'", queryStream -> {
if (queryStream.succeeded()) {
CassandraRowStream stream = queryStream.result();
// 当队列准备好接收buffer的时候恢复stream
response.drainHandler(v -> stream.resume());
stream.handler(row -> {
String value = row.getString("my_string_col");
response.write(value);
// 当buffer队列满时,暂停stream
if (response.writeQueueFull()) {
stream.pause();
}
});
// 在stream末尾结束请求。
stream.endHandler(end -> response.end());
} else {
queryStream.cause().printStackTrace();
// 如果无法执行该查询,则响应服务器内部错误。
response
.setStatusCode(500)
.end("Unable to execute the query");
}
});
在这个示例当中,我们执行查询,并通过HTTP来流式地处理查询结果。
获取 Bulk
这个API应该在您需要同时处理所有结果行的时候来使用。
cassandraClient.executeWithFullFetch("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'", executeWithFullFetch -> {
if (executeWithFullFetch.succeeded()) {
List<Row> rows = executeWithFullFetch.result();
for (Row row : rows) {
// 在此处处理每一行
}
} else {
System.out.println("Unable to execute the query");
executeWithFullFetch.cause().printStackTrace();
}
});
小心
|
只能在内存足以容纳整个数据块时获取 bulk 。 |
Collector 查询
您可以结合查询API来使用java Collector:
cassandraClient.execute("SELECT * FROM users", listCollector, ar -> {
if (ar.succeeded()) {
// 获取collector创建的字符串。
String list = ar.result();
System.out.println("Got " + list);
} else {
System.out.println("Failure: " + ar.cause().getMessage());
}
});
低级别获取
相比于stream 和bulk fetch,这个API更加底层,并对负载提供了更强大控制。
cassandraClient.execute("SELECT * FROM my_keyspace.my_table where my_key = 'my_value'", execute -> {
if (execute.succeeded()) {
ResultSet resultSet = execute.result();
if (resultSet.remaining() != 0) {
Row row = resultSet.one();
System.out.println("One row successfully fetched");
} else if (!resultSet.hasMorePages()) {
System.out.println("No pages to fetch");
} else {
resultSet.fetchNextPage().onComplete(fetchMoreResults -> {
if (fetchMoreResults.succeeded()) {
int availableWithoutFetching = resultSet.remaining();
System.out.println("Now we have " + availableWithoutFetching + " rows fetched, but not consumed!");
} else {
System.out.println("Unable to fetch more results");
fetchMoreResults.cause().printStackTrace();
}
});
}
} else {
System.out.println("Unable to execute the query");
execute.cause().printStackTrace();
}
});
Prepared queries
为了安全和高效,对于将被多次使用的查询来讲,使用prepared statement是一个比较好的做法。
您可以预备一个查询:
cassandraClient.prepare("SELECT * FROM my_keyspace.my_table where my_key = ? ", preparedStatementResult -> {
if (preparedStatementResult.succeeded()) {
System.out.println("The query has successfully been prepared");
PreparedStatement preparedStatement = preparedStatementResult.result();
// 现在您可以用这个 PreparedStatement 来执行下一次查询。
} else {
System.out.println("Unable to prepare the query");
preparedStatementResult.cause().printStackTrace();
}
});
然后,在接下来所有的查询中使用 PreparedStatement
:
cassandraClient.execute(preparedStatement.bind("my_value"), done -> {
ResultSet results = done.result();
// 处理查询结果
});
// Bulk fetching API
cassandraClient.executeWithFullFetch(preparedStatement.bind("my_value"), done -> {
List<Row> results = done.result();
// 处理查询结果
});
// Streaming API
cassandraClient.queryStream(preparedStatement.bind("my_value"), done -> {
CassandraRowStream results = done.result();
// 处理查询结果
});
批处理
考虑到您可能一次执行多个查询,您可以用 BatchStatement
达到批处理效果:
BatchStatement batchStatement = BatchStatement.newInstance(BatchType.LOGGED)
.add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Pavel')"))
.add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Thomas')"))
.add(SimpleStatement.newInstance("INSERT INTO NAMES (name) VALUES ('Julien')"));
cassandraClient.execute(batchStatement, result -> {
if (result.succeeded()) {
System.out.println("The given batch executed successfully");
} else {
System.out.println("Unable to execute the batch");
result.cause().printStackTrace();
}
});
RxJava 2 API
Cassandra 客户端为原来的API提供了一个Rx版本。
创建Rx版客户端
想要创建Rx版 Cassandra 客户端,您需要引入 CassandraClient
类。
然后用 create
方法获取一个实例:
CassandraClientOptions options = new CassandraClientOptions()
.addContactPoint("node1.corp.int", 7000)
.addContactPoint("node2.corp.int", 7000)
.addContactPoint("node3.corp.int", 7000);
CassandraClient cassandraClient = CassandraClient.createShared(vertx, options);
查询
在本节,我们会回顾之前的一些 Rx-API 用例。
Streaming
一个 CassandraRowStream
可以转换成 Flowable
,这样会方便您处理大容量数据集合:
cassandraClient.rxQueryStream("SELECT my_key FROM my_keyspace.my_table where my_key = my_value")
// 将stream转换成Flowable
.flatMapPublisher(CassandraRowStream::toFlowable)
.subscribe(row -> {
// 处理单行
}, t -> {
// 处理失败
}, () -> {
// stream 末尾
});