diff --git a/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java b/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java index 1b9fb2e..bd74b55 100644 --- a/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java +++ b/src/main/java/com/shinemo/mpush/alloc/AllocHandler.java @@ -50,13 +50,19 @@ private ScheduledExecutorService scheduledExecutor; private List serverNodes = Collections.emptyList(); private final ServiceDiscovery discovery = ServiceDiscoveryFactory.create(); - - public void start() { + + private String server;//需要监控的server引用ServiceNames.CONN_SERVER、ServiceNames.WS_SERVER + + public AllocHandler(String server) { + this.server = server; + } + + public void start() { CacheManagerFactory.create().init(); //启动缓冲服务 ServiceDiscovery discovery = ServiceDiscoveryFactory.create();// 启动发现服务 discovery.syncStart(); - discovery.subscribe(ServiceNames.CONN_SERVER, new ConnServerNodeListener()); + discovery.subscribe(server, new ConnServerNodeListener()); scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); @@ -96,7 +102,7 @@ public void handle(HttpExchange httpExchange) throws IOException { */ private void refresh() { //1.从缓存中拿取可用的长链接服务器节点 - List nodes = discovery.lookup(ServiceNames.CONN_SERVER); + List nodes = discovery.lookup(server); if (nodes.size() > 0) { //2.对serverNodes可以按某种规则排序,以便实现负载均衡,比如:随机,轮询,链接数量等 this.serverNodes = nodes diff --git a/src/main/java/com/shinemo/mpush/alloc/AllocServer.java b/src/main/java/com/shinemo/mpush/alloc/AllocServer.java index 4809db3..03e4f35 100644 --- a/src/main/java/com/shinemo/mpush/alloc/AllocServer.java +++ b/src/main/java/com/shinemo/mpush/alloc/AllocServer.java @@ -22,6 +22,7 @@ import com.mpush.api.service.BaseService; import com.mpush.api.service.Listener; import com.mpush.api.service.ServiceException; +import com.mpush.api.srd.ServiceNames; import com.mpush.tools.config.CC; import com.mpush.tools.log.Logs; import com.sun.net.httpserver.HttpServer; @@ -39,6 +40,7 @@ public final class AllocServer extends BaseService { private HttpServer httpServer; private AllocHandler allocHandler; + private AllocHandler wsAllocHandler;//ws负载均衡 private PushHandler pushHandler; @Override @@ -46,7 +48,8 @@ public void init() { try { int port = CC.mp.net.cfg.getInt("alloc-server-port"); this.httpServer = HttpServer.create(new InetSocketAddress(port), 0); - this.allocHandler = new AllocHandler(); + this.allocHandler = new AllocHandler(ServiceNames.CONN_SERVER); + this.wsAllocHandler = new AllocHandler(ServiceNames.WS_SERVER); this.pushHandler = new PushHandler(); } catch (IOException e) { throw new ServiceException(e); @@ -54,6 +57,7 @@ public void init() { httpServer.setExecutor(Executors.newCachedThreadPool());//设置线程池,由于是纯内存操作,不需要队列 httpServer.createContext("/", allocHandler);//查询mpush机器 + httpServer.createContext("/ws", wsAllocHandler);//查询mpush机器(ws) httpServer.createContext("/push", pushHandler);//模拟发送push httpServer.createContext("/index.html", new IndexPageHandler());//查询mpush机器 } @@ -62,6 +66,7 @@ public void init() { protected void doStart(Listener listener) throws Throwable { pushHandler.start(); allocHandler.start(); + wsAllocHandler.start(); httpServer.start(); Logs.Console.info("==================================================================="); Logs.Console.info("====================ALLOC SERVER START SUCCESS====================="); @@ -73,6 +78,7 @@ protected void doStop(Listener listener) throws Throwable { httpServer.stop(0);//1 min pushHandler.stop(); allocHandler.stop(); + wsAllocHandler.stop(); Logs.Console.info("==================================================================="); Logs.Console.info("====================ALLOC SERVER STOPPED SUCCESS====================="); Logs.Console.info("===================================================================");