目前场景是大概有十几万产品,每个产品用一个ID号表示,它有很多个属性,收到消息后主要是对产品进行增、删、改操作。 服务端有多个线程,收到一个消息后就分配一个线程去处理,问题是当某个产品ID同时被多个线程操作的时候中间会出现数据错乱的问题。由于是老系统处理一个消息时中间涉及到多个操作,所以操作与操作之间不是线程安全的。
方法一是,串行执行相同的任务,所以应该将相同产品ID的消息放到同一个线程中处理,不同产品ID可以放到其他线程处理。 分配n个Worker,每个Worker一个线程一个队列按入队顺序执行。当收到一个消息时获取这个消息里面的产品ID,遍历所有Worker,如果发现某个Worker里有正在处理或者待处理的ID就放到这个Worker里,没有发现就放到当前Worker中,使用index递增来均匀分配到每个Worker中。这里有个问题是如果某个消息是批量处理多个产品,这种方法是行不通的。
方法二是,原有的逻辑不变,在收到消息时上锁,在消息处理完成后解锁,当然这里并不是锁所有消息那样服务端性能太低了,相当于单线程在处理。这个方法是只锁产品,相同的产品只允许一个线程处理,由于产品的基数比较大所以上锁的概率就比较小了。这个方法处理一个消息里面的多个产品也是可以的,只是批量过多的话性能会降低。下面代码是单个上锁,可以改成批量的。
方法一:
#pragma once
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <unordered_map>
#include <queue>
#include <vector>
#include <functional>
#include <memory>
class TaskData;
class Worker;
typedef std::function<void(const Message* msg, Message& reply)> TaskCallback;
typedef std::shared_ptr<TaskData> TaskDataPtr;
typedef std::shared_ptr<Worker> WorkerPtr;
class TaskData {
public:
TaskData(){}
TaskData(const TaskCallback &_handleMsg, const TaskCallback &_replyMsg, const Message& _msg, Message& _reply)
: handleMsg(_handleMsg), replyMsg(_replyMsg), msg(_msg), reply(_reply) {}
std::string id;
TaskCallback handleMsg;
TaskCallback replyMsg;
Message msg;
Message reply;
};
class Worker {
public:
Worker()
{
thread_.reset(new std::thread(std::bind(&Worker::run, this)));
}
bool tryExec(const std::string &id, const TaskDataPtr &data)
{
if (counterExistInc(id)) {
data->id = id;
std::unique_lock<std::mutex> lock(taskMutex_);
taskQueue_.push(data);
cond_.notify_one();
return true;
}
return false;
}
void exec(const std::string &id, const TaskDataPtr &data)
{
counterInc(id);
data->id = id;
std::unique_lock<std::mutex> lock(taskMutex_);
taskQueue_.push(data);
cond_.notify_one();
}
private:
Worker(const Worker&);
Worker& operator=(const Worker&);
bool counterExistInc(const std::string &id)
{
std::unique_lock<std::mutex> lock(counterMutex_);
if (idCounter_[id] > 0) {
++idCounter_[id];
return true;
}
return false;
}
void counterInc(const std::string &id)
{
std::unique_lock<std::mutex> lock(counterMutex_);
++idCounter_[id];
}
void counterDec(const std::string &id)
{
std::unique_lock<std::mutex> lock(counterMutex_);
if (--idCounter_[id] <= 0) {
idCounter_.erase(id);
}
}
void run()
{
while (1) {
TaskDataPtr task;
{
std::unique_lock<std::mutex> lock(taskMutex_);
while (taskQueue_.empty()) {
cond_.wait(lock);
}
task = taskQueue_.front();
taskQueue_.pop();
}
if (task) {
task->handleMsg(&task->msg, task->reply);
counterDec(task->id);
task->replyMsg(&task->msg, task->reply);
}
}
}
private:
std::mutex taskMutex_;
std::mutex counterMutex_;
std::condition_variable cond_;
std::unique_ptr<std::thread> thread_;
std::unordered_map<std::string, int> idCounter_;
std::queue<TaskDataPtr> taskQueue_;
};
class SerialTask {
public:
static SerialTask* instance()
{
static SerialTask s_inst;
return &s_inst;
}
void init(int workerCount)
{
if (count_ > 0) {
return;
}
count_ = workerCount;
for (int i = 0; i < workerCount; i++) {
workers_.push_back(std::make_shared<Worker>());
}
}
bool tryExec(const TaskDataPtr &data)
{
if (data->id.empty() || !data->handleMsg || !data->replyMsg) {
return false;
}
std::string id = data->id;
std::vector<WorkerPtr>::iterator iter = workers_.begin();
for (; iter != workers_.end(); ++iter) {
WorkerPtr &worker = *iter;
if (worker->tryExec(id, data)) {
return true;
}
}
workers_[index_++ % count_]->exec(id, data);
return true;
}
private:
SerialTask() : index_(0), count_(0) {}
private:
std::vector<WorkerPtr> workers_;
std::atomic<int> index_;
int count_;
};
方法二:
// id_guard.h
#pragma once
#include <unordered_set>
#include <mutex>
#include <condition_variable>
class IDGuard
{
public:
IDGuard(std::mutex &mutex, const std::string &id);
~IDGuard();
private:
bool isProcessing(const std::string &id);
void enter(const std::string &id);
void leave(const std::string &id);
private:
std::mutex& mutex_;
std::condition_variable cond_;
std::unordered_set<std::string> cache_;
std::string id_;
};
// id_guard.cpp
#include "stdafx.h"
#include "id_guard.h"
IDGuard::IDGuard(std::mutex &mutex, const std::string &id)
: mutex_(mutex), id_(id)
{
{
std::unique_lock<std::mutex> lock(mutex_);
while (isProcessing(id_)) {
cond_.wait(lock);
}
}
enter(id_);
}
IDGuard::~IDGuard()
{
leave(id_);
}
bool IDGuard::isProcessing(const std::string &id)
{
return cache_.find(id) != cache_.end();
}
void IDGuard::enter(const std::string &id)
{
std::unique_lock<std::mutex> lock(mutex_);
cache_.insert(id);
}
void IDGuard::leave(const std::string &id)
{
std::unique_lock<std::mutex> lock(mutex_);
cache_.erase(id);
cond_.notify_all();
}