Netty启动流程(4)

public Channel bind(final SocketAddress localAddress) {
    ChannelFuture future = bindAsync(localAddress);
    future.awaitUninterruptibly();
    return future.getChannel();
}

从bindAsync开始说起吧,异步的绑定端口

awaitUninterruptibly如果没有done会wait()直到被人叫醒,这个操作是属于future的

在bindAsync中

Binder binder = new Binder(localAddress);
ChannelPipeline bossPipeline = pipeline();
bossPipeline.addLast("binder", binder);
Channel channel = getFactory().newChannel(bossPipeline);
final ChannelFuture bfuture = new DefaultChannelFuture(channel, false);
System.out.println(Thread.currentThread().getName()+" : bindFuture addListener");
binder.bindFuture.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            System.out.println(Thread.currentThread().getName()+" : bfuture 操作完成 回调setSuccess");
            bfuture.setSuccess();
        } else {
            // Call close on bind failure
            bfuture.getChannel().close();
            bfuture.setFailure(future.getCause());
        }
    }
});
return bfuture;

初始化一个binder ,继承SimpleChannelUpstreamHandler,负责绑定端口

初始化一个boss的pipeline,将这个handler放入pipeline里面

初始化一个channel

this是factory

我要初始化一个channel,我要设置对应的属性,打开ServerSocketChannel,设置为非阻塞,感兴趣的事件设置为read

然后创建一个config,然后通知channel open的事件

return new NioServerSocketChannel(this, pipeline, sink, bossPool.nextBoss(), workerPool);

static final ConcurrentMap<Integer, Channel> allChannels = new ConcurrentHashMap<Integer, Channel>();

int interestOps = OP_READ;

NioServerSocketChannel(
        ChannelFactory factory,
        ChannelPipeline pipeline,
        ChannelSink sink, Boss boss, WorkerPool<NioWorker> workerPool) {

    //set各种属性
    //random一个id

    socket = ServerSocketChannel.open();
    socket.configureBlocking(false);
    config = new DefaultServerSocketChannelConfig(socket.socket());
    fireChannelOpen(this);
}

在fireChannelOpen中sendUpstream,里面Event的channel就是刚才的newChannel,也就是NioServerSocketChannel

channel.getPipeline().sendUpstream(
        new UpstreamChannelStateEvent(
                channel, ChannelState.OPEN, Boolean.TRUE));
//找到一个可以获取能够处理Upstream的handler,也就是刚才的binder
public void sendUpstream(ChannelEvent e) {
    DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
    sendUpstream(head, e);
}

((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);

binder并没有重载handleUpstream,因此执行他的父类的SimpleChannelUpstreamHandler的handleUpstream

if (e instanceof ChannelStateEvent) {
    switch (evt.getState()) {
        case OPEN:
            if (Boolean.TRUE.equals(evt.getValue())) {
                channelOpen(ctx, evt);
            }
    }
}

binder重载了channelOpen的方法

evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            bindFuture.setSuccess();
        } else {
            bindFuture.setFailure(future.getCause());
        }
    }
});

调用的是刚才的NioServerSocketChannel的bind方法,在父类AbstractChannel里面

new一个ChannelFuture,这个future是在binder中调用的那个bind产生的future,会在这个sedDownstream结束后返回到binder里面并addListener,这个要看一下究竟是谁调用的这个future的operationComplete

public ChannelFuture bind(SocketAddress localAddress) {
    return Channels.bind(this, localAddress);
}

public static ChannelFuture bind(Channel channel, SocketAddress localAddress) {
    ChannelFuture future = future(channel);
    channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
            channel, future, ChannelState.BOUND, localAddress));
    return future;
}

sendDownstream,暂时没有能处理Downstream的handler,这个要交给Sink来处理

public void sendDownstream(ChannelEvent e) {
    DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
    if (tail == null) {
        getSink().eventSunk(this, e);
        return;
    }
    sendDownstream(tail, e);
}
public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    Channel channel = e.getChannel();
    if (channel instanceof NioServerSocketChannel) {
        handleServerSocket(e);
    } else if (channel instanceof NioSocketChannel) {
        handleAcceptedSocket(e);
    }
}

显然

private static void handleServerSocket(ChannelEvent e) {
    ChannelStateEvent event = (ChannelStateEvent) e;
    NioServerSocketChannel channel =
        (NioServerSocketChannel) event.getChannel();
    ChannelFuture future = event.getFuture();
    ChannelState state = event.getState();
    Object value = event.getValue();

    switch (state) {
    
    case BOUND:
        if (value != null) {
            ((NioServerBoss) channel.boss).bind(channel, future, (SocketAddress) value);
        }
        break;
    
}

channel是设置过boss的,在boss里面注册任务,这个任务是用来给boss线程的

void bind(final NioServerSocketChannel channel, final ChannelFuture future,
          final SocketAddress localAddress) {
    registerTask(new RegisterTask(channel, future, localAddress));
}

首先要new一下registerTask,然后再注册

在task队列里加入一个task,wakenUp设置为true,然后叫醒boss线程

protected final void registerTask(Runnable task) {
    taskQueue.add(task);

    Selector selector = this.selector;
    if (selector != null) {
        if (wakenUp.compareAndSet(false, true)) {
            selector.wakeup();
        }
    }
}

然后就要分2段来讨论了,一边boss线程睡醒要开始处理task,task.run(),来做真正的绑定

一边主线程要继续添加addListener,然后继续处理newChannel之后的新建ChannelFuture来接收bindFuture的success事件,这是一个连续或是链式的异步调用,把bindFuture直接返回去究竟行不行,为什么要这么设计,而且写了这么多东西真正有用的就4句话,这究竟实在干什么,真的很优雅吗?不过这倒是一个典型的生产者消费者的模型


boss线程

这个future也就是之前bind(localAddress)产生的future,这个futur被扔进了DownstreamChannelStateEvent,放到了downstream里,sink接住,sink调用event里面的channel的boss的bind方法把这个future又传了出去,扔进了registerTask里面,然后这个registerTask被扔进了taskQueue里面,然后被boss线程拿出来,并给这个future设置了success

processTaskQueue();

final Runnable task = taskQueue.poll();
task.run();

//NioServerBoss$RegisterTask->run()
public void run() {
    boolean bound = false;
    boolean registered = false;
    
        channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
        bound = true;
        future.setSuccess();
        fireChannelBound(channel, channel.getLocalAddress());
        channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);

        registered = true;
    
}

setSuccess很多操作和awaitUninterruptibly有共同之处

设置done为true,并通知所用的listener

**这个存在一个时序的问题,究竟是boss线程先完成并设置success还是主线程先添加完addListener,使得DefaultChannelFuture里面里面的firstListener不再为null影响着这个程序到底是怎么运行的,不过这个无论怎么运行都是对的**</span>**.运行大概5次有1次是主线程先跑完的**</span>

**如果是主线程先跑完的这个notifyListeners就能直接运行,否则就什么都没发生**</span>

public boolean setSuccess() {
    synchronized (this) {
        if (done) return false;
        done = true;
        if (waiters > 0) notifyAll();
    }

    notifyListeners();
    return true;
}

private void notifyListeners() {
    if (firstListener != null) {
        notifyListener(firstListener);
        firstListener = null;

        if (otherListeners != null) {
            for (ChannelFutureListener l: otherListeners) {
                notifyListener(l);
            }
            otherListeners = null;
        }
    }
}

private void notifyListener(ChannelFutureListener l) {
    //回调
    l.operationComplete(this);
}

不过boss被我debug了这么长时间肯定是主线程先跑完了,但是主线程现在不知道到哪了,估计在future.awaitUninterruptibly()等着呢

事件完成回调之前addListener里面ChannelFutureListener的operationComplete

这个bind(localAddress)产生的future是一个隐式的future,这个future在上述的过程中执行完成,被boss线程setSuccess

然后在这个隐式的future的listener的operationComplete –>bindFuture.setSuccess()

然后触发bindFuture的operationComplete,调用bfuture的setSuccess()

然后future.awaitUninterruptibly()结束等待进入下一步

**当我调试到bfuture的setSuccess时,会有waiters在等待,在notifyAll的时候,main线程结束等待并执行完成**</span>

修改源代码,返回binder.bindFuture,不再创建bfuture,运行完全没问题,这应该算是为了不返回一个内部类的变量而作的优雅的选择吧**</span> **</span>

evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            bindFuture.setSuccess();
        }
    }
});

public synchronized boolean isSuccess() {
    return done && cause == null;
}
final ChannelFuture bfuture = new DefaultChannelFuture(channel, false);
System.out.println(Thread.currentThread().getName()+" : bindFuture addListener");
binder.bindFuture.addListener(new ChannelFutureListener() {
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            System.out.println(Thread.currentThread().getName()+" : bfuture 操作完成 回调setSuccess");
            bfuture.setSuccess();
        } else {
            // Call close on bind failure
            bfuture.getChannel().close();
            bfuture.setFailure(future.getCause());
        }
    }
});
return bfuture;

回到boss线程,boss线程把隐式的future.setSuccess之后调用了上述的一大串操作,然后boss在

fireChannelBound(channel, channel.getLocalAddress());
channel.socket.register(selector, SelectionKey.OP_ACCEPT, channel);

之后就运行完成了准备回去睡觉了

但binder并没有准备处理channelBound(ctx, evt)

然后注册下就准备睡了

这就是这4句话的执行过程和对应的时序分支


main线程

main线程在bind完了之后的动作** **

也就是这个

evt.getChannel().bind(localAddress).addListener(new ChannelFutureListener()

addListener,就是判断这个future的done是否被设置成true,也就是说是否被setSuccess了(或是以其他的方式结束了)

如果done为true,那么直接notifyListener()

如果没有的话,就进入这个future的listener list,至于为什么有firstListener和otherListeners可能是出于性能的考虑吧

public void addListener(ChannelFutureListener listener) {
    if (listener == null) {
        throw new NullPointerException("listener");
    }

    boolean notifyNow = false;
    synchronized (this) {
        if (done) {
            notifyNow = true;
        } else {
            if (firstListener == null) {
                firstListener = listener;
            } else {
                if (otherListeners == null) {
                    otherListeners = new ArrayList<ChannelFutureListener>(1);
                }
                otherListeners.add(listener);
            }

            if (listener instanceof ChannelFutureProgressListener) {
                if (progressListeners == null) {
                    progressListeners = new ArrayList<ChannelFutureProgressListener>(1);
                }
                progressListeners.add((ChannelFutureProgressListener) listener);
            }
        }
    }

    if (notifyNow) {
        notifyListener(listener);
    }
}

主线程在添加了2个addListener之后,bind就结束了,进入bind外边的await等待

在大部分情况,boss线程的wakeUp还是挺快的,addListener操作会直接进入notifyListener()

** **