From dca7c88c7b891fd668cb4ed8148bee51e00a2992 Mon Sep 17 00:00:00 2001 From: layou1989 <378394241@qq.com> Date: Fri, 26 May 2017 17:13:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86websocket=E7=AB=AF?= =?UTF-8?q?=E5=8F=A3=E8=B4=9F=E8=BD=BD=E5=9D=87=E8=A1=A1=E8=83=BD=E5=8A=9B?= =?UTF-8?q?,url=E4=B8=BAhttp://alloc:port/ws?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/shinemo/mpush/alloc/AllocHandler.java | 14 ++++++++++---- .../java/com/shinemo/mpush/alloc/AllocServer.java | 8 +++++++- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java b/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java index 1b9fb2e..bd74b55 100644 --- a/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java +++ b/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java @@ -50,13 +50,19 @@ private ScheduledExecutorService scheduledExecutor; private List serverNodes = Collections.emptyList(); private final ServiceDiscovery discovery = ServiceDiscoveryFactory.create(); - - public void start() { + + private String server;//需要监控的server引用ServiceNames.CONN_SERVER、ServiceNames.WS_SERVER + + public AllocHandler(String server) { + this.server = server; + } + + public void start() { CacheManagerFactory.create().init(); //启动缓冲服务 ServiceDiscovery discovery = ServiceDiscoveryFactory.create();// 启动发现服务 discovery.syncStart(); - discovery.subscribe(ServiceNames.CONN_SERVER, new ConnServerNodeListener()); + discovery.subscribe(server, new ConnServerNodeListener()); scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); @@ -96,7 +102,7 @@ public void handle(HttpExchange httpExchange) throws IOException { */ private void refresh() { //1.从缓存中拿取可用的长链接服务器节点 - List nodes = discovery.lookup(ServiceNames.CONN_SERVER); + List nodes = discovery.lookup(server); if (nodes.size() > 0) { //2.对serverNodes可以按某种规则排序,以便实现负载均衡,比如:随机,轮询,链接数量等 this.serverNodes = nodes diff --git a/src/main/java/com/shinemo/mpush/alloc/AllocServer.java b/src/main/java/com/shinemo/mpush/alloc/AllocServer.java index 4809db3..03e4f35 100644 --- a/src/main/java/com/shinemo/mpush/alloc/AllocServer.java +++ b/src/main/java/com/shinemo/mpush/alloc/AllocServer.java @@ -22,6 +22,7 @@ import com.mpush.api.service.BaseService; import com.mpush.api.service.Listener; import com.mpush.api.service.ServiceException; +import com.mpush.api.srd.ServiceNames; import com.mpush.tools.config.CC; import com.mpush.tools.log.Logs; import com.sun.net.httpserver.HttpServer; @@ -39,6 +40,7 @@ public final class AllocServer extends BaseService { private HttpServer httpServer; private AllocHandler allocHandler; + private AllocHandler wsAllocHandler;//ws负载均衡 private PushHandler pushHandler; @Override @@ -46,7 +48,8 @@ public void init() { try { int port = CC.mp.net.cfg.getInt("alloc-server-port"); this.httpServer = HttpServer.create(new InetSocketAddress(port), 0); - this.allocHandler = new AllocHandler(); + this.allocHandler = new AllocHandler(ServiceNames.CONN_SERVER); + this.wsAllocHandler = new AllocHandler(ServiceNames.WS_SERVER); this.pushHandler = new PushHandler(); } catch (IOException e) { throw new ServiceException(e); @@ -54,6 +57,7 @@ public void init() { httpServer.setExecutor(Executors.newCachedThreadPool());//设置线程池,由于是纯内存操作,不需要队列 httpServer.createContext("/", allocHandler);//查询mpush机器 + httpServer.createContext("/ws", wsAllocHandler);//查询mpush机器(ws) httpServer.createContext("/push", pushHandler);//模拟发送push httpServer.createContext("/index.html", new IndexPageHandler());//查询mpush机器 } @@ -62,6 +66,7 @@ public void init() { protected void doStart(Listener listener) throws Throwable { pushHandler.start(); allocHandler.start(); + wsAllocHandler.start(); httpServer.start(); Logs.Console.info("==================================================================="); Logs.Console.info("====================ALLOC SERVER START SUCCESS====================="); @@ -73,6 +78,7 @@ protected void doStop(Listener listener) throws Throwable { httpServer.stop(0);//1 min pushHandler.stop(); allocHandler.stop(); + wsAllocHandler.stop(); Logs.Console.info("==================================================================="); Logs.Console.info("====================ALLOC SERVER STOPPED SUCCESS====================="); Logs.Console.info("===================================================================");