Vert.x MongoDB 客户端
您的 Vert.x 应用可使用 Vert.x MongoDB Client(以下简称客户端)与 MongoDB 互动, 包括保存,获取,搜索和删除文档。MongoDB 是在 Vert.x 应用进行数据持久化时的最佳选择, 因为 MongoDB 天生就是处理 JSON(BSON)格式的文档数据库。
特点
-
完全非阻塞
-
支持自定义编解码器,从而实现 Vert.x JSON 快速序列化和反序列化
-
支持 MongoDB Java 驱动大部分配置项
本客户端基于 MongoDB ReactiveStreams Driver 。
使用 Vert.x MongoDB Client
使用此客户端,需要添加下列依赖:
-
Maven(在
pom.xml
文件中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mongo-client</artifactId>
<version>4.4.0</version>
</dependency>
-
Gradle(在
build.gradle
文件中):
compile 'io.vertx:vertx-mongo-client:4.4.0'
创建客户端
您可用以下几种方式创建客户端:
使用默认的共享连接池
大部分情况下,您希望不同的客户端之间共享一个连接池。
例如:我们通过部署多个verticle实例来扩展你的程序的时候,我们会希望每个verticle实例都共用同一个连接池, 而不是多个verticle实例使用多个连接池。
要想用最简单的方法去使用共享连接池,我们可以这么做:
MongoClient client = MongoClient.createShared(vertx, config);
只有第一次调用 MongoClient.createShared
才会真正的根据您指定的 config 配置创建连接池。
之后再调用此方法,只会返回一个新的客户端对象,但使用的是相同的数据源。因此这时 config 参数不再有作用。
指定 连接池方式数据源的名称
您还可以像下面这样,在创建一个客户端的时候指定连接池方式数据源的名称:
MongoClient client = MongoClient.createShared(vertx, config, "MyPoolName");
如果您使用相同Vert.x实例创建了不同客户端,同时指定了相同的连接池名称, 那么这些客户端将共享这个相同的连接池数据源。
同样的(与使用默认的共享数据源相同),只有第一次调用 MongoClient.createShared
才会真正的根据 您指定的config创建一个连接池。
之后再调用此方法,只会返回一个新的客户端对象,但使用的是相同的数据源。因此这时config参数不再有作用。
当您希望不同groups的客户端,使用不同的连接池时,可以使用这种方式。 举个使用场景的例子,比如这些客户端需要与不同数据源进行交互的时候,可以使用这种方式。
创建非共享数据源的客户端对象
在大部分情况下,您希望在不同客户端实例间共享一个连接池。 但是,在有些情况下,您可能想要使用一个不与其它客户端共享连接的连接池,以创建一个客户端实例。
此时您可使用 MongoClient.create
。
MongoClient client = MongoClient.create(vertx, config);
每次调用此方法,就相当于在调用 MongoClient.createShared
方法时,加上了具有唯一名称的数据源参数。
使用客户端 API
MongoClient
接口定义了操作客户端的API 方法。您可以使用 MongoClient
来使用调用 API 方法。
保存文档
您可以使用 save
方法以保存文档。
如果文档中没有 \_id
字段,文档会被保存。若有,将执行 upserted。
Upserted 意思是,如果此文档不存在,就保存此文档,此文档存在就更新。
如果被保存的文档没有 \_id
字段,回调方法中可以获得保存后生成的 id。
例如:
JsonObject document = new JsonObject()
.put("title", "The Hobbit");
mongoClient.save("books", document, res -> {
if (res.succeeded()) {
String id = res.result();
System.out.println("Saved book with id " + id);
} else {
res.cause().printStackTrace();
}
});
下面的例子,文档已有 \_id
:
JsonObject document = new JsonObject()
.put("title", "The Hobbit")
.put("_id", "123244");
mongoClient.save("books", document, res -> {
if (res.succeeded()) {
// ...
} else {
res.cause().printStackTrace();
}
});
插入文档
您可以使用 insert
方法来插入文档。
如果被插入的文档没有包含 id,回调方法中可以获得保存后生成的 id。
JsonObject document = new JsonObject()
.put("title", "The Hobbit");
mongoClient.insert("books", document, res -> {
if (res.succeeded()) {
String id = res.result();
System.out.println("Inserted book with id " + id);
} else {
res.cause().printStackTrace();
}
});
如果被插入的文档包含 id,但是此 id 代表的文档已经存在,插入就会失败:
JsonObject document = new JsonObject()
.put("title", "The Hobbit")
.put("_id", "123244");
mongoClient.insert("books", document, res -> {
if (res.succeeded()) {
//...
} else {
// Will fail if the book with that id already exists.
}
});
更新文档
您可以使用 updateCollection
方法来更新文档。
此方法可以更新集合(译者注:MongoDB 中的集合概念对应 SQL 中的数据库表)中的一个或多个文档。
在 updateCollection
方法中被当成参数传递的JSON对象,
必须包含 Update Operators,
因为由它决定更新的方式。
其中作为 query
参数的 json对象决定更新集合中的哪个文档。
例如更新 books 集合中的一个文档:
JsonObject query = new JsonObject()
.put("title", "The Hobbit");
// Set the author field
JsonObject update = new JsonObject().put("$set", new JsonObject()
.put("author", "J. R. R. Tolkien"));
mongoClient.updateCollection("books", query, update, res -> {
if (res.succeeded()) {
System.out.println("Book updated !");
} else {
res.cause().printStackTrace();
}
});
如果您希望更新操作是 upsert(upsert 意思是,如果此文档不存在,就保存此文档;此文档存在就更新)或者 是更新多个文档,
那么就使用 updateCollectionWithOptions
方法,传递一个 UpdateOptions
的实例,去定制化您的更新操作。
参数 UpdateOptions
有以下选项:
multi
-
若设置为 true,则可以更新多个文档
upsert
-
若设置为 true,则可以在没有查询到要更新的文档时,新增该文档
writeConcern
-
写操作的可靠性(译者注:源码中是用 writeOption 枚举类型来代表的)
//译者注:MongoDB 默认写操作级别是 WriteOption.ACKNOWLEDGED
JsonObject query = new JsonObject().put("title", "The Hobbit");
// Set the author field
JsonObject update = new JsonObject().put("$set", new JsonObject()
.put("author", "J. R. R. Tolkien"));
UpdateOptions options = new UpdateOptions().setMulti(true);
mongoClient.updateCollectionWithOptions("books", query, update, options, res -> {
if (res.succeeded()) {
System.out.println("Book updated !");
} else {
res.cause().printStackTrace();
}
});
替换文档
您可以使用 replaceDocuments
方法来替换文档。
替换操作和更新操作相似,但替换不需要任何操作符。 因为它是用您提供的文档去替换整个文档。
例如替换 books 集合中的一个文档:
JsonObject query = new JsonObject()
.put("title", "The Hobbit");
JsonObject replace = new JsonObject()
.put("title", "The Lord of the Rings")
.put("author", "J. R. R. Tolkien");
mongoClient.replaceDocuments("books", query, replace, res -> {
if (res.succeeded()) {
System.out.println("Book replaced !");
} else {
res.cause().printStackTrace();
}
});
批量操作
您可以使用 bulkWrite
来一次执行多个新增、更新、替换或者删除的操作。
在 bulkWrite
方法中,您可以传递一系列 BulkOperations
,而且每个 BulkOperations
运作方式和单个操作类似。
您可以根据需要传递多个操作,即使这些操作都是同一类型的。
如果您希望 批量操作可以按照顺序执行,那么可以使用 bulkWriteWithOptions
将自定义的配置写入其中,然后传递一个 BulkWriteOptions
的实例。
更多关于有序批量写操作的描述,见
Execution of Operations
查找文档
您可以使用 find
方法查找文档。
其中 query
参数用来匹配集合中的文档。
例如匹配所有文档:
JsonObject query = new JsonObject();
mongoClient.find("books", query, res -> {
if (res.succeeded()) {
for (JsonObject json : res.result()) {
System.out.println(json.encodePrettily());
}
} else {
res.cause().printStackTrace();
}
});
又例如匹配 books 集合中某一个作者的所有文档:
JsonObject query = new JsonObject()
.put("author", "J. R. R. Tolkien");
mongoClient.find("books", query, res -> {
if (res.succeeded()) {
for (JsonObject json : res.result()) {
System.out.println(json.encodePrettily());
}
} else {
res.cause().printStackTrace();
}
});
查询的结果包装成了 JSON 对象的 List 集合。
如果您希望在查询中指定一些内容,比如指定返回的字段,或者指定返回多少条数据,可以使用 findWithOptions
方法,
在参数 FindOptions
中指定这些查询要求。
FindOptions
中可以设置以下参数:
fields
-
返回的字段。默认为
null
,意味着查询结果会返回所有字段。 sort
-
指定排序字段。默认为 null。
limit
-
指定返回的数据条数。默认值为 -1,意味着查询结果会返回所有数据。
skip
-
在返回查询结果之前,指定跳过的数据数量。默认值为 0 。
hint
-
要使用的索引。默认为空字符串。
批量查询文档
当我们在处理大量的数据集合的时候,不建议使用
find
方法或者
findWithOptions
方法。
为了避免响应结果数据量太大导致内存溢出,建议使用 findBatch
:
JsonObject query = new JsonObject()
.put("author", "J. R. R. Tolkien");
mongoClient.findBatch("book", query)
.exceptionHandler(throwable -> throwable.printStackTrace())
.endHandler(v -> System.out.println("End of research"))
.handler(doc -> System.out.println("Found doc: " + doc.encodePrettily()));
被匹配到的文档会被 ReadStream
处理器挨个返回。
在 FindOptions
中有一个额外的参数 batchSize
,您可以通过设置这个参数,来设置 ReadStream
处理器一次加载的文档的数量。
JsonObject query = new JsonObject()
.put("author", "J. R. R. Tolkien");
FindOptions options = new FindOptions().setBatchSize(100);
mongoClient.findBatchWithOptions("book", query, options)
.exceptionHandler(throwable -> throwable.printStackTrace())
.endHandler(v -> System.out.println("End of research"))
.handler(doc -> System.out.println("Found doc: " + doc.encodePrettily()));
默认情况下, batchSize
的值是20。
删除文档
您可以使用 removeDocuments
方法来删除文档。
其中 query
参数决定了要删除集合中的哪些文档。
例如删除作者为 Tolkien 的所有文档:
JsonObject query = new JsonObject()
.put("author", "J. R. R. Tolkien");
mongoClient.removeDocuments("books", query, res -> {
if (res.succeeded()) {
System.out.println("Never much liked Tolkien stuff!");
} else {
res.cause().printStackTrace();
}
});
文档计数
您可以使用 count
方法去计算文档数量。
例如计算 作者为 Tolkien 的书的数量,结果包装在回调方法中。
JsonObject query = new JsonObject()
.put("author", "J. R. R. Tolkien");
mongoClient.count("books", query, res -> {
if (res.succeeded()) {
long num = res.result();
} else {
res.cause().printStackTrace();
}
});
管理 MongoDB 集合
MongoDB 的所有文档数据都存储在集合中。
您可以用 getCollections
来获取所有集合的列表
mongoClient.getCollections(res -> {
if (res.succeeded()) {
List<String> collections = res.result();
} else {
res.cause().printStackTrace();
}
});
您可以使用 createCollection
方法来创建一个新的集合。
mongoClient.createCollection("mynewcollectionr", res -> {
if (res.succeeded()) {
// Created ok!
} else {
res.cause().printStackTrace();
}
});
您可以使用 dropCollection
方法来删除一个集合。
请注意:删除一个集合将会删除集合中所有的文档!
mongoClient.dropCollection("mynewcollectionr", res -> {
if (res.succeeded()) {
// Dropped ok!
} else {
res.cause().printStackTrace();
}
});
执行 MongoDB
的其他命令
您可以通过 runCommand
方法执行任何 `MongoDB`命令。
使用这种方式,可以发挥出MongoDB更多优点,比如使用MapReduce。 更多详情,请参考说明文档 Commands。
例如执行 aggregate(译者注:聚合)命令。请注意,命令的名称要做为 runCommand 方法的一个参数, 并且同时也必须包含在包装命令的 JSON 参数中。这么做是因为 JSON 不是有序的,但 BSON 却是, 而且 MongoDB 期望 BSON 参数的第一个键值对是命令的名称。所以,为了明确 JSON 中的哪个键值对是命令名称, 我们也就必须把命令名称单独设置为一个参数:
JsonObject command = new JsonObject()
.put("aggregate", "collection_name")
.put("pipeline", new JsonArray());
mongoClient.runCommand("aggregate", command, res -> {
if (res.succeeded()) {
JsonArray resArr = res.result().getJsonArray("result");
// etc
} else {
res.cause().printStackTrace();
}
});
MongoDB的JSON扩展支持
目前,MongoDB 只支持 date
, oid
和 binary
类型
(请参考:http://docs.mongodb.org/manual/reference/mongodb-extended-json[MongoDB Extended JSON] )
例如插入含有 date
类型字段的文档:
JsonObject document = new JsonObject()
.put("title", "The Hobbit")
//ISO-8601 date
.put("publicationDate", new JsonObject().put("$date", "1937-09-21T00:00:00+00:00"));
mongoService.save("publishedBooks", document).compose(id -> {
return mongoService.findOne("publishedBooks", new JsonObject().put("_id", id), null);
}).onComplete(res -> {
if (res.succeeded()) {
System.out.println("To retrieve ISO-8601 date : "
+ res.result().getJsonObject("publicationDate").getString("$date"));
} else {
res.cause().printStackTrace();
}
});
例如,插入一个包含二进制的字段的文档并且读取这个字段:
byte[] binaryObject = new byte[40];
JsonObject document = new JsonObject()
.put("name", "Alan Turing")
.put("binaryStuff", new JsonObject().put("$binary", binaryObject));
mongoService.save("smartPeople", document).compose(id -> {
return mongoService.findOne("smartPeople", new JsonObject().put("_id", id), null);
}).onComplete(res -> {
if (res.succeeded()) {
byte[] reconstitutedBinaryObject = res.result().getJsonObject("binaryStuff").getBinary("$binary");
//This could now be de-serialized into an object in real life
} else {
res.cause().printStackTrace();
}
});
例如保存一个 base 64 编码的字符串,将这个字符串作为 binary 字段插入。并且读取这个字段:
String base64EncodedString = "a2FpbHVhIGlzIHRoZSAjMSBiZWFjaCBpbiB0aGUgd29ybGQ=";
JsonObject document = new JsonObject()
.put("name", "Alan Turing")
.put("binaryStuff", new JsonObject().put("$binary", base64EncodedString));
mongoService.save("smartPeople", document).compose(id -> {
return mongoService.findOne("smartPeople", new JsonObject().put("_id", id), null);
}).onComplete(res -> {
if (res.succeeded()) {
String reconstitutedBase64EncodedString = res.result().getJsonObject("binaryStuff").getString("$binary");
//This could now converted back to bytes from the base 64 string
} else {
res.cause().printStackTrace();
}
});
例如插入一个 object ID 并且读取它:
String individualId = new ObjectId().toHexString();
JsonObject document = new JsonObject()
.put("name", "Stephen Hawking")
.put("individualId", new JsonObject().put("$oid", individualId));
mongoService.save("smartPeople", document).compose(id -> {
JsonObject query = new JsonObject().put("_id", id);
return mongoService.findOne("smartPeople", query, null);
}).onComplete(res -> {
if (res.succeeded()) {
String reconstitutedIndividualId = res.result().getJsonObject("individualId").getString("$oid");
} else {
res.cause().printStackTrace();
}
});
获取 distinct 后的值
例如:
JsonObject document = new JsonObject()
.put("title", "The Hobbit");
mongoClient.save("books", document).compose(v -> {
return mongoClient.distinct("books", "title", String.class.getName());
}).onComplete(res -> {
if (res.succeeded()) {
System.out.println("Title is : " + res.result().getJsonArray(0));
} else {
res.cause().printStackTrace();
}
});
例如:在批量模式下, 获取 distinct 后的值:
JsonObject document = new JsonObject()
.put("title", "The Hobbit");
mongoClient.save("books", document, res -> {
if (res.succeeded()) {
mongoClient.distinctBatch("books", "title", String.class.getName())
.handler(book -> System.out.println("Title is : " + book.getString("title")));
} else {
res.cause().printStackTrace();
}
});
-
例如查询 distinct 后的值
JsonObject document = new JsonObject()
.put("title", "The Hobbit")
.put("publicationDate", new JsonObject().put("$date", "1937-09-21T00:00:00+00:00"));
JsonObject query = new JsonObject()
.put("publicationDate",
new JsonObject().put("$gte", new JsonObject().put("$date", "1937-09-21T00:00:00+00:00")));
mongoClient.save("books", document).compose(v -> {
return mongoClient.distinctWithQuery("books", "title", String.class.getName(), query);
}).onComplete(res -> {
if (res.succeeded()) {
System.out.println("Title is : " + res.result().getJsonArray(0));
}
});
例如:在批量查询模式下,获取 distinct 后的值。
JsonObject document = new JsonObject()
.put("title", "The Hobbit")
.put("publicationDate", new JsonObject().put("$date", "1937-09-21T00:00:00+00:00"));
JsonObject query = new JsonObject()
.put("publicationDate", new JsonObject()
.put("$gte", new JsonObject().put("$date", "1937-09-21T00:00:00+00:00")));
mongoClient.save("books", document, res -> {
if (res.succeeded()) {
mongoClient.distinctBatchWithQuery("books", "title", String.class.getName(), query)
.handler(book -> System.out.println("Title is : " + book.getString("title")));
}
});
存储/检索文件和二进制数据
客户端可以使用 MongoDB 的 GridFS 模块来存储或检索文件和二进制数据。(译者注:GridFS是MongoDB的一个子模块,主要用于在MongoDB中存储文件,相当于MongoDB内置的一个分布式文件系统。)
MongoGridFsClient
可以用来上传文件以及数据流到GridFS,
以及从GridFS下载文件和数据流。
获取一个可与GridFS交互的MongoGridFsClient。
通过调用 createGridFsBucketService
方法,
并且给方法提供一个 bucket 名称,可创建一个 MongoGridFsClient
客户端。
在 GridFS 中,bucket 名称最终是一个集合,该集合包含对所有存储对象的引用。
您可以通过提供一个唯一的名称将对象隔离到不同的bucket中。
它具有以下字段:
bucketName
: 要创建的 bucket 的名称
例如:使用自定义的bucket名称,
获取 MongoGridFsClient
。
mongoClient.createGridFsBucketService("bakeke", res -> {
if (res.succeeded()) {
//Interact with the GridFS client...
MongoGridFsClient client = res.result();
} else {
res.cause().printStackTrace();
}
});
GridFS使用 "fs" 作为默认的 bucket 名称。如果您想要获取默认的bucket,而不是自定义一个,
那么请调用 createDefaultGridFsBucketService
方法。
例如:使用默认bucket名称,获取 MongoGridFsClient
。
mongoClient.createDefaultGridFsBucketService( res -> {
if (res.succeeded()) {
//Interact with the GridFS client...
MongoGridFsClient client = res.result();
} else {
res.cause().printStackTrace();
}
});
从GridFS中,删除整个文件bucket。
整个文件bucket及其所有内容都可以使用 drop
来进行删除。
该方法可以删除创建`MongoGridFsClient`时指定的bucket。
例如:删除一个文件bucket。
gridFsClient.drop(res -> {
if (res.succeeded()) {
//The file bucket is dropped and all files in it, erased
} else {
res.cause().printStackTrace();
}
});
在一个GridFS的bucket中,查询所有文件ID。
调用 findAllIds
方法,可以查询到在bucket中的所有文件ID。
可以凭借文件ID,通过使用 downloadFileByID
方法,来下载文件。
例如:检索文件ID列表。
gridFsClient.findAllIds(res -> {
if (res.succeeded()) {
List<String> ids = res.result(); //List of file IDs
} else {
res.cause().printStackTrace();
}
});
在一个与查询匹配的GridFS bucket中,查询文件ID。
可以指定查询以匹配GridFS bucket
中的文件。调用 findIds
方法
可以返回与查询匹配的文件ID列表。
它具有以下字段:
query
: 这是一个可以使用标准MongoDB查询运算符来匹配任何文件元数据的json对象。
该json对象为空,会匹配所有文档。您可以按照GridFS手册中所述内容,来查询GridFS文件集合的属性。
https://docs.mongodb.com/manual/core/gridfs/#the-files-collection
您可以调用 downloadFileByID
方法,通过ID来下载文件。
例如:基于元数据查询来检索文件ID列表
JsonObject query = new JsonObject().put("metadata.nick_name", "Puhi the eel");
gridFsClient.findIds(query, res -> {
if (res.succeeded()) {
List<String> ids = res.result(); //List of file IDs
} else {
res.cause().printStackTrace();
}
});
在GridFS中,根据文件的ID来删除文件。
它具有以下字段:
id
: 当文件存储到GridFS时,ID会自动生成。
例如:通过ID删除文件。
String id = "56660b074cedfd000570839c"; //The GridFS ID of the file
gridFsClient.delete(id, (AsyncResult<Void> res) -> {
if (res.succeeded()) {
//File deleted
} else {
//Something went wrong
res.cause().printStackTrace();
}
});
在GridFS中上传文件
通过 uploadFile
方法,文件可以使用文件名称进行存储。
当文件存储成功时,GridFS会返回自动生成的文件ID。这个ID可以被用来之后检索文件。
它具有以下字段:
fileName
: 这是被存储文件的名称。
gridFsClient.uploadFile("file.name", res -> {
if (res.succeeded()) {
String id = res.result();
//The ID of the stored object in Grid FS
} else {
res.cause().printStackTrace();
}
});
上传文件时携带配置参数。
使用 uploadFileWithOptions
方法,
并传入一个 `GridFsUploadOptions`的实例,文件可以在存储时,携带额外的配置参数。
当文件存储成功时,GridFS会返回文件ID。
它具有以下字段:
metadata
: 这是一个json对象,它包含了以后的搜索中可能有用的任何元数据
chunkSizeBytes
: GridFS会将文件分解这个字段数值的数个数据块。(译者注:数据块类似于数据库的分区,数值太大和太小都会影响性能。具体详见官方文档: https://docs.mongodb.com/manual/core/sharding-data-partitioning/)
例如: 通过文件名上传一个文件,并且携带元数据参数以及指定数据块的大小。
JsonObject metadata = new JsonObject();
metadata.put("nick_name", "Puhi the Eel");
GridFsUploadOptions options = new GridFsUploadOptions();
options.setChunkSizeBytes(1024);
options.setMetadata(metadata);
gridFsClient.uploadFileWithOptions("file.name", options, res -> {
if (res.succeeded()) {
String id = res.result();
//The ID of the stored object in Grid FS
} else {
res.cause().printStackTrace();
}
});
下载存储在GridFS中的文件
通过调用 downloadFile
方法,可以凭借文件的原始名称来下载文件。
下载完成后,将返回下载的文件长度,该数值是 Long
类型。
它具有以下字段:
fileName
-
文件在存储时使用的文件名称
例如:凭借文件名称下载存储在GridFS中的文件。
gridFsClient.downloadFile("file.name", res -> {
if (res.succeeded()) {
Long fileLength = res.result();
//The length of the file stored in fileName
} else {
res.cause().printStackTrace();
}
});
使用文件ID来下载存储在GridFS中的文件
通过调用 downloadFileByID
方法,可以凭借文件ID来下载文件。
下载完成后,将返回下载的文件长度,该数值是`Long`类型。
它具有以下字段:
id
: 文件在存储后生成的文件ID
例如:凭借文件ID下载存储在GridFS中的文件。
String id = "56660b074cedfd000570839c";
String filename = "puhi.fil";
gridFsClient.downloadFileByID(id, filename, res -> {
if (res.succeeded()) {
Long fileLength = res.result();
//The length of the file stored in fileName
} else {
res.cause().printStackTrace();
}
});
从GridFS下载文件,并且重新命名文件名称
在解析文件的时候,可以使用文件的原始名称。而在下载文件时,
您可以通过调用 downloadFileAs
方法,来重命名文件。
下载完成后,将返回下载的文件长度,该数值是`Long`类型。
它具有以下字段:
fileName
: 文件之前存储的旧名称
newFileName
: 文件将被保存的新名称
gridFsClient.downloadFileAs("file.name", "new_file.name", res -> {
if (res.succeeded()) {
Long fileLength = res.result();
//The length of the file stored in fileName
} else {
res.cause().printStackTrace();
}
});
将数据流上传到GridFS
使用 uploadByFileName
方法,您可以将数据流上传到GridFS。
当数据流上传成功后,将返回GridFS生成的文件ID。
它具有以下字段:
stream
: 将要被上传的 ReadStream
fileName
: 将被存储的数据流的名称
例如: 上传文件数据流到GridFS。
gridFsStreamClient.uploadByFileName(asyncFile, "kanaloa", stringAsyncResult -> {
String id = stringAsyncResult.result();
});
上传数据流时携带配置参数
使用 uploadByFileNameWithOptions
方法,
并传入 GridFsUploadOptions
实例,您可以将数据流上传到GridFS。
当数据流上传成功后,将返回GridFS生成的文件ID。
它具有以下字段:
stream
: 将要被上传的 ReadStream
fileName
: 将被存储的数据流的名称
`options' : 上传时携带的配置参数
GridFsUploadOptions
具有以下字段:
metadata
: 这是一个json对象,它包含了以后的搜索中可能有用的任何元数据
chunkSizeBytes
: GridFS会将文件分解这个字段数值的数个数据块。(译者注:数据块类似于数据库的分区,数值太大和太小都会影响性能。具体详见mongodb官方文档: https://docs.mongodb.com/manual/core/sharding-data-partitioning/)
例如: 携带配置参数将一个文件流上传到GridFS。
GridFsUploadOptions options = new GridFsUploadOptions();
options.setChunkSizeBytes(2048);
options.setMetadata(new JsonObject().put("catagory", "Polynesian gods"));
gridFsStreamClient.uploadByFileNameWithOptions(asyncFile, "kanaloa", options, stringAsyncResult -> {
String id = stringAsyncResult.result();
});
使用文件名称从GridFS中下载数据流
使用 downloadByFileName
方法,您可以使用文件名称从GridFS中下载数据流。
下载完成后,将返回下载的数据流长度,该数值是 Long
类型。
它具有以下字段:
stream
: 将要被下载的 WriteStream
fileName
: 将被下载的数据流的名称。
例如: 下载文件流。
gridFsStreamClient.downloadByFileName(asyncFile, "kamapuaa.fil", longAsyncResult -> {
Long length = longAsyncResult.result();
});
使用文件名称并携带配置参数,从GridFS中下载数据流
使用 downloadByFileNameWithOptions
方法,
并传入 GridFsDownloadOptions
实例,您可以使用文件名称并携带配置参数,从GridFS中下载数据流。
下载完成后,将返回下载的数据流长度,该数值是`Long`类型。
它具有以下字段:
stream
: 将被下载的`WriteStream`
fileName
: 将被下载的数据流名称
options
: `GridFsDownloadOptions`实例
DownloadOptions 具有以下字段:
revision
: 要下载文件的版本(译者注:版本字段仅显示文件更改的频率。值为0,代表未经过修改的原始存储的文件)
例如:携带配置参数下载文件流。
GridFsDownloadOptions options = new GridFsDownloadOptions();
options.setRevision(0);
gridFsStreamClient.downloadByFileNameWithOptions(asyncFile, "kamapuaa.fil", options, longAsyncResult -> {
Long length = longAsyncResult.result();
});
使用ID下载数据流
通过调用 `downloadById`方法,您可以使用GridFS生成的ID来下载数据流。 下载完成后,将返回下载的数据流长度,该数值是`Long`类型。
它具有以下字段:
stream
: 将要下载的 WriteStream
id
: GridFS生成的id
例如:使用对象的ID下载文件流
String id = "58f61bf84cedfd000661af06";
gridFsStreamClient.downloadById(asyncFile, id, longAsyncResult -> {
Long length = longAsyncResult.result();
});
客户端参数配置
Vert.x MongoDB 客户端把配置参数放在 JSON 对象中。
客户端支持以下这些参数:
db_name
-
mongoDB 实例的数据库名称。默认是
default_db
useObjectId
-
此参数用来支持 ObjectId 的持久化和检索。如果设置为 true , 将会在集合的文档中,以 16 进制的字符串来保存 MongoDB 的 ObjectId 类型的字段。而且在设置为 true 后,可以让文档基于创建时间排序(译者注:前4个字节用来存储创建的时的时间戳,精确到秒)。 您也可以通过使用 ObjectId::getDate() 方法,从这个 16进制的字符串中获取创建时间。若您选择其他类型作为 _id ,则设置此参数为 false 。 如果您保存的文档中,没有设置 _id 字段的值,将会默认的生成 16进制的字符串作为 _id 。 此参数默认为 false 。
此客户端尝试着支持驱动所支持的大多数参数配置。 有两种配置方式,一种是连接字符串,另一种是驱动配置选项。
connection_string
-
连接字符串,指的是创建客户端的字符串,例如:
mongodb://localhost:27017
。 有关连接字符串格式的更多信息,请参考驱动程序文档。
驱动配置的具体选项
{
// Single Cluster Settings
"host" : "127.0.0.1", // string
"port" : 27017, // int
// Multiple Cluster Settings
"hosts" : [
{
"host" : "cluster1", // string
"port" : 27000 // int
},
{
"host" : "cluster2", // string
"port" : 28000 // int
},
...
],
"replicaSet" : "foo", // string
"serverSelectionTimeoutMS" : 30000, // long
// Connection Pool Settings
"maxPoolSize" : 50, // int
"minPoolSize" : 25, // int
"maxIdleTimeMS" : 300000, // long
"maxLifeTimeMS" : 3600000, // long
"waitQueueTimeoutMS" : 10000, // long
"maintenanceFrequencyMS" : 2000, // long
"maintenanceInitialDelayMS" : 500, // long
// Credentials / Auth
"username" : "john", // string
"password" : "passw0rd", // string
"authSource" : "some.db" // string
// Auth mechanism
"authMechanism" : "GSSAPI", // string
"gssapiServiceName" : "myservicename", // string
// Socket Settings
"connectTimeoutMS" : 300000, // int
"socketTimeoutMS" : 100000, // int
"sendBufferSize" : 8192, // int
"receiveBufferSize" : 8192, // int
// Server Settings
"heartbeatFrequencyMS" : 1000, // long
"minHeartbeatFrequencyMS" : 500, // long
// SSL Settings
"ssl" : false, // boolean
"sslInvalidHostNameAllowed" : false, // boolean
"trustAll" : false, // boolean
"keyPath" : "key.pem", // string
"certPath" : "cert.pem", // string
"caPath" : "ca.pem", // string
// Network compression Settings
"compressors" : ["zstd", "snappy", "zlib"], // string array
"zlibCompressionLevel" : 6 // int
}
驱动参数说明
host
-
mongoDB 实例运行的地址。默认是
127.0.0.1
。 如果设置了hosts
参数,就会忽略host
参数 port
-
mongoDB 实例监听的端口。默认是
127.0.0.1
。 如果设置了hosts
参数,就会忽略host
参数 hosts
-
表示支持 MongoDB 集群(分片/复制)的一组地址和端口
host
-
集群中某个运行实例的地址
port
-
集群中某个运行实例监听的端口
replicaSet
-
某个 mongoDB 实例作为副本集的名称
serverSelectionTimeoutMS
-
驱动选择服务器的最大时间,单位毫秒
maxPoolSize
-
连接池最大连接数。默认为
100
minPoolSize
-
连接池最小连接数。默认为
0
maxIdleTimeMS
-
连接池的连接最大空闲时间。默认为
0
,表示一直存在 maxLifeTimeMS
-
连接池的连接最大存活时间。默认为
0
,表示永远存活 waitQueueTimeoutMS
-
线程等待作为连接的最长等待时间。默认为
120000
(2分钟) maintenanceFrequencyMS
-
维护任务进行循环检查连接的时间间隔(译者注:维护任务会定时检查连接的状态,直到连接池剩下最小连接数)。默认为
0
maintenanceInitialDelayMS
-
连接池启动后,维护任务第一次启动的时间。默认为
0
username
-
授权的用户名。默认为
null
(意味着不需要授权) password
-
授权的密码
authSource
-
与授权用户关联的数据库名称。默认值为
db_name
authMechanism
-
所使用的授权认证机制。请参考 [Authentication](http://docs.mongodb.org/manual/core/authentication/ 来获取更多信息。
gssapiServiceName
-
当使用`GSSAPI`的授权机制时,所使用的 Kerberos 服务名。
connectTimeoutMS
-
打开连接超时的时间,单位毫秒。默认为`10000`(10 秒)
socketTimeoutMS
-
在 socket 上接收或者发送超时的时间。默认为`0`,意味着永远不超时(译者注:这是客户端的超时时间。如果一个 insert 达到了 socketTimeoutMS, 将无法得知服务器是否已写入)。
sendBufferSize
-
设置 socket 发送缓冲区大小(SO_SNDBUF)。默认为`0`,这将使用操作系统默认大小。
receiveBufferSize
-
设置 socket 接收缓冲区大小(SO_RCVBUF)。默认为`0`,这将使用操作系统默认大小。
heartbeatFrequencyMS
-
集群监视器访问每个集群服务器的频率。默认为`5000`(5s)
minHeartbeatFrequencyMS
-
最小心跳频率。默认为`1000`(1s)
ssl
-
在mongo客户端 和 mongo之间,启用ssl
sslInvalidHostNameAllowed
-
接受服务器证书中未包含的主机名(译者注:当你启用ssl时,这个配置用来设置是否关闭域名检查。true 为允许,即关闭域名检查)。
trustAll
-
当启用ssl时,信任所有证书。警告 - 开启这个配置将会让您面临一些潜在的安全问题,例如MITM攻击。
keyPath
-
设置客户端私钥的路径。客户端私钥是用于在与mongo建立SSL连接时,对服务器进行身份验证。
certPath
-
设置客户端证书的路径。客户端证书是用来在与mongo建立SSL连接时,对服务器进行身份验证。
caPath
-
设置CA证书的路径。CA证书是用于在与mongo建立SSL连接时,当做一个信任源。
compressors
-
Sets the compression algorithm for network transmission. Valid values range from [
snappy
,zlib
,zstd
], the default value isnull
(meaning no compression).
注意
|
For |
zlibCompressionLevel
-
Sets the compression level for zlib. Valid values are between -1 and 9, the default value is -1 if zlib is enabled.
请注意:上面提到的各类参数的默认值,都是 MongoDB Java 驱动的默认值。 请参考驱动文档来获取最新信息。
RxJava 3 API
Mongo客户端提供了RX化的原始版本API。
创建一个RX化的客户端
要想创建一个RX化的Mongo客户端,需要您确保导入了 MongoClient
类。
创建并获取一个客户端实例的方法有很多,例如下面:
MongoClient client = MongoClient.createShared(vertx, config);
批量查询文档
ReadStream
可以被转换成 Flowable
。这个功能会方便您处理大型数据集。
JsonObject query = new JsonObject()
.put("author", "J. R. R. Tolkien");
ReadStream<JsonObject> books = mongoClient.findBatch("book", query);
// Convert the stream to a Flowable
Flowable<JsonObject> flowable = books.toFlowable();
flowable.subscribe(doc -> {
System.out.println("Found doc: " + doc.encodePrettily());
}, throwable -> {
throwable.printStackTrace();
}, () -> {
System.out.println("End of research");
});