Skip to content

使用Python和Redis实现多台电脑剪切板内容同步

Published: at 05:00 PM | 5 min read

简介: 在本篇博客中,我们将探讨一个引人入胜的项目,它允许您在不同电脑之间同步剪切板内容。想象一下,您可以轻松地在工作电脑和家用电脑之间复制和粘贴文本或文件。该项目使用Python和Redis来实现这一目标。我们将以简单易懂的方式为您介绍整个过程。

先决条件: 在开始之前,您需要以下内容:

  1. 在您的电脑上安装Python(建议使用Python 3)。
  2. 运行中且可通过网络访问的Redis服务器。
  3. 已安装pyperclipredis-py库。您可以使用pip install pyperclip redis进行安装。

项目概述: 我们将创建一个Python程序,该程序在您的电脑上运行,并监听剪切板内容的变化。当您复制内容时,它会将数据发送到一个中央Redis服务器。其他也运行此程序的电脑可以接收并从Redis服务器中粘贴这些复制的数据。

设置Redis: 首先确保在您的网络上有一个可访问的Redis服务器。您需要知道连接到它所需的主机、端口和数据库信息。

创建ClipboardSync类: 我们创建一个名为ClipboardSync的Python类来管理剪切板同步。以下是其关键功能的解释:

  1. __init__(): 初始化类,连接到Redis服务器,并获取您电脑的MAC地址。

  2. get_mac_address(): 获取MAC地址,作为电脑的唯一标识符。

  3. send_clipboard_data(): 当您将内容复制到剪切板时,它将数据发送到Redis服务器,包括您的MAC地址、时间戳和复制的内容。

  4. receive_clipboard_data(): 当从Redis接收到数据时,它检查是否来自另一台电脑,并将内容粘贴到剪切板中。

  5. watch_clipboard(): 持续监控您的剪切板内容变化,当有新内容复制时,发送到Redis。

  6. subscribe_redis(): 订阅Redis通道并监听消息。当新消息到达时,处理数据。

运行程序: 程序运行两个单独的线程,同时处理剪切板监控和Redis订阅。它使用asyncio事件循环来保持主程序运行。

结论: 通过这个Python程序和Redis的集成,您可以轻松地在不同电脑之间同步剪切板内容。无论您是在进行需要分享链接、文本片段或文件的项目,这个工具都会简化流程,提高您的工作效率。

请随意尝试并扩展此项目。您可以添加安全功能,处理不同的数据类型,或根据特定需求进行自定义。享受在各设备之间实现无缝剪切板同步的便利!

import json
import pyperclip
import redis
import time
import uuid
import threading
import asyncio

class ClipboardSync:
    def __init__(self, redis_host, redis_port, redis_db):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
        self.mac_address = self.get_mac_address()
        self.last_clipboard_content = None

    def get_mac_address(self):
        mac = ":".join(["{:02x}".format((uuid.getnode() >> elements) & 0xFF) for elements in range(0, 2 * 6, 2)][::-1])
        return mac
    
    def send_clipboard_data(self, clipboard_content):
        if clipboard_content != self.last_clipboard_content:
            self.last_clipboard_content = clipboard_content

            data = {
                "mac_address": self.mac_address,
                "time": time.time(),
                "content": clipboard_content,
            }

            self.redis_client.publish("clipboard_channel", json.dumps(data))
            print("Content sent to Redis: " + clipboard_content)

    def receive_clipboard_data(self, clipboard_data):
        sender_mac_address = clipboard_data["mac_address"]
        clipboard_content = clipboard_data["content"]

        if sender_mac_address != self.mac_address:
            self.last_clipboard_content = clipboard_content
            pyperclip.copy(clipboard_content)
            print("Received from Redis: " + clipboard_content)
        else:
            print("来自我自己剪切板的消息,直接忽略掉。")

    def watch_clipboard(self):
        while True:
            clipboard_content = pyperclip.waitForNewPaste()
            if clipboard_content:
                if len(clipboard_content) > 1024 * 10:
                    print("剪切板消息太大了,忽略掉!")
                    continue
                
                print("来自剪切板的内容:", clipboard_content)
                self.send_clipboard_data(clipboard_content)
                
            time.sleep(1)

    def subscribe_redis(self):
        pubsub = self.redis_client.pubsub()
        pubsub.subscribe("clipboard_channel")

        for message in pubsub.listen():
            if message["type"] == "message":
                clipboard_data = json.loads(message["data"].decode("utf-8"))
                print("来自Redis的数据:", clipboard_data)
                self.receive_clipboard_data(clipboard_data)

async def main(redis_host, redis_port, redis_db):
    clipboard_sync_sdk = ClipboardSync(redis_host, redis_port, redis_db)

    # 启动watch_clipboard线程
    watch_clipboard_thread = threading.Thread(target=clipboard_sync_sdk.watch_clipboard)
    watch_clipboard_thread.daemon = True
    watch_clipboard_thread.start()

    # 启动subscribe_redis线程
    subscribe_redis_thread = threading.Thread(target=clipboard_sync_sdk.subscribe_redis)
    subscribe_redis_thread.daemon = True
    subscribe_redis_thread.start()

    # 使用asyncio事件循环来保持主程序运行
    while True:
        await asyncio.sleep(1)

if __name__ == "__main__":
    redis_host = "xxx.xxx.xxx.xxx"
    redis_port = 6379
    redis_db = 0

    try:
        print("按Ctrl+C退出")
        asyncio.run(main(redis_host, redis_port, redis_db))
    except KeyboardInterrupt:
        print("quit")