Tars 统计信息上报源码笔记
参考资料:
- 微服务开源框架TARS的RPC源码解析 之 初识TARS C++客户端-腾讯云开发者社区-腾讯云 (tencent.com)
- 微服务开源框架TARS的RPC源码解析 之 初识TARS C++服务端-腾讯云开发者社区-腾讯云 (tencent.com)
在使用TARS构建RPC服务端的时候,TARS会帮你生成一个XXXServer类,这个类是继承自Application类的,声明变量XXXServer g_app,以及调用函数:
g_app.main(argc, argv);
g_app.waitForShutdown(); 便可以开启TARS的RPC服务了。
框架部分
Application
一个服务端就是一个Application,Application帮助用户读取配置文件,根据配置文件初始化代理(假如这个服务端需要调用其他服务,那么就需要初始化代理了)与服务,新建以及启动网络线程与业务线程。
void Application::main(const string &config) 部分代码:
//解析配置文件
parseConfig(config);
//初始化客户端
initializeClient();
//初始化 TC_EpollServer
initializeServer(); initializeClient() 会根据配置文件初始化代理(需要调用其他服务),部分代码:
// 初始化通信器,getInstance() 继承自 TC_Singleton<CommunicatorFactory>,返回 CommunicatorFactory 指针
// _config 中配置了需要初始化的代理
_communicator = CommunicatorFactory::getInstance()->getCommunicator(_conf); new 了 一个 Communicator ,设置 Communicator 属性,但是没有 Initialize(), Initialize() 会在 Communicator 调用 getServantProxy()、getStatReport() 和 reloadProperty() 时被调用,只有第一次调用时才会真正的初始化,在 Application::main 中,initializeServer() 第一次调用 getStatReport() 时 初始化。
Communicator
一个Communicator实例就是一个客户端,负责与服务端建立连接,生成RPC服务句柄,可以通过CommunicatorPtr& Application::getCommunicator()获取Communicator实例,用户最后不要自己声明定义新的Communicator实例。
Communicator 中有一个成员,负责上报统计信息,在Initialize() 中会调用 new StatReport(this) 创建对象。
StatReport * _statReport; //上报类
// class StatReport : public TC_HandleBase, public TC_Thread, public TC_ThreadLock StatReport 类继承自 TC_Thread 类,是一个自定义线程类,TC_Thread::start() 启动线程:
TC_ThreadControl TC_Thread::start()
{
TC_ThreadLock::Lock sync(_lock);
if (_running)
{
throw TC_ThreadThreadControl_Exception("[TC_Thread::start] thread has start");
}
try
{
_th = new std::thread(&TC_Thread::threadEntry, this); // 创建线程
}
catch(...)
{
throw TC_ThreadThreadControl_Exception("[TC_Thread::start] thread start error");
}
_lock.wait();
return TC_ThreadControl(_th);
}
void TC_Thread::threadEntry(TC_Thread *pThread)
{
RunningClosure r(pThread);
{
TC_ThreadLock::Lock sync(pThread->_lock);
pThread->_lock.notifyAll();
}
try
{
pThread->run(); // 执行的任务是 pThread 中实现的 run()
}
catch (exception &ex)
{
cerr << std::this_thread::get_id() << "|" << ex.what() << endl;
throw ex;
}
catch (...)
{
throw;
}
} StatReport 实现了 TC_Thread 中的纯虚函数 run()。StatReport 创建时不会调用 start(),而是在 Communicator::Initialize() 中调用 setReportInfo() 时 start() 。
StatReport
StatReport::run() 在循环里上报统计信息,超过设置的时间间隔时上报一次。StatReport 提供了 report() 接口 和 submit() 接口,但是没找到调用的地方。CommunicatorEpoll 的统计信息最终由 AdapterProxy 在远程调用结束时统计。
代码注释:
void StatReport::run() // 轮询
{
while(!_terminate)
{
{
Lock lock(*this);
timedWait(1000);
}
try
{
time_t tNow = TNOW; // 获取当前时间
if(tNow - _time >= _reportInterval/1000) // 超过间隔周期
{
// Mic = module interval call
reportMicMsg(_statMicMsgClient, true); // 上报客户端统计信息
reportMicMsg(_statMicMsgServer, false); // 上报服务端统计信息
// typedef map<StatMicMsgHead, StatMicMsgBody> MapStatMicMsg;
MapStatMicMsg mStatMsg;
// 返回类型:vector<shared_ptr<CommunicatorEpoll>>
// 关系图:
/**
* Communicator:
* - CommunicatorEpoll1 // 网络处理线程类
* - fooObjectProxy1 // 服务实体
* - barObjectProxy1
* - CommunicatorEpoll2
* - fooObjectProxy2
* - barObjectProxy2
*/
// 获取当前通信器的全部 CommunicatorEpoll
auto communicatorEpolls = _communicator->getAllCommunicatorEpoll();
for(auto ce : communicatorEpolls)
{
MapStatMicMsg * pStatMsg;
while(ce->popStatMsg(pStatMsg)) // 最终由 AdapterProxy 在远程调用结束时统计
{
// 合并 StatMicMsgHead 相同的 MapStatMicMsg
addMicMsg(mStatMsg, *pStatMsg);
delete pStatMsg;
}
}
// 上报当前 Application 需要调用的服务代理中所有的调用统计信息
// 当前 Application 为客户端
reportMicMsg(mStatMsg, true);
reportPropMsg();
// reportSampleMsg();
_time = tNow;
}
}
catch ( exception& e )
{
TLOGERROR("StatReport::run catch exception:" << e.what() << endl);
}
catch ( ... )
{
TLOGERROR("StatReport::run catch unkown exception" << endl);
}
}
ServantProxyThreadData::g_sp.reset();
} StatReport::reportMicMsg() 通过ServantProxy调用RPC服务,异步上报统计信息。
代码注释:
int StatReport::reportMicMsg(MapStatMicMsg& msg, bool bFromClient)
{
if (msg.empty()) return 0;
try
{
int iLen = 0;
MapStatMicMsg mTemp;
MapStatMicMsg mStatMsg;
mStatMsg.clear();
mTemp.clear();
{
Lock lock(*this);
msg.swap(mStatMsg);
}
TLOGTARS("[StatReport::reportMicMsg get size:" << mStatMsg.size()<<"]"<< endl);
// 遍历 MapStatMicMsg
for(MapStatMicMsg::iterator it = mStatMsg.begin(); it != mStatMsg.end(); it++)
{
const StatMicMsgHead &head = it->first; // 当前 MicMsg 的 head
//STAT_PROTOCOL_LEN 一次stat mic上报纯协议部分占用大小,用来控制udp大小防止超MTU
int iTemLen = STAT_PROTOCOL_LEN +head.masterName.length() + head.slaveName.length() + head.interfaceName.length()
+ head.slaveSetName.length() + head.slaveSetArea.length() + head.slaveSetID.length(); // 当前 MicMsg 长度
iLen = iLen + iTemLen; // 当前 mTemp 中所有 MicMsg 的长度
if(iLen > _maxReportSize) //不能超过udp 1472
{
if(_statPrx)
{
TLOGTARS("[StatReport::reportMicMsg send size:" << mTemp.size()<<"]"<< endl);
// mTemp 达到可以发送的最大长度时,通过ServantProxy调用RPC服务,异步上报统计信息
_statPrx->tars_set_timeout(_reportTimeout)->async_reportMicMsg(NULL,mTemp,bFromClient, ServerConfig::Context);
}
iLen = iTemLen;
mTemp.clear();
}
mTemp[head] = it->second;
if(LOG->isNeedLog(LocalRollLogger::INFO_LOG))
{
ostringstream os;
os.str("");
head.displaySimple(os);
os << " ";
mTemp[head].displaySimple(os);
TLOGTARS("[StatReport::reportMicMsg display:" << os.str() << "]" << endl);
}
}
if(0 != (int)mTemp.size()) // 最后一组未达到最大长度而没有上报的 MicMsg
{
if(_statPrx)
{
TLOGTARS("[StatReport::reportMicMsg send size:" << mTemp.size()<<"]"<< endl);
_statPrx->tars_set_timeout(_reportTimeout)->async_reportMicMsg(NULL,mTemp,bFromClient, ServerConfig::Context);
}
}
return 0;
}
catch (exception& e)
{
TLOGERROR("StatReport::report catch exception:" << e.what() << endl);
}
catch ( ... )
{
TLOGERROR("StatReport::report catch unkown exception" << endl);
}
return -1;
} StatServer
StatServer::initialize() 时 使用 addServant<StatImp>() 增加 Servant 对象,并添加一个自定义的用于执行定时操作的线程类(ReapSSDThread)对象,该对象定时执行写数据库操作,同时启动一个定时器,定时删除超过保留时间的数据。
StatServer::initialize() 部分代码注释:
void StatServer::initialize()
{
try
{
//增加对象
addServant<StatImp>( ServerConfig::Application + "." + ServerConfig::ServerName +".StatObj" );
_iInsertInterval = TC_Common::strto<int>(g_pconf->get("/tars/hashmap<insertInterval>","5")); // 数据库插入间隔,单位分钟
if(_iInsertInterval < 5)
{
_iInsertInterval = 5;
}
_reserveDay = TC_Common::strto<int>(g_pconf->get("/tars/db_reserve<stat_reserve_time>", "31")); // 保留天数
if(_reserveDay < 2)
{
_reserveDay = 2;
}
//获取业务线程个数,业务代码StatImp由多个业务线程执行
// ...
int iHandleNum = TC_Common::strto<int>(g_pconf->get(sHandleNum, "20"));
vector<int64_t> vec;
vec.resize(iHandleNum);
for(size_t i =0; i < vec.size(); ++i)
{
vec[i] = 0;
}
// _vBuffer 中保存每个业务线程对各 buffer 上次的 addHashMap(写入) 的时间
_vBuffer[0]=vec;
_vBuffer[1]=vec;
TLOGDEBUG("StatServer::initialize iHandleNum:" << iHandleNum<< endl);
initHashMap(); // 初始化接受 MicMsg 的数据结构
string s("");
_iSelectBuffer = getSelectBufferFromFlag(s); // 初始备选 buffer
TLOGDEBUG("StatServer::initialize iSelectBuffer:" << _iSelectBuffer<< endl);
// ...
_pReapSSDThread = new ReapSSDThread();
_pReapSSDThread->start(); // 写数据库线程
_timer.startTimer();
_timer.postCron("0 0 3 * * *", std::bind(&StatServer::doReserveDb, this, "statdb", g_pconf)); // 定时删除 _reserveDay 天之间的数据表
}
catch ( ... )
{
TLOGERROR("StatServer::initialize unknow exception catched" << endl);
exit( 0 );
}
} StatServer 使用双 buffer 机制(为了实现无锁读?),reportMicMsg() 写 buffer 时,ReapSSDThread 读取另一个 buffer 。
StatImp
StatImp::reportMicMsg() 收到远程调用请求时,将统计信息写入缓冲区,代码注释:
int StatImp::reportMicMsg( const map<tars::StatMicMsgHead, tars::StatMicMsgBody>& statmsg,bool bFromClient, tars::TarsCurrentPtr current )
{
TLOGINFO("report---------------------------------access size:" << statmsg.size() << "|bFromClient:" <<bFromClient << endl);
for ( map<StatMicMsgHead, StatMicMsgBody>::const_iterator it = statmsg.begin(); it != statmsg.end(); it++ )
{
StatMicMsgHead head = it->first;
const StatMicMsgBody &body = it->second;
// 解析 statmsg
if(bFromClient)
{
head.masterIp = current->getHostName() ; //以前是自己获取主调ip,现在从proxy直接
head.slaveName = getSlaveName(head.slaveName);
}
else
{
head.slaveIp = current->getHostName() ;//现在从proxy直接
}
string sMasterName = head.masterName;
string::size_type pos = sMasterName.find("@");
if (pos != string::npos)
{
head.masterName = sMasterName.substr(0, pos);
head.tarsVersion = sMasterName.substr(pos+1);
}
map<string, string>::iterator it_vip;
it_vip = g_app.getVirtualMasterIp().find(getSlaveName(head.slaveName));
if( it_vip != g_app.getVirtualMasterIp().end())
{
head.masterIp = it_vip->second; //按 slaveName来匹配,填入假的主调ip,减小入库数据量
}
//如果不是info等级的日志级别,就别往里走了
ostringstream os;
if(LOG->isNeedLog(LocalRollLogger::INFO_LOG))
{
os.str("");
head.displaySimple(os);
body.displaySimple(os);
}
//三个数据都为0时不入库
if(body.count == 0 && body.execCount == 0 && body.timeoutCount == 0)
{
TLOGINFO(os.str()<<"|zero"<<endl);
continue;
}
int iAddHash = addHashMap(head,body); // *** 添加到哈希表中 ***
TLOGINFO(os.str()<<"|"<<iAddHash<<endl);
}
return 0;
} StatImp::addHashMap() 代码注释:
int StatImp::addHashMap(const StatMicMsgHead &head, const StatMicMsgBody &body )
{
size_t iIndex = _threadIndex;
//--- dump数据到文件 ---//
// 获取当前被选择的 buffer
int iBufferIndex = g_app.getSelectBufferIndex();
map<int,vector<int64_t> >& mBuffer = g_app.getBuffer();
map<int,vector<int64_t> >::iterator iter = mBuffer.find(iBufferIndex);
iter->second[iIndex] = TNOW; // 记录当前线程 addHashMap 的时间戳
// 上次切换缓冲区的时间大于数据库插入时间间隔时,切换读写缓冲区
// 切换读写缓冲区后,数据库插入线程将之前写缓冲区(现在是读缓冲)的数据插入数据库
// 并清空缓冲区
dump2file();
iBufferIndex = g_app.getSelectBufferIndex();
string sKey = head.slaveName;
sKey += head.masterName;
sKey += head.interfaceName;
sKey += head.masterIp;
sKey += head.slaveIp;
int iHashKey = _hashf(sKey) % g_app.getBuffNum();
StatHashMap *pHashMap = g_app.getHashMapBuff(iBufferIndex, iHashKey); // 返回当前的 Hash 表
//////////////////////////////////////////////////////////////////////////////////////
float rate = (pHashMap->getMapHead()._iUsedChunk) * 1.0/pHashMap->allBlockChunkCount(); // 碰撞率检测
if(rate >0.9)
{
TLOGERROR("StatImp::addHashMap hashmap will full|_iMemSize:" << pHashMap->getMapHead()._iMemSize << endl);
// FDLOG("HashMap")<<"StatImp::addHashMap hashmap will full|_iMemSize:" << pHashMap->getMapHead()._iMemSize << endl;
return -1;
}
int iRet = pHashMap->add(head, body);
if(iRet != 0)
{
TLOGDEBUG("StatImp::addHashMap set g_hashmap recourd erro|" << iRet << endl);
return iRet;
}
return iRet;
} StatImp::dump2file() 判断是否切换读写缓冲区,切换后更新切换时间戳,只有切换读写缓冲区之后,数据库才会执行插入操作,dump到文件。代码注释:
void StatImp::dump2file()
{
static string g_sDate; // 局部静态变量
static string g_sFlag;
static time_t g_tLastDumpTime = 0;
time_t tTimeNow = TC_TimeProvider::getInstance()->getNow();
time_t tTimeInterv = g_app.getInserInterv() *60;//second. 数据库插入时间
if(g_tLastDumpTime == 0) // 第一次 dump2file()
{
g_app.getTimeInfo(g_tLastDumpTime,g_sDate,g_sFlag); // 获取当前时间
}
if(tTimeNow - g_tLastDumpTime > tTimeInterv) // 双检锁
{
static TC_ThreadLock g_mutex;
TC_ThreadLock::Lock lock( g_mutex );
if(tTimeNow - g_tLastDumpTime > tTimeInterv)
{
g_app.getTimeInfo(g_tLastDumpTime,g_sDate,g_sFlag); // 更新缓冲区切换时间
int iSelectBuffer = g_app.getSelectBufferIndex();
iSelectBuffer = !iSelectBuffer;
g_app.setSelectBufferIndex(iSelectBuffer); // 切换缓冲区
TLOGDEBUG("StatImp::dump2file select buffer:" << iSelectBuffer << "|TimeInterv:" << tTimeInterv << "|now:" << tTimeNow << "|last:" << g_tLastDumpTime << endl);
// FDLOG("CountStat") << "stat ip:" << ServerConfig::LocalIp << "|StatImp::dump2file select buffer:" << iSelectBuffer << "|TimeInterv:" << tTimeInterv << "|now:" << tTimeNow << "|last:" << g_tLastDumpTime << endl;
}
}
} ReapSSDThread
ReapSSDThread 中有一组 ReapSSDProcThread 类成员 _runners , _runners 才是执行向数据库插入数据的线程。ReapSSDThread 每隔 REAP_INTERVAL 秒将双缓冲区 buffer 中的读 buffer 的内容读到数据库缓冲中,由 _runners 执行数据库插入操作。
ReapSSDThread::run() 部分代码注释:
void ReapSSDThread::run()
{
int iInsertDataNum = StatDbManager::getInstance()->getDbIpNum(); // DB IP数量
// 启动 _runners
for(int i = 0; i < iInsertDataNum; ++i)
{
ReapSSDProcThread *r = new ReapSSDProcThread(this);
r->start();
_runners.push_back(r);
}
string sDate,sTime;
int dbNumber = StatDbManager::getInstance()->getDbNumber();
string sRandOrder;
uint64_t iTotalNum = 0;
int iLastIndex = -1;
while (!_terminate)
{
try
{
//双buffer中一个buffer入库
int iBufferIndex = !(g_app.getSelectBufferIndex()); // 读buffer 的 index,_iSelectBuffer: 写入的buffer
int64_t iInterval = 3; // 单位:秒
// 1. iBufferIndex != iLastIndex 判断两次读的是否是同一个buffer,因为读完之后会清空 buffer
// 2. getSelectBuffer(iBufferIndex, iInterval) 判断 buffer[iBufferIndex] 的最后一次写入的时间间隔是否大于 iInterval
if(iBufferIndex != iLastIndex && g_app.getSelectBuffer(iBufferIndex, iInterval))
{
iLastIndex = iBufferIndex;
iTotalNum = 0;
vector<StatMsg*> vAllStatMsg;
for(int iStatIndex = 0; iStatIndex < dbNumber; ++iStatIndex)
{
vAllStatMsg.push_back(new StatMsg());
}
int64_t tBegin = TNOWMS;
getDataFromBuffer(iBufferIndex, vAllStatMsg, iTotalNum); // 从 buffer 中获取数据放入 vAllStatMsg 中
// 没有获取到数据
if(iTotalNum <= 0)
{
for(int iStatIndex = 0; iStatIndex < dbNumber; ++iStatIndex)
{
delete vAllStatMsg[iStatIndex]; // 资源释放
}
vAllStatMsg.clear();
}
else
{
string sFile="";
string sDate="";
string sFlag="";
time_t time=0;
g_app.getTimeInfo(time,sDate,sFlag);
//size_t iSize = vAllStatMsg.size();
QueueItem item;
int iInsertThreadIndex = 0;
sRandOrder = g_app.getRandOrder();
if (sRandOrder == "")
{
sRandOrder = "0";
}
map<string, vector<size_t> >& mIpHasDbInfo = StatDbManager::getInstance()->getIpHasDbInfo();
map<string, vector<size_t> >::iterator m_iter = mIpHasDbInfo.begin();
while(m_iter != mIpHasDbInfo.end())
{
vector<size_t> &vDb = m_iter->second;
for(size_t i = 0; i < vDb.size(); ++i)
{
int k = (i + TC_Common::strto<int>(sRandOrder)) % vDb.size();
item._index = vDb[k];
item._date = sDate;
item._tflag = sFlag;
item._statmsg = vAllStatMsg[item._index];
iInsertThreadIndex = StatDbManager::getInstance()->getDbToIpIndex(vDb[k]);
assert(iInsertThreadIndex >= 0);
_runners[iInsertThreadIndex]->put(item); // 存入数据库缓冲区 _queue
}
++m_iter;
}
if(_terminate)
{
break;
}
}
for(int k = 0; k < g_app.getBuffNum(); ++k) // 清空hashmap缓冲区
{
StatHashMap *pHashMap = g_app.getHashMapBuff(iBufferIndex, k);
pHashMap->clear();
}
}
}
catch(exception& ex)
{
TLOGERROR("ReapSSDThread::run exception:"<< ex.what() << endl);
}
catch(...)
{
TLOGERROR("ReapSSDThread::run ReapSSDThread unkonw exception catched" << endl);
}
TC_ThreadLock::Lock lock(*this);
timedWait(REAP_INTERVAL); // 等待 5s
}
// 终止线程,资源回收
StatDbManager::getInstance()->setITerminateFlag(true);
for(size_t i = 0; i < _runners.size(); ++i)
{
if(_runners[i]->isAlive())
{
_runners[i]->terminate();
_runners[i]->getThreadControl().join();
}
}
for(size_t i = 0; i < _runners.size(); ++i)
{
if(_runners[i])
{
delete _runners[i];
_runners[i] = NULL;
}
}
TLOGDEBUG("ReapSSDThread run terminate." << endl);
} ReapSSDProcThread::run() 部分代码注释:
void ReapSSDProcThread::run()
{
string sDate1("");
string sFlag1("");
string sDate2("");
string sFlag2("");
while (!_terminate)
{
try
{
sDate1 = "";
sFlag1 = "";
sDate2 = "";
sFlag2 = "";
QueueItem item;
if(pop(item)) // 获取 _queue 缓冲区的前 1000 条数据 存入 item 中
{
if(item._statmsg != NULL)
{
int64_t iBegin = TNOWMS;
int64_t iEnd = 0;
// 插入数据库
StatDbManager::getInstance()->insert2MultiDbs(item._index, *item._statmsg, item._date, item._tflag);
// 记录日志 ...
delete item._statmsg; // 释放资源
item._statmsg = NULL;
}
else
{
TLOGERROR("ReapSSDProcThread::run item._statmsg == NULL." << endl);
}
}
}
catch(exception& e)
{
TLOGERROR("ReapSSDProcThread::run exception:" << e.what() << endl);
// FDLOG("CountStat") << "ReapSSDProcThread::run exception:" << e.what() << endl;
}
}
}