Python高效处理大型CSV文件:多维度切割策略与最佳实践66


在数据分析和处理的日常工作中,我们经常会遇到需要处理大型CSV文件的情况。这些文件可能包含数百万甚至数十亿行数据,直接加载到内存中进行操作不仅效率低下,还可能导致内存溢出。此时,将大型CSV文件切割成更小、更易管理的文件就显得尤为重要。这不仅有助于优化内存使用,还能方便数据并行处理、分布式存储或按特定业务逻辑进行数据分发。

Python作为数据科学领域的主流语言,提供了多种强大的工具和库来应对这一挑战。本文将作为一名专业的程序员,深入探讨如何使用Python高效、灵活地切割大型CSV文件,涵盖按行数、按文件大小以及按列值等多种切割策略,并提供详细的代码示例和最佳实践。

一、为什么需要切割大型CSV文件?

在深入技术细节之前,我们先来明确切割大型CSV文件的核心驱动力:

内存限制: 许多大型CSV文件的大小远超可用内存,无法一次性加载到如Pandas DataFrame这样的数据结构中。


性能优化: 处理较小的文件通常比处理一个巨型文件更快,尤其是在进行迭代计算或部分数据分析时。


并行处理: 将文件切割后,可以更容易地在多核CPU或分布式系统(如Spark、Dask)上并行处理这些小文件,从而大幅缩短处理时间。


数据分发: 根据业务需求,可能需要将数据按特定字段(如地区、日期、产品类别)分发给不同的团队或系统,切割文件是实现这一目标的有效手段。


版本控制与管理: 小文件更便于版本控制、备份和传输。



二、准备工作:创建测试数据与依赖

为了演示各种切割策略,我们首先需要创建一个模拟的大型CSV文件。同时,我们将主要依赖Python内置的`csv`模块和流行的数据科学库`pandas`。

1. 安装依赖


如果您尚未安装`pandas`,请先通过pip进行安装:pip install pandas

2. 创建一个大型模拟CSV文件


以下代码将生成一个包含100万行(或您指定数量)的CSV文件,其中包含一些随机数据,方便我们进行后续测试。import csv
import random
import os
def generate_large_csv(filename='', num_rows=1000000):
"""
生成一个大型CSV文件用于测试。
包含'id', 'name', 'age', 'city', 'salary'等列。
"""
if (filename):
print(f"文件 '{filename}' 已存在,跳过生成。")
return
print(f"正在生成 {num_rows} 行的CSV文件 '{filename}'...")
with open(filename, 'w', newline='', encoding='utf-8') as f:
writer = (f)
# 写入CSV头
(['id', 'name', 'age', 'city', 'salary', 'category', 'timestamp'])
cities = ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix', 'Shanghai', 'Beijing', 'San Francisco', 'London', 'Paris']
categories = ['A', 'B', 'C', 'D', 'E']
for i in range(1, num_rows + 1):
row = [
i,
f"User_{i}",
(18, 65),
(cities),
round((30000, 120000), 2),
(categories),
f"2023-01-{(1, 28):02d} {(0, 23):02d}:{(0, 59):02d}:{(0, 59):02d}"
]
(row)
print(f"CSV文件 '{filename}' 生成完毕。")
# 调用函数生成文件
generate_large_csv('', num_rows=1000000) # 生成100万行数据

三、基础切割策略:按行数切割

按行数切割是最直观和常见的策略。我们将演示两种实现方式:使用Python内置的`csv`模块和使用`pandas`。

1. 使用 `csv` 模块(适用于超大型文件,低内存消耗)


当文件大小远超内存限制时,`csv`模块是最佳选择。它以流式方式逐行读取和写入,避免了一次性加载整个文件。import csv
import os
def split_csv_by_rows_csv_module(input_csv_path, output_dir, rows_per_file, encoding='utf-8'):
"""
使用Python的csv模块按行数切割CSV文件。
:param input_csv_path: 输入CSV文件路径。
:param output_dir: 输出子文件存放的目录。
:param rows_per_file: 每个子文件的行数(不包括头)。
:param encoding: 文件编码。
"""
if not (output_dir):
(output_dir)
with open(input_csv_path, 'r', newline='', encoding=encoding) as infile:
reader = (infile)
header = next(reader) # 读取头行
file_idx = 0
current_rows = 0
writer = None
outfile = None
print(f"开始按行切割文件: {input_csv_path}")
for i, row in enumerate(reader):
if i % rows_per_file == 0:
if outfile:
()

file_idx += 1
output_filename = (output_dir, f"part_{file_idx:04d}.csv")
print(f"正在创建新的输出文件: {output_filename}")
outfile = open(output_filename, 'w', newline='', encoding=encoding)
writer = (outfile)
(header) # 每个子文件都写入头行

(row)

if outfile: # 关闭最后一个打开的文件
()

print(f"文件切割完成。共生成 {file_idx} 个子文件。")
# 示例调用
input_file = ''
output_folder_rows_csv = 'output_by_rows_csv'
rows_to_split = 100000 # 每个子文件10万行
split_csv_by_rows_csv_module(input_file, output_folder_rows_csv, rows_to_split)

代码解释:

我们首先读取输入CSV文件的头行,这对于所有子文件都非常重要。


通过`enumerate(reader)`迭代器逐行读取数据,`i`代表当前行号(从0开始,不包括头行)。


当行号是`rows_per_file`的倍数时(即达到了每个子文件的行数限制),我们关闭当前输出文件(如果存在),然后打开一个新的输出文件,并写入头行。


每一行数据都被写入到当前的输出文件中。


确保在循环结束后关闭最后一个打开的文件。



2. 使用 `pandas`(适用于中大型文件,便捷高效)


对于能够一次性加载到内存或通过分块读取的文件,`pandas`提供了更简洁、功能更强大的方式来切割。`pandas`的`read_csv`函数支持`chunksize`参数,可以分块读取大型文件。import pandas as pd
import os
def split_csv_by_rows_pandas(input_csv_path, output_dir, rows_per_file, encoding='utf-8'):
"""
使用Pandas按行数切割CSV文件。
适用于文件能被分块读取到内存中,或总行数不是特别巨大的情况。
:param input_csv_path: 输入CSV文件路径。
:param output_dir: 输出子文件存放的目录。
:param rows_per_file: 每个子文件的行数。
:param encoding: 文件编码。
"""
if not (output_dir):
(output_dir)
print(f"开始按行切割文件 (Pandas): {input_csv_path}")
# 读取整个文件(如果内存允许),或者使用 chunksize 分块读取
# 这里为了演示方便,假设文件可以整体读取。对于超大文件,推荐使用 chunksize
# total_rows = sum(1 for line in open(input_csv_path, encoding=encoding)) - 1 # 获取总行数,不包括头

# 更好的方法是使用 chunksize 来处理大文件,避免一次性加载所有数据
chunk_size = rows_per_file # 每次读取的行数就是我们希望的子文件行数
file_idx = 0

for chunk in pd.read_csv(input_csv_path, chunksize=chunk_size, encoding=encoding):
file_idx += 1
output_filename = (output_dir, f"part_{file_idx:04d}.csv")
print(f"正在写入输出文件: {output_filename}")
# to_csv的index=False参数很重要,避免写入额外的行索引
chunk.to_csv(output_filename, index=False, encoding=encoding)

print(f"文件切割完成 (Pandas)。共生成 {file_idx} 个子文件。")
# 示例调用
input_file = ''
output_folder_rows_pandas = 'output_by_rows_pandas'
rows_to_split_pandas = 100000 # 每个子文件10万行
split_csv_by_rows_pandas(input_file, output_folder_rows_pandas, rows_to_split_pandas)

代码解释:

`pd.read_csv(..., chunksize=rows_per_file)`会返回一个迭代器,每次迭代产生一个包含`rows_per_file`行数据的DataFrame。


每个这样的DataFrame块可以直接使用`to_csv`方法保存为一个新的CSV文件。


`index=False`参数确保Pandas不会将DataFrame的索引也作为一列写入CSV文件。



四、高级切割策略:按文件大小切割

按文件大小切割不如按行数切割精确,因为每行数据的长度可能不同。但它在某些存储或传输场景下非常有用,例如限制每个文件的最大MB数。由于CSV是文本文件,我们需要逐行写入并动态检查文件大小。import csv
import os
def split_csv_by_size(input_csv_path, output_dir, max_file_size_mb, encoding='utf-8'):
"""
使用Python的csv模块按文件大小切割CSV文件。
:param input_csv_path: 输入CSV文件路径。
:param output_dir: 输出子文件存放的目录。
:param max_file_size_mb: 每个子文件的最大大小(MB)。
:param encoding: 文件编码。
"""
if not (output_dir):
(output_dir)
max_file_size_bytes = max_file_size_mb * 1024 * 1024

with open(input_csv_path, 'r', newline='', encoding=encoding) as infile:
reader = (infile)
header = next(reader) # 读取头行
file_idx = 0
current_outfile_path = None
outfile = None
writer = None
print(f"开始按文件大小切割文件: {input_csv_path}")
def open_new_output_file():
nonlocal file_idx, current_outfile_path, outfile, writer
if outfile:
()

file_idx += 1
current_outfile_path = (output_dir, f"part_size_{file_idx:04d}.csv")
print(f"正在创建新的输出文件: {current_outfile_path}")
outfile = open(current_outfile_path, 'w', newline='', encoding=encoding)
writer = (outfile)
(header) # 每个子文件都写入头行
# 返回当前文件已写入的字节数,此时只有header
return ()
current_size_bytes = open_new_output_file() # 打开第一个文件
for i, row in enumerate(reader):
# 将当前行数据格式化为字符串,估算其字节大小
# 注意:这是估算,实际写入可能因编码和newline字符有微小差异
row_str = ','.join(map(str, row)) + ''
row_bytes_estimate = len((encoding))
# 检查加上当前行是否会超过最大文件大小
if (current_size_bytes + row_bytes_estimate) > max_file_size_bytes and current_size_bytes > len(','.join(map(str,header)) + ''.encode(encoding)) :
# 如果当前文件已经写入了一些内容 (大于header的大小) 并且加上新行会超限
# 那么就开启新文件
current_size_bytes = open_new_output_file()

(row)
# 更新当前文件大小,使用tell()获取文件指针位置,近似等于已写入的字节数
current_size_bytes = ()
if outfile:
()

print(f"文件切割完成。共生成 {file_idx} 个子文件。")
# 示例调用
input_file = ''
output_folder_by_size = 'output_by_size'
max_size_mb = 10 # 每个子文件最大10MB
split_csv_by_size(input_file, output_folder_by_size, max_size_mb)

代码解释:

`max_file_size_bytes`将MB转换为字节。


`()`函数返回文件流的当前位置(通常是已写入的字节数),我们用它来近似追踪文件大小。


在每次写入一行之前,我们估算该行写入后文件的大小。如果会超出`max_file_size_bytes`,并且当前文件不为空(大于头行的大小),则关闭当前文件并打开一个新文件。


这种方法是近似的,因为CSV的文本格式决定了字节数并非完全精确可控,但对于大多数实际应用已足够。



五、高级切割策略:按列值切割(基于Pandas)

按列值切割是一种非常强大的策略,它允许你根据CSV文件中的某个或多个列的唯一值来创建不同的子文件。例如,你可以将所有“城市”为“New York”的记录保存到一个文件,将“Los Angeles”的记录保存到另一个文件。这种切割方式通常使用`pandas`来实现,因为它提供了便捷的`groupby`功能。import pandas as pd
import os
def split_csv_by_column_value(input_csv_path, output_dir, column_name, encoding='utf-8', chunk_size=100000):
"""
使用Pandas按指定列的值切割CSV文件。
对于大型文件,使用chunksize分块读取以节省内存。
:param input_csv_path: 输入CSV文件路径。
:param output_dir: 输出子文件存放的目录。
:param column_name: 用于切割的列名。
:param encoding: 文件编码。
:param chunk_size: 每次读取的行数,用于分块处理大型文件。
"""
if not (output_dir):
(output_dir)
print(f"开始按列 '{column_name}' 切割文件: {input_csv_path}")
# 使用字典来存储每个分组的writer,确保只打开一次文件
output_files = {}
for i, chunk in enumerate(pd.read_csv(input_csv_path, chunksize=chunk_size, encoding=encoding)):
print(f"正在处理第 {i+1} 个数据块...")
# 确保列存在
if column_name not in :
raise ValueError(f"列 '{column_name}' 不存在于CSV文件中。")

# 按照指定列进行分组
grouped = (column_name)

for group_value, group_df in grouped:
# 清理group_value,防止文件名非法字符
safe_group_value = str(group_value).replace('/', '_').replace('\\', '_').replace(':', '_').replace('*', '_').replace('?', '_').replace('"', '_').replace('', '_').replace('|', '_')
output_filename = (output_dir, f"{column_name}_{safe_group_value}.csv")

# 如果文件已打开,直接追加;否则,打开新文件并写入头
if output_filename not in output_files:
# header=True表示写入头,mode='w'表示创建新文件
group_df.to_csv(output_filename, mode='w', index=False, header=True, encoding=encoding)
output_files[output_filename] = True # 标记此文件已创建并写入了头
else:
# header=False表示不写入头,mode='a'表示追加
group_df.to_csv(output_filename, mode='a', index=False, header=False, encoding=encoding)

print(f"文件切割完成 (按列 '{column_name}')。")
# 示例调用
input_file = ''
output_folder_by_column = 'output_by_column_city'
column_to_split = 'city' # 以 'city' 列进行切割
split_csv_by_column_value(input_file, output_folder_by_column, column_to_split)
# 也可以尝试按 'category' 列切割
output_folder_by_category = 'output_by_column_category'
column_to_split_category = 'category'
split_csv_by_column_value(input_file, output_folder_by_category, column_to_split_category)

代码解释:

我们依然使用`pd.read_csv`的`chunksize`参数来分块读取文件,以处理大型数据集。


对于每个数据块`chunk`,我们使用`(column_name)`将其按指定列分组。


然后遍历每个分组,`group_value`是列的唯一值,`group_df`是该值对应的所有行组成的DataFrame。


通过维护`output_files`字典来判断某个子文件是否已经创建。如果是第一次写入该`group_value`对应的文件,则使用`mode='w'`和`header=True`创建文件并写入头;之后则使用`mode='a'`(追加模式)和`header=False`,避免重复写入头。


`safe_group_value`用于处理列值中可能包含的非法文件名字符。



六、性能优化与注意事项

在切割大型CSV文件时,除了选择合适的策略,还需要考虑一些优化和注意事项:

1. 内存管理



`csv`模块优先: 对于内存极度受限的超大型文件,优先使用Python内置的`csv`模块进行流式处理,避免一次性加载过多数据。


Pandas `chunksize`: 使用`pd.read_csv`时,务必利用`chunksize`参数分块读取,而不是一次性读取整个文件。这可以将内存占用保持在可控范围内。


生成器: 结合`csv`模块,可以编写生成器函数来逐行或逐块处理数据,进一步优化内存。



2. 文件IO效率



批量写入: 尽管我们是逐行或逐块写入,但每次打开/关闭文件都是有开销的。在`csv`模块的实现中,我们尽量保持文件打开状态,直到达到切割条件。


SSD vs HDD: 将输出目录设置在SSD上可以显著提高文件I/O速度。



3. 错误处理与健壮性



文件路径验证: 在操作文件之前,检查输入文件是否存在、输出目录是否可写。


编码问题: CSV文件常见的编码有UTF-8、GBK、Latin-1等。务必在`open()`和`pd.read_csv()`中指定正确的`encoding`,否则可能导致乱码或读取失败。 # 尝试不同编码
try:
df = pd.read_csv(input_csv_path, encoding='utf-8')
except UnicodeDecodeError:
df = pd.read_csv(input_csv_path, encoding='gbk')
# 或者更系统地检测编码
# import chardet
# with open(input_csv_path, 'rb') as f:
# rawdata = (100000) # 读取文件开头一部分
# result = (rawdata)
# encoding = result['encoding']


空文件/空行: 确保代码能够处理空文件或包含空行的CSV,通常`csv`模块和`pandas`都能较好地处理这些情况。


列名缺失: 在按列值切割时,要检查指定的列名是否存在于CSV头中。



4. 头文件处理


无论是哪种切割方式,都需要确保每个切割后的子文件都包含原始的头行,除非业务逻辑明确要求不需要。

5. 输出文件命名规范


为切割后的文件使用清晰、有规律的命名方式(如``,``),方便后续处理和识别。

七、总结

本文详细介绍了使用Python切割大型CSV文件的多种策略,从基础的按行数切割到高级的按文件大小和按列值切割。我们分别探讨了使用Python内置的`csv`模块和强大的`pandas`库的实现方式,并强调了每种方法的适用场景和优缺点:

`csv`模块: 适用于处理超大型文件,内存效率高,但代码相对更底层,需要手动管理文件读写和头行。


`pandas`库: 提供了更高级、更简洁的API,尤其适合按列值切割等复杂逻辑。通过`chunksize`参数,也能有效处理大型文件,但在处理数十GB甚至TB级别的文件时,仍需谨慎评估内存消耗。



在实际应用中,您应该根据文件的具体大小、可用的内存资源、切割的业务逻辑以及对处理速度的要求来选择最合适的策略。通过合理地利用Python的这些工具,您可以高效、灵活地处理各种规模的CSV数据,为后续的数据分析和应用奠定坚实基础。

2025-10-10


上一篇:Python实战宝典:从基础到进阶,构建你的实用代码工具箱

下一篇:Python字符串比较深度解析:掌握各种场景下的高效策略与技巧