這篇文章主要講解了“基于Java NIO的即時(shí)聊天服務(wù)器模型怎么實(shí)現(xiàn)”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“基于Java NIO的即時(shí)聊天服務(wù)器模型怎么實(shí)現(xiàn)”吧!
目前創(chuàng)新互聯(lián)公司已為近1000家的企業(yè)提供了網(wǎng)站建設(shè)、域名、網(wǎng)站空間、網(wǎng)站托管、服務(wù)器托管、企業(yè)網(wǎng)站設(shè)計(jì)、大興安嶺網(wǎng)站維護(hù)等服務(wù),公司將堅(jiān)持客戶導(dǎo)向、應(yīng)用為本的策略,正道將秉承"和諧、參與、激情"的文化,與客戶和合作伙伴齊心協(xié)力一起成長(zhǎng),共同發(fā)展。
廢話不多說,關(guān)于NIO的SelectionKey、Selector、Channel網(wǎng)上的介紹例子都很多,直接上代碼:
JsonParser
Json的解析類,隨便封裝了下,使用的最近比較火的fastjson
public class JsonParser { private static JSONObject mJson; public synchronized static String get(String json,String key) { mJson = JSON.parseObject(json); return mJson.getString(key); } }
Main
入口,不解釋
public class Main { public static void main(String... args) { new SeekServer().start(); } }
Log
public class Log { public static void i(Object obj) { System.out.println(obj); } public static void e(Object e) { System.err.println(e); } }
SeekServer:
服務(wù)器端的入口,請(qǐng)求的封裝和接收都在此類,端口暫時(shí)寫死在了代碼里,mSelector.select(TIME_OUT) > 0 目的是為了當(dāng)服務(wù)器空閑的時(shí)候(沒有任何讀寫甚至請(qǐng)求斷開事件),循環(huán)時(shí)有個(gè)間隔時(shí)間,不然基本上相當(dāng)于while(true){//nothing}了,你懂的。
public class SeekServer extends Thread{ private final int ACCPET_PORT = 55555; private final int TIME_OUT = 1000; private Selector mSelector = null; private ServerSocketChannel mSocketChannel = null; private ServerSocket mServerSocket = null; private InetSocketAddress mAddress = null; public SeekServer() { long sign = System.currentTimeMillis(); try { mSocketChannel = ServerSocketChannel.open(); if(mSocketChannel == null) { System.out.println("can't open server socket channel"); } mServerSocket = mSocketChannel.socket(); mAddress = new InetSocketAddress(ACCPET_PORT); mServerSocket.bind(mAddress); Log.i("server bind port is " + ACCPET_PORT); mSelector = Selector.open(); mSocketChannel.configureBlocking(false); SelectionKey key = mSocketChannel.register(mSelector, SelectionKey.OP_ACCEPT); key.attach(new Acceptor()); //檢測(cè)Session狀態(tài) Looper.getInstance().loop(); //開始處理Session SessionProcessor.start(); Log.i("Seek server startup in " + (System.currentTimeMillis() - sign) + "ms!"); } catch (ClosedChannelException e) { Log.e(e.getMessage()); } catch (IOException e) { Log.e(e.getMessage()); } } public void run() { Log.i("server is listening..."); while(!Thread.interrupted()) { try { if(mSelector.select(TIME_OUT) > 0) { Set<SelectionKey> keys = mSelector.selectedKeys(); Iterator<SelectionKey> iterator = keys.iterator(); SelectionKey key = null; while(iterator.hasNext()) { key = iterator.next(); Handler at = (Handler) key.attachment(); if(at != null) { at.exec(); } iterator.remove(); } } } catch (IOException e) { Log.e(e.getMessage()); } } } class Acceptor extends Handler{ public void exec(){ try { SocketChannel sc = mSocketChannel.accept(); new Session(sc, mSelector); } catch (ClosedChannelException e) { Log.e(e); } catch (IOException e) { Log.e(e); } } } }
Handler:
只有一個(gè)抽象方法exec,Session將會(huì)繼承它。
public abstract class Handler { public abstract void exec(); }
Session:
封裝了用戶的請(qǐng)求和SelectionKey和SocketChannel,每次接收到新的請(qǐng)求時(shí)都重置它的最后活動(dòng)時(shí)間,通過狀態(tài)mState=READING or SENDING 去執(zhí)行消息的接收與發(fā)送,當(dāng)客戶端異常斷開時(shí)則從SessionManager清除該會(huì)話。
public class Session extends Handler{ private SocketChannel mChannel; private SelectionKey mKey; private ByteBuffer mRreceiveBuffer = ByteBuffer.allocate(10240); private Charset charset = Charset.forName("UTF-8"); private CharsetDecoder mDecoder = charset.newDecoder(); private CharsetEncoder mEncoder = charset.newEncoder(); private long lastPant;//最后活動(dòng)時(shí)間 private final int TIME_OUT = 1000 * 60 * 5; //Session超時(shí)時(shí)間 private String key; private String sendData = ""; private String receiveData = null; public static final int READING = 0,SENDING = 1; int mState = READING; public Session(SocketChannel socket, Selector selector) throws IOException { this.mChannel = socket; mChannel = socket; mChannel.configureBlocking(false); mKey = mChannel.register(selector, 0); mKey.attach(this); mKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); lastPant = Calendar.getInstance().getTimeInMillis(); } public String getReceiveData() { return receiveData; } public void clear() { receiveData = null; } public void setSendData(String sendData) { mState = SENDING; mKey.interestOps(SelectionKey.OP_WRITE); this.sendData = sendData + "\n"; } public boolean isKeekAlive() { return lastPant + TIME_OUT > Calendar.getInstance().getTimeInMillis(); } public void setAlive() { lastPant = Calendar.getInstance().getTimeInMillis(); } /** * 注銷當(dāng)前Session */ public void distroy() { try { mChannel.close(); mKey.cancel(); } catch (IOException e) {} } @Override public synchronized void exec() { try { if(mState == READING) { read(); }else if(mState == SENDING) { write(); } } catch (IOException e) { SessionManager.remove(key); try { mChannel.close(); } catch (IOException e1) { Log.e(e1); } mKey.cancel(); } } public void read() throws IOException{ mRreceiveBuffer.clear(); int sign = mChannel.read(mRreceiveBuffer); if(sign == -1) { //客戶端連接關(guān)閉 mChannel.close(); mKey.cancel(); } if(sign > 0) { mRreceiveBuffer.flip(); receiveData = mDecoder.decode(mRreceiveBuffer).toString(); setAlive(); setSign(); SessionManager.addSession(key, this); } } private void setSign() { //設(shè)置當(dāng)前Session的Key key = JsonParser.get(receiveData,"imei"); //檢測(cè)消息類型是否為心跳包 // String type = jo.getString("type"); // if(type.equals("HEART_BEAT")) { // setAlive(); // } } /** * 寫消息 */ public void write() { try { mChannel.write(mEncoder.encode(CharBuffer.wrap(sendData))); sendData = null; mState = READING; mKey.interestOps(SelectionKey.OP_READ); } catch (CharacterCodingException e) { e.printStackTrace(); } catch (IOException e) { try { mChannel.close(); } catch (IOException e1) { Log.e(e1); } } } }
SessionManager:
將所有Session存放到ConcurrentHashMap,這里使用手機(jī)用戶的imei做key,ConcurrentHashMap因?yàn)槭蔷€程安全的,所以能很大程度上避免自己去實(shí)現(xiàn)同步的過程,
封裝了一些操作Session的方法例如get,remove等。
public class SessionManager { private static ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<String, Session>(); public static void addSession(String key,Session session) { sessions.put(key, session); } public static Session getSession(String key) { return sessions.get(key); } public static Set<String> getSessionKeys() { return sessions.keySet(); } public static int getSessionCount() { return sessions.size(); } public static void remove(String[] keys) { for(String key:keys) { if(sessions.containsKey(key)) { sessions.get(key).distroy(); sessions.remove(key); } } } public static void remove(String key) { if(sessions.containsKey(key)) { sessions.get(key).distroy(); sessions.remove(key); } } }
SessionProcessor
里面使用了JDK自帶的線程池,用來分發(fā)處理所有Session中當(dāng)前需要處理的請(qǐng)求(線程池的初始化參數(shù)不是太熟,望有了解的童鞋能告訴我),內(nèi)部類Process則是將Session再次封裝成SocketRequest和SocketResponse(看到這里是不是有點(diǎn)熟悉的感覺,對(duì)沒錯(cuò),JavaWeb里到處都是request和response)。
public class SessionProcessor implements Runnable{ private static Runnable processor = new SessionProcessor(); private static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 200, 500, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10),new ThreadPoolExecutor.CallerRunsPolicy()); public static void start() { new Thread(processor).start(); } @Override public void run() { while(true) { Session tmp = null; for(String key:SessionManager.getSessionKeys()) { tmp = SessionManager.getSession(key); //處理Session未處理的請(qǐng)求 if(tmp.getReceiveData() != null) { pool.execute(new Process(tmp)); } } try { Thread.sleep(10); } catch (InterruptedException e) { Log.e(e); } } } class Process implements Runnable { private SocketRequest request; private SocketResponse response; public Process(Session session) { //將Session封裝成Request和Response request = new SocketRequest(session); response = new SocketResponse(session); } @Override public void run() { new RequestTransform().transfer(request, response); } } }
RequestTransform里的transfer方法利用反射對(duì)請(qǐng)求參數(shù)中的請(qǐng)求類別和請(qǐng)求動(dòng)作來調(diào)用不同類的不同方法(UserHandler和MessageHandler)
public class RequestTransform { public void transfer(SocketRequest request,SocketResponse response) { String action = request.getValue("action"); String handlerName = request.getValue("handler"); //根據(jù)Session的請(qǐng)求類型,讓不同的類方法去處理 try { Class<?> c= Class.forName("com.seek.server.handler." + handlerName); Class<?>[] arg=new Class[]{SocketRequest.class,SocketResponse.class}; Method method=c.getMethod(action,arg); method.invoke(c.newInstance(), new Object[]{request,response}); } catch (Exception e) { e.printStackTrace(); } } }
SocketRequest和SocketResponse
public class SocketRequest { private Session mSession; private String mReceive; public SocketRequest(Session session) { mSession = session; mReceive = session.getReceiveData(); mSession.clear(); } public String getValue(String key) { return JsonParser.get(mReceive, key); } public String getQueryString() { return mReceive; } }
public class SocketResponse { private Session mSession; public SocketResponse(Session session) { mSession = session; } public void write(String msg) { mSession.setSendData(msg); } }
最后則是兩個(gè)處理請(qǐng)求的Handler
public class UserHandler { public void login(SocketRequest request,SocketResponse response) { System.out.println(request.getQueryString()); //TODO: 處理用戶登錄 response.write("你肯定收到消息了"); } }
public class MessageHandler { public void send(SocketRequest request,SocketResponse response) { System.out.println(request.getQueryString()); //消息發(fā)送 String key = request.getValue("imei"); Session session = SessionManager.getSession(key); new SocketResponse(session).write(request.getValue("sms")); } }
還有個(gè)監(jiān)測(cè)是否超時(shí)的類Looper,定期去刪除Session
public class Looper extends Thread{ private static Looper looper = new Looper(); private static boolean isStart = false; private final int INTERVAL = 1000 * 60 * 5; private Looper(){} public static Looper getInstance() { return looper; } public void loop() { if(!isStart) { isStart = true; this.start(); } } public void run() { Task task = new Task(); while(true) { //Session過期檢測(cè) task.checkState(); //心跳包檢測(cè) //task.sendAck(); try { Thread.sleep(INTERVAL); } catch (InterruptedException e) { Log.e(e); } } } }
public class Task { public void checkState() { Set<String> keys = SessionManager.getSessionKeys(); if(keys.size() == 0) { return; } List<String> removes = new ArrayList<String>(); Iterator<String> iterator = keys.iterator(); String key = null; while(iterator.hasNext()) { key = iterator.next(); if(!SessionManager.getSession(key).isKeekAlive()) { removes.add(key); } } if(removes.size() > 0) { Log.i("sessions is time out,remove " + removes.size() + "session"); } SessionManager.remove(removes.toArray(new String[removes.size()])); } public void sendAck() { Set<String> keys = SessionManager.getSessionKeys(); if(keys.size() == 0) { return; } Iterator<String> iterator = keys.iterator(); while(iterator.hasNext()) { iterator.next(); //TODO 發(fā)送心跳包 } } }
注意,在Task和SessionProcessor類里都有對(duì)SessionManager的sessions做遍歷,文中使用的方法并不是很好,主要是效率問題,推薦使用遍歷Entry的方式來獲取Key和Value,因?yàn)橐恢痹贘avaWeb上折騰,所以會(huì)的童鞋看到Request和Response會(huì)挺親切,這個(gè)例子沒有經(jīng)過任何安全和性能測(cè)試,如果需要放到生產(chǎn)環(huán)境上得話請(qǐng)先自行做測(cè)試- -!
客戶端請(qǐng)求時(shí)的數(shù)據(jù)內(nèi)容例如{handler:"UserHandler",action:"login",imei:"2364656512636".......},這些約定就自己來定了。
感謝各位的閱讀,以上就是“基于Java NIO的即時(shí)聊天服務(wù)器模型怎么實(shí)現(xiàn)”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對(duì)基于Java NIO的即時(shí)聊天服務(wù)器模型怎么實(shí)現(xiàn)這一問題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是創(chuàng)新互聯(lián),小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
名稱欄目:基于JavaNIO的即時(shí)聊天服務(wù)器模型怎么實(shí)現(xiàn)
文章出自:http://chinadenli.net/article6/gepiig.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供搜索引擎優(yōu)化、軟件開發(fā)、用戶體驗(yàn)、面包屑導(dǎo)航、網(wǎng)站設(shè)計(jì)公司、定制網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)