/** * Tencent is pleased to support the open source community by making MSEC available. * * Copyright (C) 2016 THL A29 Limited, a Tencent company. All rights reserved. * * Licensed under the GNU General Public License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. You may * obtain a copy of the License at * * https://opensource.org/licenses/GPL-2.0 * * Unless required by applicable law or agreed to in writing, software distributed under the * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, * either express or implied. See the License for the specific language governing permissions * and limitations under the License. */ /** * @filename kqueue_proxy.cpp * @info kqueue for micro thread manage */ #include "kqueue_proxy.h" #include "micro_thread.h" #include "ff_hook.h" using namespace NS_MICRO_THREAD; KqueueProxy::KqueueProxy() { _maxfd = KqueueProxy::DEFAULT_MAX_FD_NUM; _kqfd = -1; _evtlist = NULL; _kqrefs = NULL; } int KqueueProxy::InitKqueue(int max_num) { int rc = 0; if (max_num > _maxfd) { _maxfd = max_num; } _kqfd = ff_kqueue(); if (_kqfd < 0) { rc = -1; goto EXIT_LABEL; } ff_fcntl(_kqfd, F_SETFD, FD_CLOEXEC); _kqrefs = new KqFdRef[_maxfd]; if (_kqrefs == NULL) { rc = -2; goto EXIT_LABEL; } _evtlist = (KqEvent*)calloc(_maxfd, sizeof(KqEvent)); if (_evtlist == NULL) { rc = -3; goto EXIT_LABEL; } struct rlimit rlim; memset(&rlim, 0, sizeof(rlim)); if (getrlimit(RLIMIT_NOFILE, &rlim) == 0) { if ((int)rlim.rlim_max < _maxfd) { rlim.rlim_cur = rlim.rlim_max; setrlimit(RLIMIT_NOFILE, &rlim); rlim.rlim_cur = _maxfd; rlim.rlim_max = _maxfd; setrlimit(RLIMIT_NOFILE, &rlim); } } EXIT_LABEL: if (rc < 0) { TermKqueue(); } return rc; } void KqueueProxy::TermKqueue() { if (_kqfd > 0) { close(_kqfd); _kqfd = -1; } if (_evtlist != NULL) { free(_evtlist); _evtlist = NULL; } if (_kqrefs != NULL) { delete []_kqrefs; _kqrefs = NULL; } } bool KqueueProxy::KqueueAdd(KqObjList& obj_list) { bool ret = true; KqueuerObj *kqobj = NULL; KqueuerObj *kqobj_error = NULL; TAILQ_FOREACH(kqobj, &obj_list, _entry) { if (!KqueueAddObj(kqobj)) { MTLOG_ERROR("kqobj add failed, fd: %d", kqobj->GetOsfd()); kqueue_assert(0); kqobj_error = kqobj; ret = false; goto EXIT_LABEL; } } EXIT_LABEL: if (!ret) { TAILQ_FOREACH(kqobj, &obj_list, _entry) { if (kqobj == kqobj_error) { break; } KqueueDelObj(kqobj); } } return ret; } bool KqueueProxy::KqueueDel(KqObjList& obj_list) { bool ret = true; KqueuerObj *kqobj = NULL; TAILQ_FOREACH(kqobj, &obj_list, _entry) { if (!KqueueDelObj(kqobj)) // failed also need continue, be sure ref count ok { MTLOG_ERROR("epobj del failed, fd: %d", kqobj->GetOsfd()); kqueue_assert(0); ret = false; } } return ret; } bool KqueueProxy::KqueueCtrlAdd(int fd, int events) { KqFdRef* item = KqFdRefGet(fd); if (item == NULL) { MT_ATTR_API(320851, 1); // fd error, wtf? MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd); kqueue_assert(0); return false; } item->AttachEvents(events); int old_events = item->GetListenEvents(); int new_events = old_events | events; if (old_events == new_events) { return true; } KqEvent ke; int ret; if (old_events & KQ_EVENT_WRITE) { EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); if (ret == -1) { // TODO, error check item->DetachEvents(events); kqueue_assert(0); return false; } } if (old_events & KQ_EVENT_READ) { EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); if (ret == -1) { // TODO, error check item->DetachEvents(events); kqueue_assert(0); return false; } } if (events & KQ_EVENT_WRITE) { EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); if (ret == -1) { // TODO, error check item->DetachEvents(events); kqueue_assert(0); return false; } } if (events & KQ_EVENT_READ) { EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); if (ret == -1) { // TODO, error check item->DetachEvents(events); kqueue_assert(0); return false; } } item->SetListenEvents(new_events); return true; } bool KqueueProxy::KqueueCtrlDel(int fd, int events) { return KqueueCtrlDelRef(fd, events, false); } bool KqueueProxy::KqueueCtrlDelRef(int fd, int events, bool use_ref) { KqFdRef* item = KqFdRefGet(fd); if (item == NULL) { MT_ATTR_API(320851, 1); // fd error MTLOG_ERROR("kqfd ref not find, failed, fd: %d", fd); kqueue_assert(0); return false; } item->DetachEvents(events); int old_events = item->GetListenEvents(); int new_events = old_events &~ events; if (use_ref) { new_events = old_events; if (item->ReadRefCnt() == 0) { new_events = new_events & ~KQ_EVENT_READ; } if (item->WriteRefCnt() == 0) { new_events = new_events & ~KQ_EVENT_WRITE; } } if (old_events == new_events) { return true; } KqEvent ke; int ret; if (old_events & KQ_EVENT_WRITE) { EV_SET(&ke, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL); ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); if (ret == -1) { kqueue_assert(0); return false; } } if (old_events & KQ_EVENT_READ) { EV_SET(&ke, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL); ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); if (ret == -1) { kqueue_assert(0); return false; } } if (new_events & KQ_EVENT_WRITE) { EV_SET(&ke, fd, EVFILT_WRITE, EV_ADD, 0, 0, NULL); ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); if (ret == -1) { kqueue_assert(0); return false; } } if (new_events & KQ_EVENT_READ) { EV_SET(&ke, fd, EVFILT_READ, EV_ADD, 0, 0, NULL); ret = ff_kevent(_kqfd, &ke, 1, NULL, 0, NULL); if (ret == -1) { kqueue_assert(0); return false; } } item->SetListenEvents(new_events); return true; } bool KqueueProxy::KqueueAddObj(KqueuerObj* obj) { if (obj == NULL) { MTLOG_ERROR("kqobj input invalid, %p", obj); return false; } KqFdRef* item = KqFdRefGet(obj->GetOsfd()); if (item == NULL) { MT_ATTR_API(320851, 1); // fd error MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd()); kqueue_assert(0); return false; } int ret = obj->KqueueCtlAdd(item); if (ret < 0) { MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj); kqueue_assert(0); return false; } return true; } bool KqueueProxy::KqueueDelObj(KqueuerObj* obj) { if (obj == NULL) { MTLOG_ERROR("kqobj input invalid, %p", obj); return false; } KqFdRef* item = KqFdRefGet(obj->GetOsfd()); if (item == NULL) { MT_ATTR_API(320851, 1); // fd error MTLOG_ERROR("kqfd ref not find, failed, fd: %d", obj->GetOsfd()); kqueue_assert(0); return false; } int ret = obj->KqueueCtlDel(item); if (ret < 0) { MTLOG_ERROR("kqueue ctrl callback failed, fd: %d, obj: %p", obj->GetOsfd(), obj); kqueue_assert(0); return false; } return true; } void KqueueProxy::KqueueRcvEventList(int evtfdnum) { int ret = 0; int osfd = 0; int revents = 0; int tmp_evts = 0; KqFdRef* item = NULL; KqueuerObj* obj = NULL; for (int i = 0; i < evtfdnum; i++) { osfd = _evtlist[i].ident; item = KqFdRefGet(osfd); if (item == NULL) { MT_ATTR_API(320851, 1); // fd error MTLOG_ERROR("kqfd ref not find, failed, fd: %d", osfd); kqueue_assert(0); continue; } tmp_evts = _evtlist[i].filter; if (tmp_evts == EVFILT_READ) { revents |= KQ_EVENT_READ; } if (tmp_evts == EVFILT_WRITE) { revents |= KQ_EVENT_WRITE; } obj = item->GetNotifyObj(); if (obj == NULL) { MTLOG_ERROR("fd notify obj null, failed, fd: %d", osfd); KqueueCtrlDel(osfd, (revents & (KQ_EVENT_READ | KQ_EVENT_WRITE))); continue; } obj->SetRcvEvents(revents); if (tmp_evts == EV_ERROR) { obj->HangupNotify(); continue; } if (revents & KQ_EVENT_READ) { ret = obj->InputNotify(); if (ret != 0) { continue; } } if (revents & KQ_EVENT_WRITE) { ret = obj->OutputNotify(); if (ret != 0) { continue; } } } } void KqueueProxy::KqueueDispatch() { int nfd; int wait_time = KqueueGetTimeout(); if (wait_time) { struct timespec ts; ts.tv_sec = wait_time / 1000; ts.tv_nsec = 0; nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, &ts); } else { nfd = ff_kevent(_kqfd, NULL, 0, _evtlist, _maxfd, NULL); } if (nfd <= 0) { return; } KqueueRcvEventList(nfd); } int KqueuerObj::InputNotify() { MicroThread* thread = this->GetOwnerThread(); if (thread == NULL) { kqueue_assert(0); MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong"); return -1; } if (thread->HasFlag(MicroThread::IO_LIST)) { MtFrame* frame = MtFrame::Instance(); frame->RemoveIoWait(thread); frame->InsertRunable(thread); } return 0; } int KqueuerObj::OutputNotify() { MicroThread* thread = this->GetOwnerThread(); if (NULL == thread) { kqueue_assert(0); MTLOG_ERROR("kqueue fd obj, no thread ptr, wrong"); return -1; } // Multiple events arrive at the same time if (thread->HasFlag(MicroThread::IO_LIST)) { MtFrame* frame = MtFrame::Instance(); frame->RemoveIoWait(thread); frame->InsertRunable(thread); } return 0; } int KqueuerObj::HangupNotify() { MtFrame* frame = MtFrame::Instance(); frame->KqueueCtrlDel(this->GetOsfd(), this->GetEvents()); return 0; } int KqueuerObj::KqueueCtlAdd(void* args) { MtFrame* frame = MtFrame::Instance(); KqFdRef* fd_ref = (KqFdRef*)args; kqueue_assert(fd_ref != NULL); int osfd = this->GetOsfd(); int new_events = this->GetEvents(); // Notify object needs updating KqueuerObj* old_obj = fd_ref->GetNotifyObj(); if ((old_obj != NULL) && (old_obj != this)) { MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); return -1; } fd_ref->SetNotifyObj(this); if (!frame->KqueueCtrlAdd(osfd, new_events)) { MTLOG_ERROR("kqfd ref add failed, log"); fd_ref->SetNotifyObj(old_obj); return -2; } return 0; } int KqueuerObj::KqueueCtlDel(void* args) { MtFrame* frame = MtFrame::Instance(); KqFdRef* fd_ref = (KqFdRef*)args; kqueue_assert(fd_ref != NULL); int osfd = this->GetOsfd(); int events = this->GetEvents(); KqueuerObj* old_obj = fd_ref->GetNotifyObj(); if (old_obj != this) { MTLOG_ERROR("kqfd ref conflict, fd: %d, old: %p, now: %p", osfd, old_obj, this); return -1; } fd_ref->SetNotifyObj(NULL); if (!frame->KqueueCtrlDelRef(osfd, events, false)) { MTLOG_ERROR("kqfd ref del failed, log"); fd_ref->SetNotifyObj(old_obj); return -2; } return 0; }