Java-Socket编程

Socket 编程

套接字使用TCP提供了两台计算机之间的通信机制。 客户端程序创建一个套接字,并尝试连接服务器的套接字。

当连接建立时,服务器会创建一个 Socket 对象。客户端和服务器现在可以通过对 Socket 对象的写入和读取来进行通信。

java.net.Socket 类代表一个套接字,并且 java.net.ServerSocket 类为服务器程序提供了一种来监听客户端,并与他们建立连接的机制。

以下步骤在两台计算机之间使用套接字建立TCP连接时会出现:

  • 服务器实例化一个 ServerSocket 对象,表示通过服务器上的端口通信。

  • 服务器调用 ServerSocket 类的 accept() 方法,该方法将一直等待,直到客户端连接到服务器上给定的端口。

  • 服务器正在等待时,一个客户端实例化一个 Socket 对象,指定服务器名称和端口号来请求连接。

  • Socket 类的构造函数试图将客户端连接到指定的服务器和端口号。如果通信被建立,则在客户端创建一个 Socket 对象能够与服务器进行通信。

  • 在服务器端,accept() 方法返回服务器上一个新的 socket 引用,该 socket 连接到客户端的 socket。

  • 连接建立后,通过使用 I/O 流在进行通信,每一个socket都有一个输出流和一个输入流,客户端的输出流连接到服务器端的输入流,而客户端的输入流连接到服务器端的输出流。

TCP 是一个双向的通信协议,因此数据可以通过两个数据流在同一时间发送.以下是一些类提供的一套完整的有用的方法来实现 socket。

来源: https://www.runoob.com/java/java-networking.html

简单代码示例

服务端代码

  1. 单线程

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    package top.mtain.socket.hello;

    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class SocketServer {
    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {
    int port = 18003;
    ServerSocket server = new ServerSocket(port);
    System.out.println("server start...");

    while (true) {
    Socket socket = server.accept();
    // 建立连接后,从socket中获取输入流,并建立缓冲区进行读取
    InputStream inputStream = socket.getInputStream();
    byte[] bytes = new byte[1024];
    int len;
    StringBuilder sb = new StringBuilder();
    while ((len = inputStream.read(bytes)) != -1) {
    // 注意统一UTF-8
    sb.append(new String(bytes, 0, len, "UTF-8"));
    }
    System.out.println("From client: " + sb);


    OutputStream outputStream = socket.getOutputStream();
    outputStream.write("Hello Client,I get the message.".getBytes("UTF-8"));

    outputStream.close();
    inputStream.close();
    socket.close();
    }
    }
    }
  2. 线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package top.mtain.socket.thread;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolSocketServer {
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
int port = 18003;
ServerSocket server = new ServerSocket(port);
System.out.println("server start...");

ExecutorService threadPool = Executors.newFixedThreadPool(100);
while (true) {
Socket socket = server.accept();

Runnable runnable=()->{
try {
// 建立连接后,从socket中获取输入流,并建立缓冲区进行读取
InputStream inputStream = socket.getInputStream();
byte[] bytes = new byte[1024];
int len;
StringBuilder sb = new StringBuilder();
while ((len = inputStream.read(bytes)) != -1) {
// 注意统一UTF-8
sb.append(new String(bytes, 0, len, "UTF-8"));
}
System.out.println("From client: " + sb);

OutputStream outputStream = socket.getOutputStream();
outputStream.write("1".getBytes("UTF-8"));

outputStream.close();
inputStream.close();
socket.close();
} catch (Exception e) {
// TODO: handle exception
}
};

threadPool.submit(runnable);

}
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package top.mtain.socket.thread;

import java.io.DataInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class SocketClient {
public static void main(String args[]) throws Exception {
String host = "127.0.0.1";
int port = 18003;

Socket socket = new Socket(host, port);

// 向服务端发送
OutputStream outputStream = socket.getOutputStream();
String message="Hello";
OutputStream sendOutputStream = socket.getOutputStream();
// 每次write服务端都在接收
sendOutputStream.write(message.getBytes("UTF-8"));

sendOutputStream.flush();

// 执行完此行后服务端才结束接收
socket.shutdownOutput();

// 接受服务端消息
InputStream inputStream = socket.getInputStream();

/**
* 使用DataInputStream直接获取返回数值,做成功判断
*/
DataInputStream dataInputStream = new DataInputStream(inputStream);
int successFlag = dataInputStream.readInt();

System.out.println("From Server: "+successFlag);

inputStream.close();
outputStream.close();
socket.close();
}
}

相关问题

如何标记传输完成

  1. Socket关闭
    缺点:

    • 客户端无法接受服务端返回信息
    • 客户端再次发送数据需要重新创建socket
  2. 关闭输出流socket.shutdownOutput();

  3. 确定结束字符
  4. 规定长度

参考: https://www.cnblogs.com/yiwangzhibujian/p/7107785.html

Netty

简介

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.

‘Quick and easy’ doesn’t mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.

为何选用:
自己实现socket服务端,要考虑并发多线程问题,所以出于这个原因就果断选择Netty

相关概念

BIO与NIO

BIO为同步阻塞形式,NIO为同步非阻塞形式,NIO并没有实现异步,在JDK1.7后升级NIO库包,支持异步非阻塞

同步阻塞式IO,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。

同步非阻塞式IO,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。

参考: https://blog.csdn.net/yswKnight/article/details/79347833

通道Channel和缓冲区Buffer

通道是对原 I/O 包中的流的模拟。到任何目的地(或来自任何地方)的所有数据都必须通过一个 Channel 对象(通道)。

一个 Buffer 实质上是一个容器对象。发送给一个通道的所有对象都必须首先放到缓冲区中;同样地,从通道中读取的任何数据都要读到缓冲区中。

Channel是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流

缓冲区类型:

1
2
3
4
5
6
7
ByteBuffer
CharBuffer
ShortBuffer
IntBuffer
LongBuffer
FloatBuffer
DoubleBuffer

参考: https://blog.csdn.net/qq_36520235/article/details/81318189

代码示例

Netty4服务端

netty-all-4.1.36.Final.jar

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;

public class NettyServer {

private static final int port = 6666;
private static final EventLoopGroup bossGroup = new NioEventLoopGroup();
private static final EventLoopGroup workerGroup = new NioEventLoopGroup();

public static void start() throws Exception {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup) // 绑定线程池
.channel(NioServerSocketChannel.class) // 指定channel
.childHandler(new ChannelInitializer<Channel>() { // 绑定客户端连接时候触发操作
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();

pipeline.addLast(new LoggingHandler());
pipeline.addLast(new ReadTimeoutHandler(20));// 设置20秒空闲自动断开
pipeline.addLast(new WriteTimeoutHandler(0));
pipeline.addLast(new SocketHandler());

}
});

ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
channelFuture.channel().closeFuture().sync();
System.out.println("server start");
}

protected static void shutdown() {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}

public static void main(String[] args) throws Exception {
System.out.println("启动server");
NettyServer.start();
}
}





import java.net.InetSocketAddress;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class SocketHandler extends ChannelInboundHandlerAdapter {

/**
* 通道激活时触发
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channel active>>>>>>>");
InetSocketAddress insocket = (InetSocketAddress) ctx.channel()
.remoteAddress();
String clientIP = insocket.getAddress().getHostAddress();
System.out.println("客户端IP:" + clientIP);
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf message = (ByteBuf) msg;
byte[] response = new byte[message.readableBytes()];
message.readBytes(response);
System.out.println("receive client info: " + new String(response));

String sendContent = "hello client ,im server, this is u say:" + new String(response);
ByteBuf seneMsg = Unpooled.buffer(sendContent.length());
seneMsg.writeBytes(sendContent.getBytes());

ctx.writeAndFlush(seneMsg);
System.out.println("send info to client:" + sendContent);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
super.channelReadComplete(ctx);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
System.out.println("异常信息:\r\n" + cause.getMessage());
}




}

ByteBuf

ByteBuf的常用API总结: https://blog.csdn.net/qq_26680031/article/details/79118878

问题解决

Tomcat启动时Netty堵塞Tomcat主线程问题

  1. 使用子线程启动Netty(建议)
    代码示例

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    方法1:
    Thread thread = new Thread(){
    @Override
    public void run() {
    new Server.start();
    }
    };
    //设置守护进程,这样子线程会随主线程(如Tomcat)退出
    thread.setDaemon(true);
    thread.start();
  2. 启动Netty时不使用closeFuture方法去阻塞

Netty随Web容器(Tomcat)退出

netty不退出子线程,会出现重启后端口占用问题

方案如下:

  1. 设置守护进程thread.setDaemon(true);
  2. 容器启动和停止的回调
    1
    2
    3
    SmartLifecycle类的start和stop方法

    ServletContextListener接口的ServletContextEvent方法

Netty粘包拆包解码器

  1. 字节定长分包:和FixedLengthFrameDecoder
  2. 自定义分隔符分包:DelimiterBasedFrameDecoder
  3. 回车换行分包(“\n”或者”\r\n”):LineBasedFrameDecoder
  4. 自定义解码和编码器:LengthFieldBasedFrameDecoder

粘包拆包问题解决思路

  1. 消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格;
  2. 在包尾增加回车换行符进行分割,例如FTP协议;
  3. 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常涉及思路为消息头的第一个字段使用int32来表示消息的总长度;
  4. 更复杂的应用层协议。

消息头+消息体代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import top.mtain.socket.server.util.NettyUtil;

import java.util.List;

/**
* 数据定义
* 消息头+消息体
* 前八位标识数据包长度+完整的数据包
*
* 此类根据包长度读取包数据
*
*/
public class SocketRequestDecoder extends ByteToMessageDecoder {
// 定义数据包字节长度
// 如: 00001200,则后面的数据包字节长度为1200
public final int FLAG_LENGTH = 8;
private int msgLen = 0;


@Override
protected void decode(ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf, List<Object> list) throws Exception {
if(byteBuf.readableBytes()< FLAG_LENGTH) return;

if(msgLen==0) {
// 从标识位中读取包长度
ByteBuf lengthByteBuf = byteBuf.readBytes(FLAG_LENGTH);
String lengthStr = NettyUtil.convertByteBufToString(lengthByteBuf);
msgLen = Integer.valueOf(lengthStr);

}

int currentLen = byteBuf.readableBytes();
//收到的报文长度不足时,继续接收
if(currentLen<msgLen) return;

if(currentLen>msgLen) System.out.println("标识定义数据字节长度:"+msgLen+"实际接收数据包长度:"+currentLen);


//舍弃开头的长度定义位,提取完整数据包给下一个Handler处理
list.add(byteBuf.readBytes(msgLen));
}
}

客户端根据八位字符串读取数据

1
2
3
4
5
6
7
8
InputStream is = socket.getInputStream();
byte[] datalen = new byte[8];
is.read(datalen);
int length = Integer.parseInt(new String(datalen));

byte[] data = new byte[length];
is.read(data);
String recvMsg = new String(data);