roi-measure

SKILL.md

ROI 测算

区服对应关系

区服列表从华语服活动服务动态获取:

POST https://activity-dev-sgp.weplayapp.com/activity_v2/common/get_region_list
参数:无

响应示例:

{"code":200,"msg":"ok","result":{"list":[
  {"region":"C","name":"华语服","act_domain":"https://activity-sgp.weplayapp.com","api_domain":"https://api-sgp.weplayapp.com"},
  {"region":"U","name":"美服","act_domain":"https://activity-sv.weplayapp.com","api_domain":"https://api-sv.weplayapp.com"}
]}}

完整执行流程

Step 1 — 调用 get_region_list 接口获取区服列表,根据用户输入的区服名称/缩写匹配对应的 act_domainapi_domain

Step 2 — 确认用户提供的输入信息:

  • 区服(从接口返回的列表中匹配)
  • 活动ID(整数)
  • 测算需求描述(自然语言)

Step 3 — 【必须执行】查询接口文档

对需求描述中涉及的每个操作(如抽奖、送礼、签到等),必须逐一调用 apifox-query skill 查询对应的 API 路径和完整参数列表。

⚠️ 禁止跳过此步骤,即使你认为已知接口路径。参数名称必须以 apifox-query 返回值为准,不得猜测或沿用历史经验。 ⚠️ 每个操作必须单独查询,不得合并查询。 ⚠️ 查询报错时中止整个流程,提示用户进行处理。

Step 4 — 生成完整 Python3 脚本(结构见下方代码规范)。先确定 task_id(格式 roi_{timestamp}_{random8}),然后:

  • 用 Write 工具将完整 Python 代码写入 /tmp/roi_measure/<task_id>/main.py
  • 按以下固定格式输出(不内联展示脚本全文):
## 输入信息
区服: xxx | 活动ID: xxx | 需求: xxx

## 执行计划

### 步骤 1:[操作名称]
- 接口:`POST [api_path]`
- 关键参数:`uid=ctx.uid(1)`、`act_id=ctx.act_id`、`[param_name]=[value]`、...
- 说明:[该步骤的测算目的]

### 步骤 2:[操作名称]
- 接口:`POST [api_path]`
- 关键参数:...
- 说明:...

(按实际步骤数量续写,每步一个 ### 三级标题)

代码已写入: /tmp/roi_measure/<task_id>/main.py

Step 5 — 等待用户确认:

  • 用户确认无误 → 进入 Step 6 执行
  • 用户提出调整 → 更新执行计划和脚本文件,重新输出 Step 4 的格式,再次等待确认

Step 6 — 执行:

python3 /tmp/roi_measure/<task_id>/main.py \
  --act_url "<act_domain>" \
  --api_url "<api_domain>" \
  --act_id <活动ID> \
  [--task_id <task_id>]

Step 7 — 将 stdout 原样展示,并按以下固定格式提取输出中的结果段落(字段缺失时输出 (未获取到)):

## 测算结果
总消耗: xxx
总产出: xxx
ROI: xxx%
消耗/产出明细: https://activity-xxx.weplayapp.com/...

Python 代码规范

生成的代码是完整独立的 Python3 脚本,直接用 python3 main.py 运行,无需编译。

固定框架(原样复制,不得修改)

import argparse
import json
import random
import signal
import string
import sys
import threading
import time

import requests

# ---------- 框架类(固定,不得修改)----------

class Response:
    def __init__(self, code: int, msg: str, result=None):
        self.code = code
        self.msg = msg
        self.result = result

class Context:
    def __init__(self, api_base_url: str, uids: list, act_id: int, task_id: str):
        self.api_base_url = api_base_url
        self.uids = uids
        self.act_id = act_id
        self.task_id = task_id
        self._session = requests.Session()  # User-Agent 自动为 python-requests/x.x.x
        self._lock = threading.Lock()
        self._stopped = False
        self.ok_count = 0
        self.fail_count = 0
        self.fail_errors = []

    def call(self, path: str, params: dict) -> Response:
        with self._lock:
            if self._stopped:
                return Response(-1, "task stopped")
        url = self.api_base_url + path
        try:
            resp = self._session.post(url, data={k: str(v) for k, v in params.items()}, timeout=30)
            r = resp.json()
            result = Response(r.get("code", -1), r.get("msg", ""), r.get("result"))
            with self._lock:
                if result.code == 200:
                    self.ok_count += 1
                else:
                    self.fail_count += 1
                    self.fail_errors.append(json.dumps({"path": path, "code": result.code, "msg": result.msg}, ensure_ascii=False))
            return result
        except Exception as e:
            with self._lock:
                self.fail_count += 1
                self.fail_errors.append(json.dumps({"path": path, "err": str(e)}, ensure_ascii=False))
            return Response(-1, str(e))

    def uid(self, slot: int) -> int:
        """返回第 slot 个 UID(1-indexed)"""
        if 1 <= slot <= len(self.uids):
            return self.uids[slot - 1]
        return 0

    def concurrent(self, workers: int, fn):
        """并发执行 fn(worker_id),最多 workers 个线程"""
        sem = threading.Semaphore(workers)
        threads = []
        def _run(wid):
            sem.acquire()
            try:
                fn(wid)
            finally:
                sem.release()
        for i in range(workers):
            t = threading.Thread(target=_run, args=(i,))
            threads.append(t)
            t.start()
        for t in threads:
            t.join()

    def log(self, msg: str, *args):
        print(msg % args if args else msg, flush=True)

    def stop(self) -> bool:
        with self._lock:
            return self._stopped

    def sleep(self, ms: int):
        time.sleep(ms / 1000)

def _call_form(session: requests.Session, url: str, params: dict) -> dict:
    resp = session.post(url, data={k: str(v) for k, v in params.items()}, timeout=30)
    r = resp.json()
    if r.get("code") != 200:
        raise RuntimeError(f"api error code={r.get('code')} msg={r.get('msg')}")
    return r.get("result", {})

def _rand_hex(n: int) -> str:
    return "".join(random.choices(string.hexdigits[:16], k=n))

execute 函数(AI 生成部分)

# ---------- 测算逻辑(AI 填写)----------

def execute(ctx: Context):
    # AI 根据测算需求填写,只能使用 ctx 提供的方法
    pass

execute 约束:

  • 只能调用 ctx.call / ctx.uid / ctx.concurrent / ctx.log / ctx.stop / ctx.sleep / ctx.act_id
  • 禁止重新定义 Context、Response 类
  • 禁止导入额外第三方包(只能用标准库 + requests)
  • 禁止调用 sys.exitos.systemsubprocess 等系统操作
  • 循环内必须检查 ctx.stop()
  • 并发访问共享变量必须加锁(threading.Lock
  • 只实现用户描述的步骤,不自行添加重试、汇总、前置校验等逻辑

main 入口(固定,原样复制,REQUIRED_UIDS 替换为实际数字)

# ---------- 入口(固定,不得修改框架,只填 REQUIRED_UIDS)----------

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--act_url", required=True, help="区服 activity 域名")
    parser.add_argument("--api_url", required=True, help="区服 api 域名")
    parser.add_argument("--act_id",  required=True, type=int, help="活动ID")
    parser.add_argument("--task_id", default="", help="任务ID(可选)")
    args = parser.parse_args()

    task_id = args.task_id or f"roi_{int(time.time())}_{_rand_hex(8)}"

    REQUIRED_UIDS = 1  # AI 填写:执行该计划需要的 UID 数量

    session = requests.Session()  # User-Agent 自动为 python-requests/x.x.x

    # 1. 开始测算
    print(f"[ROI] 开始测算 task_id={task_id} required_uids={REQUIRED_UIDS}", flush=True)
    start_result = _call_form(session, args.act_url + "/activity_v2/roi_measure_v2/start", {
        "act_id": args.act_id,
        "task_id": task_id,
        "required_uid_count": REQUIRED_UIDS,
    })
    uids = [int(u) for u in start_result.get("uid_list", [])]
    print(f"[ROI] 获取到 uid 列表: {uids}", flush=True)

    # 2. 构建上下文,注册取消信号
    ctx = Context(api_base_url=args.api_url, uids=uids, act_id=args.act_id, task_id=task_id)

    def _on_signal(signum, frame):
        with ctx._lock:
            ctx._stopped = True
        print("[ROI] 收到终止信号,停止执行", flush=True)
    signal.signal(signal.SIGTERM, _on_signal)
    signal.signal(signal.SIGINT, _on_signal)

    # 3. 执行测算逻辑
    execute(ctx)
    print(f"[ROI] 执行完成 成功={ctx.ok_count} 失败={ctx.fail_count}", flush=True)
    if ctx.fail_errors:
        print(f"[ROI] 失败详情: {'; '.join(ctx.fail_errors)}", flush=True)

    # 4. 等待 5s
    print("[ROI] 等待 5s 后获取测算结果...", flush=True)
    time.sleep(5)

    # 5. 结束测算
    stop_result = _call_form(session, args.act_url + "/activity_v2/roi_measure_v2/stop", {
        "task_id": task_id,
    })

    # 6. 打印汇总结果
    print("\n========== ROI 测算结果 ==========")
    print(f"总消耗:  {stop_result.get('total_cost', '(未获取到)')}")
    print(f"总产出:  {stop_result.get('total_reward_value', '(未获取到)')}")
    print(f"ROI:     {stop_result.get('roi_percentage', '(未获取到)')}%")
    print(f"消耗/产出明细: {stop_result.get('record_url', '(未获取到)')}")
    print("==================================")

注意事项

  • REQUIRED_UIDS 由 AI 根据计划中涉及的 UID 数量填写,必须是具体数字
  • 不得修改 if __name__ == "__main__" 块的框架结构,只填写 REQUIRED_UIDS 的值
  • execute 函数中通过 ctx.uid(1)ctx.uid(2) ... 引用各 UID,最大 slot = REQUIRED_UIDS
  • 执行出错时原样展示异常,不继续执行
  • 执行期间的 [ROI] 前缀日志为进度信息,正常展示
  • 若 start 接口返回非 200,_call_form 抛出异常自动中止,提示用户检查区服配置
  • 所有临时文件统一在 /tmp/roi_measure/<task_id>/,执行完毕且用户确认无误则自动清理;用户中途放弃则由 OS 定期清理 /tmp
Installs
2
First Seen
Apr 8, 2026