Vert.x MongoDB 客户端

您的 Vert.x 应用可使用 Vert.x MongoDB Client(以下简称客户端)与 MongoDB 互动, 包括保存,获取,搜索和删除文档。MongoDB 是在 Vert.x 应用进行数据持久化时的最佳选择, 因为 MongoDB 天生就是处理 JSON(BSON)格式的文档数据库。

特点

  • 完全非阻塞

  • 支持自定义编解码器,从而实现 Vert.x JSON 快速序列化和反序列化

  • 支持 MongoDB Java 驱动大部分配置项

本客户端基于 MongoDB ReactiveStreams Driver

使用 Vert.x MongoDB Client

使用此客户端,需要添加下列依赖:

  • Maven(在 pom.xml 文件中):

<dependency>
 <groupId>io.vertx</groupId>
 <artifactId>vertx-mongo-client</artifactId>
 <version>4.4.0</version>
</dependency>
  • Gradle(在 build.gradle 文件中):

compile 'io.vertx:vertx-mongo-client:4.4.0'

创建客户端

您可用以下几种方式创建客户端:

使用默认的共享连接池

大部分情况下,您希望不同的客户端之间共享一个连接池。

例如:我们通过部署多个verticle实例来扩展你的程序的时候,我们会希望每个verticle实例都共用同一个连接池, 而不是多个verticle实例使用多个连接池。

要想用最简单的方法去使用共享连接池,我们可以这么做:

MongoClient client = MongoClient.createShared(vertx, config);

只有第一次调用 MongoClient.createShared 才会真正的根据您指定的 config 配置创建连接池。

之后再调用此方法,只会返回一个新的客户端对象,但使用的是相同的数据源。因此这时 config 参数不再有作用。

指定 连接池方式数据源的名称

您还可以像下面这样,在创建一个客户端的时候指定连接池方式数据源的名称:

MongoClient client = MongoClient.createShared(vertx, config, "MyPoolName");

如果您使用相同Vert.x实例创建了不同客户端,同时指定了相同的连接池名称, 那么这些客户端将共享这个相同的连接池数据源。

同样的(与使用默认的共享数据源相同),只有第一次调用 MongoClient.createShared 才会真正的根据 您指定的config创建一个连接池。

之后再调用此方法,只会返回一个新的客户端对象,但使用的是相同的数据源。因此这时config参数不再有作用。

当您希望不同groups的客户端,使用不同的连接池时,可以使用这种方式。 举个使用场景的例子,比如这些客户端需要与不同数据源进行交互的时候,可以使用这种方式。

创建非共享数据源的客户端对象

在大部分情况下,您希望在不同客户端实例间共享一个连接池。 但是,在有些情况下,您可能想要使用一个不与其它客户端共享连接的连接池,以创建一个客户端实例。

此时您可使用 MongoClient.create

MongoClient client = MongoClient.create(vertx, config);

每次调用此方法,就相当于在调用 MongoClient.createShared 方法时,加上了具有唯一名称的数据源参数。

使用客户端 API

MongoClient 接口定义了操作客户端的API 方法。您可以使用 MongoClient 来使用调用 API 方法。

保存文档

您可以使用 save 方法以保存文档。

如果文档中没有 \_id 字段,文档会被保存。若有,将执行 upserted。 Upserted 意思是,如果此文档不存在,就保存此文档,此文档存在就更新。

如果被保存的文档没有 \_id 字段,回调方法中可以获得保存后生成的 id。

例如:

JsonObject document = new JsonObject()
  .put("title", "The Hobbit");
mongoClient.save("books", document, res -> {
  if (res.succeeded()) {
    String id = res.result();
    System.out.println("Saved book with id " + id);
  } else {
    res.cause().printStackTrace();
  }
});

下面的例子,文档已有 \_id

JsonObject document = new JsonObject()
  .put("title", "The Hobbit")
  .put("_id", "123244");
mongoClient.save("books", document, res -> {
  if (res.succeeded()) {
    // ...
  } else {
    res.cause().printStackTrace();
  }
});

插入文档

您可以使用 insert 方法来插入文档。

如果被插入的文档没有包含 id,回调方法中可以获得保存后生成的 id。

JsonObject document = new JsonObject()
  .put("title", "The Hobbit");
mongoClient.insert("books", document, res -> {
  if (res.succeeded()) {
    String id = res.result();
    System.out.println("Inserted book with id " + id);
  } else {
    res.cause().printStackTrace();
  }
});

如果被插入的文档包含 id,但是此 id 代表的文档已经存在,插入就会失败:

JsonObject document = new JsonObject()
  .put("title", "The Hobbit")
  .put("_id", "123244");
mongoClient.insert("books", document, res -> {
  if (res.succeeded()) {
    //...
  } else {
    // Will fail if the book with that id already exists.
  }
});

更新文档

您可以使用 updateCollection 方法来更新文档。

此方法可以更新集合(译者注:MongoDB 中的集合概念对应 SQL 中的数据库表)中的一个或多个文档。 在 updateCollection 方法中被当成参数传递的JSON对象, 必须包含 Update Operators, 因为由它决定更新的方式。

其中作为 query 参数的 json对象决定更新集合中的哪个文档。

例如更新 books 集合中的一个文档:

JsonObject query = new JsonObject()
  .put("title", "The Hobbit");
// Set the author field
JsonObject update = new JsonObject().put("$set", new JsonObject()
  .put("author", "J. R. R. Tolkien"));
mongoClient.updateCollection("books", query, update, res -> {
  if (res.succeeded()) {
    System.out.println("Book updated !");
  } else {
    res.cause().printStackTrace();
  }
});

如果您希望更新操作是 upsert(upsert 意思是,如果此文档不存在,就保存此文档;此文档存在就更新)或者 是更新多个文档, 那么就使用 updateCollectionWithOptions 方法,传递一个 UpdateOptions 的实例,去定制化您的更新操作。

参数 UpdateOptions 有以下选项:

multi

若设置为 true,则可以更新多个文档

upsert

若设置为 true,则可以在没有查询到要更新的文档时,新增该文档

writeConcern

写操作的可靠性(译者注:源码中是用 writeOption 枚举类型来代表的)

  //译者注:MongoDB 默认写操作级别是 WriteOption.ACKNOWLEDGED
JsonObject query = new JsonObject().put("title", "The Hobbit");
  // Set the author field
JsonObject update = new JsonObject().put("$set", new JsonObject()
  .put("author", "J. R. R. Tolkien"));
UpdateOptions options = new UpdateOptions().setMulti(true);
mongoClient.updateCollectionWithOptions("books", query, update, options, res -> {
  if (res.succeeded()) {
    System.out.println("Book updated !");
  } else {
    res.cause().printStackTrace();
  }
});

替换文档

您可以使用 replaceDocuments 方法来替换文档。

替换操作和更新操作相似,但替换不需要任何操作符。 因为它是用您提供的文档去替换整个文档。

例如替换 books 集合中的一个文档:

JsonObject query = new JsonObject()
  .put("title", "The Hobbit");
JsonObject replace = new JsonObject()
  .put("title", "The Lord of the Rings")
  .put("author", "J. R. R. Tolkien");
mongoClient.replaceDocuments("books", query, replace, res -> {
  if (res.succeeded()) {
    System.out.println("Book replaced !");
  } else {
    res.cause().printStackTrace();
  }
});

批量操作

您可以使用 bulkWrite 来一次执行多个新增、更新、替换或者删除的操作。

bulkWrite 方法中,您可以传递一系列 BulkOperations,而且每个 BulkOperations 运作方式和单个操作类似。 您可以根据需要传递多个操作,即使这些操作都是同一类型的。

如果您希望 批量操作可以按照顺序执行,那么可以使用 bulkWriteWithOptions 将自定义的配置写入其中,然后传递一个 BulkWriteOptions 的实例。 更多关于有序批量写操作的描述,见 Execution of Operations

查找文档

您可以使用 find 方法查找文档。

其中 query 参数用来匹配集合中的文档。

例如匹配所有文档:

JsonObject query = new JsonObject();
mongoClient.find("books", query, res -> {
  if (res.succeeded()) {
    for (JsonObject json : res.result()) {
      System.out.println(json.encodePrettily());
    }
  } else {
    res.cause().printStackTrace();
  }
});

又例如匹配 books 集合中某一个作者的所有文档:

JsonObject query = new JsonObject()
  .put("author", "J. R. R. Tolkien");
mongoClient.find("books", query, res -> {
  if (res.succeeded()) {
    for (JsonObject json : res.result()) {
      System.out.println(json.encodePrettily());
    }
  } else {
    res.cause().printStackTrace();
  }
});

查询的结果包装成了 JSON 对象的 List 集合。

如果您希望在查询中指定一些内容,比如指定返回的字段,或者指定返回多少条数据,可以使用 findWithOptions 方法, 在参数 FindOptions 中指定这些查询要求。

FindOptions 中可以设置以下参数:

fields

返回的字段。默认为 null,意味着查询结果会返回所有字段。

sort

指定排序字段。默认为 null。

limit

指定返回的数据条数。默认值为 -1,意味着查询结果会返回所有数据。

skip

在返回查询结果之前,指定跳过的数据数量。默认值为 0 。

hint

要使用的索引。默认为空字符串。

批量查询文档

当我们在处理大量的数据集合的时候,不建议使用 find 方法或者 findWithOptions 方法。 为了避免响应结果数据量太大导致内存溢出,建议使用 findBatch :

JsonObject query = new JsonObject()
  .put("author", "J. R. R. Tolkien");
mongoClient.findBatch("book", query)
  .exceptionHandler(throwable -> throwable.printStackTrace())
  .endHandler(v -> System.out.println("End of research"))
  .handler(doc -> System.out.println("Found doc: " + doc.encodePrettily()));

被匹配到的文档会被 ReadStream 处理器挨个返回。

FindOptions 中有一个额外的参数 batchSize,您可以通过设置这个参数,来设置 ReadStream 处理器一次加载的文档的数量。

JsonObject query = new JsonObject()
  .put("author", "J. R. R. Tolkien");
FindOptions options = new FindOptions().setBatchSize(100);
mongoClient.findBatchWithOptions("book", query, options)
  .exceptionHandler(throwable -> throwable.printStackTrace())
  .endHandler(v -> System.out.println("End of research"))
  .handler(doc -> System.out.println("Found doc: " + doc.encodePrettily()));

默认情况下, batchSize 的值是20。

查询单个文档

要查询单个文档,您可以使用 findOne 方法。

这有点类似 find 方法,但是仅仅返回 find 方法查询命中的第一条数据。

删除文档

您可以使用 removeDocuments 方法来删除文档。

其中 query 参数决定了要删除集合中的哪些文档。

例如删除作者为 Tolkien 的所有文档:

JsonObject query = new JsonObject()
  .put("author", "J. R. R. Tolkien");
mongoClient.removeDocuments("books", query, res -> {
  if (res.succeeded()) {
    System.out.println("Never much liked Tolkien stuff!");
  } else {
    res.cause().printStackTrace();
  }
});

删除单个文档

您可以使用 removeDocument 方法来删除文档。

这有点类似于 removeDocuments 方法,只是 removeDocument 方法仅删除匹配到的第一个文档。

文档计数

您可以使用 count 方法去计算文档数量。

例如计算 作者为 Tolkien 的书的数量,结果包装在回调方法中。

JsonObject query = new JsonObject()
  .put("author", "J. R. R. Tolkien");
mongoClient.count("books", query, res -> {
  if (res.succeeded()) {
    long num = res.result();
  } else {
    res.cause().printStackTrace();
  }
});

管理 MongoDB 集合

MongoDB 的所有文档数据都存储在集合中。

您可以用 getCollections 来获取所有集合的列表

mongoClient.getCollections(res -> {
  if (res.succeeded()) {
    List<String> collections = res.result();
  } else {
    res.cause().printStackTrace();
  }
});

您可以使用 createCollection 方法来创建一个新的集合。

mongoClient.createCollection("mynewcollectionr", res -> {
  if (res.succeeded()) {
    // Created ok!
  } else {
    res.cause().printStackTrace();
  }
});

您可以使用 dropCollection 方法来删除一个集合。

请注意:删除一个集合将会删除集合中所有的文档!

mongoClient.dropCollection("mynewcollectionr", res -> {
  if (res.succeeded()) {
    // Dropped ok!
  } else {
    res.cause().printStackTrace();
  }
});

执行 MongoDB 的其他命令

您可以通过 runCommand 方法执行任何 `MongoDB`命令。

使用这种方式,可以发挥出MongoDB更多优点,比如使用MapReduce。 更多详情,请参考说明文档 Commands

例如执行 aggregate(译者注:聚合)命令。请注意,命令的名称要做为 runCommand 方法的一个参数, 并且同时也必须包含在包装命令的 JSON 参数中。这么做是因为 JSON 不是有序的,但 BSON 却是, 而且 MongoDB 期望 BSON 参数的第一个键值对是命令的名称。所以,为了明确 JSON 中的哪个键值对是命令名称, 我们也就必须把命令名称单独设置为一个参数:

JsonObject command = new JsonObject()
  .put("aggregate", "collection_name")
  .put("pipeline", new JsonArray());
mongoClient.runCommand("aggregate", command, res -> {
  if (res.succeeded()) {
    JsonArray resArr = res.result().getJsonArray("result");
    // etc
  } else {
    res.cause().printStackTrace();
  }
});

MongoDB的JSON扩展支持

目前,MongoDB 只支持 dateoidbinary 类型 (请参考:http://docs.mongodb.org/manual/reference/mongodb-extended-json[MongoDB Extended JSON] )

例如插入含有 date 类型字段的文档:

JsonObject document = new JsonObject()
  .put("title", "The Hobbit")
  //ISO-8601 date
  .put("publicationDate", new JsonObject().put("$date", "1937-09-21T00:00:00+00:00"));
mongoService.save("publishedBooks", document).compose(id -> {
  return mongoService.findOne("publishedBooks", new JsonObject().put("_id", id), null);
}).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("To retrieve ISO-8601 date : "
      + res.result().getJsonObject("publicationDate").getString("$date"));
  } else {
    res.cause().printStackTrace();
  }
});

例如,插入一个包含二进制的字段的文档并且读取这个字段:

byte[] binaryObject = new byte[40];
JsonObject document = new JsonObject()
  .put("name", "Alan Turing")
  .put("binaryStuff", new JsonObject().put("$binary", binaryObject));
mongoService.save("smartPeople", document).compose(id -> {
  return mongoService.findOne("smartPeople", new JsonObject().put("_id", id), null);
}).onComplete(res -> {
  if (res.succeeded()) {
    byte[] reconstitutedBinaryObject = res.result().getJsonObject("binaryStuff").getBinary("$binary");
    //This could now be de-serialized into an object in real life
  } else {
    res.cause().printStackTrace();
  }
});

例如保存一个 base 64 编码的字符串,将这个字符串作为 binary 字段插入。并且读取这个字段:

String base64EncodedString = "a2FpbHVhIGlzIHRoZSAjMSBiZWFjaCBpbiB0aGUgd29ybGQ=";
JsonObject document = new JsonObject()
  .put("name", "Alan Turing")
  .put("binaryStuff", new JsonObject().put("$binary", base64EncodedString));
mongoService.save("smartPeople", document).compose(id -> {
  return mongoService.findOne("smartPeople", new JsonObject().put("_id", id), null);
}).onComplete(res -> {
  if (res.succeeded()) {
    String reconstitutedBase64EncodedString = res.result().getJsonObject("binaryStuff").getString("$binary");
    //This could now converted back to bytes from the base 64 string
  } else {
    res.cause().printStackTrace();
  }
});

例如插入一个 object ID 并且读取它:

String individualId = new ObjectId().toHexString();
JsonObject document = new JsonObject()
  .put("name", "Stephen Hawking")
  .put("individualId", new JsonObject().put("$oid", individualId));
mongoService.save("smartPeople", document).compose(id -> {
  JsonObject query = new JsonObject().put("_id", id);
  return mongoService.findOne("smartPeople", query, null);
}).onComplete(res -> {
  if (res.succeeded()) {
    String reconstitutedIndividualId = res.result().getJsonObject("individualId").getString("$oid");
  } else {
    res.cause().printStackTrace();
  }
});

获取 distinct 后的值

例如:

JsonObject document = new JsonObject()
  .put("title", "The Hobbit");
mongoClient.save("books", document).compose(v -> {
  return mongoClient.distinct("books", "title", String.class.getName());
}).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Title is : " + res.result().getJsonArray(0));
  } else {
    res.cause().printStackTrace();
  }
});

例如:在批量模式下, 获取 distinct 后的值:

JsonObject document = new JsonObject()
  .put("title", "The Hobbit");
mongoClient.save("books", document, res -> {
  if (res.succeeded()) {
    mongoClient.distinctBatch("books", "title", String.class.getName())
      .handler(book -> System.out.println("Title is : " + book.getString("title")));
  } else {
    res.cause().printStackTrace();
  }
});
  • 例如查询 distinct 后的值

JsonObject document = new JsonObject()
  .put("title", "The Hobbit")
  .put("publicationDate", new JsonObject().put("$date", "1937-09-21T00:00:00+00:00"));
JsonObject query = new JsonObject()
  .put("publicationDate",
    new JsonObject().put("$gte", new JsonObject().put("$date", "1937-09-21T00:00:00+00:00")));
mongoClient.save("books", document).compose(v -> {
  return mongoClient.distinctWithQuery("books", "title", String.class.getName(), query);
}).onComplete(res -> {
  if (res.succeeded()) {
    System.out.println("Title is : " + res.result().getJsonArray(0));
  }
});

例如:在批量查询模式下,获取 distinct 后的值。

JsonObject document = new JsonObject()
  .put("title", "The Hobbit")
  .put("publicationDate", new JsonObject().put("$date", "1937-09-21T00:00:00+00:00"));
JsonObject query = new JsonObject()
  .put("publicationDate", new JsonObject()
    .put("$gte", new JsonObject().put("$date", "1937-09-21T00:00:00+00:00")));
mongoClient.save("books", document, res -> {
  if (res.succeeded()) {
    mongoClient.distinctBatchWithQuery("books", "title", String.class.getName(), query)
      .handler(book -> System.out.println("Title is : " + book.getString("title")));
  }
});

存储/检索文件和二进制数据

客户端可以使用 MongoDB 的 GridFS 模块来存储或检索文件和二进制数据。(译者注:GridFS是MongoDB的一个子模块,主要用于在MongoDB中存储文件,相当于MongoDB内置的一个分布式文件系统。) MongoGridFsClient 可以用来上传文件以及数据流到GridFS, 以及从GridFS下载文件和数据流。

获取一个可与GridFS交互的MongoGridFsClient。

通过调用 createGridFsBucketService 方法, 并且给方法提供一个 bucket 名称,可创建一个 MongoGridFsClient 客户端。 在 GridFS 中,bucket 名称最终是一个集合,该集合包含对所有存储对象的引用。 您可以通过提供一个唯一的名称将对象隔离到不同的bucket中。

它具有以下字段:

bucketName : 要创建的 bucket 的名称

例如:使用自定义的bucket名称, 获取 MongoGridFsClient

mongoClient.createGridFsBucketService("bakeke", res -> {
  if (res.succeeded()) {
    //Interact with the GridFS client...
    MongoGridFsClient client = res.result();
  } else {
    res.cause().printStackTrace();
  }
});

GridFS使用 "fs" 作为默认的 bucket 名称。如果您想要获取默认的bucket,而不是自定义一个, 那么请调用 createDefaultGridFsBucketService 方法。

例如:使用默认bucket名称,获取 MongoGridFsClient

mongoClient.createDefaultGridFsBucketService( res -> {
  if (res.succeeded()) {
    //Interact with the GridFS client...
    MongoGridFsClient client = res.result();
  } else {
    res.cause().printStackTrace();
  }
});

从GridFS中,删除整个文件bucket。

整个文件bucket及其所有内容都可以使用 drop 来进行删除。 该方法可以删除创建`MongoGridFsClient`时指定的bucket。

例如:删除一个文件bucket。

gridFsClient.drop(res -> {
  if (res.succeeded()) {
    //The file bucket is dropped and all files in it, erased
  } else {
    res.cause().printStackTrace();
  }
});

在一个GridFS的bucket中,查询所有文件ID。

调用 findAllIds 方法,可以查询到在bucket中的所有文件ID。 可以凭借文件ID,通过使用 downloadFileByID 方法,来下载文件。

例如:检索文件ID列表。

gridFsClient.findAllIds(res -> {
  if (res.succeeded()) {
    List<String> ids = res.result(); //List of file IDs
  } else {
    res.cause().printStackTrace();
  }
});

在一个与查询匹配的GridFS bucket中,查询文件ID。

可以指定查询以匹配GridFS bucket 中的文件。调用 findIds 方法 可以返回与查询匹配的文件ID列表。

它具有以下字段:

query : 这是一个可以使用标准MongoDB查询运算符来匹配任何文件元数据的json对象。 该json对象为空,会匹配所有文档。您可以按照GridFS手册中所述内容,来查询GridFS文件集合的属性。 https://docs.mongodb.com/manual/core/gridfs/#the-files-collection

您可以调用 downloadFileByID 方法,通过ID来下载文件。

例如:基于元数据查询来检索文件ID列表

JsonObject query = new JsonObject().put("metadata.nick_name", "Puhi the eel");
gridFsClient.findIds(query, res -> {
  if (res.succeeded()) {
    List<String> ids = res.result(); //List of file IDs
  } else {
    res.cause().printStackTrace();
  }
});

在GridFS中,根据文件的ID来删除文件。

通过提供文件ID参数,来调用 delete 方法,可以删除以前存储在GridFS中的文件。 可以通过使用 findIds 方法,来检索文件ID。

它具有以下字段: id : 当文件存储到GridFS时,ID会自动生成。

例如:通过ID删除文件。

String id = "56660b074cedfd000570839c"; //The GridFS ID of the file
gridFsClient.delete(id, (AsyncResult<Void> res) -> {
  if (res.succeeded()) {
    //File deleted
  } else {
    //Something went wrong
    res.cause().printStackTrace();
  }
});

在GridFS中上传文件

通过 uploadFile 方法,文件可以使用文件名称进行存储。 当文件存储成功时,GridFS会返回自动生成的文件ID。这个ID可以被用来之后检索文件。

它具有以下字段:

fileName : 这是被存储文件的名称。

gridFsClient.uploadFile("file.name", res -> {
  if (res.succeeded()) {
    String id = res.result();
    //The ID of the stored object in Grid FS
  } else {
    res.cause().printStackTrace();
  }
});

上传文件时携带配置参数。

使用 uploadFileWithOptions 方法, 并传入一个 `GridFsUploadOptions`的实例,文件可以在存储时,携带额外的配置参数。 当文件存储成功时,GridFS会返回文件ID。

它具有以下字段:

metadata : 这是一个json对象,它包含了以后的搜索中可能有用的任何元数据 chunkSizeBytes : GridFS会将文件分解这个字段数值的数个数据块。(译者注:数据块类似于数据库的分区,数值太大和太小都会影响性能。具体详见官方文档: https://docs.mongodb.com/manual/core/sharding-data-partitioning/)

例如: 通过文件名上传一个文件,并且携带元数据参数以及指定数据块的大小。

JsonObject metadata = new JsonObject();
metadata.put("nick_name", "Puhi the Eel");

GridFsUploadOptions options = new GridFsUploadOptions();
options.setChunkSizeBytes(1024);
options.setMetadata(metadata);

gridFsClient.uploadFileWithOptions("file.name", options, res -> {
  if (res.succeeded()) {
    String id = res.result();
    //The ID of the stored object in Grid FS
  } else {
    res.cause().printStackTrace();
  }
});

下载存储在GridFS中的文件

通过调用 downloadFile 方法,可以凭借文件的原始名称来下载文件。 下载完成后,将返回下载的文件长度,该数值是 Long 类型。

它具有以下字段:

fileName

文件在存储时使用的文件名称

例如:凭借文件名称下载存储在GridFS中的文件。

gridFsClient.downloadFile("file.name", res -> {
  if (res.succeeded()) {
    Long fileLength = res.result();
    //The length of the file stored in fileName
  } else {
    res.cause().printStackTrace();
  }
});

使用文件ID来下载存储在GridFS中的文件

通过调用 downloadFileByID 方法,可以凭借文件ID来下载文件。 下载完成后,将返回下载的文件长度,该数值是`Long`类型。

它具有以下字段:

id : 文件在存储后生成的文件ID

例如:凭借文件ID下载存储在GridFS中的文件。

String id = "56660b074cedfd000570839c";
String filename = "puhi.fil";
gridFsClient.downloadFileByID(id, filename, res -> {
  if (res.succeeded()) {
    Long fileLength = res.result();
    //The length of the file stored in fileName
  } else {
    res.cause().printStackTrace();
  }
});

从GridFS下载文件,并且重新命名文件名称

在解析文件的时候,可以使用文件的原始名称。而在下载文件时, 您可以通过调用 downloadFileAs 方法,来重命名文件。 下载完成后,将返回下载的文件长度,该数值是`Long`类型。

它具有以下字段:

fileName : 文件之前存储的旧名称 newFileName : 文件将被保存的新名称

gridFsClient.downloadFileAs("file.name", "new_file.name", res -> {
  if (res.succeeded()) {
    Long fileLength = res.result();
    //The length of the file stored in fileName
  } else {
    res.cause().printStackTrace();
  }
});

将数据流上传到GridFS

使用 uploadByFileName 方法,您可以将数据流上传到GridFS。 当数据流上传成功后,将返回GridFS生成的文件ID。

它具有以下字段:

stream : 将要被上传的 ReadStream fileName : 将被存储的数据流的名称

例如: 上传文件数据流到GridFS。

gridFsStreamClient.uploadByFileName(asyncFile, "kanaloa", stringAsyncResult -> {
  String id = stringAsyncResult.result();
});

上传数据流时携带配置参数

使用 uploadByFileNameWithOptions 方法, 并传入 GridFsUploadOptions 实例,您可以将数据流上传到GridFS。 当数据流上传成功后,将返回GridFS生成的文件ID。

它具有以下字段:

stream : 将要被上传的 ReadStream fileName : 将被存储的数据流的名称 `options' : 上传时携带的配置参数

GridFsUploadOptions 具有以下字段:

metadata : 这是一个json对象,它包含了以后的搜索中可能有用的任何元数据 chunkSizeBytes : GridFS会将文件分解这个字段数值的数个数据块。(译者注:数据块类似于数据库的分区,数值太大和太小都会影响性能。具体详见mongodb官方文档: https://docs.mongodb.com/manual/core/sharding-data-partitioning/)

例如: 携带配置参数将一个文件流上传到GridFS。

GridFsUploadOptions options = new GridFsUploadOptions();
options.setChunkSizeBytes(2048);
options.setMetadata(new JsonObject().put("catagory", "Polynesian gods"));
gridFsStreamClient.uploadByFileNameWithOptions(asyncFile, "kanaloa", options, stringAsyncResult -> {
  String id = stringAsyncResult.result();
});

使用文件名称从GridFS中下载数据流

使用 downloadByFileName 方法,您可以使用文件名称从GridFS中下载数据流。 下载完成后,将返回下载的数据流长度,该数值是 Long 类型。

它具有以下字段:

stream : 将要被下载的 WriteStream fileName : 将被下载的数据流的名称。

例如: 下载文件流。

gridFsStreamClient.downloadByFileName(asyncFile, "kamapuaa.fil", longAsyncResult -> {
  Long length = longAsyncResult.result();
});

使用文件名称并携带配置参数,从GridFS中下载数据流

使用 downloadByFileNameWithOptions 方法, 并传入 GridFsDownloadOptions 实例,您可以使用文件名称并携带配置参数,从GridFS中下载数据流。 下载完成后,将返回下载的数据流长度,该数值是`Long`类型。

它具有以下字段:

stream : 将被下载的`WriteStream` fileName : 将被下载的数据流名称 options : `GridFsDownloadOptions`实例

DownloadOptions 具有以下字段:

revision : 要下载文件的版本(译者注:版本字段仅显示文件更改的频率。值为0,代表未经过修改的原始存储的文件)

例如:携带配置参数下载文件流。

GridFsDownloadOptions options = new GridFsDownloadOptions();
options.setRevision(0);
gridFsStreamClient.downloadByFileNameWithOptions(asyncFile, "kamapuaa.fil", options, longAsyncResult -> {
  Long length = longAsyncResult.result();
});

使用ID下载数据流

通过调用 `downloadById`方法,您可以使用GridFS生成的ID来下载数据流。 下载完成后,将返回下载的数据流长度,该数值是`Long`类型。

它具有以下字段:

stream : 将要下载的 WriteStream id : GridFS生成的id

例如:使用对象的ID下载文件流

String id = "58f61bf84cedfd000661af06";
gridFsStreamClient.downloadById(asyncFile, id, longAsyncResult -> {
  Long length = longAsyncResult.result();
});

客户端参数配置

Vert.x MongoDB 客户端把配置参数放在 JSON 对象中。

客户端支持以下这些参数:

db_name

mongoDB 实例的数据库名称。默认是 default_db

useObjectId

此参数用来支持 ObjectId 的持久化和检索。如果设置为 true , 将会在集合的文档中,以 16 进制的字符串来保存 MongoDB 的 ObjectId 类型的字段。而且在设置为 true 后,可以让文档基于创建时间排序(译者注:前4个字节用来存储创建的时的时间戳,精确到秒)。 您也可以通过使用 ObjectId::getDate() 方法,从这个 16进制的字符串中获取创建时间。若您选择其他类型作为 _id ,则设置此参数为 false 。 如果您保存的文档中,没有设置 _id 字段的值,将会默认的生成 16进制的字符串作为 _id 。 此参数默认为 false 。

此客户端尝试着支持驱动所支持的大多数参数配置。 有两种配置方式,一种是连接字符串,另一种是驱动配置选项。

connection_string

连接字符串,指的是创建客户端的字符串,例如: mongodb://localhost:27017 。 有关连接字符串格式的更多信息,请参考驱动程序文档。

驱动配置的具体选项

{
 // Single Cluster Settings
 "host" : "127.0.0.1", // string
 "port" : 27017,      // int

 // Multiple Cluster Settings
 "hosts" : [
   {
     "host" : "cluster1", // string
     "port" : 27000       // int
   },
   {
     "host" : "cluster2", // string
     "port" : 28000       // int
   },
   ...
 ],
 "replicaSet" :  "foo",    // string
 "serverSelectionTimeoutMS" : 30000, // long

 // Connection Pool Settings
 "maxPoolSize" : 50,                // int
 "minPoolSize" : 25,                // int
 "maxIdleTimeMS" : 300000,          // long
 "maxLifeTimeMS" : 3600000,         // long
 "waitQueueTimeoutMS" : 10000,      // long
 "maintenanceFrequencyMS" : 2000,   // long
 "maintenanceInitialDelayMS" : 500, // long

 // Credentials / Auth
 "username"   : "john",     // string
 "password"   : "passw0rd", // string
 "authSource" : "some.db"   // string
 // Auth mechanism
 "authMechanism"     : "GSSAPI",        // string
 "gssapiServiceName" : "myservicename", // string

 // Socket Settings
 "connectTimeoutMS" : 300000, // int
 "socketTimeoutMS"  : 100000, // int
 "sendBufferSize"    : 8192,  // int
 "receiveBufferSize" : 8192,  // int

 // Server Settings
 "heartbeatFrequencyMS"    : 1000, // long
 "minHeartbeatFrequencyMS" :  500, // long

 // SSL Settings
 "ssl" : false,                       // boolean
 "sslInvalidHostNameAllowed" : false, // boolean
 "trustAll" : false,                  // boolean
 "keyPath" : "key.pem",               // string
 "certPath" : "cert.pem",             // string
 "caPath" : "ca.pem",                 // string

 // Network compression Settings
 "compressors"           : ["zstd", "snappy", "zlib"],  // string array
 "zlibCompressionLevel"  : 6                            // int
}

驱动参数说明

host

mongoDB 实例运行的地址。默认是 127.0.0.1。 如果设置了 hosts 参数,就会忽略 host 参数

port

mongoDB 实例监听的端口。默认是 127.0.0.1。 如果设置了 hosts 参数,就会忽略 host 参数

hosts

表示支持 MongoDB 集群(分片/复制)的一组地址和端口

host

集群中某个运行实例的地址

port

集群中某个运行实例监听的端口

replicaSet

某个 mongoDB 实例作为副本集的名称

serverSelectionTimeoutMS

驱动选择服务器的最大时间,单位毫秒

maxPoolSize

连接池最大连接数。默认为 100

minPoolSize

连接池最小连接数。默认为 0

maxIdleTimeMS

连接池的连接最大空闲时间。默认为 0,表示一直存在

maxLifeTimeMS

连接池的连接最大存活时间。默认为 0,表示永远存活

waitQueueTimeoutMS

线程等待作为连接的最长等待时间。默认为 120000(2分钟)

maintenanceFrequencyMS

维护任务进行循环检查连接的时间间隔(译者注:维护任务会定时检查连接的状态,直到连接池剩下最小连接数)。默认为 0

maintenanceInitialDelayMS

连接池启动后,维护任务第一次启动的时间。默认为 0

username

授权的用户名。默认为 null(意味着不需要授权)

password

授权的密码

authSource

与授权用户关联的数据库名称。默认值为 db_name

authMechanism

所使用的授权认证机制。请参考 [Authentication](http://docs.mongodb.org/manual/core/authentication/ 来获取更多信息。

gssapiServiceName

当使用`GSSAPI`的授权机制时,所使用的 Kerberos 服务名。

connectTimeoutMS

打开连接超时的时间,单位毫秒。默认为`10000`(10 秒)

socketTimeoutMS

在 socket 上接收或者发送超时的时间。默认为`0`,意味着永远不超时(译者注:这是客户端的超时时间。如果一个 insert 达到了 socketTimeoutMS, 将无法得知服务器是否已写入)。

sendBufferSize

设置 socket 发送缓冲区大小(SO_SNDBUF)。默认为`0`,这将使用操作系统默认大小。

receiveBufferSize

设置 socket 接收缓冲区大小(SO_RCVBUF)。默认为`0`,这将使用操作系统默认大小。

heartbeatFrequencyMS

集群监视器访问每个集群服务器的频率。默认为`5000`(5s)

minHeartbeatFrequencyMS

最小心跳频率。默认为`1000`(1s)

ssl

在mongo客户端 和 mongo之间,启用ssl

sslInvalidHostNameAllowed

接受服务器证书中未包含的主机名(译者注:当你启用ssl时,这个配置用来设置是否关闭域名检查。true 为允许,即关闭域名检查)。

trustAll

当启用ssl时,信任所有证书。警告 - 开启这个配置将会让您面临一些潜在的安全问题,例如MITM攻击。

keyPath

设置客户端私钥的路径。客户端私钥是用于在与mongo建立SSL连接时,对服务器进行身份验证。

certPath

设置客户端证书的路径。客户端证书是用来在与mongo建立SSL连接时,对服务器进行身份验证。

caPath

设置CA证书的路径。CA证书是用于在与mongo建立SSL连接时,当做一个信任源。

compressors

Sets the compression algorithm for network transmission. Valid values range from [snappy, zlib, zstd], the default value is null (meaning no compression).

注意

For snappy and zstd compression algorithms support, additional dependencies must be added to your project build descriptor (snappy-java and zstd-java, respectively).

zlibCompressionLevel

Sets the compression level for zlib. Valid values are between -1 and 9, the default value is -1 if zlib is enabled.

请注意:上面提到的各类参数的默认值,都是 MongoDB Java 驱动的默认值。 请参考驱动文档来获取最新信息。

RxJava 3 API

Mongo客户端提供了RX化的原始版本API。

创建一个RX化的客户端

要想创建一个RX化的Mongo客户端,需要您确保导入了 MongoClient 类。 创建并获取一个客户端实例的方法有很多,例如下面:

MongoClient client = MongoClient.createShared(vertx, config);

批量查询文档

ReadStream 可以被转换成 Flowable 。这个功能会方便您处理大型数据集。

JsonObject query = new JsonObject()
  .put("author", "J. R. R. Tolkien");

ReadStream<JsonObject> books = mongoClient.findBatch("book", query);

// Convert the stream to a Flowable
Flowable<JsonObject> flowable = books.toFlowable();

flowable.subscribe(doc -> {
  System.out.println("Found doc: " + doc.encodePrettily());
}, throwable -> {
  throwable.printStackTrace();
}, () -> {
  System.out.println("End of research");
});