SpringCloud客户端负载均衡——Ribbon-创新互联

  Ribbon——A ribbon is a long, narrow piece of cloth that you use for tying things together or as a decoration.

创新互联建站-专业网站定制、快速模板网站建设、高性价比南召网站开发、企业建站全套包干低至880元,成熟完善的模板库,直接使用。一站式南召网站制作公司更省心,省钱,快速模板网站建设找我们,业务覆盖南召地区。费用合理售后完善,十多年实体公司更值得信赖。

  Ribbon是一个工具类框架,不需要独立部署。

  负载均衡设备/负载均衡软件模块都会维护一个可用的服务清单,通过心跳检测来剔除故障节点,保证清单中都是可用节点。

  客户端负载均衡,由客户端节点维护要访问的服务清单,服务清单来自于注册中心。

  如前所示,使用客户端负载均衡调用分两步:

  1. 服务提供者注册到服务中心。

  2. 服务消费者通过标有@LoadBalanced注解的RestTemplate进行服务调用。

  在service-consumer服务中,通过调用RestTemplate的getForEntity方法,GET调用hello-service的/hello接口。

  RestTemplate

  GET

  RestTemplate有两类GET实现:getForEntity和getForObject。

  getForEntity()有三个重载实现,均返回ResponseEntity,

  // url为请求地址,responseType为响应体body的类型,uriVariables为url参数

  // uriVariables配合url中的占位符进行动态传参,如:

  // entity = getForEntity("http://user-serivce/user?name={1}", String.class, "John");,将John传给参数name

  public ResponseEntity getForEntity(String url, Class responseType, Object... uriVariables) throws RestClientException {

  RequestCallback requestCallback = acceptHeaderRequestCallback(responseType); // new AcceptHeaderRequestCallback(responseType)

  ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); // new ResponseEntityResponseExtractor<>(responseType)

  return nonNull(execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables));

  }

  // uriVariables为Map类型,key需要与url中的占位符对应,如:

  // params.put("myname", "John");

  // entity = getForEntity("http://user-service/user?name={myname}", String.class, params); 将key为myname对应的value——John传给name

  public ResponseEntity getForEntity(String url, Class responseType, Map uriVariables);

  // 使用URI对象代替url和uriVariables

  public ResponseEntity getForEntity(URI url, Class responseType);

  // 使用:

  ResponseEntity entity = restTemplate.getForEntity(url, String.class, "John");

  String body = entity.getBody();

  ResponseEntity entity = restTemplate.getForEntity(url, User.class, "John");

  User body = entity.getBody();

  getForObject()也有三个重载实现,传入execute方法的不是ResponseExtractor,而是HttpMessageConverterExtractor,返回的则是对象类型,三个重载和getForEntity的三个重载关系类似:

  public T getForObject(String url, Class responseType, Object... uriVariables) throws RestClientException {

  RequestCallback requestCallback = acceptHeaderRequestCallback(responseType); // new AcceptHeaderRequestCallback(responseType)

  HttpMessageConverterExtractor responseExtractor = new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);

  return execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables);

  }

  public T getForObject(String url, Class responseType, Map uriVariables);

  public T getForObject(URI url, Class responseType);

  // 使用:

  String result = restTemplate.getForObject(url, String.class, "John");

  User user = restTempleate.getForObject(url, User.class, "John");

  // 使用getForObject可以省略从response中获取body的步骤

  POST

  RestTemplate有三类POST实现:postForEntity和postForObject,postForLocation。

  postForEntity()有三个重载实现,均返回ResponseEntity,

  // 相较于getForEntity,新增参数Object request,reqeust如果是HttpEntity对象,RestTemplate将其当作完整的http请求对象处理,request中包含了header和body的内容。如果request是普通对象,RestTemplate将其转换为HttpEntity来处理,request作为body。

  // if (request instanceof HttpEntity){this.requestEntity = (HttpEntity) request; }

  // else if (requestBody != null) { this.requestEntity = new HttpEntity<>(request); }

  // else { this.requestEntity = HttpEntity.EMPTY; }

  // 使用:

  // User user = new User("didi", 30);

  // entity = postForEntity("http://user-serivce/user", user, String.class, "John");,将John传给参数name

  public ResponseEntity postForEntity(String url, Object request, Class responseType, Object... uriVariables) throws RestClientException {

  RequestCallback requestCallback = httpEntityCallback(request, responseType); // new HttpEntityRequestCallback(requestBody, responseType)

  ResponseExtractor> responseExtractor = responseEntityExtractor(responseType); // new ResponseEntityResonseExtracor<>(responseType)

  return nonNull(execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables));

  }

  public ResponseEntity postForEntity(String url, Object request, Class responseType, Map uriVariables);

  public ResponseEntity postForEntity(URI url, Object request, Class responseType);

  postForObject()也有三个重载实现,传入execute方法的不是ResponseExtractor,而是HttpMessageConverterExtractor,返回的则是对象类型,三个重载和postForEntity的三个重载关系类似:

  public T postForObject(String url, Object request, Class responseType, Object... uriVariables) throws RestClientException {

  RequestCallback requestCallback = httpEntityCallback(request, responseType);

  HttpMessageConverterExtractor responseExtractor = new HttpMessageConverterExtractor<>(responseType, getMessageConverters(), logger);

  return execute(url, HttpMethod.POST, requestCallback, responseExtractor, uriVariables);

  }

  public T postForObject(String url, Object request, Class responseType, Map uriVariables);

  public T postForObject(URI url, Object request, Class responseType);

  postForLocation()用于发送post请求,返回新资源的URI,有三个重载实现,均返回URI对象,

  public URI postForLocation(String url, Object request, Object... uriVariables) throws RestClientException {

  RequestCallback requestCallback = httpEntityCallback(request); // new HttpEntityRequestCallback(request, null)

  HttpHeaders headers = execute(url, HttpMethod.POST, requestCallback, headersExtractor(), uriVariables); // new HeadersExtractor()

  return (headers != null ? headers.getLocation() : null);

  }

  public URI postForLocation(String url, Object request, Map uriVariables);

  public URI postForLocation(URI url, Object ruquest);

  execute

  RestTemplate中,不同的请求方式,最终会调用到execute的三个重载实现上来

  // ------------------------------------

  public T execute(String url, HttpMethod method, RequestCallback requestCallback, ResponseExtractor responseExtractor, Object... uriVariables) {

  URI expanded = getUriTemplateHandler().expand(url, uriVariables);

  return doExecute(expanded, method, requestCallback, responseExtractor);

  }

  public T execute(String url, HttpMethod method, RequestCallback requestCallback, ResponseExtractor responseExtractor, Map uriVariables);

  public T execute(URI url, HttpMethod method, RequestCallback requestCallback, ResponseExtractor responseExtractor);

  execute()的三个重载实现,都会调用doExecute()方法,去执行请求

  protected T doExecute(URI url, HttpMethod method, RequestCallback requestCallback, ResponseExtractor responseExtractor) throws RestClientException {

  ClientHttpResponse response = null;

  ClientHttpRequest request = createRequest(url, method);

  if (requestCallback != null) { requestCallback.doWithRequest(request); }

  response = request.execute();// 此处执行前会被拦截

  handleResponse(url, method, response);

  return (responseExtractor != null ? responseExtractor.extractData(response) : null);

  }

  doExecute()方法接收的参数中,有RequestCallback和ResponseExtractor。

  RequestCallback

  AcceptHeaderRequestCallback implements RequestCallback // AcceptHeaderRequestCallback用于GET请求

  HttpEntityRequestCallback extends AcceptHeaderRequestCallback // HttpEntityRequestCallback 用于POST、PUT等请求

  ResponseEntity

  // ResponseEntity扩展自HttpEntity,增加了http的status(http请求状态码)

  package org.springframework.http;

  public class ResponseEntity extends HttpEntity {

  private final Object status; // status为int或HttpStatus类型

  // getter/setter...

  }

  // HttpEntity表示http的request或response的entity,包含headers(http请求的头信息)和body(http请求的请求体)

  package org.springframework.http;

  public class HttpEntity {

  private final HttpHeaders headers;

  private final T body;

  // getter/setter...

  }

  @LoadBalanced

  在服务消费者中,给RestTemplate添加了@LoadBalanced注解,根据注释,该注解用于标记RestTemplate使用LoadBalancerClient来配置,即客户端负载均衡器。

  // Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.

  public @interface LoadBalanced {}

  LoadBalancerClient

  客户端负载均衡器,具有如下能力:

  // Represents a client-side load balancer. 即客户端负载均衡器

  interface LoadBalancerClient extends ServiceInstanceChooser {

  // 使用serviceId服务执行request请求

  T execute(String serviceId, LoadBalancerRequest request) throws IOException;

  T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest request) throws IOException;

  // 将逻辑服务名http://myservice/path/to/service 替换为host:port的形式

  URI reconstructURI(ServiceInstance instance, URI original);

  }

  // 选择一个server用来发送请求的实现接口

  interface ServiceInstanceChooser {

  // 根据serviceId,从负载均衡器选择一个服务实例ServiceInstance

  ServiceInstance choose(String serviceId);

  }

  LoadBalancerClient有一个实现类RibbonLoadBalancerClient。

  在RestTemplate的doExecute()方法中,调用request.execute()之前,会被LoadBalancerInterceptor拦截。该拦截器中有一个LoadBalancerClient实例,此外该拦截器在LoadBalancerAutoConfiguration中被创建。而LoadBalancerAutoConfiguration有两个特殊的注解@ConditionalOnClass(RestTemplate.class)和@ConditionalOnBean(LoadBalancerClient.class),且注释明确说明LoadBalancerAutoConfiguration为Ribbon的自动化配置类。

  spring-cloud-commons的loadbalancer包中的配置类,以2.1.2为例

  LoadBalancerAutoConfiguration:创建LoadBalancerInterceptor、创建RestTemplateCustomizer(匿名内部类)、创建LoadBalancerRequestFactory、创建SmartInitializingSingleton(匿名内部类)

  AsyncLoadBalancerAutoConfiguration:针对AsyncRestTemplate做的类似配置

  spring-cloud-netflix-ribbon中的几个配置类,以2.1.2为例

  RibbonClientConfiguration:创建IClientConfig、创建IRule,创建IPing,创建ServerList,创建ServerListUpdater,创建ILoadBalancer(使用ZoneAwareLoadBalancer实现),创建ServerListFilter、创建RibbonLoadBalancerContext,创建RetryHandler,创建ServerIntrospector

  RibbonAutoConfiguration:创建HasFeatures,创建SpringClientFactory,创建LoadBalancerClient(使用RibbonLoadBalancerClient实现),创建LoadBalancedRetryFactory(使用RibbonLoadBalancedRetryFactory实现),创建PropertiesFactory,创建RibbonApplicationContextInitializer,创建RestTemplateCustomizer(使用匿名内部类),创建RibbonClientHttpRequestFactory

  RestCilentRibbonConfiguration:创建RestClient

  LoadBalancerAutoConfiguration

  Ribbon的自动化配置类:

  @Configuration

  @ConditionalOnClass(RestTemplate.class) // 需要RestTemplate类在classpath中

  @ConditionalOnBean(LoadBalancerClient.class) // 需要LoadBalancerClient的实现Bean在BeanFactory中

  @EnableConfigurationProperties(LoadBalancerRetryProperties.class)

  class LoadBalancerAutoConfiguration {

  @LoadBalanced

  @Autowired(required = false)

  // 工程中注册的RestTemplate的Bean会在此被加载

  private List restTemplates = Collections.emptyList();

  @Autowired(required = false)

  private List transformers = Collections.emptyList();

  // 创建SmartInitializingSingleton的Bean,负责用每个Customizer去修饰每个RestTemplate

  @Bean

  public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(

  ObjectProvider> restTemplateCustomizers) {

  return () -> restTemplateCustomizers.ifAvailable(customizers -> {

  for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {

  for (RestTemplateCustomizer customizer : customizers) {

  customizer.customize(restTemplate);

  }

  }

  });

  }

  @Configuration

  @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")

  static class LoadBalancerInterceptorConfig {

  // 创建拦截器Bean,入参为客户端负载均衡器

  @Bean

  public LoadBalancerInterceptor ribbonInterceptor(LoadBalancerClient loadBalancerClient, LoadBalancerRequestFactory requestFactory) {

  return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);

  }

  // 创建一个RestTemplateCustomizer的Bean,负责将负载均衡拦截器加到入参RestTemplate的拦截器列表中,添加方式为get、add、set

  @Bean

  @ConditionalOnMissingBean

  public RestTemplateCustomizer restTemplateCustomizer(final LoadBalancerInterceptor loadBalancerInterceptor) {

  return restTemplate -> {

  List list = new ArrayList<>(restTemplate.getInterceptors());

  list.add(loadBalancerInterceptor);

  restTemplate.setInterceptors(list);

  }

  }

  }

  }

  LoadBalancerInterceptor

  负载均衡拦截器,用于在请求最终执行前进行拦截,在拦截器的intercept()方法中,首先从request中获取服务名称serviceName,然后调用request工厂的createRequest()方法,创建一个负载均衡的request——LoadBalancerRequest实例,最后将其连同serviceName一起作为LoadBalancerClient的execute()方法的入参。

  class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {

  LoadBalancerClient loadBalancer;

  LoadBalancerRequestFactory requestFactory;

  public ClientHttpResponse intercept(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {

  // 此处调用request的getURI方法,

  final URI originalUri = request.getURI();

  String serviceName = originalUri.getHost();

  return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));

  }

  }

  LoadBalancerRequestFactory——创建LoadBalancerRequest的工厂

  public LoadBalancerRequest createRequest(HttpRequest request, byte[] body, ClientHttpRequestExecution execution) {

  // 返回LoadBalancerRequest的匿名内部类

  return instance -> {

  // 该LoadBalancerRequest的匿名内部类实现,先创建一个ServiceRequestWrapper的request,然后调用execution的execute方法执行请求

  HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);

  return execution.execute(serviceRequest, body);

  };

  }

  LoadBalancerRequest

  LoadBalancerRequest使用该接口的apply()的方法,为request添加处理动作

  interface LoadBalancerRequest {

  T apply(ServiceInstance instance);

  }

  ServiceRequestWrapper

  ServiceRequestWrapper继承自HttpRequestWrapper,HttpRequestWrapper对外提供了获取一个request的method、URI、headers、methodValue等信息的方法。

  ServiceRequestWrapper改写了默认的getURI()方法,使用客户端负载均衡器LoadBalancerClient的重构URI的方法,将入参request的URI进行重构,其具体实现在LoadBalancerClient的实现类RibbonLoadBalancerClient中。

  class ServiceRequestWrapper extends HttpRequestWrapper {

  private final ServiceInstance instance;

  private final LoadBalancerClient loadBalancer;

  public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance, LoadBalancerClient loadBalancer) {}

  @Override

  public URI getURI() {

  // 调用loadBalancer的reconstructURI方法,进行URI重构,改写成host:port的形式,具体实现在RibbonLoadBalancerClient中

  return this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());

  }

  }

  RibbonLoadBalancerClient

  RibbonLoadBalancerClient实现了LoadBalancerClient和ServiceInstanceChooser中的execute、reconstructURI、choose方法,完成了请求执行、URI重构和选择服务实例的任务,execute

  class RibbonLoadBalancerClient implements LoadBalancerClient {

  SpringClientFactory clientFactory;

  @Override

  public URI reconstructURI(ServiceInstance instance, URI original) {

  String serviceId = instance.getServiceId();

  RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);

  URI uri; Server server;

  if (instance instanceof RibbonServer) {

  server = ((RibbonServer) instance).getServer();

  uri = updateToSecureConnectionIfNeeded(original, ribbonServer);

  } else {

  server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());

  IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);

  ServerIntrospector serverIntrospector = serverIntrospector(serviceId);

  uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server);

  }

  // 用server中的host、port等替换原始uri

  return context.reconstructURIWithServer(server, uri);

  }

  public ServiceInstance choose(String serviceId, Object hint) {

  // 先调用getLoadBalancer方法,根据serviceId,获取一个ILoadBalancer

  // 然后调用getServer(ILoadBalancer loadBalancer, Object hint),使用loadBalancer选择一个Server,hint默认为"default"

  Server server = getServer(getLoadBalancer(serviceId), hint);

  if (server == null) { return null; }

  // 用入参serviceId、选择的Server,构造一个RibbonServer

  return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server));

  }

  public T execute(String serviceId, LoadBalancerRequest request, Object hint) {

  // 首先选择一个RibbonServer,该部分流程与choose相同

  Server server = getServer(getLoadBalancer(serviceId), hint);

  RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server));

  // 调用execute的重载实现,执行请求

  return execute(serviceId, ribbonServer, request);

  }

  public T execute(String serviceId, ServiceInstance serviceIntance, LoadBalancerRequest request) {

  Server server = null;

  if (serviceInstance instanceof RibbonServer) {

  server = ((RibbonServer) serviceInstance).getServer();

  }

  RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);

  RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);

  try { 无锡做人流手术多少钱 http://www.ytsg029.com/

  // 调用apply,向服务实例发起请求

  T returnVal = request.apply(serviceInstance);

  statsRecorder.recordStats(returnVal);

  return returnVal;

  } catch (IOException ex) {

  statsRecorder.recordStats(ex); throw ex;

  } catch (Exception ex) {

  statsRecorder.recordStats(ex); throw ex;

  }

  return null;

  }

  // Ribbon 实现了ServiceInstance接口,即服务实例接口

  public static class RibbonServer implements ServiceInstance {

  private final String serviceId;

  private final Server server;

  private final boolean secure;

  private Map metadata;

  // @Override方法

  }

  }

  ILoadBalancer

  在RibbonLoadBalancerClient的choose()和execute()方法中,都是通过调用ILoadBalancer的chooseServer()方法,来选择一个服务实例Server的,该ILoadBalancer接口是由Ribbon定义的。

  在ILoadBalancer接口中,定义了软件负载均衡器的操作:一个服务实例的集合、标记一个服务停止、选择服务

  package com.netflix.loadbalancer;

  interface ILoadBalancer {

  void addServers(List newServers); // 初始化、后续添加服务列表

  Server chooseServer(Object key); // 从负载均衡器选择一个服务实例

  void markServerDown(Server server); // 标记并通知某个服务实例已经停止

  List getReachableServers(); // up/reachable状态的服务实例,可以提供正常服务

  List getAllServers(); // 所有已知的服务实例,reachable/unreachable都包括

  }

  其中的Server,代表一个服务端节点,包含了一个服务的基本信息:host、port、scheme、id、zone、元数据等等。

  在RibbonLoadBalancerClient的choose()和execute()方法中,通过getLoadBalancer()方法,来根据serviceId获取ILoadBalancer的实例,然后将其包装成RibbonServer。

  配置类RibbonClientConfiguration创建ILoadBalancer时如果配置文件里有配置,则使用配置的实现,否则默认使用ZoneAwareLoadBalancer实现。

  ClientHttpRequestExecution

  RibbonLoadBalancerClient的execute()方法中,调用了入参LoadBalancerRequest的apply方法,execute()方法在LoadBlancerInterceptor的intercept方法中调用,并传入LoadBalancerRequestFactory.createRequest创建的LoadBalancerRequest实现,其实现中最终使用ClientHttpRequestExecution的execute方法执行请求。

  总结

  使用时,注册一个使用@LoadBalanced注解修饰的RestTemplate,在需要发起请求的地方调用RestTemplate的相应的请求方法,最终调用到其doExecute方法。

  @LoadBalanced注解关联了LoadBalancerClient。

  配置类LoadBalancerAutoConfiguration注册了如下Bean:

  SmartInitializingSingleton:遍历restTemplates、遍历customizers,并customizer.customize(restTemplate)

  LoadBalancerRequestFactory:使用LoadBalancerClient构造

  LoadBalancerInterceptor:使用LoadBalancerClient和LoadBalancerRequestFactory创建

  RestTemplateCustomizer:使用LoadBalancerInterceptor构造一个匿名类,将注册的LoadBalancerInterceptor添加进restTemplate的interceptors列表中

  拦截器LoadBalancerInterceptor的intercept方法从原始请求中获取URI,然后使用LoadBalancerClient的execute方法执行请求,接收两个参数:serviceName即host和请求工厂创建的request

  请求工厂LoadBalancerRequestFactory的createRequest方法,由原始请求创建一个LoadBalancerRequest的匿名实现

  负载均衡请求LoadBalancerRequest接口只有apply方法,其匿名实现创建HttpRequest的实现类ServcieRequestWrapper的实例,然后由ClientHttpRequestExecution的execute方法执行请求,返回响应ClientHttpResponse

  ServiceRequestWrapper重写了父类HttpRequestWrapper的getURI方法,返回LoadBalancerClient的reconstructURI方法重构的URI

  ClientHttpRequestExecution的实现类是InterceptingClientHttpRequest的内部类IntercpetingRequestExecution,其execute方法遍历interceptors,如果有拦截器,就执行拦截方法,如果没有了,就执行请求。

  在4中,LoadBalancerClient的execute方法执行请求,其实现类是RibbonLoadBalancerClient。execute 先通过serviceId获取ILoadBalancer,然后调用ILoadBalancer的chooseServer方法,选择一个Server,并将之转换成RibbonServer,RibbonServer是ServiceInstance的子类,最后调用LoadBalancerRequest的apply方法,执行请求,返回响应。


分享名称:SpringCloud客户端负载均衡——Ribbon-创新互联
分享URL:http://myzitong.com/article/cosico.html