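Foreword:
Yesterday I posted the initial design of a Socket chat program that I wrote in my spare time; that post covered the overall design of the program. For completeness, today I am recording the server-side design in more detail. First, here is the high-level design diagram of the chat program's server, shown in the figure below: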

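Function description:
The server does two main things. First, it blocks to accept client sockets and handles their requests. Second, it checks client heartbeats and removes any client that has not sent a heartbeat within a given period. Server creates the ServerSocket and then starts two thread pools for these two jobs (newFixedThreadPool and newScheduledThreadPool); the corresponding classes are SocketDispatcher and SocketSchedule. SocketDispatcher dispatches each socket request to a different SocketHandler according to the request type. SocketWrapper is a thin wrapper around a socket that records the time of the latest interaction in lastAliveTime, and SocketHolder stores the set of sockets currently interacting with the server.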
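The heartbeat rule described above can be sketched in isolation, without any sockets. This is only a minimal illustration: the class name HeartbeatRegistry and its methods are hypothetical, and timestamps are passed in as plain milliseconds so the behaviour is easy to follow.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for SocketHolder + SocketSchedule: tracks last-alive times and evicts stale clients. */
class HeartbeatRegistry {
    private final Map<String, Long> lastAliveMillis = new ConcurrentHashMap<>();
    private final long timeoutMillis;

    HeartbeatRegistry(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Called whenever a client logs in or sends a heartbeat. */
    void touch(String client, long nowMillis) {
        lastAliveMillis.put(client, nowMillis);
    }

    /** Called periodically; removes every client whose last heartbeat is older than the timeout. */
    int sweep(long nowMillis) {
        int removed = 0;
        for (Map.Entry<String, Long> entry : lastAliveMillis.entrySet()) {
            // remove(key, value) only succeeds if no newer heartbeat arrived in the meantime
            if (nowMillis - entry.getValue() > timeoutMillis
                    && lastAliveMillis.remove(entry.getKey(), entry.getValue())) {
                removed++;
            }
        }
        return removed;
    }

    boolean isOnline(String client) {
        return lastAliveMillis.containsKey(client);
    }

    public static void main(String[] args) {
        HeartbeatRegistry registry = new HeartbeatRegistry(5_000);
        registry.touch("alice", 0);
        registry.touch("bob", 4_000);
        System.out.println("removed: " + registry.sweep(6_000));
        System.out.println("alice online: " + registry.isOnline("alice"));
        System.out.println("bob online: " + registry.isOnline("bob"));
    }
}
```

In the real server the sweep would be driven by the scheduled thread pool and touch would be called from the heartbeat handler; here the clock is supplied by the caller purely for illustration.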
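Implementation:
[Server.java]
Server is the entry point of the server side. The ServerSocket is created in the constructor; start() then blocks accepting client connections and hands each one to a SocketDispatcher for dispatching. SocketDispatcher tasks run on a newFixedThreadPool thread pool, so when the number of connections exceeds the maximum pool size (ConstantValue.MAX_POOL_SIZE), the extra ones wait in the pool's queue. SocketSchedule is started with scheduleAtFixedRate and periodically checks the clients' heartbeat packets. Both classes implement the Runnable interface. The server code is given below: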
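A side note on the watchdog before the listing. According to the ScheduledExecutorService documentation, if any execution of a task scheduled with scheduleAtFixedRate throws an exception, subsequent executions are suppressed. Whether SocketSchedule can actually throw depends on code not shown here, so the following is only a defensive sketch; GuardedTask and its methods are hypothetical names, not part of the project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GuardedTask {
    /** Wraps a task so that a RuntimeException is reported instead of reaching the scheduler. */
    static Runnable guard(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                System.err.println("periodic task failed: " + e);
            }
        };
    }

    /** Schedules a task that fails on its first run; returns true if it still ran three times. */
    static boolean survivesFailure() {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch threeRuns = new CountDownLatch(3);
        Runnable flaky = () -> {
            threeRuns.countDown();
            if (runs.incrementAndGet() == 1) {
                throw new IllegalStateException("simulated failure");
            }
        };
        try {
            scheduler.scheduleAtFixedRate(guard(flaky), 0, 10, TimeUnit.MILLISECONDS);
            return threeRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("still running after a failure: " + survivesFailure());
    }
}
```

Applied to this server, the idea would be to pass guard(new SocketSchedule()) to scheduleAtFixedRate, so that one failed sweep does not silently stop all later heartbeat checks.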
package yaolin.chat.server;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import yaolin.chat.common.ConstantValue;
import yaolin.chat.util.LoggerUtil;

/**
 * Server
 * @author yaolin
 */
public class Server {

  private final ServerSocket server;
  private final ExecutorService pool;

  public Server() throws IOException {
    server = new ServerSocket(ConstantValue.SERVER_PORT);
    pool = Executors.newFixedThreadPool(ConstantValue.MAX_POOL_SIZE);
  }

  public void start() {
    try {
      ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
      // Watch dog. Exception??
      schedule.scheduleAtFixedRate(new SocketSchedule(), 10, ConstantValue.TIME_OUT, TimeUnit.SECONDS);
      while (true) {
        pool.execute(new SocketDispatcher(server.accept()));
        LoggerUtil.info("ACCEPT A CLIENT AT " + new Date());
      }
    } catch (IOException e) {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    try {
      new Server().start();
    } catch (IOException e) {
      LoggerUtil.error("Server start failed! -> " + e.getMessage(), e);
    }
  }
}
[SocketDispatcher.java]
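Server is only the entry point of the server side, not its command center; SocketDispatcher is the real command center. It dispatches client requests by message type, so that a different SocketHandler handles each kind of message. The server and the clients exchange messages as JSON, and all message classes extend BaseMessage, so the received data is first converted to BaseMessage and then its type is checked (the message types belong to the common module). One thing worth mentioning: when the message is of the file type, the dispatcher sleeps for the configured interval (ConstantValue.MESSAGE_PERIOD), so that FileHandler has time to read the file stream and forward it to the target client, instead of the loop immediately going on to examine the next message type (the design here may be somewhat flawed, but it will do for now). The SocketDispatcher code is given below: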
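As an aside before the listing: the dispatcher polls available() and sleeps between polls. An alternative is to let the read block. The sketch below assumes that every JSON message is terminated by a newline, which the post does not state, so treat that framing as an assumption. LineFramedReader is a hypothetical name, and a StringReader stands in for the socket stream.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class LineFramedReader {
    /**
     * Reads newline-terminated messages until end of stream and passes each one to the callback.
     * readLine() blocks until a full line arrives, so no available()/sleep polling is needed.
     */
    static void readMessages(Reader source, Consumer<String> onMessage) {
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (!line.isEmpty()) { // skip blank keep-alive lines
                    onMessage.accept(line);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // In a server the Reader would wrap socket.getInputStream(); a StringReader stands in here.
        String wire = "{\"type\":1,\"from\":\"alice\"}\n{\"type\":2,\"from\":\"bob\"}\n";
        List<String> received = new ArrayList<>();
        readMessages(new StringReader(wire), received::add);
        System.out.println(received.size() + " messages: " + received);
    }
}
```

This only covers the JSON control messages. The file transfer sends raw bytes on the same stream after the JSON header, and a BufferedReader may read ahead into those bytes, so that case would need separate handling.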
/**
 * SocketDispatcher
 *
 * @author yaolin
 */
public class SocketDispatcher implements Runnable {

  private final Socket socket;

  public SocketDispatcher(Socket socket) {
    this.socket = socket;
  }

  @Override
  public void run() {
    if (socket != null) {
      while (!socket.isClosed()) {
        try {
          InputStream is = socket.getInputStream();
          String line = null;
          StringBuffer sb = null;

          if (is.available() > 0) {

            BufferedReader bufr = new BufferedReader(new InputStreamReader(is));
            sb = new StringBuffer();
            while (is.available() > 0 && (line = bufr.readLine()) != null) {
              sb.append(line);
            }
            LoggerUtil.trach("RECEIVE [" + sb.toString() + "] AT " + new Date());

            BaseMessage message = JSON.parseObject(sb.toString(), BaseMessage.class);

            switch (message.getType()) {
            case MessageType.ALIVE:
              HandlerFactory.getHandler(MessageType.ALIVE).handle(socket, sb.toString());
              break;
            case MessageType.CHAT:
              HandlerFactory.getHandler(MessageType.CHAT).handle(socket, sb.toString());
              break;
            case MessageType.FILE:
              HandlerFactory.getHandler(MessageType.FILE).handle(socket, sb.toString());
              LoggerUtil.trach("SERVER:PAUSE TO RECEIVE FILE");
              Thread.sleep(ConstantValue.MESSAGE_PERIOD);
              break;
            case MessageType.LOGIN:
              HandlerFactory.getHandler(MessageType.LOGIN).handle(socket, sb.toString());
              break;
            case MessageType.LOGOUT:
              break;
            case MessageType.REGISTER:
              HandlerFactory.getHandler(MessageType.REGISTER).handle(socket, sb.toString());
              break;
            }
          } else {
            Thread.sleep(ConstantValue.MESSAGE_PERIOD);
          }
        } catch (Exception e) { // catch all handler exception
          LoggerUtil.error("SocketDispatcher Error!" + e.getMessage(), e);
        }
      }
    }
  }
}
[SocketSchedule.java]
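The other class (component) directly related to Server is SocketSchedule. It checks whether the time since each client's latest interaction with the server exceeds the maximum allowed by the configuration (ConstantValue.TIME_OUT); if it does, the client's socket is removed from the server. The last-interaction time itself is refreshed elsewhere, by AliveHandler, whenever a heartbeat arrives. The implementation is as follows: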
/**
 * Remove socket from SocketHolder if lastAliveTime > TIME_OUT
 * @author yaolin
 *
 */
public class SocketSchedule implements Runnable {

  @Override
  public void run() {
    for (String key : SocketHolder.keySet()) {
      SocketWrapper wrapper = SocketHolder.get(key);
      if (wrapper != null && wrapper.getLastAliveTime() != null) {
        if (((new Date().getTime() - wrapper.getLastAliveTime().getTime()) / 1000) > ConstantValue.TIME_OUT) {
          // remove socket if timeout
          SocketHolder.remove(key);
        }
      }
    }
  }
}
[SocketHolder.java、SocketWrapper.java]
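As the code above shows, SocketSchedule#run() only performs a simple time check; the classes that really matter are SocketHolder and SocketWrapper. SocketWrapper is a thin wrapper around a socket, and SocketHolder stores all the clients that have interacted with the server within the valid time window. SocketHolder keeps them as key-value pairs, with the client's unique identifier (here the username) as the key and the wrapper around the client's socket as the value. SocketHolder#flushClientStatus() notifies the other clients when a client comes online or goes offline. The implementation of these two classes is given below: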
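One property worth spelling out before the listing: SocketSchedule#run() removes entries while it is iterating over SocketHolder.keySet(). That is fine here because the holder is backed by a ConcurrentHashMap, whose iterators are weakly consistent and do not throw ConcurrentModificationException. A small stand-alone sketch of the same pattern, with hypothetical names and plain strings instead of sockets:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WeaklyConsistentIteration {
    /** Removes every key starting with the prefix while iterating over the live key set. */
    static int removeByPrefix(Map<String, String> map, String prefix) {
        int removed = 0;
        for (String key : map.keySet()) {
            if (key.startsWith(prefix) && map.remove(key) != null) {
                removed++;
            }
        }
        return removed;
    }

    public static void main(String[] args) {
        Map<String, String> clients = new ConcurrentHashMap<>();
        clients.put("stale-1", "socket A");
        clients.put("stale-2", "socket B");
        clients.put("live-1", "socket C");
        System.out.println("removed: " + removeByPrefix(clients, "stale-"));
        System.out.println("left: " + clients.keySet());
    }
}
```

With a plain HashMap the same loop could fail with a ConcurrentModificationException, which is one reason a concurrent map is a reasonable choice for a holder shared between the dispatcher threads and the watchdog.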
/**
 * Wrap Socket, SocketSchedule remove socket if lastAliveTime > TIME_OUT
 * @author yaolin
 *
 */
public class SocketWrapper {

  private Socket socket;
  private Date lastAliveTime;

  // full constructor
  public SocketWrapper(Socket socket, Date lastAliveTime) {
    this.socket = socket;
    this.lastAliveTime = lastAliveTime;
  }

  public Socket getSocket() {
    return socket;
  }

  public void setSocket(Socket socket) {
    this.socket = socket;
  }

  public Date getLastAliveTime() {
    return lastAliveTime;
  }

  public void setLastAliveTime(Date lastAliveTime) {
    this.lastAliveTime = lastAliveTime;
  }
}

/**
 * SocketHolder
 * @author yaolin
 */
public class SocketHolder {

  private static ConcurrentMap<String, SocketWrapper> listSocketWrap = new ConcurrentHashMap<String, SocketWrapper>();

  public static Set<String> keySet() {
    return listSocketWrap.keySet();
  }

  public static SocketWrapper get(String key) {
    return listSocketWrap.get(key);
  }

  public static void put(String key, SocketWrapper value) {
    listSocketWrap.put(key, value);
    flushClientStatus(key, true);
  }

  public static SocketWrapper remove(String key) {
    flushClientStatus(key, false);
    return listSocketWrap.remove(key);
  }

  public static void clear() {
    listSocketWrap.clear();
  }

  /**
   * <pre>content:{username:"",flag:false}</pre>
   * @param flag true:put,false:remove;
   */
  private static void flushClientStatus(String key, boolean flag) {
    ClientNotifyDTO dto = new ClientNotifyDTO(flag, key);
    ReturnMessage rm = new ReturnMessage().setKey(Key.NOTIFY).setSuccess(true).setContent(dto);
    rm.setFrom(ConstantValue.SERVER_NAME);
    for (String toKey : listSocketWrap.keySet()) {
      if (!toKey.equals(key)) { // not send to self
        rm.setTo(toKey);
        SocketWrapper wrap = listSocketWrap.get(toKey);
        if (wrap != null) {
          SendHelper.send(wrap.getSocket(), rm);
        }
      }
    }
  }
}
[SocketHandler.java、HandlerFactory.java、OtherHandlerImpl.java]
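SocketDispatcher lets different SocketHandlers handle the corresponding message requests. The SocketHandler design is really just a simple set of factory components (ReturnHandler is not used for now, because SendHelper currently does the sending; it is already marked @Deprecated, but it is still listed here). The complete class diagram is as follows:

The code for this part is given below. To save space, all the Handler implementations are collapsed into a single listing.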
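For comparison before the listing: the factory below uses a switch and returns null for an unknown type. One common alternative is a map-based registry that returns an Optional. The sketch is not the author's design; MessageHandler, HandlerRegistry, and the two type constants are hypothetical stand-ins for SocketHandler, HandlerFactory, and MessageType.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical, simplified handler interface: takes the raw payload and returns a reply. */
interface MessageHandler {
    String handle(String payload);
}

class HandlerRegistry {
    // Hypothetical type codes; the real values live in MessageType, which is not shown in the post.
    static final int TYPE_ALIVE = 1;
    static final int TYPE_CHAT = 2;

    private final Map<Integer, Supplier<MessageHandler>> suppliers = new HashMap<>();

    /** Registers a supplier so that a fresh handler is created for every lookup. */
    void register(int type, Supplier<MessageHandler> supplier) {
        suppliers.put(type, supplier);
    }

    /** Returns a new handler for the type, or an empty Optional when the type is unknown. */
    Optional<MessageHandler> lookup(int type) {
        Supplier<MessageHandler> supplier = suppliers.get(type);
        return supplier == null ? Optional.empty() : Optional.of(supplier.get());
    }

    public static void main(String[] args) {
        HandlerRegistry registry = new HandlerRegistry();
        registry.register(TYPE_ALIVE, () -> payload -> "alive:" + payload);
        registry.register(TYPE_CHAT, () -> payload -> "chat:" + payload);

        String reply = registry.lookup(TYPE_CHAT)
                .map(handler -> handler.handle("hello"))
                .orElse("unknown message type");
        System.out.println(reply);
        System.out.println(registry.lookup(99).isPresent());
    }
}
```

The registry is meant to be filled once at startup and only read afterwards; if handlers were registered while dispatcher threads are running, a ConcurrentHashMap would be the safer backing map.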
/**
 * SocketHandler
 * @author yaolin
 */
public interface SocketHandler {
  /**
   * Handle Client Socket
   */
  public Object handle(Socket client, Object data);
}

/**
 * SocketHandlerFactory
 * @author yaolin
 */
public class HandlerFactory {

  // can not create instance
  private HandlerFactory() {}

  public static SocketHandler getHandler(int type) {
    switch (type) {
    case MessageType.ALIVE: // usually use
      return new AliveHandler();
    case MessageType.CHAT:
      return new ChatHandler();
    case MessageType.LOGIN:
      return new LoginHandler();
//    case MessageType.RETURN:
//      return new ReturnHandler();
    case MessageType.LOGOUT:
      return new LogoutHandler();
    case MessageType.REGISTER:
      return new RegisterHandler();
    case MessageType.FILE:
      return new FileHandler();
    }
    return null; // NullPointException
  }
}

/**
 * AliveSocketHandler
 * @author yaolin
 */
public class AliveHandler implements SocketHandler {

  /**
   * @return null
   */
  @Override
  public Object handle(Socket client, Object data) {
    if (data != null) {
      BaseMessage message = JSON.parseObject(data.toString(), BaseMessage.class);
      if (StringUtil.isNotEmpty(message.getFrom())) {
        SocketWrapper wrapper = SocketHolder.get(message.getFrom());
        if (wrapper != null) {
          wrapper.setLastAliveTime(new Date()); // KEEP SOCKET ...
          SocketHolder.put(message.getFrom(), wrapper);
        }
      }
    }
    return null;
  }
}

/**
 * ChatHandler
 *
 * @author yaolin
 */
public class ChatHandler implements SocketHandler {

  @Override
  public Object handle(Socket client, Object data) {
    if (data != null) {
      ChatMessage message = JSON.parseObject(data.toString(), ChatMessage.class);
      if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) {
        // exist & send
        if (SocketHolder.keySet().contains(message.getFrom())) {
          String owner = message.getFrom();
          message.setOwner(owner); // owner will be display
          if (ConstantValue.TO_ALL.equals(message.getTo())) { // one-to-all
            // TO_ALL TAB will be select;
            message.setFrom(ConstantValue.TO_ALL);
            for (String key : SocketHolder.keySet()) {
              // also send to self
              SocketWrapper wrapper = SocketHolder.get(key);
              if (wrapper != null) {
                SendHelper.send(wrapper.getSocket(), message);
              }
            }
          } else { // one-to-one
            SocketWrapper wrapper = SocketHolder.get(message.getTo());
            if (wrapper != null) {
              // owner = from
              SendHelper.send(wrapper.getSocket(), message);
              // also send to self
              // TO TAB will be select;
              message.setFrom(message.getTo()).setTo(owner);
              SendHelper.send(client, message);
            }
          }
        }
      }
    }
    return null;
  }
}

public class FileHandler implements SocketHandler {

  @Override
  public Object handle(Socket client, Object data) {
    if (client != null && data != null) { // data is dereferenced below, so guard it as the other handlers do
      FileMessage message = JSON.parseObject(data.toString(), FileMessage.class);
      if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) {
        // exist & send
        if (SocketHolder.keySet().contains(message.getFrom())) {
          if (!ConstantValue.TO_ALL.equals(message.getTo())) { // one-to-one only, files are not broadcast
            SocketWrapper wrapper = SocketHolder.get(message.getTo());
            if (wrapper != null) {
              SendHelper.send(wrapper.getSocket(), message);
              try {
                if (client != null && wrapper.getSocket() != null && message.getSize() > 0) {
                  InputStream is = client.getInputStream();
                  OutputStream os = wrapper.getSocket().getOutputStream();
                  int total = 0;
                  while (!client.isClosed() && !wrapper.getSocket().isClosed()) {
                    if (is.available() > 0) {
                      byte[] buff = new byte[ConstantValue.BUFF_SIZE];
                      int len = -1;
                      while (is.available() > 0 && (len = is.read(buff)) != -1) {
                        os.write(buff, 0, len);
                        total += len;
                        LoggerUtil.debug("SEND BUFF [" + len + "]");
                      }
                      os.flush();
                      if (total >= message.getSize()) {
                        LoggerUtil.info("SEND BUFF [OK]");
                        break;
                      }
                    }
                  }
                  // AFTER SEND FILE
                  // SEND SUCCESSFULLY
                  ReturnMessage result = new ReturnMessage().setKey(Key.TIP)
                      .setSuccess(true)
                      .setContent(I18N.INFO_FILE_SEND_SUCCESSFULLY);
                  result.setFrom(message.getTo()).setTo(message.getFrom())
                      .setOwner(ConstantValue.SERVER_NAME);
                  SendHelper.send(client, result);
                  // RECEIVE SUCCESSFULLY
                  result.setContent(I18N.INFO_FILE_RECEIVE_SUCCESSFULLY)
                      .setFrom(message.getFrom())
                      .setTo(message.getTo());
                  SendHelper.send(wrapper.getSocket(), result);
                }
              } catch (Exception e) {
                LoggerUtil.error("Handle file failed !" + e.getMessage(), e);
              }
            }
          }
        }
      }
    }
    return null;
  }
}

/**
 * LoginHandler
 *
 * @author yaolin
 *
 */
public class LoginHandler implements SocketHandler {

  private UsrService usrService = new UsrService();

  @Override
  public Object handle(Socket client, Object data) {
    ReturnMessage result = new ReturnMessage();
    result.setSuccess(false);
    if (data != null) {
      LoginMessage message = JSON.parseObject(data.toString(), LoginMessage.class);
      if (StringUtil.isNotEmpty(message.getUsername()) && StringUtil.isNotEmpty(message.getPassword())) {
        if (usrService.login(message.getUsername(), message.getPassword()) != null) {
          result.setSuccess(true);
        } else {
          result.setMessage(I18N.INFO_LOGIN_ERROR_DATA);
        }
        result.setFrom(ConstantValue.SERVER_NAME).setTo(message.getUsername());
      } else {
        result.setMessage(I18N.INFO_LOGIN_EMPTY_DATA);
      }
      // AFTER LOGIN
      result.setKey(Key.LOGIN);
      if (result.isSuccess()) { // HOLD SOCKET
        SocketHolder.put(result.getTo(), new SocketWrapper(client, new Date()));
      }
      SendHelper.send(client, result);
      if (result.isSuccess()) { // SEND LIST USER
        ClientListUserDTO dto = new ClientListUserDTO();
        dto.setListUser(SocketHolder.keySet());
        result.setContent(dto).setKey(Key.LISTUSER);
        SendHelper.send(client, result);
      }
    }
    return null;
  }
}

public class LogoutHandler implements SocketHandler {

  @Override
  public Object handle(Socket client, Object data) {
    if (data != null) {
      LogoutMessage message = JSON.parseObject(data.toString(), LogoutMessage.class);
      if (message != null && StringUtil.isNotEmpty(message.getFrom())) {
        SocketWrapper wrapper = SocketHolder.get(message.getFrom());
        // the wrapper may already have been removed by SocketSchedule
        Socket socket = wrapper != null ? wrapper.getSocket() : null;
        if (socket != null) {
          try {
            socket.close();
            socket = null;
          } catch (Exception ignore) {
          }
        }
        SocketHolder.remove(message.getFrom());
      }
    }
    return null;
  }
}

public class RegisterHandler implements SocketHandler {

  private UsrService usrService = new UsrService();

  @Override
  public Object handle(Socket client, Object data) {
    ReturnMessage result = new ReturnMessage();
    result.setSuccess(false).setFrom(ConstantValue.SERVER_NAME);
    if (data != null) {
      RegisterMessage message = JSON.parseObject(data.toString(), RegisterMessage.class);
      if (StringUtil.isNotEmpty(message.getUsername()) && StringUtil.isNotEmpty(message.getPassword())) {
        if (usrService.register(message.getUsername(), message.getPassword()) != null) {
          result.setSuccess(true).setContent(I18N.INFO_REGISTER_OK);
        } else {
          result.setMessage(I18N.INFO_REGISTER_CLIENT_EXIST);
        }
      } else {
        result.setMessage(I18N.INFO_REGISTER_EMPTY_DATA);
      }

      if (StringUtil.isNotEmpty(message.getUsername())) {
        result.setTo(message.getUsername());
      }
      // AFTER REGISTER
      result.setKey(Key.REGISTER);
      SendHelper.send(client, result);
    }
    return null;
  }
}

/**
 * Use SendHelper to send ReturnMessage,
 * @see yaolin.chat.server.SocketDispatcher#run()
 * @author yaolin
 */
@Deprecated
public class ReturnHandler implements SocketHandler {

  /**
   * @param data ReturnMessage
   */
  @Override
  public Object handle(Socket client, Object data) {
    if (data != null) {
      ReturnMessage message = (ReturnMessage) data;
      if (StringUtil.isNotEmpty(message.getFrom()) && StringUtil.isNotEmpty(message.getTo())) {
        SocketWrapper wrap = SocketHolder.get(message.getTo());
        if (wrap != null) {
          SendHelper.send(wrap.getSocket(), message);
        }
      }
    }
    return null;
  }
}
User business:
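Besides sockets, the server has a little bit of actual business logic: user registration, login, and so on. Only the two classes Usr and UsrService are listed here, and this business logic is barely implemented for now. I do not intend to bring an ORM framework into this program, so I wrote my own DBUtil (to be improved), which is also posted here.

Only simple validation is done here, and nothing is persisted to a DB. Usr and UsrService are given below: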
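A note before the listing: handlers run on pool threads, and each handler creates its own UsrService, but they all share one static map. The sketch below shows one way to make register an atomic check-and-insert with ConcurrentHashMap.putIfAbsent. InMemoryUserStore is a hypothetical name, and md5Hex is only my stand-in for MD5Util.getMD5Code, whose implementation is not shown in the post.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class InMemoryUserStore {
    // username -> hex-encoded password hash
    private final ConcurrentMap<String, String> passwordHashes = new ConcurrentHashMap<>();

    /** Returns true if the user was created, false if the input is empty or the name is taken. */
    boolean register(String username, String password) {
        if (username == null || username.isEmpty() || password == null || password.isEmpty()) {
            return false;
        }
        // putIfAbsent checks and inserts in one atomic step, so two threads cannot both succeed
        return passwordHashes.putIfAbsent(username, md5Hex(password)) == null;
    }

    /** Returns true only when the user exists and the password hash matches. */
    boolean login(String username, String password) {
        if (username == null || password == null) {
            return false;
        }
        String stored = passwordHashes.get(username);
        return stored != null && stored.equals(md5Hex(password));
    }

    /** Hypothetical stand-in for MD5Util.getMD5Code: lowercase hex of the MD5 digest. */
    static String md5Hex(String text) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    public static void main(String[] args) {
        InMemoryUserStore store = new InMemoryUserStore();
        System.out.println("first register: " + store.register("alice", "secret"));
        System.out.println("second register: " + store.register("alice", "other"));
        System.out.println("login: " + store.login("alice", "secret"));
    }
}
```

MD5 is used here only to mirror the listing; for real accounts a salted, deliberately slow password hash would be the safer choice.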
public class Usr {

  private long id;
  private String username;
  private String password;

  public long getId() {
    return id;
  }

  public void setId(long id) {
    this.id = id;
  }

  public String getUsername() {
    return username;
  }

  public void setUsername(String username) {
    this.username = username;
  }

  public String getPassword() {
    return password;
  }

  public void setPassword(String password) {
    this.password = password;
  }
}

/**
 * // TODO
 * @see yaolin.chat.server.usr.repository.UsrRepository
 * @author yaolin
 *
 */
public class UsrService {

  // TODO db
  private static Map<String,Usr> db = new HashMap<String,Usr>();

  public Usr register(String username, String password) {
    if (StringUtil.isEmpty(username) || StringUtil.isEmpty(password)) {
      return null;
    }
    if (db.containsKey(username)) {
      return null; // exist;
    }
    Usr usr = new Usr();
    usr.setUsername(username);
    usr.setPassword(MD5Util.getMD5Code(password));
    db.put(username, usr);
    return usr;
  }

  public Usr login(String username, String password) {
    if (StringUtil.isEmpty(username) || StringUtil.isEmpty(password)) {
      return null;
    }
    if (db.containsKey(username)) {
      Usr usr = db.get(username);
      if (MD5Util.getMD5Code(password).equals(usr.getPassword())) {
        return usr;
      }
    }
    return null;
  }
}
The DBUtil utility is given below:
/**
 * DBUtils // TODO needs adjustment & optimization!!
 * @author yaolin
 */
public class DBUtil {

  // make connection used repeatedly
  private static final List<Connection> cache = new LinkedList<Connection>();
  private static String url;
  private static String driver;
  private static String user;
  private static String password;
  private static Boolean debug;

  static {
    InputStream is = DBUtil.class.getResourceAsStream("/db.properties");
    try {
      Properties p = new Properties();
      p.load(is);
      url = p.getProperty("url");
      driver = p.getProperty("driver");
      user = p.getProperty("user");
      password = p.getProperty("password");
      // just for debug
      try {
        debug = Boolean.valueOf(p.getProperty("debug"));
      } catch (Exception ignore) {
        debug = false;
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      if (is != null) {
        try {
          is.close();
          is = null;
        } catch (Exception ignore) {
        }
      }
    }
  }

  public synchronized static Connection getConnection() {
    if (cache.isEmpty()) {
      cache.add(makeConnection());
    }
    Connection conn = null;
    int i = 0;
    try {
      do {
        conn = cache.remove(i);
      } while (conn != null && conn.isClosed() && i < cache.size());
    } catch (Exception ignore) {
    }
    try {
      if (conn == null || conn.isClosed()) {
        cache.add(makeConnection());
        conn = cache.remove(0);
      }
      return conn;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public synchronized static void close(Connection connection) {
    try {
      if (connection != null && !connection.isClosed()) {
        if (debug)
          debug("release connection!");
        cache.add(connection);
      }
    } catch (SQLException ignore) {
    }
  }

  public static Object query(String sql, ResultSetMapper mapper, Object... args) {
    if (debug)
      debug(sql);
    Connection conn = getConnection();
    PreparedStatement ps = null;
    ResultSet rs = null;
    Object result = null;
    try {
      ps = conn.prepareStatement(sql);
      int i = 1;
      for (Object object : args) {
        ps.setObject(i++, object);
      }
      rs = ps.executeQuery();
      result = mapper.mapper(rs);
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      try {
        if (rs != null) {
          rs.close();
          rs = null;
        }
        if (ps != null) {
          ps.close();
          ps = null;
        }
      } catch (Exception ignore) {
      }
      close(conn); // return the connection to the cache even when the query fails
    }
    return result;
  }

  public static int modify(String sql, Object... args) {
    if (debug)
      debug(sql);
    Connection conn = getConnection();
    PreparedStatement ps = null;
    int row = 0;
    try {
      ps = conn.prepareStatement(sql);
      int i = 1;
      for (Object object : args) {
        ps.setObject(i++, object);
      }
      row = ps.executeUpdate();
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      try {
        if (ps != null) {
          ps.close();
          ps = null;
        }
      } catch (Exception ignore) {
      }
      close(conn); // return the connection to the cache even when the update fails
    }
    return row;
  }

  public static int[] batch(List<String> sqls) {
    if (debug)
      debug(sqls.toString());
    Connection conn = getConnection();
    Statement stmt = null;
    int[] row;
    try {
      stmt = conn.createStatement();
      for (String sql : sqls) {
        stmt.addBatch(sql);
      }
      row = stmt.executeBatch();
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      try {
        if (stmt != null) {
          stmt.close();
          stmt = null;
        }
      } catch (Exception ignore) {
      }
      close(conn); // return the connection to the cache even when the batch fails
    }
    return row;
  }

  public static int[] batch(String sql, PreparedStatementSetter setter) {
    if (debug)
      debug(sql);
    Connection conn = getConnection();
    PreparedStatement ps = null;
    int[] row;
    try {
      ps = conn.prepareStatement(sql);
      setter.setter(ps);
      row = ps.executeBatch();
    } catch (Exception e) {
      throw new RuntimeException(e);
    } finally {
      try {
        if (ps != null) {
          ps.close();
          ps = null;
        }
      } catch (Exception ignore) {
      }
      close(conn); // return the connection to the cache even when the batch fails
    }
    return row;
  }

  private static Connection makeConnection() {
    try {
      Class.forName(driver).newInstance();
      Connection conn = DriverManager.getConnection(url, user, password);
      if (debug)
        debug("create connection!");
      return conn;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  private static void debug(String sqls) {
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println(sdf.format(new Date())
        + " DEBUG " + Thread.currentThread().getId()
        + " --- [" + Thread.currentThread().getName() + "] " + "execute sqls : " + sqls);
  }
}

/**
 * PreparedStatementSetter
 * @author yaolin
 */
public interface PreparedStatementSetter {
  public void setter(PreparedStatement ps);
}

/**
 * ResultSetMapper
 * @author yaolin
 */
public interface ResultSetMapper {
  public Object mapper(ResultSet rs);
}
Source code download: demo