Vert.x Consul 客户端
Consul 是一个在你的应用架构中提供服务发现和配置管理的工具。 Vert.x 的 Consul 客户端允许应用通过阻塞或非阻塞的 HTTP 接口与 Consul 系统互动。
使用 Vert.x Consul 客户端
为了使用该项目, 将以下依赖添加到你的构建描述文件的 依赖配置 中:
-
Maven (在你的
pom.xml
中):
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-consul-client</artifactId>
<version>4.0.3</version>
</dependency>
-
Gradle (在你的
build.gradle
文件中):
compile 'io.vertx:vertx-consul-client:4.0.3'
创建客户端
只需使用工厂方法即可创建:
ConsulClient client = ConsulClient.create(vertx);
Consul 客户端也可以配置选项
ConsulClientOptions options = new ConsulClientOptions()
.setHost("consul.example.com");
ConsulClient client = ConsulClient.create(vertx, options);
Consul 客户端支持以下配置:
host
-
Consul 的主机地址。 默认是
localhost
port
-
Consul 的 HTTP 接口地址。 默认是
8500
timeout
-
设置超时时间 (单位为毫秒) 如果在超时时间内请求没有返回任何数据,那么 处理器会收到一个失败的结果,请求也会被关闭
aclToken
-
ACL 签名。 如果设置了该参数, 客户端在请求 Consul 时会在 查询参数中携带 "?token" 参数。如没有提供,那么签名为空,会使用对应的 '匿名' ACL 策略。
dc
-
数据中心名称。 如果设置了该参数, 客户端在请求 Consul 时会在 查询参数中携带 "?dc" 参数。 如果没有提供,那么会默认查询当前的 Consul agent 的数据中心。
ConsulClient 的选项类继承了 vertx-web-client
模块的 WebClientOptions 类,
因此还可以提供很多设置。 请参阅相关文档。
使用接口
客户端接口参见 ConsulClient
。 接口的格式与 Consul 的
HTTP 接口非常相像,Consul 的 HTTP 接口参见 Consul API 文档
阻塞查询
Consul 的某些接口支持名为 "阻塞查询" 的特性。 阻塞查询基于长轮询以查询结果的潜在 变化。 每个支持阻塞查询的接口都提供了一个唯一标识 (index),以代表 所请求资源的当前状态。 以下参数配置用于阻塞查询:
index
-
该值用于表示 Consul 客户端期望看到的所请求资源的 index 的变化。
wait
-
该值用于表示阻塞查询的最大时间。 该值最大为 10 分钟。
BlockingQueryOptions opts = new BlockingQueryOptions()
.setIndex(lastIndex)
.setWait("1m");
要注意的是,阻塞查询 不能严格保证 所返回的数据一定是发生了变化的。 很有可能的情况是,阻塞查询的返回只是因为超时时间到了或发生了并没有改变数据内容的写入。
键值对存储
键值对接口用于访问 Consul 内部的简单键值对存储,这对保存服务的配置或其他元数据很有用。 以下是可以使用的接口:
-
管理单个键值对的更新,删除和获取,以及根据键的前缀匹配的同样操作
-
管理通过单个原子性的事务操作来获取和更新键值对的操作
从存储中获取键值对
Consul 客户端可以返回一个键的值
consulClient.getValue("key", res -> {
if (res.succeeded()) {
System.out.println("retrieved value: " + res.result().getValue());
System.out.println("modify index: " + res.result().getModifyIndex());
} else {
res.cause().printStackTrace();
}
});
…或者返回所有匹配某一给定前缀的键的值
consulClient.getValues("prefix", res -> {
if (res.succeeded()) {
System.out.println("modify index: " + res.result().getIndex());
for (KeyValue kv : res.result().getList()) {
System.out.println("retrieved value: " + kv.getValue());
}
} else {
res.cause().printStackTrace();
}
});
返回的键值对数据包括以下字段(参考 文档):
createIndex
-
内部的创建索引,表示该键值对何时被创建的。
modifyIndex
-
更新索引,该键值对最后一次被修改时的索引
lockIndex
-
该键被成功获取锁的次数
key
-
键
flags
-
该键值对的标志位。 客户端可以选择使用标识位 来表示应用业务中的一些信息
value
-
值
session
-
拥有锁的会话
更新索引可以用于阻塞查询:
BlockingQueryOptions opts = new BlockingQueryOptions()
.setIndex(modifyIndex)
.setWait("1m");
consulClient.getValueWithOptions("key", opts, res -> {
if (res.succeeded()) {
System.out.println("retrieved value: " + res.result().getValue());
System.out.println("new modify index: " + res.result().getModifyIndex());
} else {
res.cause().printStackTrace();
}
});
保存键值对
consulClient.putValue("key", "value", res -> {
if (res.succeeded()) {
String opResult = res.result() ? "success" : "fail";
System.out.println("result of the operation: " + opResult);
} else {
res.cause().printStackTrace();
}
});
保存请求也可以接受参数
KeyValueOptions opts = new KeyValueOptions()
.setFlags(42)
.setCasIndex(modifyIndex)
.setAcquireSession("acquireSessionID")
.setReleaseSession("releaseSessionID");
consulClient.putValueWithOptions("key", "value", opts, res -> {
if (res.succeeded()) {
String opResult = res.result() ? "success" : "fail";
System.out.println("result of the operation: " + opResult);
} else {
res.cause().printStackTrace();
}
});
以下是 PUT
请求可以接收的参数:
flags
-
可以指定一个无符号数,范围是
0
到264-1
。 客户端可以选择使用标识位来表示应用业务中的一些信息。 casIndex
-
该标志位用于将 PUT 操作转为一次检查设置(Check-And-Set)操作。 如果你需要构建一个 拥有非常复杂同步原语的操作的话这回非常有用。 如果该索引值为
0
, Consul 只会在键不存在的情况下 设置该键值对。 如果该索引不为 0, 该键值对只会在该索引与更新索引相匹配的时候被设置。 acquireSession
-
该标志位用于将 PUT 操作转为一次需要获取锁的操作。 这个参数可以让 Consul 在顶层发生领导选举。 如果锁未被获取并且会话有效, 那么该操作会增加锁索引(LockIndex) 并设置该键值对所属的会话以更新该键的值。 获取锁时键不一定存在。 如果锁已经被设置的会话持有, 那么锁索引 不会增加,但是键的内容还是会更新。 这是因为该会话当前已经拥有锁,可以直接更新键的内容, 而不用再释放锁并重新获取。
releaseSession
-
该标志位用于将 PUT 操作转换为一次释放锁的操作。 跟
acquireSession
一起搭配 使用的话会很有用,这样可以让客户端生成一个锁。 该操作不会修改锁索引,但会释放 与该键相关联的会话。 前提是该键的锁正在被该会话持有。
事务
当连接到 Consul 的 0.7 或更高的版本时, 客户端可以使用单次原子性的事务操作 来管理对多个键的更新和查询。 事务操作目前只支持键值对, 其他类型的 事务操作可能会在以后的版本中支持。 (参见 文档).
TxnRequest request = new TxnRequest()
.addOperation(new TxnKVOperation().setKey("key1").setValue("value1").setType(TxnKVVerb.SET))
.addOperation(new TxnKVOperation().setKey("key2").setValue("value2").setType(TxnKVVerb.SET));
consulClient.transaction(request, res -> {
if (res.succeeded()) {
System.out.println("succeeded results: " + res.result().getResults().size());
System.out.println("errors: " + res.result().getErrors().size());
} else {
res.cause().printStackTrace();
}
});
删除键值对
最后, Consul 允许你从存储中删除键值对:
consulClient.deleteValue("key", res -> {
if (res.succeeded()) {
System.out.println("complete");
} else {
res.cause().printStackTrace();
}
});
…或者删除所有符合该前缀的键值对
consulClient.deleteValues("prefix", res -> {
if (res.succeeded()) {
System.out.println("complete");
} else {
res.cause().printStackTrace();
}
});
服务
服务发现的主要目的之一是提供可用的服务清单。 为了实现该目标, agent 提供了一个简单的服务声明格式以便说明当前服务是否可用 并将它与健康检查潜在地关联起来。
服务注册
服务的声明格式必须包含一个 name
,还可以选择设置 id
, tags
, address
, port
, 和 checks
等属性
ServiceOptions opts = new ServiceOptions()
.setName("serviceName")
.setId("serviceId")
.setTags(Arrays.asList("tag1", "tag2"))
.setCheckOptions(new CheckOptions().setTtl("10s"))
.setAddress("10.0.0.1")
.setPort(8048);
name
-
服务的名称
id
-
如果没有设置
name
则会使用id
替代。 节点的每个服务都需要使用唯一的 ID, 如果name
发生了冲突,那么会使用id
替换。 tags
-
一组数据,对 Consul 来说没有意义,但是用于区分服务是不是主要节点或是次要节点 不同的版本,或者服务的其他不同等级标记。
address
-
用于区分服务的 IP 地址。 默认情况下,使用 agent 的地址, 不需要手动设置
port
-
也是用于简要配置面向服务的结构; 这样, 通过配置的地址和端口可以发现一个服务
checks
-
关联的健康检查
这些设置用于将服务注册到清单中:
consulClient.registerService(opts, res -> {
if (res.succeeded()) {
System.out.println("Service successfully registered");
} else {
res.cause().printStackTrace();
}
});
服务发现
Consul 客户端可以当前可以提供服务的节点列表:
consulClient.catalogServiceNodes("serviceName", res -> {
if (res.succeeded()) {
System.out.println("found " + res.result().getList().size() + " services");
System.out.println("consul state index: " + res.result().getIndex());
for (Service service : res.result().getList()) {
System.out.println("Service node: " + service.getNode());
System.out.println("Service address: " + service.getAddress());
System.out.println("Service port: " + service.getPort());
}
} else {
res.cause().printStackTrace();
}
});
获取节点列表时还可以同时获取相关联的健康检查的状态。 可以使用健康检查的状态过滤结果。
consulClient.healthServiceNodes("serviceName", passingOnly, res -> {
if (res.succeeded()) {
System.out.println("found " + res.result().getList().size() + " services");
System.out.println("consul state index: " + res.result().getIndex());
for (ServiceEntry entry : res.result().getList()) {
System.out.println("Service node: " + entry.getNode());
System.out.println("Service address: " + entry.getService().getAddress());
System.out.println("Service port: " + entry.getService().getPort());
}
} else {
res.cause().printStackTrace();
}
});
查询服务时还可以设置以下参数:
ServiceQueryOptions queryOpts = new ServiceQueryOptions()
.setTag("tag1")
.setNear("_agent")
.setBlockingOptions(new BlockingQueryOptions().setIndex(lastIndex));
tag
-
默认情况下,会返回所有匹配服务的节点。 你可以指定
tag
查询参数以根据服务的标签过滤返回结果 near
-
添加可选的
near
参数为一个节点名,那么返回结果会根据 到该节点的 rtt(round trip time,即往返时间)升序排序。 如果设置near
=_agent
那么会使用当前节点作为排序的依据。 blockingOptions
-
阻塞查询参数
请求如下所示:
consulClient.healthServiceNodesWithOptions("serviceName", passingOnly, queryOpts, res -> {
if (res.succeeded()) {
System.out.println("found " + res.result().getList().size() + " services");
} else {
res.cause().printStackTrace();
}
});
健康检查
agent 的主要角色之一就是系统层面和应用层面上的健康检查管理。 如果健康检查与服务关联,那么该健康检查被认为是应用层面上的。 反之,则健康检查监控的是整个节点的健康状态。
CheckOptions opts = new CheckOptions()
.setTcp("localhost:4848")
.setInterval("1s");
以下是 Consul 客户端支持的健康检查选项:
id
-
健康检查 ID
name
-
健康检查名称
script
-
健康检查脚本的路径,同时你应该设置检查的间隔
http
-
健康检查的 HTTP 地址,同时你应该设置检查的间隔
ttl
-
健康检查的 ttl(time to live,即生存时间)
tcp
-
健康检查的 tcp 地址,同时你应该设置检查的间隔
interval
-
健康检查的时间间隔,使用 Go 语言的时间格式,是一个十进制的数组字符串, 可以携带小数和单位后缀,例如 "300ms", "-1.5h" 或 "2h45m" 有效的时间单位是 "ns", "us" (或 "µs"), "ms", "s", "m", "h"
notes
-
健康检查的备注
serviceId
-
服务的 ID,用于将当前注册的健康检查与 agent 提供的已存在的服务相关联
deregisterAfter
-
取消注册超时时间。 该设置是可选的, 它的时间格式与 Interval 和 TTL 相同。 如果健康检查与一个服务相关联并且服务的状态处于不可用的时间超过该值, 那么该服务 (以及所有该服务相关联的服务) 将会自动取消注册 该值最小为 1 分钟, 并且用于解除服务的进程每 30 秒运行一次。 因此实际运用中,Consul 可能会花费比该设置长一点的时间将服务取消注册。 该超时时间应该配置得远大于该服务 重启或恢复所需要得时间。
status
-
健康检查的初始状态值
Name
字段是必须的,而 Script
, HTTP
, TCP
或 TTL
只需一个就可以。 Script
, TCP
和 HTTP
需要设置 Interval
选项。 如果没有提供 ID
, 会用 Name
的值替代。
每个节点上不能有重复的 ID, 因此设置一个 ID 是很有必要的。
consulClient.registerCheck(opts, res -> {
if (res.succeeded()) {
System.out.println("check successfully registered");
} else {
res.cause().printStackTrace();
}
});
事件
Consul 提供了事件机制以便让用户触发自定义的事件给整个数据中心(datacenter)。 这些事件对 Consul 是没有特殊意义的, 但是它们可以用于构建自动发布的脚本系统, 重启服务,或处理其他一些编排好的动作。
发送用户事件时只需填写事件名称:
consulClient.fireEvent("eventName", res -> {
if (res.succeeded()) {
System.out.println("Event sent");
System.out.println("id: " + res.result().getId());
} else {
res.cause().printStackTrace();
}
});
也可以携带一些额外的参数
node
-
一个正则表达式,用于过滤需要接收事件的节点
service
-
一个正则表达式,用于过滤需要接收事件的服务
tag
-
一个正则表达式,用于根据标签过滤
payload
-
事件可以拥有一个消息体 对 Consul 来说消息体的内容也是透明的,它会成为事件的内容。
EventOptions opts = new EventOptions()
.setTag("tag")
.setPayload("message");
consulClient.fireEventWithOptions("eventName", opts, res -> {
if (res.succeeded()) {
System.out.println("Event sent");
System.out.println("id: " + res.result().getId());
} else {
res.cause().printStackTrace();
}
});
Consul 客户端支持查询 agent 最近收到的事件。 Consul 的事件使用 gossip 协议广播, 因此事件之间没有顺序关系,也并不保证送达。 Agent 只会缓存 最近的事件。 目前的缓存大小是 256, 但该值在未来也可能会发生变化。
consulClient.listEvents(res -> {
if (res.succeeded()) {
System.out.println("Consul index: " + res.result().getIndex());
for(Event event: res.result().getList()) {
System.out.println("Event id: " + event.getId());
System.out.println("Event name: " + event.getName());
System.out.println("Event payload: " + event.getPayload());
}
} else {
res.cause().printStackTrace();
}
});
Consul 的索引也可以用于预备查询请求:
EventListOptions opts = new EventListOptions()
.setName("eventName")
.setBlockingOptions(new BlockingQueryOptions().setIndex(lastIndex));
consulClient.listEventsWithOptions(opts, res -> {
if (res.succeeded()) {
System.out.println("Consul index: " + res.result().getIndex());
for(Event event: res.result().getList()) {
System.out.println("Event id: " + event.getId());
}
} else {
res.cause().printStackTrace();
}
});
会话
Consul 提供了会话机制,可以用于构建分布式锁。 会话表现为一个中间层,绑定了节点,健康检查和键值对存储的资源。 当会话创建时, 需要提供节点名称,健康检查列表,一个动作,TTL,和 一个延迟锁。
SessionOptions opts = new SessionOptions()
.setNode("nodeId")
.setBehavior(SessionBehavior.RELEASE);
lockDelay
-
是一个表示时间的字符串,使用 's' 作为后缀,代表以秒为单位。默认的值是 '15s'
name
-
用于给会话设置一个有意义的易于理解的名称
node
-
如果要设置的话,必须是一个已经注册的节点。 默认情况下, 会使用当前 agent 的节点
checks
-
用于提供相关联的健康检查的列表。 这里强烈建议如果你设置了该参数, 健康检查列表中需要包含默认的
serfHealth
。 behavior
-
可以被设置为
release
或delete
。 该设置控制会话到期时的行为。 默认情况下设置为release
, 这样该会话占有的锁会被释放。 如果将其改为delete
那么 该会话占有的锁会被删除。如果你要创建临时的键值对存储那么使用delete
会很有用 ttl
-
一个时间字符串, 格式与
LockDelay
一样,也使用 s 作为后缀。 该设置需要 在 10s 和 86400s 之间。 如果提供该设置,那么如果会话在 TTL 时间到期前仍没有刷新的话会失效。
更多信息请参考 Consul Sessions internals
新创建的会话会拥有一个命名 ID, 可以用于区分不同的会话。 这个 ID 可以和键值对一起使用以尝试获取锁,这个锁用通知机制实现了互斥。
consulClient.createSessionWithOptions(opts, res -> {
if (res.succeeded()) {
System.out.println("Session successfully created");
System.out.println("id: " + res.result());
} else {
res.cause().printStackTrace();
}
});
你也可以销毁会话
consulClient.destroySession(sessionId, res -> {
if (res.succeeded()) {
System.out.println("Session successfully destroyed");
} else {
res.cause().printStackTrace();
}
});
获取属于某个节点的会话列表
consulClient.listNodeSessions("nodeId", res -> {
if (res.succeeded()) {
for(Session session: res.result().getList()) {
System.out.println("Session id: " + session.getId());
System.out.println("Session node: " + session.getNode());
System.out.println("Session create index: " + session.getCreateIndex());
}
} else {
res.cause().printStackTrace();
}
});
所有读取会话的接口支持阻塞查询和全一致模式。
BlockingQueryOptions blockingOpts = new BlockingQueryOptions()
.setIndex(lastIndex);
consulClient.listSessionsWithOptions(blockingOpts, res -> {
if (res.succeeded()) {
System.out.println("Found " + res.result().getList().size() + " sessions");
} else {
res.cause().printStackTrace();
}
});
数据中心下的节点
consulClient.catalogNodes(res -> {
if (res.succeeded()) {
System.out.println("found " + res.result().getList().size() + " nodes");
System.out.println("consul state index " + res.result().getIndex());
} else {
res.cause().printStackTrace();
}
});
该接口支持阻塞查询,并根据与指定节点的距离排序
NodeQueryOptions opts = new NodeQueryOptions()
.setNear("_agent")
.setBlockingOptions(new BlockingQueryOptions().setIndex(lastIndex));
consulClient.catalogNodesWithOptions(opts, res -> {
if (res.succeeded()) {
System.out.println("found " + res.result().getList().size() + " nodes");
} else {
res.cause().printStackTrace();
}
});
预备查询(Prepared Query)
以下接口用于创建,更新,销毁和执行预备查询。 预备查询可以让你注册一个复杂的服务查询并在之后通过该预备查询的 ID 或名字执行, 并获取可以提供服务的健康节点的集合。 当与 Consul 的 DNS 接口一起 使用时会很有帮助,因为这样可以使用更复杂的查询,而不是被 DNS 接口 的查询接口要求所限制。
创建一条阻塞查询语句有很多参数 所有的细节请参考 文档
dc
-
设置要查询的数据中心的名称。 默认情况下该参数为被查询的 agent 所属的数据中心。 它会作为查询参数一部分存在于 URL 中
name
-
一个可选的昵称, 你可以使用该昵称而不是 ID 来指定要使用的预备查询。
session
-
指定一个现有会话的 ID,这样如果该会话过期后,对应的预备查询语句也会被删除,否则你需要手动删除不再使用的预备查询语句。
token
-
指定一个 ACL 签名,每次执行查询都会使用该签名。 预备查询在执行时可以没有 ACL 签名, 因此你需要谨慎使用。 签名本身只对管理签名的客户端可见。 如果签名没有设置或为空字符串,那么会使用客户端的 ACL 签名来决定是否有权限来查询对应服务。 如果客户端无法提供 ACL 签名, 那么会使用一个匿名签名。
service
-
指定要查询的服务名称,这个参数是必填的。
failover
-
该参数包括两个字段,都是可选的。 该参数用于指定执行查询时如果本地的数据中心没有处于健康状态下的服务时的行为。通过简单的配置你可以使用其他数据中心上的节点。
nearestN
-
该选项用于设置预备请求会被转发至其余最近的 NearestN 个数据中心。 使用 WAN gossip pool 中数据中心之间的网络 rtt(round trip time,即往返时间)构成的网路坐标来判断距离。 远程的数据中心上的服务器处理请求的 rtt 的中位数会作为判断权重的依据。
datacenters
-
指定要转发的远程数据中心列表。 如果本地没有健康节点可用的话,会转发请求到这些数据中心上。 Consul 会按照列表中的顺序查询数据中心。 如果该选项与 NearestN 一起使用, 那么会首先依据 NearestN 设置的规则, 其次才是 Datacenters 配置的规则。 在失效转移请求的过程中一个数据中心只会使用一次, 即使它都满足 NearestN 和 Datacenters 配置的条件。
onlyPassing
-
指定该查询的健康检查状态的条件。 如果设置为 false, 那么即使服务处于警告状态也会返回。 如果设置为 true, 则只会返回健康检查成功的服务。
tags
-
设置服务标签列表, 查询结果会根据参数列表过滤。 如果设置了该参数,那么只有服务拥有列表中所有的标签才会满足条件, 并且不包含禁止标签(前缀带有 ! 的为禁止标签)。
nodeMeta
-
设置一个自定义的用户键值对列表,可以用于查询节点的元数据过滤返回结果。
dnsTtl
-
如果查询结果是通过 DNS 服务的话,设置 TTL 的范围。 如果提供了该设置, 那么该设置相比其他设置项是最优先的。
templateType
-
查询类型,用于
名称前缀匹配
。 这意味着任何查询,如果它的名称的前缀都与该字段匹配,那么则属于该模板。在以下示例中,任何对 geo-db 的查询都与该查询相匹配。 查询模板使用最长前缀匹配,因此可以使用高级模板,这些高级模板为了一些服务被重写。 静态查询会被首先解析,因此也可以重写查询模板。 templateRegexp
-
一个可选的正则表达式,用于从整个名称字段中匹配提取对应的变量。 如果选择了该模板,在以下示例中正则表达式提取了 "-" 符号后的第一个变量作为数据库的名称而之后的变量均作为标签数据。 参考 RE2 文档以获取正则表达式的更多信息。
PreparedQueryDefinition def = new PreparedQueryDefinition()
.setName("Query name")
.setService("service-${match(1)}-${match(2)}")
.setDcs(Arrays.asList("dc1", "dc42"))
.setTemplateType("name_prefix_match")
.setTemplateRegexp("^find_(.+?)_(.+?)$");
如果预备查询被成功创建,那么会返回它的 ID
consulClient.createPreparedQuery(def, res -> {
if (res.succeeded()) {
String queryId = res.result();
System.out.println("Query created: " + queryId);
} else {
res.cause().printStackTrace();
}
});
通过指定预备查询的 ID,可以执行该查询命令
consulClient.executePreparedQuery(id, res -> {
if (res.succeeded()) {
PreparedQueryExecuteResponse response = res.result();
System.out.println("Found " + response.getNodes().size() + " nodes");
} else {
res.cause().printStackTrace();
}
});
或者使用正则匹配来指定要执行的预备查询
consulClient.executePreparedQuery("find_1_2", res -> {
// matches template regexp "^find_(.+?)_(.+?)$"
if (res.succeeded()) {
PreparedQueryExecuteResponse response = res.result();
System.out.println("Found " + response.getNodes().size() + " nodes");
} else {
res.cause().printStackTrace();
}
});
最后, ConsulClient
允许你修改,获取和删除预备查询
consulClient.deletePreparedQuery(query, res -> {
if (res.succeeded()) {
System.out.println("Query deleted");
} else {
res.cause().printStackTrace();
}
});
监视
监视是一种指定一个数据(例如节点列表, 键值对数据, 健康检查)的视图的方法,
它会监视数据的更新。 当发现数据更新后,一个参数为 WatchResult
的 Handler
会被调用。
以下示例中, 你可以监视健康检查的状态并当状态不正常时收到通知。
Watch.key("foo/bar", vertx)
.setHandler(res -> {
if (res.succeeded()) {
System.out.println("value: " + res.nextResult().getValue());
} else {
res.cause().printStackTrace();
}
})
.start();