sync_manager.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. #include"sync_manager.h"
  2. #include <deque>
  3. #include <Eigen/Dense>
  4. #include <fstream>
  5. #include <iostream>
  6. #include <log.h>
  7. std::unordered_map<int, host_server::sync_manager> host_server::sync_manager::map_id_manager;
  8. host_server::sync_manager::sync_manager()
  9. {
  10. init();
  11. }
  12. host_server::sync_manager::sync_manager(bool status)
  13. {
  14. init();
  15. m_log_status = status;
  16. }
  17. void host_server::sync_manager::init()
  18. {
  19. unordered_map<unsigned long long, position> a;
  20. ump_anchors.swap(a);
  21. unordered_map<unsigned long long, unordered_map<unsigned long long, double>> b;
  22. ump_distance.swap(b);
  23. m_log_status = false;
  24. }
  25. void host_server::sync_manager::analyze_sync_msg(sync_time_message& msg)
  26. {
  27. std::lock_guard<std::recursive_mutex> lg(m_mu_sync_time);
  28. // 查找root节点的时间同步序号
  29. int idx = find_sync_time_msg(msg.get_root_id(), msg.get_sync_num());
  30. ssync_manager.sRootIdCode = msg.get_root_id();
  31. if( -1 == idx){
  32. // 没找到
  33. // 如果时间同步消息的版本数量超过最大限制,则删除最早添加的消息
  34. if(ump_sync_time_msg[msg.get_root_id()].size() >= MAX_SYNCTIME_NUM){
  35. // 删除第一个,可能有泄漏
  36. ump_sync_time_msg[msg.get_root_id()].pop_front();
  37. }
  38. // 构造此时间同步数据保存到队列中
  39. sync_time_msg_item it;
  40. it.sync_num = msg.get_sync_num();
  41. it.ump_sync_time_msg_item[msg.get_local_id()] = msg;
  42. ump_sync_time_msg[msg.get_root_id()].push_back(it);
  43. idx = ump_sync_time_msg[msg.get_root_id()].size() - 1;
  44. }else{
  45. ump_sync_time_msg[msg.get_root_id()][idx].ump_sync_time_msg_item[msg.get_local_id()] = msg;
  46. }
  47. // 如果历史同步队列中的时间同步数据大于指定容量,清除数据直到满足指定容量
  48. while(ump_history_sync[msg.get_root_id()].size() > MAX_SYNCTIME_NUM){
  49. ump_history_sync[msg.get_root_id()].pop_front();
  50. }
  51. // 更新时间同步并计算
  52. for(auto it : ump_sync_time_msg[msg.get_root_id()][idx].ump_sync_time_msg_item)
  53. {
  54. update_sync(msg.get_root_id(), idx, msg.get_sync_num(), it.first);
  55. }
  56. }
  57. bool host_server::sync_manager::update_sync(unsigned long long root_id_code, int idx, unsigned short sync_num, unsigned long long local_id_code)
  58. {
  59. if(-1 == idx){
  60. return false;
  61. }
  62. // 如果当前版本的时间同步消息未收到则返回false
  63. unordered_map<unsigned long long, sync_time_message>::iterator itSyncTime = ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg_item.find(local_id_code);
  64. if(itSyncTime == ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg_item.end()){
  65. return false;
  66. }
  67. sync_time_message& msg = ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg_item[local_id_code];
  68. // 如果当前节点为root,则返回true
  69. if(msg.get_sync_level() == 0){
  70. return true;
  71. }
  72. int idx_synctime = find_his_sync_time(root_id_code, sync_num);
  73. // 如果时间同步已经计算过,则返回true
  74. if(-1 != idx_synctime){
  75. unordered_map<unsigned long long, sync_time>::iterator itHistSync = ump_history_sync[root_id_code][idx_synctime].hist_sync.find(local_id_code);
  76. if(itHistSync != ump_history_sync[root_id_code][idx_synctime].hist_sync.end()){
  77. if (itHistSync->second.get_delay_time())
  78. {
  79. return true;
  80. }
  81. }
  82. }
  83. // 如果已经收到了上一级的同步消息
  84. if(ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg_item.count(msg.get_upper_id())){
  85. if(!update_sync(root_id_code, idx, sync_num, msg.get_upper_id())){
  86. return false;
  87. }
  88. sync_time_message &upperMsg = ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg_item[msg.get_upper_id()];
  89. sync_time* s = nullptr;
  90. for(auto it = ump_history_sync[root_id_code].rbegin(); it != ump_history_sync[root_id_code].rend(); ++it)
  91. {
  92. if(it->sync_num != msg.get_sync_num() && it->hist_sync.count(local_id_code))
  93. {
  94. s = &(it->hist_sync.find(local_id_code)->second);
  95. break;
  96. }
  97. }
  98. idx_synctime = find_his_sync_time(root_id_code, sync_num);
  99. if(-1 == idx_synctime){
  100. sync_time_item it;
  101. it.sync_num = msg.get_sync_num();
  102. it.hist_sync[local_id_code] = sync_time(msg, upperMsg, s);
  103. ump_history_sync[root_id_code].push_back(it);
  104. idx_synctime = ump_history_sync[root_id_code].size() - 1;
  105. }else{
  106. ump_history_sync[root_id_code][idx_synctime].hist_sync[local_id_code] = sync_time(msg, upperMsg, s);
  107. }
  108. // 计算时间同步
  109. long long upperTimeDelay = 0;
  110. if(ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg_item[msg.get_upper_id()].get_sync_level() != 0)
  111. {
  112. upperTimeDelay = ump_history_sync[root_id_code][idx_synctime].hist_sync[msg.get_upper_id()].get_delay_time();
  113. }
  114. long long sendTime = ump_history_sync[root_id_code][idx_synctime].hist_sync[local_id_code].get_send_time();
  115. long long receiveTime = ump_history_sync[root_id_code][idx_synctime].hist_sync[local_id_code].get_receive_time();
  116. long long timeDelay = receiveTime - sendTime - ump_distance[local_id_code][msg.get_upper_id()];
  117. timeDelay += upperTimeDelay;
  118. // 从不同的upper来的同步数据,跨周期判断可能不正确,将时间差控制在一个周期内
  119. while(timeDelay > TIME_MAX){
  120. timeDelay -= TIME_MAX;
  121. }
  122. while(timeDelay + TIME_MAX < 0 ){
  123. timeDelay += TIME_MAX;
  124. }
  125. ump_history_sync[root_id_code][idx_synctime].hist_sync[local_id_code].set_delay_time(timeDelay);
  126. if(m_log_status){
  127. }
  128. return true;
  129. }
  130. return false;
  131. }
  132. // 线性插值外插计算tt值
  133. unsigned long long host_server::sync_manager::cal_time_by_linear(tag_message& tag)
  134. {
  135. std::lock_guard<std::recursive_mutex> lg(m_mu_sync_time);
  136. log_info("[tdoa] tdoa_sync begin calc");
  137. // 获取历史记录中与此tag最近的两条
  138. deque<sync_time> hisSync;
  139. unsigned long long rootIdCode = tag.m_sync_root_id;
  140. int i = 0;
  141. int idx = find_sync_time_msg(rootIdCode, tag.m_sync_num);
  142. if(-1 != idx){
  143. unordered_map<unsigned long long, sync_time_message>::iterator it = ump_sync_time_msg[rootIdCode][idx].ump_sync_time_msg_item.find(tag.m_local_id);
  144. if(it != ump_sync_time_msg[rootIdCode][idx].ump_sync_time_msg_item.end()){
  145. if(it->second.get_sync_level() == 0){
  146. return tag.m_receive_time;
  147. }
  148. }
  149. }
  150. int idx_sync = -1;
  151. while(hisSync.size() < 2 && i < MAX_CALCLINER_NUM)
  152. {
  153. auto syncNum = tag.m_sync_num - i;
  154. idx_sync = find_his_sync_time(rootIdCode, syncNum);
  155. if(-1 != idx_sync){
  156. if(ump_history_sync[rootIdCode][idx_sync].hist_sync.count(tag.m_local_id)){
  157. hisSync.push_front(ump_history_sync[rootIdCode][idx_sync].hist_sync[tag.m_local_id]);
  158. }
  159. }
  160. i++;
  161. }
  162. // 如果满足条件的历史记录不足两个则返回
  163. if(hisSync.size() < 2){
  164. log_info("[tdoa] tdoa_sync his_sync's size is less 2");
  165. return LLONG_MAX;
  166. }
  167. // 计算预估值
  168. long long y1(hisSync.at(0).get_receive_time() - hisSync.at(0).get_delay_time()),y2(hisSync.at(1).get_receive_time() - hisSync.at(1).get_delay_time());
  169. long long x1(hisSync.at(0).get_real_receive_time()), x2(hisSync.at(1).get_real_receive_time()), x3(tag.m_receive_time);
  170. unsigned long long res;
  171. if(x1 > x2)
  172. {
  173. x2 += TIME_MAX;
  174. }
  175. if(x2 > x3)
  176. {
  177. x3 += TIME_MAX;
  178. }
  179. if(y1 < 0){
  180. // 理论y值不能小于0
  181. y1 += TIME_MAX;
  182. }
  183. if(y2 < 0 ){
  184. y2 += TIME_MAX;
  185. }
  186. if(y1 > y2){
  187. y2 += TIME_MAX;
  188. }else if(y2-y1 > TIME_MAX){
  189. y2 -=TIME_MAX;
  190. }
  191. Eigen::Matrix3d a;
  192. a << x1 ,1 , 0,
  193. x2 ,1 , 0,
  194. x3, 1, -1;
  195. Eigen::Vector3d b(y1, y2, 0);
  196. Eigen::Vector3d X = a.colPivHouseholderQr().solve(b);
  197. res = X(2);
  198. res &= TIME_MAX;
  199. log_info("[tdoa] tdoa_sync_cal end, value=%lld", res);
  200. return res;
  201. }
  202. /*
  203. * 使用内插值计算tt值
  204. *
  205. * param
  206. * tag 卡的参数信息
  207. *
  208. * return
  209. * 返回插值后的tt值
  210. * */
  211. unsigned long long host_server::sync_manager::cal_time_by_inter_linear(tag_message& tag)
  212. {
  213. std::lock_guard<std::recursive_mutex> lg(m_mu_sync_time);
  214. deque<sync_time> hisSync;
  215. unsigned long long rootIdCode = tag.m_sync_root_id;
  216. int i = 0;
  217. int idx = find_sync_time_msg(rootIdCode, tag.m_sync_num);
  218. if(-1 != idx){
  219. unordered_map<unsigned long long, sync_time_message>::iterator it = ump_sync_time_msg[rootIdCode][idx].ump_sync_time_msg_item.find(tag.m_local_id);
  220. if(it != ump_sync_time_msg[rootIdCode][idx].ump_sync_time_msg_item.end()){
  221. if(it->second.get_sync_level() == 0){
  222. return sub(tag.m_receive_time, it->second.get_local_send_time());
  223. }
  224. }
  225. }
  226. int idx_sync = -1;
  227. long long y[2] = {0};
  228. long long x[2] = {0};
  229. unsigned int r[2] = {0};
  230. i = 0;
  231. r[0] = tag.m_local_id;
  232. while (i<2)
  233. {
  234. idx_sync = find_sync_time_msg(rootIdCode, tag.m_sync_num + i);
  235. if(-1 != idx_sync){
  236. if(ump_sync_time_msg[rootIdCode][idx_sync].ump_sync_time_msg_item.count(tag.m_local_id)){
  237. x[i] = ump_sync_time_msg[rootIdCode][idx_sync].ump_sync_time_msg_item[tag.m_local_id].get_local_receive_time();
  238. for (auto it = ump_sync_time_msg[rootIdCode][idx_sync].ump_sync_time_msg_item.begin();it != ump_sync_time_msg[rootIdCode][idx_sync].ump_sync_time_msg_item.end();++it)
  239. {
  240. if (it->second.get_sync_level() == 0)
  241. {
  242. y[i] = it->second.get_local_send_time();
  243. r[1] = it->second.get_local_id();
  244. break;
  245. }
  246. }
  247. }
  248. }
  249. i++;
  250. }
  251. if (0 == x[0] || 0 == x[1] || 0 == y[0] || 0 == y[1])
  252. {
  253. return 0;
  254. }
  255. long long y1(y[0]),y2(y[1]);
  256. long long x1(x[0]),x2(x[1]);
  257. long long x3 = tag.m_receive_time;
  258. long long Tf = ump_distance[r[0]][r[1]];
  259. unsigned long long res = 0;
  260. //long double k = long double (sub(y2,y1)) / long double(sub(x2,x1));
  261. res = (long double)(sub(y2,y1)) / (long double)(sub(x2,x1)) * sub(x3,x1) + Tf;
  262. return res;
  263. }
  264. void host_server::sync_manager::update_distance(unsigned int local_id, uint8_t local_ant_num, unsigned int upper_id, uint8_t upper_ant_num, double d)
  265. {
  266. unsigned long long lId = sync_helper::parse_id(local_id, local_ant_num);
  267. unsigned long long uId = sync_helper::parse_id(upper_id, upper_ant_num);
  268. if(ump_anchors.count(lId) == 0)
  269. {
  270. ump_anchors[lId] = position();
  271. }
  272. if(ump_anchors.count(uId) == 0)
  273. {
  274. ump_anchors[uId] = position();
  275. }
  276. ump_distance[lId][uId] = d;
  277. ump_distance[uId][lId] = d;
  278. // 删除所有版本的消息记录
  279. ump_history_sync.clear();
  280. ump_sync_time_msg.clear();
  281. }
  282. void host_server::sync_manager::update_anchor(unsigned int local_id, uint8_t local_ant_num, double x, double y, double z)
  283. {
  284. delete_anchor(local_id, local_ant_num);
  285. unsigned long long lId = sync_helper::parse_id(local_id, local_ant_num);
  286. ump_anchors[lId] = position( x, y, z);
  287. }
  288. /*void host_server::sync_manager::update_anchor(unsigned int local_id, unsigned char local_ant_num, position p)
  289. {
  290. delete_anchor(local_id, local_ant_num);
  291. unsigned long long lid = sync_helper::parse_id(local_id, local_ant_num);
  292. ump_anchors[lid] = p;
  293. }*/
  294. void host_server::sync_manager::delete_anchor(unsigned int local_id, uint8_t local_ant_num)
  295. {
  296. unsigned long long lId = sync_helper::parse_id(local_id, local_ant_num);
  297. auto it = ump_anchors.find(lId);
  298. if(it == ump_anchors.end())
  299. return;
  300. // 删除anchor
  301. ump_anchors.erase(it);
  302. // 删除与此anchor相关的距离信息
  303. ump_distance.erase(ump_distance.find(lId));
  304. for(auto it = ump_distance.begin(); it != ump_distance.end(); ++it)
  305. {
  306. it->second.erase(it->second.find(lId));
  307. if(it->second.size() == 0)
  308. {
  309. it = ump_distance.erase(it);
  310. }
  311. }
  312. // 删除所有版本的消息记录
  313. ump_history_sync.clear();
  314. ump_sync_time_msg.clear();
  315. }
  316. host_server::sync_manager::~sync_manager()
  317. {
  318. }
  319. int host_server::sync_manager::find_sync_time_msg(unsigned long long root_id_code, unsigned short sync_num )
  320. {
  321. int idx = -1;
  322. for(int i = ump_sync_time_msg[root_id_code].size() - 1; i >= 0; --i){
  323. if(ump_sync_time_msg[root_id_code][i].sync_num == sync_num ){
  324. idx = i;
  325. break;
  326. }
  327. }
  328. return idx;
  329. }
  330. int host_server::sync_manager::find_his_sync_time(unsigned long long root_id_code, unsigned short sync_num )
  331. {
  332. int idx = -1;
  333. for(int i = ump_history_sync[root_id_code].size() - 1; i >= 0; --i){
  334. if(ump_history_sync[root_id_code][i].sync_num == sync_num ){
  335. idx = i;
  336. break;
  337. }
  338. }
  339. return idx;
  340. }