文件操作
AIO Sandbox 通过 /v1/file/* 提供文件 API,可在沙盒内读取、写入、搜索、上传、下载和监听文件。
下面示例默认沙盒入口为 http://localhost:8080,路径均指容器内路径。
端点速览
| 端点 | 用途 |
|---|
POST /v1/file/read | 读取文本文件,可指定行范围 |
POST /v1/file/write | 写入文本或 base64 编码的二进制内容 |
POST /v1/file/replace | 替换文件中的文本 |
POST /v1/file/search | 在单个文件中用正则搜索 |
POST /v1/file/find | 使用简单 glob 查找文件 |
POST /v1/file/grep | 在文件树中搜索,支持 include/exclude |
POST /v1/file/glob | 使用高级 glob 列出文件 |
POST /v1/file/list | 列出目录 |
POST /v1/file/upload | 通过 multipart/form-data 上传文件 |
GET /v1/file/download | 下载文件流 |
POST /v1/file/watch | 创建文件 watcher |
POST /v1/file/watch/{watcher_id}/poll | 长轮询 watcher 事件 |
POST /v1/file/watch/wait | 等待单个文件事件 |
GET /v1/file/watch/{watcher_id}/events | 通过 SSE 消费 watcher 事件 |
DELETE /v1/file/watch/{watcher_id} | 停止 watcher |
错误处理
大多数文件 API 的预期内文件系统失败,会返回 HTTP 200、success=false,并在 data 中提供结构化错误。接入时建议按这个顺序判断:
- 先看 HTTP 状态码。
- 再看
success。
- 如果
success=false,优先读 data.error_type 和 data.errno_name。
文件不存在示例:
{
"success": false,
"message": "Failed to read file: [Errno 2] No such file or directory: '/tmp/missing.txt'",
"data": {
"path": "/tmp/missing.txt",
"operation": "read",
"message": "Failed to read file: [Errno 2] No such file or directory: '/tmp/missing.txt'",
"error_type": "not_found",
"retryable": false,
"errno": 2,
"errno_name": "ENOENT",
"exception_type": "FileNotFoundError"
}
}
常见 error_type 包括 not_found、permission_denied、invalid_target、already_exists、invalid_path、read_only_filesystem、no_space_left、decode_error、io_error。
GET /v1/file/download 是主要例外:成功时返回二进制流,失败时按 HTTP 错误处理。File watch 端点也更接近资源型 JSON 和 HTTP 状态码,不统一使用 success 包装。
跨 API 规则见 错误处理。
读取文件
读取文件内容,可指定 0-based 行范围。end_line 不包含在返回结果内。
curl -X POST http://localhost:8080/v1/file/read \
-H "Content-Type: application/json" \
-d '{
"file": "/home/gem/.bashrc",
"start_line": 0,
"end_line": 10,
"sudo": false
}'
成功响应:
{
"success": true,
"message": "File read successfully",
"data": {
"content": "export PATH=...",
"line_count": 10,
"file": "/home/gem/.bashrc"
}
}
写入文件
文本
curl -X POST http://localhost:8080/v1/file/write \
-H "Content-Type: application/json" \
-d '{
"file": "/tmp/output.txt",
"content": "Hello, World!",
"encoding": "utf-8",
"append": false,
"leading_newline": false,
"trailing_newline": true,
"sudo": false
}'
二进制
图片、PDF、压缩包等二进制文件使用 encoding: "base64"。客户端先把字节编码成 base64,再传给 content。
curl -X POST http://localhost:8080/v1/file/write \
-H "Content-Type: application/json" \
-d '{
"file": "/tmp/pixel.png",
"content": "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk+M9QDwADhgGAWjR9awAAAABJRU5ErkJggg==",
"encoding": "base64"
}'
支持的编码:
| Encoding | 适合用途 | content 形式 |
|---|
utf-8 | 文本文件 | 原始文本 |
base64 | 二进制文件 | Base64 字符串 |
raw | 高级字节处理 | Latin-1 风格字符串 |
替换与搜索
替换文本:
curl -X POST http://localhost:8080/v1/file/replace \
-H "Content-Type: application/json" \
-d '{
"file": "/tmp/output.txt",
"old_str": "World",
"new_str": "Sandbox",
"sudo": false
}'
在单个文件中用正则搜索:
curl -X POST http://localhost:8080/v1/file/search \
-H "Content-Type: application/json" \
-d '{
"file": "/tmp/output.txt",
"regex": "Hello,\\s+\\w+"
}'
在目录树中搜索:
curl -X POST http://localhost:8080/v1/file/grep \
-H "Content-Type: application/json" \
-d '{
"path": "/home/gem/workspace",
"pattern": "TODO",
"include": ["*.py", "*.ts", "*.tsx"],
"exclude": ["node_modules", ".git"],
"case_insensitive": true,
"max_results": 100
}'
查找、Glob 与目录列表
简单文件名匹配用 find:
curl -X POST http://localhost:8080/v1/file/find \
-H "Content-Type: application/json" \
-d '{
"path": "/home/gem/workspace/src",
"glob": "*.py"
}'
需要递归匹配、元数据、排序和隐藏文件控制时使用 glob:
curl -X POST http://localhost:8080/v1/file/glob \
-H "Content-Type: application/json" \
-d '{
"path": "/home/gem/workspace",
"pattern": "**/*.json",
"exclude": ["node_modules/**"],
"include_hidden": false,
"files_only": true,
"include_metadata": true,
"max_results": 200,
"sort_by": "path"
}'
列出目录:
curl -X POST http://localhost:8080/v1/file/list \
-H "Content-Type: application/json" \
-d '{
"path": "/home/gem/workspace",
"recursive": false,
"show_hidden": true,
"include_size": true
}'
上传与下载
使用 multipart 表单上传:
curl -X POST http://localhost:8080/v1/file/upload \
-F "file=@./report.pdf" \
-F "path=/tmp/report.pdf"
以文件流下载:
curl -o report.pdf \
"http://localhost:8080/v1/file/download?path=/tmp/report.pdf"
如果下载的文件可能还在被后台任务写入,可以使用 change_policy=abort。服务端在传输开始前或传输中检测到源文件变化时,会返回 HTTP 409 或中断本次传输。
curl -o report.pdf \
"http://localhost:8080/v1/file/download?path=/tmp/report.pdf&change_policy=abort"
文件监听
File watch 适合等待生成文件、监听浏览器下载、或在后台命令结束后刷新文件树。
等待单个文件
只关心一个文件被创建、写入、删除、重命名或 chmod 时,使用 wait。
curl -X POST http://localhost:8080/v1/file/watch/wait \
-H "Content-Type: application/json" \
-d '{
"path": "/tmp/demo/result.json",
"timeout": 30,
"event_types": ["create", "write"]
}'
响应示例:
{
"event": {
"seq": 1,
"type": "write",
"path": "/tmp/demo/result.json",
"relative_path": "result.json",
"old_path": null,
"is_dir": false,
"timestamp": 1776823501.334,
"mtime": 1776823501.321,
"size": 2048,
"inode": 91827555
}
}
如果文件在调用时已经存在,且 event_types 包含 create,wait 可能立即返回 create 事件。只关心后续变化时,不要把 create 放进 event_types。
长轮询目录
CLI、CI 和 Agent 持续消费变化时,推荐 create + poll + delete,清理最明确。
WATCHER_ID=$(curl -s -X POST http://localhost:8080/v1/file/watch \
-H "Content-Type: application/json" \
-d '{
"path": "/home/gem/workspace",
"recursive": true,
"debounce": 200,
"exclude": [".git", "node_modules", "dist", ".next"],
"include_patterns": ["*.py", "*.ts", "*.tsx", "*.json"]
}' | jq -r ".data.watcher_id")
curl -X POST "http://localhost:8080/v1/file/watch/${WATCHER_ID}/poll" \
-H "Content-Type: application/json" \
-d '{
"cursor": 0,
"limit": 100,
"timeout": 10
}'
curl -X DELETE "http://localhost:8080/v1/file/watch/${WATCHER_ID}"
cursor 表示“已经消费到的最后一条 seq”。下一次请求直接传回响应里的 cursor,不要自行加一。如果返回 overflow=true,说明增量历史不完整,应做一次全量刷新后继续。
UI 实时刷新使用 SSE
浏览器前端可以使用 Server-Sent Events:
GET /v1/file/watch/{watcher_id}/events
Accept: text/event-stream
事件流会发出 watch_started、file_change 和 overflow。每个 file_change 都是上面的文件事件结构,并带有 {watcher_id}:{seq} 形式的 SSE id。
集成示例
Python 集成
import requests
class SandboxFileAPI:
def __init__(self, base_url="http://localhost:8080"):
self.base_url = base_url.rstrip("/")
def read_file(self, file_path, start_line=None, end_line=None):
payload = {"file": file_path}
if start_line is not None:
payload["start_line"] = start_line
if end_line is not None:
payload["end_line"] = end_line
response = requests.post(
f"{self.base_url}/v1/file/read",
json=payload,
timeout=30,
)
return response.json()
def write_file(self, file_path, content, append=False):
response = requests.post(
f"{self.base_url}/v1/file/write",
json={
"file": file_path,
"content": content,
"append": append,
},
timeout=30,
)
return response.json()
def search_files(self, pattern, directory="/home/gem/workspace"):
response = requests.post(
f"{self.base_url}/v1/file/find",
json={
"path": directory,
"glob": pattern,
},
timeout=30,
)
return response.json()
api = SandboxFileAPI()
api.write_file("/home/gem/workspace/config.json", '{"debug": false}\n')
config = api.read_file("/home/gem/workspace/config.json")
print(config["data"]["content"])
api.write_file("/home/gem/workspace/app.log", "Process started\n", append=True)
files = api.search_files("*.py", "/home/gem/workspace/src")
for file_path in files["data"]["files"]:
print(f"Found: {file_path}")
JavaScript / Node.js 集成
class SandboxFileAPI {
constructor(baseUrl = "http://localhost:8080") {
this.baseUrl = baseUrl.replace(/\/$/, "");
}
async readFile(filePath, options = {}) {
const response = await fetch(`${this.baseUrl}/v1/file/read`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ file: filePath, ...options }),
});
return response.json();
}
async writeFile(filePath, content, options = {}) {
const response = await fetch(`${this.baseUrl}/v1/file/write`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ file: filePath, content, ...options }),
});
return response.json();
}
async replaceInFile(filePath, oldStr, newStr) {
const response = await fetch(`${this.baseUrl}/v1/file/replace`, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
file: filePath,
old_str: oldStr,
new_str: newStr,
}),
});
return response.json();
}
}
const api = new SandboxFileAPI();
const result = await api.readFile("/home/gem/workspace/data.txt");
if (result.success) {
console.log("File content:", result.data.content);
await api.replaceInFile(
"/home/gem/workspace/config.json",
'"debug": false',
'"debug": true',
);
}
文件系统集成
共享访问
文件在 Shell、浏览器下载、代码执行、code-server 和文件 API 之间共享:
# 通过 API 创建文件
curl -X POST http://localhost:8080/v1/file/write \
-H "Content-Type: application/json" \
-d '{"file": "/home/gem/workspace/shared.txt", "content": "Shared content"}'
# 在终端中读取
# ws://localhost:8080/v1/shell/ws
# > cat /home/gem/workspace/shared.txt
# Shared content
# 在 Code Server 中编辑
# http://localhost:8080/code-server/
# 打开 /home/gem/workspace/shared.txt
工作流示例
完整的文件处理流程通常是:
- 通过浏览器或上传接口把文件放到
/home/gem/workspace。
- 使用文件 API 读取和转换。
- 执行 Shell 或 Code Execution 命令处理数据。
- 在 code-server 中查看结果,或通过下载接口取回产物。
import json
content = client.file.read_file(
file="/home/gem/Downloads/data.csv",
).data.content
processed = process_csv(content)
client.file.write_file(
file="/home/gem/workspace/results.json",
content=json.dumps(processed),
)
python /home/gem/workspace/analyze.py /home/gem/workspace/results.json
curl -o report.pdf \
"http://localhost:8080/v1/file/download?path=/home/gem/workspace/report.pdf"
高级功能
批量操作
高效处理多个文件:
def batch_process_files(api, directory, pattern):
files_result = api.search_files(pattern, directory)
for file_path in files_result["data"]["files"]:
content_result = api.read_file(file_path)
if content_result["success"]:
content = content_result["data"]["content"]
output_path = file_path.replace(".txt", "_processed.txt")
api.write_file(output_path, content.upper())
batch_process_files(api, "/home/gem/workspace/data", "*.txt")
错误处理
文件操作建议同时处理 HTTP 错误和业务响应里的 success=false:
import json
import requests
def safe_file_operation(api, operation, **kwargs):
try:
result = operation(**kwargs)
if result["success"]:
return result["data"]
error = result.get("data") or {}
if error.get("error_type") == "not_found":
print("File does not exist")
return None
print(f"Operation failed: {result['message']}")
return None
except requests.exceptions.RequestException as exc:
print(f"Network error: {exc}")
return None
except json.JSONDecodeError as exc:
print(f"JSON decode error: {exc}")
return None
content = safe_file_operation(
api,
api.read_file,
file_path="/home/gem/workspace/data.txt",
)
权限管理
默认使用沙盒内普通用户权限。只有确实需要访问受保护路径时再启用 sudo:
result = api.read_file("/home/gem/workspace/file.txt")
result = api.read_file("/etc/nginx/nginx.conf", sudo=True)
api.write_file(
"/etc/cron.d/example",
"0 2 * * * root /home/gem/workspace/backup.sh\n",
sudo=True,
)
安全注意事项
文件访问控制
- 默认使用应用用户权限。
- 可按需启用
sudo,但应避免把用户输入直接用于受保护路径。
- 对用户提供的路径做归一化和目录范围校验。
- 对上传大小和下载大小设置业务侧限制。
安全实现示例
import os
import re
def secure_file_operation(file_path, base_directory="/home/gem/workspace"):
normalized = os.path.normpath(file_path)
if not normalized.startswith(base_directory):
raise ValueError("Path is outside the allowed directory")
if not re.match(r"^[a-zA-Z0-9._/-]+$", normalized):
raise ValueError("Invalid characters in filename")
return normalized
try:
safe_path = secure_file_operation("/home/gem/workspace/data.txt")
except ValueError as exc:
print(f"Security violation: {exc}")
性能优化
大文件处理
对大文本文件,按行范围分块读取,避免一次性把所有内容放进响应:
def read_large_file(api, file_path, chunk_size=1000):
total_lines = 0
content_parts = []
while True:
result = api.read_file(
file_path,
start_line=total_lines,
end_line=total_lines + chunk_size,
)
if not result["success"] or not result["data"]["content"]:
break
content = result["data"]["content"]
content_parts.append(content)
total_lines += chunk_size
if len(content.splitlines()) < chunk_size:
break
return "\n".join(content_parts)
并发操作
import aiohttp
import asyncio
async def read_file_async(session, file_path):
payload = {"file": file_path}
async with session.post(
"http://localhost:8080/v1/file/read",
json=payload,
) as response:
return await response.json()
async def parallel_file_operations(files):
async with aiohttp.ClientSession() as session:
tasks = [read_file_async(session, file_path) for file_path in files]
return await asyncio.gather(*tasks)
准备集成文件操作?查看 API 参考 获取完整 schema。