Skip to content

解决多线程处理相同产品造成的数据错乱问题

Published: at 03:18 AM | 5 min read

目前场景是大概有十几万产品,每个产品用一个ID号表示,它有很多个属性,收到消息后主要是对产品进行增、删、改操作。 服务端有多个线程,收到一个消息后就分配一个线程去处理,问题是当某个产品ID同时被多个线程操作的时候中间会出现数据错乱的问题。由于是老系统处理一个消息时中间涉及到多个操作,所以操作与操作之间不是线程安全的。

方法一是,串行执行相同的任务,所以应该将相同产品ID的消息放到同一个线程中处理,不同产品ID可以放到其他线程处理。 分配n个Worker,每个Worker一个线程一个队列按入队顺序执行。当收到一个消息时获取这个消息里面的产品ID,遍历所有Worker,如果发现某个Worker里有正在处理或者待处理的ID就放到这个Worker里,没有发现就放到当前Worker中,使用index递增来均匀分配到每个Worker中。这里有个问题是如果某个消息是批量处理多个产品,这种方法是行不通的。

方法二是,原有的逻辑不变,在收到消息时上锁,在消息处理完成后解锁,当然这里并不是锁所有消息那样服务端性能太低了,相当于单线程在处理。这个方法是只锁产品,相同的产品只允许一个线程处理,由于产品的基数比较大所以上锁的概率就比较小了。这个方法处理一个消息里面的多个产品也是可以的,只是批量过多的话性能会降低。下面代码是单个上锁,可以改成批量的。

方法一:

#pragma once

#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <unordered_map>
#include <queue>
#include <vector>
#include <functional>
#include <memory>

class TaskData;
class Worker;
typedef std::function<void(const Message* msg, Message& reply)> TaskCallback;
typedef std::shared_ptr<TaskData> TaskDataPtr;
typedef std::shared_ptr<Worker> WorkerPtr;

class TaskData {
public:
    TaskData(){}
    TaskData(const TaskCallback &_handleMsg, const TaskCallback &_replyMsg, const Message& _msg, Message& _reply)
        : handleMsg(_handleMsg), replyMsg(_replyMsg), msg(_msg), reply(_reply) {}

    std::string id;
    TaskCallback handleMsg;
    TaskCallback replyMsg;
    Message msg;
    Message reply;
};

class Worker {
public:
    Worker()
    {
        thread_.reset(new std::thread(std::bind(&Worker::run, this)));
    }

    bool tryExec(const std::string &id, const TaskDataPtr &data)
    {
        if (counterExistInc(id)) {
            data->id = id;
            std::unique_lock<std::mutex> lock(taskMutex_);
            taskQueue_.push(data);
            cond_.notify_one();
            return true;
        }
        return false;
    }

    void exec(const std::string &id, const TaskDataPtr &data)
    {
        counterInc(id);
        data->id = id;
        std::unique_lock<std::mutex> lock(taskMutex_);
        taskQueue_.push(data);
        cond_.notify_one();
    }

private:
    Worker(const Worker&);
    Worker& operator=(const Worker&);

    bool counterExistInc(const std::string &id)
    {
        std::unique_lock<std::mutex> lock(counterMutex_);
        if (idCounter_[id] > 0) {
            ++idCounter_[id];
            return true;
        }
        return false;
    }

    void counterInc(const std::string &id)
    {
        std::unique_lock<std::mutex> lock(counterMutex_);
        ++idCounter_[id];
    }
    
    void counterDec(const std::string &id)
    {
        std::unique_lock<std::mutex> lock(counterMutex_);
        if (--idCounter_[id] <= 0) {
            idCounter_.erase(id);
        }
    }

    void run()
    {
        while (1) {
            TaskDataPtr task;
            {
                std::unique_lock<std::mutex> lock(taskMutex_);
                while (taskQueue_.empty()) {
                    cond_.wait(lock);
                }
                task = taskQueue_.front();
                taskQueue_.pop();
            }
            if (task) {
                task->handleMsg(&task->msg, task->reply);
                counterDec(task->id);
                task->replyMsg(&task->msg, task->reply);
            }
        }
    }

private:
    std::mutex taskMutex_;
    std::mutex counterMutex_;
    std::condition_variable cond_;
    std::unique_ptr<std::thread> thread_;
    std::unordered_map<std::string, int> idCounter_;
    std::queue<TaskDataPtr> taskQueue_;
};

class SerialTask {
public:
    static SerialTask* instance()
    {
        static SerialTask s_inst;
        return &s_inst;
    }

    void init(int workerCount)
    {
        if (count_ > 0) {
            return;
        }

        count_ = workerCount;
        for (int i = 0; i < workerCount; i++) {
            workers_.push_back(std::make_shared<Worker>());
        }
    }

    bool tryExec(const TaskDataPtr &data)
    {
        if (data->id.empty() || !data->handleMsg || !data->replyMsg) {
            return false;
        }

        std::string id = data->id;
        std::vector<WorkerPtr>::iterator iter = workers_.begin();
        for (; iter != workers_.end(); ++iter) {
            WorkerPtr &worker = *iter;
            if (worker->tryExec(id, data)) {
                return true;
            }
        }
        workers_[index_++ % count_]->exec(id, data);
        return true;
    }

private:
    SerialTask() : index_(0), count_(0) {}

private:
    std::vector<WorkerPtr> workers_;
    std::atomic<int> index_;
    int count_;
};

方法二:

// id_guard.h
#pragma once

#include <unordered_set>
#include <mutex>
#include <condition_variable>

class IDGuard
{
public:
    IDGuard(std::mutex &mutex, const std::string &id);
    ~IDGuard();

private:
    bool isProcessing(const std::string &id);
    void enter(const std::string &id);
    void leave(const std::string &id);

private:
    std::mutex& mutex_;
    std::condition_variable  cond_;
    std::unordered_set<std::string> cache_;
    std::string id_;
};


// id_guard.cpp
#include "stdafx.h"
#include "id_guard.h"

IDGuard::IDGuard(std::mutex &mutex, const std::string &id)
    : mutex_(mutex), id_(id)
{
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (isProcessing(id_)) {
            cond_.wait(lock);
        }
    }
    enter(id_);
}

IDGuard::~IDGuard()
{
    leave(id_);
}

bool IDGuard::isProcessing(const std::string &id)
{
    return cache_.find(id) != cache_.end();
}

void IDGuard::enter(const std::string &id)
{
    std::unique_lock<std::mutex> lock(mutex_);
    cache_.insert(id);
}

void IDGuard::leave(const std::string &id)
{
    std::unique_lock<std::mutex> lock(mutex_);
    cache_.erase(id);
    cond_.notify_all();
}