網(wǎng)游少不了網(wǎng)絡(luò)通信,不像寫C++時(shí)自己造輪子,java服務(wù)器使用Netty。Netty做了很多工作,使編寫網(wǎng)絡(luò)程序變得輕松簡單。靈活利用這些基礎(chǔ)設(shè)施,以實(shí)現(xiàn)我們的需求。
其中一個(gè)需求是自動重連。自動重連有兩種應(yīng)用場景:
在有多個(gè)服務(wù)器(比如LoginServer和GameServer等)時(shí),這樣就不用考慮服務(wù)器啟動順序。
有需求就需要有解決方案,其實(shí)很簡單,Netty已經(jīng)提供,如下:
ctx.channel().eventLoop().schedule(() -> tryConnect(), reconnectInterval, TimeUnit.SECONDS);tryConnect是實(shí)際執(zhí)行連接的方法,后面兩個(gè)參數(shù)表示每隔reconnectInterval秒重連一次即執(zhí)行tryConnect,而對應(yīng)上述兩種應(yīng)用場景的分別是connect失敗和channel inactive時(shí),詳見后面代碼。
自動重連解決后,還有一個(gè)問題是如何管理連接。Netty使用Channel來抽象一個(gè)連接,但實(shí)際開發(fā)時(shí),通常邏輯上會有一個(gè)會話(session)對象用來表示對端,可以在其上添加各種邏輯屬性方法等,以及收發(fā)網(wǎng)絡(luò)消息。這樣一個(gè)Channel就需要對應(yīng)一個(gè)Session,且方便互相索引。
首先考慮如何創(chuàng)建這個(gè)Session。
為了方便Netty使用和復(fù)用,我抽象了一個(gè)TcpServer/TcpClient類分別表示服務(wù)器和客戶端。理想情況是 TcpServer和TcpClient合并為一個(gè),不同行為由Session來決定。但因?yàn)镹etty的服務(wù)器和客戶端分別使用ServerBootstrap和Bootstrap,其分別包含bind和connect,這個(gè)想法未能實(shí)現(xiàn)。
Session有兩種,ListenSession負(fù)責(zé)監(jiān)聽連接請求,TransmitSession負(fù)責(zé)傳輸數(shù)據(jù)。在實(shí)際應(yīng)用中,有這么一種需求,比如GameServer主動連接LoginServer,這時(shí)GameServer即作為client端。在連接成功時(shí),需要GameServer主動發(fā)個(gè)注冊消息給LoginServer,LoginServer籍此得知是哪個(gè)服務(wù)器組。此時(shí),GameServer可能同時(shí)會以Client身份連接另一個(gè)服務(wù)器比如Gateway而且同樣要發(fā)消息。那么作為client端主動連接的TransmitSession最好細(xì)化,需要包含要連接的主機(jī)地址、端口和重連時(shí)間等信息,也需要在Active時(shí)發(fā)送不同消息,而Server端TransmitSession并不需要。所以設(shè)計(jì)上TransmitSession又分為ClientSession和ServerSession。SeverSession由TcpServer在建立連接時(shí)自動創(chuàng)建,而ListenSession和ClientSession則由使用者自行創(chuàng)建并交由TcpServer/TcpClient管理。
接口如下:
public abstract class ListenSession {	PRivate boolean working = false;	private int localPort = 0;	private int relistenInterval = 10;    ...	public abstract ServerSession createServerSession();}public abstract class TransmitSession {	protected Channel channel = null;	protected boolean working = false;	...	public abstract void onActive() throws Exception;	public abstract void onInactive() throws Exception;	public abstract void onException() throws Exception;	public abstract void onReceive(Object data) throws Exception;	public abstract void send(Object data);}public abstract class ClientSession extends TransmitSession {	private String remoteHost = "";	private int remotePort = 0;	private int reconnectInterval = 10;	...}其次考慮如何管理Channel和Session的對應(yīng)關(guān)系。除了使用一個(gè)類似HashMap
綜上,TcpServer示例如下:
public class TcpServer {	private final AttributeKey<ListenSession> LISTENSESSIONKEY = AttributeKey.valueOf("LISTENSESSIONKEY");	private final AttributeKey<ServerSession> SERVERSESSIONKEY = AttributeKey.valueOf("SERVERSESSIONKEY");	private final ServerBootstrap bootstrap = new ServerBootstrap();	private EventLoopGroup bossGroup = null;	private EventLoopGroup workerGroup = null;	private ArrayList<ListenSession> listenSessions = new ArrayList<ListenSession>();    ...	private void start() {		bossGroup = new NioEventLoopGroup(1);		workerGroup = new NioEventLoopGroup(4);		bootstrap.group(bossGroup, workerGroup);		bootstrap.channel(NioServerSocketChannel.class);		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {			@Override			protected void initChannel(SocketChannel ch) throws Exception {				ChannelPipeline pipeline = ch.pipeline();				pipeline.addLast("encode", new ObjectEncoder());				pipeline.addLast("decode", new ObjectDecoder(ClassResolvers.cacheDisabled(null)));				pipeline.addLast(workerGroup, new ChannelInboundHandlerAdapter() {					@Override					public void channelActive(ChannelHandlerContext ctx) throws Exception {						ListenSession listenSession = ctx.channel().parent().attr(LISTENSESSIONKEY).get();						ServerSession serverSession = listenSession.createServerSession();						ctx.channel().attr(SERVERSESSIONKEY).set(serverSession);						serverSession.setChannel(ctx.channel());						serverSession.onActive();					}					@Override					public void channelInactive(ChannelHandlerContext ctx) throws Exception {						ServerSession serverSession = ctx.channel().attr(SERVERSESSIONKEY).get();						serverSession.onInactive();					}					...	}    ...	private void tryListen(ListenSession listenSession) {		if (!listenSession.isWorking()) {			return;		}		final int port = listenSession.getLocalPort();		final int interval = listenSession.getRelistenInterval();		ChannelFuture f = bootstrap.bind(port);		f.addListener(new ChannelFutureListener() {			public void OperationComplete(ChannelFuture f) throws Exception {				if (f.isSuccess()) {					f.channel().attr(LISTENSESSIONKEY).set(listenSession);				} else {					f.channel().eventLoop().schedule(() -> tryListen(listenSession), interval, TimeUnit.SECONDS);				}			}		});	}}如果監(jiān)聽失敗則隔interval秒重試,新連接建立時(shí)創(chuàng)建ServerSession關(guān)聯(lián)該Channel。
TcpClient的實(shí)現(xiàn)大同小異,不同點(diǎn)在于需要在Channel Inactive時(shí)執(zhí)行重連:
public class TcpClient {	private final AttributeKey<ClientSession> SESSIONKEY = AttributeKey.valueOf("SESSIONKEY");	private final Bootstrap bootstrap = new Bootstrap();	private EventLoopGroup workerGroup = null;	private ArrayList<ClientSession> clientSessions = new ArrayList<ClientSession>();	...	private void start() {		workerGroup = new NioEventLoopGroup();		bootstrap.group(workerGroup);		bootstrap.channel(NioSocketChannel.class);		bootstrap.handler(new ChannelInitializer<SocketChannel>() {			@Override			protected void initChannel(SocketChannel ch) throws Exception {				ChannelPipeline pipeline = ch.pipeline();				pipeline.addLast("encode", new ObjectEncoder());				pipeline.addLast("decode", new ObjectDecoder(ClassResolvers.cacheDisabled(null)));				pipeline.addLast(new ChannelInboundHandlerAdapter() {					@Override					public void channelActive(ChannelHandlerContext ctx) throws Exception {						ClientSession clientSession = ctx.channel().attr(SESSIONKEY).get();						clientSession.setChannel(ctx.channel());						clientSession.onActive();					}					@Override					public void channelInactive(ChannelHandlerContext ctx) throws Exception {						ClientSession clientSession = ctx.channel().attr(SESSIONKEY).get();						clientSession.onInactive();						final int interval = clientSession.getReconnectInterval();						ctx.channel().eventLoop().schedule(() -> tryConnect(clientSession), interval, TimeUnit.SECONDS);					}					...	}    ...	private void tryConnect(ClientSession clientSession) {		if (!clientSession.isWorking()) {			return;		}		final String host = clientSession.getRemoteHost();		final int port = clientSession.getRemotePort();		final int interval = clientSession.getReconnectInterval();		ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));		future.addListener(new ChannelFutureListener() {			public void operationComplete(ChannelFuture f) throws Exception {				if (f.isSuccess()) {					f.channel().attr(SESSIONKEY).set(clientSession);				} else {					f.channel().eventLoop().schedule(() -> tryConnect(clientSession), interval, TimeUnit.SECONDS);				}			}		});	}}如果需要監(jiān)聽多個(gè)端口或連接多個(gè)目的主機(jī),只需要創(chuàng)建多個(gè)ClientSession/ListenSession即可。如:
private TcpServer tcpServer = new TcpServer();  private LSServer lsServer = new LSServer(); private LSClient lsClient = new LSClient();  lsServer.setLocalPort(30001); lsServer.setRelistenInterval(10); tcpServer.attach(lsServer); lsClient.setLocalPort(40001); lsClient.setRelistenInterval(10); tcpServer.attach(lsClient); 另外值得一提的是網(wǎng)上很多例子,都會在bind端口后,調(diào)用如下代碼:
f.channel().closeFuture().sync(); 這會阻塞當(dāng)前線程,其實(shí)就是在當(dāng)前線程做main loop。而實(shí)際游戲服務(wù)器中,通常main線程做邏輯線程,邏輯線程需要自己tick,也就是自定義main loop,我們在其中執(zhí)行一些每幀更新的邏輯。所以并不需要上面這種方式。
公共庫倉庫:JMetazion
服務(wù)器示例倉庫:JGameDemo
服務(wù)器架構(gòu)QQ交流群:330459037 
新聞熱點(diǎn)
疑難解答
圖片精選