#include <functional>
#include <list>
#include <vector>
#include <mutex>
#include <atomic>
#include <memory>
#include <thread>

#include <ev++.h>

#include "log.h"
#include "worker.h"
#include "message.h"
#include "card_base.h"
#include "card.h"
#include "zloop.h"
#include "area.h"
#include "clock.h"
#include "mine_business.h"
#include "bulletin_broad_show.h"
#include "event.h"

loop_thread::loop_thread ()
{
	m_thread.reset(new std::thread(std::bind(&loop_thread::run,this)));
}

void loop_thread::run()
{
	loop_init();
	log_info("worker_thread start....");
	while(!check_stop_flag())
	{
		try
		{
			ev::dynamic_loop::run(0);
		}
		catch(const std::exception&e)
		{
			log_error("捕获到异常:%s",e.what());
		}
		catch(...)
		{
			log_error("捕获到未知异常");
		}
	}
	log_info("worker_thread exit....");
}

void loop_thread::on_async(const std::list<task*>&task_list)
{
	for(task*t:task_list)
	{
		do_task(*t);
	}
}

void loop_thread::loop_init()
{

}

void loop_thread::join()
{
	m_thread->join();
}

void loop_thread::stop()
{
	async_stop();
}

loop_thread::~loop_thread()
{
}


struct hash_thread
{
	int m_num_thread=4;

	void set_num_thread(int num_thread)
	{
		m_num_thread=num_thread;
	}

	int hash_code(uint32_t card_id)
	{
		return card_id*2003%m_num_thread;
	}
};

struct timer_worker_thread: loop_thread
{
	void do_task_0001()
	{
		static int version = -1;
		zclock clock;

		int v = card_list::instance()->version();
		card_list_visit clv;
		if(v != version){
			version=v;
			clv._flag=true;
			mine_business::inst()->clear_vehicle();
		}
		card_list::instance()->accept(clv);
		bulletin_broad_show::inst()->run_bulletin_board();
		mine_business::inst()->run_business();
        event_list::instance()->load_his_data_from_db(false);

		log_info("timer_worker_thread use time:%ldus", clock.count_us());
	}

	void on_timeout()
	{
		do_task_0001();
	}

	std::unique_ptr<ev::timer> card_timer_1s;
	void loop_init()
	{
		card_timer_1s.reset(new ev::timer(*this));
		card_timer_1s->set<timer_worker_thread,&timer_worker_thread::on_timeout>(this);
		card_timer_1s->start(1,1);
	}

	void do_task(task&t)
	{
		t.destroy();
	}
};

std::unique_ptr<timer_worker_thread> m_timer_worker;

hash_thread g_hash;
struct worker_thread: loop_thread ,visitor<std::shared_ptr<card_location_base>>
{
	int  m_thread_id=0;
	int  m_card_list_version=-1;
	std::vector<std::shared_ptr<card_location_base>> m_local_card_list;
	worker_thread ()
	{
		m_local_card_list.reserve(128);
	}

	void set_thread_id(int id)
	{
		m_thread_id=id;
	}

	std::unique_ptr<ev::timer> card_timer_1s;
	void loop_init()
	{
		card_timer_1s.reset(new ev::timer(*this));
		card_timer_1s->set<worker_thread,&worker_thread::on_timeout>(this);
		card_timer_1s->start(1,1);
	}

	void update_local_cards()
	{
		int version=card_list::instance()->version();
		if(m_card_list_version!=version)
		{
			m_card_list_version=version;
			init_local_card_list();
		}
	}

	void on_timeout()
	{
		update_local_cards();
		for(auto&c:m_local_card_list)
		{
			c->on_timer();
		}
	}

	bool visit(std::shared_ptr<card_location_base> c)
	{
		if(g_hash.hash_code(c->m_id)==m_thread_id) //32bit id也可以
		{
			m_local_card_list.push_back(c);
		}

		return true;
	}

	void init_local_card_list()
	{
		m_local_card_list.clear();
		card_list::instance()->accept(*this);

		log_info("update local cards,count=%d",m_local_card_list.size());
	}

	void do_task(task&t)
	{
		switch(t.m_cmd_code)
		{
			case 0x843b://tof
			case 0x863b://tdoa
				log_info("card loc message%04X",t.m_cmd_code);
				card_list::instance()->on_message(this,t.body<message_locinfo>(),false);
				t.destroy();

				//card_message::on_loc_message(this,t.m_param1);
			break;
			case 0x853b://tof his
			case 0x873b://tdoa his
				log_info("site history message%04X",t.m_cmd_code);
				card_list::instance()->on_message(this,t.body<message_locinfo>(),true);
				//site_message::on_sync(this,t.m_param1);
				t.destroy();
			break;

			case 0x804c://ctrl site message
				log_info("ctrl site message%04X",t.m_cmd_code);
				t.destroy();
			break;

			case 0x10001://区域业务类型修改
			{
				update_local_cards();
				for(auto&c:m_local_card_list)
				{
					c->get_area_tool()->on_change_business(c,t);
				}

				auto&mcb=t.body<message_change_business>();
				std::shared_ptr<area> a=area_list::instance()->get(mcb.area_id);

				if(a && a->sub_frozen_count()==2)
				{
					a->set_business_list(std::move(mcb.new_list));
					a->sub_frozen_count();
				}

				if(mcb.ref_count.fetch_sub(1)==1)
				{
					t.destroy();
				}
                break;
			}
			case 0x10002://区域业务类型修改
            {
				auto&mcb=t.body<message_change_card_display>();
                auto c = card_list::instance()->get(mcb.m_card_id);
                c->m_display=mcb.m_display;
				t.destroy();
                break;
            }
		}
	}
};

struct worker_impl:worker
{
	std::vector<std::shared_ptr<worker_thread>> m_threads;
	std::atomic<int> m_init_flag{-2};
	virtual void stop()
	{

		if(m_timer_worker)
		{
			m_timer_worker->async_stop();
			m_timer_worker->join();
			m_timer_worker.reset(nullptr);
		}

		int exp=0;
		if(!m_init_flag.compare_exchange_strong (exp,1))
			return;

		for(auto&thr:m_threads)
			thr->stop();

		for(auto&thr:m_threads)
			thr->join();


	}

	worker_thread& hash(uint32_t i)
	{
		return *m_threads[g_hash.hash_code(i)];
	}

	virtual int  num_thread()
	{
		return std::thread::hardware_concurrency();
	}

	bool running()
	{
		return m_init_flag.load()==0;
	}

	virtual void request(task*t)
	{
		hash(t->m_hash_id).async_request(t);
	}

	virtual void broadcast(task*tk) 
	{
		for(auto&thr:m_threads)
			thr->async_request(tk);
	}

	void init(int num_thread)
	{
		int exp=-2;
		if(m_init_flag.compare_exchange_strong (exp,-1))
		{
			m_threads.resize(num_thread);
			for(int i=0;i<num_thread;i++)
			{
				m_threads[i].reset(new worker_thread());
				m_threads[i]->set_thread_id(i);
			}
			m_init_flag.store(0);
		}

		while(0!=m_init_flag.load())
			std::this_thread::yield();
	}
};

worker_impl _worker_impl;
worker*worker::instance()
{
	int num_thread=_worker_impl.num_thread();
	if(!_worker_impl.running())
	{
		log_info("worker thread count=%d",num_thread);
		g_hash.set_num_thread(num_thread);
		_worker_impl.init(num_thread);
		m_timer_worker.reset(new timer_worker_thread);
	}

	return &_worker_impl;
}