
JSON数据流式传输进阶教程
一、介绍
在处理大量数据或需要实时更新的场景中，JSON 数据的流式传输变得越来越重要。本文将详细介绍如何实现一个支持分页、过滤、错误处理等功能的 JSON 流式传输系统。
二、基础实现
2.1 后端实现(FastAPI)
import asyncio
import json
from typing import Any, AsyncIterator, Dict

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
app = FastAPI()

# CORS configuration for the demo front end.
# Credentials stay disabled: a wildcard origin must not be combined with
# credentialed requests. List explicit origins in ``allow_origins`` before
# switching ``allow_credentials`` back on.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
async def json_stream(delay: float = 1.0) -> AsyncIterator[str]:
    """Yield the demo products as newline-delimited JSON (NDJSON) lines.

    Args:
        delay: Seconds to pause after each line, simulating a slow producer.
            Defaults to 1.0.

    Yields:
        One JSON-encoded product per item, each terminated by a newline.
    """
    data_list = [
        {"id": 1, "name": "Product 1", "price": 100},
        {"id": 2, "name": "Product 2", "price": 200},
    ]
    for data in data_list:
        # ensure_ascii=False emits non-ASCII characters as-is instead of
        # \uXXXX escapes.
        yield json.dumps(data, ensure_ascii=False) + "\n"
        await asyncio.sleep(delay)
@app.get("/stream/json")
async def stream_json():
    """Serve the output of ``json_stream`` as an NDJSON streaming response."""
    response_headers = {
        'Cache-Control': 'no-cache',
        'Connection': 'keep-alive',
    }
    ndjson_lines = json_stream()
    return StreamingResponse(
        ndjson_lines,
        media_type='application/x-ndjson',
        headers=response_headers,
    )
2.2 前端实现
<!DOCTYPE html>
<html>
<head>
<title>JSON Stream Basic Demo</title>
<style>
.container {
margin: 20px;
padding: 10px;
border: 1px solid #ccc;
}
.json-item {
margin: 10px 0;
padding: 10px;
background-color: #f5f5f5;
}
</style>
</head>
<body>
<button onclick="startStream()">开始接收数据</button>
<div id="container" class="container"></div>
<script>
/**
 * Fetch the NDJSON stream and render each JSON line as it arrives.
 *
 * Network chunks do not align with line boundaries, so incoming text is
 * buffered and only complete lines are parsed. Any failure (network, HTTP
 * status, malformed JSON) is logged to the console.
 */
async function startStream() {
  try {
    const response = await fetch('http://localhost:8000/stream/json');
    if (!response.ok) {
      throw new Error(`HTTP ${response.status} ${response.statusText}`);
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    while (true) {
      const {value, done} = await reader.read();
      if (done) break;
      // stream: true keeps a multi-byte character that straddles two chunks
      // from being decoded as garbage.
      buffer += decoder.decode(value, {stream: true});
      const lines = buffer.split('\n');
      // The last piece may be an unfinished line; keep it for the next chunk.
      buffer = lines.pop();
      for (const line of lines) {
        if (line.trim()) {
          displayData(JSON.parse(line));
        }
      }
    }
    // Flush whatever is left in case the stream did not end with a newline.
    buffer += decoder.decode();
    if (buffer.trim()) {
      displayData(JSON.parse(buffer));
    }
  } catch (error) {
    console.error('Error:', error);
  }
}
/**
 * Append one received object to the #container element as pretty-printed JSON.
 *
 * @param {*} data - Parsed JSON value to display.
 */
function displayData(data) {
  const container = document.getElementById('container');
  const div = document.createElement('div');
  div.className = 'json-item';
  // textContent (not innerHTML) so markup inside the received data is shown
  // as text instead of being interpreted as HTML.
  const pre = document.createElement('pre');
  pre.textContent = JSON.stringify(data, null, 2);
  div.appendChild(pre);
  container.appendChild(div);
}
</script>
</body>
</html>
三、进阶功能实现
3.1 支持分页和过滤的后端代码
from fastapi import FastAPI, Query
from typing import Optional
# Mock data source
def get_mock_data():
    """Build 100 fake products with ids 1 through 100.

    Even ids get category "A" and odd ids "B". Ids divisible by 3 are
    "active", all others "inactive". The price is 100 times the id.
    """
    products = []
    for product_id in range(1, 101):
        category = "B" if product_id % 2 else "A"
        status = "inactive" if product_id % 3 else "active"
        products.append(
            {
                "id": product_id,
                "name": f"Product {product_id}",
                "category": category,
                "price": product_id * 100,
                "status": status,
            }
        )
    return products
async def filtered_json_stream(
    page_size: int = 10,
    filter_key: Optional[str] = None,
    filter_value: Optional[str] = None,
    delay: float = 1.0,
):
    """Stream the mock data as NDJSON, one page per line.

    Args:
        page_size: Number of items per page; must be positive.
        filter_key: Item field to filter on. Filtering is applied only when
            both ``filter_key`` and ``filter_value`` are non-empty.
        filter_value: Value the field must equal, compared as
            case-insensitive strings.
        delay: Seconds to pause after each page. Defaults to 1.0.

    Yields:
        A JSON line holding ``page``, ``total_pages``, ``total_items``,
        ``page_size`` and the page's ``data``. Nothing is yielded when no
        item matches the filter.

    Raises:
        ValueError: If ``page_size`` is not positive. Because this is an
            async generator, the error surfaces on the first iteration.
    """
    if page_size <= 0:
        # Without this guard, 0 divides by zero below and a negative size
        # silently produces an empty stream.
        raise ValueError("page_size must be a positive integer")

    all_data = get_mock_data()

    # Apply the filter
    if filter_key and filter_value:
        wanted = str(filter_value).lower()
        filtered_data = [
            item for item in all_data
            if str(item.get(filter_key, "")).lower() == wanted
        ]
    else:
        filtered_data = all_data

    # Compute pagination info (ceiling division)
    total_items = len(filtered_data)
    total_pages = (total_items + page_size - 1) // page_size

    # Emit one page per line
    for page in range(total_pages):
        start_idx = page * page_size
        batch = filtered_data[start_idx:start_idx + page_size]
        response_data = {
            "page": page + 1,
            "total_pages": total_pages,
            "total_items": total_items,
            "page_size": page_size,
            "data": batch,
        }
        yield json.dumps(response_data, ensure_ascii=False) + "\n"
        await asyncio.sleep(delay)
@app.get("/stream/filtered-json")
async def stream_filtered_json(
    page_size: int = Query(default=10, gt=0, le=100),
    filter_key: Optional[str] = None,
    filter_value: Optional[str] = None,
):
    """Stream filtered, paginated mock data as NDJSON.

    Query parameters:
        page_size: Items per page, validated to the range 1..100.
        filter_key: Optional item field to filter on.
        filter_value: Optional value that field must equal.
    """
    return StreamingResponse(
        filtered_json_stream(page_size, filter_key, filter_value),
        # The body is newline-delimited JSON, not Server-Sent Events framing
        # (there are no "data:" prefixes), so advertise it as NDJSON.
        media_type='application/x-ndjson',
    )
3.2 增强的前端实现
<!DOCTYPE html>
<html>
<head>
<title>Advanced JSON Stream Demo</title>
<style>
/* 样式定义... */
</style>
</head>
<body>
<div class="controls">
<label>Page Size:
<input type="number" id="pageSize" value="10" min="1" max="100">
</label>
<label>Filter Key:
<select id="filterKey">
<option value="">None</option>
<option value="category">Category</option>
<option value="status">Status</option>
</select>
</label>
<label>Filter Value:
<input type="text" id="filterValue">
</label>
<button onclick="startFilteredStream()">Start Stream</button>
</div>
<div id="paginationInfo"></div>
<div id="container"></div>
<script>
/**
 * Read the form controls, request the filtered stream and render each page.
 *
 * Network chunks do not align with line boundaries, so incoming text is
 * buffered and only whole lines are handed to processStreamData. Failures
 * (network, HTTP status, malformed JSON) are routed to handleError.
 */
async function startFilteredStream() {
  // Build the query parameters
  const params = new URLSearchParams({
    page_size: document.getElementById('pageSize').value
  });
  const filterKey = document.getElementById('filterKey').value;
  const filterValue = document.getElementById('filterValue').value;
  if (filterKey && filterValue) {
    params.append('filter_key', filterKey);
    params.append('filter_value', filterValue);
  }
  try {
    const response = await fetch(
      `http://localhost:8000/stream/filtered-json?${params}`
    );
    if (!response.ok) {
      // e.g. 422 when page_size fails server-side validation
      throw new Error(`HTTP ${response.status} ${response.statusText}`);
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let buffer = '';
    while (true) {
      const {value, done} = await reader.read();
      if (done) break;
      // stream: true keeps a multi-byte character that straddles two chunks
      // from being decoded as garbage.
      buffer += decoder.decode(value, {stream: true});
      // Forward only the part that ends on a newline; the unfinished tail
      // stays in the buffer until the rest of the line arrives.
      const cut = buffer.lastIndexOf('\n');
      if (cut !== -1) {
        processStreamData(buffer.slice(0, cut));
        buffer = buffer.slice(cut + 1);
      }
    }
    // Flush whatever is left in case the stream did not end with a newline.
    buffer += decoder.decode();
    if (buffer.trim()) {
      processStreamData(buffer);
    }
  } catch (error) {
    handleError(error);
  }
}
/**
 * Parse every non-blank line of `text` as JSON and display it as a page.
 *
 * @param {string} text - One or more newline-separated JSON documents.
 */
function processStreamData(text) {
  text
    .split('\n')
    .filter((candidate) => candidate.trim())
    .forEach((candidate) => {
      displayPagedData(JSON.parse(candidate));
    });
}
/**
 * Update the pagination banner and append one page of results.
 *
 * @param {{page: number, total_pages: number, data: *}} data - One page
 *   object as received from the stream.
 */
function displayPagedData(data) {
  // Show the pagination info. textContent is used throughout so values from
  // the server are never interpreted as HTML.
  document.getElementById('paginationInfo').textContent =
    `Page ${data.page} of ${data.total_pages}`;

  // Show the page data
  const container = document.getElementById('container');
  const div = document.createElement('div');
  div.className = 'json-item';

  const heading = document.createElement('h3');
  heading.textContent = `Page ${data.page}`;

  const pre = document.createElement('pre');
  pre.textContent = JSON.stringify(data.data, null, 2);

  div.appendChild(heading);
  div.appendChild(pre);
  container.appendChild(div);
}
/**
 * Log an error and replace the contents of #container with an error box.
 *
 * @param {Error} error - The failure to report.
 */
function handleError(error) {
  console.error('Error:', error);
  const box = document.createElement('div');
  box.className = 'error';
  // textContent so an error message containing markup is shown literally.
  box.textContent = `Error: ${error.message}`;

  const container = document.getElementById('container');
  container.textContent = '';  // drop whatever was rendered before
  container.appendChild(box);
}
</script>
</body>
</html>
四、进阶特性
4.1 错误处理和重试机制
/**
 * Runs a stream handler with a bounded number of attempts and a linearly
 * growing back-off between them.
 *
 * NOTE(review): `handleStream(url)` is called but not defined in this class;
 * presumably a subclass or the caller supplies it. Confirm before use.
 */
class StreamManager {
  /**
   * @param {number} maxRetries - Maximum number of attempts per startStream
   *   call. At least one attempt is always made.
   */
  constructor(maxRetries = 3) {
    this.maxRetries = maxRetries;
    this.retryCount = 0;
  }

  /**
   * Call handleStream(url) until it succeeds or the attempts run out.
   *
   * @param {string} url - Passed through to handleStream.
   * @throws The last error from handleStream once the attempts are exhausted.
   */
  async startStream(url) {
    // Start from zero on every call. Otherwise a manager that exhausted its
    // attempts once would silently do nothing on all later calls.
    this.retryCount = 0;
    while (true) {
      try {
        await this.handleStream(url);
        return;
      } catch (error) {
        this.retryCount++;
        if (this.retryCount >= this.maxRetries) {
          throw error;
        }
        // Linear back-off: 1s, 2s, 3s, ...
        await this.delay(1000 * this.retryCount);
      }
    }
  }

  /**
   * Resolve after `ms` milliseconds.
   *
   * @param {number} ms - Time to wait.
   * @returns {Promise<void>}
   */
  async delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
4.2 内存管理
/**
 * Trim the oldest children of `container` so at most `maxItems` remain.
 *
 * @param {Element} container - Element whose children are trimmed in place.
 * @param {number} [maxItems=1000] - Number of newest children to keep.
 */
function manageMemory(container, maxItems = 1000) {
  const items = container.children;
  const limit = Math.max(0, maxItems);
  // `children` is a live collection: it shrinks and re-indexes on every
  // removal. Always remove the current first child rather than walking an
  // index, which would skip elements and stop too early.
  while (items.length > limit) {
    container.removeChild(items[0]);
  }
}
4.3 数据验证
/**
 * Check that a parsed page object carries the fields the UI relies on.
 *
 * @param {object} data - Value to validate.
 * @returns {boolean} Always true when validation passes.
 * @throws {TypeError} If `data` is null or not an object.
 * @throws {Error} If a required field is missing.
 */
function validateJsonData(data) {
  // JSON.parse can legitimately return null or a primitive. Reject those
  // with a clear message instead of the opaque error the `in` operator gives.
  if (data === null || typeof data !== 'object') {
    throw new TypeError('Expected a JSON object');
  }
  const requiredFields = ['page', 'total_pages', 'data'];
  for (const field of requiredFields) {
    if (!(field in data)) {
      throw new Error(`Missing required field: ${field}`);
    }
  }
  return true;
}
五、性能优化建议
使用WebWorker处理数据
// worker.js
// Worker message handler: run each incoming payload through processData and
// post the result back to the sender.
self.onmessage = (event) => {
  // Process the data
  const result = processData(event.data);
  self.postMessage(result);
};
实现虚拟滚动
/**
 * Append a sentinel element to `container` and call loadMoreData() whenever
 * the sentinel scrolls into view.
 *
 * @param {Element} container - Element that receives the sentinel.
 * @returns {Function} Cleanup function that stops observing and removes the
 *   sentinel. Call it when the list is torn down so the observer and its
 *   target are not kept alive.
 */
function setupVirtualScroll(container) {
  const observer = new IntersectionObserver(
    (entries) => {
      entries.forEach(entry => {
        if (entry.isIntersecting) {
          loadMoreData();
        }
      });
    },
    { threshold: 0.5 }
  );

  // Observe a sentinel element placed at the bottom of the container
  const sentinel = document.createElement('div');
  container.appendChild(sentinel);
  observer.observe(sentinel);

  return () => {
    observer.disconnect();
    sentinel.remove();
  };
}
六、最佳实践
合理的缓冲区大小
适当的错误处理
内存管理策略
用户反馈机制
数据完整性验证
七、注意事项
需要考虑浏览器兼容性
处理断网重连情况
合理控制数据流速率
注意内存泄漏问题
实现适当的错误提示
八、总结
JSON 数据流式传输是一项强大的技术，通过合理的实现可以很好地满足大量数据的实时传输需求。本文介绍的进阶实现方案结合了分页、过滤、错误处理等特性，可以作为实际开发的参考。
关键点:
使用合适的数据传输格式
实现可靠的错误处理
注意性能优化
保持良好的用户体验
希望这篇教程对你实现JSON数据流式传输有所帮助!
九、参考资源
本文是原创文章,完整转载请注明来自 俞泊
评论
匿名评论
隐私政策
你无需删除空行,直接评论以获取最佳展示效果