roi-measure
SKILL.md
ROI 测算
区服对应关系
区服列表从华语服活动服务动态获取:
POST https://activity-dev-sgp.weplayapp.com/activity_v2/common/get_region_list
参数:无
响应示例:
{"code":200,"msg":"ok","result":{"list":[
{"region":"C","name":"华语服","act_domain":"https://activity-sgp.weplayapp.com","api_domain":"https://api-sgp.weplayapp.com"},
{"region":"U","name":"美服","act_domain":"https://activity-sv.weplayapp.com","api_domain":"https://api-sv.weplayapp.com"}
]}}
完整执行流程
Step 1 — 调用 get_region_list 接口获取区服列表,根据用户输入的区服名称/缩写匹配对应的 act_domain 和 api_domain。
Step 2 — 确认用户提供的输入信息:
- 区服(从接口返回的列表中匹配)
- 活动ID(整数)
- 测算需求描述(自然语言)
Step 3 — 【必须执行】查询接口文档
对需求描述中涉及的每个操作(如抽奖、送礼、签到等),必须逐一调用 apifox-query skill 查询对应的 API 路径和完整参数列表。
⚠️ 禁止跳过此步骤,即使你认为已知接口路径。参数名称必须以 apifox-query 返回值为准,不得猜测或沿用历史经验。 ⚠️ 每个操作必须单独查询,不得合并查询。 ⚠️ 查询报错时中止整个流程,提示用户进行处理。
Step 4 — 生成完整 Python3 脚本(结构见下方代码规范)。先确定 task_id(格式 roi_{timestamp}_{random8}),然后:
- 用 Write 工具将完整 Python 代码写入
/tmp/roi_measure/<task_id>/main.py - 按以下固定格式输出(不内联展示脚本全文):
## 输入信息
区服: xxx | 活动ID: xxx | 需求: xxx
## 执行计划
### 步骤 1:[操作名称]
- 接口:`POST [api_path]`
- 关键参数:`uid=ctx.uid(1)`、`act_id=ctx.act_id`、`[param_name]=[value]`、...
- 说明:[该步骤的测算目的]
### 步骤 2:[操作名称]
- 接口:`POST [api_path]`
- 关键参数:...
- 说明:...
(按实际步骤数量续写,每步一个 ### 三级标题)
代码已写入: /tmp/roi_measure/<task_id>/main.py
Step 5 — 等待用户确认:
- 用户确认无误 → 进入 Step 6 执行
- 用户提出调整 → 更新执行计划和脚本文件,重新输出 Step 4 的格式,再次等待确认
Step 6 — 执行:
python3 /tmp/roi_measure/<task_id>/main.py \
--act_url "<act_domain>" \
--api_url "<api_domain>" \
--act_id <活动ID> \
[--task_id <task_id>]
Step 7 — 将 stdout 原样展示,并按以下固定格式提取输出中的结果段落(字段缺失时输出 (未获取到)):
## 测算结果
总消耗: xxx
总产出: xxx
ROI: xxx%
消耗/产出明细: https://activity-xxx.weplayapp.com/...
Python 代码规范
生成的代码是完整独立的 Python3 脚本,直接用 python3 main.py 运行,无需编译。
固定框架(原样复制,不得修改)
import argparse
import json
import random
import signal
import string
import sys
import threading
import time
import requests
# ---------- 框架类(固定,不得修改)----------
class Response:
def __init__(self, code: int, msg: str, result=None):
self.code = code
self.msg = msg
self.result = result
class Context:
def __init__(self, api_base_url: str, uids: list, act_id: int, task_id: str):
self.api_base_url = api_base_url
self.uids = uids
self.act_id = act_id
self.task_id = task_id
self._session = requests.Session() # User-Agent 自动为 python-requests/x.x.x
self._lock = threading.Lock()
self._stopped = False
self.ok_count = 0
self.fail_count = 0
self.fail_errors = []
def call(self, path: str, params: dict) -> Response:
with self._lock:
if self._stopped:
return Response(-1, "task stopped")
url = self.api_base_url + path
try:
resp = self._session.post(url, data={k: str(v) for k, v in params.items()}, timeout=30)
r = resp.json()
result = Response(r.get("code", -1), r.get("msg", ""), r.get("result"))
with self._lock:
if result.code == 200:
self.ok_count += 1
else:
self.fail_count += 1
self.fail_errors.append(json.dumps({"path": path, "code": result.code, "msg": result.msg}, ensure_ascii=False))
return result
except Exception as e:
with self._lock:
self.fail_count += 1
self.fail_errors.append(json.dumps({"path": path, "err": str(e)}, ensure_ascii=False))
return Response(-1, str(e))
def uid(self, slot: int) -> int:
"""返回第 slot 个 UID(1-indexed)"""
if 1 <= slot <= len(self.uids):
return self.uids[slot - 1]
return 0
def concurrent(self, workers: int, fn):
"""并发执行 fn(worker_id),最多 workers 个线程"""
sem = threading.Semaphore(workers)
threads = []
def _run(wid):
sem.acquire()
try:
fn(wid)
finally:
sem.release()
for i in range(workers):
t = threading.Thread(target=_run, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
def log(self, msg: str, *args):
print(msg % args if args else msg, flush=True)
def stop(self) -> bool:
with self._lock:
return self._stopped
def sleep(self, ms: int):
time.sleep(ms / 1000)
def _call_form(session: requests.Session, url: str, params: dict) -> dict:
resp = session.post(url, data={k: str(v) for k, v in params.items()}, timeout=30)
r = resp.json()
if r.get("code") != 200:
raise RuntimeError(f"api error code={r.get('code')} msg={r.get('msg')}")
return r.get("result", {})
def _rand_hex(n: int) -> str:
return "".join(random.choices(string.hexdigits[:16], k=n))
execute 函数(AI 生成部分)
# ---------- 测算逻辑(AI 填写)----------
def execute(ctx: Context):
# AI 根据测算需求填写,只能使用 ctx 提供的方法
pass
execute 约束:
- 只能调用
ctx.call / ctx.uid / ctx.concurrent / ctx.log / ctx.stop / ctx.sleep / ctx.act_id - 禁止重新定义 Context、Response 类
- 禁止导入额外第三方包(只能用标准库 + requests)
- 禁止调用
sys.exit、os.system、subprocess等系统操作 - 循环内必须检查
ctx.stop() - 并发访问共享变量必须加锁(
threading.Lock) - 只实现用户描述的步骤,不自行添加重试、汇总、前置校验等逻辑
main 入口(固定,原样复制,REQUIRED_UIDS 替换为实际数字)
# ---------- 入口(固定,不得修改框架,只填 REQUIRED_UIDS)----------
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--act_url", required=True, help="区服 activity 域名")
parser.add_argument("--api_url", required=True, help="区服 api 域名")
parser.add_argument("--act_id", required=True, type=int, help="活动ID")
parser.add_argument("--task_id", default="", help="任务ID(可选)")
args = parser.parse_args()
task_id = args.task_id or f"roi_{int(time.time())}_{_rand_hex(8)}"
REQUIRED_UIDS = 1 # AI 填写:执行该计划需要的 UID 数量
session = requests.Session() # User-Agent 自动为 python-requests/x.x.x
# 1. 开始测算
print(f"[ROI] 开始测算 task_id={task_id} required_uids={REQUIRED_UIDS}", flush=True)
start_result = _call_form(session, args.act_url + "/activity_v2/roi_measure_v2/start", {
"act_id": args.act_id,
"task_id": task_id,
"required_uid_count": REQUIRED_UIDS,
})
uids = [int(u) for u in start_result.get("uid_list", [])]
print(f"[ROI] 获取到 uid 列表: {uids}", flush=True)
# 2. 构建上下文,注册取消信号
ctx = Context(api_base_url=args.api_url, uids=uids, act_id=args.act_id, task_id=task_id)
def _on_signal(signum, frame):
with ctx._lock:
ctx._stopped = True
print("[ROI] 收到终止信号,停止执行", flush=True)
signal.signal(signal.SIGTERM, _on_signal)
signal.signal(signal.SIGINT, _on_signal)
# 3. 执行测算逻辑
execute(ctx)
print(f"[ROI] 执行完成 成功={ctx.ok_count} 失败={ctx.fail_count}", flush=True)
if ctx.fail_errors:
print(f"[ROI] 失败详情: {'; '.join(ctx.fail_errors)}", flush=True)
# 4. 等待 5s
print("[ROI] 等待 5s 后获取测算结果...", flush=True)
time.sleep(5)
# 5. 结束测算
stop_result = _call_form(session, args.act_url + "/activity_v2/roi_measure_v2/stop", {
"task_id": task_id,
})
# 6. 打印汇总结果
print("\n========== ROI 测算结果 ==========")
print(f"总消耗: {stop_result.get('total_cost', '(未获取到)')}")
print(f"总产出: {stop_result.get('total_reward_value', '(未获取到)')}")
print(f"ROI: {stop_result.get('roi_percentage', '(未获取到)')}%")
print(f"消耗/产出明细: {stop_result.get('record_url', '(未获取到)')}")
print("==================================")
注意事项
REQUIRED_UIDS由 AI 根据计划中涉及的 UID 数量填写,必须是具体数字- 不得修改
if __name__ == "__main__"块的框架结构,只填写REQUIRED_UIDS的值 execute函数中通过ctx.uid(1)、ctx.uid(2)... 引用各 UID,最大 slot = REQUIRED_UIDS- 执行出错时原样展示异常,不继续执行
- 执行期间的
[ROI]前缀日志为进度信息,正常展示 - 若 start 接口返回非 200,
_call_form抛出异常自动中止,提示用户检查区服配置 - 所有临时文件统一在
/tmp/roi_measure/<task_id>/,执行完毕且用户确认无误则自动清理;用户中途放弃则由 OS 定期清理/tmp