From fbbf822b63dbd315703cef122afcf82ea792ba93 Mon Sep 17 00:00:00 2001 From: FengKang Date: Thu, 12 Oct 2017 18:52:41 +0800 Subject: [PATCH 1/3] [fengkang]support connecting with custom headers --- src/main/java/io/github/sac/Socket.java | 362 ++++++++++++------------ 1 file changed, 185 insertions(+), 177 deletions(-) diff --git a/src/main/java/io/github/sac/Socket.java b/src/main/java/io/github/sac/Socket.java index 3feb1c0..21cf865 100644 --- a/src/main/java/io/github/sac/Socket.java +++ b/src/main/java/io/github/sac/Socket.java @@ -26,21 +26,21 @@ public class Socket extends Emitter { private WebSocket ws; private BasicListener listener; private String AuthToken; - private HashMap acks; - private List channels; + private HashMap acks; + private List channels; private WebSocketAdapter adapter; public Socket(String URL) { this.URL = URL; - factory=new WebSocketFactory().setConnectionTimeout(5000); - counter=new AtomicInteger(1); - acks= new HashMap<>(); - channels= new ArrayList<>(); - adapter=getAdapter(); + factory = new WebSocketFactory().setConnectionTimeout(5000); + counter = new AtomicInteger(1); + acks = new HashMap<>(); + channels = new ArrayList<>(); + adapter = getAdapter(); } - public Channel createChannel(String name){ - Channel channel=new Channel(name); + public Channel createChannel(String name) { + Channel channel = new Channel(name); channels.add(channel); return channel; } @@ -49,41 +49,42 @@ public List getChannels() { return channels; } - public Channel getChannelByName(String name){ - for (Channel channel:channels){ + public Channel getChannelByName(String name) { + for (Channel channel : channels) { if (channel.getChannelName().equals(name)) return channel; } return null; } - public void seturl(String url){ - this.URL=url; + public void seturl(String url) { + this.URL = url; } public void setReconnection(ReconnectStrategy strategy) { this.strategy = strategy; } - public void setListener(BasicListener listener){ - this.listener=listener; + public void setListener(BasicListener listener) { + this.listener = listener; } /** * used to set up TLS/SSL connection to server for more details visit neovisionaries websocket client + * * @return */ - public WebSocketFactory getFactorySettings(){ + public WebSocketFactory getFactorySettings() { return factory; } - public void setAuthToken(String token){ - AuthToken=token; + public void setAuthToken(String token) { + AuthToken = token; } - public WebSocketAdapter getAdapter(){ - return new WebSocketAdapter(){ + public WebSocketAdapter getAdapter() { + return new WebSocketAdapter() { @Override public void onConnected(WebSocket websocket, Map> headers) throws Exception { @@ -93,30 +94,30 @@ public void onConnected(WebSocket websocket, Map> headers) */ counter.set(1); - if (strategy!=null) - strategy.setAttmptsMade(0); - - JSONObject handshakeObject=new JSONObject(); - handshakeObject.put("event","#handshake"); - JSONObject object=new JSONObject(); - object.put("authToken",AuthToken); - handshakeObject.put("data",object); - handshakeObject.put("cid",counter.getAndIncrement()); + if (strategy != null) + strategy.setAttmptsMade(0); + + JSONObject handshakeObject = new JSONObject(); + handshakeObject.put("event", "#handshake"); + JSONObject object = new JSONObject(); + object.put("authToken", AuthToken); + handshakeObject.put("data", object); + handshakeObject.put("cid", counter.getAndIncrement()); websocket.sendText(handshakeObject.toString()); // websocket.sendText("{\"event\": \"#handshake\",\"data\": {\"authToken\":\""+AuthToken+"\"},\"cid\": "+ cid++ +"}"); - listener.onConnected(Socket.this,headers); + listener.onConnected(Socket.this, headers); super.onConnected(websocket, headers); } @Override public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, WebSocketFrame clientCloseFrame, boolean closedByServer) throws Exception { - listener.onDisconnected(Socket.this,serverCloseFrame,clientCloseFrame,closedByServer); - if (strategy!=null) { + listener.onDisconnected(Socket.this, serverCloseFrame, clientCloseFrame, closedByServer); + if (strategy != null) { reconnect(); - }else{ + } else { // System.out.println("cant reconnect , reconnection is null"); LOGGER.info("cant reconnect , reconnection is null"); } @@ -125,10 +126,10 @@ public void onDisconnected(WebSocket websocket, WebSocketFrame serverCloseFrame, @Override public void onConnectError(WebSocket websocket, WebSocketException exception) throws Exception { - listener.onConnectError(Socket.this,exception); - if (strategy!=null) { + listener.onConnectError(Socket.this, exception); + if (strategy != null) { reconnect(); - }else{ + } else { // System.out.println("cant reconnect , reconnection is null"); LOGGER.info("cant reconnect , reconnection is null"); @@ -147,14 +148,14 @@ public void onFrame(WebSocket websocket, WebSocketFrame frame) throws Exception * PING-PONG logic goes here */ websocket.sendText("#2"); - }else { + } else { - JSONObject object= new JSONObject(frame.getPayloadText()); + JSONObject object = new JSONObject(frame.getPayloadText()); /** * Message retrieval mechanism goes here */ - LOGGER.info("Message :"+object.toString()); + LOGGER.info("Message :" + object.toString()); try { @@ -163,51 +164,51 @@ public void onFrame(WebSocket websocket, WebSocketFrame frame) throws Exception Integer cid = (Integer) object.opt("cid"); String event = (String) object.opt("event"); - switch (Parser.parse(dataobject,event)) { - - case ISAUTHENTICATED: - listener.onAuthentication(Socket.this, ((JSONObject)dataobject).getBoolean("isAuthenticated")); - subscribeChannels(); - break; - case PUBLISH: - Socket.this.handlePublish(((JSONObject)dataobject).getString("channel"), ((JSONObject)dataobject).opt("data")); - break; - case REMOVETOKEN: - setAuthToken(null); - break; - case SETTOKEN: - listener.onSetAuthToken(((JSONObject)dataobject).getString("token"),Socket.this); - break; - case EVENT: - if (hasEventAck(event)) { + switch (Parser.parse(dataobject, event)) { + + case ISAUTHENTICATED: + listener.onAuthentication(Socket.this, ((JSONObject) dataobject).getBoolean("isAuthenticated")); + subscribeChannels(); + break; + case PUBLISH: + Socket.this.handlePublish(((JSONObject) dataobject).getString("channel"), ((JSONObject) dataobject).opt("data")); + break; + case REMOVETOKEN: + setAuthToken(null); + break; + case SETTOKEN: + listener.onSetAuthToken(((JSONObject) dataobject).getString("token"), Socket.this); + break; + case EVENT: + if (hasEventAck(event)) { // System.out.println("This event has ack"); // LOGGER.info("This event have ack"); - handleEmitAck(event,dataobject,ack(Long.valueOf(cid))); - }else { - Socket.this.handleEmit(event, dataobject); + handleEmitAck(event, dataobject, ack(Long.valueOf(cid))); + } else { + Socket.this.handleEmit(event, dataobject); // System.out.println("This ack doesnt have ack"); // LOGGER.info("This event doesn't have ack"); - } - break; - case ACKRECEIVE: - if (acks.containsKey((long)rid)) { + } + break; + case ACKRECEIVE: + if (acks.containsKey((long) rid)) { // System.out.println("Contains ack with id "+rid); - Object[] objects = acks.remove((long)rid); - if (objects != null) { - Ack fn = (Ack) objects[1]; - if (fn != null) { + Object[] objects = acks.remove((long) rid); + if (objects != null) { + Ack fn = (Ack) objects[1]; + if (fn != null) { // System.out.println("calling fun with ack"+rid); - fn.call((String) objects[0],object.opt("error"), object.opt("data")); - } else { + fn.call((String) objects[0], object.opt("error"), object.opt("data")); + } else { // System.out.println("ack function is null with rid "+rid); - LOGGER.info("ack function is null with rid " + rid); + LOGGER.info("ack function is null with rid " + rid); + } } } - } - break; - } - }catch (Exception e){ + break; + } + } catch (Exception e) { e.printStackTrace(); } @@ -235,13 +236,14 @@ public void onSendError(WebSocket websocket, WebSocketException cause, WebSocket }; } - public Socket emit(final String event, final Object object){ + + public Socket emit(final String event, final Object object) { EventThread.exec(new Runnable() { public void run() { - JSONObject eventObject=new JSONObject(); + JSONObject eventObject = new JSONObject(); try { - eventObject.put("event",event); - eventObject.put("data",object); + eventObject.put("event", event); + eventObject.put("data", object); } catch (JSONException e) { e.printStackTrace(); } @@ -254,16 +256,16 @@ public void run() { } - public Socket emit(final String event, final Object object, final Ack ack){ + public Socket emit(final String event, final Object object, final Ack ack) { EventThread.exec(new Runnable() { public void run() { - JSONObject eventObject=new JSONObject(); - acks.put(counter.longValue(),getAckObject(event,ack)); + JSONObject eventObject = new JSONObject(); + acks.put(counter.longValue(), getAckObject(event, ack)); try { - eventObject.put("event",event); - eventObject.put("data",object); - eventObject.put("cid",counter.getAndIncrement()); + eventObject.put("event", event); + eventObject.put("data", object); + eventObject.put("cid", counter.getAndIncrement()); } catch (JSONException e) { e.printStackTrace(); } @@ -273,17 +275,17 @@ public void run() { return this; } - private Socket subscribe(final String channel){ + private Socket subscribe(final String channel) { EventThread.exec(new Runnable() { public void run() { - JSONObject subscribeObject=new JSONObject(); - try{ - subscribeObject.put("event","#subscribe"); - JSONObject object=new JSONObject(); - object.put("channel",channel); - subscribeObject.put("data",object); - - subscribeObject.put("cid",counter.getAndIncrement()); + JSONObject subscribeObject = new JSONObject(); + try { + subscribeObject.put("event", "#subscribe"); + JSONObject object = new JSONObject(); + object.put("channel", channel); + subscribeObject.put("data", object); + + subscribeObject.put("cid", counter.getAndIncrement()); } catch (JSONException e) { e.printStackTrace(); } @@ -294,15 +296,15 @@ public void run() { return this; } - private Object[] getAckObject(String event,Ack ack){ - Object object[]={event,ack}; + private Object[] getAckObject(String event, Ack ack) { + Object object[] = {event, ack}; return object; } - private Socket subscribe(final String channel, final Ack ack){ + private Socket subscribe(final String channel, final Ack ack) { EventThread.exec(new Runnable() { public void run() { - JSONObject subscribeObject=new JSONObject(); + JSONObject subscribeObject = new JSONObject(); try { subscribeObject.put("event", "#subscribe"); JSONObject object = new JSONObject(); @@ -320,14 +322,14 @@ public void run() { return this; } - private Socket unsubscribe(final String channel){ + private Socket unsubscribe(final String channel) { EventThread.exec(new Runnable() { public void run() { - JSONObject subscribeObject=new JSONObject(); - try{ - subscribeObject.put("event","#unsubscribe"); - subscribeObject.put("data",channel); - subscribeObject.put("cid",counter.getAndIncrement()); + JSONObject subscribeObject = new JSONObject(); + try { + subscribeObject.put("event", "#unsubscribe"); + subscribeObject.put("data", channel); + subscribeObject.put("cid", counter.getAndIncrement()); } catch (JSONException e) { e.printStackTrace(); } @@ -337,10 +339,10 @@ public void run() { return this; } - private Socket unsubscribe(final String channel,final Ack ack){ + private Socket unsubscribe(final String channel, final Ack ack) { EventThread.exec(new Runnable() { public void run() { - JSONObject subscribeObject=new JSONObject(); + JSONObject subscribeObject = new JSONObject(); try { subscribeObject.put("event", "#unsubscribe"); subscribeObject.put("data", channel); @@ -356,17 +358,17 @@ public void run() { return this; } - public Socket publish (final String channel, final Object data){ + public Socket publish(final String channel, final Object data) { EventThread.exec(new Runnable() { public void run() { - JSONObject publishObject=new JSONObject(); - try{ - publishObject.put("event","#publish"); - JSONObject object=new JSONObject(); - object.put("channel",channel); - object.put("data",data); - publishObject.put("data",object); - publishObject.put("cid",counter.getAndIncrement()); + JSONObject publishObject = new JSONObject(); + try { + publishObject.put("event", "#publish"); + JSONObject object = new JSONObject(); + object.put("channel", channel); + object.put("data", data); + publishObject.put("data", object); + publishObject.put("cid", counter.getAndIncrement()); } catch (JSONException e) { e.printStackTrace(); } @@ -378,10 +380,10 @@ public void run() { return this; } - public Socket publish (final String channel, final Object data,final Ack ack){ + public Socket publish(final String channel, final Object data, final Ack ack) { EventThread.exec(new Runnable() { public void run() { - JSONObject publishObject=new JSONObject(); + JSONObject publishObject = new JSONObject(); try { publishObject.put("event", "#publish"); JSONObject object = new JSONObject(); @@ -391,7 +393,7 @@ public void run() { publishObject.put("data", object); publishObject.put("cid", counter.getAndIncrement()); } catch (JSONException e) { - e.printStackTrace(); + e.printStackTrace(); } ws.sendText(publishObject.toString()); } @@ -401,18 +403,18 @@ public void run() { return this; } - private Ack ack(final Long cid){ + private Ack ack(final Long cid) { return new Ack() { - public void call(final String channel,final Object error, final Object data) { + public void call(final String channel, final Object error, final Object data) { EventThread.exec(new Runnable() { public void run() { - JSONObject object=new JSONObject(); + JSONObject object = new JSONObject(); try { object.put("error", error); object.put("data", data); object.put("rid", cid); } catch (JSONException e) { - e.printStackTrace(); + e.printStackTrace(); } ws.sendText(object.toString()); } @@ -422,73 +424,77 @@ public void run() { } - private void subscribeChannels(){ - for (Channel channel:channels){ + private void subscribeChannels() { + for (Channel channel : channels) { channel.subscribe(); } } public void connect() { + connect(null); + } + + public void connect(HashMap customHeaders) { try { ws = factory.createSocket(URL); - }catch (IOException e){ + } catch (IOException e) { e.printStackTrace(); } ws.addExtension("permessage-deflate; client_max_window_bits"); - ws.addHeader("Accept-Encoding","gzip, deflate, sdch"); - ws.addHeader("Accept-Language","en-US,en;q=0.8"); - ws.addHeader("Pragma","no-cache"); - ws.addHeader("User-Agent","Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36"); + ws.addHeader("Accept-Encoding", "gzip, deflate, sdch"); + ws.addHeader("Accept-Language", "en-US,en;q=0.8"); + ws.addHeader("Pragma", "no-cache"); + ws.addHeader("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36"); + if (customHeaders != null) { + Iterator> iterator = customHeaders.entrySet().iterator(); + while (iterator.hasNext()) { + ws.addHeader(iterator.next().getKey(), iterator.next().getValue()); + } + } ws.addListener(adapter); try { ws.connect(); - }catch (OpeningHandshakeException e) - { + } catch (OpeningHandshakeException e) { // A violation against the WebSocket protocol was detected // during the opening handshake. // Status line. StatusLine sl = e.getStatusLine(); LOGGER.info("=== Status Line ==="); - LOGGER.info("HTTP Version = \n"+sl.getHttpVersion()); - LOGGER.info("Status Code = \n"+ sl.getStatusCode()); - LOGGER.info("Reason Phrase = \n"+ sl.getReasonPhrase()); + LOGGER.info("HTTP Version = \n" + sl.getHttpVersion()); + LOGGER.info("Status Code = \n" + sl.getStatusCode()); + LOGGER.info("Reason Phrase = \n" + sl.getReasonPhrase()); // HTTP headers. Map> headers = e.getHeaders(); LOGGER.info("=== HTTP Headers ==="); - for (Map.Entry> entry : headers.entrySet()) - { + for (Map.Entry> entry : headers.entrySet()) { // Header name. String name = entry.getKey(); // Values of the header. List values = entry.getValue(); - if (values == null || values.size() == 0) - { + if (values == null || values.size() == 0) { // Print the name only. LOGGER.info(name); continue; } - for (String value : values) - { + for (String value : values) { // Print the name and the value. - LOGGER.info(name+value+"\n"); + LOGGER.info(name + value + "\n"); } } - } - catch (WebSocketException e) - { + } catch (WebSocketException e) { // Failed to establish a WebSocket connection. - listener.onConnectError(Socket.this,e); - if (strategy!=null) { + listener.onConnectError(Socket.this, e); + if (strategy != null) { reconnect(); - }else{ + } else { LOGGER.info("cant reconnect , reconnection is null"); } } @@ -496,34 +502,34 @@ public void connect() { } - private void reconnect(){ + private void reconnect() { if (!strategy.areAttemptsComplete()) { - final Timer timer=new Timer(); + final Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { - if(strategy != null) { - strategy.processValues(); - Socket.this.connect(); - timer.cancel(); - timer.purge(); - } else { - LOGGER.info("Strategy is null. Reconnection stopped"); - } + if (strategy != null) { + strategy.processValues(); + Socket.this.connect(); + timer.cancel(); + timer.purge(); + } else { + LOGGER.info("Strategy is null. Reconnection stopped"); + } } - },strategy.getReconnectInterval()); + }, strategy.getReconnectInterval()); - }else{ + } else { strategy.setAttmptsMade(0); } } - public void disconnect(){ + public void disconnect() { ws.disconnect(); - strategy=null; + strategy = null; } /** @@ -533,26 +539,28 @@ public void disconnect(){ * CONNECTING * CREATED * OPEN + * * @return */ - public WebSocketState getCurrentState(){ + public WebSocketState getCurrentState() { return ws.getState(); } - public Boolean isconnected(){ - return ws.getState()==WebSocketState.OPEN; + public Boolean isconnected() { + return ws.getState() == WebSocketState.OPEN; } - public void disableLogging(){ + public void disableLogging() { LogManager.getLogManager().reset(); } + /** * Channels need to be subscribed everytime whenever client is reconnected to server (handled inside) * Add only one listener to one channel for whole lifetime of process */ - public class Channel{ + public class Channel { String channelName; @@ -564,33 +572,33 @@ public Channel(String channelName) { this.channelName = channelName; } - public void subscribe(){ + public void subscribe() { Socket.this.subscribe(channelName); } - public void subscribe(Ack ack){ - Socket.this.subscribe(channelName,ack); + public void subscribe(Ack ack) { + Socket.this.subscribe(channelName, ack); } - public void onMessage(Listener listener){ - Socket.this.onSubscribe(channelName,listener); + public void onMessage(Listener listener) { + Socket.this.onSubscribe(channelName, listener); } - public void publish(Object data){ - Socket.this.publish(channelName,data); + public void publish(Object data) { + Socket.this.publish(channelName, data); } - public void publish(Object data,Ack ack){ - Socket.this.publish(channelName,data,ack); + public void publish(Object data, Ack ack) { + Socket.this.publish(channelName, data, ack); } - public void unsubscribe(){ + public void unsubscribe() { Socket.this.unsubscribe(channelName); channels.remove(this); } - public void unsubscribe(Ack ack){ - Socket.this.unsubscribe(channelName,ack); + public void unsubscribe(Ack ack) { + Socket.this.unsubscribe(channelName, ack); channels.remove(this); } } From f77f956502943c5490bee4da095882f754e9e1e7 Mon Sep 17 00:00:00 2001 From: FengKang Date: Fri, 13 Oct 2017 12:07:12 +0800 Subject: [PATCH 2/3] fix headers bug --- src/main/java/io/github/sac/Socket.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/github/sac/Socket.java b/src/main/java/io/github/sac/Socket.java index 21cf865..d16ed63 100644 --- a/src/main/java/io/github/sac/Socket.java +++ b/src/main/java/io/github/sac/Socket.java @@ -449,7 +449,8 @@ public void connect(HashMap customHeaders) { if (customHeaders != null) { Iterator> iterator = customHeaders.entrySet().iterator(); while (iterator.hasNext()) { - ws.addHeader(iterator.next().getKey(), iterator.next().getValue()); + Map.Entry entry = iterator.next(); + ws.addHeader(entry.getKey(), entry.getValue()); } } From c44c046594ab0ba5039639a82ed833d37dfc69ce Mon Sep 17 00:00:00 2001 From: FengKang Date: Mon, 16 Oct 2017 19:26:37 +0800 Subject: [PATCH 3/3] reconnect supports custom headers --- src/main/java/io/github/sac/Socket.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/github/sac/Socket.java b/src/main/java/io/github/sac/Socket.java index d16ed63..6a38384 100644 --- a/src/main/java/io/github/sac/Socket.java +++ b/src/main/java/io/github/sac/Socket.java @@ -29,6 +29,7 @@ public class Socket extends Emitter { private HashMap acks; private List channels; private WebSocketAdapter adapter; + private HashMap customHeaders; public Socket(String URL) { this.URL = URL; @@ -431,7 +432,7 @@ private void subscribeChannels() { } public void connect() { - connect(null); + connect(customHeaders); } public void connect(HashMap customHeaders) { @@ -447,6 +448,7 @@ public void connect(HashMap customHeaders) { ws.addHeader("Pragma", "no-cache"); ws.addHeader("User-Agent", "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36"); if (customHeaders != null) { + this.customHeaders = customHeaders; Iterator> iterator = customHeaders.entrySet().iterator(); while (iterator.hasNext()) { Map.Entry entry = iterator.next();