洞察掌握android电视app开发中的安全与合规策略,提升企业运营效率
601
2022-09-22
trafficserver的Net模块源码注释
Net的启动流程:main.cc:main()--->UnixNetProcessor::start()Net模块的启动intUnixNetProcessor::start(int, size_t){ EventType etype = ET_NET;//给NetHandler实例分配空间 netHandler_offset = eventProcessor.allocate(sizeof(NetHandler));//给PollCont实例分配空间 pollCont_offset = eventProcessor.allocate(sizeof(PollCont));//UnixNetProcessor对应的事件类型是ET_NET,如果是sslNetProcessor则对应的事件类型是ET_SSL upgradeEtype(etype);//从eventProcessor获得net的线程,这是在event模块初始化时做好的 n_netthreads = eventProcessor.n_threads_for_type[etype];//从eventProcessor获得net的线程数量 netthreads = eventProcessor.eventthread[etype];//初始化所有Net线程 for (int i = 0; i < n_netthreads; ++i) { initialize_thread_for_net(netthreads[i]);#ifndef STANDALONE_IOCORE extern void initialize_thread_for_http_sessions(EThread *thread, int thread_index); initialize_thread_for_http_sessions(netthreads[i], i);#endif } RecData d; d.rec_int = 0;//设置网络链接数的阈值 change_net_connections_throttle(NULL, RECD_INT, d, NULL);//sock相关,很少使用,这里先不介绍 if (!netProcessor.socks_conf_stuff) { socks_conf_stuff = NEW(new socks_conf_struct); loadSocksConfiguration(socks_conf_stuff); if (!socks_conf_stuff->socks_needed && socks_conf_stuff->accept_enabled) { Warning("We can not have accept_enabled and socks_needed turned off" " disabling Socks accept\n"); socks_conf_stuff->accept_enabled = 0; } else { socks_conf_stuff = netProcessor.socks_conf_stuff; } }//在页面上显示Net相关的统计信息#ifdef NON_MODULAR extern Action *register_ShowNet(Continuation * c, HTTPHdr * h); if (etype == ET_NET) statPagesManager.register_http("net", register_ShowNet);#endif return 1;}main()--->UnixNetProcessor::start()--->initialize_thread_for_net()顾名思义,这个函数的功能是为网络初始化一个线程,voidinitialize_thread_for_net(EThread *thread){//创建NetHandler、PollCont实例//NetHandler:用于处理Net相关的所有事件//PollCont:是一个Poll的continuation(ats的设计思想),包含指向NetHandler和PollDescriptor的指针//PollDescriptor:Poll的描述封装结构 new((ink_dummy_for_new *) get_NetHandler(thread)) NetHandler(); new((ink_dummy_for_new *) get_PollCont(thread)) PollCont(thread->mutex, get_NetHandler(thread)); get_NetHandler(thread)->mutex = new_ProxyMutex(); PollCont *pc = 
get_PollCont(thread); PollDescriptor *pd = pc->pollDescriptor;//调用NetHandler实例启动,最终会每秒调用NetHandler::mainNetEvent()函数 thread->schedule_imm(get_NetHandler(thread));#ifndef INACTIVITY_TIMEOUT//创建InactivityCop实例,InactivityCop会定时(1秒)判断每个链接vc是否可以关闭然后进行关闭处理 InactivityCop *inactivityCop = NEW(new InactivityCop(get_NetHandler(thread)->mutex));//定时调度 inactivityCop的check_inactivity()函数 thread->schedule_every(inactivityCop, HRTIME_SECONDS(1));#endif//注册信号处理函数 thread->signal_hook = net_signal_hook_function;//创建EventIO实例并初始化 thread->ep = (EventIO*)ats_malloc(sizeof(EventIO)); thread->ep->type = EVENTIO_ASYNC_SIGNAL;#if HAVE_EVENTFD//启动EventIO实例,使用epoll注册读事件(不知道epoll的先看一下啊) thread->ep->start(pd, thread->evfd, 0, EVENTIO_READ);#else thread->ep->start(pd, thread->evpipe[0], 0, EVENTIO_READ);#endif}NetHandler的初始化main()--->UnixNetProcessor::start()--->NetHandler::NetHandler()设置NetHandler的handler为NetHandler::startNetEventNetHandler::NetHandler():Continuation(NULL), trigger_event(0){ SET_HANDLER((NetContHandler) & NetHandler::startNetEvent);}设置NetHandler的handler为NetHandler::mainNetEvent,并定时调度该函数执行intNetHandler::startNetEvent(int event, Event *e){ (void) event; SET_HANDLER((NetContHandler) & NetHandler::mainNetEvent); e->schedule_every(NET_PERIOD); trigger_event = e; return EVENT_CONT;}PollCont的初始化main()--->UnixNetProcessor::start()--->PollCont::PollCont()PollCont::PollCont(ProxyMutex *m, NetHandler *nh, int pt):Continuation(m), net_handler(nh), poll_timeout(pt){//创建PollDescriptor实例 pollDescriptor = NEW(new PollDescriptor);//初始化PollDescriptor实例 pollDescriptor->init();//设置PollCont的handler为 PollCont::pollEvent SET_HANDLER(&PollCont::pollEvent);}PollDescriptor的初始化main()--->UnixNetProcessor::start()--->PollCont::PollCont()--->init() PollDescriptor *init() { result = 0;#if TS_USE_EPOLL nfds = 0;//创建epoll用的文件描述符 epoll_fd = epoll_create(POLL_DESCRIPTOR_SIZE); memset(ePoll_Triggered_Events, 0, sizeof(ePoll_Triggered_Events)); memset(pfd, 0, sizeof(pfd));#endif...... 
return this; }main()--->UnixNetProcessor::start()--->initialize_thread_for_net()--->NetHandler::mainNetEvent()这个函数看起来有点长,先说一下它的功能:首先是调用epoll_wait()等待事件,其次是根据事件的类型做不同的处理,事件分为EVENTIO_READWRITE_VC(读写事件)、EVENTIO_DNS_CONNECTION(DNS的CONNECT事件)、EVENTIO_ASYNC_SIGNAL(异步信号事件),正常的HTTP请求的接收和响应属于EVENTIO_READWRITE_VC,在介绍DNS请求发送流程时说过,调用connect()函数来发送DNS请求时会调用epoll_ctl()来注册相应的事件,这就是EVENTIO_DNS_CONNECTION,我们先不关心EVENTIO_ASYNC_SIGNAL。最后是分别遍历Handler的可读和可写队列,并调用read和write进行读和写,然后通知上层intNetHandler::mainNetEvent(int event, Event *e){ ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL)); (void) event; (void) e; EventIO *epd = NULL; int poll_timeout = net_config_poll_timeout;//计数信息++ NET_INCREMENT_DYN_STAT(net_handler_run_stat);//处理NetHandler的可读和可写队列上的事件UnixNetVConnection,这里你可以看作什么都不做 process_enabled_list(this); if (likely(!read_ready_list.empty() || !write_ready_list.empty() || !read_enable_list.empty() || !write_enable_list.empty())) poll_timeout = 0; else poll_timeout = net_config_poll_timeout; PollDescriptor *pd = get_PollDescriptor(trigger_event->ethread); UnixNetVConnection *vc = NULL;#if TS_USE_EPOLL//调用epoll_wait()等待事件 pd->result = epoll_wait(pd->epoll_fd, pd->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout); NetDebug("iocore_net_main_poll", "[NetHandler::mainNetEvent] epoll_wait(%d,%d), result=%d", pd->epoll_fd,poll_timeout,pd->result); ......//处理所有的事件 vc = NULL; for (int x = 0; x < pd->result; x++) { epd = (EventIO*) get_ev_data(pd,x);// EVENTIO_READWRITE_VC事件的处理:如果是读事件则加入到NetHandler的可读链表read_ready_list,如果是写事件则加入NetHandler的可写链表write_ready_list if (epd->type == EVENTIO_READWRITE_VC) { vc = epd->data.vc; if (get_ev_events(pd,x) & (EVENTIO_READ|EVENTIO_ERROR)) { vc->read.triggered = 1; if (!read_ready_list.in(vc)) read_ready_list.enqueue(vc); else if (get_ev_events(pd,x) & EVENTIO_ERROR) { // check for unhandled epoll events that should be handled Debug("iocore_net_main", "Unhandled epoll event on read: 0x%04x read.enabled=%d closed=%d 
read-ready_queue=%d", get_ev_events(pd,x), vc->read.enabled, vc->closed, read_ready_list.in(vc)); } } vc = epd->data.vc; if (get_ev_events(pd,x) & (EVENTIO_WRITE|EVENTIO_ERROR)) { vc->write.triggered = 1; if (!write_ready_list.in(vc)) write_ready_list.enqueue(vc); else if (get_ev_events(pd,x) & EVENTIO_ERROR) { Debug("iocore_net_main", "Unhandled epoll event on write: 0x%04x write.enabled=%d closed=%d write-ready_queue=%d", get_ev_events(pd,x), vc->write.enabled, vc->closed, write_ready_list.in(vc)); } } else if (!get_ev_events(pd,x) & EVENTIO_ERROR) { Debug("iocore_net_main", "Unhandled epoll event: 0x%04x", get_ev_events(pd,x)); }//EVENTIO_DNS_CONNECTION事件的处理:加入DNSHandler的triggered队列 } else if (epd->type == EVENTIO_DNS_CONNECTION) { if (epd->data.dnscon != NULL) { epd->data.dnscon->trigger(); #if defined(USE_EDGE_TRIGGER) epd->refresh(EVENTIO_READ);#endif } } else if (epd->type == EVENTIO_ASYNC_SIGNAL) net_signal_hook_callback(trigger_event->ethread); ev_next_event(pd,x); } pd->result = 0;#if defined(USE_EDGE_TRIGGER)//遍历Handler的可读队列中的vc,调用net_read_io分别处理每个vc,而net_read_io的功能就是调用read去接收数据,然后通知上层(HttpSM) while ((vc = read_ready_list.dequeue())) { if (vc->closed) close_UnixNetVConnection(vc, trigger_event->ethread); else if (vc->read.enabled && vc->read.triggered) vc->net_read_io(this, trigger_event->ethread); else if (!vc->read.enabled) { read_ready_list.remove(vc); } }//遍历Handler的可写队列中的vc,调用write_to_net分别处理每个vc,而write_to_net的功能就是调用write去发送数据,然后通知上层(HttpSM) while ((vc = write_ready_list.dequeue())) { if (vc->closed) close_UnixNetVConnection(vc, trigger_event->ethread); else if (vc->write.enabled && vc->write.triggered) write_to_net(this, vc, trigger_event->ethread); else if (!vc->write.enabled) { write_ready_list.remove(vc); } } return EVENT_CONT;}别忘了InactivityCop这个结构main()--->UnixNetProcessor::start()--->initialize_thread_for_net()--->InactivityCop()设置handler为InactivityCop::check_inactivity,该函数被每秒中调用一次struct InactivityCop : public Continuation { 
InactivityCop(ProxyMutex *m):Continuation(m) { SET_HANDLER(&InactivityCop::check_inactivity); }main()--->UnixNetProcessor::start()--->initialize_thread_for_net()--->InactivityCop()---> InactivityCop::check_inactivity() int check_inactivity(int event, Event *e) { (void) event; ink_hrtime now = ink_get_hrtime(); NetHandler *nh = get_NetHandler(this_ethread());//遍历NetHandler的链接队列,判断和本线程是不是同一线程,是的话加到NetHandler的cop_list队列 forl_LL(UnixNetVConnection, vc, nh->open_list) { if (vc->thread == this_ethread()) nh->cop_list.push(vc); } while (UnixNetVConnection *vc = nh->cop_list.pop()) { // If we cannot get the lock don't stop just keep cleaning MUTEX_TRY_LOCK(lock, vc->mutex, this_ethread()); if (!lock.lock_acquired) { NET_INCREMENT_DYN_STAT(inactivity_cop_lock_acquire_failure_stat); continue; }//如果该链接vc已设置为关闭状态,则调用close_UnixNetVConnection()进行关闭操作 if (vc->closed) { close_UnixNetVConnection(vc, e->ethread); continue; } if (vc->next_inactivity_timeout_at && vc->next_inactivity_timeout_at < now)//调用vc的handler(UnixNetVConnection::mainEvent)进行处理 vc->handleEvent(EVENT_IMMEDIATE, e); } return 0; } 好了,到此为止,NetProcessor的启动流程已经分析完成,可以简单地总结为:NetProcessor的启动主要任务是初始化几个线程定时调用epoll_wait()等待读写事件,如果有读事件到来时调用read进行读操作,然后把读到的数据传给上层处理,如果有写事件到来时调用write进行发送操作,发送完成后通知上层发送结果。那么读写事件是怎么来的呢?根据网络编程的经验,server在read和write之前一般都要accept,这里的读写事件正是来源于accept,下面来分析NetProcessor的accept。NetProcessor的accept是在HttpProxyServer启动时调用的,确切地说是main_accept()函数。main()--->start_HttpProxyServer()voidstart_HttpProxyServer(){//根据配置,每个端口(默认只有一个:8080)创建一个Acceptor for ( int i = 0 , n = proxy_ports.length() ; i < n ; ++i ) { HttpProxyAcceptor& acceptor = HttpProxyAcceptors[i]; HttpProxyPort& port = proxy_ports[i]; ...... if (NULL == netProcessor.main_accept(acceptor._accept, port.m_fd, acceptor._net_opt)) return; } ...... }}main()--->start_HttpProxyServer()--->NetProcessor::main_accept()Action *NetProcessor::main_accept(Continuation *cont, SOCKET fd, AcceptOptions const& opt){ UnixNetProcessor* this_unp = static_cast
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~