文章标题 原创 翻译 转载 文章内容 目前场景是大概有十几万产品,每个产品用一个ID号表示,它有很多个属性,收到消息后主要是对产品进行增、删、改操作。 服务端有多个线程,收到一个消息后就分配一个线程去处理,问题是当某个产品ID同时被多个线程操作的时候中间会出现数据错乱的问题。由于是老系统处理一个消息时中间涉及到多个操作,所以操作与操作之间不是线程安全的。 方法一是,串行执行相同的任务,所以应该将相同产品ID的消息放到同一个线程中处理,不同产品ID可以放到其他线程处理。 分配n个Worker,每个Worker一个线程一个队列按入队顺序执行。当收到一个消息时获取这个消息里面的产品ID,遍历所有Worker,如果发现某个Worker里有正在处理或者待处理的ID就放到这个Worker里,没有发现就放到当前Worker中,使用index递增来均匀分配到每个Worker中。这里有个问题是如果某个消息是批量处理多个产品,这种方法是行不通的。 方法二是,原有的逻辑不变,在收到消息时上锁,在消息处理完成后解锁,当然这里并不是锁所有消息那样服务端性能太低了,相当于单线程在处理。这个方法是只锁产品,相同的产品只允许一个线程处理,由于产品的基数比较大所以上锁的概率就比较小了。这个方法处理一个消息里面的多个产品也是可以的,只是批量过多的话性能会降低。下面代码是单个上锁,可以改成批量的。 方法一: ``` #pragma once #include <thread> #include <mutex> #include <condition_variable> #include <atomic> #include <unordered_map> #include <queue> #include <vector> #include <functional> #include <memory> class TaskData; class Worker; typedef std::function<void(const Message* msg, Message& reply)> TaskCallback; typedef std::shared_ptr<TaskData> TaskDataPtr; typedef std::shared_ptr<Worker> WorkerPtr; class TaskData { public: TaskData(){} TaskData(const TaskCallback &_handleMsg, const TaskCallback &_replyMsg, const Message& _msg, Message& _reply) : handleMsg(_handleMsg), replyMsg(_replyMsg), msg(_msg), reply(_reply) {} std::string id; TaskCallback handleMsg; TaskCallback replyMsg; Message msg; Message reply; }; class Worker { public: Worker() { thread_.reset(new std::thread(std::bind(&Worker::run, this))); } bool tryExec(const std::string &id, const TaskDataPtr &data) { if (counterExistInc(id)) { data->id = id; std::unique_lock<std::mutex> lock(taskMutex_); taskQueue_.push(data); cond_.notify_one(); return true; } return false; } void exec(const std::string &id, const TaskDataPtr &data) { counterInc(id); data->id = id; std::unique_lock<std::mutex> lock(taskMutex_); taskQueue_.push(data); cond_.notify_one(); } private: Worker(const Worker&); Worker& operator=(const Worker&); bool counterExistInc(const std::string &id) { std::unique_lock<std::mutex> lock(counterMutex_); if (idCounter_[id] > 0) { ++idCounter_[id]; return true; } return false; } void counterInc(const std::string &id) { std::unique_lock<std::mutex> lock(counterMutex_); ++idCounter_[id]; } void counterDec(const std::string &id) { std::unique_lock<std::mutex> lock(counterMutex_); if (--idCounter_[id] <= 0) { idCounter_.erase(id); } } void run() { while (1) { TaskDataPtr task; { std::unique_lock<std::mutex> lock(taskMutex_); while (taskQueue_.empty()) { cond_.wait(lock); } task = taskQueue_.front(); taskQueue_.pop(); } if (task) { task->handleMsg(&task->msg, task->reply); counterDec(task->id); task->replyMsg(&task->msg, task->reply); } } } private: std::mutex taskMutex_; std::mutex counterMutex_; std::condition_variable cond_; std::unique_ptr<std::thread> thread_; std::unordered_map<std::string, int> idCounter_; std::queue<TaskDataPtr> taskQueue_; }; class SerialTask { public: static SerialTask* instance() { static SerialTask s_inst; return &s_inst; } void init(int workerCount) { if (count_ > 0) { return; } count_ = workerCount; for (int i = 0; i < workerCount; i++) { workers_.push_back(std::make_shared<Worker>()); } } bool tryExec(const TaskDataPtr &data) { if (data->id.empty() || !data->handleMsg || !data->replyMsg) { return false; } std::string id = data->id; std::vector<WorkerPtr>::iterator iter = workers_.begin(); for (; iter != workers_.end(); ++iter) { WorkerPtr &worker = *iter; if (worker->tryExec(id, data)) { return true; } } workers_[index_++ % count_]->exec(id, data); return true; } private: SerialTask() : index_(0), count_(0) {} private: std::vector<WorkerPtr> workers_; std::atomic<int> index_; int count_; }; ``` 方法二: ``` // id_guard.h #pragma once #include <unordered_set> #include <mutex> #include <condition_variable> class IDGuard { public: IDGuard(std::mutex &mutex, const std::string &id); ~IDGuard(); private: bool isProcessing(const std::string &id); void enter(const std::string &id); void leave(const std::string &id); private: std::mutex& mutex_; std::condition_variable cond_; std::unordered_set<std::string> cache_; std::string id_; }; // id_guard.cpp #include "stdafx.h" #include "id_guard.h" IDGuard::IDGuard(std::mutex &mutex, const std::string &id) : mutex_(mutex), id_(id) { { std::unique_lock<std::mutex> lock(mutex_); while (isProcessing(id_)) { cond_.wait(lock); } } enter(id_); } IDGuard::~IDGuard() { leave(id_); } bool IDGuard::isProcessing(const std::string &id) { return cache_.find(id) != cache_.end(); } void IDGuard::enter(const std::string &id) { std::unique_lock<std::mutex> lock(mutex_); cache_.insert(id); } void IDGuard::leave(const std::string &id) { std::unique_lock<std::mutex> lock(mutex_); cache_.erase(id); cond_.notify_all(); } ``` 文章类别 Python Mobile Android Java Shell Life Database Bug Windows IOS Tools Boost Node.js Mac Product Tips C/C++ Golang Javascript React Qt MQ MongoDB Design Web Linux LLM ChatGPT RAG AI 提交