一、介绍

在处理大量数据或需要实时更新的场景中,JSON数据的流式传输变得越来越重要。本文将详细介绍如何实现一个支持分页、过滤、错误处理等功能的JSON流式传输系统。数据采用 NDJSON(Newline-Delimited JSON)格式传输:每行是一个完整的 JSON 对象,前端按换行符切分后逐行解析,因此无需等待整个响应结束即可开始渲染。

二、基础实现

2.1 后端实现(FastAPI)

import asyncio
import json
from typing import Any, AsyncIterator, Dict

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

app = FastAPI()

# CORS configuration.
# The demo pages call this API with plain fetch() (no cookies, no auth
# headers), so credentials are not needed. Combining a wildcard origin with
# allow_credentials=True would make Starlette echo back whatever Origin the
# caller sent on credentialed requests, i.e. any website could read responses
# with the user's cookies. If credentials are ever required, replace "*" with
# an explicit list of trusted origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

async def json_stream(*, delay: float = 1.0) -> AsyncIterator[str]:
    """Yield the demo products as NDJSON lines (one JSON object + "\\n").

    Args:
        delay: Seconds to wait between consecutive items, simulating a slow
            producer. Defaults to 1.0 s.

    Yields:
        One serialized product per line; non-ASCII characters are emitted
        as-is (``ensure_ascii=False``).
    """
    data_list = [
        {
            "id": 1,
            "name": "Product 1",
            "price": 100
        },
        {
            "id": 2,
            "name": "Product 2",
            "price": 200
        }
    ]

    for index, data in enumerate(data_list):
        # Pause only *between* items: sleeping after the last one kept the
        # HTTP response open for an extra `delay` seconds for no reason.
        if index:
            await asyncio.sleep(delay)
        yield json.dumps(data, ensure_ascii=False) + "\n"

@app.get("/stream/json")
async def stream_json():
    """Stream the demo products to the client as newline-delimited JSON."""
    # Ask clients/proxies not to cache the live stream.
    response_headers = {
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
    }
    body_iterator = json_stream()
    return StreamingResponse(
        body_iterator,
        media_type='application/x-ndjson',
        headers=response_headers,
    )

2.2 前端实现

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>JSON Stream Basic Demo</title>
    <style>
        .container {
            margin: 20px;
            padding: 10px;
            border: 1px solid #ccc;
        }
        .json-item {
            margin: 10px 0;
            padding: 10px;
            background-color: #f5f5f5;
        }
    </style>
</head>
<body>
    <button onclick="startStream()">开始接收数据</button>
    <div id="container" class="container"></div>

    <script>
        // Read the NDJSON response (one JSON object per line) incrementally
        // and render each object as soon as its line is complete.
        async function startStream() {
            try {
                const response = await fetch('http://localhost:8000/stream/json');
                if (!response.ok) {
                    throw new Error(`HTTP ${response.status} ${response.statusText}`);
                }
                const reader = response.body.getReader();
                const decoder = new TextDecoder();
                // Network chunks are not aligned to lines, so keep the
                // unfinished tail of the previous chunk between reads.
                let buffer = '';

                while (true) {
                    const {value, done} = await reader.read();
                    if (done) break;

                    // {stream: true} keeps a multi-byte UTF-8 character that
                    // was split across chunks inside the decoder until complete.
                    buffer += decoder.decode(value, {stream: true});
                    const lines = buffer.split('\n');
                    buffer = lines.pop();  // last piece may be an incomplete line

                    for (const line of lines) {
                        handleLine(line);
                    }
                }
                // Flush the decoder and handle a final line without a trailing '\n'.
                buffer += decoder.decode();
                handleLine(buffer);
            } catch (error) {
                console.error('Error:', error);
            }
        }

        // Parse one complete NDJSON line; blank lines are ignored.
        function handleLine(line) {
            if (line.trim()) {
                displayData(JSON.parse(line));
            }
        }

        // Append one object to the page as pretty-printed JSON. textContent
        // (not innerHTML) is used so strings from the server cannot inject markup.
        function displayData(data) {
            const container = document.getElementById('container');
            const div = document.createElement('div');
            div.className = 'json-item';
            const pre = document.createElement('pre');
            pre.textContent = JSON.stringify(data, null, 2);
            div.appendChild(pre);
            container.appendChild(div);
        }
    </script>
</body>
</html>

三、进阶功能实现

3.1 支持分页和过滤的后端代码

from fastapi import FastAPI, Query
from typing import Optional

# 模拟数据源
def get_mock_data():
    """Return 100 fake product records with ids 1..100.

    Even ids are category "A" and odd ids category "B"; ids divisible by 3
    are "active", the rest "inactive"; the price is ``id * 100``.
    """
    def make_record(product_id):
        # Build one record; key order is kept stable for JSON output.
        return {
            "id": product_id,
            "name": f"Product {product_id}",
            "category": "B" if product_id % 2 else "A",
            "price": product_id * 100,
            "status": "inactive" if product_id % 3 else "active",
        }

    return list(map(make_record, range(1, 101)))

async def filtered_json_stream(
    page_size: int = 10,
    filter_key: Optional[str] = None,
    filter_value: Optional[str] = None,
    *,
    data: Optional[list] = None,
    delay: float = 1.0,
):
    """Yield filtered records as NDJSON pages, one JSON object per line.

    Each yielded line is a JSON object with ``page`` (1-based),
    ``total_pages``, ``total_items``, ``page_size`` and ``data`` (that page's
    records), followed by ``"\\n"``.

    Args:
        page_size: Records per page; must be positive.
        filter_key: Record key to filter on. Filtering is applied only when
            both ``filter_key`` and ``filter_value`` are non-empty.
        filter_value: Value to match, compared case-insensitively as strings.
        data: Records to stream; defaults to ``get_mock_data()``.
        delay: Seconds to wait between pages (simulated slow producer).

    Yields:
        One serialized page per line. When nothing matches, a single frame
        with ``page`` 0, ``total_pages`` 0 and empty ``data`` is yielded so
        the client can tell "no results" apart from a dropped stream.

    Raises:
        ValueError: If ``page_size`` is not positive (raised on first
            iteration, since this is an async generator).
    """
    if page_size <= 0:
        # Without this check the page count below divides by zero.
        raise ValueError(f"page_size must be positive, got {page_size}")

    all_data = get_mock_data() if data is None else data

    # Apply the filter (case-insensitive string comparison).
    if filter_key and filter_value:
        wanted = str(filter_value).lower()  # hoisted out of the loop
        filtered_data = [
            item for item in all_data
            if str(item.get(filter_key, "")).lower() == wanted
        ]
    else:
        filtered_data = all_data

    total_items = len(filtered_data)
    total_pages = (total_items + page_size - 1) // page_size  # ceiling division

    def _frame(page: int, batch: list) -> str:
        # Serialize one page object as a single NDJSON line.
        return json.dumps(
            {
                "page": page,
                "total_pages": total_pages,
                "total_items": total_items,
                "page_size": page_size,
                "data": batch,
            },
            ensure_ascii=False,
        ) + "\n"

    if total_pages == 0:
        yield _frame(0, [])
        return

    for page_index in range(total_pages):
        # Pause only between pages, not after the last one, so the response
        # ends as soon as all data has been sent.
        if page_index:
            await asyncio.sleep(delay)
        start = page_index * page_size
        # Slicing clamps at the list end, so the last page may be shorter.
        yield _frame(page_index + 1, filtered_data[start:start + page_size])

@app.get("/stream/filtered-json")
async def stream_filtered_json(
    page_size: int = Query(default=10, gt=0, le=100),
    filter_key: Optional[str] = None,
    filter_value: Optional[str] = None
):
    """Stream filtered, paginated records as newline-delimited JSON.

    The body is NDJSON (one page object per line), not Server-Sent Events,
    so it is labelled ``application/x-ndjson`` like ``/stream/json``.
    Advertising ``text/event-stream`` would make SSE clients (``EventSource``)
    expect ``data:``-prefixed frames that this endpoint never sends.
    """
    return StreamingResponse(
        filtered_json_stream(page_size, filter_key, filter_value),
        media_type='application/x-ndjson',
        headers={'Cache-Control': 'no-cache'},
    )

3.2 增强的前端实现

<!DOCTYPE html>
<html>
<head>
    <meta charset="utf-8">
    <title>Advanced JSON Stream Demo</title>
    <style>
        /* Style definitions... */
    </style>
</head>
<body>
    <div class="controls">
        <label>Page Size: 
            <input type="number" id="pageSize" value="10" min="1" max="100">
        </label>
        <label>Filter Key: 
            <select id="filterKey">
                <option value="">None</option>
                <option value="category">Category</option>
                <option value="status">Status</option>
            </select>
        </label>
        <label>Filter Value: 
            <input type="text" id="filterValue">
        </label>
        <button onclick="startFilteredStream()">Start Stream</button>
    </div>
    <div id="paginationInfo"></div>
    <div id="container"></div>

    <script>
        // Request the filtered NDJSON stream and render each page as its
        // line arrives.
        async function startFilteredStream() {
            // Build the query parameters.
            const params = new URLSearchParams({
                page_size: document.getElementById('pageSize').value
            });

            const filterKey = document.getElementById('filterKey').value;
            const filterValue = document.getElementById('filterValue').value;

            if (filterKey && filterValue) {
                params.append('filter_key', filterKey);
                params.append('filter_value', filterValue);
            }

            // Clear a previous run so pages of different queries are not mixed.
            document.getElementById('container').textContent = '';
            document.getElementById('paginationInfo').textContent = '';

            try {
                const response = await fetch(
                    `http://localhost:8000/stream/filtered-json?${params}`
                );
                // A validation error (e.g. page_size out of range) comes back
                // as a JSON error body; don't render it as if it were a page.
                if (!response.ok) {
                    throw new Error(`HTTP ${response.status} ${response.statusText}`);
                }
                const reader = response.body.getReader();
                const decoder = new TextDecoder();
                // Chunks are not aligned to lines: keep the incomplete tail.
                let buffer = '';

                while (true) {
                    const {value, done} = await reader.read();
                    if (done) break;

                    // {stream: true} handles UTF-8 characters split across chunks.
                    buffer += decoder.decode(value, {stream: true});
                    const lastNewline = buffer.lastIndexOf('\n');
                    if (lastNewline !== -1) {
                        // Only hand over complete lines.
                        processStreamData(buffer.slice(0, lastNewline));
                        buffer = buffer.slice(lastNewline + 1);
                    }
                }
                // Flush the decoder and any final line without a trailing '\n'.
                buffer += decoder.decode();
                processStreamData(buffer);
            } catch (error) {
                handleError(error);
            }
        }

        // Parse every non-blank line of `text` (complete NDJSON lines only).
        function processStreamData(text) {
            const lines = text.split('\n');
            for (const line of lines) {
                if (line.trim()) {
                    const data = JSON.parse(line);
                    displayPagedData(data);
                }
            }
        }

        // Update the pagination header and append one page of records.
        // textContent (not innerHTML) keeps server strings from injecting markup.
        function displayPagedData(data) {
            document.getElementById('paginationInfo').textContent =
                `Page ${data.page} of ${data.total_pages}`;

            const container = document.getElementById('container');
            const div = document.createElement('div');
            div.className = 'json-item';
            const title = document.createElement('h3');
            title.textContent = `Page ${data.page}`;
            const pre = document.createElement('pre');
            pre.textContent = JSON.stringify(data.data, null, 2);
            div.appendChild(title);
            div.appendChild(pre);
            container.appendChild(div);
        }

        // Log the error and replace the results area with an error message.
        function handleError(error) {
            console.error('Error:', error);
            const container = document.getElementById('container');
            container.textContent = '';
            const div = document.createElement('div');
            div.className = 'error';
            div.textContent = `Error: ${error.message}`;
            container.appendChild(div);
        }
    </script>
</body>
</html>

四、进阶特性

4.1 错误处理和重试机制

class StreamManager {
    /**
     * @param {number} maxRetries Maximum number of attempts per startStream()
     *     call, counting the first one (3 → up to 3 tries).
     */
    constructor(maxRetries = 3) {
        this.maxRetries = maxRetries;
        this.retryCount = 0;  // failures seen during the current startStream() call
    }

    /**
     * Run this.handleStream(url), retrying failures with a linear backoff
     * (1 s, 2 s, ...). Rethrows the last error once the attempts are used up.
     * handleStream() is not defined here; a subclass or the instance must
     * provide it.
     */
    async startStream(url) {
        // Reset per call: previously the counter carried over between calls,
        // so a manager that had failed before got fewer (or zero) attempts.
        this.retryCount = 0;
        // Always try at least once; maxRetries <= 0 used to return silently
        // without ever calling handleStream().
        const maxAttempts = Math.max(1, this.maxRetries);

        while (true) {
            try {
                await this.handleStream(url);
                return;
            } catch (error) {
                this.retryCount++;
                if (this.retryCount >= maxAttempts) {
                    throw error;
                }
                await this.delay(1000 * this.retryCount);
            }
        }
    }

    // Resolve after `ms` milliseconds.
    async delay(ms) {
        return new Promise(resolve => setTimeout(resolve, ms));
    }
}

4.2 内存管理

/**
 * Trim `container` to at most `maxItems` child elements by removing the
 * oldest (first) children.
 *
 * @param {Element} container Element whose children are rendered items.
 * @param {number} maxItems Maximum number of children to keep (negative → 0).
 */
function manageMemory(container, maxItems = 1000) {
    const items = container.children;
    const limit = Math.max(0, maxItems);
    // `children` is a live collection: each removal shifts the remaining
    // items down and shrinks `length`. Always remove index 0 and re-check,
    // instead of advancing an index over a shrinking list (which skipped
    // every other element and removed too few).
    while (items.length > limit) {
        container.removeChild(items[0]);
    }
}

4.3 数据验证

/**
 * Check that a parsed stream frame has the paged shape the UI expects.
 *
 * @param {*} data Value produced by JSON.parse for one NDJSON line.
 * @returns {boolean} true when valid.
 * @throws {Error} if `data` is not a plain object, lacks `page`,
 *     `total_pages` or `data`, or if `data.data` is not an array.
 */
function validateJsonData(data) {
    // `field in data` throws an opaque TypeError for null/primitives, so
    // reject non-objects (and arrays) with a clear message first.
    if (data === null || typeof data !== 'object' || Array.isArray(data)) {
        throw new Error('Invalid stream frame: expected a JSON object');
    }
    const requiredFields = ['page', 'total_pages', 'data'];
    for (const field of requiredFields) {
        if (!(field in data)) {
            throw new Error(`Missing required field: ${field}`);
        }
    }
    // The renderer iterates/stringifies the page records as a list.
    if (!Array.isArray(data.data)) {
        throw new Error('Invalid field: data must be an array');
    }
    return true;
}

五、性能优化建议

  1. 使用 Web Worker 处理数据

// worker.js
// Worker-side message handler: transform each payload posted by the page and
// post the result back. `processData` is not defined in this snippet; it must
// exist in the worker's global scope (e.g. defined here or via importScripts()).
self.onmessage = (event) => {
    self.postMessage(processData(event.data));
};

  2. 实现滚动加载(无限滚动;注意这并非真正的虚拟滚动,后者还需要回收视口外的 DOM 节点)

/**
 * Infinite-scroll helper: append a sentinel element to `container` and call
 * `onLoadMore` whenever at least half of the sentinel becomes visible in the
 * viewport (IntersectionObserver's default root, since no `root` is given).
 *
 * Note: this lazy-loads more items; it does not virtualize (recycle) DOM
 * nodes. Insert new items before the sentinel, e.g.
 * `container.insertBefore(node, container.lastElementChild)`, so it stays last.
 *
 * @param {Element} container List element that receives the sentinel.
 * @param {Function} onLoadMore Called to fetch/render the next batch;
 *     defaults to the global `loadMoreData()`, looked up at call time.
 * @returns {IntersectionObserver} The observer, so callers can
 *     `disconnect()` it when the list is torn down (otherwise it leaks).
 */
function setupVirtualScroll(container, onLoadMore = () => loadMoreData()) {
    const observer = new IntersectionObserver(
        (entries) => {
            for (const entry of entries) {
                if (entry.isIntersecting) {
                    onLoadMore();
                }
            }
        },
        { threshold: 0.5 }
    );

    // Observe a marker element at the bottom of the container.
    const sentinel = document.createElement('div');
    container.appendChild(sentinel);
    observer.observe(sentinel);
    return observer;
}

六、最佳实践

  1. 合理的缓冲区大小与按行缓冲(网络数据块不一定在换行处切分,需缓存不完整的行,并使用 TextDecoder 的 stream 模式解码)

  2. 适当的错误处理

  3. 内存管理策略

  4. 用户反馈机制

  5. 数据完整性验证

七、注意事项

  1. 需要考虑浏览器兼容性

  2. 处理断网重连情况

  3. 合理控制数据流速率

  4. 注意内存泄露问题

  5. 实现适当的错误提示

八、总结

JSON数据流式传输是一个强大的技术,通过合理实现可以很好地处理大量数据的实时传输需求。本文介绍的进阶实现方案,结合了分页、过滤、错误处理等特性,可以作为实际开发的参考。

关键点:

  • 使用合适的数据传输格式

  • 实现可靠的错误处理

  • 注意性能优化

  • 保持良好的用户体验

希望这篇教程对你实现JSON数据流式传输有所帮助!

九、参考资源

  1. FastAPI官方文档

  2. MDN Web Docs - Streams

  3. JavaScript性能优化