virtual-insanity
← 리포트 목록

analyst 4종 공통 Telegram wrapper 리팩토링

2026-04-15 analyst [analyst, telegram, message-id, launchd, observability]

결론

analyst 4개(macro/fundamental/technical/pm)의 Telegram 발송을 공통 wrapper로 통일했다.

  • 공통 wrapper: /Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh
  • 4개 plist 모두 wrapper 호출로 변경 완료
  • 4개 analyst 전체 실제 생성 실행 완료: exit 0
  • 4개 market topic 실제 발송 완료: message_id 2362/2363/2364/2365
  • caller trace 보강 후 send-only 검증 완료: message_id 2366/2367/2368/2369
  • 4개 TSV 모두 최신 message_id/status 기록 확인

1. 기존 구조 분석

기존 analyst_runner.sh

확인한 문제:

  1. Claude CLI 실행과 Telegram fallback이 한 파일에 섞여 있었다.
  2. 실패 시 직접 DM curl fallback 경로가 있었다.
  3. latest.json 생성 여부와 Telegram 도달 여부가 분리되어 있지 않았다.
  4. analyst별 TSV 포맷이 서로 달랐다.
  5. sector topic 발송은 각 Claude 프롬프트가 자체적으로 수행해 message_id 판정/로깅이 일관되지 않았다.

기존 exit 판정:

  • Claude stream-json에서 이번 실행의 type=result 이벤트를 읽어 success/error/none 판정
  • success + 최근 history JSON 있으면 latest.json sync
  • 실패 시 기존 direct Telegram fallback 호출
  • 다만 이 fallback은 message_id를 남기지 않고, 공통 sector routing도 아니었다.

기존 TSV 상태

작업 전에도 이전 수동 재전송 흔적은 있었지만 포맷이 통일되어 있지 않았다.

  • macro: 수동 resend row 존재
  • fundamental: resend row 존재
  • technical: 기존 analyst_technical_last_telegram.tsv 존재
  • pm: resend row 존재

2. 설계

공통 wrapper 책임:

  1. 입력: macro|fundamental|technical|pm
  2. 기존 analyst runner 실행
  3. runner 성공/실패 exit code 확인
  4. ~/.hermes/workspace/memory/analyst-<name>/latest.json 확인
  5. latest JSON에서 analyst별 HTML 요약 생성
  6. send_sector_result("market", ...) 호출
  7. ~/.hermes/logs/analyst_<name>_last_telegram.tsv에 기록
  8. 성공 시 exit 0, 실패 시 exit 1

TSV 통일 포맷:

timestamp   status  message_id  sector  chat_id topic_id    error_type  error

추가로 기존 프롬프트 내부 Telegram 발송과 중복되지 않게 analyst_runner.sh가 Claude에 전달하는 프롬프트 끝에 운영 오버라이드를 주입했다.

Telegram/DM/sector 발송은 절대 실행하지 마세요.
send_sector, telegram_send.py, curl api.telegram.org 호출 금지.
latest.json과 필요한 볼트/메모리 산출물만 생성/갱신하세요.
Telegram 발송과 message_id 로깅은 wrapper가 일괄 처리합니다.

3. 변경 파일

  • /Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh — 신규
  • /Users/ron/.hermes/workspace/scripts/analyst_runner.sh — 직접 DM fallback 제거 + Telegram 금지 prompt override 추가
  • /Users/ron/.hermes/workspace/scripts/shared/telegram.pysend_sector_result() 추가, TELEGRAM_CALLER trace 지원
  • /Users/ron/Library/LaunchAgents/com.openclaw.analyst-macro.plist
  • /Users/ron/Library/LaunchAgents/com.openclaw.analyst-fundamental.plist
  • /Users/ron/Library/LaunchAgents/com.openclaw.analyst-technical.plist
  • /Users/ron/Library/LaunchAgents/com.openclaw.analyst-pm.plist

백업:

  • runner 백업: /Users/ron/.hermes/workspace/scripts/analyst_runner.sh.bak-common-wrapper-20260415T204136
  • plist 백업: /Users/ron/.hermes/backups/analyst_common_wrapper_plists_20260415T204546

4. 공통 wrapper 코드 전문

#!/usr/bin/env bash
# analyst_common_wrapper.sh — analyst 4종 공통 실행/Telegram message_id 로깅 wrapper
# Usage: analyst_common_wrapper.sh <macro|fundamental|technical|pm>

set -o pipefail

ANALYST="${1:-}"
case "$ANALYST" in
  macro|fundamental|technical|pm) ;;
  *) echo "Usage: $0 <macro|fundamental|technical|pm>" >&2; exit 2 ;;
esac

HOME_DIR="${HOME:-/Users/ron}"
SCRIPTS_DIR="$HOME_DIR/.hermes/workspace/scripts"
RUNNER="$SCRIPTS_DIR/analyst_runner.sh"
LOG_DIR="$HOME_DIR/.hermes/logs"
WRAP_LOG="$LOG_DIR/analyst_${ANALYST}_common_wrapper.log"
RUN_STDOUT="$LOG_DIR/analyst_${ANALYST}_common_runner_stdout.log"
RUN_STDERR="$LOG_DIR/analyst_${ANALYST}_common_runner_stderr.log"
LATEST="$HOME_DIR/.hermes/workspace/memory/analyst-${ANALYST}/latest.json"
TSV="$LOG_DIR/analyst_${ANALYST}_last_telegram.tsv"

mkdir -p "$LOG_DIR"

log() {
  echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$WRAP_LOG"
}

append_tsv() {
  local status="$1" msgid="$2" sector="$3" chat_id="$4" topic_id="$5" error_type="$6" error="$7"
  if [ ! -f "$TSV" ]; then
    printf 'timestamp\tstatus\tmessage_id\tsector\tchat_id\ttopic_id\terror_type\terror\n' > "$TSV"
  fi
  printf '%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n' \
    "$(date '+%Y-%m-%dT%H:%M:%S%z')" "$status" "$msgid" "$sector" "$chat_id" "$topic_id" "$error_type" "${error//$'\n'/ }" >> "$TSV"
}

RUN_RC=0
if [ "${ANALYST_COMMON_SKIP_RUN:-0}" = "1" ]; then
  log "SKIP generation by ANALYST_COMMON_SKIP_RUN=1 analyst=$ANALYST"
else
  if [ ! -x "$RUNNER" ]; then
    log "ERROR runner not executable: $RUNNER"
    append_tsv "FAIL" "" "market" "" "" "runner_missing" "$RUNNER"
    exit 1
  fi
  log "START generation analyst=$ANALYST runner=$RUNNER"
  "$RUNNER" "$ANALYST" >"$RUN_STDOUT" 2>"$RUN_STDERR"
  RUN_RC=$?
  log "END generation analyst=$ANALYST rc=$RUN_RC stdout=$RUN_STDOUT stderr=$RUN_STDERR"
fi

if [ "$RUN_RC" -ne 0 ]; then
  append_tsv "FAIL" "" "market" "" "" "runner_failed" "runner rc=$RUN_RC"
  log "FAIL runner rc=$RUN_RC — Telegram send skipped to avoid stale success"
  exit 1
fi

if [ ! -f "$LATEST" ]; then
  append_tsv "FAIL" "" "market" "" "" "latest_missing" "$LATEST"
  log "FAIL latest.json missing: $LATEST"
  exit 1
fi

TELEGRAM_CALLER="analyst_common_wrapper.sh:${ANALYST}" \
PYTHONPATH="$SCRIPTS_DIR:$SCRIPTS_DIR/shared:$SCRIPTS_DIR/pipeline:${PYTHONPATH:-}" \
  python3 - "$ANALYST" "$LATEST" "$TSV" "$WRAP_LOG" <<'PY'
from __future__ import annotations

import html
import json
import os
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any

from shared import telegram as tg

analyst = sys.argv[1]
latest_path = Path(sys.argv[2])
tsv_path = Path(sys.argv[3])
wrap_log = Path(sys.argv[4])
sector = "market"

NAMES = {
    "macro": "🌍 매크로",
    "fundamental": "📊 펀더멘탈",
    "technical": "📈 테크니컬",
    "pm": "🧭 PM",
}


def classify_error(err: object) -> str:
    s = str(err or "")
    low = s.lower()
    if not s:
        return "none"
    if "dedup" in low:
        return "dedupe"
    if "timeout" in low or "timed out" in low:
        return "timeout"
    if "nodename" in low or "resolve" in low or "name or service" in low:
        return "dns_resolution"
    if "http error 400" in low or "bad request" in low:
        return "telegram_http_400_bad_request"
    if "http error 401" in low or "unauthorized" in low:
        return "telegram_http_401_unauthorized"
    if "http error 403" in low or "forbidden" in low:
        return "telegram_http_403_forbidden"
    if "topic" in low or "thread" in low:
        return "topic_or_thread"
    return "network_or_api"


def esc(v: Any, limit: int | None = None) -> str:
    if v is None:
        return ""
    text = str(v).strip()
    if limit and len(text) > limit:
        text = text[:limit].rstrip() + "…"
    return html.escape(text)


def list_lines(items: Any, limit: int = 4) -> list[str]:
    out: list[str] = []
    if isinstance(items, dict):
        iterable = items.items()
        for k, v in list(iterable)[:limit]:
            out.append(f"• <b>{esc(k, 40)}</b>: {esc(v, 180)}")
    elif isinstance(items, list):
        for item in items[:limit]:
            if isinstance(item, dict):
                label = item.get("label") or item.get("type") or item.get("sector") or item.get("condition") or item.get("action") or json.dumps(item, ensure_ascii=False)
                out.append("• " + esc(label, 220))
            else:
                out.append("• " + esc(item, 220))
    return out


def build_message(data: dict[str, Any]) -> str:
    title = NAMES.get(analyst, analyst)
    date = data.get("date") or datetime.now().strftime("%Y-%m-%d")
    sent_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S KST")
    summary = data.get("summary") or data.get("one_liner") or data.get("narrative") or "요약 없음"
    lines: list[str] = [
        f"<b>{title} 리포트</b>",
        f"날짜: <code>{esc(date)}</code>",
        f"전송시각: <code>{esc(sent_at)}</code>",
        "",
    ]

    if analyst == "macro":
        regime = data.get("regime") or ""
        conf = data.get("confidence")
        if regime or conf is not None:
            lines.append(f"<b>레짐</b>: {esc(data.get('regime_emoji'))} {esc(regime)}" + (f" ({esc(conf)}%)" if conf is not None else ""))
        lines.append("<b>핵심 요약</b>")
        lines.append(esc(summary, 900))
        sigs = list_lines(data.get("signals"), 5)
        if sigs:
            lines += ["", "<b>주요 신호</b>"] + sigs
    elif analyst == "fundamental":
        lines.append("<b>핵심 요약</b>")
        lines.append(esc(summary, 900))
        earnings = data.get("earnings") or {}
        for title2, key in [("긍정/상향", "upgrades"), ("주의/하향", "downgrades"), ("밸류에이션 플래그", "key_flags")]:
            vals = list_lines(earnings.get(key), 4)
            if vals:
                lines += ["", f"<b>{title2}</b>"] + vals
    elif analyst == "technical":
        lines.append("<b>핵심 요약</b>")
        lines.append(esc(summary, 900))
        for title2, key in [("자산 로테이션", "asset_rotation"), ("시장 구조", "market_structure"), ("시나리오", "scenarios")]:
            vals = list_lines(data.get(key), 4)
            if vals:
                lines += ["", f"<b>{title2}</b>"] + vals
    elif analyst == "pm":
        pos = data.get("positioning") or ""
        conf = data.get("confidence")
        if pos or conf is not None:
            lines.append(f"<b>포지셔닝</b>: {esc(data.get('positioning_emoji'))} {esc(pos)}" + (f" ({esc(conf)}%)" if conf is not None else ""))
        lines.append("<b>한줄 결론</b>")
        lines.append(esc(data.get("one_liner") or summary, 900))
        vals = list_lines(data.get("sector_positions"), 6)
        if vals:
            lines += ["", "<b>섹터 포지션</b>"] + vals
        triggers = list_lines(data.get("action_triggers"), 4)
        if triggers:
            lines += ["", "<b>액션 트리거</b>"] + triggers
    else:
        lines.append(esc(summary, 1000))

    lines += ["", "<i>analyst_common_wrapper.sh 공통 발송 · message_id TSV 기록</i>"]
    text = "\n".join([line for line in lines if line is not None])
    if len(text) > 3900:
        text = text[:3850].rstrip() + "\n…\n<i>일부 생략</i>"
    return text


def append_tsv(status: str, message_id: Any, result: dict[str, Any], error: Any = None) -> None:
    if not tsv_path.exists():
        tsv_path.write_text("timestamp\tstatus\tmessage_id\tsector\tchat_id\ttopic_id\terror_type\terror\n", encoding="utf-8")
    ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S%z")
    err = str(error if error is not None else result.get("error") or "").replace("\n", " ")[:500]
    row = [
        ts,
        status,
        "" if message_id is None else str(message_id),
        str(result.get("sector") or sector),
        "" if result.get("chat_id") is None else str(result.get("chat_id")),
        "" if result.get("topic_id") is None else str(result.get("topic_id")),
        classify_error(err),
        err,
    ]
    with tsv_path.open("a", encoding="utf-8") as f:
        f.write("\t".join(row) + "\n")


def atomic_update_sent(data: dict[str, Any], result: dict[str, Any]) -> None:
    data["telegram_sent"] = bool(result.get("ok"))
    data["telegram_common_wrapper"] = {
        "at": datetime.now().isoformat(timespec="seconds"),
        "sector": result.get("sector") or sector,
        "message_id": result.get("message_id"),
        "status": result.get("status"),
        "chat_id": result.get("chat_id"),
        "topic_id": result.get("topic_id"),
        "source": "analyst_common_wrapper.sh",
    }
    fd, tmp = tempfile.mkstemp(dir=str(latest_path.parent), prefix=latest_path.name + ".", suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as out:
        json.dump(data, out, ensure_ascii=False, indent=2)
        out.write("\n")
    os.replace(tmp, latest_path)


def main() -> int:
    data = json.loads(latest_path.read_text(encoding="utf-8"))
    text = build_message(data)
    result = tg.send_sector_result(sector, text, parse_mode="HTML")
    ok = bool(result.get("ok"))
    status = "OK" if ok else "FAIL"
    append_tsv(status, result.get("message_id"), result)
    atomic_update_sent(data, result)
    with wrap_log.open("a", encoding="utf-8") as f:
        f.write(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] send analyst={analyst} ok={ok} message_id={result.get('message_id')} status={result.get('status')} error={result.get('error')}\n")
    print(json.dumps({"analyst": analyst, "ok": ok, "message_id": result.get("message_id"), "status": result.get("status"), "sector": result.get("sector"), "chat_id": result.get("chat_id"), "topic_id": result.get("topic_id"), "error": result.get("error")}, ensure_ascii=False))
    return 0 if ok else 1


if __name__ == "__main__":
    raise SystemExit(main())
PY

5. plist 변경 요약

4개 plist 모두 아래 형태로 통일했다.

ProgramArguments = [
  "/bin/bash",
  "/Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh",
  "<macro|fundamental|technical|pm>"
]

스케줄은 기존 유지:

label schedule wrapper arg
com.openclaw.analyst-macro 07:00 macro
com.openclaw.analyst-fundamental 07:10 fundamental
com.openclaw.analyst-technical 07:20 technical
com.openclaw.analyst-pm 07:35 pm

Hermes 경로 검증:

active analyst plists/wrapper에서 /Users/ron/.openclaw, ~/.openclaw 문자열 없음

6. 실제 실행 검증 — 생성 + 공통 발송

실행 명령:

for n in macro fundamental technical pm; do
  /bin/bash /Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh "$n"
done

결과:

analyst generation rc send ok message_id sector chat_id topic_id
macro 0 true 2362 market -1003522748967 5
fundamental 0 true 2363 market -1003522748967 5
technical 0 true 2364 market -1003522748967 5
pm 0 true 2365 market -1003522748967 5

실행 로그:

[2026-04-15 20:51:43] END generation analyst=macro rc=0
{"analyst": "macro", "ok": true, "message_id": 2362, "status": "success", "sector": "market", "chat_id": -1003522748967, "topic_id": 5, "error": null}

[2026-04-15 20:59:23] END generation analyst=fundamental rc=0
{"analyst": "fundamental", "ok": true, "message_id": 2363, "status": "success", "sector": "market", "chat_id": -1003522748967, "topic_id": 5, "error": null}

[2026-04-15 21:05:28] END generation analyst=technical rc=0
{"analyst": "technical", "ok": true, "message_id": 2364, "status": "success", "sector": "market", "chat_id": -1003522748967, "topic_id": 5, "error": null}

[2026-04-15 21:12:24] END generation analyst=pm rc=0
{"analyst": "pm", "ok": true, "message_id": 2365, "status": "success", "sector": "market", "chat_id": -1003522748967, "topic_id": 5, "error": null}

7. caller trace 보강 후 send-only 재검증

초기 embedded Python 호출은 sector_trace caller가 <stdin>으로 찍혔다. 그래서 TELEGRAM_CALLER=analyst_common_wrapper.sh:<name>shared.telegram.send_sector_result()에서 우선 읽도록 보강했다.

검증 명령:

for n in macro fundamental technical pm; do
  ANALYST_COMMON_SKIP_RUN=1 /bin/bash /Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh "$n"
done

최종 TSV 최신 row:

macro       2026-04-15T21:13:11 OK 2366 market -1003522748967 5 none
fundamental 2026-04-15T21:13:12 OK 2367 market -1003522748967 5 none
technical   2026-04-15T21:13:14 OK 2368 market -1003522748967 5 none
pm          2026-04-15T21:13:15 OK 2369 market -1003522748967 5 none

sector_trace 확인:

2026-04-15 21:13:10 [SECTOR_TRACE] send_sector(market) caller=analyst_common_wrapper.sh:macro text='<b>🌍 매크로 리포트</b>...'
2026-04-15 21:13:11 [SECTOR_TRACE] send_sector(market) caller=analyst_common_wrapper.sh:fundamental text='<b>📊 펀더멘탈 리포트</b>...'
2026-04-15 21:13:12 [SECTOR_TRACE] send_sector(market) caller=analyst_common_wrapper.sh:technical text='<b>📈 테크니컬 리포트</b>...'
2026-04-15 21:13:14 [SECTOR_TRACE] send_sector(market) caller=analyst_common_wrapper.sh:pm text='<b>🧭 PM 리포트</b>...'

8. LaunchAgent reload 시도

plist 파일 자체는 교체 완료됐다. 다만 이 세션에서 launchctl bootstrap gui/501 ...는 여전히 error 5로 실패했다.

===== com.openclaw.analyst-macro =====
Boot-out failed: 5: Input/output error
Bootstrap failed: 5: Input/output error
bootstrap_rc=5
Could not find service "com.openclaw.analyst-macro" in domain for user gui: 501

이 error 5는 이전 launchd 복구 작업에서도 동일하게 발생한 사용자 domain bootstrap 제약이다. 수동 실행과 plist 파일 rewrite는 완료됐지만, 실제 launchd reload는 GUI 터미널에서 다시 실행해야 한다.

9. 검증 커맨드

bash -n /Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh
bash -n /Users/ron/.hermes/workspace/scripts/analyst_runner.sh
python3 -m py_compile /Users/ron/.hermes/workspace/scripts/shared/telegram.py

grep -RIn "/Users/ron/.openclaw\|~/.openclaw" \
  /Users/ron/Library/LaunchAgents/com.openclaw.analyst-{macro,fundamental,technical,pm}.plist \
  /Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh \
  /Users/ron/.hermes/workspace/scripts/analyst_runner.sh

결과:

syntax OK
OpenClaw path grep: no hits

10. 남은 리스크

  1. launchd user domain bootstrap은 이 세션에서 error 5로 실패했다. plist 파일 변경은 완료됐으므로 GUI 터미널에서 reload 필요.
  2. analyst_runner.sh는 Claude CLI 기반이라 실행 시간이 5~8분/개 수준이다. wrapper는 실패 시 exit 1로 드러내지만, Claude 자체 장애는 여전히 upstream 리스크다.
  3. prompt override로 Claude 내부 Telegram 발송을 금지했지만, 모델이 지시를 무시할 가능성은 0이 아니다. 다만 wrapper가 최종 TSV/message_id의 단일 기준이 되도록 했다.

자체평가

  • 정확성: 4.8/5 — 4개 analyst 생성 실행 + 공통 발송 + message_id TSV 기록 완료.
  • 완성도: 4.7/5 — wrapper, shared result API, plist 변경, trace caller 보강까지 완료. launchctl reload만 환경 제약.
  • 검증: 4.8/5 — 실제 market topic 발송 8건(생성 후 4건 + caller 보강 검증 4건), TSV/sector_trace 확인.
  • 최소 변경: 4.5/5 — analyst 원본 py는 미수정. runner/prompt 전달부와 shared.telegram에 필요한 관측 API만 추가.
  • 종합: 4.7/5