165 lines
5.9 KiB
Python
165 lines
5.9 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
import sys
|
||
import json
|
||
import os
|
||
from pathlib import Path
|
||
from urllib.request import urlopen, Request
|
||
from urllib.parse import urlencode, quote
|
||
|
||
# WeChat API 配置
|
||
APPID = "wx834051fbc89ae2b1"
|
||
APPSECRET = os.environ.get("WECHAT_APPSECRET", "")
|
||
|
||
class WeChatPublisher:
|
||
def __init__(self, appid, appsecret):
|
||
self.appid = appid
|
||
self.appsecret = appsecret
|
||
self.token = None
|
||
|
||
def get_token(self):
|
||
url = f"https://api.weixin.qq.com/cgi-bin/token?grant_type=client_credential&appid={self.appid}&secret={self.appsecret}"
|
||
try:
|
||
resp = urlopen(url, timeout=10)
|
||
result = json.loads(resp.read().decode('utf-8'))
|
||
if "access_token" in result:
|
||
self.token = result["access_token"]
|
||
return self.token
|
||
else:
|
||
print(f"获取token失败: {result}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"Token请求失败: {e}")
|
||
return None
|
||
|
||
def upload_image(self, image_path):
|
||
if not self.token:
|
||
self.get_token()
|
||
if not self.token:
|
||
return None
|
||
|
||
url = f"https://api.weixin.qq.com/cgi-bin/media/upload?access_token={self.token}&type=image"
|
||
try:
|
||
with open(image_path, 'rb') as f:
|
||
image_data = f.read()
|
||
|
||
boundary = '----FormBoundary7MA4YWxkTrZu0gW'
|
||
body = f"--{boundary}\r\nContent-Disposition: form-data; name=\"media\"; filename=\"cover.png\"\r\nContent-Type: image/png\r\n\r\n".encode('utf-8') + image_data + f"\r\n--{boundary}--".encode('utf-8')
|
||
|
||
req = Request(url, data=body, headers={'Content-Type': f'multipart/form-data; boundary={boundary}'})
|
||
resp = urlopen(req, timeout=30)
|
||
result = json.loads(resp.read().decode('utf-8'))
|
||
print(f"封面上传结果: {result}")
|
||
return result.get("media_id")
|
||
except Exception as e:
|
||
print(f"封面上传失败: {e}")
|
||
return None
|
||
|
||
def markdown_to_html(self, content):
|
||
"""简化版 Markdown 转 HTML"""
|
||
import re
|
||
html = []
|
||
lines = content.split('\n')
|
||
in_code_block = False
|
||
|
||
for line in lines:
|
||
line = line.strip()
|
||
if not line:
|
||
continue
|
||
|
||
# 代码块
|
||
if line.startswith('```'):
|
||
if in_code_block:
|
||
html.append('</code></pre>')
|
||
in_code_block = False
|
||
else:
|
||
html.append('<pre><code>')
|
||
in_code_block = True
|
||
continue
|
||
|
||
if in_code_block:
|
||
html.append(line)
|
||
continue
|
||
|
||
# 标题
|
||
if line.startswith('# '):
|
||
html.append(f'<h2>{line[2:]}</h2>')
|
||
elif line.startswith('## '):
|
||
html.append(f'<h3>{line[3:]}</h3>')
|
||
elif line.startswith('### '):
|
||
html.append(f'<h4>{line[4:]}</h4>')
|
||
# 引用
|
||
elif line.startswith('>'):
|
||
html.append(f'<blockquote>{line[1:].strip()}</blockquote>')
|
||
# 列表
|
||
elif line.startswith('- ') or line.startswith('* '):
|
||
html.append(f'<li>{line[2:].strip()}</li>')
|
||
# 粗体/斜体
|
||
elif '**' in line:
|
||
line = re.sub(r'\*\*(.+?)\*\*', r'<strong>\1</strong>', line)
|
||
html.append(f'<p>{line}</p>')
|
||
else:
|
||
html.append(f'<p>{line}</p>')
|
||
|
||
return ''.join(html)
|
||
|
||
def create_draft(self, title, content, thumb_media_id, digest=""):
|
||
if not self.token:
|
||
self.get_token()
|
||
if not self.token:
|
||
return None
|
||
|
||
html_content = self.markdown_to_html(content)
|
||
draft_data = {
|
||
"articles": [{
|
||
"title": title,
|
||
"content": html_content,
|
||
"author": "上海橙轩智能",
|
||
"digest": digest,
|
||
"thumb_media_id": thumb_media_id,
|
||
"show_cover_pic": 1
|
||
}]
|
||
}
|
||
|
||
url = f"https://api.weixin.qq.com/cgi-bin/draft/add?access_token={self.token}"
|
||
json_data = json.dumps(draft_data, ensure_ascii=False).encode('utf-8')
|
||
|
||
try:
|
||
req = Request(url, data=json_data, headers={'Content-Type': 'application/json'})
|
||
resp = urlopen(req, timeout=30)
|
||
result = json.loads(resp.read().decode('utf-8'))
|
||
print(f"草稿创建结果: {result}")
|
||
return result
|
||
except Exception as e:
|
||
print(f"草稿创建失败: {e}")
|
||
return None
|
||
|
||
def publish(self, article_path, cover_path, title, digest=""):
|
||
print(f"开始发布文章:{title}")
|
||
|
||
# 上传封面
|
||
media_id = self.upload_image(cover_path)
|
||
if not media_id:
|
||
print("封面上传失败,终止发布")
|
||
return None
|
||
|
||
# 读取文章
|
||
content = Path(article_path).read_text(encoding='utf-8')
|
||
|
||
# 创建草稿
|
||
result = self.create_draft(title, content, media_id, digest)
|
||
if result and result.get("media_id"):
|
||
print(f"✅ 发布成功!草稿 media_id: {result['media_id']}")
|
||
return result["media_id"]
|
||
else:
|
||
print(f"❌ 发布失败: {result}")
|
||
return None
|
||
|
||
if __name__ == "__main__":
|
||
publisher = WeChatPublisher(APPID, APPSECRET)
|
||
result = publisher.publish(
|
||
article_path="/home/node/.openclaw/workspace/drafts/2026-04-21_公众号_协议打通-OEE提升42pct.md",
|
||
cover_path="/home/node/.openclaw/workspace/assets/xhs-cover-1.png",
|
||
title="给120台设备装一个大脑:一家工厂如何在2周内实现全协议互通"
|
||
)
|
||
print(f"最终结果: {result}") |