sync_manager.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. #include"sync_manager.h"
  2. #include <deque>
  3. #include <Eigen/Dense>
  4. #include <fstream>
  5. #include <iostream>
  6. #include <log.h>
  7. host_server::sync_manager::sync_manager()
  8. {
  9. init();
  10. }
  11. host_server::sync_manager::sync_manager(bool status)
  12. {
  13. init();
  14. m_log_status = status;
  15. }
  16. void host_server::sync_manager::init()
  17. {
  18. unordered_map<unsigned long long, position> a;
  19. ump_anchors.swap(a);
  20. unordered_map<unsigned long long, unordered_map<unsigned long long, double>> b;
  21. ump_distance.swap(b);
  22. m_log_status = false;
  23. }
  24. void host_server::sync_manager::analyze_sync_msg(sync_time_message& msg)
  25. {
  26. std::lock_guard<std::recursive_mutex> lg(m_mu_sync_time);
  27. // 查找root节点的时间同步序号
  28. int idx = find_sync_time_msg(msg.get_root_id(), msg.get_sync_num());
  29. if( -1 == idx){
  30. // 没找到
  31. // 如果时间同步消息的版本数量超过最大限制,则删除最早添加的消息
  32. if(ump_sync_time_msg[msg.get_root_id()].size() >= MAX_SYNCTIME_NUM){
  33. // 删除第一个,可能有泄漏
  34. ump_sync_time_msg[msg.get_root_id()].pop_front();
  35. }
  36. // 构造此时间同步数据保存到队列中
  37. sync_time_msg_item it;
  38. it.sync_num = msg.get_sync_num();
  39. it.ump_sync_time_msg[msg.get_local_id()] = msg;
  40. ump_sync_time_msg[msg.get_root_id()].push_back(it);
  41. idx = ump_sync_time_msg[msg.get_root_id()].size() - 1;
  42. }else{
  43. ump_sync_time_msg[msg.get_root_id()][idx].ump_sync_time_msg[msg.get_local_id()] = msg;
  44. }
  45. // 如果历史同步队列中的时间同步数据大于指定容量,清除数据直到满足指定容量
  46. while(ump_history_sync[msg.get_root_id()].size() > MAX_SYNCTIME_NUM){
  47. ump_history_sync[msg.get_root_id()].pop_front();
  48. }
  49. // 更新时间同步并计算
  50. for(auto it : ump_sync_time_msg[msg.get_root_id()][idx].ump_sync_time_msg)
  51. {
  52. update_sync(msg.get_root_id(), idx, msg.get_sync_num(), it.first);
  53. }
  54. }
  55. bool host_server::sync_manager::update_sync(unsigned long long root_id_code, int idx, unsigned short sync_num, unsigned long long local_id_code)
  56. {
  57. if(-1 == idx){
  58. return false;
  59. }
  60. // 如果当前版本的时间同步消息未收到则返回false
  61. unordered_map<unsigned long long, sync_time_message>::iterator itSyncTime = ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg.find(local_id_code);
  62. if(itSyncTime == ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg.end()){
  63. return false;
  64. }
  65. sync_time_message& msg = ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg[local_id_code];
  66. // 如果当前节点为root,则返回true
  67. if(msg.get_sync_level() == 0){
  68. return true;
  69. }
  70. int idx_synctime = find_his_sync_time(root_id_code, sync_num);
  71. // 如果时间同步已经计算过,则返回true
  72. if(-1 != idx_synctime){
  73. unordered_map<unsigned long long, sync_time>::iterator itHistSync = ump_history_sync[root_id_code][idx_synctime].hist_sync.find(local_id_code);
  74. if(itHistSync != ump_history_sync[root_id_code][idx_synctime].hist_sync.end()){
  75. if(itHistSync->second.get_delay_time())
  76. return true;
  77. }
  78. }
  79. // 如果已经收到了上一级的同步消息
  80. if(ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg.count(msg.get_upper_id())){
  81. if(!update_sync(root_id_code, idx, sync_num, msg.get_upper_id())){
  82. return false;
  83. }
  84. sync_time_message &upperMsg = ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg[msg.get_upper_id()];
  85. sync_time* s = nullptr;
  86. for(auto it = ump_history_sync[root_id_code].rbegin(); it != ump_history_sync[root_id_code].rend(); ++it)
  87. {
  88. if(it->sync_num != msg.get_sync_num() && it->hist_sync.count(local_id_code))
  89. {
  90. s = &(it->hist_sync.find(local_id_code)->second);
  91. break;
  92. }
  93. }
  94. idx_synctime = find_his_sync_time(root_id_code, sync_num);
  95. if(-1 == idx_synctime){
  96. sync_time_item it;
  97. it.sync_num = msg.get_sync_num();
  98. it.hist_sync[local_id_code] = sync_time(msg, upperMsg, s);
  99. ump_history_sync[root_id_code].push_back(it);
  100. idx_synctime = ump_history_sync[root_id_code].size() - 1;
  101. }else{
  102. ump_history_sync[root_id_code][idx_synctime].hist_sync[local_id_code] = sync_time(msg, upperMsg, s);
  103. }
  104. // 计算时间同步
  105. long long upperTimeDelay = 0;
  106. if(ump_sync_time_msg[root_id_code][idx].ump_sync_time_msg[msg.get_upper_id()].get_sync_level() != 0)
  107. {
  108. upperTimeDelay = ump_history_sync[root_id_code][idx_synctime].hist_sync[msg.get_upper_id()].get_delay_time();
  109. }
  110. long long sendTime = ump_history_sync[root_id_code][idx_synctime].hist_sync[local_id_code].get_send_time();
  111. long long receiveTime = ump_history_sync[root_id_code][idx_synctime].hist_sync[local_id_code].get_receive_time();
  112. long long timeDelay = receiveTime - sendTime - ump_distance[local_id_code][msg.get_upper_id()];
  113. timeDelay += upperTimeDelay;
  114. // 从不同的upper来的同步数据,跨周期判断可能不正确,将时间差控制在一个周期内
  115. while(timeDelay > TIME_MAX){
  116. timeDelay -= TIME_MAX;
  117. }
  118. while(timeDelay + TIME_MAX < 0 ){
  119. timeDelay += TIME_MAX;
  120. }
  121. ump_history_sync[root_id_code][idx_synctime].hist_sync[local_id_code].set_delay_time(timeDelay);
  122. if(m_log_status){
  123. }
  124. return true;
  125. }
  126. return false;
  127. }
  128. // 线性插值外插计算tt值
  129. unsigned long long host_server::sync_manager::cal_time_by_linear(tag_message& tag)
  130. {
  131. std::lock_guard<std::recursive_mutex> lg(m_mu_sync_time);
  132. log_info("[tdoa] tdoa_sync begin calc");
  133. // 获取历史记录中与此tag最近的两条
  134. deque<sync_time> hisSync;
  135. unsigned long long rootIdCode = tag.m_sync_root_id;
  136. int i = 0;
  137. int idx = find_sync_time_msg(rootIdCode, tag.m_sync_num);
  138. if(-1 != idx){
  139. unordered_map<unsigned long long, sync_time_message>::iterator it = ump_sync_time_msg[rootIdCode][idx].ump_sync_time_msg.find(tag.m_local_id);
  140. if(it != ump_sync_time_msg[rootIdCode][idx].ump_sync_time_msg.end()){
  141. if(it->second.get_sync_level() == 0){
  142. return tag.m_receive_time;
  143. }
  144. }
  145. }
  146. int idx_sync = -1;
  147. while(hisSync.size() < 2 && i < MAX_CALCLINER_NUM)
  148. {
  149. auto syncNum = tag.m_sync_num - i;
  150. idx_sync = find_his_sync_time(rootIdCode, syncNum);
  151. if(-1 != idx_sync){
  152. if(ump_history_sync[rootIdCode][idx_sync].hist_sync.count(tag.m_local_id)){
  153. hisSync.push_front(ump_history_sync[rootIdCode][idx_sync].hist_sync[tag.m_local_id]);
  154. }
  155. }
  156. i++;
  157. }
  158. // 如果满足条件的历史记录不足两个则返回
  159. if(hisSync.size() < 2){
  160. log_info("[tdoa] tdoa_sync his_sync's size is less 2");
  161. return LLONG_MAX;
  162. }
  163. // 计算预估值
  164. long long y1(hisSync.at(0).get_receive_time() - hisSync.at(0).get_delay_time()),y2(hisSync.at(1).get_receive_time() - hisSync.at(1).get_delay_time());
  165. long long x1(hisSync.at(0).get_real_receive_time()), x2(hisSync.at(1).get_real_receive_time()), x3(tag.m_receive_time);
  166. unsigned long long res;
  167. if(x1 > x2)
  168. {
  169. x2 += TIME_MAX;
  170. }
  171. if(x2 > x3)
  172. {
  173. x3 += TIME_MAX;
  174. }
  175. if(y1 < 0){
  176. // 理论y值不能小于0
  177. y1 += TIME_MAX;
  178. }
  179. if(y2 < 0 ){
  180. y2 += TIME_MAX;
  181. }
  182. if(y1 > y2){
  183. y2 += TIME_MAX;
  184. }else if(y2-y1 > TIME_MAX){
  185. y2 -=TIME_MAX;
  186. }
  187. Eigen::Matrix3d a;
  188. a << x1 ,1 , 0,
  189. x2 ,1 , 0,
  190. x3, 1, -1;
  191. Eigen::Vector3d b(y1, y2, 0);
  192. Eigen::Vector3d X = a.colPivHouseholderQr().solve(b);
  193. res = X(2);
  194. res &= TIME_MAX;
  195. log_info("[tdoa] tdoa_sync_cal end, value=%lld", res);
  196. return res;
  197. }
  198. /*
  199. * 使用内插值计算tt值
  200. *
  201. * param
  202. * tag 卡的参数信息
  203. *
  204. * return
  205. * 返回插值后的tt值
  206. * */
  207. unsigned long long host_server::sync_manager::cal_time_by_inter_linear(tag_message& tag)
  208. {
  209. std::lock_guard<std::recursive_mutex> lg(m_mu_sync_time);
  210. deque<sync_time> hisSync;
  211. unsigned long long rootIdCode = tag.m_sync_root_id;
  212. int i = 0;
  213. int idx = find_sync_time_msg(rootIdCode, tag.m_sync_num);
  214. if(-1 != idx){
  215. unordered_map<unsigned long long, sync_time_message>::iterator it = ump_sync_time_msg[rootIdCode][idx].ump_sync_time_msg.find(tag.m_local_id);
  216. if(it != ump_sync_time_msg[rootIdCode][idx].ump_sync_time_msg.end()){
  217. if(it->second.get_sync_level() == 0){
  218. return sub(tag.m_receive_time, it->second.get_local_send_time());
  219. }
  220. }
  221. }
  222. int idx_sync = -1;
  223. long long y[2] = {0};
  224. long long x[2] = {0};
  225. unsigned int r[2] = {0};
  226. i = 0;
  227. r[0] = tag.m_local_id;
  228. while (i<2)
  229. {
  230. idx_sync = find_sync_time_msg(rootIdCode, tag.m_sync_num + i);
  231. if(-1 != idx_sync){
  232. if(ump_sync_time_msg[rootIdCode][idx_sync].ump_sync_time_msg.count(tag.m_local_id)){
  233. x[i] = ump_sync_time_msg[rootIdCode][idx_sync].ump_sync_time_msg[tag.m_local_id].get_local_receive_time();
  234. for (auto it = ump_sync_time_msg[rootIdCode][idx_sync].ump_sync_time_msg.begin();it != ump_sync_time_msg[rootIdCode][idx_sync].ump_sync_time_msg.end();++it)
  235. {
  236. if (it->second.get_sync_level() == 0)
  237. {
  238. y[i] = it->second.get_local_send_time();
  239. r[1] = it->second.get_local_id();
  240. break;
  241. }
  242. }
  243. }
  244. }
  245. i++;
  246. }
  247. if (0 == x[0] || 0 == x[1] || 0 == y[0] || 0 == y[1])
  248. {
  249. return 0;
  250. }
  251. long long y1(y[0]),y2(y[1]);
  252. long long x1(x[0]),x2(x[1]);
  253. long long x3 = tag.m_receive_time;
  254. long long Tf = ump_distance[r[0]][r[1]];
  255. unsigned long long res = 0;
  256. //long double k = long double (sub(y2,y1)) / long double(sub(x2,x1));
  257. res = (long double)(sub(y2,y1)) / (long double)(sub(x2,x1)) * sub(x3,x1) + Tf;
  258. return res;
  259. }
  260. void host_server::sync_manager::update_distance(unsigned int local_id, uint8_t local_ant_num, unsigned int upper_id, uint8_t upper_ant_num, double d)
  261. {
  262. unsigned long long lId = sync_helper::parse_id(local_id, local_ant_num);
  263. unsigned long long uId = sync_helper::parse_id(upper_id, upper_ant_num);
  264. if(ump_anchors.count(lId) == 0)
  265. {
  266. ump_anchors[lId] = position();
  267. }
  268. if(ump_anchors.count(uId) == 0)
  269. {
  270. ump_anchors[uId] = position();
  271. }
  272. ump_distance[lId][uId] = d;
  273. ump_distance[uId][lId] = d;
  274. // 删除所有版本的消息记录
  275. ump_history_sync.clear();
  276. ump_sync_time_msg.clear();
  277. }
  278. void host_server::sync_manager::update_anchor(unsigned int local_id, uint8_t local_ant_num, double x, double y, double z)
  279. {
  280. delete_anchor(local_id, local_ant_num);
  281. unsigned long long lId = sync_helper::parse_id(local_id, local_ant_num);
  282. ump_anchors[lId] = position( x, y, z);
  283. }
  284. /*void host_server::sync_manager::update_anchor(unsigned int local_id, unsigned char local_ant_num, position p)
  285. {
  286. delete_anchor(local_id, local_ant_num);
  287. unsigned long long lid = sync_helper::parse_id(local_id, local_ant_num);
  288. ump_anchors[lid] = p;
  289. }*/
  290. void host_server::sync_manager::delete_anchor(unsigned int local_id, uint8_t local_ant_num)
  291. {
  292. unsigned long long lId = sync_helper::parse_id(local_id, local_ant_num);
  293. auto it = ump_anchors.find(lId);
  294. if(it == ump_anchors.end())
  295. return;
  296. // 删除anchor
  297. ump_anchors.erase(it);
  298. // 删除与此anchor相关的距离信息
  299. ump_distance.erase(ump_distance.find(lId));
  300. for(auto it = ump_distance.begin(); it != ump_distance.end(); ++it)
  301. {
  302. it->second.erase(it->second.find(lId));
  303. if(it->second.size() == 0)
  304. {
  305. it = ump_distance.erase(it);
  306. }
  307. }
  308. // 删除所有版本的消息记录
  309. ump_history_sync.clear();
  310. ump_sync_time_msg.clear();
  311. }
  312. host_server::sync_manager::~sync_manager()
  313. {
  314. }
  315. int host_server::sync_manager::find_sync_time_msg(unsigned long long root_id_code, unsigned short sync_num )
  316. {
  317. int idx = -1;
  318. for(int i = ump_sync_time_msg[root_id_code].size() - 1; i >= 0; --i){
  319. if(ump_sync_time_msg[root_id_code][i].sync_num == sync_num ){
  320. idx = i;
  321. break;
  322. }
  323. }
  324. return idx;
  325. }
  326. int host_server::sync_manager::find_his_sync_time(unsigned long long root_id_code, unsigned short sync_num )
  327. {
  328. int idx = -1;
  329. for(int i = ump_history_sync[root_id_code].size() - 1; i >= 0; --i){
  330. if(ump_history_sync[root_id_code][i].sync_num == sync_num ){
  331. idx = i;
  332. break;
  333. }
  334. }
  335. return idx;
  336. }