解决多线程处理相同产品造成的数据错乱问题

Table of Contents

    目前场景是大概有十几万产品,每个产品用一个ID号表示,它有很多个属性,收到消息后主要是对产品进行增、删、改操作。 服务端有多个线程,收到一个消息后就分配一个线程去处理,问题是当某个产品ID同时被多个线程操作的时候中间会出现数据错乱的问题。由于是老系统处理一个消息时中间涉及到多个操作,所以操作与操作之间不是线程安全的。

    方法一是,串行执行相同的任务,所以应该将相同产品ID的消息放到同一个线程中处理,不同产品ID可以放到其他线程处理。 分配n个Worker,每个Worker一个线程一个队列按入队顺序执行。当收到一个消息时获取这个消息里面的产品ID,遍历所有Worker,如果发现某个Worker里有正在处理或者待处理的ID就放到这个Worker里,没有发现就放到当前Worker中,使用index递增来均匀分配到每个Worker中。这里有个问题是如果某个消息是批量处理多个产品,这种方法是行不通的。

    方法二是,原有的逻辑不变,在收到消息时上锁,在消息处理完成后解锁,当然这里并不是锁所有消息那样服务端性能太低了,相当于单线程在处理。这个方法是只锁产品,相同的产品只允许一个线程处理,由于产品的基数比较大所以上锁的概率就比较小了。这个方法处理一个消息里面的多个产品也是可以的,只是批量过多的话性能会降低。下面代码是单个上锁,可以改成批量的。

    方法一:

    #pragma once
    
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <atomic>
    #include <unordered_map>
    #include <queue>
    #include <vector>
    #include <functional>
    #include <memory>
    
    class TaskData;
    class Worker;
    typedef std::function<void(const Message* msg, Message& reply)> TaskCallback;
    typedef std::shared_ptr<TaskData> TaskDataPtr;
    typedef std::shared_ptr<Worker> WorkerPtr;
    
    class TaskData {
    public:
        TaskData(){}
        TaskData(const TaskCallback &_handleMsg, const TaskCallback &_replyMsg, const Message& _msg, Message& _reply)
            : handleMsg(_handleMsg), replyMsg(_replyMsg), msg(_msg), reply(_reply) {}
    
        std::string id;
        TaskCallback handleMsg;
        TaskCallback replyMsg;
        Message msg;
        Message reply;
    };
    
    class Worker {
    public:
        Worker()
        {
            thread_.reset(new std::thread(std::bind(&Worker::run, this)));
        }
    
        bool tryExec(const std::string &id, const TaskDataPtr &data)
        {
            if (counterExistInc(id)) {
                data->id = id;
                std::unique_lock<std::mutex> lock(taskMutex_);
                taskQueue_.push(data);
                cond_.notify_one();
                return true;
            }
            return false;
        }
    
        void exec(const std::string &id, const TaskDataPtr &data)
        {
            counterInc(id);
            data->id = id;
            std::unique_lock<std::mutex> lock(taskMutex_);
            taskQueue_.push(data);
            cond_.notify_one();
        }
    
    private:
        Worker(const Worker&);
        Worker& operator=(const Worker&);
    
        bool counterExistInc(const std::string &id)
        {
            std::unique_lock<std::mutex> lock(counterMutex_);
            if (idCounter_[id] > 0) {
                ++idCounter_[id];
                return true;
            }
            return false;
        }
    
        void counterInc(const std::string &id)
        {
            std::unique_lock<std::mutex> lock(counterMutex_);
            ++idCounter_[id];
        }
        
        void counterDec(const std::string &id)
        {
            std::unique_lock<std::mutex> lock(counterMutex_);
            if (--idCounter_[id] <= 0) {
                idCounter_.erase(id);
            }
        }
    
        void run()
        {
            while (1) {
                TaskDataPtr task;
                {
                    std::unique_lock<std::mutex> lock(taskMutex_);
                    while (taskQueue_.empty()) {
                        cond_.wait(lock);
                    }
                    task = taskQueue_.front();
                    taskQueue_.pop();
                }
                if (task) {
                    task->handleMsg(&task->msg, task->reply);
                    counterDec(task->id);
                    task->replyMsg(&task->msg, task->reply);
                }
            }
        }
    
    private:
        std::mutex taskMutex_;
        std::mutex counterMutex_;
        std::condition_variable cond_;
        std::unique_ptr<std::thread> thread_;
        std::unordered_map<std::string, int> idCounter_;
        std::queue<TaskDataPtr> taskQueue_;
    };
    
    class SerialTask {
    public:
        static SerialTask* instance()
        {
            static SerialTask s_inst;
            return &s_inst;
        }
    
        void init(int workerCount)
        {
            if (count_ > 0) {
                return;
            }
    
            count_ = workerCount;
            for (int i = 0; i < workerCount; i++) {
                workers_.push_back(std::make_shared<Worker>());
            }
        }
    
        bool tryExec(const TaskDataPtr &data)
        {
            if (data->id.empty() || !data->handleMsg || !data->replyMsg) {
                return false;
            }
    
            std::string id = data->id;
            std::vector<WorkerPtr>::iterator iter = workers_.begin();
            for (; iter != workers_.end(); ++iter) {
                WorkerPtr &worker = *iter;
                if (worker->tryExec(id, data)) {
                    return true;
                }
            }
            workers_[index_++ % count_]->exec(id, data);
            return true;
        }
    
    private:
        SerialTask() : index_(0), count_(0) {}
    
    private:
        std::vector<WorkerPtr> workers_;
        std::atomic<int> index_;
        int count_;
    };
    

    方法二:

    // id_guard.h
    #pragma once
    
    #include <unordered_set>
    #include <mutex>
    #include <condition_variable>
    
    class IDGuard
    {
    public:
        IDGuard(std::mutex &mutex, const std::string &id);
        ~IDGuard();
    
    private:
        bool isProcessing(const std::string &id);
        void enter(const std::string &id);
        void leave(const std::string &id);
    
    private:
        std::mutex& mutex_;
        std::condition_variable  cond_;
        std::unordered_set<std::string> cache_;
        std::string id_;
    };
    
    
    // id_guard.cpp
    #include "stdafx.h"
    #include "id_guard.h"
    
    IDGuard::IDGuard(std::mutex &mutex, const std::string &id)
        : mutex_(mutex), id_(id)
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            while (isProcessing(id_)) {
                cond_.wait(lock);
            }
        }
        enter(id_);
    }
    
    IDGuard::~IDGuard()
    {
        leave(id_);
    }
    
    bool IDGuard::isProcessing(const std::string &id)
    {
        return cache_.find(id) != cache_.end();
    }
    
    void IDGuard::enter(const std::string &id)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cache_.insert(id);
    }
    
    void IDGuard::leave(const std::string &id)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        cache_.erase(id);
        cond_.notify_all();
    }