Архитектура клиент-серверного приложения с использованием Netty в нашем случае может выглядеть так как показано на рисунке ниже
Код
public class ServerPipelineFactory implements
ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
// Add the number codec first,
pipeline.addLast("decoder", new PacketDecoder());
pipeline.addLast("encoder", new PacketEncoder());
// and then business logic.
// Please note we create a handler for every new channel
// because it has stateful properties.
pipeline.addLast("handler", new ServerHandler());
return pipeline;
}
}
В данном классе в конструкторе в конвейер добавляются обработчики пакетов. Пакеты будут проходить тот порядок в котором добавлены обработчики.
Рассмотрим все классы добавленные в конвейер.
Код
public class PacketDecoder extends FrameDecoder {
private static final Logger logger = Logger.getLogger(
ClientHandler.class.getName());
@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
buffer.markReaderIndex();
int length = buffer.readInt();
int ID = buffer.readInt();
byte [] data = new byte[length];
buffer.readBytes(data, 0, length);
if (ID == Packet.ReciveMessage.getDefaultInstance().getType())
{
return Packet.ReciveMessage.parseFrom(data);
}else
if (ID == Packet.SendMessage.getDefaultInstance().getType())
{
return Packet.SendMessage.parseFrom(data);
}
return null;
}
}
Класс PacketDecoder унаследован у класса FrameDecoder при наследовании мы перегрузили функцию decode() в которой и выполняется преобразование последовательности байт в необходимые объекты. Обращу внимание на использование значений по умолчанию в пакетах для их однозначной идентификации. Для это в файле .proto в каждое сообщение были добавлены необязательные поля type с значением по умолчанию равному ID типа данного сообщения, это позволяет определять эти ID в одном месте а именно в файле .proto, что по мне очень удобно. Преобразование данных происходит в обратном порядке чем в классе PacketEncoder.
Код
public class PacketEncoder extends OneToOneEncoder {
@Override
protected Object encode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();
//обработка пакетов Packet.SendMessage
if (msg instanceof Packet.SendMessage)
{
byte [] data = ((Packet.SendMessage)msg).toByteArray();
//сначала длина пакета затем ID потом его тело
buf.writeInt(data.length);
buf.writeInt(((Packet.SendMessage)msg).getType());
buf.writeBytes(data);
}
//обработка пакетов Packet.ReciveMessage
if (msg instanceof Packet.ReciveMessage)
{
byte [] data = ((Packet.ReciveMessage)msg).toByteArray();
buf.writeInt(data.length);
buf.writeInt(((Packet.ReciveMessage)msg).getType());
buf.writeBytes(data);
}
// возвращаем сконструированное сообщение.
return buf;
}
}
Класс PacketEncoder унаследован от OneToOneEncoder, в нем перегружена функция encode в которой происходит превращение пакетов в последовательность байт.
В классе ServerHandler реализована логика обработки пакетов на стороне сервера.
Код
public class ServerHandler extends SimpleChannelHandler {
private static final Logger logger = Logger.getLogger(
ServerHandler.class.getName());
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// Log all channel state changes.
if (e instanceof MessageEvent) {
logger.info("Writing:: " + e);
}
super.handleDownstream(ctx, e);
}
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
Object obj = e.getMessage();
if (obj instanceof Packet.SendMessage)
{
Packet.SendMessage msg = (Packet.SendMessage)obj;
Packet.ReciveMessage recive= Packet.ReciveMessage.newBuilder()
.setCount(msg.getText().length()).build();
ctx.getChannel().write(recive);
}
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
logger.info("Closed");
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
В данном случае логика работы очень проста мы возвращаем клиенту информацию о длине переданной строки поэтому она размещена внутри метода обработки получаемых пакетов messageReceived. Что не корректно если бы мы выполняли более трудоемкую операцию. Все остальные перегруженные функции отражают различные события которые могут произойти в данном канале связи а именно установку/разрыв соединения и т.д.
Теперь рассмотри из чего состоит клиент как он посылает и получает пакеты. Логика его организации как видно из рисунка выше почти идентична логике создания сервера. У клиента и сервера одинаковые классы decored\encoder поэтому я опишу только различия
Класс Client. В нем происходит установка соединение, отправка сообщений серверу.
Код
public class Client {
String host;
int port;
private ClientBootstrap bootstrap;
private Channel channel;
public Client(String host, int port) {
this.host = host;
this.port = port;
}
public void Connect(ClientFrame frame)
{
// Configure the client.
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
ClientLogic logic = new ClientLogic(frame);
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new ClientPipelineFactory(logic));
// Make a new connection.
ChannelFuture connectFuture =
bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is made successfully.
channel = connectFuture.awaitUninterruptibly().getChannel();
//
ClientHandler handler =
(ClientHandler) channel.getPipeline().getLast();
}
public void SendMessage(String text)
{
Packet.SendMessage msg = Packet.SendMessage.newBuilder()
.setText(text).build();
channel.write(msg);
}
}
Класс ClientPipelineFactory почти идентичен ServerPipelineFactory
Код
public class ClientPipelineFactory implements
ChannelPipelineFactory {
ClientLogic clientLogic;
public ClientPipelineFactory(ClientLogic clientLogic) {
this.clientLogic = clientLogic;
}
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
// добавляем кодеки
pipeline.addLast("decoder", new PacketDecoder());
pipeline.addLast("encoder", new PacketEncoder());
// и бизнес логику
pipeline.addLast("handler", new ClientHandler(clientLogic));
return pipeline;
}
}
Класс ClientHandler. В нем реализована логика работы клиента по получению сообщений их обработка происходит в классе ClientLogic
Код
public class ClientHandler extends SimpleChannelHandler {
private static final Logger logger = Logger.getLogger(
ClientHandler.class.getName());
private ClientLogic clientLogic;
public ClientHandler(ClientLogic clientLogic) {
this.clientLogic = clientLogic;
}
@Override
public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
logger.info(e.toString());
}
super.handleUpstream(ctx, e);
}
@Override
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
// Log all channel state changes.
if (e instanceof MessageEvent) {
logger.info("Writing:: " + e);
}
super.handleDownstream(ctx, e);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
clientLogic.start();
}
@Override
public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) {
}
@Override
public void messageReceived(
ChannelHandlerContext ctx, final MessageEvent e) {
//получаем пакет
Object obj = e.getMessage();
//проверяем что за пакет
if (obj instanceof Packet.ReciveMessage)
{//если пакет наш то кладем его в обработку
clientLogic.putPacket((Packet.ReciveMessage)obj);
}
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}
Собственно разбирать весь код примера я не буду т.к. во первых он достаточно прост, а во вторых это урок про Netty, исходный код примера расположен
Доступно только для пользователей.