bet3365西甲-365bet上网导航-365bet国际

Yarn 状态机以及事件机制

Yarn 状态机以及事件机制

简介Yarn采用了基于事件驱动的并发模型:

所有状态机都实现了EventHandler接口,很多服务(类名通常带有Service后缀)也实现了该接口,它们都是事件处理器。需要异步处理的事件由中央异步调度器(类名通常带有Dispatcher后缀)统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。某些事件处理器不仅处理事件,也会向中央异步调度器发送事件。

事件处理器定义事件处理器定义如下:

代码语言:javascript代码运行次数:0运行复制@SuppressWarnings("rawtypes")

@Public

@Evolving

public interface EventHandler {

void handle(T event);

}只有一个handler函数,如参是事件:

中央处理器AsyncDispatcherAsyncDispatcher 实现了接口Dispatcher,Dispatcher中定义了事件Dispatcher的接口。主要提供两个功能:

注册不同类型的事件,主要包含事件类型和事件处理器。获取事件处理器,用来派发事件,等待异步执行真正的EventHandler。代码语言:javascript代码运行次数:0运行复制@Public

@Evolving

public interface Dispatcher {

EventHandler getEventHandler();

void register(Class eventType, EventHandler handler);

}AsyncDispatcher实现了Dispatcher接口,也扩展了AbstractService,表明AsyncDispatcher也是一个服务,

是一个典型的生产者消费这模型。

代码语言:javascript代码运行次数:0运行复制public class AsyncDispatcher extends AbstractService implements Dispatcher {

...

}事件处理器的注册事件注册就是将事件写入到eventDispatchers里面,eventDispatchers的定义:Map, EventHandler> eventDispatchers,键是事件类型,value是事件的处理器。

对于同一事件类型注册多次handler处理函数时,将使用MultiListenerHandler代替,MultiListenerHandler里面保存了多个handler,调用handler函数时,会依次调用每个handler。

代码语言:javascript代码运行次数:0运行复制public void register(Class eventType,

EventHandler handler) {

/* check to see if we have a listener registered */

EventHandler registeredHandler = (EventHandler) eventDispatchers.get(eventType);

LOG.info("Registering " + eventType + " for " + handler.getClass());

if (registeredHandler == null) {

eventDispatchers.put(eventType, handler);

} else if (!(registeredHandler instanceof MultiListenerHandler)){

/* for multiple listeners of an event add the multiple listener handler */

MultiListenerHandler multiHandler = new MultiListenerHandler();

multiHandler.addHandler(registeredHandler);

multiHandler.addHandler(handler);

eventDispatchers.put(eventType, multiHandler);

} else {

/* already a multilistener, just add to it */

MultiListenerHandler multiHandler

= (MultiListenerHandler) registeredHandler;

multiHandler.addHandler(handler);

}

}事件处理AsyncDispatcher#getEventHandler()是异步派发的关键:

代码语言:javascript代码运行次数:0运行复制private final EventHandler handlerInstance = new GenericEventHandler();

// 省略.....

@Override

public EventHandler getEventHandler() {

return handlerInstance;

}GenericEventHandler:一个特殊的事件处理器GenericEventHandler是一个特殊的事件处理器,用于接受各种事件。由指定线程处理接收到的事件。

代码语言:javascript代码运行次数:0运行复制public void handle(Event event) {

if (blockNewEvents) {

return;

}

drained = false;

/* all this method does is enqueue all the events onto the queue */

int qSize = eventQueue.size();

if (qSize != 0 && qSize % 1000 == 0

&& lastEventQueueSizeLogged != qSize) {

lastEventQueueSizeLogged = qSize;

LOG.info("Size of event-queue is " + qSize);

}

if (qSize != 0 && qSize % detailsInterval == 0

&& lastEventDetailsQueueSizeLogged != qSize) {

lastEventDetailsQueueSizeLogged = qSize;

printEventQueueDetails();

printTrigger = true;

}

int remCapacity = eventQueue.remainingCapacity();

if (remCapacity < 1000) {

LOG.warn("Very low remaining capacity in the event-queue: "

+ remCapacity);

}

try {

eventQueue.put(event);

} catch (InterruptedException e) {

if (!stopped) {

LOG.warn("AsyncDispatcher thread interrupted", e);

}

// Need to reset drained flag to true if event queue is empty,

// otherwise dispatcher will hang on stop.

drained = eventQueue.isEmpty();

throw new YarnRuntimeException(e);

}

};blockNewEvents: 是否阻塞事件处理,只有当中央处理器停止之后才会停止接受事件。eventQueue:将接收到的请求放置到当前阻塞队列里面。方便指定线程及时处理。事件处理线程在服务启动时(serviceStart函数)创建一个线程,会循环处理接受到的事件。核心处理逻辑在函数dispatch里面。

代码语言:javascript代码运行次数:0运行复制Runnable createThread() {

return new Runnable() {

@Override

public void run() {

while (!stopped && !Thread.currentThread().isInterrupted()) {

drained = eventQueue.isEmpty();

// 省略。。。

Event event;

try {

event = eventQueue.take();

} catch(InterruptedException ie) {

if (!stopped) {

LOG.warn("AsyncDispatcher thread interrupted", ie);

}

return;

}

if (event != null) {

// 省略。。。

dispatch(event);

// 省略。。。

}

}

}

};

}dispatch详解从已经注册的eventDispatchers列表里面查找当前事件对应的处理器,调用当前处理器的handler函数。如果当前handler处理出现异常时,默认会退出RM。代码语言:javascript代码运行次数:0运行复制protected void dispatch(Event event) {

//all events go thru this loop

LOG.debug("Dispatching the event {}.{}", event.getClass().getName(),

event);

Class type = event.getType().getDeclaringClass();

try{

EventHandler handler = eventDispatchers.get(type);

if(handler != null) {

handler.handle(event);

} else {

throw new Exception("No handler for registered for " + type);

}

} catch (Throwable t) {

//TODO Maybe log the state of the queue

LOG.error(FATAL, "Error in dispatcher thread", t);

// If serviceStop is called, we should exit this thread gracefully.

if (exitOnDispatchException

&& (ShutdownHookManager.get().isShutdownInProgress()) == false

&& stopped == false) {

stopped = true;

Thread shutDownThread = new Thread(createShutDownThread());

shutDownThread.setName("AsyncDispatcher ShutDown handler");

shutDownThread.start();

}

}

}状态机状态转换由成员变量StateMachine管理,所有的StateMachine都由StateMachineFactory进行管理。由addTransition函数实现状态机。

代码语言:javascript代码运行次数:0运行复制private static final StateMachineFactory

RMAppState,

RMAppEventType,

RMAppEvent> stateMachineFactory

= new StateMachineFactory

RMAppState,

RMAppEventType,

RMAppEvent>(RMAppState.NEW)

// Transitions from NEW state

.addTransition(RMAppState.NEW, RMAppState.NEW,

RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())

.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,

RMAppEventType.START, new RMAppNewlySavingTransition())

.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,

RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,

RMAppState.KILLED, RMAppState.FINAL_SAVING),

RMAppEventType.RECOVER, new RMAppRecoveredTransition())

.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,

new AppKilledTransition())

.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,

RMAppEventType.APP_REJECTED,

new FinalSavingTransition(new AppRejectedTransition(),

RMAppState.FAILED))

.addTransition(

RMAppState.KILLED,

RMAppState.KILLED,

EnumSet.of(RMAppEventType.APP_ACCEPTED,

RMAppEventType.APP_REJECTED, RMAppEventType.KILL,

RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,

RMAppEventType.NODE_UPDATE, RMAppEventType.START))

.installTopology();Transition定义了“从一个状态转换到另一个状态”的行为,由转换操作、开始状态、事件类型、事件组成:

代码语言:javascript代码运行次数:0运行复制public interface StateMachine

,

EVENTTYPE extends Enum, EVENT> {

public STATE getCurrentState();

public STATE getPreviousState();

public STATE doTransition(EVENTTYPE eventType, EVENT event)

throws InvalidStateTransitionException;

}ResourceManager中状态机RMApp:用于维护一个Application的生命周期,实现类 - RMAppImplRMAppAttempt:用于维护一次试探运行的生命周期,实现类 - RMAppAttemptImplRMContainer:用于维护一个已分配的资源最小单位Container的生命周期,实现类 - RMContainerImplRMNode:用于维护一个NodeManager的生命周期,实现类 - RMNodeImplNodeManager中状态机:

Application:用于维护节点上一个Application的生命周期,实现类 - ApplicationImplContainer:用于维护节点上一个容器的生命周期,实现类 - ContainerImplLocalizedResource:用于维护节点上资源本地化的生命周期,没有使用接口即实现类 - LocalizedResource