JSON合并工具

JSON合并工具

JSON合并工具

1. 项目概述

本项目旨在开发一个强大而灵活的JSON合并工具,能够合并多个JSON文件,处理复杂的嵌套结构,提供详细的合并报告,并实现全面的验证和错误处理机制。

2. 功能需求

2.1 基本合并功能

支持合并两个或多个JSON文件处理嵌套的JSON结构提供不同的合并策略选项(如覆盖、保留原值)

2.2 验证和错误处理

JSON结构验证数据类型一致性检查文件大小限制检查键名验证详细的错误报告警告系统错误恢复机制操作日志记录

2.3 合并报告生成

生成详细的合并过程报告包含基本信息、合并统计、详细操作日志、警告和错误摘要、性能指标

2.4 多文件合并

支持任意数量的输入JSON文件按指定顺序依次合并文件为每个输入文件生成单独的统计信息

3. 技术设计

3.1 合并算法

使用递归方法处理嵌套的JSON结构:

遍历第二个JSON对象的所有键值对如果键在第一个对象中不存在,直接添加如果键存在且值都是字典,递归合并如果键存在且值都是列表,合并列表如果键存在但值类型不同,根据策略处理(覆盖或保留)如果键存在且值类型相同,根据策略更新

3.2 验证机制

JSON结构验证:使用json.loads()验证JSON格式深度检查:递归检查JSON嵌套深度,设置最大深度限制大小检查:在读取文件前检查文件大小类型一致性:在合并过程中检查相同键的值类型

3.3 错误处理

使用try-except块捕获并处理异常实现自定义异常类处理特定错误使用logging模块记录警告和错误对于非致命错误,提供继续处理的选项

3.4 报告生成

使用MergeReport类管理报告生成:

在合并过程中记录每个操作统计新增、更新和冲突的键数量记录警告和错误生成性能指标(处理时间、内存使用)格式化输出详细的报告

3.5 多文件处理

使用列表存储多个输入文件路径逐个处理文件,将结果合并到一个主JSON对象中在报告中分别记录每个文件的处理情况

3.6 命令行接口

使用argparse模块处理命令行参数:

输入文件路径(支持多个)输出文件路径合并策略选项报告输出路径选项

4. 实现细节

4.1 主要类和函数

MergeReport 类:管理报告生成merge_json() 函数:实现JSON合并逻辑merge_json_files() 函数:处理文件I/O和调用合并函数main() 函数:处理命令行参数和orchestrate整个过程

4.2 数据结构

使用Python的字典表示JSON对象使用列表存储多个输入文件路径

4.3 外部依赖

json:用于JSON解析和序列化argparse:用于命令行参数处理logging:用于日志记录psutil:用于获取内存使用情况(可选)

5. 使用示例

python merge_json.py file1.json file2.json file3.json output.json --strategy overwrite --report merge_report.txt

6. 未来扩展

性能优化:实现流式处理或分块处理大文件并行处理:使用多线程或多进程加速处理配置文件:支持通过配置文件指定复杂的合并规则可视化:生成合并过程的可视化表示GUI界面:开发图形用户界面,提高易用性

7. 结论

这个JSON合并工具提供了强大的功能,包括多文件合并、详细的报告生成、全面的验证和错误处理。考虑了灵活性和可扩展性,能够满足各种复杂的JSON合并需求。持续的优化和功能扩展,这个工具可以成为处理JSON数据的有力助手。

8.代码

import json

import sys

import os

import logging

import time

import argparse

from typing import Dict, Any, List

class MergeReport:

def __init__(self):

self.start_time = time.time()

self.total_keys = 0

self.new_keys = 0

self.updated_keys = 0

self.conflict_keys = 0

self.warnings = []

self.errors = []

self.detailed_log = []

self.file_stats = {}

def add_operation(self, file: str, key: str, operation: str, details: str = ""):

self.detailed_log.append(f"{file} - {key}: {operation} - {details}")

self.total_keys += 1

if operation == "新增":

self.new_keys += 1

elif operation == "更新":

self.updated_keys += 1

elif operation == "冲突":

self.conflict_keys += 1

if file not in self.file_stats:

self.file_stats[file] = {"新增": 0, "更新": 0, "冲突": 0}

self.file_stats[file][operation] += 1

def add_warning(self, message: str):

self.warnings.append(message)

def add_error(self, message: str):

self.errors.append(message)

def generate_report(self, input_files: List[str], output_file: str, strategy: str) -> str:

end_time = time.time()

process_time = end_time - self.start_time

report = f"""

合并报告

========

基本信息:

- 合并时间: {time.strftime('%Y-%m-%d %H:%M:%S')}

- 输入文件:

{chr(10).join([' - ' + file for file in input_files])}

- 输出文件: {output_file}

- 合并策略: {strategy}

合并统计:

- 总处理键数: {self.total_keys}

- 新增键数: {self.new_keys}

- 更新键数: {self.updated_keys}

- 冲突键数: {self.conflict_keys}

文件统计:

"""

for file, stats in self.file_stats.items():

report += f"- {file}:\n"

report += f" 新增: {stats['新增']}, 更新: {stats['更新']}, 冲突: {stats['冲突']}\n"

report += f"""

详细操作日志:

{chr(10).join(self.detailed_log)}

警告:

{chr(10).join(self.warnings) if self.warnings else "无"}

错误:

{chr(10).join(self.errors) if self.errors else "无"}

性能指标:

- 处理时间: {process_time:.2f} 秒

- 峰值内存使用: {self.get_peak_memory_usage()} MB

"""

return report

def get_peak_memory_usage(self):

import psutil

process = psutil.Process(os.getpid())

return process.memory_info().peak_wset / 1024 / 1024 # 转换为MB

def merge_json(data1: Dict[str, Any], data2: Dict[str, Any], strategy: str, report: MergeReport, file_name: str) -> Dict[str, Any]:

result = data1.copy()

for key, value in data2.items():

if key in result:

if isinstance(result[key], dict) and isinstance(value, dict):

result[key] = merge_json(result[key], value, strategy, report, file_name)

report.add_operation(file_name, key, "更新", "合并嵌套字典")

elif isinstance(result[key], list) and isinstance(value, list):

result[key] = result[key] + value

report.add_operation(file_name, key, "更新", "合并列表")

elif type(result[key]) != type(value):

report.add_warning(f"类型不匹配: 文件 {file_name} 中的键 '{key}' 与现有数据类型不同")

if strategy == 'overwrite':

result[key] = value

report.add_operation(file_name, key, "冲突", f"类型不匹配,使用新文件的值")

else:

report.add_operation(file_name, key, "冲突", f"类型不匹配,保留原值")

elif strategy == 'overwrite':

result[key] = value

report.add_operation(file_name, key, "更新", "覆盖现有值")

else:

report.add_operation(file_name, key, "保留", "保留原有值")

else:

result[key] = value

report.add_operation(file_name, key, "新增", "添加新键")

return result

def merge_json_files(input_files: List[str], output_file: str, strategy: str = 'overwrite') -> str:

report = MergeReport()

merged_data = {}

try:

for file in input_files:

with open(file, 'r', encoding='utf-8') as f:

data = json.load(f)

merged_data = merge_json(merged_data, data, strategy, report, file)

with open(output_file, 'w', encoding='utf-8') as out_file:

json.dump(merged_data, out_file, ensure_ascii=False, indent=4)

return report.generate_report(input_files, output_file, strategy)

except Exception as e:

report.add_error(f"合并过程中发生错误: {str(e)}")

return report.generate_report(input_files, output_file, strategy)

def main():

parser = argparse.ArgumentParser(description="合并多个JSON文件并生成报告")

parser.add_argument('input_files', nargs='+', type=str, help="输入JSON文件的路径列表")

parser.add_argument('output', type=str, help="输出JSON文件的路径")

parser.add_argument('--strategy', type=str, choices=['overwrite', 'keep'], default='overwrite',

help="合并策略: 'overwrite' 覆盖重复键, 'keep' 保留原始值 (默认: overwrite)")

parser.add_argument('--report', type=str, help="合并报告输出路径")

args = parser.parse_args()

try:

report = merge_json_files(args.input_files, args.output, args.strategy)

if args.report:

with open(args.report, 'w', encoding='utf-8') as report_file:

report_file.write(report)

print(f"合并报告已保存到: {args.report}")

else:

print(report)

except Exception as e:

print(f"错误: {str(e)}")

sys.exit(1)

if __name__ == "__main__":

main()