Thrift: a service-oriented refactoring based on zookeeper
Published: 2019-06-28


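Turning Thrift into a managed service mainly means reworking the client side. The work breaks down as follows:

1. Service registration on the server and automatic discovery on the client, so that no configuration has to be edited by hand. We use zookeeper for this. Because zookeeper's native client is fairly cumbersome to use, registration and discovery are handled through the curator-recipes library.

2. The client manages its service calls through a connection pool to improve performance. We use commons-pool from the Apache Commons project, which cuts down the amount of code considerably.

3. Failover/LoadBalance. With zookeeper watchers, the client is notified promptly when a server becomes unavailable and removes that service node. There are many LoadBalance algorithms; we use weighted random selection, which is a common choice. For the other algorithms, see: Basic load-balancing algorithms.

4. Make registration and discovery of thrift services configurable through spring, which is a real convenience.

5. Other changes, such as:

1) Use a dynamic proxy to hide the details of the client/server interaction, so that users only call the interface published by the service provider.

2) Thrift offers two ways to call a service, Client and Iface:

// *) Client API call
(EchoService.Client) client.echo("hello lilei");   ---(1)
// *) Service interface call
(EchoService.Iface) service.echo("hello lilei");   ---(2)

The Client API style is not recommended; we recommend the Service interface style (service-oriented).

Let's implement these one by one: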

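I. Add the dependency jars to pom.xml

<dependency>
    <groupId>org.apache.thrift</groupId>
    <artifactId>libthrift</artifactId>
    <version>0.9.2</version>
</dependency>
<dependency>
    <groupId>commons-pool</groupId>
    <artifactId>commons-pool</artifactId>
    <version>1.6</version>
</dependency>
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.0.9.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.7.1</version>
</dependency>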

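II. Managing service node configuration with zookeeper

As RPC services evolve toward a platform, more and more service details (the cluster's IP addresses, scaling out and migrating the cluster) are hidden and only the service interface is exposed. This evolution fully decouples the server side from the client side; the two interact through a ConfigServer (MetaServer) acting as the intermediary.

Note: the figure that accompanied this passage comes from the dubbo official site.

Here Zookeeper plays that role: the server acts as the publisher and the client as the subscriber.

Zookeeper is a coordination service for distributed applications. It reaches consensus through ZAB, a Paxos-like atomic broadcast protocol, and plays an important part in naming, configuration push, data synchronization and master/slave failover. Its data is organized like the directory tree of a file system.

Each node is called a znode. Depending on its characteristics, a znode is one of the following types:
1). PERSISTENT: persistent node
2). EPHEMERAL: ephemeral node, which disappears together with its session (client disconnect)
3). PERSISTENT_SEQUENTIAL: persistent node whose name carries a monotonically increasing sequence number
4). EPHEMERAL_SEQUENTIAL: ephemeral node whose name carries a monotonically increasing sequence number
Note: an ephemeral node cannot be a parent node.

Watcher (observer) mode: a client can register event callbacks for changes to a node's state or content. Two attributes of the Event deserve attention:
1). KeeperState: Disconnected, SyncConnected, Expired
2). EventType: None, NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged

RPC server:
As the publisher of a concrete business service, an RPC server describes its service with the following elements.
1). namespace: separates different applications
2). service: the service interface, identified by the publisher's fully qualified class name
3). version: the version number
This borrows from Maven's GAV coordinates; a three-part coordinate fits the platform setting better.

*) Data model design
A concrete RPC service is registered under /rpc/{namespace}/{service}/{version}; every node on this path is persistent.
Each server in the RPC service cluster is registered under /rpc/{namespace}/{service}/{version}/{ip:port:weight}; the last node is ephemeral.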
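To make the data model concrete, here is a minimal sketch of how a registration path could be assembled and taken apart again. The class `RpcPath`, its method names and the sample service name `com.example.EchoService` are illustrative assumptions and do not appear in the project; only the path layout is taken from the description above.

```java
/**
 * Hypothetical helper illustrating the layout
 * /rpc/{namespace}/{service}/{version}/{ip:port:weight}.
 */
final class RpcPath {
    static final String ROOT = "/rpc";

    /** Path of the persistent node that identifies one service version. */
    static String servicePath(String namespace, String service, String version) {
        return ROOT + "/" + namespace + "/" + service + "/" + version;
    }

    /** Path of the ephemeral node that represents one running server. */
    static String instancePath(String namespace, String service, String version,
                               String ip, int port, int weight) {
        return servicePath(namespace, service, version) + "/" + ip + ":" + port + ":" + weight;
    }

    /** Returns the last path segment, i.e. the "ip:port:weight" part. */
    static String instanceSegment(String instancePath) {
        return instancePath.substring(instancePath.lastIndexOf('/') + 1);
    }

    /** Reads the weight from "ip:port[:weight]"; a missing weight counts as 1. */
    static int weightOf(String segment) {
        String[] parts = segment.split(":");
        return parts.length == 3 ? Integer.parseInt(parts[2]) : 1;
    }

    public static void main(String[] args) {
        String path = instancePath("demo", "com.example.EchoService", "1.0.0",
                "192.168.1.10", 9090, 2);
        System.out.println(path);
        System.out.println(instanceSegment(path) + " weight=" + weightOf(instanceSegment(path)));
    }
}
```

Keeping the address and weight in the node name means a client only has to list the children of the version node to learn the whole cluster, without reading any node data.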

1. Managing the Zookeeper client

ZookeeperFactory.java

package cn.slimsmart.thrift.rpc.zookeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.util.StringUtils;

/**
 * Provides the zookeeper client connection.
 */
public class ZookeeperFactory implements FactoryBean<CuratorFramework> {

    private String zkHosts;
    // session timeout
    private int sessionTimeout = 30000;
    private int connectionTimeout = 30000;

    // share a single zk connection
    private boolean singleton = true;

    // global path prefix, commonly used to separate applications
    private String namespace;

    private final static String ROOT = "rpc";

    private CuratorFramework zkClient;

    public void setZkHosts(String zkHosts) {
        this.zkHosts = zkHosts;
    }

    public void setSessionTimeout(int sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    public void setConnectionTimeout(int connectionTimeout) {
        this.connectionTimeout = connectionTimeout;
    }

    public void setSingleton(boolean singleton) {
        this.singleton = singleton;
    }

    public void setNamespace(String namespace) {
        this.namespace = namespace;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public CuratorFramework getObject() throws Exception {
        if (singleton) {
            if (zkClient == null) {
                zkClient = create();
                zkClient.start();
            }
            return zkClient;
        }
        return create();
    }

    @Override
    public Class<?> getObjectType() {
        return CuratorFramework.class;
    }

    @Override
    public boolean isSingleton() {
        return singleton;
    }

    public CuratorFramework create() throws Exception {
        // Build the full namespace in a local variable so that repeated calls
        // do not keep prepending ROOT to the configured value.
        String fullNamespace = StringUtils.isEmpty(namespace) ? ROOT : ROOT + "/" + namespace;
        return create(zkHosts, sessionTimeout, connectionTimeout, fullNamespace);
    }

    public static CuratorFramework create(String connectString, int sessionTimeout,
            int connectionTimeout, String namespace) {
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
        return builder.connectString(connectString)
                .sessionTimeoutMs(sessionTimeout)
                .connectionTimeoutMs(connectionTimeout) // honor the configured connection timeout
                .canBeReadOnly(true)
                .namespace(namespace)
                .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                .defaultData(null)
                .build();
    }

    public void close() {
        if (zkClient != null) {
            zkClient.close();
        }
    }
}

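2. Registering services on the server

The server configuration needs the IP address of the local machine, so we define an interface for obtaining it.

ThriftServerIpResolve.java

package cn.slimsmart.thrift.rpc.zookeeper;

/**
 * Resolves the thrift-server IP address used for service registration.
 * 1) It can be parsed from a special file on a physical or virtual machine
 * 2) It can be taken from the network interface with a given index
 * 3) Other strategies
 */
public interface ThriftServerIpResolve {

    String getServerIp() throws Exception;

    void reset();

    // Callback invoked with the new IP when the IP changes
    static interface IpRestCalllBack {
        public void rest(String newIp);
    }
}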

This interface can have different implementations. Below we obtain the IP address from the network interfaces; it can also be set explicitly through serverIp.

ThriftServerIpLocalNetworkResolve.java

package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Resolves the IP from the local network interfaces.
 */
public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {

    private Logger logger = LoggerFactory.getLogger(getClass());

    // cached value
    private String serverIp;

    public void setServerIp(String serverIp) {
        this.serverIp = serverIp;
    }

    @Override
    public String getServerIp() {
        if (serverIp != null) {
            return serverIp;
        }
        // a host can have several network interfaces
        try {
            Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
            while (netInterfaces.hasMoreElements()) {
                NetworkInterface netInterface = netInterfaces.nextElement();
                // Each interface has several addresses: there is always a loopback
                // address, there may be site-local addresses, and both IPv4 and IPv6.
                Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                while (addresses.hasMoreElements()) {
                    InetAddress address = addresses.nextElement();
                    if (address instanceof Inet6Address) {
                        continue;
                    }
                    if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
                        // scanning continues, so the last matching address wins
                        serverIp = address.getHostAddress();
                        logger.info("resolve server ip :" + serverIp);
                        continue;
                    }
                }
            }
        } catch (SocketException e) {
            logger.error("failed to enumerate network interfaces", e);
        }
        return serverIp;
    }

    @Override
    public void reset() {
        serverIp = null;
    }
}
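The address filter is easier to reason about when it is separated from the interface enumeration. The sketch below is an illustration under our own assumptions, not code from the project: `SiteLocalPicker` and its methods are hypothetical names, and unlike the listing above it returns the first matching address instead of the last, which makes the result independent of how many interfaces follow.

```java
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: the IP selection rule as a pure function over candidate addresses. */
final class SiteLocalPicker {

    /** Returns the first IPv4, site-local, non-loopback address, or null if none qualifies. */
    static String pick(List<InetAddress> candidates) {
        for (InetAddress address : candidates) {
            if (address instanceof Inet6Address) {
                continue; // skip IPv6, as the resolver does
            }
            if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
                return address.getHostAddress();
            }
        }
        return null;
    }

    /** Builds an IPv4 address from its four octets without any DNS lookup. */
    static InetAddress ipv4(int a, int b, int c, int d) {
        try {
            return InetAddress.getByAddress(new byte[] {(byte) a, (byte) b, (byte) c, (byte) d});
        } catch (UnknownHostException e) {
            // cannot happen for a 4-byte array, but the exception is checked
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<InetAddress> candidates = Arrays.asList(
                ipv4(127, 0, 0, 1),     // loopback
                ipv4(8, 8, 8, 8),       // public address
                ipv4(192, 168, 1, 10),  // site-local
                ipv4(10, 0, 0, 5));     // site-local
        System.out.println(pick(candidates));
    }
}
```

On hosts with several private networks (for example a container bridge next to the real LAN), neither "first wins" nor "last wins" is reliable, which is why the interface also allows the IP to be configured explicitly.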

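Next we define the interface for publishing a service, and then an implementation that publishes the service information (service interface, version, IP, port, weight) to zookeeper.

ThriftServerAddressRegister.java

package cn.slimsmart.thrift.rpc.zookeeper;

/**
 * Publishes the service address and port to the service registry,
 * which here is the zookeeper server.
 */
public interface ThriftServerAddressRegister {
    /**
     * Publishes a service.
     * @param service name of the service interface; must be unique within a product
     * @param version version of the service interface, 1.0.0 by default
     * @param address address and port the service is published on
     */
    void register(String service, String version, String address);
}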

Implementation: ThriftServerAddressRegisterZookeeper.java

package cn.slimsmart.thrift.rpc.zookeeper;

import java.io.UnsupportedEncodingException;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

import cn.slimsmart.thrift.rpc.ThriftException;

/**
 * Registers the service list in Zookeeper.
 */
public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister {

    private Logger logger = LoggerFactory.getLogger(getClass());

    private CuratorFramework zkClient;

    public ThriftServerAddressRegisterZookeeper() {}

    public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public void register(String service, String version, String address) {
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        if (StringUtils.isEmpty(version)) {
            version = "1.0.0";
        }
        // The address node is ephemeral: it disappears when this server's session ends.
        // Parent nodes are created as persistent nodes when they are missing.
        try {
            zkClient.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/" + service + "/" + version + "/" + address);
        } catch (UnsupportedEncodingException e) {
            logger.error("register service address to zookeeper exception", e);
            throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
        } catch (Exception e) {
            logger.error("register service address to zookeeper exception", e);
            throw new ThriftException("register service address to zookeeper exception", e);
        }
    }

    public void close() {
        zkClient.close();
    }
}

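3. Discovering services on the client

Define the interface for obtaining service addresses.

ThriftServerAddressProvider.java

package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * Provides thrift server-service addresses so that the client connection pool can be built.
 */
public interface ThriftServerAddressProvider {

    // returns the service name
    String getService();

    /**
     * Returns all server addresses.
     * @return
     */
    List<InetSocketAddress> findServerAddressList();

    /**
     * Selects a suitable address, for example at random.
     * Any suitable algorithm can be used internally.
     * @return
     */
    InetSocketAddress selector();

    void close();
}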

Automatic address discovery based on zookeeper: ThriftServerAddressProviderZookeeper.java

package cn.slimsmart.thrift.rpc.zookeeper;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Set;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

/**
 * Uses zookeeper as the "config" center, with the apache-curator library
 * to simplify zookeeper development.
 */
public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {

    private Logger logger = LoggerFactory.getLogger(getClass());

    // service to look up
    private String service;
    // service version
    private String version = "1.0.0";

    private PathChildrenCache cachedPath;

    private CuratorFramework zkClient;

    // Records every address this provider has seen.
    // If the zookeeper cluster fails, the addresses in trace serve as a "backup".
    private Set<String> trace = new HashSet<String>();

    private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();

    private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();

    private Object lock = new Object();

    // default weight
    private static final Integer DEFAULT_WEIGHT = 1;

    public void setService(String service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public ThriftServerAddressProviderZookeeper() {
    }

    public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    public void setZkClient(CuratorFramework zkClient) {
        this.zkClient = zkClient;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        // start zk if it has not been started yet
        if (zkClient.getState() == CuratorFrameworkState.LATENT) {
            zkClient.start();
        }
        buildPathChildrenCache(zkClient, getServicePath(), true);
        cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
    }

    private String getServicePath() {
        return "/" + service + "/" + version;
    }

    private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
        cachedPath = new PathChildrenCache(client, path, cacheData);
        cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                PathChildrenCacheEvent.Type eventType = event.getType();
                switch (eventType) {
                case CONNECTION_RECONNECTED:
                    logger.info("Connection is reconnected.");
                    break;
                case CONNECTION_SUSPENDED:
                    logger.info("Connection is suspended.");
                    break;
                case CONNECTION_LOST:
                    logger.warn("Connection error,waiting...");
                    return;
                default:
                    //
                }
                // Any change to a child node triggers a full rebuild; this is the "simple" approach.
                cachedPath.rebuild();
                rebuild();
            }

            protected void rebuild() throws Exception {
                List<ChildData> children = cachedPath.getCurrentData();
                if (children == null || children.isEmpty()) {
                    // All thrift servers may have lost their connection to zookeeper
                    // while the network between thrift client and thrift server is still fine,
                    // so whether to clear the container here needs careful consideration.
                    container.clear();
                    logger.error("thrift server-cluster error....");
                    return;
                }
                List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
                String path = null;
                for (ChildData data : children) {
                    path = data.getPath();
                    logger.debug("get path:" + path);
                    path = path.substring(getServicePath().length() + 1);
                    logger.debug("get serviceAddress:" + path);
                    String address = new String(path.getBytes(), "utf-8");
                    current.addAll(transfer(address));
                    trace.add(address);
                }
                Collections.shuffle(current);
                synchronized (lock) {
                    container.clear();
                    container.addAll(current);
                    inner.clear();
                    inner.addAll(current);
                }
            }
        });
    }

    private List<InetSocketAddress> transfer(String address) {
        String[] hostname = address.split(":");
        Integer weight = DEFAULT_WEIGHT;
        if (hostname.length == 3) {
            weight = Integer.valueOf(hostname[2]);
        }
        String ip = hostname[0];
        Integer port = Integer.valueOf(hostname[1]);
        List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
        // Add ip:port to the address list once per unit of weight,
        // so that picking from the shuffled list balances the load by weight.
        for (int i = 0; i < weight; i++) {
            result.add(new InetSocketAddress(ip, port));
        }
        return result;
    }

    @Override
    public List<InetSocketAddress> findServerAddressList() {
        return Collections.unmodifiableList(container);
    }

    @Override
    public synchronized InetSocketAddress selector() {
        if (inner.isEmpty()) {
            if (!container.isEmpty()) {
                inner.addAll(container);
            } else if (!trace.isEmpty()) {
                // fall back to the addresses seen earlier
                synchronized (lock) {
                    for (String hostname : trace) {
                        container.addAll(transfer(hostname));
                    }
                    Collections.shuffle(container);
                    inner.addAll(container);
                }
            }
        }
        return inner.poll();
    }

    @Override
    public void close() {
        try {
            cachedPath.close();
            zkClient.close();
        } catch (Exception e) {
        }
    }

    @Override
    public String getService() {
        return service;
    }
}
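The provider above implements weighting by repeating each address `weight` times in a shuffled list and draining that list in order. For comparison, here is a minimal sketch of weighted random selection over cumulative weights, which keeps one entry per server however large the weights are. This is an alternative technique shown for illustration, not what the project does; `WeightedRandomSelector` and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch: weighted random selection using cumulative weights. */
final class WeightedRandomSelector {
    private final List<String> addresses = new ArrayList<>();
    private final List<Integer> cumulative = new ArrayList<>();
    private int total = 0;

    /** Adds an entry of the form "ip:port[:weight]"; a missing weight counts as 1. */
    void add(String entry) {
        String[] parts = entry.split(":");
        int weight = parts.length == 3 ? Integer.parseInt(parts[2]) : 1;
        if (weight <= 0) {
            throw new IllegalArgumentException("weight must be positive: " + entry);
        }
        total += weight;
        addresses.add(parts[0] + ":" + parts[1]);
        cumulative.add(total); // upper bound (exclusive) of this server's ticket range
    }

    int totalWeight() {
        return total;
    }

    /** Maps a ticket in [0, totalWeight) to the server that owns that ticket. */
    String pickByTicket(int ticket) {
        if (ticket < 0 || ticket >= total) {
            throw new IllegalArgumentException("ticket out of range: " + ticket);
        }
        for (int i = 0; i < cumulative.size(); i++) {
            if (ticket < cumulative.get(i)) {
                return addresses.get(i);
            }
        }
        throw new IllegalStateException("cumulative weights are inconsistent");
    }

    /** Draws a random ticket; returns null when no server is registered. */
    String pick(Random random) {
        return total == 0 ? null : pickByTicket(random.nextInt(total));
    }

    public static void main(String[] args) {
        WeightedRandomSelector selector = new WeightedRandomSelector();
        selector.add("10.0.0.1:9090:3");
        selector.add("10.0.0.2:9090");
        Random random = new Random();
        for (int i = 0; i < 5; i++) {
            System.out.println(selector.pick(random));
        }
    }
}
```

With weights 3 and 1, tickets 0 to 2 belong to the first server and ticket 3 to the second, so the first server receives about three quarters of the calls over time. The shuffled-queue approach in the listing gives the same proportions exactly within each pass over the list, at the cost of memory proportional to the sum of the weights.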

There is one more implementation of this interface, which takes the service addresses from configuration; see the attachment: FixedAddressProvider.java

III. Server-side service registration

ThriftServiceServerFactory.java

package cn.slimsmart.thrift.rpc;

import java.lang.instrument.IllegalClassFormatException;
import java.lang.reflect.Constructor;

import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.StringUtils;

import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;

/**
 * Server-side factory that starts the service and registers it.
 */
public class ThriftServiceServerFactory implements InitializingBean {
    // local port the service listens on
    private Integer port = 8299;
    // weight (priority), 1 by default
    private Integer weight = 1;
    // service implementation
    private Object service;
    // service version
    private String version;
    // resolves the local IP
    private ThriftServerIpResolve thriftServerIpResolve;
    // service registration
    private ThriftServerAddressRegister thriftServerAddressRegister;

    private ServerThread serverThread;

    public void setPort(Integer port) {
        this.port = port;
    }

    public void setWeight(Integer weight) {
        this.weight = weight;
    }

    public void setService(Object service) {
        this.service = service;
    }

    public void setVersion(String version) {
        this.version = version;
    }

    public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
        this.thriftServerIpResolve = thriftServerIpResolve;
    }

    public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
        this.thriftServerAddressRegister = thriftServerAddressRegister;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        if (thriftServerIpResolve == null) {
            thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
        }
        String serverIP = thriftServerIpResolve.getServerIp();
        if (StringUtils.isEmpty(serverIP)) {
            throw new ThriftException("cant find server ip...");
        }

        String hostname = serverIP + ":" + port + ":" + weight;
        Class<?> serviceClass = service.getClass();
        // interfaces implemented by the service class
        Class<?>[] interfaces = serviceClass.getInterfaces();
        if (interfaces.length == 0) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        // reflect, load "Processor"
        TProcessor processor = null;
        String serviceName = null;
        for (Class<?> clazz : interfaces) {
            String cname = clazz.getSimpleName();
            if (!cname.equals("Iface")) {
                continue;
            }
            // the generated Iface is nested in the service class, next to Processor
            serviceName = clazz.getEnclosingClass().getName();
            String pname = serviceName + "$Processor";
            try {
                ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                Class<?> pclass = classLoader.loadClass(pname);
                if (!TProcessor.class.isAssignableFrom(pclass)) {
                    continue;
                }
                Constructor<?> constructor = pclass.getConstructor(clazz);
                processor = (TProcessor) constructor.newInstance(service);
                break;
            } catch (Exception e) {
                //
            }
        }
        if (processor == null) {
            throw new IllegalClassFormatException("service-class should implements Iface");
        }
        // a separate thread is needed because serve() blocks
        serverThread = new ServerThread(processor, port);
        serverThread.start();
        // register the service
        if (thriftServerAddressRegister != null) {
            thriftServerAddressRegister.register(serviceName, version, hostname);
        }
    }

    class ServerThread extends Thread {
        private TServer server;

        ServerThread(TProcessor processor, int port) throws Exception {
            TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
            TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
            TProcessorFactory processorFactory = new TProcessorFactory(processor);
            tArgs.processorFactory(processorFactory);
            tArgs.transportFactory(new TFramedTransport.Factory());
            tArgs.protocolFactory(new TBinaryProtocol.Factory(true, true));
            server = new TThreadedSelectorServer(tArgs);
        }

        @Override
        public void run() {
            try {
                // start the server
                server.serve();
            } catch (Exception e) {
                //
            }
        }

        public void stopServer() {
            server.stop();
        }
    }

    public void close() {
        serverThread.stopServer();
    }
}
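The reflective lookup in afterPropertiesSet relies on the naming convention of generated code: Iface and Processor are both nested in the service class. The sketch below reproduces that lookup with plain JDK classes so it can be run without Thrift. `EchoService`, `EchoServiceImpl` and `ProcessorLocator` are hand-written stand-ins that we assume for illustration; they are not generated code and not part of the project.

```java
import java.lang.reflect.Constructor;

/** Hypothetical sketch of locating the sibling Processor class of a nested Iface. */
final class ProcessorLocator {

    /** Finds the service's Iface, derives "Outer$Processor" and instantiates it. */
    static Object newProcessor(Object service) {
        for (Class<?> iface : service.getClass().getInterfaces()) {
            if (!iface.getSimpleName().equals("Iface") || iface.getEnclosingClass() == null) {
                continue;
            }
            // binary name of a nested class is Outer$Nested
            String processorName = iface.getEnclosingClass().getName() + "$Processor";
            try {
                Class<?> processorClass = Class.forName(processorName, true, iface.getClassLoader());
                Constructor<?> constructor = processorClass.getConstructor(iface);
                return constructor.newInstance(service);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot create " + processorName, e);
            }
        }
        throw new IllegalArgumentException("service class should implement a nested Iface");
    }

    public static void main(String[] args) {
        Object processor = newProcessor(new EchoServiceImpl());
        System.out.println(processor.getClass().getName());
    }
}

/** Stand-in for a generated service class: Iface and Processor are nested siblings. */
final class EchoService {
    public interface Iface {
        String echo(String msg);
    }

    public static final class Processor {
        final Iface handler;

        public Processor(Iface handler) {
            this.handler = handler;
        }
    }
}

/** Stand-in for the business implementation that would be configured as a bean. */
final class EchoServiceImpl implements EchoService.Iface {
    @Override
    public String echo(String msg) {
        return msg;
    }
}
```

Unlike the listing, this sketch surfaces reflection failures instead of swallowing them, which tends to make misconfigured beans easier to diagnose.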

IV. Client-side service proxy and connection pool

Client connection pool: ThriftClientPoolFactory.java

package cn.slimsmart.thrift.rpc;

import java.net.InetSocketAddress;

import org.apache.commons.pool.BasePoolableObjectFactory;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;

/**
 * Connection pool factory, thrift-client for spring.
 */
public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {

    private final ThriftServerAddressProvider serverAddressProvider;
    private final TServiceClientFactory<TServiceClient> clientFactory;
    private PoolOperationCallBack callback;

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,
            TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
        this.serverAddressProvider = addressProvider;
        this.clientFactory = clientFactory;
    }

    protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,
            TServiceClientFactory<TServiceClient> clientFactory, PoolOperationCallBack callback) throws Exception {
        this.serverAddressProvider = addressProvider;
        this.clientFactory = clientFactory;
        this.callback = callback;
    }

    static interface PoolOperationCallBack {
        // called before a client is destroyed
        void destroy(TServiceClient client);

        // called after a client has been created successfully
        void make(TServiceClient client);
    }

    @Override
    public void destroyObject(TServiceClient client) throws Exception {
        if (callback != null) {
            try {
                callback.destroy(client);
            } catch (Exception e) {
                //
            }
        }
        TTransport pin = client.getInputProtocol().getTransport();
        pin.close();
    }

    @Override
    public boolean validateObject(TServiceClient client) {
        TTransport pin = client.getInputProtocol().getTransport();
        return pin.isOpen();
    }

    @Override
    public TServiceClient makeObject() throws Exception {
        InetSocketAddress address = serverAddressProvider.selector();
        if (address == null) {
            // selector() returns null when no server address is known
            throw new ThriftException("No provider available for remote service");
        }
        TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
        TTransport transport = new TFramedTransport(tsocket);
        TProtocol protocol = new TBinaryProtocol(transport);
        TServiceClient client = this.clientFactory.getClient(protocol);
        transport.open();
        if (callback != null) {
            try {
                callback.make(client);
            } catch (Exception e) {
                //
            }
        }
        return client;
    }
}
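The factory above only supplies the three lifecycle hooks (make, validate, destroy); when they run is decided by the pool. The following toy pool is a sketch of that division of labor under our own assumptions and is not how commons-pool is implemented: `TinyPool`, `FakeConn` and `FakeConnFactory` are hypothetical names, and the real GenericObjectPool adds limits, blocking and eviction on top. Note also that, as far as we know, commons-pool calls validateObject only when options such as testOnBorrow are enabled, whereas this sketch always validates.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical toy pool showing when the factory hooks are invoked. */
final class TinyPool<T> {
    interface Factory<T> {
        T make();
        boolean validate(T obj);
        void destroy(T obj);
    }

    private final Factory<T> factory;
    private final Deque<T> idle = new ArrayDeque<>();
    private final int maxIdle;

    TinyPool(Factory<T> factory, int maxIdle) {
        this.factory = factory;
        this.maxIdle = maxIdle;
    }

    /** Reuses a valid idle object if there is one, otherwise creates a new one. */
    synchronized T borrow() {
        while (!idle.isEmpty()) {
            T candidate = idle.pop();
            if (factory.validate(candidate)) {
                return candidate;
            }
            factory.destroy(candidate); // stale object, e.g. a closed connection
        }
        return factory.make();
    }

    /** Keeps the object for reuse if it is still valid and there is room. */
    synchronized void giveBack(T obj) {
        if (factory.validate(obj) && idle.size() < maxIdle) {
            idle.push(obj);
        } else {
            factory.destroy(obj);
        }
    }

    synchronized int idleCount() {
        return idle.size();
    }

    public static void main(String[] args) {
        FakeConnFactory factory = new FakeConnFactory();
        TinyPool<FakeConn> pool = new TinyPool<>(factory, 2);
        FakeConn conn = pool.borrow();
        pool.giveBack(conn);
        System.out.println("reused: " + (pool.borrow() == conn));
    }
}

/** Stand-in for a client whose transport can be open or closed. */
final class FakeConn {
    boolean open = true;
}

/** Counts created and destroyed connections so the lifecycle is observable. */
final class FakeConnFactory implements TinyPool.Factory<FakeConn> {
    int made = 0;
    int destroyed = 0;

    @Override
    public FakeConn make() {
        made++;
        return new FakeConn();
    }

    @Override
    public boolean validate(FakeConn conn) {
        return conn.open;
    }

    @Override
    public void destroy(FakeConn conn) {
        destroyed++;
        conn.open = false;
    }
}
```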
 

Client service proxy factory: ThriftServiceClientProxyFactory.java

package cn.slimsmart.thrift.rpc;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.TServiceClientFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.InitializingBean;

import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;

/**
 * Client proxy.
 */
@SuppressWarnings({ "unchecked", "rawtypes" })
public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {

    // maximum number of active connections
    private Integer maxActive = 32;

    // connection idle time in ms, 3 min by default;
    // -1 disables idle detection
    private Integer idleTime = 180000;
    private ThriftServerAddressProvider serverAddressProvider;

    private Object proxyClient;
    private Class<?> objectClass;

    private GenericObjectPool<TServiceClient> pool;

    private PoolOperationCallBack callback = new PoolOperationCallBack() {
        @Override
        public void make(TServiceClient client) {
            System.out.println("create");
        }

        @Override
        public void destroy(TServiceClient client) {
            System.out.println("destroy");
        }
    };

    public void setMaxActive(Integer maxActive) {
        this.maxActive = maxActive;
    }

    public void setIdleTime(Integer idleTime) {
        this.idleTime = idleTime;
    }

    public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
        this.serverAddressProvider = serverAddressProvider;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        // load the Iface interface
        objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
        // load the Client.Factory class
        Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader
                .loadClass(serverAddressProvider.getService() + "$Client$Factory");
        TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
        ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
        GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
        poolConfig.maxActive = maxActive;
        poolConfig.minIdle = 0;
        poolConfig.minEvictableIdleTimeMillis = idleTime;
        poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
        pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
        proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // borrow a client for the duration of one call
                TServiceClient client = pool.borrowObject();
                try {
                    return method.invoke(client, args);
                } catch (InvocationTargetException e) {
                    // rethrow the exception raised by the client itself, so callers
                    // see the exception types declared on the Iface method
                    throw e.getCause();
                } finally {
                    pool.returnObject(client);
                }
            }
        });
    }

    @Override
    public Object getObject() throws Exception {
        return proxyClient;
    }

    @Override
    public Class<?> getObjectType() {
        return objectClass;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    public void close() {
        if (serverAddressProvider != null) {
            serverAddressProvider.close();
        }
    }
}
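The borrow, invoke, return cycle behind the proxy can be tried out with the JDK alone. The sketch below is an illustration under assumed names (`PooledProxyDemo`, `Greeter`, `CountingSource` and `SimpleGreeter` are all hypothetical) and uses a counter in place of a real pool. It also shows why the target's exception is taken out of InvocationTargetException: without that step the caller would receive a reflection wrapper instead of the exception the service actually threw.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: a dynamic proxy that borrows a client around every call. */
final class PooledProxyDemo {

    /** Stand-in for a generated Iface. */
    public interface Greeter {
        String greet(String name);
    }

    /** Plain implementation playing the role of the pooled client. */
    static final class SimpleGreeter implements Greeter {
        @Override
        public String greet(String name) {
            if (name.isEmpty()) {
                throw new IllegalArgumentException("name must not be empty");
            }
            return "hello " + name;
        }
    }

    /** Stand-in for the pool: hands out one client and counts borrows and returns. */
    static final class CountingSource {
        final AtomicInteger borrowed = new AtomicInteger();
        final AtomicInteger returned = new AtomicInteger();
        private final Greeter target;

        CountingSource(Greeter target) {
            this.target = target;
        }

        Greeter borrow() {
            borrowed.incrementAndGet();
            return target;
        }

        void giveBack(Greeter greeter) {
            returned.incrementAndGet();
        }
    }

    /** Wraps the source in a proxy so callers only ever see the Greeter interface. */
    static Greeter proxyFor(final CountingSource source) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Greeter client = source.borrow();
                try {
                    return method.invoke(client, args);
                } catch (InvocationTargetException e) {
                    throw e.getCause(); // surface the target's own exception
                } finally {
                    source.giveBack(client); // always hand the client back
                }
            }
        };
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] { Greeter.class }, handler);
    }

    public static void main(String[] args) {
        CountingSource source = new CountingSource(new SimpleGreeter());
        Greeter greeter = proxyFor(source);
        System.out.println(greeter.greet("lilei"));
        System.out.println("borrowed=" + source.borrowed.get() + " returned=" + source.returned.get());
    }
}
```

A production version would probably also distinguish transport failures from business exceptions and discard a broken client instead of returning it to the pool; that policy is left out here.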

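Now let's look at the server and client configuration.

Server: spring-context-thrift-server.xml

[XML listing not preserved in this copy]

Client: spring-context-thrift-client.xml

[XML listing not preserved in this copy]

After starting the server, we can see that several service addresses have been registered in zookeeper.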

 

Reposted from: http://zsqtl.baihongyu.com/
