随笔分类
Socket同步 / 异步和阻塞 / 非阻塞
同步和异步
是属于操作系统
级别的,指的是操作系统在受到程序请求的 IO之后,如果 IO资源没有准备好,该如何去响应程序的问题;同步就是不响应,直到 IO资源准备好;而异步的话则是会返回给程序一个 标志
,这个标志用于当 IO资源准备好之后,通过事件机制来决定该发送到什么地方去
阻塞和非阻塞
是程序
级别的,指的是程序在请求操作系统进行 IO操作时,如果 IO资源没有准备好,程序该怎么去处理的问题‘;阻塞便是程序什么都不做,一直等到资源准备完毕;非阻塞的话程序会继续运行,不过会时不时回来看看 IO资源有没有准备好.
我们所常见的 BIO是 同步阻塞的
,同步意味着操作系统底层会一直等待 IO资源就绪才会去响应程序,阻塞意味着程序会一直等待操作系统的响应 (对应 IO资源就绪),这期间程序 (对应线程)无法去做其它事情
具体来将,程序级别的阻塞便是 accept和 read造成的,我们可以通过改造将程序变成非阻塞的,但是操作系统层次的阻塞 (对应的便是同步问题)我们没法改变
我们的 NIO是同步非阻塞的,其非阻塞的实现其实就是去改善 accept和 read方法带来的阻塞线程,因此,其也引入了 Channel和 Buffer的概念
下面正式开始些许源码解读吧!
对 ServerSocket.accept解读
最近,读了秋大的些许文章,收获颇多,至此些许分享,不足之处耐指出.
先来看个简单 demo:
BIOServer
public class BIOServer {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;
Socket socket = null;
BufferedReader reader = null;
String inputContent;
int count = 0;
try {
serverSocket = new ServerSocket(port);
System.out.println(stringNowTime() + ": serverSocket started");
while (true) {
socket = serverSocket.accept(); // 等待新连接
System.out.println("新连接建立: " + socket.getRemoteSocketAddress() + ", 端口号:" + socket.getPort() + ", 服务端口号: " + socket.getLocalPort());
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id为" + socket.hashCode() + " " + inputContent);
count ++;
System.out.println("id为" + socket.hashCode() + "的clientSocket" + stringNowTime() + "读取结束");
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return format.format(new Date());
}
public static void main(String[] args) {
BIOServer bioServer = new BIOServer();
bioServer.initBIOServer(9898);
}
}
BIOClient
public class BIOClient {
public void initBIOClient(String host, int port) {
BufferedReader reader = null;
BufferedWriter writer = null;
Socket socket = null;
String inputContent;
int count = 0;
try {
reader = new BufferedReader(new InputStreamReader(System.in));
socket = new Socket(host, port);
writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
System.out.println("ClientSocket started: " + stringNowTime());
while ((inputContent = reader.readLine()) != null && count < 2) {
inputContent = stringNowTime() + ": 第" + count + "条消息: " + inputContent + "\n";
// 对于 outputstream, 是有自己对于的一个缓冲区的, 默认下达到一定数据量时才会去将数据写入到内核的 Socket缓冲区
writer.write(inputContent); // 将消息发送给服务端
writer.flush();
count ++;
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
reader.close();
writer.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return format.format(new Date());
}
public static void main(String[] args) {
BIOClient bioClient = new BIOClient();
bioClient.initBIOClient("127.0.0.1", 9898);
}
}
初始化
当我们创建 ServerSocket时,会做些什么?
public ServerSocket() throws IOException {
// 这里去创建了 impl
setImpl();
}
我们来考虑有参的情况:
public ServerSocket(int port) throws IOException {
// 重载调用
this(port, 50, null);
}
public ServerSocket(int port, int backlog, InetAddress bindAddr) throws IOException {
// 这里也调用了 setImpl? - 这个方法主要来干了什么? - 创建 impl
setImpl();
if (port < 0 || port > 0xFFFF)
throw new IllegalArgumentException(
"Port value out of range: " + port);
if (backlog < 1)
backlog = 50;
try {
// 服务端去绑定端口 - 该有的参数都传递进去了
bind(new InetSocketAddress(bindAddr, port), backlog);
} catch(SecurityException e) {
close();
throw e;
} catch(IOException e) {
close();
throw e;
}
}
setImpl
private void setImpl() {
// 一开始 factory会为空
if (factory != null) {
impl = factory.createSocketImpl();
checkOldImpl();
} else {
// No need to do a checkOldImpl() here, we know it's an up to date
// SocketImpl!
// 可以看到, 这里来创建了 impl - 实现 SocksSocketImpl
// impl是什么? ServerSocket的成员 SocketImpl
impl = new SocksSocketImpl();
}
if (impl != null)
// 这里将 ServerSocket设置到 SocketImpl中去了
// 不妨想想, 为什么这么做?
// impl ? 莫非其封装着 ServerSocket, ServerSocket的真正实现都在 impl中?
impl.setServerSocket(this);
}
如下,可以看到 SocketImpl中封装着 ServerSocket、Socket,莫非 Socket创建时也是类似的操作?继续往下看
bind
public void bind(SocketAddress endpoint, int backlog) throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!oldImpl && isBound())
throw new SocketException("Already bound");
if (endpoint == null)
endpoint = new InetSocketAddress(0);
if (!(endpoint instanceof InetSocketAddress))
throw new IllegalArgumentException("Unsupported address type");
InetSocketAddress epoint = (InetSocketAddress) endpoint;
if (epoint.isUnresolved())
throw new SocketException("Unresolved address");
// 这里来校验了下 backlog
// backlog是啥?backlog提供了容量限制的功能
// 服务端处理客户端 Socket连接是耗时的 - ServerSocket维护着一个处理请求的队列, 里面存放着来不及处理的客户端 Socket
// 而 backlog便是该队列长度, 当队列满时, 再有新的连接请求, 服务端便会拒绝
// so, 可以说 backlog提供了 容量限制的功能, 避免客户端 Socket太多占用了服务端资源
if (backlog < 1)
backlog = 50;
try {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkListen(epoint.getPort());
// 注意, 这里使用到了 impl去调用 bind、listen等方法
// 这说明了什么? - ServerSocket的内部是靠 SocketImpl的实现类运作的
// SocketImpl才是真正的核心运作类 - 可以认为 Socket只是一个壳而已, 里面封装着 impl
getImpl().bind(epoint.getAddress(), epoint.getPort());
getImpl().listen(backlog);
bound = true;
} catch(SecurityException e) {
bound = false;
throw e;
} catch(IOException e) {
bound = false;
throw e;
}
}
这里去调用了 AbstractPlainSocketImpl
的 bind方法
protected synchronized void bind(InetAddress address, int lport)
throws IOException
{
synchronized (fdLock) {
if (!closePending && (socket == null || !socket.isBound())) {
NetHooks.beforeTcpBind(fd, address, lport);
}
}
// 去调用了 DualStackPlainDatagramSocketImpl的socketBind - 至此, 系统调用尚未发生
socketBind(address, lport);
// 判断客户端是否连接建立成功与否
if (socket != null)
socket.setBound();
if (serverSocket != null)
// 表示端口已经绑定成功
serverSocket.setBound();
}
void socketBind(InetAddress var1, int var2) throws IOException {
int var3 = this.checkAndReturnNativeFD();
if (var1 == null) {
throw new NullPointerException("inet address argument is null.");
} else {
// bind0 - 本地方法, 至此, 服务端绑定端口结束 - 对应的其实是系统调用
bind0(var3, var1, var2, this.exclusiveBind);
if (var2 == 0) {
this.localport = localPort0(var3);
} else {
this.localport = var2;
}
this.address = var1;
}
}
accept
public Socket accept() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isBound())
throw new SocketException("Socket is not bound yet");
// 注意, 这里创建了 Socket, 并且其内部也有着 impl, 这里初始化为 null
Socket s = new Socket((SocketImpl) null);
// 这里就是让 impl去运作起来 - 处理 socket连接, 并来设置些属性 - 下面将 socket返回了
implAccept(s);
// 这里将 socket返回
return s;
}
protected final void implAccept(Socket s) throws IOException {
SocketImpl si = null;
try {
if (s.impl == null)
// 为 Socket创建 impl
s.setImpl();
else {
s.impl.reset();
}
// si保存了 Socket中 impl
si = s.impl;
// 将 Socket中 impl设置为了 null
s.impl = null;
// 为 impl中属性分配内存地址初始化
si.address = new InetAddress();
// 文件描述符
si.fd = new FileDescriptor();
// 看, 这里使用了 ServerSocket中 impl执行 accept
// 将 Socket中 impl传递了进去
getImpl().accept(si);
SocketCleanable.register(si.fd); // raw fd has been set
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkAccept(si.getInetAddress().getHostAddress(),
si.getPort());
}
} catch (IOException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
} catch (SecurityException e) {
if (si != null)
si.reset();
s.impl = si;
throw e;
}
s.impl = si;
s.postAccept();
}
protected void accept(SocketImpl s) throws IOException {
acquireFD();
try {
// 该去系统调用了
socketAccept(s);
} finally {
releaseFD();
}
}
PlainSocketImpl
@Override
void socketAccept(SocketImpl s) throws IOException {
int nativefd = checkAndReturnNativeFD();
if (s == null)
throw new NullPointerException("socket is null");
int newfd = -1;
InetSocketAddress[] isaa = new InetSocketAddress[1];
// os系统级别的阻塞在程序中, 我们是无法改变的 - accept0调用
// 我们可以通过在程序级别来改变 accept的阻塞, 怎么来实现? - 通过判断 timeout值来实现
// timeout <= 0, 直接去调用了 accept0, os在没有客户端接入时, 会一直处于阻塞状态, 这也就造成了我们 accept方法的阻塞
if (timeout <= 0) {
newfd = accept0(nativefd, isaa);
} else {
// timeout > 0: os级别带来的阻塞, 我们无法去干扰 - 但我们可以让我们的程序在指定时间后返回
configureBlocking(nativefd, false);
try {
// 指定时间内, 若没新连接, 则会抛 IOException, 以至程序可以不受阻塞影响, 继续去执行自己的逻辑代码块
waitForNewConnection(nativefd, timeout);
newfd = accept0(nativefd, isaa);
if (newfd != -1) {
configureBlocking(newfd, true);
}
} finally {
configureBlocking(nativefd, true);
}
}
/* Update (SocketImpl)s' fd */
fdAccess.set(s.fd, newfd);
/* Update socketImpls remote port, address and localport */
InetSocketAddress isa = isaa[0];
s.port = isa.getPort();
s.address = isa.getAddress();
s.localport = localport;
if (preferIPv4Stack && !(s.address instanceof Inet4Address))
throw new SocketException("Protocol family not supported");
}
至此,accept解读完毕...
read
Socket.getInputStream
public InputStream getInputStream() throws IOException {
if (isClosed())
throw new SocketException("Socket is closed");
if (!isConnected())
throw new SocketException("Socket is not connected");
if (isInputShutdown())
throw new SocketException("Socket input is shutdown");
InputStream is = null;
try {
is = AccessController.doPrivileged(
new PrivilegedExceptionAction<>() {
public InputStream run() throws IOException {
// 调用 Socket.impl的 getInputStream
return impl.getInputStream();
}
});
} catch (java.security.PrivilegedActionException e) {
throw (IOException) e.getException();
}
return is;
}
AbstractPlainSocketImpl
protected synchronized InputStream getInputStream() throws IOException {
synchronized (fdLock) {
if (isClosedOrPending())
throw new IOException("Socket Closed");
if (shut_rd)
throw new IOException("Socket input is shutdown");
if (socketInputStream == null)
// socket.getInputStream() - 创建的是 SocketInputStream
socketInputStream = new SocketInputStream(this);
}
return socketInputStream;
}
来看看,SocketInputStream中 read方法是如何来运作的:
*/
public int read(byte b[], int off, int length) throws IOException {
// 传递了 Socket中 impl中所设定的 timeout值
return read(b, off, length, impl.getTimeout());
}
int read(byte b[], int off, int length, int timeout) throws IOException {
int n;
// EOF already encountered
if (eof) {
return -1;
}
// connection reset
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
// bounds check
if (length <= 0 || off < 0 || length > b.length - off) {
if (length == 0) {
return 0;
}
throw new ArrayIndexOutOfBoundsException("length == " + length
+ " off == " + off + " buffer length == " + b.length);
}
// acquire file descriptor and do the read
FileDescriptor fd = impl.acquireFD();
try {
// n - 实际读取的字节数
// 这里传递进了 timeout
n = socketRead(fd, b, off, length, timeout);
if (n > 0) {
return n;
}
} catch (ConnectionResetException rstExc) {
impl.setConnectionReset();
} finally {
impl.releaseFD();
}
/*
* If we get here we are at EOF, the socket has been closed,
* or the connection has been reset.
*/
if (impl.isClosedOrPending()) {
throw new SocketException("Socket closed");
}
if (impl.isConnectionReset()) {
throw new SocketException("Connection reset");
}
eof = true;
return -1;
}
// 这里传递进了 timeout
private int socketRead(FileDescriptor fd,
byte b[], int off, int len,
int timeout)
throws IOException {
return socketRead0(fd, b, off, len, timeout);
}
改造原始 demo
如何将原始 demo改造成非阻塞的 accept、read呢?- 程序级别去避免阻塞
经过上述分析,只需要分别去设置下 ServerSocket、Socket中 impl的 timeout值即可
timeout在哪里会被设置?
public synchronized void setSoTimeout(int timeout) throws SocketException {
if (isClosed())
throw new SocketException("Socket is closed");
// 这里来设置了下 SocketImpl的 timeOut
getImpl().setOption(SocketOptions.SO_TIMEOUT, timeout);
}
AbstractPlainSocketImpl
public void setOption(int opt, Object val) throws SocketException {
if (isClosedOrPending()) {
throw new SocketException("Socket Closed");
}
boolean on = true;
switch (opt) {
/* check type safety b4 going native. These should never
* fail, since only java.Socket* has access to
* PlainSocketImpl.setOption().
*/
case SO_LINGER:
if (val == null || (!(val instanceof Integer) && !(val instanceof Boolean)))
throw new SocketException("Bad parameter for option");
if (val instanceof Boolean) {
/* true only if disabling - enabling should be Integer */
on = false;
}
break;
case SO_TIMEOUT:
if (val == null || (!(val instanceof Integer)))
throw new SocketException("Bad parameter for SO_TIMEOUT");
int tmp = ((Integer) val).intValue();
if (tmp < 0)
throw new IllegalArgumentException("timeout < 0");
// 这里去将 timeout进行了赋值
timeout = tmp;
break;
case IP_TOS:
if (val == null || !(val instanceof Integer)) {
throw new SocketException("bad argument for IP_TOS");
}
trafficClass = ((Integer)val).intValue();
break;
case SO_BINDADDR:
throw new SocketException("Cannot re-bind socket");
case TCP_NODELAY:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for TCP_NODELAY");
on = ((Boolean)val).booleanValue();
break;
case SO_SNDBUF:
case SO_RCVBUF:
if (val == null || !(val instanceof Integer) ||
!(((Integer)val).intValue() > 0)) {
throw new SocketException("bad parameter for SO_SNDBUF " +
"or SO_RCVBUF");
}
break;
case SO_KEEPALIVE:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_KEEPALIVE");
on = ((Boolean)val).booleanValue();
break;
case SO_OOBINLINE:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_OOBINLINE");
on = ((Boolean)val).booleanValue();
break;
case SO_REUSEADDR:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_REUSEADDR");
on = ((Boolean)val).booleanValue();
break;
case SO_REUSEPORT:
if (val == null || !(val instanceof Boolean))
throw new SocketException("bad parameter for SO_REUSEPORT");
if (!supportedOptions().contains(StandardSocketOptions.SO_REUSEPORT))
throw new UnsupportedOperationException("unsupported option");
on = ((Boolean)val).booleanValue();
break;
default:
throw new SocketException("unrecognized TCP option: " + opt);
}
socketSetOption(opt, on, val);
}
改造后的 Server
// 改造实现 read非阻塞 - 程序级别
public class BIOProNotReadBlocking {
public void initBIOServer(int port) {
ServerSocket serverSocket = null;
Socket socket = null;
ClientSocketThread thread = null;
try {
ExecutorService executorService = Executors.newCachedThreadPool();
serverSocket = new ServerSocket(port);
serverSocket.setSoTimeout(1000);
System.out.println(stringNowTime() + ": serverSocket started");
while (true) {
try {
socket = serverSocket.accept();
} catch (SocketTimeoutException e) {
// 执行到这, 说明此次 accept是没有接收到任何数据的 - so, 服务端的主线程可以在着做些其它事情
System.out.println("now time is: " + this.stringNowTime());
continue;
}
System.out.println("id为" + socket.hashCode() + "的clientSocket" + stringNowTime() + ": connected");
thread = new BIOProNotReadBlocking.ClientSocketThread(socket);
executorService.execute(thread);
}
} catch (IOException e) {
e.printStackTrace();
}
}
public String stringNowTime() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
return format.format(new Date());
}
class ClientSocketThread extends Thread{
private Socket socket;
public ClientSocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader reader = null;
String inputContent = null;
int count = 0;
try {
socket.setSoTimeout(1000);
} catch (SocketException e) {
e.printStackTrace();
}
try {
reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
while (true) {
try {
while ((inputContent = reader.readLine()) != null) {
System.out.println("收到id为" + socket.hashCode() + " " + inputContent);
count ++;
}
} catch (Exception e) {
// 执行到这, 表示 read方法并没有获取到任何数据, 线程可以执行一些其它操作
System.out.println("Not read data: " + stringNowTime());
continue;
}
// 执行到这, 说明读取到了数据, 我们可以在这里进行恢复客户端的工作
System.out.println("id为" + socket.hashCode() + "的clientSocket" + stringNowTime() + "读取结束");
sleep(1000);
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
reader.close();
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
BIOProNotReadBlocking server = new BIOProNotReadBlocking();
server.initBIOServer(9898);
}
}
Res
2021-05-08 16:27:21:501: serverSocket started
now time is: 2021-05-08 16:27:22:503 // 这里体现了 accept的非阻塞 - timeout, 指定时长后 (未接收到指定连接), 抛异常, 程序继续执行
now time is: 2021-05-08 16:27:23:503
now time is: 2021-05-08 16:27:24:504
now time is: 2021-05-08 16:27:25:504
now time is: 2021-05-08 16:27:26:505
id为1525262377的clientSocket2021-05-08 16:27:26:730: connected // 连接来后
now time is: 2021-05-08 16:27:27:732
Not read data: 2021-05-08 16:27:27:732 // 这里也就体现了读的非阻塞
Not read data: 2021-05-08 16:27:28:733
now time is: 2021-05-08 16:27:28:733
now time is: 2021-05-08 16:27:29:733
Not read data: 2021-05-08 16:27:29:733
Not read data: 2021-05-08 16:27:30:733
now time is: 2021-05-08 16:27:30:733
收到id为1525262377 2021-05-08 16:27:31: 第0条消息: liangye ?
now time is: 2021-05-08 16:27:31:734
Not read data: 2021-05-08 16:27:32:141
now time is: 2021-05-08 16:27:32:734
Not read data: 2021-05-08 16:27:33:142
now time is: 2021-05-08 16:27:33:735
Not read data: 2021-05-08 16:27:34:142
now time is: 2021-05-08 16:27:34:735
收到id为1525262377 2021-05-08 16:27:35: 第1条消息: night !
id为1525262377的clientSocket2021-05-08 16:27:35:622读取结束
now time is: 2021-05-08 16:27:35:736
id为1525262377的clientSocket2021-05-08 16:27:36:623读取结束
now time is: 2021-05-08 16:27:36:737
id为1525262377的clientSocket2021-05-08 16:27:37:623读取结束
now time is: 2021-05-08 16:27:37:737
思考
上述改造在程序基本解决了 accept和 read方法带来的阻塞问题,这也是 最初版本的 NIO
;
我们来思考下,这其中存在的问题:在服务端分别为每一个客户端创建了一个线程来去处理其业务逻辑,这虽然是解决了阻塞问题,但线程本身是昂贵的资源,客户端连接多时,创建的线程数多了,浪费了服务器的资源,再加上线程上下文切换所带来的开销,更是雪上加霜!就算引入了线程池,以此来去控制线程的个数,但客户端连接多时,线程池的 BlockingQueue本身也会越来越庞大,一定程序时 OOM,这并不能实际地去解决这个问题
因此,NIO来解决这个问题,它并不会为每个客户端单独创建一个线程,相反,在服务端只会有一个线程,会为每个单独创建一个通道