Skip to content

Tars 统计信息上报源码笔记

参考资料:

  1. 微服务开源框架TARS的RPC源码解析 之 初识TARS C++客户端-腾讯云开发者社区-腾讯云 (tencent.com)
  2. 微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端-腾讯云开发者社区-腾讯云 (tencent.com)

在使用TARS构建RPC服务端的时候,TARS会帮你生成一个XXXServer类,这个类是继承自Application类的,声明变量XXXServer g_app,以及调用函数:

cpp
g_app.main(argc, argv); 
g_app.waitForShutdown();

便可以开启TARS的RPC服务了。

框架部分

Application

一个服务端就是一个Application,Application帮助用户读取配置文件,根据配置文件初始化代理(假如这个服务端需要调用其他服务,那么就需要初始化代理了)与服务,新建以及启动网络线程与业务线程。

void Application::main(const string &config) 部分代码:

cpp
//解析配置文件
parseConfig(config);

//初始化客户端
initializeClient();

//初始化 TC_EpollServer
initializeServer();

initializeClient() 会根据配置文件初始化代理(需要调用其他服务),部分代码:

cpp
// 初始化通信器,getInstance() 继承自 TC_Singleton<CommunicatorFactory>,返回 CommunicatorFactory 指针
// _config 中配置了需要初始化的代理
 _communicator = CommunicatorFactory::getInstance()->getCommunicator(_conf);

new 了 一个 Communicator ,设置 Communicator 属性,但是没有 Initialize()Initialize() 会在 Communicator 调用 getServantProxy()getStatReport()reloadProperty() 时被调用,只有第一次调用时才会真正的初始化,在 Application::main 中,initializeServer() 第一次调用 getStatReport() 时 初始化。

Communicator

一个Communicator实例就是一个客户端,负责与服务端建立连接,生成RPC服务句柄,可以通过CommunicatorPtr& Application::getCommunicator()获取Communicator实例,用户最后不要自己声明定义新的Communicator实例。

Communicator 中有一个成员,负责上报统计信息,在Initialize() 中会调用 new StatReport(this) 创建对象。

cpp
StatReport * _statReport; //上报类
// class StatReport : public TC_HandleBase, public TC_Thread, public TC_ThreadLock

StatReport 类继承自 TC_Thread 类,是一个自定义线程类,TC_Thread::start() 启动线程:

cpp
TC_ThreadControl TC_Thread::start()
{
    TC_ThreadLock::Lock sync(_lock);
    if (_running)
    {
        throw TC_ThreadThreadControl_Exception("[TC_Thread::start] thread has start");
    }

    try
    {
        _th = new std::thread(&TC_Thread::threadEntry, this); // 创建线程

    }
    catch(...)
    {
        throw TC_ThreadThreadControl_Exception("[TC_Thread::start] thread start error");
    }

    _lock.wait();

    return TC_ThreadControl(_th);
}

void TC_Thread::threadEntry(TC_Thread *pThread)
{
	RunningClosure r(pThread);

    {
        TC_ThreadLock::Lock sync(pThread->_lock);
        pThread->_lock.notifyAll();
    }

    try
    {
        pThread->run(); // 执行的任务是 pThread 中实现的 run()
    }
    catch (exception &ex)
    {
        cerr << std::this_thread::get_id() << "|" << ex.what() << endl;
        throw ex;
    }
    catch (...)
    {
        throw;
    }
}

StatReport 实现了 TC_Thread 中的纯虚函数 run()StatReport 创建时不会调用 start(),而是在 Communicator::Initialize() 中调用 setReportInfo()start()

StatReport

StatReport::run() 在循环里上报统计信息,超过设置的时间间隔时上报一次。StatReport 提供了 report() 接口 和 submit() 接口,但是没找到调用的地方。CommunicatorEpoll 的统计信息最终由 AdapterProxy 在远程调用结束时统计。

代码注释:

cpp
void StatReport::run() // 轮询
{
    while(!_terminate)
    {
        {
            Lock lock(*this);

            timedWait(1000);
        }

        try
        {
            time_t tNow = TNOW; // 获取当前时间

            if(tNow - _time >= _reportInterval/1000) // 超过间隔周期
            {
                // Mic = module interval call
                reportMicMsg(_statMicMsgClient, true);  // 上报客户端统计信息

                reportMicMsg(_statMicMsgServer, false);  // 上报服务端统计信息
                
                // typedef  map<StatMicMsgHead, StatMicMsgBody>        MapStatMicMsg;
                MapStatMicMsg mStatMsg;
                
                // 返回类型:vector<shared_ptr<CommunicatorEpoll>>
                // 关系图:
                /**
                 * Communicator:
                 *     - CommunicatorEpoll1     // 网络处理线程类
                 *         - fooObjectProxy1    // 服务实体
                 *         - barObjectProxy1
                 *     - CommunicatorEpoll2
                 *         - fooObjectProxy2
                 *         - barObjectProxy2
                 */
                
                // 获取当前通信器的全部 CommunicatorEpoll
                auto communicatorEpolls = _communicator->getAllCommunicatorEpoll();

                for(auto ce : communicatorEpolls)
                {
                    MapStatMicMsg * pStatMsg;
                    while(ce->popStatMsg(pStatMsg)) // 最终由 AdapterProxy 在远程调用结束时统计
                    {
                        // 合并 StatMicMsgHead 相同的 MapStatMicMsg
                        addMicMsg(mStatMsg, *pStatMsg);
                        delete pStatMsg;
                    }
                }
                
                // 上报当前 Application 需要调用的服务代理中所有的调用统计信息
                // 当前 Application 为客户端
                reportMicMsg(mStatMsg, true); 

                reportPropMsg();

//                reportSampleMsg();

                _time = tNow;
            }

        }
        catch ( exception& e )
        {
            TLOGERROR("StatReport::run catch exception:" << e.what() << endl);
        }
        catch ( ... )
        {
            TLOGERROR("StatReport::run catch unkown exception" << endl);
        }
    }

    ServantProxyThreadData::g_sp.reset();
}

StatReport::reportMicMsg() 通过ServantProxy调用RPC服务,异步上报统计信息。

代码注释:

cpp
int StatReport::reportMicMsg(MapStatMicMsg& msg, bool bFromClient)
{
    if (msg.empty()) return 0;
    try
    {
        int iLen = 0;
        MapStatMicMsg  mTemp;
        MapStatMicMsg  mStatMsg;
        mStatMsg.clear();
        mTemp.clear();
        {
            Lock lock(*this);
            msg.swap(mStatMsg);
        }

       TLOGTARS("[StatReport::reportMicMsg get size:" << mStatMsg.size()<<"]"<< endl);
       
       // 遍历 MapStatMicMsg
       for(MapStatMicMsg::iterator it = mStatMsg.begin(); it != mStatMsg.end(); it++)
       {
           const StatMicMsgHead &head = it->first;  // 当前 MicMsg 的 head
           
           //STAT_PROTOCOL_LEN  一次stat mic上报纯协议部分占用大小,用来控制udp大小防止超MTU 
           int iTemLen = STAT_PROTOCOL_LEN +head.masterName.length() + head.slaveName.length() + head.interfaceName.length()
               + head.slaveSetName.length() + head.slaveSetArea.length() + head.slaveSetID.length(); // 当前 MicMsg 长度
           iLen = iLen + iTemLen;    // 当前 mTemp 中所有 MicMsg 的长度
           if(iLen > _maxReportSize) //不能超过udp 1472
           {
               if(_statPrx)
               {
                   TLOGTARS("[StatReport::reportMicMsg send size:" << mTemp.size()<<"]"<< endl);
                   
                   // mTemp 达到可以发送的最大长度时,通过ServantProxy调用RPC服务,异步上报统计信息
                   _statPrx->tars_set_timeout(_reportTimeout)->async_reportMicMsg(NULL,mTemp,bFromClient, ServerConfig::Context);
               }
               iLen = iTemLen;
               mTemp.clear();
           }

           mTemp[head] = it->second;
           if(LOG->isNeedLog(LocalRollLogger::INFO_LOG))
           {
               ostringstream os;
               os.str("");
               head.displaySimple(os);
               os << "  ";
               mTemp[head].displaySimple(os);
               TLOGTARS("[StatReport::reportMicMsg display:" << os.str() << "]" << endl);
           }
       }
       if(0 != (int)mTemp.size()) // 最后一组未达到最大长度而没有上报的 MicMsg
       {
           if(_statPrx)
           {
               TLOGTARS("[StatReport::reportMicMsg send size:" << mTemp.size()<<"]"<< endl);
               _statPrx->tars_set_timeout(_reportTimeout)->async_reportMicMsg(NULL,mTemp,bFromClient, ServerConfig::Context);
           }
       }
       return 0;
    }
    catch (exception& e)
    {
        TLOGERROR("StatReport::report catch exception:" << e.what() << endl);
    }
    catch ( ... )
    {
        TLOGERROR("StatReport::report catch unkown exception" << endl);
    }
    return -1;
}

StatServer

StatServer::initialize() 时 使用 addServant<StatImp>() 增加 Servant 对象,并添加一个自定义的用于执行定时操作的线程类(ReapSSDThread)对象,该对象定时执行写数据库操作,同时启动一个定时器,定时删除超过保留时间的数据。

StatServer::initialize() 部分代码注释:

cpp
void StatServer::initialize()
{
    try
    {
        //增加对象
        addServant<StatImp>( ServerConfig::Application + "." + ServerConfig::ServerName +".StatObj" );

        _iInsertInterval  = TC_Common::strto<int>(g_pconf->get("/tars/hashmap<insertInterval>","5")); // 数据库插入间隔,单位分钟
        if(_iInsertInterval < 5)
        {
            _iInsertInterval = 5;
        }

	    _reserveDay = TC_Common::strto<int>(g_pconf->get("/tars/db_reserve<stat_reserve_time>", "31")); // 保留天数
	    if(_reserveDay < 2)
	    {
		    _reserveDay = 2;
        }

        //获取业务线程个数,业务代码StatImp由多个业务线程执行
        // ...
        int iHandleNum = TC_Common::strto<int>(g_pconf->get(sHandleNum, "20"));
        vector<int64_t> vec;
        vec.resize(iHandleNum);
        for(size_t i =0; i < vec.size(); ++i)
        {
            vec[i] = 0;
        }

        // _vBuffer 中保存每个业务线程对各 buffer 上次的 addHashMap(写入) 的时间
        _vBuffer[0]=vec;  
        _vBuffer[1]=vec;

        TLOGDEBUG("StatServer::initialize iHandleNum:" << iHandleNum<< endl);

        initHashMap();  // 初始化接受 MicMsg 的数据结构

        string s("");
        _iSelectBuffer = getSelectBufferFromFlag(s);  // 初始备选 buffer

        TLOGDEBUG("StatServer::initialize iSelectBuffer:" << _iSelectBuffer<< endl);

        // ...

        _pReapSSDThread = new ReapSSDThread();
        _pReapSSDThread->start(); // 写数据库线程

	    _timer.startTimer();
	    _timer.postCron("0 0 3 * * *", std::bind(&StatServer::doReserveDb, this, "statdb", g_pconf)); // 定时删除 _reserveDay 天之间的数据表
    }
    catch ( ... )
    {
        TLOGERROR("StatServer::initialize unknow  exception  catched" << endl);
        exit( 0 );

    }
}

StatServer 使用双 buffer 机制(为了实现无锁读?),reportMicMsg() 写 buffer 时,ReapSSDThread 读取另一个 buffer 。

StatImp

StatImp::reportMicMsg() 收到远程调用请求时,将统计信息写入缓冲区,代码注释:

cpp
int StatImp::reportMicMsg( const map<tars::StatMicMsgHead, tars::StatMicMsgBody>& statmsg,bool bFromClient, tars::TarsCurrentPtr current )
{
    TLOGINFO("report---------------------------------access size:" << statmsg.size() << "|bFromClient:" <<bFromClient << endl);

    for ( map<StatMicMsgHead, StatMicMsgBody>::const_iterator it = statmsg.begin(); it != statmsg.end(); it++ )
    {
        StatMicMsgHead head = it->first;
        const StatMicMsgBody &body = it->second;
        
        // 解析 statmsg
        if(bFromClient)
        {
            head.masterIp   = current->getHostName() ;  //以前是自己获取主调ip,现在从proxy直接

            head.slaveName  = getSlaveName(head.slaveName);
        }
        else
        {
            head.slaveIp = current->getHostName() ;//现在从proxy直接
        }

        string sMasterName      = head.masterName;
        string::size_type pos   =  sMasterName.find("@");
        if (pos != string::npos)
        {
            head.masterName   = sMasterName.substr(0, pos);
            head.tarsVersion   = sMasterName.substr(pos+1);
        }

        map<string, string>::iterator it_vip;
        it_vip =  g_app.getVirtualMasterIp().find(getSlaveName(head.slaveName));
        if( it_vip != g_app.getVirtualMasterIp().end())
        {
            head.masterIp    = it_vip->second; //按 slaveName来匹配,填入假的主调ip,减小入库数据量
        }

        //如果不是info等级的日志级别,就别往里走了
        ostringstream os;
        if(LOG->isNeedLog(LocalRollLogger::INFO_LOG))
        {
            os.str("");
            head.displaySimple(os);
            body.displaySimple(os);
        }

        //三个数据都为0时不入库
        if(body.count == 0 && body.execCount == 0 && body.timeoutCount == 0)
        {
            TLOGINFO(os.str()<<"|zero"<<endl);
            continue;
        }

        int iAddHash        = addHashMap(head,body); // *** 添加到哈希表中 ***

        TLOGINFO(os.str()<<"|"<<iAddHash<<endl);
    }

    return 0;
}

StatImp::addHashMap() 代码注释:

cpp
int StatImp::addHashMap(const StatMicMsgHead &head, const StatMicMsgBody &body )
{
    size_t iIndex = _threadIndex;

    //--- dump数据到文件 ---//
    
    // 获取当前被选择的 buffer
    int iBufferIndex = g_app.getSelectBufferIndex(); 
    map<int,vector<int64_t> >& mBuffer = g_app.getBuffer();
    map<int,vector<int64_t> >::iterator iter = mBuffer.find(iBufferIndex);
    
    iter->second[iIndex] = TNOW;  // 记录当前线程 addHashMap 的时间戳

    // 上次切换缓冲区的时间大于数据库插入时间间隔时,切换读写缓冲区
    // 切换读写缓冲区后,数据库插入线程将之前写缓冲区(现在是读缓冲)的数据插入数据库
    // 并清空缓冲区 
    dump2file();

    iBufferIndex = g_app.getSelectBufferIndex();

    string sKey = head.slaveName;
    sKey += head.masterName;
    sKey += head.interfaceName;
    sKey += head.masterIp;
    sKey += head.slaveIp;

    int iHashKey = _hashf(sKey) % g_app.getBuffNum();

    StatHashMap *pHashMap = g_app.getHashMapBuff(iBufferIndex, iHashKey); // 返回当前的 Hash 表
    
    //////////////////////////////////////////////////////////////////////////////////////

    float rate =  (pHashMap->getMapHead()._iUsedChunk) * 1.0/pHashMap->allBlockChunkCount(); // 碰撞率检测

    if(rate >0.9)
    {
        TLOGERROR("StatImp::addHashMap hashmap will full|_iMemSize:" << pHashMap->getMapHead()._iMemSize << endl);
//        FDLOG("HashMap")<<"StatImp::addHashMap hashmap will full|_iMemSize:" << pHashMap->getMapHead()._iMemSize << endl;
        return -1;
    }

    int iRet = pHashMap->add(head, body);
    if(iRet != 0)
    {
        TLOGDEBUG("StatImp::addHashMap set g_hashmap recourd erro|" << iRet << endl);
        return iRet;
    }
    return iRet;
}

StatImp::dump2file() 判断是否切换读写缓冲区,切换后更新切换时间戳,只有切换读写缓冲区之后,数据库才会执行插入操作,dump到文件。代码注释:

cpp
void StatImp::dump2file()
{
    static string g_sDate;  // 局部静态变量
    static string g_sFlag;
    static time_t g_tLastDumpTime   = 0;

    time_t tTimeNow         = TC_TimeProvider::getInstance()->getNow();
    time_t tTimeInterv      = g_app.getInserInterv() *60;//second. 数据库插入时间

    if(g_tLastDumpTime == 0) // 第一次 dump2file() 
    {
        g_app.getTimeInfo(g_tLastDumpTime,g_sDate,g_sFlag); // 获取当前时间
    }

    if(tTimeNow - g_tLastDumpTime > tTimeInterv) // 双检锁
    {
        static  TC_ThreadLock g_mutex;
        TC_ThreadLock::Lock  lock( g_mutex );
        if(tTimeNow - g_tLastDumpTime > tTimeInterv)
        {
            g_app.getTimeInfo(g_tLastDumpTime,g_sDate,g_sFlag); // 更新缓冲区切换时间
            int iSelectBuffer = g_app.getSelectBufferIndex();
            iSelectBuffer = !iSelectBuffer;

            g_app.setSelectBufferIndex(iSelectBuffer); // 切换缓冲区

            TLOGDEBUG("StatImp::dump2file select buffer:" << iSelectBuffer << "|TimeInterv:" << tTimeInterv << "|now:" << tTimeNow << "|last:" << g_tLastDumpTime << endl);
//            FDLOG("CountStat") << "stat ip:" << ServerConfig::LocalIp << "|StatImp::dump2file select buffer:" << iSelectBuffer << "|TimeInterv:" << tTimeInterv << "|now:" << tTimeNow << "|last:" << g_tLastDumpTime << endl;
        }
    }
}

ReapSSDThread

ReapSSDThread 中有一组 ReapSSDProcThread 类成员 _runners_runners 才是执行向数据库插入数据的线程。ReapSSDThread 每隔 REAP_INTERVAL 秒将双缓冲区 buffer 中的读 buffer 的内容读到数据库缓冲中,由 _runners 执行数据库插入操作。

ReapSSDThread::run() 部分代码注释:

cpp
void ReapSSDThread::run()
{
    int iInsertDataNum = StatDbManager::getInstance()->getDbIpNum(); // DB IP数量
    
    // 启动 _runners 
    for(int i = 0; i < iInsertDataNum; ++i)
    {
        ReapSSDProcThread *r = new ReapSSDProcThread(this);

        r->start();

        _runners.push_back(r);
    }

    string sDate,sTime;
    int dbNumber = StatDbManager::getInstance()->getDbNumber();
    string sRandOrder;
    uint64_t iTotalNum = 0;
    int iLastIndex = -1;


    while (!_terminate)
    {
        try
        {
            //双buffer中一个buffer入库
            int iBufferIndex = !(g_app.getSelectBufferIndex()); // 读buffer 的 index,_iSelectBuffer: 写入的buffer
            int64_t iInterval = 3;  // 单位:秒
            
            // 1. iBufferIndex != iLastIndex 判断两次读的是否是同一个buffer,因为读完之后会清空 buffer
            // 2. getSelectBuffer(iBufferIndex, iInterval) 判断 buffer[iBufferIndex] 的最后一次写入的时间间隔是否大于 iInterval
            if(iBufferIndex != iLastIndex && g_app.getSelectBuffer(iBufferIndex, iInterval))
            {
                
                iLastIndex = iBufferIndex;

                iTotalNum = 0;

                vector<StatMsg*> vAllStatMsg;
                for(int iStatIndex = 0; iStatIndex < dbNumber; ++iStatIndex)
                {
                    vAllStatMsg.push_back(new StatMsg());
                }

                int64_t tBegin = TNOWMS;

                getDataFromBuffer(iBufferIndex, vAllStatMsg, iTotalNum); // 从 buffer 中获取数据放入 vAllStatMsg 中
                
                // 没有获取到数据
                if(iTotalNum <= 0)
                {
                    for(int iStatIndex = 0; iStatIndex < dbNumber; ++iStatIndex)
                    {
                        delete vAllStatMsg[iStatIndex];  // 资源释放
                    }

                    vAllStatMsg.clear();
                }
                else
                {
                    
                    string sFile="";
                    string sDate="";
                    string sFlag="";
                    time_t time=0;
                    g_app.getTimeInfo(time,sDate,sFlag);

                    //size_t iSize = vAllStatMsg.size();

                    QueueItem item;
                    int iInsertThreadIndex = 0;
                    sRandOrder = g_app.getRandOrder();

                    if (sRandOrder == "")
                    {
                        sRandOrder = "0";
                    }

                    map<string, vector<size_t> >& mIpHasDbInfo = StatDbManager::getInstance()->getIpHasDbInfo(); 
                    map<string, vector<size_t> >::iterator m_iter = mIpHasDbInfo.begin();

                    while(m_iter != mIpHasDbInfo.end())
                    {
                        vector<size_t> &vDb = m_iter->second;

                        for(size_t i = 0; i < vDb.size(); ++i)
                        {
                            int k = (i + TC_Common::strto<int>(sRandOrder)) % vDb.size();

                            item._index = vDb[k];
                            item._date  = sDate;
                            item._tflag = sFlag;
                            item._statmsg = vAllStatMsg[item._index];

                            iInsertThreadIndex = StatDbManager::getInstance()->getDbToIpIndex(vDb[k]);

                            assert(iInsertThreadIndex >= 0);

                            _runners[iInsertThreadIndex]->put(item); // 存入数据库缓冲区 _queue
                        }

                        ++m_iter;
                    }

                    if(_terminate)
                    {
                        break;
                    }
                }

                for(int k = 0; k < g_app.getBuffNum(); ++k)  // 清空hashmap缓冲区
                {
                    StatHashMap *pHashMap = g_app.getHashMapBuff(iBufferIndex, k);
                    pHashMap->clear();
                }
            }

        }
        catch(exception& ex)
        {
            TLOGERROR("ReapSSDThread::run exception:"<< ex.what() << endl);
        }
        catch(...)
        {
            TLOGERROR("ReapSSDThread::run ReapSSDThread unkonw exception catched" << endl);
        }

        TC_ThreadLock::Lock lock(*this);
        timedWait(REAP_INTERVAL);  // 等待 5s
    }

    // 终止线程,资源回收
    StatDbManager::getInstance()->setITerminateFlag(true);

    for(size_t i = 0; i < _runners.size(); ++i)
    {
        if(_runners[i]->isAlive())
        {
            _runners[i]->terminate();

            _runners[i]->getThreadControl().join();
        }
    }

    for(size_t i = 0; i < _runners.size(); ++i)
    {
        if(_runners[i])
        {
            delete _runners[i];
            _runners[i] = NULL;
        }
    }

    TLOGDEBUG("ReapSSDThread run terminate." << endl);
}

ReapSSDProcThread::run() 部分代码注释:

cpp
void ReapSSDProcThread::run()
{
    string sDate1("");
    string sFlag1("");
    string sDate2("");
    string sFlag2("");

    while (!_terminate)
    {
        try
        {
            sDate1 = "";
            sFlag1 = "";
            sDate2 = "";
            sFlag2 = "";

            QueueItem item;

            if(pop(item))  // 获取 _queue 缓冲区的前 1000 条数据 存入 item 中
            {
                if(item._statmsg != NULL)
                {
                    int64_t iBegin = TNOWMS;
                    int64_t iEnd = 0;
                    
                    // 插入数据库
                    StatDbManager::getInstance()->insert2MultiDbs(item._index, *item._statmsg, item._date, item._tflag);
                    
                    // 记录日志 ...
                    
                    delete item._statmsg; // 释放资源
                    item._statmsg = NULL;
                }
                else
                {
                    TLOGERROR("ReapSSDProcThread::run item._statmsg == NULL." << endl);
                }
            }
        }
        catch(exception& e)
        {
            TLOGERROR("ReapSSDProcThread::run exception:" << e.what() << endl);
//            FDLOG("CountStat") << "ReapSSDProcThread::run exception:" << e.what() << endl;
        }
    }
}

基于 VitePress 构建