// logic : there is a new socket, there is a new user.
import Utils from './Utils.js'
import Logger from './Logger.js'
import Pusher from './Pusher.js'
import Caller from './Caller.js'
import XFiler from './XFile.js'
import Meta from './Meta.js'
import mysql from './MysqlWraper.js'
// import UserList from './UserList.js'
import Report from './Report.js'
import RedisService from "./RedisService.js"
// import metadata from './meta_definition.js'
// import TestPusher from './TestPusher.js'
// let ioClient = require('socket.io-client')
let config = require('./config/appconfig.js')

/**
 * Escape a value so it can be embedded inside a quoted MySQL string literal.
 *
 * The escaping rules follow the ones used by the `mysql`/`sqlstring` packages.
 * NOTE(review): this assumes the server does not run with NO_BACKSLASH_ESCAPES
 * and that the connection charset is not one of the multi-byte charsets that
 * are known to defeat backslash escaping. If `mysql.query` supports
 * placeholders, prefer parameterized queries over this helper.
 *
 * @param {*} value - value to embed; it is converted with String()
 * @returns {string} the escaped text (without surrounding quotes)
 */
function escapeSqlString (value) {
  return String(value).replace(/[\0\b\t\n\r\x1a"'\\]/g, (ch) => {
    switch (ch) {
      case '\0': return '\\0'
      case '\b': return '\\b'
      case '\t': return '\\t'
      case '\n': return '\\n'
      case '\r': return '\\r'
      case '\x1a': return '\\Z'
      default: return '\\' + ch
    }
  })
}

/**
 * Format the time-of-day part of a date as zero-padded `HH:MM:SS` (local time).
 *
 * @param {Date} date
 * @returns {string}
 */
function formatClockTime (date) {
  const pad = (n) => String(n).padStart(2, '0')
  return `${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}`
}

/**
 * Tell whether a META SQL statement touches one of the settings for which the
 * handler broadcasts a `setting` CALL (over-speed setting, map solution).
 *
 * @param {*} sql - SQL text taken from the META request; non-strings yield false
 * @returns {boolean}
 */
function touchesCollectorSettings (sql) {
  if (typeof sql !== 'string') {
    return false
  }
  const overSpeed = sql.includes('dat_setting') && sql.includes('over_speed')
  const mapSolution = sql.includes('dat_map_zhurong') && sql.includes('solution')
  return overSpeed || mapSolution
}

/**
 * One connected socket == one User.
 *
 * The class binds a Socket.IO socket, registers the handlers for every message
 * channel used by this server (USER, FILE, META, REPT, CALL, ALARM, PUSH, TIME,
 * HELP, ...) and relays messages between clients (room `config.MONITOR`) and
 * the collector server (room `config.COLLECTOR`).
 */
export default class User {
  /**
   * @param {object} io - Socket.IO server instance, used for room/global broadcasts
   * @param {object} userlist - registry of users (uses get / getList / add / remove)
   * @param {object} metaStore - passed through to Report
   * @param {object} socket - the client's socket; nothing is initialised when falsy
   * @param {object} cardStore - card state store
   * @param {object} sqlResultStore - passed through to Report
   * @param {object} callStore - holds the `calling` map used by startGasCall
   * @param {*} reportFile - passed through to Meta
   * @param {object} workers - worker handle; receives `send()` on pos_map pushes
   */
  constructor (io, userlist, metaStore, socket, cardStore, sqlResultStore, callStore, reportFile, workers) {
    console.log("构建User")
    if (!socket) {
      console.warn('无可用连接!')
      return
    }

    console.log("User有Socket")
    this.userlist = userlist
    this.socket = null
    this.session = null
    this.io = io

    this.name = null // user's name
    this.type = null // collector, monitor

    this.workers = workers
    this.meta = new Meta(reportFile, workers)
    this.filer = new XFiler()
    this.metaStore = metaStore
    this.cardStore = cardStore
    // NOTE(review): property name is misspelled ("callSotre"); kept because code
    // outside this file may read it.
    this.callSotre = callStore
    this.reportFile = reportFile
    this.report = new Report(this, metaStore, sqlResultStore)

    // Bind last: bind() registers the event handlers and may start a silent
    // login, both of which rely on the collaborators assigned above.
    this.bind(socket)
  }

  /**
   * Push the collector server's current status to this user only.
   * Used right after a monitor client logs in.
   */
  sendCollectorStatus () {
    const collector = this.userlist.get(config.COLLECTOR)
    const status = collector && collector.socket && collector.socket.auth ? 'online' : 'offline'
    const message = {
      cmd: 'collector_status',
      data: {
        status: status,
        time: new Date()
      }
    }
    this.socket.emit('PUSH', message)
  }

  /**
   * Broadcast a collector status change to the monitor room.
   * When the collector disconnects (rather than logs out) the broadcast is
   * triggered from the socket's `disconnect` event.
   *
   * @param {string} status - 'online' or 'offline'
   */
  broadcastCollectorStatus (status) {
    const message = {
      cmd: 'collector_status',
      data: {
        status: status,
        time: new Date()
      }
    }
    this.io.to(config.MONITOR).emit('PUSH', message)
  }

  /**
   * Currently only logs the given data.
   *
   * @param {*} data
   */
  sendThreeRate (data) {
    console.log('data----------------------', data)
  }

  /**
   * Register the handlers for every message channel on the given socket.
   *
   * @param {object} socket - Socket.IO socket; a warning is logged when missing
   */
  registerEventHandler (socket) {
    if (!socket) {
      console.warn('注册事件处理器失败:没有可用的网络连接!')
      return
    }

    console.log("Socket 事件监听~")

    socket.on('disconnect', () => this._onDisconnect())
    socket.on('USER', (req, callback) => this._onUser(req, callback))
    socket.on('FILE', (req) => this._onFile(socket, req))
    socket.on('META', (req) => this._onMeta(socket, req))
    socket.on('REPT', (req, callback) => this._onReport(socket, req, callback))
    socket.on('CONTROLLIGHT', (req) => this._onControlLight(req))
    // Client -> collector messages, relayed by this server.
    socket.on('CALL', (req) => this._onCall(socket, req))
    socket.on('ALARM', (req) => this._onAlarm(socket, req))
    // Collector -> client messages, relayed by this server.
    socket.on('PUSH', (req) => this._onPush(socket, req))
    socket.on('TIME', () => this._onTime())
    socket.on('HELP', (req) => this._onHelp(socket, req))
    // Message sent by the third-party Java service after it has logged in.
    socket.on('JAVA_SOCKET_LOGINED', (req) => this._onJavaSocketLogined(req))
    // The former TEST channel (TestPusher) is intentionally not registered.
  }

  /**
   * Parse an incoming payload with Utils.toJson and warn when it is unusable.
   *
   * @param {*} raw - payload as received from the socket
   * @param {string} channel - channel name, only used in the warning
   * @returns {*} the parsed request, or a falsy value when invalid
   */
  _parseRequest (raw, channel) {
    const req = Utils.toJson(raw)
    if (!req) {
      console.warn('Invalid request.', channel)
    }
    return req
  }

  /** disconnect: announce the collector going offline when it was this user. */
  _onDisconnect () {
    console.log(`USER ${this.name} disconnected.`)
    if (this.name === config.COLLECTOR) {
      this.broadcastCollectorStatus('offline')
    }
  }

  /**
   * USER channel: login / logout / standby / modify.
   * The raw request is deliberately not logged because it can carry passwords.
   */
  _onUser (rawReq, callback) {
    const req = this._parseRequest(rawReq, 'USER')
    if (!req) {
      return
    }
    console.log(`Got USER message : cmd=${req.cmd}`)

    const logFailure = (remark) => (err) => console.error(`${remark} failed. \n\t ${err}`)

    switch (req.cmd) {
      case 'login':
        this.login(req, callback).catch(logFailure('User.login'))
        break
      case 'logout':
        this.logout(req, callback)
        break
      case 'standby':
        this.standby(req, callback)
        break
      case 'modify':
        this.modify(req, callback).catch(logFailure('User.modify'))
        break
      default:
        console.warn(`未知的 USER 请求:${req.cmd}`)
        break
    }
  }

  /** FILE channel: hand the request to the file dispatcher. */
  _onFile (socket, rawReq) {
    console.log(`Got FILE message : \n\t${JSON.stringify(rawReq)}`)
    const req = this._parseRequest(rawReq, 'FILE')
    if (!req) {
      return
    }
    this.filer.dispatch(socket, req)
  }

  /**
   * META channel: dispatch the request to Meta and, when the SQL touches the
   * over-speed setting or the map solution, tell every connection (sender
   * included) that the settings changed.
   */
  _onMeta (socket, rawReq) {
    // Parse before touching any field: the payload may arrive as a string.
    const req = this._parseRequest(rawReq, 'META')
    if (!req) {
      return
    }
    if (!socket.auth) {
      this.notLogin('META')
      return
    }

    this.meta.dispatch(this.socket, req)

    if (touchesCollectorSettings(req.data && req.data.sql)) {
      const message = {
        cmd: 'setting',
        data: {
          file_path: 'fileURI'
        }
      }
      socket.server.sockets.emit('CALL', message) // broadcast to everyone, sender included
    }
  }

  /** REPT channel: hand the request to the report dispatcher. */
  _onReport (socket, rawReq, callback) {
    console.log(`Got REPT message : \n\t${JSON.stringify(rawReq)}`)
    const req = this._parseRequest(rawReq, 'REPT')
    if (!req) {
      return
    }
    if (socket.auth) {
      this.report.dispatch(this.socket, req, callback)
    } else {
      this.notLogin('REPT')
    }
  }

  /** CONTROLLIGHT channel: currently only logged. */
  _onControlLight (rawReq) {
    const req = Utils.toJson(rawReq)
    console.log("########CONTROLLIGHT########" + JSON.stringify(req))
  }

  /**
   * CALL channel: messages from a client towards the collector server.
   * "done"-style commands are echoed to all clients through PUSH; everything
   * else is forwarded with Caller.call.
   */
  _onCall (socket, rawReq) {
    const req = this._parseRequest(rawReq, 'CALL')
    if (!req) {
      return
    }
    console.log(JSON.stringify(req))

    if (!socket.auth) {
      this.notLogin('CALL')
      return
    }

    const cmd = req.cmd
    if (cmd === 'helpme_done' || cmd === 'req_all_data' || cmd === 'gas_done') {
      // Help-handling messages arrive through CALL and are re-broadcast through PUSH.
      socket.broadcast.emit('PUSH', req)
      socket.emit('PUSH', req) // response to the sender
      if (cmd === 'helpme_done') {
        Caller.call(socket, req)
      }
      // NOTE(review): 'req_all_data' is also logged as a released help call —
      // confirm whether that is intended.
      if (cmd === 'helpme_done' || cmd === 'req_all_data') {
        const data = req.data || {}
        Logger.log2db(socket, 2, `${data.user_id}解除${data.id}的呼救`)
      }
      return
    }

    if (cmd === 'clear_card') { // a client manually lifts a card out of the mine
      console.log("手动升井")
      this.cardStore.setHandupdatescards(req)
      this.meta.dispatch(this.socket, req)
    }
    Caller.call(socket, req)
  }

  /**
   * ALARM channel: 'alarm_done' is echoed to all clients through PUSH, every
   * other command is forwarded with Caller.call.
   */
  _onAlarm (socket, rawReq) {
    const req = this._parseRequest(rawReq, 'ALARM')
    if (!req) {
      return
    }
    if (!socket.auth) {
      this.notLogin('ALARM')
      return
    }

    if (req.cmd === 'alarm_done') {
      socket.broadcast.emit('PUSH', req)
      socket.emit('PUSH', req) // response to the sender
    } else {
      Caller.call(socket, req)
    }
  }

  /**
   * PUSH channel: messages from the collector server towards the clients.
   */
  _onPush (socket, rawReq) {
    console.log(`Got PUSH message : \n\t${JSON.stringify(rawReq)}`)
    const req = Utils.toJson(rawReq)
    if (!req) {
      console.warn('Invalid request.')
      return
    }

    const datas = Utils.toJson(req.data)
    const cmd = req.cmd
    // NOTE(review): the card store is updated before the auth check, as in the
    // original flow — confirm whether unauthenticated sockets should be ignored.
    this.storePosmapData(datas, cmd)

    if (!socket.auth) {
      this.notLogin('PUSH')
      return
    }

    const echoedToSelf = cmd === 'device_state' || cmd === 'light_state' ||
      cmd === 'tof_data' || cmd === 'load_history_completed'
    if (echoedToSelf) {
      console.log(req)
      this.socket.emit('PUSH', req)
    }

    Pusher.push(socket, req)

    if (cmd === 'up_mine' || cmd === 'pos_map' || cmd === 'resp_all_data') {
      const msg = {
        cmd: 'nosignal_staffs',
        data: {
          handuping: this.cardStore.handUpdatescards,
          nosignal: this.cardStore.nosignalscars
        }
      }
      Pusher.push(socket, msg, this.name)
    }

    if (cmd === 'alarm') {
      // startGasCall is async: make sure a failure cannot become an unhandled rejection.
      this.startGasCall(socket, datas).catch((err) => {
        console.error(`User.startGasCall failed. \n\t ${err}`)
      })
    }

    if (cmd === 'pos_map') {
      console.log(req)
      const cards = this.cardStore.sdiscards
      this.workers.send({
        cmd: cmd,
        data: cards ? Array.from(cards.values()).length : 0
      })
      this.callBreatheart() // send a heartbeat towards the collector
    }
  }

  /** TIME channel: answer with the server's local time as HH:MM:SS. */
  _onTime () {
    const msg = {
      cmd: 'time',
      data: {
        now: formatClockTime(new Date())
      }
    }
    this.socket.emit('PUSH', msg)
  }

  /**
   * HELP channel: record, for every help event, every user currently in the
   * user list, then relay the events to the clients once.
   */
  _onHelp (socket, rawReq) {
    const events = Utils.toJson(rawReq)
    if (!Array.isArray(events) || events.length === 0) {
      console.warn('Invalid request.', 'HELP')
      return
    }

    const users = this.userlist.getList()
    for (const event of events) {
      const eventId = Number(event && event.event_id)
      if (!Number.isFinite(eventId)) {
        console.warn('HELP : invalid event_id, record skipped.')
        continue
      }
      for (const [name, user] of users) {
        const values = `${eventId},"${escapeSqlString(name)}","${escapeSqlString(user.type)}"`
        this._runQuery(`INSERT into his_help_event_user VALUES (${values})`, '更新DB失败。')
      }
    }

    // Relay once; pushing inside the loops repeated the same message per event per user.
    Pusher.push(socket, events)
  }

  /** JAVA_SOCKET_LOGINED channel: currently only logs the payload's data. */
  _onJavaSocketLogined (rawReq) {
    console.log("JAVA_SOCKET_TEST")
    const reqData = this._parseRequest(rawReq, 'JAVA_SOCKET_LOGINED')
    if (!reqData) {
      return
    }
    console.log(reqData.data)
  }

  /**
   * Fire-and-forget query: failures (synchronous throws and rejections) are
   * logged, never propagated.
   *
   * @param {string} sql
   * @param {string} failText - prefix of the error log line
   */
  _runQuery (sql, failText) {
    const report = (err) => console.error(`${failText} \n\t ${err}`)
    try {
      Promise.resolve(mysql.query(sql)).catch(report)
    } catch (err) {
      report(err)
    }
  }

  /**
   * Keep the card store in sync with position data pushed by the collector.
   *
   * @param {*} datas - parsed `data` part of the PUSH message
   * @param {string} cmd - PUSH command ('up_mine', 'pos_map', 'resp_all_data', ...)
   */
  storePosmapData (datas, cmd) {
    if (cmd === 'up_mine') {
      this.cardStore.deleteNosignalCards(datas)
      return
    }
    if (cmd === 'pos_map') {
      this.cardStore.cardMove(datas)
      return
    }
    if (cmd === 'resp_all_data') {
      if (!Array.isArray(datas)) {
        console.warn('resp_all_data : payload is not a list, ignored.')
        return
      }
      for (const row of datas) {
        if (row && row.cmd === 'pos_map') {
          this.cardStore.cardMove(row.data)
        }
      }
    }
  }

  /**
   * React to gas alarms (type 34 / 35): when an alarm starts, call the cards
   * or readers mapped to the sensor; when it ends, cancel that call.
   * Rows of any other type are skipped.
   *
   * @param {object} socket - socket used to send the call towards the collector
   * @param {Array<object>} rows - alarm rows (type_id, obj_id, start_time)
   */
  async startGasCall (socket, rows) {
    if (!Array.isArray(rows)) {
      console.warn('User.startGasCall : alarm payload is not a list, ignored.')
      return
    }

    for (const row of rows) {
      if (!row) {
        continue
      }
      console.log(row)

      const type = parseInt(row.type_id, 10)
      if (type !== 34 && type !== 35) {
        continue // not a gas alarm; keep processing the remaining rows
      }

      const objID = Number(row.obj_id)
      if (!Number.isFinite(objID)) {
        console.warn(`User.startGasCall : invalid obj_id ${row.obj_id}, row skipped.`)
        continue
      }

      const callKey = `${type}-${objID}`
      const time = Date.now()
      // Loose comparison on purpose: a start_time of 0 or "0" means the alarm ended.
      const msg = row.start_time != 0
        ? await this._buildGasCallRequest(type, objID, callKey, time)
        : this._buildGasCallCancel(type, callKey, time)

      if (msg) {
        Caller.call(socket, msg)
      }
    }
  }

  /**
   * Build the `call_card_req` message for a starting gas alarm and remember
   * the called targets under `callKey`.
   *
   * @returns {Promise<object|null>} the message, or null when a call is already
   *   running for the key, no target is mapped, or the DB query failed
   */
  async _buildGasCallRequest (type, objID, callKey, time) {
    const registry = this.callSotre.calling
    if (registry.get(callKey)) {
      return null
    }

    const isDriverCall = type === 35
    const sql = isDriverCall
      ? `select card_id from dat_sensor_driver_map dsdm, dat_staff_extend dse where dsdm.staff_id = dse.staff_id and sensor_id = ${objID};`
      : `select reader_id from dat_sensor_reader_map where sensor_id = ${objID}`

    let targets = null
    try {
      targets = await mysql.query(sql)
    } catch (err) {
      console.error(`查询DB失败。 \n\t ${err}`)
      return null
    }
    if (!targets || targets.length <= 0) {
      return null
    }

    // Another alarm for the same key may have registered while the query ran.
    if (!registry.get(callKey)) {
      registry.set(callKey, new Map())
    }
    const calling = registry.get(callKey)
    for (const target of targets) {
      if (isDriverCall) {
        calling.set(target.card_id, { cardid: target.card_id, cardtype: 1 })
      } else {
        calling.set(target.reader_id, { stationid: target.reader_id })
      }
    }

    const msg = {
      cmd: 'call_card_req',
      data: {
        call_type_id: 1, // 0: call everyone, 1: call selected targets
        call_time_out: 5, // call duration
        call_level_id: 2, // 1: normal call, 2: emergency call
        user_name: 'systrm', // caller
        call_time: time // call timestamp
      }
    }
    this._attachCallTargets(msg, type, Array.from(calling.values()))
    return msg
  }

  /**
   * Build the `call_card_cancel_req` message for an ended gas alarm and forget
   * the targets stored under `callKey`.
   *
   * @returns {object|null} the message, or null when nothing was being called
   */
  _buildGasCallCancel (type, callKey, time) {
    const registry = this.callSotre.calling
    const calling = registry.get(callKey)
    const callCards = calling ? Array.from(calling.values()) : []
    registry.delete(callKey)

    if (callCards.length === 0) {
      return null
    }

    const msg = {
      cmd: 'call_card_cancel_req',
      data: {
        call_type_id: 1, // everyone / selected targets
        user_name: 'systrm', // who cancels
        call_time: time // timestamp
      }
    }
    this._attachCallTargets(msg, type, callCards)
    return msg
  }

  /** Fill `stations` / `cards` of a call message: type 35 targets cards, otherwise stations. */
  _attachCallTargets (msg, type, callCards) {
    if (type === 35) {
      msg.data['stations'] = [{ stationid: 0 }]
      msg.data['cards'] = callCards
    } else {
      msg.data['stations'] = callCards
      msg.data['cards'] = []
    }
  }

  /**
   * Tell the client that the given channel requires a login (code -100).
   *
   * @param {string} cmd - name of the channel that was refused
   */
  notLogin (cmd) {
    const res = {
      code: -100, // -100 means "not logged in"
      msg: `${cmd} : 用户 ${this.name} 尚未登录!`,
      data: {
        username: this.name
      }
    }
    // info the client
    this.socket.emit('USER', res)
    console.warn(res.msg)
  }

  /**
   * Invoke a client acknowledgement callback when one was supplied.
   *
   * @param {Function} fn - acknowledgement callback (may be missing)
   * @param {object} msg - response passed to the callback
   * @param {string} remark - caller tag used in the log line
   */
  doCallBack (fn, msg, remark) {
    if (typeof fn === 'function') {
      fn(msg)
      console.debug(`${remark} : callback is done. msg=${JSON.stringify(msg)}`)
    } else {
      console.warn(`${remark} : callback is invalid. callback=${fn}, msg=${JSON.stringify(msg)}`)
    }
  }

  /** Best available client address of the bound socket. */
  _clientIp () {
    const connection = this.socket.request.connection
    return connection.remoteAddress || connection.localAddress || this.socket.handshake.address
  }

  /**
   * login processor
   *
   * Checks the credentials against `dat_user`, marks the socket as
   * authenticated, stores the user in the session, joins the matching room
   * and answers through the callback (`code` 0 on success, -1 otherwise).
   *
   * @method login
   *
   * @param {object} req - login message; reads `data.user_name` and `data.user_pass`
   * @param {Function} callback - the client's acknowledgement callback
   */
  async login (req, callback) {
    const credentials = req && req.data
    if (!credentials || credentials.user_name == null || credentials.user_pass == null) {
      console.warn('User.login : invalid login request.')
      this.doCallBack(callback, { code: -1, msg: '用户名或密码错误,请确认后再试。' }, 'User.login')
      return
    }

    const userName = credentials.user_name
    const userPass = credentials.user_pass
    // Never log the request itself: it carries the password.
    console.log('用户登录:', userName)

    // NOTE(review): passwords are compared (and stored) in plain text — the
    // Utils.sha1 hashing step is disabled in this code base.
    const sql = `select user_id, dept_id, role_id, access_id, obj_range from dat_user where user_id="${escapeSqlString(userName)}" and pwd="${escapeSqlString(userPass)}"`

    let rows = null
    try {
      console.log('Going to do login-check on DB, please wait... ', userName)
      rows = await mysql.query(sql)
      console.log('Login-check on DB done. ', userName)
    } catch (err) {
      console.error(`查询DB失败。 \n\t ${err}`)
      const failure = {
        code: -1,
        msg: '服务器错误,请联系系统管理员!',
        data: {
          name: userName
        }
      }
      this.doCallBack(callback, failure, 'User.login')
      return
    }

    if (!rows || rows.length === 0) {
      console.log('ERROR: 用户名或密码错误: ' + userName)
      this.doCallBack(callback, { code: -1, msg: '用户名或密码错误,请确认后再试。' }, 'User.login')
      return
    }

    // loged in
    const account = rows[0]
    const ip = this._clientIp()

    this.socket.auth = true
    this.session.user = {
      name: userName,
      deptID: account.dept_id,
      roleID: account.role_id,
      accessID: account.access_id,
      objRange: account.obj_range,
      ip: ip
    }
    this.session.save() // save the session info to sessionStore

    this._cacheCredentials(userName, userPass)

    this.initContext(userName, req)
    Logger.log2db(this.socket, 0, '登录成功!')

    const resMsg = {
      code: 0,
      msg: '',
      data: {
        name: userName,
        roleID: account.role_id,
        deptID: account.dept_id,
        accessID: account.access_id,
        objRange: account.obj_range,
        sid: this.socket.handshake.sessionID,
        ip: ip
      }
    }

    if (this.name === config.COLLECTOR) {
      // info all connections
      this.broadcastCollectorStatus('online')
    } else {
      // only for the user who just logged in
      this.sendCollectorStatus()
    }

    this.doCallBack(callback, resMsg, 'User.login')
  }

  /**
   * Store the user's credentials in Redis under the user name.
   * A Redis failure is logged and does not abort the login.
   *
   * TODO: the password is stored in plain text; it needs to be encrypted.
   */
  _cacheCredentials (userName, userPass) {
    console.log("存入到Redis")
    try {
      const redisClient = RedisService.getRedisConnet()
      const userMsg = {
        user_name: userName,
        user_pwd: userPass
      }
      redisClient.set(userName, JSON.stringify(userMsg))
    } catch (err) {
      console.error(`User.login : failed to store the user in Redis. \n\t ${err}`)
    }
  }

  /**
   * Leave the logged-in state.
   * The socket leaves its room; the connection between client and server
   * stays open. The client is not informed, so the callback is not invoked.
   *
   * @method logout
   *
   * @param {object} req - logout message (unused)
   * @param {Function} callback - acknowledgement callback (unused)
   */
  logout (req, callback) {
    const userInfo = this.session && this.session.user
    if (!userInfo) {
      return
    }

    const userName = userInfo.name
    Logger.log2db(this.socket, 1, '退出成功!')

    this.clearContext(userName)

    delete this.socket.handshake.session.user
    this.socket.auth = false

    if (this.name === config.COLLECTOR) {
      this.broadcastCollectorStatus('offline')
    }

    this.name = null
  }

  /**
   * Move the socket between the MONITOR and STANDBY rooms.
   *
   * @param {object} req - reads `data.op` ('enter' | 'leave') and `data.username`
   * @param {Function} callback - receives `{ code, op }`; code is -1 for an unknown op
   */
  standby (req, callback) {
    const data = (req && req.data) || {}
    const userName = data.username
    const op = data.op
    let resMsg = null

    if (op === 'enter') {
      this.socket.leave(config.MONITOR)
      console.log(`>> User ${userName} leave ${config.MONITOR}`)
      this.socket.join(config.STANDBY)
      console.log(`>> User ${userName} enter ${config.STANDBY}`)
      resMsg = { code: 0, op: op }
    } else if (op === 'leave') {
      this.socket.leave(config.STANDBY)
      console.log(`>> User ${userName} leave ${config.STANDBY}`)
      this.socket.join(config.MONITOR)
      console.log(`>> User ${userName} enter ${config.MONITOR}`)
      resMsg = { code: 0, op: op }
    } else {
      resMsg = { code: -1, op: op }
      console.warn('UNKNOWN standby command : ', op)
    }

    this.doCallBack(callback, resMsg, 'User.standby')
  }

  /**
   * Change a user's password after verifying the old one.
   *
   * @param {object} req - reads `data.username`, `data.oldpwd`, `data.newpwd`
   * @param {Function} callback - receives `{ code, msg }`; code 0 on success
   */
  async modify (req, callback) {
    const data = req && req.data
    if (!data || data.username == null || data.oldpwd == null || data.newpwd == null) {
      console.warn('User.modify : invalid modify request.')
      this.doCallBack(callback, { code: -1, msg: '用户名或密码错误,请确认后再试。' }, 'User.modify')
      return
    }

    const username = data.username
    const safeName = escapeSqlString(username)

    let rows = null
    try {
      rows = await mysql.query(`select user_id from dat_user where user_id="${safeName}" and pwd="${escapeSqlString(data.oldpwd)}"`)
    } catch (err) {
      console.error(`查询DB失败。 \n\t ${err}`)
      const failure = {
        code: -1,
        msg: '服务器错误,请联系系统管理员!',
        data: {
          name: username
        }
      }
      this.doCallBack(callback, failure, 'User.modify')
      return
    }

    if (!rows || rows.length === 0) {
      this.doCallBack(callback, { code: -1, msg: '用户名或密码错误,请确认后再试。' }, 'User.modify')
      return
    }

    try {
      await mysql.query(`update dat_user set pwd="${escapeSqlString(data.newpwd)}" where user_id="${safeName}"`)
    } catch (err) {
      // The statement is not logged: it contains the new password.
      console.error(`更新数据库失败 : \n\t ${err}`)
      this.doCallBack(callback, { code: -1, msg: '更新数据库失败', cmd: req.cmd }, 'User.modify')
      return
    }

    // DB update succeeded
    Logger.log2db(this.socket, 3, '修改密码成功!')
    this.doCallBack(callback, { code: 0, msg: '' }, 'User.modify')
  }

  /**
   * Attach the socket to this user, register the event handlers and, when the
   * connection asks for it (`isDefaultLogin=yes` in the query), log in silently.
   *
   * @param {object} socket - Socket.IO socket
   */
  bind (socket) {
    console.log("Bind")
    this.socket = socket
    // NOTE(review): the socket is flagged as authenticated before any login,
    // which makes every `socket.auth` check pass until logout. Kept because
    // existing clients may rely on it — confirm and tighten.
    this.socket.auth = true
    this.session = socket.handshake.session

    this.registerEventHandler(this.socket)

    const query = (socket.request && socket.request._query) || {}
    console.log(query)
    const isDefaultLogin = query.isDefaultLogin
    console.log("isDefaultLogin=====" + isDefaultLogin)

    if (isDefaultLogin === 'yes') {
      console.log("/////////静默登录///////////")
      // FIXME(review): hard-coded credentials in source; move them to configuration.
      const req = {
        cmd: "login",
        data: {
          "user_name": "admin",
          "user_pass": "xcjk@2020#123"
        }
      }
      this.login(req, (res) => {
        if (res && res.code === 0) {
          console.log("静默登录成功")
        } else {
          console.warn('静默登录失败', res && res.msg)
        }
      }).catch((err) => {
        console.error(`User.login failed. \n\t ${err}`)
      })
    }
  }

  /**
   * Set up the logged-in state: remember the name, join the matching room
   * and, for monitor clients, send the meta data followed by a request for
   * all current card positions. Finally register the user in the user list.
   *
   * @param {string} userName
   * @param {object} req - login request (unused)
   */
  initContext (userName, req) {
    this.name = userName

    if (userName === config.COLLECTOR) {
      this.type = config.COLLECTOR
      this.socket.join(config.COLLECTOR)
      console.log(`>> User ${userName} enter ${config.COLLECTOR}`)
      Logger.log2db(this.socket, 3, `User ${userName} enter ${config.COLLECTOR}`)
    } else {
      this.type = config.MONITOR
      this.socket.join(config.MONITOR)
      console.log(`>> User ${userName} enter ${config.MONITOR}`)

      // A (re)connecting client needs to refresh its local meta definition.
      this.meta.sendMetaDefinition(this.socket)

      // Send the base tables on every login, then ask for the card positions.
      const promises = this.meta.sendDataTable(this.socket)
      Promise.all(promises).then(() => {
        console.log(`>>>> Send all meta data DONE for user ${this.name}.`)
        this.requestAllCardPositions()
      }).catch((err) => {
        console.log(`>>>> Send all meta data FAILED for user ${this.name}.\n`, err)
      })
    }

    this.userlist.add(this) // save socket for later usage
  }

  /**
   * Undo initContext: leave the room and remove the user from the user list.
   *
   * @param {string} userName
   */
  clearContext (userName) {
    // leave room
    if (userName === config.COLLECTOR) {
      this.socket.leave(config.COLLECTOR)
      console.log(`<< User ${userName} left ${config.COLLECTOR}`)
      Logger.log2db(this.socket, 3, `User ${userName} left ${config.COLLECTOR}`)
    } else {
      this.socket.leave(config.MONITOR)
      console.log(`<< User ${userName} left ${config.MONITOR}`)
    }

    this.userlist.remove(this)
  }

  /** Emit a `beatheart` CALL to every connection (the command name is part of the protocol). */
  callBreatheart () {
    const reqcall = {
      cmd: 'beatheart',
      data: {}
    }
    this.io.emit('CALL', reqcall)
  }

  /**
   * After login: ask the collector for the positions of all cards currently
   * in the mine and push the answer to this user.
   *
   * @method requestAllCardPositions
   */
  requestAllCardPositions () {
    console.log(`Going to send all cards' init position for user ${this.name}.`)

    const collector = this.userlist.get(config.COLLECTOR)
    const collectorSocket = collector ? collector.socket : null
    if (!collectorSocket) {
      console.warn('当前没有可用的采集服务器,无法获取现场实时动态。')
      return
    }

    const message = {
      cmd: 'req_all_data',
      data: '0',
      version: '1.0.0.2'
    }

    collectorSocket.emit('CALL', message, (data) => {
      console.log('Got collector\'s response on req_all_data: ', data)
      if (data) {
        // Relay the collector's answer to this client.
        Pusher.push(this.socket, Utils.toJson(data))
      }
    })
  }
}