佛山市纤凝网

利用Python+DeepSeek实现多格式文件内容提取与汇总

2026-03-30 09:50:02 浏览次数:1
详细信息

1. 安装所需库

pip install python-docx PyPDF2 pandas openpyxl markdown beautifulsoup4 python-pptx pillow pdfplumber

2. 完整代码实现

import os
import json
import pandas as pd
from typing import List, Dict, Any, Optional
import warnings
# NOTE(review): blanket suppression hides every library warning (including
# pandas/openpyxl deprecation warnings) — consider scoping to specific
# categories instead of a global 'ignore'.
warnings.filterwarnings('ignore')

# 文本文件处理
class TextFileProcessor:
    @staticmethod
    def read_txt(file_path: str) -> str:
        """读取txt文件"""
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()

    @staticmethod
    def read_md(file_path: str) -> str:
        """读取markdown文件"""
        import markdown
        with open(file_path, 'r', encoding='utf-8') as f:
            text = f.read()
        return text

# PDF文件处理
class PDFProcessor:
    @staticmethod
    def read_pdf(file_path: str) -> str:
        """读取PDF文件内容"""
        try:
            # 优先使用pdfplumber,效果更好
            import pdfplumber
            text = ""
            with pdfplumber.open(file_path) as pdf:
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
            return text.strip()
        except ImportError:
            # 备用方案:PyPDF2
            from PyPDF2 import PdfReader
            text = ""
            with open(file_path, 'rb') as f:
                pdf_reader = PdfReader(f)
                for page in pdf_reader.pages:
                    page_text = page.extract_text()
                    if page_text:
                        text += page_text + "\n"
            return text.strip()

# Word文档处理
class WordProcessor:
    @staticmethod
    def read_docx(file_path: str) -> str:
        """读取Word文档"""
        from docx import Document
        doc = Document(file_path)
        text = []
        for para in doc.paragraphs:
            if para.text.strip():
                text.append(para.text)

        # 读取表格内容
        for table in doc.tables:
            for row in table.rows:
                row_text = [cell.text.strip() for cell in row.cells if cell.text.strip()]
                if row_text:
                    text.append(" | ".join(row_text))

        return "\n".join(text)

# Excel文件处理
class ExcelProcessor:
    @staticmethod
    def read_excel(file_path: str) -> Dict[str, Any]:
        """读取Excel文件,返回结构化数据"""
        try:
            # 获取所有sheet名
            excel_file = pd.ExcelFile(file_path)
            result = {
                'file_path': file_path,
                'sheets': {}
            }

            for sheet_name in excel_file.sheet_names:
                try:
                    df = pd.read_excel(file_path, sheet_name=sheet_name)

                    # 提取表格内容
                    sheet_data = {
                        'shape': f"{df.shape[0]}行×{df.shape[1]}列",
                        'columns': df.columns.tolist(),
                        'head_data': df.head(20).to_dict('records'),  # 前20行数据
                        'summary': {
                            'total_rows': len(df),
                            'total_columns': len(df.columns),
                            'sample_rows': min(5, len(df))
                        }
                    }

                    # 添加统计信息
                    for col in df.select_dtypes(include=['float64', 'int64']).columns:
                        sheet_data['summary'][f'{col}_stats'] = {
                            'min': df[col].min(),
                            'max': df[col].max(),
                            'mean': df[col].mean(),
                            'std': df[col].std()
                        }

                    result['sheets'][sheet_name] = sheet_data

                except Exception as e:
                    result['sheets'][sheet_name] = {
                        'error': f"读取sheet失败: {str(e)}"
                    }

            return result

        except Exception as e:
            return {
                'file_path': file_path,
                'error': f"读取Excel文件失败: {str(e)}"
            }

    @staticmethod
    def excel_to_text(data: Dict[str, Any]) -> str:
        """将Excel数据结构转换为文本"""
        text_parts = []
        text_parts.append(f"Excel文件: {data['file_path']}")

        if 'error' in data:
            text_parts.append(f"错误: {data['error']}")
            return "\n".join(text_parts)

        for sheet_name, sheet_data in data['sheets'].items():
            text_parts.append(f"\n=== Sheet: {sheet_name} ===")
            text_parts.append(f"表格形状: {sheet_data['shape']}")
            text_parts.append(f"列名: {', '.join(sheet_data['columns'])}")

            # 添加样本数据
            text_parts.append("\n样本数据(前5行):")
            for i, row in enumerate(sheet_data['head_data'][:5]):
                row_text = [f"{k}: {v}" for k, v in row.items()]
                text_parts.append(f"行{i+1}: {' | '.join(row_text)}")

            # 添加统计信息
            if 'summary' in sheet_data:
                text_parts.append(f"\n汇总信息:")
                for key, value in sheet_data['summary'].items():
                    if isinstance(value, dict):
                        text_parts.append(f"  {key}:")
                        for k, v in value.items():
                            text_parts.append(f"    {k}: {v}")
                    else:
                        text_parts.append(f"  {key}: {value}")

        return "\n".join(text_parts)

# PowerPoint处理
class PowerPointProcessor:
    @staticmethod
    def read_pptx(file_path: str) -> str:
        """读取PowerPoint文件"""
        from pptx import Presentation
        prs = Presentation(file_path)
        text_parts = []

        for i, slide in enumerate(prs.slides, 1):
            text_parts.append(f"\n--- 幻灯片 {i} ---")

            # 读取标题
            if slide.shapes.title:
                text_parts.append(f"标题: {slide.shapes.title.text}")

            # 读取所有形状的文本
            for shape in slide.shapes:
                if hasattr(shape, "text") and shape.text.strip():
                    if shape != slide.shapes.title:  # 避免重复标题
                        text_parts.append(shape.text)

        return "\n".join(text_parts)

# CSV文件处理
class CSVProcessor:
    @staticmethod
    def read_csv(file_path: str) -> str:
        """读取CSV文件"""
        try:
            df = pd.read_csv(file_path)
            text_parts = [
                f"CSV文件: {file_path}",
                f"数据形状: {df.shape[0]}行×{df.shape[1]}列",
                f"列名: {', '.join(df.columns.tolist())}",
                "\n前10行数据:"
            ]

            # 添加前10行数据
            for i, row in df.head(10).iterrows():
                row_text = [f"{col}: {row[col]}" for col in df.columns]
                text_parts.append(f"行{i+1}: {' | '.join(row_text)}")

            return "\n".join(text_parts)
        except Exception as e:
            return f"读取CSV文件失败: {str(e)}"

# 主文件处理器
class FileContentExtractor:
    def __init__(self):
        self.processors = {
            '.txt': TextFileProcessor.read_txt,
            '.md': TextFileProcessor.read_md,
            '.pdf': PDFProcessor.read_pdf,
            '.docx': WordProcessor.read_docx,
            '.pptx': PowerPointProcessor.read_pptx,
            '.csv': CSVProcessor.read_csv,
            '.xlsx': lambda x: ExcelProcessor.excel_to_text(ExcelProcessor.read_excel(x)),
            '.xls': lambda x: ExcelProcessor.excel_to_text(ExcelProcessor.read_excel(x))
        }

    def extract_content(self, file_path: str) -> Dict[str, Any]:
        """提取单个文件内容"""
        file_ext = os.path.splitext(file_path)[1].lower()

        result = {
            'file_path': file_path,
            'file_name': os.path.basename(file_path),
            'file_type': file_ext,
            'content': '',
            'success': False
        }

        try:
            if file_ext in self.processors:
                if file_ext in ['.xlsx', '.xls']:
                    # Excel文件需要特殊处理
                    excel_data = ExcelProcessor.read_excel(file_path)
                    result['content'] = ExcelProcessor.excel_to_text(excel_data)
                    result['structured_data'] = excel_data
                else:
                    result['content'] = self.processors[file_ext](file_path)
                result['success'] = True
            else:
                result['content'] = f"不支持的文件格式: {file_ext}"
                result['success'] = False

        except Exception as e:
            result['content'] = f"读取文件时出错: {str(e)}"
            result['success'] = False

        return result

    def extract_from_directory(self, directory_path: str, 
                              file_extensions: Optional[List[str]] = None) -> Dict[str, Any]:
        """从目录中提取所有文件内容"""
        all_results = {
            'directory': directory_path,
            'total_files': 0,
            'successful_files': 0,
            'failed_files': 0,
            'files': []
        }

        if not os.path.exists(directory_path):
            return {
                **all_results,
                'error': f"目录不存在: {directory_path}"
            }

        for root, dirs, files in os.walk(directory_path):
            for file in files:
                file_path = os.path.join(root, file)
                file_ext = os.path.splitext(file)[1].lower()

                # 如果指定了文件类型,进行过滤
                if file_extensions and file_ext not in file_extensions:
                    continue

                all_results['total_files'] += 1

                # 提取文件内容
                result = self.extract_content(file_path)
                all_results['files'].append(result)

                if result['success']:
                    all_results['successful_files'] += 1
                else:
                    all_results['failed_files'] += 1

        return all_results

    def generate_summary_report(self, extraction_results: Dict[str, Any]) -> str:
        """生成汇总报告"""
        report_parts = [
            "=" * 60,
            "文件内容提取汇总报告",
            "=" * 60,
            f"目录: {extraction_results['directory']}",
            f"总文件数: {extraction_results['total_files']}",
            f"成功提取: {extraction_results['successful_files']}",
            f"失败文件: {extraction_results['failed_files']}",
            "\n详细文件列表:"
        ]

        for i, file_result in enumerate(extraction_results['files'], 1):
            status = "✓" if file_result['success'] else "✗"
            report_parts.append(f"\n{i}. {status} {file_result['file_name']} ({file_result['file_type']})")
            if not file_result['success']:
                report_parts.append(f"   错误: {file_result['content']}")
            elif len(file_result['content']) > 200:
                preview = file_result['content'][:200] + "..."
                report_parts.append(f"   预览: {preview}")
            else:
                report_parts.append(f"   内容: {file_result['content']}")

        return "\n".join(report_parts)

    def save_results(self, extraction_results: Dict[str, Any], 
                    output_format: str = 'json',
                    output_path: str = 'extraction_results') -> str:
        """保存提取结果"""

        if output_format == 'json':
            output_file = f"{output_path}.json"
            with open(output_file, 'w', encoding='utf-8') as f:
                # 清理不能序列化的数据
                for file_result in extraction_results['files']:
                    if 'structured_data' in file_result:
                        # 简化Excel结构化数据
                        file_result['structured_data'] = str(file_result['structured_data'])
                json.dump(extraction_results, f, ensure_ascii=False, indent=2)
            return output_file

        elif output_format == 'txt':
            output_file = f"{output_path}.txt"
            report = self.generate_summary_report(extraction_results)
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(report)
            return output_file

        elif output_format == 'csv':
            output_file = f"{output_path}.csv"
            data = []
            for file_result in extraction_results['files']:
                data.append({
                    'file_name': file_result['file_name'],
                    'file_type': file_result['file_type'],
                    'status': '成功' if file_result['success'] else '失败',
                    'content_length': len(file_result['content']),
                    'preview': file_result['content'][:100] if file_result['success'] else file_result['content']
                })
            df = pd.DataFrame(data)
            df.to_csv(output_file, index=False, encoding='utf-8-sig')
            return output_file

        else:
            raise ValueError(f"不支持的输出格式: {output_format}")

# DeepSeek集成处理
class DeepSeekContentAnalyzer:
    """使用DeepSeek API进行内容分析(需要自行配置API密钥)"""

    def __init__(self, api_key: str = None):
        self.api_key = api_key
        self.base_url = "https://api.deepseek.com/v1/chat/completions"

    def analyze_content(self, content: str, prompt: str = None) -> str:
        """使用DeepSeek分析内容"""
        if not self.api_key:
            return "DeepSeek API密钥未配置,无法进行分析"

        import requests

        if prompt is None:
            prompt = """请分析以下文档内容,并给出:
            1. 主要内容摘要
            2. 关键信息提取
            3. 文档结构分析
            4. 重要数据点(如果有)
            """

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        payload = {
            "model": "deepseek-chat",
            "messages": [
                {"role": "system", "content": "你是一个专业的文档分析助手。"},
                {"role": "user", "content": f"{prompt}\n\n文档内容:\n{content[:4000]}"}  # 限制内容长度
            ],
            "temperature": 0.3
        }

        try:
            response = requests.post(self.base_url, headers=headers, json=payload, timeout=30)
            if response.status_code == 200:
                return response.json()['choices'][0]['message']['content']
            else:
                return f"API调用失败: {response.status_code} - {response.text}"
        except Exception as e:
            return f"分析过程出错: {str(e)}"

    def analyze_extracted_files(self, extraction_results: Dict[str, Any]) -> str:
        """分析所有提取的文件内容"""
        if not self.api_key:
            return "DeepSeek API密钥未配置"

        # 合并所有成功提取的内容
        all_content = []
        for file_result in extraction_results['files']:
            if file_result['success']:
                file_summary = f"\n{'='*50}\n文件: {file_result['file_name']}\n{'='*50}\n"
                file_summary += file_result['content'][:1000]  # 限制每个文件内容长度
                all_content.append(file_summary)

        combined_content = "\n".join(all_content)

        # 使用DeepSeek进行分析
        analysis_prompt = """请综合分析以下多个文档的内容,提供:
        1. 总体内容概述
        2. 跨文档的共同主题
        3. 重要信息和数据汇总
        4. 发现的关键问题或亮点
        5. 建议的后续步骤

        文档内容如下:
        """

        return self.analyze_content(combined_content, analysis_prompt)

# 使用示例
def main():
    # 1. 初始化提取器
    extractor = FileContentExtractor()

    # 2. 提取单个文件
    print("示例1: 提取单个PDF文件")
    pdf_result = extractor.extract_content("sample.pdf")
    print(f"提取成功: {pdf_result['success']}")
    print(f"内容长度: {len(pdf_result['content'])} 字符")

    # 3. 提取目录中所有文件
    print("\n示例2: 提取目录中所有支持的文件")
    directory_results = extractor.extract_from_directory(
        "./documents",  # 你的文档目录
        file_extensions=['.pdf', '.docx', '.xlsx', '.txt', '.md']
    )

    # 4. 生成报告
    report = extractor.generate_summary_report(directory_results)
    print(report)

    # 5. 保存结果
    json_file = extractor.save_results(directory_results, 'json', 'extraction_results')
    print(f"\n结果已保存到: {json_file}")

    # 6. 使用DeepSeek进行分析(可选)
    # analyzer = DeepSeekContentAnalyzer(api_key="your_deepseek_api_key")
    # analysis = analyzer.analyze_extracted_files(directory_results)
    # print("\nDeepSeek分析结果:")
    # print(analysis)

# 命令行接口
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description='多格式文件内容提取与汇总工具')
    parser.add_argument('--path', type=str, help='文件或目录路径')
    parser.add_argument('--output', type=str, default='results', help='输出文件名')
    parser.add_argument('--format', type=str, choices=['json', 'txt', 'csv'], default='json', help='输出格式')

    args = parser.parse_args()

    if args.path:
        extractor = FileContentExtractor()

        if os.path.isfile(args.path):
            # 处理单个文件
            result = extractor.extract_content(args.path)
            print(f"文件: {result['file_name']}")
            print(f"类型: {result['file_type']}")
            print(f"成功: {result['success']}")
            print(f"\n内容预览:\n{result['content'][:500]}...")

        elif os.path.isdir(args.path):
            # 处理目录
            results = extractor.extract_from_directory(args.path)
            output_file = extractor.save_results(results, args.format, args.output)
            print(f"提取完成!结果已保存到: {output_file}")
            print(f"处理文件: {results['total_files']}个")
            print(f"成功: {results['successful_files']}个")
            print(f"失败: {results['failed_files']}个")

        else:
            print(f"路径不存在: {args.path}")
    else:
        print("请使用 --path 参数指定文件或目录路径")
        print("示例: python file_extractor.py --path ./documents --format json")

3. 简化版快速使用

# quick_start.py
from file_extractor import FileContentExtractor

# Quick start
extractor = FileContentExtractor()

# Extract a single file
result = extractor.extract_content("your_file.pdf")
print(result['content'][:1000])  # print the first 1000 characters

# Batch-extract a whole directory
results = extractor.extract_from_directory("./your_docs")

# Save as JSON
extractor.save_results(results, 'json', 'my_docs_content')

# Generate a text report
report = extractor.generate_summary_report(results)
print(report)

4. 配置说明

支持的格式:.txt、.md、.pdf、.docx、.pptx、.csv、.xlsx、.xls(与代码中 processors 映射一一对应)

DeepSeek API配置:

# If you want the DeepSeek analysis feature
analyzer = DeepSeekContentAnalyzer(api_key="your-api-key-here")
analysis = analyzer.analyze_extracted_files(results)

5. 功能特点

- 多格式支持:覆盖主流办公文档格式
- 批量处理:支持整个目录的文件处理
- 结构化输出:JSON、TXT、CSV多种输出格式
- 容错处理:单个文件失败不影响其他文件
- 内容预览:智能截断长内容
- 扩展性:易于添加新的文件类型处理器

6. 安装和运行

# 1. Clone or create the project
mkdir file-extractor
cd file-extractor

# 2. Install dependencies
pip install -r requirements.txt

# 3. Run the example
python file_extractor.py --path ./your_documents --format json

这个系统可以轻松扩展到其他文件类型,并且与DeepSeek API集成后,可以提供智能的内容分析和摘要功能。

相关推荐