使用Python和Redis实现多台电脑剪切板内容同步

Table of Contents

    简介: 在本篇博客中,我们将探讨一个引人入胜的项目,它允许您在不同电脑之间同步剪切板内容。想象一下,您可以轻松地在工作电脑和家用电脑之间复制和粘贴文本或文件。该项目使用Python和Redis来实现这一目标。我们将以简单易懂的方式为您介绍整个过程。

    先决条件: 在开始之前,您需要以下内容:

    1. 在您的电脑上安装Python(建议使用Python 3)。
    2. 运行中且可通过网络访问的Redis服务器。
    3. 已安装pyperclipredis-py库。您可以使用pip install pyperclip redis进行安装。

    项目概述: 我们将创建一个Python程序,该程序在您的电脑上运行,并监听剪切板内容的变化。当您复制内容时,它会将数据发送到一个中央Redis服务器。其他也运行此程序的电脑可以接收并从Redis服务器中粘贴这些复制的数据。

    设置Redis: 首先确保在您的网络上有一个可访问的Redis服务器。您需要知道连接到它所需的主机、端口和数据库信息。

    创建ClipboardSync类: 我们创建一个名为ClipboardSync的Python类来管理剪切板同步。以下是其关键功能的解释:

    1. __init__(): 初始化类,连接到Redis服务器,并获取您电脑的MAC地址。

    2. get_mac_address(): 获取MAC地址,作为电脑的唯一标识符。

    3. send_clipboard_data(): 当您将内容复制到剪切板时,它将数据发送到Redis服务器,包括您的MAC地址、时间戳和复制的内容。

    4. receive_clipboard_data(): 当从Redis接收到数据时,它检查是否来自另一台电脑,并将内容粘贴到剪切板中。

    5. watch_clipboard(): 持续监控您的剪切板内容变化,当有新内容复制时,发送到Redis。

    6. subscribe_redis(): 订阅Redis通道并监听消息。当新消息到达时,处理数据。

    运行程序: 程序运行两个单独的线程,同时处理剪切板监控和Redis订阅。它使用asyncio事件循环来保持主程序运行。

    结论: 通过这个Python程序和Redis的集成,您可以轻松地在不同电脑之间同步剪切板内容。无论您是在进行需要分享链接、文本片段或文件的项目,这个工具都会简化流程,提高您的工作效率。

    请随意尝试并扩展此项目。您可以添加安全功能,处理不同的数据类型,或根据特定需求进行自定义。享受在各设备之间实现无缝剪切板同步的便利!

    import json
    import pyperclip
    import redis
    import time
    import uuid
    import threading
    import asyncio
    
    class ClipboardSync:
        def __init__(self, redis_host, redis_port, redis_db):
            self.redis_client = redis.Redis(host=redis_host, port=redis_port, db=redis_db)
            self.mac_address = self.get_mac_address()
            self.last_clipboard_content = None
    
        def get_mac_address(self):
            mac = ":".join(["{:02x}".format((uuid.getnode() >> elements) & 0xFF) for elements in range(0, 2 * 6, 2)][::-1])
            return mac
        
        def send_clipboard_data(self, clipboard_content):
            if clipboard_content != self.last_clipboard_content:
                self.last_clipboard_content = clipboard_content
    
                data = {
                    "mac_address": self.mac_address,
                    "time": time.time(),
                    "content": clipboard_content,
                }
    
                self.redis_client.publish("clipboard_channel", json.dumps(data))
                print("Content sent to Redis: " + clipboard_content)
    
        def receive_clipboard_data(self, clipboard_data):
            sender_mac_address = clipboard_data["mac_address"]
            clipboard_content = clipboard_data["content"]
    
            if sender_mac_address != self.mac_address:
                self.last_clipboard_content = clipboard_content
                pyperclip.copy(clipboard_content)
                print("Received from Redis: " + clipboard_content)
            else:
                print("来自我自己剪切板的消息,直接忽略掉。")
    
        def watch_clipboard(self):
            while True:
                clipboard_content = pyperclip.waitForNewPaste()
                if clipboard_content:
                    if len(clipboard_content) > 1024 * 10:
                        print("剪切板消息太大了,忽略掉!")
                        continue
                    
                    print("来自剪切板的内容:", clipboard_content)
                    self.send_clipboard_data(clipboard_content)
                    
                time.sleep(1)
    
        def subscribe_redis(self):
            pubsub = self.redis_client.pubsub()
            pubsub.subscribe("clipboard_channel")
    
            for message in pubsub.listen():
                if message["type"] == "message":
                    clipboard_data = json.loads(message["data"].decode("utf-8"))
                    print("来自Redis的数据:", clipboard_data)
                    self.receive_clipboard_data(clipboard_data)
    
    async def main(redis_host, redis_port, redis_db):
        clipboard_sync_sdk = ClipboardSync(redis_host, redis_port, redis_db)
    
        # 启动watch_clipboard线程
        watch_clipboard_thread = threading.Thread(target=clipboard_sync_sdk.watch_clipboard)
        watch_clipboard_thread.daemon = True
        watch_clipboard_thread.start()
    
        # 启动subscribe_redis线程
        subscribe_redis_thread = threading.Thread(target=clipboard_sync_sdk.subscribe_redis)
        subscribe_redis_thread.daemon = True
        subscribe_redis_thread.start()
    
        # 使用asyncio事件循环来保持主程序运行
        while True:
            await asyncio.sleep(1)
    
    if __name__ == "__main__":
        redis_host = "xxx.xxx.xxx.xxx"
        redis_port = 6379
        redis_db = 0
    
        try:
            print("按Ctrl+C退出")
            asyncio.run(main(redis_host, redis_port, redis_db))
        except KeyboardInterrupt:
            print("quit")