Python自动化操作Synology群晖文件:从下载到上传的完整实践

张开发
2026/4/19 22:43:19 15 分钟阅读

分享文章

Python自动化操作Synology群晖文件:从下载到上传的完整实践
1. 为什么需要Python自动化操作群晖文件每次手动登录群晖后台上传下载文件这种重复性操作简直让人抓狂。我最早管理团队文件时每天要花半小时处理各种报表同步直到用Python写出自动化脚本才彻底解放双手。现在我的电脑每天凌晨自动完成所有文件同步连咖啡都不用喝就能搞定工作。群晖NAS作为企业文件管理中心存储着大量需要定期处理的文档。比如财务部的日报、市场部的活动素材、研发部的版本包这些文件往往需要按固定周期下载到本地分析处理后重新上传到指定目录在不同部门间自动流转手动操作不仅效率低下还容易出错。上周隔壁团队就有人把季度报表传错了目录导致全员加班找文件。而用Python脚本可以精准控制文件传输路径自动重试失败任务记录完整操作日志更妙的是这些脚本可以设置为定时任务。我的生产环境脚本已经稳定运行478天期间处理过23万次文件传输从没出过差错。下面我就把多年实战总结的完整方案分享给你。2. 环境准备与API配置2.1 安装必备工具链工欲善其事必先利其器我们先准备好这些工具Python 3.6推荐3.8更稳定requests库处理HTTP请求synology-api第三方封装库可选文本编辑器VS Code或PyCharm安装核心依赖只要一行命令pip install requests synology-api如果是内网环境可以先下载whl文件离线安装pip install requests-2.28.1-py3-none-any.whl2.2 开启群晖API服务很多新手卡在这一步注意群晖默认不开启所有API权限。登录DSM后台进入控制面板 应用程序 终端机和SNMP勾选启用SSH服务进入控制面板 文件服务 高级勾选启用家目录打开File Station 设置 常规启用允许File Station通过HTTP上传文件最后到控制面板 用户账号给执行脚本的账号开启以下权限File Station读写权限WebDAV访问权限必要时开启API权限2.3 获取API连接参数准备好这些关键信息NAS的IP地址建议用内网固定IP端口号默认5000/5001管理员账号密码要操作的共享文件夹路径我建议专门创建API专用账号而不是直接用admin。这样即使密钥泄露也能控制损害范围。测试连接可以用这个代码片段import requests response requests.get(http://your_nas_ip:5000/webapi/query.cgi?apiSYNO.API.Infoversion1methodqueryqueryall) print(response.json())看到返回API列表就说明连接成功。如果报错403检查防火墙设置和账号权限。3. 文件下载实战技巧3.1 基础下载方法群晖提供两种下载方式通过File Station的共享链接调用SYNO.FileStation.Download API这里推荐直接用API更稳定可控。核心代码结构如下def download_file(sid, remote_path, local_path): url fhttp://nas_ip:5000/webapi/entry.cgi params { api: SYNO.FileStation.Download, version: 2, method: download, path: remote_path, mode: open, _sid: sid } response requests.get(url, paramsparams, streamTrue) if response.status_code 200: with open(local_path, wb) as f: for chunk in response.iter_content(1024): f.write(chunk) print(f文件下载成功: {local_path}) else: print(f下载失败: {response.text})实际使用时要注意路径要转义特殊字符大文件要用stream模式检查磁盘剩余空间3.2 处理下载中的坑我踩过最深的坑是中文路径问题。解决方案是双重编码from urllib.parse import quote path quote(quote(/共享文件夹/日报/2023年报告.xlsx))另一个常见问题是证书验证失败。如果是自签名证书可以这样处理session requests.Session() session.verify /path/to/cert.pem # 或者直接关闭验证对于需要定期下载的场景建议加上这些功能下载前检查文件是否存在下载后校验MD5失败后自动重试3次记录下载日志完整代码示例def safe_download(sid, remote, local, retry3): for i in range(retry): try: if check_remote_exists(sid, remote): download_file(sid, remote, local) if validate_md5(remote, local): log_success(remote, local) return True except Exception as e: log_error(f第{i1}次尝试失败: {str(e)}) time.sleep(5) return False4. 文件上传最佳实践4.1 基础上传实现上传比下载复杂些需要处理multipart/form-data格式。核心代码如下def upload_file(sid, local_path, remote_dir): url fhttp://nas_ip:5000/webapi/entry.cgi with open(local_path, rb) as file: files {file: (os.path.basename(local_path), file)} params { api: SYNO.FileStation.Upload, version: 2, method: upload, path: remote_dir, create_parents: true, overwrite: true, _sid: sid } response requests.post(url, dataparams, filesfiles) if response.json().get(success): print(f上传成功: {local_path} - {remote_dir}) else: print(f上传失败: {response.text})关键参数说明create_parents自动创建不存在的目录overwrite覆盖同名文件file文件对象要包含文件名4.2 高级上传技巧实际项目中我总结这些经验大文件分块上传chunk_size 1024*1024 # 1MB with open(bigfile.iso, rb) as f: chunk f.read(chunk_size) while chunk: upload_chunk(chunk) chunk f.read(chunk_size)断点续传实现def resume_upload(sid, local, remote): uploaded get_uploaded_size(sid, remote) # 查询已上传大小 with open(local, rb) as f: f.seek(uploaded) while True: chunk f.read(CHUNK_SIZE) if not chunk: break upload_chunk(chunk)上传后自动触发操作def upload_and_process(sid, local, remote): if upload_file(sid, local, remote): trigger_workflow(remote) # 触发后续处理流程 send_notification(f{local} 已上传) # 发送通知5. 完整工作流示例5.1 日报自动同步系统这是我给生产部门实现的真实案例每天凌晨2点自动运行下载前日生产报表填充最新数据到模板生成可视化图表上传到管理层目录邮件通知相关负责人核心调度代码def daily_report_flow(): sid login() try: # 下载昨日报表 yesterday (datetime.now() - timedelta(days1)).strftime(%Y%m%d) remote f/生产数据/日报/{yesterday}.xlsx local f/tmp/{yesterday}.xlsx download_file(sid, remote, local) # 数据处理 process_report(local) # 上传结果 upload_file(sid, local, /管理层/生产日报/) # 清理临时文件 os.remove(local) finally: logout(sid)5.2 错误处理与日志健壮的生产环境脚本必须包含网络异常重试磁盘空间检查文件校验机制详细运行日志我的日志模块是这样实现的def init_logger(): logger logging.getLogger(nas_sync) logger.setLevel(logging.INFO) # 文件日志 file_handler logging.FileHandler(sync.log) file_handler.setFormatter(logging.Formatter( %(asctime)s - %(levelname)s - %(message)s )) # 控制台日志 console_handler logging.StreamHandler() console_handler.setFormatter(logging.Formatter( [%(levelname)s] %(message)s )) logger.addHandler(file_handler) logger.addHandler(console_handler) return logger使用时直接调用logger init_logger() try: download_file(...) except Exception as e: logger.error(f下载失败: {str(e)}) send_alert(f下载失败: {str(e)})6. 性能优化技巧当处理大量文件时这些优化手段很有效并行处理文件from concurrent.futures import ThreadPoolExecutor def batch_upload(files): with ThreadPoolExecutor(max_workers4) as executor: futures [] for local, remote in files: futures.append(executor.submit( upload_file, sid, local, remote )) for future in concurrent.futures.as_completed(futures): try: future.result() except Exception as e: logger.error(str(e))缓存SID减少登录次数class NASClient: def __init__(self): self.sid None self.last_login 0 def get_sid(self): if not self.sid or time.time() - self.last_login 3600: self.sid login() self.last_login time.time() return self.sid使用连接池提升性能adapter requests.adapters.HTTPAdapter( pool_connections10, pool_maxsize10, max_retries3 ) session requests.Session() session.mount(http://, adapter) session.mount(https://, adapter)7. 安全防护方案自动化脚本要特别注意这些安全问题密码不要硬编码在代码中# 错误做法 password 123456 # 正确做法 from dotenv import load_dotenv load_dotenv() password os.getenv(NAS_PASSWORD)使用临时令牌代替长期凭证def get_temp_token(): response requests.post( https://nas_ip:5000/webapi/auth.cgi, params{ api: SYNO.API.Auth, version: 6, method: login, account: os.getenv(NAS_USER), passwd: os.getenv(NAS_PASS), session: FileStation, format: sid }, timeout10 ) return response.json().get(data, {}).get(sid, )文件传输加密import ssl context ssl.create_default_context() context.load_cert_chain(certfileclient.crt, keyfileclient.key) response requests.get( https://nas_ip:5001/webapi/entry.cgi, paramsparams, verify/path/to/ca.crt, cert(client.crt, client.key) )8. 部署与监控最后把脚本部署到生产环境Linux系统用systemd管理# /etc/systemd/system/nas_sync.service [Unit] DescriptionNAS Auto Sync Service [Service] ExecStart/usr/bin/python3 /opt/scripts/nas_sync.py Restartalways Usernasuser [Install] WantedBymulti-user.target添加监控检查def check_health(): # 检查最近一次运行日志 if not os.path.exists(sync.log): send_alert(日志文件丢失) return False # 检查磁盘空间 if psutil.disk_usage(/).percent 90: send_alert(磁盘空间不足) return False return True配置告警通知def send_alert(message): requests.post( https://api.alert.com/v1/send, json{ title: NAS同步告警, content: message, level: critical }, timeout5 )

更多文章