NIO-WindowsSelectorImpl源码剖析
(数据科学学习手札78)基于geopandas的空间数据分析——基础可视化
NIO-WindowsSelectorImpl源码理会
目次
媒介
本来是想进修Netty的,然则Netty是一个NIO框架,因而在进修netty之前,照样先梳理一下NIO的学问。经由历程理会明白NIO的设想道理。
本系列文章针对的是JDK1.8.0.161的源码。
上一篇文章对Selector
的功用和建立历程举行了理会,本篇对Windows下的WindowsSelectorImpl
源码完成举行细致解说。
初始化WindowsSelectorProvider
上一篇文章提到,若没有举行设置时,默许经由历程sun.nio.ch.DefaultSelectorProvider.create()
建立SelectorProvider
。
Windows下的代码途径在jdksrcwindowsclassessunniochDefaultSelectorProvider.java
。在其内部经由历程现实是建立了一个WindowsSelectorProvider)
。
建立WindowsSelectorImpl
WindowsSelectorProvider
是用于建立WindowsSelectorImpl
的。
Selector.Open()->
SelectorProvider.provider()->
sun.nio.ch.DefaultSelectorProvider.create()->
new WindowsSelectorImpl(this)->
WindowsSelectorProvider.openSelector()
public class WindowsSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
}
WindowsSelectorImpl构造
在细致解说WindowsSelectorImpl
源码之前,先相识WindowsSelectorImpl
的大抵代码构造。
在其内部有几个重要的数据构造和属性。
称号 | 作用 |
---|---|
SelectionKeyImpl[] channelArray | 寄存注册的SelectionKey |
PollArrayWrapper pollWrapper | 底层的本机轮询数组包装对象,用于寄存Socket文件形貌符和事宜掩码 |
List<SelectThread> threads | 辅佐线程,多个线程有助于进步高并发时的机能 |
Pipe wakeupPipe | 用于叫醒辅佐线程 |
FdMap fdMap | 保留文件形貌符和SelectionKey的映照关联 |
SubSelector subSelector | 挪用JNI的poll和处置惩罚停当的SelectionKey |
StartLock startLock | 新增的辅佐线程运用该锁守候主线程的入手下手信号 |
FinishLock finishLock | 主线程用该锁守候统统辅佐线程实行终了 |
SelectionKeyImpl
用于寄存Channel
,Selector
以及寄存Channel注册时的事宜掩码。
- 在注册的时刻会建立
SelectionKeyImpl
- 将
SelectionKeyImpl
到场到SelectionKeyImpl[] channelArray
- 将文件句柄和
SelectionKeyImpl
的对应关联到场到FdMap fdMap
- 将key的文件形貌符保留到
PollArrayWrapper pollWrapper
中。
PollArrayWrapper
PollArrayWrapper
用于寄存文件形貌符的文件形貌符和事宜掩码的native数组。相干的文件形貌符的构造以下图:
个中每项的构造以下:
称号 | 大小 | 申明 |
---|---|---|
SOCKET fd | 4字节 | 寄存Socket文件句柄 |
short events | 2字节 | 守候的事宜掩码 |
short reevents | 2字节 | 现实发作的事宜掩码,临时美有用到 |
如上所示,每项为8字节,即为SIZE_POLLFD
的值,如今NIO现实只用前两个字段。
class PollArrayWrapper {
private AllocatedNativeObject pollArray; // The fd array
long pollArrayAddress; // pollArrayAddress
static short SIZE_POLLFD = 8; // sizeof pollfd struct
private int size; // Size of the pollArray
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
...
}
在 PollArrayWrapper
内部运用 AllocatedNativeObject
对象建立的堆外(native)内存对象。
将数组的首地点保留到pollArrayAddress
中,在挪用Poll
的时刻须要通报该参数给JNI。
PollArrayWrapper
暴露了读写FD和Event的要领供WindowsSelectorImpl
运用。
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
int getEventOps(int i) {
return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
}
int getDescriptor(int i) {
return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
}
SelectThread
由于select最大一次性猎取1024个文件形貌符。因而为了进步poll的机能
WindowsSelectorImpl
底层 经由历程引入多个辅佐线程的体式格局完成多线程poll以进步高并发时的机能问题。 我们先看一下注册的逻辑
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
//推断是不是须要扩容行列以及增加辅佐线程
growIfNeeded();
//保留到缓存中
channelArray[totalChannels] = ski;
//保留在数组中的位置
ski.setIndex(totalChannels);
//保留文件形貌符和SelectionKeyImpl的映照关联到FDMap
fdMap.put(ski);
//保留到keys中
keys.add(ski);
//保留文件形貌符和事宜到native数组中
pollWrapper.addEntry(totalChannels, ski);
totalChannels++;
}
}
在注册之前会先会推断当前注册的Channel
数目 是不是到达须要启动辅佐线程的阈值。假如到达阈值则须要扩容pollWrapper
数组,同时还要 将wakeupSourceFd
到场到扩容后的第一个位置 (详细作用下面会解说)。
private void growIfNeeded() {
if (channelArray.length == totalChannels) {
//channel数组已满,扩容两倍
int newSize = totalChannels * 2; // Make a larger array
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
channelArray = temp;
//文件形貌符数组扩容
pollWrapper.grow(newSize);
}
//到达最大文件形貌符数目时增加辅佐线程
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
//将叫醒的文件形貌符到场到扩容后的第一个位置。
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;
//增加线程数
threadsCount++;
}
}
扩容PollArrayWrapper
pollWrapper.grow(newSize);
void grow(int newSize) {
//建立新的数组
PollArrayWrapper temp = new PollArrayWrapper(newSize);
for (int i = 0; i < size; i++)
//将本来的数组的内容寄存到新的数组中
replaceEntry(this, i, temp, i);
//开释本来的数组
pollArray.free();
//更新援用
pollArray = temp.pollArray;
//更新大小
this.size = temp.size;
//更新地点
pollArrayAddress = pollArray.address();
}
扩容完成时,须要增加一个辅佐线程以并行的处置惩罚统统文件形貌符。主线程处置惩罚前1024个文件形貌符,第二个辅佐线程处置惩罚1025到2048的文件形貌符,以此类推。 如许使得主线程挪用poll的时刻,经由历程多线程并行实行一次性猎取到统统的已停当的文件形貌符,从而进步在高并发时的poll的机能。
每1024个PollFD的第一个句柄都要设置为wakeupSourceFd
,因而在扩容的时刻也须要将新的位置的第一个设置为wakeupSourceFd
,该线程的目的是为了叫醒辅佐线程 。当多个线程壅塞在Poll
,若此时主线程已处置惩罚完成,则须要守候统统辅佐线程完成,经由历程向wakeupSourceFd
发送信号以激活Poll
不在壅塞。
如今我们知道了windows下poll多线程的运用要领,由于多线程poll还须要其他的数据构造支撑同步,详细的多线程实行逻辑我们下面再议论。
FdMap
FDMap只是为了保留文件形貌符句柄和SelectionKey
的关联,前面我们提到了PollFD的数据构造包括了文件形貌符句柄信息,因而我们能够经由历程文件形貌符句柄从FdMap中猎取到对应的SelectionKey
。
private final static class FdMap extends HashMap<Integer, MapEntry> {
static final long serialVersionUID = 0L;
private MapEntry get(int desc) {
return get(new Integer(desc));
}
private MapEntry put(SelectionKeyImpl ski) {
return put(new Integer(ski.channel.getFDVal()), new MapEntry(ski));
}
private MapEntry remove(SelectionKeyImpl ski) {
Integer fd = new Integer(ski.channel.getFDVal());
MapEntry x = get(fd);
if ((x != null) && (x.ski.channel == ski.channel))
return remove(fd);
return null;
}
}
SubSelector
SubSelector
封装了挪用JNI poll的逻辑,以及猎取停当SelectionKey
的要领。
主线程和每一个子线程都有一个SubSelector
,其内存保留了poll猎取到的可读文件形貌符,可写文件形貌符以及非常的文件形貌符。如许每一个线程就有本身零丁的停当文件形貌符数组。
private final int pollArrayIndex;
private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];
private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];
pollArrayIndex
纪录了当前SubSelector
的序号,在挪用poll的时刻,须要将文件形貌符数组的地点通报给JNI中,由于我们有多个线程一同挪用poll,且每一个线程处置惩罚1024个Channel
。经由历程序号和数组的地点盘算当前SubSelector
所担任哪些通道。
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private int poll(int index) throws IOException {
// poll for helper threads
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
private native int poll0(long pollAddress, int numfds,
int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
在主线程挪用poll以后,会猎取到已停当的文件形貌符(包括可读、可写、非常)。经由历程挪用processSelectedKeys
将停当的文件形貌符对应的SelectorKey
到场到selectedKeys
中。如许我们外部就能够挪用到统统停当的SelectorKey
举行遍历处置惩罚。
private int processSelectedKeys(long updateCount) {
int numKeysUpdated = 0;
numKeysUpdated += processFDSet(updateCount, readFds,
Net.POLLIN,
false);
numKeysUpdated += processFDSet(updateCount, writeFds,
Net.POLLCONN |
Net.POLLOUT,
false);
numKeysUpdated += processFDSet(updateCount, exceptFds,
Net.POLLIN |
Net.POLLCONN |
Net.POLLOUT,
true);
return numKeysUpdated;
}
可读文件形貌符,可写文件形貌符以及非常文件形貌符的处置惩罚逻辑都是一样的,挪用processFDSet
处置惩罚更新SelectorKey
的停当事宜。这里会传入文件形貌符的数组。须要注重的是文件形貌符第一个元素是数组的长度。
private int processFDSet(long updateCount, int[] fds, int rOps, boolean isExceptFds)
{
int numKeysUpdated = 0;
//1. 遍历文件形貌符数组
for (int i = 1; i <= fds[0]; i++) {
//猎取文件形貌符句柄值
int desc = fds[i];
//2. 推断当前文件形貌符是不是是用于叫醒的文件形貌
if (desc == wakeupSourceFd) {
synchronized (interruptLock) {
interruptTriggered = true;
}
continue;
}
//3. 猎取文件形貌符句柄对应的SelectionKey的映照值
MapEntry me = fdMap.get(desc);
// 4. 若为空,则示意已被作废。
if (me == null)
continue;
SelectionKeyImpl sk = me.ski;
// 5. 抛弃OOD数据(紧要数据)
if (isExceptFds &&
(sk.channel() instanceof SocketChannelImpl) &&
discardUrgentData(desc))
{
continue;
}
//6. 推断key是不是已停当,若已停当,则将当前操纵累加到本来的操纵上,比方本来写事宜停当,如今读事宜停当,就须要更新该key读写停当
if (selectedKeys.contains(sk)) { // Key in selected set
//clearedCount 和 updateCount用于防止同一个key的事宜设置屡次,由于同一个文件形貌符可能在可读文件形貌符数组也可能在非常文件形貌符数组中。
if (me.clearedCount != updateCount) {
if (sk.channel.translateAndSetReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
if (sk.channel.translateAndUpdateReadyOps(rOps, sk) &&
(me.updateCount != updateCount)) {
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
} else { // Key is not in selected set yet
//key本来未停当,将key到场selectedKeys中
if (me.clearedCount != updateCount) {
sk.channel.translateAndSetReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
} else { // The readyOps have been set; now add
sk.channel.translateAndUpdateReadyOps(rOps, sk);
if ((sk.nioReadyOps() & sk.nioInterestOps()) != 0) {
selectedKeys.add(sk);
me.updateCount = updateCount;
numKeysUpdated++;
}
}
me.clearedCount = updateCount;
}
}
return numKeysUpdated;
}
- 起首疏忽
wakeupSourceFd
,前面说了该文件形貌符用于叫醒。 - 过滤fdMap不存在的文件形貌符,这些文件形貌符已被作废了。
- 疏忽OOB(紧要)数据,这些数据须要挪用
discardUrgentData
读取并疏忽。 - 依据key是不是在SelectorKeys中决定是设置事宜掩码照样更新事宜掩码。
多线程Poll
如今大部分数据构造都已引见了,在议论Pipe、StartLock和FinishLock之前,是时刻引入多线程Poll功用了,在议论多线程时,会对上述三个数据构造和功用举行细致申明。
起首我们先看一下建立WindowsSelectorImpl
做了什么
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
- 起首建立了一个默许8个长度(8*8字节)的文件形貌符数组
PollArrayWrapper
- 建立一个Pipe,Pipe我们之前议论过是一个单向通信管道。
- 猎取Pipe的源端和目的端的文件形貌符句柄,该句柄用于激活线程。
- 将
wakeupSourceFd
存到PollArrayWapper
每1024个元素的第一个位置。使得每一个线程都能被wakeupSourceFd
叫醒。
由于select最大支撑1024个句柄,这里第一个文件形貌符是wakeupSourceFd
,所以一个线程现实最多并发处置惩罚1023个socket文件形貌符。
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, Net.POLLIN);
}
如今我们看一下doSelect逻辑
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
//1. 删除作废的key
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
//2. 调解线程数 ,守候运转
adjustThreadsCount();
//3. 设置辅佐线程数
finishLock.reset();
//4. 入手下手运转新增的辅佐线程
startLock.startThreads();
try {
begin();
try {
//5. 猎取停当文件形貌符
subSelector.poll();
} catch (IOException e) {
finishLock.setException(e); // Save this exception
}
//6. 守候统统辅佐线程完成
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
// Done with poll(). Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
//7. 再次搜检删除作废的key
processDeregisterQueue();
//8. 将停当的key到场到selectedKeys中
int updated = updateSelectedKeys();
// 完成,重置叫醒标记下次在运转。
resetWakeupSocket();
return updated;
}
- 删除作废key,当channel封闭时,对应的Key会被作废,被作废的key会到场到
cancelledKeys
中。
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();
synchronized (keyLock) {
int count = (keys == null) ? 0 : keys.length;
for (int i = 0; i < count; i++) {
SelectionKey k = keys[i];
if (k != null)
k.cancel();
}
}
}
public final void cancel() {
...
((AbstractSelector)selector()).cancel(this);
...
}
void cancel(SelectionKey k) { // package-private
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}
挪用processDeregisterQueue
举行注销。
processDeregisterQueue();
//遍历统统已作废的key,作废他们
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
//遍历统统key
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
//注销key
implDereg(ski);
} catch (SocketException se) {
throw new IOException("Error deregistering key", se);
} finally {
i.remove();
}
}
}
}
}
protected void implDereg(SelectionKeyImpl ski) throws IOException{
int i = ski.getIndex();
assert (i >= 0);
synchronized (closeLock) {
if (i != totalChannels - 1) {
// 把末了一个通道复制到作废key地点的位置。
SelectionKeyImpl endChannel = channelArray[totalChannels-1];
channelArray[i] = endChannel;
endChannel.setIndex(i);
pollWrapper.replaceEntry(pollWrapper, totalChannels - 1,
pollWrapper, i);
}
ski.setIndex(-1);
}
//将末了一个通道清空。
channelArray[totalChannels - 1] = null;
totalChannels--;
//推断是不是须要削减一个辅佐线程。
if ( totalChannels != 1 && totalChannels % MAX_SELECTABLE_FDS == 1) {
totalChannels--;
threadsCount--; // The last thread has become redundant.
}
//消灭对应的缓存。
fdMap.remove(ski); // Remove the key from fdMap, keys and selectedKeys
keys.remove(ski);
selectedKeys.remove(ski);
//设置key无效
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
//封闭文件形貌符
((SelChImpl)selch).kill();
}
//将统统key都设置为无效
protected final void deregister(AbstractSelectionKey key) {
((AbstractSelectableChannel)key.channel()).removeKey(key);
}
void removeKey(SelectionKey k) { // package-private
synchronized (keyLock) {
for (int i = 0; i < keys.length; i++)
if (keys[i] == k) {
keys[i] = null;
keyCount--;
}
//将key设置为无效
((AbstractSelectionKey)k).invalidate();
}
}
- 作废时起首会将该Key的文件形貌符的PollFD项从pollWrapper中移除。
- 将key从
channelArray
中删除。 - 若总的注册通道数到达了减小线程的阈值,则减小一个线程。
- 清算
fdMap
、keys
、selectedKeys
数据缓存。 - 设置key无效
- 封闭文件形貌符
((SelChImpl)selch).kill();
是在各个Channel中完成的,以SocketChannel为例,终究会挪用nd.close(fd);
封闭对应的文件形貌符
- 调解辅佐线程数
private void adjustThreadsCount() {
//当线程大于现实线程,建立更多线程
if (threadsCount > threads.size()) {
// More threads needed. Start more threads.
for (int i = threads.size(); i < threadsCount; i++) {
SelectThread newThread = new SelectThread(i);
threads.add(newThread);
//设置为保卫线程
newThread.setDaemon(true);
newThread.start();
}
} else if (threadsCount < threads.size()) {
// 当线程小于现实线程,移除线程。
for (int i = threads.size() - 1 ; i >= threadsCount; i--)
threads.remove(i).makeZombie();
}
}
在建立新的线程时,会纪录上一次运转的数目保留到lastRun
变量中
private SelectThread(int i) {
this.index = i;
this.subSelector = new SubSelector(i);
//make sure we wait for next round of poll
this.lastRun = startLock.runsCounter;
}
当线程启动时会守候主线程激活
public void run() {
while (true) { // poll loop
//守候主线程信号激活
if (startLock.waitForStart(this))
return;
// call poll()
try {
subSelector.poll(index);
} catch (IOException e) {
// Save this exception and let other threads finish.
finishLock.setException(e);
}
// 关照主线程完成.
finishLock.threadFinished();
}
}
经由历程startLock
守候主线程的入手下手信号。若当前线程是新启动的线程,则runsCounter == thread.lastRun
为真,此时新的线程须要守候主线程挪用启动。
startLock.waitForStart(this)
private synchronized boolean waitForStart(SelectThread thread) {
while (true) {
while (runsCounter == thread.lastRun) {
try {
startLock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
if (thread.isZombie()) { // redundant thread
return true; // will cause run() to exit.
} else {
thread.lastRun = runsCounter; // update lastRun
return false; // will cause run() to poll.
}
}
}
}
- 设置辅佐线程数目
纪录当前辅佐线程数目,下次新增的辅佐线程须要守候主线程关照启动。
finishLock.reset();
private void reset() {
threadsToFinish = threads.size(); // helper threads
}
- 入手下手运转新增的辅佐线程
startLock.startThreads();
private synchronized void startThreads() {
runsCounter++; // next run
notifyAll(); // 关照统统辅佐线程继承实行,
}
- 猎取已停当的文件形貌符
subSelector.poll();
//主线程挪用
private int poll() throws IOException{
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
//辅佐线程挪用
private int poll(int index) throws IOException {
return poll0(pollWrapper.pollArrayAddress +
(pollArrayIndex * PollArrayWrapper.SIZE_POLLFD),
Math.min(MAX_SELECTABLE_FDS,
totalChannels - (index + 1) * MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
辅佐线程和主线程挪用的区分就是寄存PollFD的位置变化,每一个线程会有1024个PollFD(8B)的位置寄存PollFD。如许使得多个线程的数据内存星散互不影响。
下面看一下JNI的poll0
做了什么处置惩罚。下面罗略了重要的逻辑
typedef struct {
jint fd;
jshort events;
} pollfd;
Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
jlong pollAddress, jint numfds,
jintArray returnReadFds, jintArray returnWriteFds,
jintArray returnExceptFds, jlong timeout)
{
DWORD result = 0;
pollfd *fds = (pollfd *) pollAddress;
int i;
FD_SET readfds, writefds, exceptfds;
struct timeval timevalue, *tv;
static struct timeval zerotime = {0, 0};
...
/* Call select */
if ((result = select(0 , &readfds, &writefds, &exceptfds, tv))
== SOCKET_ERROR) {
//当涌现错误时,变量每一个socket猎取它的停当状况
FD_SET errreadfds, errwritefds, errexceptfds;
...
for (i = 0; i < numfds; i++) {
errreadfds.fd_count = 0;
errwritefds.fd_count = 0;
if (fds[i].events & POLLIN) {
errreadfds.fd_array[0] = fds[i].fd;
errreadfds.fd_count = 1;
}
if (fds[i].events & (POLLOUT | POLLCONN))
{
errwritefds.fd_array[0] = fds[i].fd;
errwritefds.fd_count = 1;
}
errexceptfds.fd_array[0] = fds[i].fd;
errexceptfds.fd_count = 1;
//遍历每一个socket,探测它的状况
/* call select on the i-th socket */
if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime)
== SOCKET_ERROR) {
/* This socket causes an error. Add it to exceptfds set */
exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
exceptfds.fd_count++;
} else {
...
}
}
}
}
/* Return selected sockets. */
/* Each Java array consists of sockets count followed by sockets list */
...
(*env)->SetIntArrayRegion(env, returnReadFds, 0,
readfds.fd_count + 1, (jint *)&readfds);
(*env)->SetIntArrayRegion(env, returnWriteFds, 0,
writefds.fd_count + 1, (jint *)&writefds);
(*env)->SetIntArrayRegion(env, returnExceptFds, 0,
exceptfds.fd_count + 1, (jint *)&exceptfds);
return 0;
}
- 起首会经由历程
pollfd *fds = (pollfd *) pollAddress;
将pollAddress的地点转换为polldf的数组构造。
这里会自动内存对齐,pollfd一共只要6个字节,第一个是int范例的文件形貌符句柄,第二个是short范例的守候事宜掩码值。第二个short后会添补2B,因而每一个pollFD是8B。而现实背面2字节用于寄存现实发作事宜的事宜掩码。
- 经由历程挪用Win32API的select实行现实的操纵猎取停当的文件形貌符。当socket收到OOB(紧要)数据时,会发生非常。此时须要遍历统统文件形貌符,以确定是哪一个socket接收到OOB数据。从而一般处置惩罚。上面也提到过OOB数据会经由历程挪用
discardUrgentData
举行清算。
JNIEXPORT jboolean JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_discardUrgentData(JNIEnv* env, jobject this,
jint s)
{
char data[8];
jboolean discarded = JNI_FALSE;
int n;
do {
//读取MSG_OOB数据
n = recv(s, (char*)&data, sizeof(data), MSG_OOB);
if (n > 0) {
//读取到设置标记为true
discarded = JNI_TRUE;
}
} while (n > 0);
return discarded;
}
假如timeval为{0,0},则select()马上返回,这可用于探听所选套接口的状况。假如处于这类状况,则select()挪用可以为黑白壅塞的,且统统适用于非壅塞挪用的假定都适用于它。
- 当猎取到统统的停当的文件形貌符时,须要保留到返回效果中,同时读写和非常的返回效果的数组第一个为停当的长度值。
- 守候统统辅佐线程完成,当主线程完成时会马上挪用
wakeup
向wakeupSourceFd
发作数据以触发辅佐线程叫醒。辅佐线程叫醒后也会挪用wakeup
一次。当辅佐线程都被叫醒后就会关照主线程。
if (threads.size() > 0)
finishLock.waitForHelperThreads();
private synchronized void waitForHelperThreads() {
if (threadsToFinish == threads.size()) {
// no helper threads finished yet. Wakeup them up.
wakeup();
}
while (threadsToFinish != 0) {
try {
finishLock.wait();
} catch (InterruptedException e) {
// Interrupted - set interrupted state.
Thread.currentThread().interrupt();
}
}
}
private synchronized void threadFinished() {
if (threadsToFinish == threads.size()) { // finished poll() first
// if finished first, wakeup others
wakeup();
}
threadsToFinish--;
if (threadsToFinish == 0) // all helper threads finished poll().
notify(); // notify the main thread
}
若辅佐线接收到数据,则它须要挪用wakeup
来叫醒其他辅佐线程,如许使得主线程火辅佐线程最少能挪用一次wakeup
激活其他辅佐线程。wakeup
内部会挪用setWakeupSocket
向wakeupSourceFd
发作一个信号。
public Selector wakeup() {
synchronized (interruptLock) {
if (!interruptTriggered) {
setWakeupSocket();
interruptTriggered = true;
}
}
return this;
}
//发作一个字节数据叫醒wakeupsocket
Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
jint scoutFd)
{
/* Write one byte into the pipe */
const char byte = 1;
send(scoutFd, &byte, 1, 0);
}
当主线被激活时,须要挪用resetWakeupSocket
将wakeupSourceFd
的数据读取出来。
private void resetWakeupSocket() {
synchronized (interruptLock) {
if (interruptTriggered == false)
return;
resetWakeupSocket0(wakeupSourceFd);
interruptTriggered = false;
}
}
//读取wakeupsocket的数据。
Java_sun_nio_ch_WindowsSelectorImpl_resetWakeupSocket0(JNIEnv *env, jclass this,
jint scinFd)
{
char bytes[WAKEUP_SOCKET_BUF_SIZE];
long bytesToRead;
/* 猎取数据大小 */
ioctlsocket (scinFd, FIONREAD, &bytesToRead);
if (bytesToRead == 0) {
return;
}
/* 从缓冲区读取统统数据 */
if (bytesToRead > WAKEUP_SOCKET_BUF_SIZE) {
char* buf = (char*)malloc(bytesToRead);
recv(scinFd, buf, bytesToRead, 0);
free(buf);
} else {
recv(scinFd, bytes, WAKEUP_SOCKET_BUF_SIZE, 0);
}
}
是一个盘算机函数,功用是掌握套接口的形式。可用于任一状况的任一套接口。它用于猎取与套接口相干的操纵参数,而与详细协定或通信子系统无关。第二个参数时对socket的操纵敕令
- 再次挪用删除作废的key
- 将停当的key到场到selectKeys中,有多个线程会将统统线程的停当key到场到selectKeys中。
int updated = updateSelectedKeys();
private int updateSelectedKeys() {
updateCount++;
int numKeysUpdated = 0;
numKeysUpdated += subSelector.processSelectedKeys(updateCount);
for (SelectThread t: threads) {
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);
}
return numKeysUpdated;
}
若key初次被到场,则会挪用translateAndSetReadyOps
,若key已在selectKeys中,则会挪用translateAndUpdateReadyOps
。这两个要领都是挪用translateReadyOps
,translateReadyOps
操纵会将已停当的操纵保留。
public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, sk.nioReadyOps(), sk);
}
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl sk) {
return translateReadyOps(ops, 0, sk);
}
封闭WindowsSelectorImpl
封闭WindowsSelectorImpl时会将统统注册的通道一同封闭
protected void implClose() throws IOException {
synchronized (closeLock) {
if (channelArray != null) {
if (pollWrapper != null) {
// prevent further wakeup
synchronized (interruptLock) {
interruptTriggered = true;
}
wakeupPipe.sink().close();
wakeupPipe.source().close();
//封闭统统channel
for(int i = 1; i < totalChannels; i++) { // Deregister channels
if (i % MAX_SELECTABLE_FDS != 0) { // skip wakeupEvent
deregister(channelArray[i]);
SelectableChannel selch = channelArray[i].channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
}
//开释数据
pollWrapper.free();
pollWrapper = null;
selectedKeys = null;
channelArray = null;
//开释辅佐线程
for (SelectThread t: threads)
t.makeZombie();
//叫醒辅佐线程使其退出。
startLock.startThreads();
}
}
}
}
总结
本文对WindowsSelectorImpl
的代码完成举行细致剖析。下一篇将对Linux下的EpollSelectorImpl
的完成继承解说。
你一定看得懂的 DDD+CQRS+EDA+ES 核心思想与极简可运行代码示例