1. 安装所需库
pip install python-docx PyPDF2 pandas openpyxl markdown beautifulsoup4 python-pptx pillow pdfplumber
2. 完整代码实现
import os
import json
import pandas as pd
from typing import List, Dict, Any, Optional
import warnings
# Silence library warnings (pandas/openpyxl deprecation noise, etc.) so the
# console reports printed below stay readable.
warnings.filterwarnings('ignore')
# Text file handling
class TextFileProcessor:
    """Readers for plain-text and Markdown files (UTF-8)."""

    @staticmethod
    def read_txt(file_path: str) -> str:
        """Read a .txt file and return its full contents as a string."""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    @staticmethod
    def read_md(file_path: str) -> str:
        """Read a .md file as raw text.

        The Markdown markup is deliberately kept as-is (no HTML
        conversion), so this is plain file reading.
        """
        # BUGFIX: the original did `import markdown` here but never used
        # it, making the method fail whenever the `markdown` package was
        # not installed. The unused import has been removed.
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
# PDF file handling
class PDFProcessor:
    """Extract plain text from PDF files."""

    @staticmethod
    def read_pdf(file_path: str) -> str:
        """Return the concatenated text of every page in the PDF.

        pdfplumber is preferred (better extraction quality); PyPDF2 is
        used as a fallback when pdfplumber is not installed.
        """
        try:
            # Preferred backend; ImportError here triggers the fallback.
            import pdfplumber
            chunks = []
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    extracted = page.extract_text()
                    if extracted:
                        chunks.append(extracted + "\n")
            return "".join(chunks).strip()
        except ImportError:
            # Fallback backend: PyPDF2
            from PyPDF2 import PdfReader
            chunks = []
            with open(file_path, 'rb') as fh:
                for page in PdfReader(fh).pages:
                    extracted = page.extract_text()
                    if extracted:
                        chunks.append(extracted + "\n")
            return "".join(chunks).strip()
# Word document handling
class WordProcessor:
    """Extract text (paragraphs and tables) from .docx documents."""

    @staticmethod
    def read_docx(file_path: str) -> str:
        """Return paragraph text followed by table rows, one line each.

        Table rows are rendered as 'cell | cell | ...' with empty cells
        skipped.
        """
        from docx import Document
        document = Document(file_path)
        # Non-empty paragraphs first.
        parts = [para.text for para in document.paragraphs if para.text.strip()]
        # Then every table, row by row.
        for table in document.tables:
            for row in table.rows:
                cells = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                if cells:
                    parts.append(" | ".join(cells))
        return "\n".join(parts)
# Excel file handling
class ExcelProcessor:
    """Read Excel workbooks into a structured dict and render them as text."""

    @staticmethod
    def read_excel(file_path: str) -> Dict[str, Any]:
        """Read every sheet of an Excel file into structured data.

        Returns {'file_path': ..., 'sheets': {name: sheet_data}} on
        success, or {'file_path': ..., 'error': msg} if the workbook
        cannot be opened at all. A failure in one sheet is recorded as
        {'error': msg} under that sheet without aborting the others.
        """
        try:
            excel_file = pd.ExcelFile(file_path)
            result: Dict[str, Any] = {
                'file_path': file_path,
                'sheets': {}
            }
            for sheet_name in excel_file.sheet_names:
                try:
                    df = pd.read_excel(file_path, sheet_name=sheet_name)
                    sheet_data: Dict[str, Any] = {
                        'shape': f"{df.shape[0]}行×{df.shape[1]}列",
                        'columns': df.columns.tolist(),
                        'head_data': df.head(20).to_dict('records'),  # first 20 rows
                        'summary': {
                            'total_rows': len(df),
                            'total_columns': len(df.columns),
                            'sample_rows': min(5, len(df))
                        }
                    }
                    # Basic statistics for every numeric column.
                    # GENERALIZED: 'number' also covers int32/float32/etc.,
                    # which the original ['float64', 'int64'] whitelist missed.
                    for col in df.select_dtypes(include='number').columns:
                        sheet_data['summary'][f'{col}_stats'] = {
                            'min': df[col].min(),
                            'max': df[col].max(),
                            'mean': df[col].mean(),
                            'std': df[col].std()
                        }
                    result['sheets'][sheet_name] = sheet_data
                except Exception as e:
                    result['sheets'][sheet_name] = {
                        'error': f"读取sheet失败: {str(e)}"
                    }
            return result
        except Exception as e:
            return {
                'file_path': file_path,
                'error': f"读取Excel文件失败: {str(e)}"
            }

    @staticmethod
    def excel_to_text(data: Dict[str, Any]) -> str:
        """Render the structure produced by read_excel() as readable text."""
        text_parts = [f"Excel文件: {data['file_path']}"]
        # Whole-workbook failure: report the error and stop.
        if 'error' in data:
            text_parts.append(f"错误: {data['error']}")
            return "\n".join(text_parts)
        for sheet_name, sheet_data in data['sheets'].items():
            text_parts.append(f"\n=== Sheet: {sheet_name} ===")
            # BUGFIX: a failed sheet carries only an 'error' key; the
            # original crashed with KeyError on sheet_data['shape'] here.
            if 'error' in sheet_data:
                text_parts.append(f"错误: {sheet_data['error']}")
                continue
            text_parts.append(f"表格形状: {sheet_data['shape']}")
            text_parts.append(f"列名: {', '.join(sheet_data['columns'])}")
            # Sample rows (at most 5).
            text_parts.append("\n样本数据(前5行):")
            for i, row in enumerate(sheet_data['head_data'][:5]):
                row_text = [f"{k}: {v}" for k, v in row.items()]
                text_parts.append(f"行{i+1}: {' | '.join(row_text)}")
            # Summary block, including the per-column numeric stats.
            if 'summary' in sheet_data:
                text_parts.append("\n汇总信息:")
                for key, value in sheet_data['summary'].items():
                    if isinstance(value, dict):
                        text_parts.append(f" {key}:")
                        for k, v in value.items():
                            text_parts.append(f" {k}: {v}")
                    else:
                        text_parts.append(f" {key}: {value}")
        return "\n".join(text_parts)
# PowerPoint handling
class PowerPointProcessor:
    """Extract slide text from .pptx presentations."""

    @staticmethod
    def read_pptx(file_path: str) -> str:
        """Return per-slide text: a slide header, the title (if any),
        then every other non-empty text shape."""
        from pptx import Presentation
        presentation = Presentation(file_path)
        lines = []
        for index, slide in enumerate(presentation.slides, 1):
            lines.append(f"\n--- 幻灯片 {index} ---")
            title = slide.shapes.title
            if title:
                lines.append(f"标题: {title.text}")
            for shape in slide.shapes:
                # Skip the title shape to avoid emitting it twice.
                if hasattr(shape, "text") and shape.text.strip() and shape != title:
                    lines.append(shape.text)
        return "\n".join(lines)
# CSV file handling
class CSVProcessor:
    """Render a CSV file as a short textual summary."""

    @staticmethod
    def read_csv(file_path: str) -> str:
        """Return shape, column names and the first 10 rows as text,
        or an error message string if the file cannot be read."""
        try:
            df = pd.read_csv(file_path)
            lines = [
                f"CSV文件: {file_path}",
                f"数据形状: {df.shape[0]}行×{df.shape[1]}列",
                f"列名: {', '.join(df.columns.tolist())}",
                "\n前10行数据:"
            ]
            # One 'col: value | col: value' line per row.
            for idx, row in df.head(10).iterrows():
                rendered = " | ".join(f"{col}: {row[col]}" for col in df.columns)
                lines.append(f"行{idx+1}: {rendered}")
            return "\n".join(lines)
        except Exception as e:
            return f"读取CSV文件失败: {str(e)}"
# Main file processor
class FileContentExtractor:
    """Dispatch content extraction by file extension and aggregate results."""

    def __init__(self):
        # Maps a lowercase extension to a callable(file_path) -> str.
        # Excel entries go through excel_to_text so every processor returns
        # plain text; extract_content() additionally keeps the structured
        # Excel data alongside the text.
        self.processors = {
            '.txt': TextFileProcessor.read_txt,
            '.md': TextFileProcessor.read_md,
            '.pdf': PDFProcessor.read_pdf,
            '.docx': WordProcessor.read_docx,
            '.pptx': PowerPointProcessor.read_pptx,
            '.csv': CSVProcessor.read_csv,
            '.xlsx': lambda p: ExcelProcessor.excel_to_text(ExcelProcessor.read_excel(p)),
            '.xls': lambda p: ExcelProcessor.excel_to_text(ExcelProcessor.read_excel(p))
        }

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """Extract the content of a single file.

        Returns a dict with file_path/file_name/file_type/content/success;
        Excel files additionally carry 'structured_data'. Never raises:
        failures are reported via success=False plus an error message in
        'content'.
        """
        file_ext = os.path.splitext(file_path)[1].lower()
        result: Dict[str, Any] = {
            'file_path': file_path,
            'file_name': os.path.basename(file_path),
            'file_type': file_ext,
            'content': '',
            'success': False
        }
        try:
            if file_ext in self.processors:
                if file_ext in ('.xlsx', '.xls'):
                    # Excel: keep both the text rendering and the raw structure.
                    excel_data = ExcelProcessor.read_excel(file_path)
                    result['content'] = ExcelProcessor.excel_to_text(excel_data)
                    result['structured_data'] = excel_data
                else:
                    result['content'] = self.processors[file_ext](file_path)
                result['success'] = True
            else:
                # success stays False (set in the template above).
                result['content'] = f"不支持的文件格式: {file_ext}"
        except Exception as e:
            result['content'] = f"读取文件时出错: {str(e)}"
            result['success'] = False
        return result

    def extract_from_directory(self, directory_path: str,
                               file_extensions: Optional[List[str]] = None) -> Dict[str, Any]:
        """Extract every (optionally filtered) file under a directory tree.

        file_extensions: lowercase extensions to keep, e.g. ['.pdf'];
        None means every file. Returns counters plus per-file results;
        a missing directory yields the counters plus an 'error' key.
        """
        all_results: Dict[str, Any] = {
            'directory': directory_path,
            'total_files': 0,
            'successful_files': 0,
            'failed_files': 0,
            'files': []
        }
        if not os.path.exists(directory_path):
            return {
                **all_results,
                'error': f"目录不存在: {directory_path}"
            }
        for root, _dirs, files in os.walk(directory_path):
            for name in files:
                file_path = os.path.join(root, name)
                file_ext = os.path.splitext(name)[1].lower()
                # Apply the extension filter, if one was given.
                if file_extensions and file_ext not in file_extensions:
                    continue
                all_results['total_files'] += 1
                result = self.extract_content(file_path)
                all_results['files'].append(result)
                if result['success']:
                    all_results['successful_files'] += 1
                else:
                    all_results['failed_files'] += 1
        return all_results

    def generate_summary_report(self, extraction_results: Dict[str, Any]) -> str:
        """Build a human-readable report: header counters plus one entry
        (status, name, preview or error) per processed file."""
        report_parts = [
            "=" * 60,
            "文件内容提取汇总报告",
            "=" * 60,
            f"目录: {extraction_results['directory']}",
            f"总文件数: {extraction_results['total_files']}",
            f"成功提取: {extraction_results['successful_files']}",
            f"失败文件: {extraction_results['failed_files']}",
            "\n详细文件列表:"
        ]
        for i, file_result in enumerate(extraction_results['files'], 1):
            status = "✓" if file_result['success'] else "✗"
            report_parts.append(f"\n{i}. {status} {file_result['file_name']} ({file_result['file_type']})")
            if not file_result['success']:
                report_parts.append(f" 错误: {file_result['content']}")
            elif len(file_result['content']) > 200:
                # Truncate long content to a 200-character preview.
                preview = file_result['content'][:200] + "..."
                report_parts.append(f" 预览: {preview}")
            else:
                report_parts.append(f" 内容: {file_result['content']}")
        return "\n".join(report_parts)

    def save_results(self, extraction_results: Dict[str, Any],
                     output_format: str = 'json',
                     output_path: str = 'extraction_results') -> str:
        """Persist extraction results as JSON, TXT report or CSV table.

        Returns the written file name; raises ValueError for an unknown
        output_format.
        """
        if output_format == 'json':
            output_file = f"{output_path}.json"
            # BUGFIX: the original stringified 'structured_data' inside the
            # caller's dict in place; serialize a sanitized copy instead so
            # the input is left untouched.
            serializable = dict(extraction_results)
            serializable['files'] = [
                {**fr, 'structured_data': str(fr['structured_data'])}
                if 'structured_data' in fr else fr
                for fr in extraction_results['files']
            ]
            with open(output_file, 'w', encoding='utf-8') as f:
                json.dump(serializable, f, ensure_ascii=False, indent=2)
            return output_file
        if output_format == 'txt':
            output_file = f"{output_path}.txt"
            report = self.generate_summary_report(extraction_results)
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report)
            return output_file
        if output_format == 'csv':
            output_file = f"{output_path}.csv"
            rows = [{
                'file_name': fr['file_name'],
                'file_type': fr['file_type'],
                'status': '成功' if fr['success'] else '失败',
                'content_length': len(fr['content']),
                'preview': fr['content'][:100] if fr['success'] else fr['content']
            } for fr in extraction_results['files']]
            # utf-8-sig so Excel opens the Chinese text correctly.
            pd.DataFrame(rows).to_csv(output_file, index=False, encoding='utf-8-sig')
            return output_file
        raise ValueError(f"不支持的输出格式: {output_format}")
# DeepSeek integration
class DeepSeekContentAnalyzer:
    """Content analysis via the DeepSeek chat API (configure your own API key)."""

    def __init__(self, api_key: Optional[str] = None):
        # BUGFIX: the annotation was `str = None`; None is a legal value
        # (every entry point degrades gracefully), so it is Optional[str].
        self.api_key = api_key
        self.base_url = "https://api.deepseek.com/v1/chat/completions"

    def analyze_content(self, content: str, prompt: Optional[str] = None) -> str:
        """Send `content` (truncated to 4000 chars) to DeepSeek with `prompt`.

        Returns the model's answer, or a descriptive error string when the
        API key is missing, the HTTP call fails, or a non-200 status is
        returned. Never raises.
        """
        if not self.api_key:
            return "DeepSeek API密钥未配置,无法进行分析"
        import requests
        if prompt is None:
            prompt = """请分析以下文档内容,并给出:
1. 主要内容摘要
2. 关键信息提取
3. 文档结构分析
4. 重要数据点(如果有)
"""
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        payload = {
            "model": "deepseek-chat",
            "messages": [
                {"role": "system", "content": "你是一个专业的文档分析助手。"},
                # Truncate content to keep the request within a safe size.
                {"role": "user", "content": f"{prompt}\n\n文档内容:\n{content[:4000]}"}
            ],
            "temperature": 0.3
        }
        try:
            response = requests.post(self.base_url, headers=headers, json=payload, timeout=30)
            if response.status_code == 200:
                return response.json()['choices'][0]['message']['content']
            return f"API调用失败: {response.status_code} - {response.text}"
        except Exception as e:
            return f"分析过程出错: {str(e)}"

    def analyze_extracted_files(self, extraction_results: Dict[str, Any]) -> str:
        """Combine every successfully extracted file (first 1000 chars each)
        and run a single cross-document analysis over the combined text."""
        if not self.api_key:
            return "DeepSeek API密钥未配置"
        # Merge all successfully extracted content, one header per file.
        sections = []
        for file_result in extraction_results['files']:
            if file_result['success']:
                header = f"\n{'='*50}\n文件: {file_result['file_name']}\n{'='*50}\n"
                sections.append(header + file_result['content'][:1000])
        combined_content = "\n".join(sections)
        analysis_prompt = """请综合分析以下多个文档的内容,提供:
1. 总体内容概述
2. 跨文档的共同主题
3. 重要信息和数据汇总
4. 发现的关键问题或亮点
5. 建议的后续步骤
文档内容如下:
"""
        return self.analyze_content(combined_content, analysis_prompt)
# Usage examples
def main():
    """Walk through the extractor's main features with console output."""
    extractor = FileContentExtractor()

    # --- Example 1: a single PDF file ---
    print("示例1: 提取单个PDF文件")
    single = extractor.extract_content("sample.pdf")
    print(f"提取成功: {single['success']}")
    print(f"内容长度: {len(single['content'])} 字符")

    # --- Example 2: every supported file under a directory ---
    print("\n示例2: 提取目录中所有支持的文件")
    batch = extractor.extract_from_directory(
        "./documents",  # your document directory
        file_extensions=['.pdf', '.docx', '.xlsx', '.txt', '.md']
    )

    # --- Example 3: human-readable summary report ---
    print(extractor.generate_summary_report(batch))

    # --- Example 4: persist the results as JSON ---
    saved = extractor.save_results(batch, 'json', 'extraction_results')
    print(f"\n结果已保存到: {saved}")

    # --- Example 5 (optional): DeepSeek analysis ---
    # analyzer = DeepSeekContentAnalyzer(api_key="your_deepseek_api_key")
    # analysis = analyzer.analyze_extracted_files(batch)
    # print("\nDeepSeek分析结果:")
    # print(analysis)
# Command-line interface
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='多格式文件内容提取与汇总工具')
    parser.add_argument('--path', type=str, help='文件或目录路径')
    parser.add_argument('--output', type=str, default='results', help='输出文件名')
    parser.add_argument('--format', type=str, choices=['json', 'txt', 'csv'], default='json', help='输出格式')
    args = parser.parse_args()

    # Guard clause: no path given -> show usage hints and exit.
    if not args.path:
        print("请使用 --path 参数指定文件或目录路径")
        print("示例: python file_extractor.py --path ./documents --format json")
    else:
        extractor = FileContentExtractor()
        if os.path.isfile(args.path):
            # Single file: print a short preview to the console.
            single = extractor.extract_content(args.path)
            print(f"文件: {single['file_name']}")
            print(f"类型: {single['file_type']}")
            print(f"成功: {single['success']}")
            print(f"\n内容预览:\n{single['content'][:500]}...")
        elif os.path.isdir(args.path):
            # Directory: extract everything and persist the results.
            batch = extractor.extract_from_directory(args.path)
            saved = extractor.save_results(batch, args.format, args.output)
            print(f"提取完成!结果已保存到: {saved}")
            print(f"处理文件: {batch['total_files']}个")
            print(f"成功: {batch['successful_files']}个")
            print(f"失败: {batch['failed_files']}个")
        else:
            print(f"路径不存在: {args.path}")
3. 简化版快速使用
# quick_start.py
from file_extractor import FileContentExtractor
# 快速开始
extractor = FileContentExtractor()
# 提取单个文件
result = extractor.extract_content("your_file.pdf")
print(result['content'][:1000]) # 打印前1000字符
# 批量提取目录
results = extractor.extract_from_directory("./your_docs")
# 保存为JSON
extractor.save_results(results, 'json', 'my_docs_content')
# 生成文本报告
report = extractor.generate_summary_report(results)
print(report)
4. 配置说明
支持的格式:
- 📄 文本文件: .txt, .md
- 📊 电子表格: .xlsx, .xls, .csv
- 📝 文档: .docx, .pdf
- 🎨 演示文稿: .pptx
DeepSeek API配置:
# 如果需要使用DeepSeek分析功能
analyzer = DeepSeekContentAnalyzer(api_key="your-api-key-here")
analysis = analyzer.analyze_extracted_files(results)
5. 功能特点
多格式支持: 覆盖主流办公文档格式
批量处理: 支持整个目录的文件处理
结构化输出: JSON、TXT、CSV多种输出格式
容错处理: 单个文件失败不影响其他文件
内容预览: 智能截断长内容
扩展性: 易于添加新的文件类型处理器
6. 安装和运行
# 1. 克隆或创建项目
mkdir file-extractor
cd file-extractor
# 2. 安装依赖(将第1节列出的库写入 requirements.txt,或直接运行第1节的 pip install 命令)
pip install -r requirements.txt
# 3. 运行示例
python file_extractor.py --path ./your_documents --format json
这个系统可以轻松扩展到其他文件类型,并且与DeepSeek API集成后,可以提供智能的内容分析和摘要功能。