Python 串口编程从入门到精通:pyserial 库详细代码示例与应用实践80
作为一名专业的程序员,我深知串口通信在物联网、自动化控制、嵌入式系统以及数据采集等领域的重要性。Python凭借其简洁的语法、丰富的库生态和跨平台特性,成为了进行串口通信的首选语言之一。本文将深入探讨如何使用Python进行串口通信,主要围绕`pyserial`库展开,提供详尽的代码示例和最佳实践,帮助您从入门到精通。
串口通信(Serial Communication),通常指的是通过串行接口(如RS-232, RS-485, USB转串口)在两个设备之间进行数据交换的一种方式。它是一种简单而可靠的通信协议,广泛应用于微控制器(如Arduino, ESP32)、工业设备(如PLC)、GPS模块、传感器等与计算机之间的数据交互。
Python通过其强大的第三方库`pyserial`,使得串口编程变得异常简单和高效。`pyserial`库提供了完整的API,支持Windows、Linux、macOS等多种操作系统,能够处理各种串口通信的需求。
1. 准备工作:安装 `pyserial` 库
在开始编写代码之前,您需要确保已经安装了`pyserial`库。打开您的终端或命令行工具,执行以下命令:pip install pyserial
如果您使用的是Anaconda环境,也可以使用:conda install pyserial
安装完成后,您就可以在Python脚本中导入`serial`模块了。
2. 识别串口
在进行串口通信之前,首先需要知道您的设备连接到哪个串口。不同操作系统下识别串口的方式有所不同:
Windows: 串口通常命名为`COM1`、`COM2`等。您可以在设备管理器中查看“端口 (COM 和 LPT)”来找到它们。
Linux: 串口通常命名为`/dev/ttyS0`(物理串口)或`/dev/ttyUSB0`、`/dev/ttyACM0`等(USB转串口设备)。您可以使用`ls /dev/tty*`命令查看所有可用的串口设备。
macOS: 串口通常命名为`/dev/-XXXX`或`/dev/`等。您也可以使用`ls /dev/tty.*`或`ls /dev/cu.*`来查看。
为了方便,`pyserial`也提供了一个`.list_ports`模块来列出所有可用的串口,这在编写跨平台应用时非常有用:import .list_ports
def list_serial_ports():
"""列出所有可用的串口并打印其详细信息"""
ports = ()
if not ports:
print("未找到任何串口设备。")
return []
available_ports = []
print("可用串口设备列表:")
for port, desc, hwid in sorted(ports):
print(f" 端口: {port}")
print(f" 描述: {desc}")
print(f" 硬件ID: {hwid}")
print("-" * 30)
(port)
return available_ports
if __name__ == "__main__":
list_serial_ports()
3. 串口通信的基本操作
`pyserial`库的核心是``类,它代表了一个串口连接。通过创建该类的实例,我们可以打开、配置、写入和读取串口数据。
3.1 打开和关闭串口
打开串口时,需要指定串口名称、波特率等参数。使用`with`语句是推荐的做法,它能确保串口在操作完成后自动关闭,即使发生异常。
关键参数:
`port`: 串口名称,例如 `'COM1'` 或 `'/dev/ttyUSB0'`。
`baudrate`: 波特率,数据传输速率,如 `9600`, `115200` 等。必须与设备匹配。
`bytesize`: 数据位,每帧数据的位数,通常为 `8`。
`parity`: 奇偶校验位,用于错误检测,可选 `serial.PARITY_NONE`, `serial.PARITY_EVEN`, `serial.PARITY_ODD` 等。通常为 `None`。
`stopbits`: 停止位,每帧数据后的位数,可选 `serial.STOPBITS_ONE`, `serial.STOPBITS_TWO` 等。通常为 `1`。
`timeout`: 读取超时时间(秒),非常重要。`None`表示一直等待,`0`表示非阻塞立即返回,大于`0`表示等待指定时间。
import serial
import time
# 配置串口参数
SERIAL_PORT = 'COM3' # 请根据您的实际情况修改
BAUDRATE = 9600
def open_and_close_serial():
try:
# 使用 with 语句,确保串口在块结束后自动关闭
with (
port=SERIAL_PORT,
baudrate=BAUDRATE,
bytesize=,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=1 # 1秒读取超时
) as ser:
print(f"成功打开串口: {SERIAL_PORT}")
print(f"串口名称: {}") # 获取当前打开的串口名称
print(f"是否打开: {ser.is_open}") # 判断串口是否打开
# 可以在这里进行读写操作
print(f"串口将在with语句块结束时自动关闭。")
print(f"串口 {SERIAL_PORT} 已关闭。")
except as e:
print(f"串口错误: {e}")
except Exception as e:
print(f"发生未知错误: {e}")
if __name__ == "__main__":
open_and_close_serial()
3.2 写入数据
向串口写入数据使用`()`方法。需要注意的是,`pyserial`要求写入的数据必须是`bytes`类型,而不是`str`类型。如果您的数据是字符串,需要先使用`.encode()`方法进行编码,通常使用`'utf-8'`或`'ascii'`。import serial
import time
SERIAL_PORT = 'COM3' # 请根据您的实际情况修改
BAUDRATE = 9600
def write_to_serial(data_to_send):
try:
with (
port=SERIAL_PORT,
baudrate=BAUDRATE,
timeout=1
) as ser:
print(f"成功打开串口: {SERIAL_PORT}")
# 将字符串数据编码为字节流
encoded_data = ('utf-8')
(encoded_data)
print(f"已发送数据: '{data_to_send}' ({len(encoded_data)} bytes)")
# 也可以发送字节数组
# (b'\x01\x02\x03')
except as e:
print(f"串口错误: {e}")
except Exception as e:
print(f"发生未知错误: {e}")
if __name__ == "__main__":
# 假设我们要发送一个简单的命令
write_to_serial("Hello, Serial Port!")
# 发送一个带有回车换行符的指令,很多设备需要
write_to_serial("STATUS?\r")
3.3 读取数据
从串口读取数据有几种常用方法:
`(size)`: 读取`size`个字节。如果设置了超时,它会等待直到读取到指定数量的字节或超时发生。
`()`: 读取一行数据,直到遇到换行符``为止。返回的也是`bytes`类型。
`ser.read_until(expected=LF, size=None)`: 读取直到遇到指定的字节序列,或者读取到`size`个字节。
`ser.in_waiting`: 返回输入缓冲区中等待读取的字节数。这对于非阻塞读取很有用。
import serial
import time
SERIAL_PORT = 'COM3' # 请根据您的实际情况修改
BAUDRATE = 9600
def read_from_serial():
try:
with (
port=SERIAL_PORT,
baudrate=BAUDRATE,
timeout=1 # 1秒读取超时
) as ser:
print(f"成功打开串口: {SERIAL_PORT}")
print("等待接收数据...")
# 示例1: 读取指定字节数
# 如果没有数据,且timeout=1,则1秒后返回空bytes
data_read = (10) # 尝试读取10个字节
if data_read:
print(f"读取到 {len(data_read)} 字节: {('utf-8', errors='ignore')}")
else:
print("未读取到数据 (read 10)")
# 示例2: 读取一行数据
# 确保设备发送的数据以或\r结束
line_read = ()
if line_read:
print(f"读取到一行数据: {('utf-8', errors='ignore').strip()}")
else:
print("未读取到数据 (readline)")
# 示例3: 结合 in_waiting 进行非阻塞读取
print("尝试非阻塞读取...")
(0.1) # 给设备一点时间发送数据
if ser.in_waiting > 0:
available_data = (ser.in_waiting)
print(f"非阻塞读取到 {len(available_data)} 字节: {('utf-8', errors='ignore')}")
else:
print("非阻塞读取:缓冲区无数据。")
except as e:
print(f"串口错误: {e}")
except Exception as e:
print(f"发生未知错误: {e}")
if __name__ == "__main__":
read_from_serial()
4. 完整示例:发送命令并接收响应
这是一个模拟向设备发送“请求温度”命令,并接收其响应的完整代码示例。通常设备会在接收到命令后返回一些数据。import serial
import time
SERIAL_PORT = 'COM3' # 请根据您的实际情况修改
BAUDRATE = 9600
COMMAND_GET_TEMP = "GET_TEMP\r" # 假设的命令,以回车换行结束
def communicate_with_device():
try:
with (
port=SERIAL_PORT,
baudrate=BAUDRATE,
bytesize=,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=2 # 设置较长的超时,等待设备响应
) as ser:
print(f"成功打开串口: {SERIAL_PORT}")
# 清空输入输出缓冲区,确保没有残留数据
()
()
(0.1) # 短暂等待,确保缓冲区清空
# 1. 发送命令
print(f"发送命令: '{()}'")
(('utf-8'))
# 2. 等待并读取响应
print("等待设备响应...")
response_line = b''
start_time = ()
# 循环读取直到超时或读到回车换行符
while (() - start_time) < :
if ser.in_waiting > 0:
char = (1) # 逐字节读取
response_line += char
if char == b'': # 假设响应以 结束
break
(0.01) # 小暂停,避免CPU占用过高
if response_line:
# 尝试解码并去除首尾空白符
decoded_response = ('utf-8', errors='ignore').strip()
print(f"接收到响应: '{decoded_response}'")
# 进一步处理响应,例如解析温度值
if "TEMP=" in decoded_response:
try:
temp_str = ("TEMP=")[1].split("C")[0].strip()
temperature = float(temp_str)
print(f"解析出的温度值为: {temperature}°C")
except (ValueError, IndexError):
print("无法解析温度值。")
else:
print("未接收到任何响应 (超时)。请检查设备连接和波特率。")
except as e:
print(f"串口错误: {e}")
except Exception as e:
print(f"发生未知错误: {e}")
if __name__ == "__main__":
communicate_with_device()
5. 高级话题与最佳实践
5.1 编码和解码
Python 3中字符串是Unicode类型,而串口通信传输的是字节流。因此,在发送数据时必须将字符串`encode()`为字节流,接收到字节流后需要`decode()`为字符串。
`'utf-8'`: 最常见的编码,支持多语言字符。
`'ascii'`: 适用于只包含ASCII字符的简单数据。
`'latin-1'` (或 `'iso-8859-1'`): 如果设备发送的是原始字节数据,直接映射到128-255范围的字符,这个编码有时会比较有用,因为它不会抛出解码错误。
# 编码
my_string = "你好,世界!"
byte_data = ('utf-8')
# byte_data = ('ascii') # 如果包含非ASCII字符会报错
# 解码
received_bytes = b'\x48\x65\x6c\x6c\x6f' # 假设接收到的字节
decoded_string = ('ascii') # "Hello"
# 带有错误处理的解码
error_bytes = b'\xc2\xb0C' # 例如度数符号的UTF-8编码
decoded_with_errors = ('ascii', errors='ignore') # 忽略错误
decoded_with_replace = ('ascii', errors='replace') # 替换无法解码的字符
print(f"忽略错误解码: {decoded_with_errors}") # 输出 'C'
print(f"替换错误解码: {decoded_with_replace}") # 输出 '�C'
5.2 错误处理
串口通信中经常会遇到各种问题,例如串口不存在、权限不足、波特率不匹配等。使用`try...except `来捕获串口相关的错误,并提供友好的错误提示。try:
ser = ('COM_NON_EXIST', 9600)
except as e:
print(f"无法打开串口: {e}")
# 可以在这里根据错误类型进行更详细的处理
if "Permission denied" in str(e):
print("可能是权限问题,请检查是否以管理员/root权限运行或用户是否在dialout组中。")
elif "No such file or directory" in str(e) or "The system cannot find the file specified" in str(e):
print("串口名称可能不正确或设备未连接。")
except Exception as e:
print(f"发生其他错误: {e}")
5.3 缓冲区管理
`pyserial`提供了方法来管理串口的输入/输出缓冲区:
`()`: 清空输入缓冲区(丢弃所有已接收但尚未读取的数据)。
`()`: 清空输出缓冲区(丢弃所有已写入但尚未发送的数据)。
在发送命令前或读取新数据前清空缓冲区是一个好习惯,可以避免读取到旧的、无关的数据。
5.4 持续读取数据(多线程/异步)
如果需要在一个单独的线程中持续监听串口数据,以避免阻塞主程序,可以使用`threading`模块。这对于需要同时进行UI操作或执行其他任务的应用程序非常有用。import serial
import threading
import time
SERIAL_PORT = 'COM3'
BAUDRATE = 9600
STOP_FLAG = False
def read_serial_data(ser_instance):
global STOP_FLAG
print(f"开始监听串口 {}...")
while not STOP_FLAG:
try:
if ser_instance.in_waiting > 0:
# 每次读取一个字节,直到缓冲区为空或遇到换行符
# 或者可以一次性读取所有可用数据:(ser_instance.in_waiting)
data = () # 读取一行
if data:
decoded_data = ('utf-8', errors='ignore').strip()
print(f"接收到数据: {decoded_data}")
(0.05) # 短暂等待,减少CPU占用
except as e:
print(f"串口读取错误: {e}")
break
except Exception as e:
print(f"读取过程中发生未知错误: {e}")
break
print(f"串口 {} 监听线程已停止。")
def main_program():
global STOP_FLAG
try:
with (
port=SERIAL_PORT,
baudrate=BAUDRATE,
timeout=0.1 # 设置较小的超时,以便线程能够快速退出
) as ser:
print(f"主程序成功打开串口: {}")
# 创建并启动一个线程来读取串口数据
read_thread = (target=read_serial_data, args=(ser,))
= True # 设置为守护线程,主程序退出时自动结束
()
# 主程序可以做其他事情,例如发送命令
for i in range(5):
command = f"CMD_STATUS_{i}\r"
(('utf-8'))
print(f"主程序发送: {()}")
(1) # 等待一段时间
print("主程序完成发送任务,等待读取线程结束...")
(2) # 给读取线程一些时间接收最后的响应
# 设置停止标志,通知读取线程退出
STOP_FLAG = True
(timeout=3) # 等待读取线程结束,最多等待3秒
if read_thread.is_alive():
print("警告: 读取线程未能正常退出。")
except as e:
print(f"主程序串口错误: {e}")
except Exception as e:
print(f"主程序发生未知错误: {e}")
finally:
print("主程序结束。")
if __name__ == "__main__":
main_program()
6. 常见问题与故障排除
`: [Errno 13] Permission denied` (Linux/macOS):
通常是权限问题。您的用户可能没有访问串口设备的权限。
解决方案: 将当前用户添加到`dialout`组(在Linux上),或使用`sudo`运行脚本。
`sudo usermod -a -G dialout $USER` (然后注销并重新登录)
`: [Errno 2] No such file or directory` (或 Windows `[Errno 22] Invalid argument`):
串口名称不正确,或者设备未连接,或者驱动未安装。
解决方案: 检查设备管理器(Windows)或`ls /dev/tty*`(Linux/macOS)确认正确的串口名称。确保USB转串口驱动已正确安装。
数据乱码或接收不到数据:
最常见的原因是波特率、数据位、停止位、奇偶校验位不匹配。
解决方案: 确保所有串口参数与连接的设备完全一致。检查数据编码/解码是否正确,尝试使用`'latin-1'`或`errors='ignore'`解码来诊断。
发送数据后设备无响应:
检查发送的数据格式是否正确,例如是否包含设备期望的命令结束符(如`\r`)。
检查设备是否处于接收模式或是否需要特定的初始化命令。
串口线缆连接是否正确,RXD和TXD是否交叉连接(TxD到RxD,RxD到TxD)。
`timeout`设置不当:
如果`timeout`设置太短,可能在数据完全到达前就返回了。如果`timeout`设置为`None`(无限等待),程序可能会卡死。
解决方案: 根据设备的响应速度和数据量合理设置`timeout`。通常设置为1-5秒是比较安全的。
7. 总结
Python的`pyserial`库为串口通信提供了一个强大、灵活且易于使用的接口。无论是简单的设备控制,还是复杂的数据采集系统,`pyserial`都能胜任。通过本文的详细介绍和代码示例,您应该能够:
正确安装和导入`pyserial`库。
识别和管理不同操作系统下的串口。
使用``类打开、配置、写入和读取串口数据。
理解数据编码/解码的重要性,并进行正确的处理。
掌握基本的错误处理和缓冲区管理技巧。
了解如何在更复杂的场景下(如持续监听)使用多线程进行串口通信。
对常见的串口通信问题进行故障排除。
实践是最好的老师,我鼓励您连接一个实际的串口设备(如Arduino、USB转串口模块),结合本文的示例代码,亲手尝试和调试。在实际应用中,您可能会遇到更多独特的挑战,但掌握了`pyserial`的基础,将使您有能力解决这些问题,并构建出功能强大的串口通信应用。
2025-11-02
Java数组交集:深度解析、实现策略与性能优化
https://www.shuihudhg.cn/131976.html
Python 列表与字符串:互联互通,高效编程的核心利器
https://www.shuihudhg.cn/131975.html
PHP 字符串首尾字符处理:高效删除、修剪与规范化指南
https://www.shuihudhg.cn/131974.html
Python字符串处理引号的完整指南:从基础到高级实践
https://www.shuihudhg.cn/131973.html
深入理解Java数据接口异常:成因、危害与高效处理策略
https://www.shuihudhg.cn/131972.html
热门文章
Python 格式化字符串
https://www.shuihudhg.cn/1272.html
Python 函数库:强大的工具箱,提升编程效率
https://www.shuihudhg.cn/3366.html
Python向CSV文件写入数据
https://www.shuihudhg.cn/372.html
Python 静态代码分析:提升代码质量的利器
https://www.shuihudhg.cn/4753.html
Python 文件名命名规范:最佳实践
https://www.shuihudhg.cn/5836.html