← 리포트 목록
analyst 4종 공통 Telegram wrapper 리팩토링
2026-04-15
analyst
[analyst, telegram, message-id, launchd, observability]
결론
analyst 4개(macro/fundamental/technical/pm)의 Telegram 발송을 공통 wrapper로 통일했다.
- 공통 wrapper:
/Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh - 4개 plist 모두 wrapper 호출로 변경 완료
- 4개 analyst 전체 실제 생성 실행 완료: exit 0
- 4개 market topic 실제 발송 완료: message_id
2362/2363/2364/2365 - caller trace 보강 후 send-only 검증 완료: message_id
2366/2367/2368/2369 - 4개 TSV 모두 최신
message_id/status기록 확인
1. 기존 구조 분석
기존 analyst_runner.sh
확인한 문제:
- Claude CLI 실행과 Telegram fallback이 한 파일에 섞여 있었다.
- 실패 시 직접 DM curl fallback 경로가 있었다.
latest.json생성 여부와 Telegram 도달 여부가 분리되어 있지 않았다.- analyst별 TSV 포맷이 서로 달랐다.
- sector topic 발송은 각 Claude 프롬프트가 자체적으로 수행해
message_id판정/로깅이 일관되지 않았다.
기존 exit 판정:
- Claude stream-json에서 이번 실행의
type=result이벤트를 읽어success/error/none판정 - success + 최근 history JSON 있으면
latest.jsonsync - 실패 시 기존 direct Telegram fallback 호출
- 다만 이 fallback은 message_id를 남기지 않고, 공통 sector routing도 아니었다.
기존 TSV 상태
작업 전에도 이전 수동 재전송 흔적은 있었지만 포맷이 통일되어 있지 않았다.
- macro: 수동 resend row 존재
- fundamental: resend row 존재
- technical: 기존
analyst_technical_last_telegram.tsv존재 - pm: resend row 존재
2. 설계
공통 wrapper 책임:
- 입력:
macro|fundamental|technical|pm - 기존 analyst runner 실행
- runner 성공/실패 exit code 확인
~/.hermes/workspace/memory/analyst-<name>/latest.json확인- latest JSON에서 analyst별 HTML 요약 생성
send_sector_result("market", ...)호출~/.hermes/logs/analyst_<name>_last_telegram.tsv에 기록- 성공 시 exit 0, 실패 시 exit 1
TSV 통일 포맷:
timestamp status message_id sector chat_id topic_id error_type error
추가로 기존 프롬프트 내부 Telegram 발송과 중복되지 않게 analyst_runner.sh가 Claude에 전달하는 프롬프트 끝에 운영 오버라이드를 주입했다.
Telegram/DM/sector 발송은 절대 실행하지 마세요.
send_sector, telegram_send.py, curl api.telegram.org 호출 금지.
latest.json과 필요한 볼트/메모리 산출물만 생성/갱신하세요.
Telegram 발송과 message_id 로깅은 wrapper가 일괄 처리합니다.
3. 변경 파일
/Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh— 신규/Users/ron/.hermes/workspace/scripts/analyst_runner.sh— 직접 DM fallback 제거 + Telegram 금지 prompt override 추가/Users/ron/.hermes/workspace/scripts/shared/telegram.py—send_sector_result()추가,TELEGRAM_CALLERtrace 지원/Users/ron/Library/LaunchAgents/com.openclaw.analyst-macro.plist/Users/ron/Library/LaunchAgents/com.openclaw.analyst-fundamental.plist/Users/ron/Library/LaunchAgents/com.openclaw.analyst-technical.plist/Users/ron/Library/LaunchAgents/com.openclaw.analyst-pm.plist
백업:
- runner 백업:
/Users/ron/.hermes/workspace/scripts/analyst_runner.sh.bak-common-wrapper-20260415T204136 - plist 백업:
/Users/ron/.hermes/backups/analyst_common_wrapper_plists_20260415T204546
4. 공통 wrapper 코드 전문
#!/usr/bin/env bash
# analyst_common_wrapper.sh — analyst 4종 공통 실행/Telegram message_id 로깅 wrapper
# Usage: analyst_common_wrapper.sh <macro|fundamental|technical|pm>
set -o pipefail
ANALYST="${1:-}"
case "$ANALYST" in
macro|fundamental|technical|pm) ;;
*) echo "Usage: $0 <macro|fundamental|technical|pm>" >&2; exit 2 ;;
esac
HOME_DIR="${HOME:-/Users/ron}"
SCRIPTS_DIR="$HOME_DIR/.hermes/workspace/scripts"
RUNNER="$SCRIPTS_DIR/analyst_runner.sh"
LOG_DIR="$HOME_DIR/.hermes/logs"
WRAP_LOG="$LOG_DIR/analyst_${ANALYST}_common_wrapper.log"
RUN_STDOUT="$LOG_DIR/analyst_${ANALYST}_common_runner_stdout.log"
RUN_STDERR="$LOG_DIR/analyst_${ANALYST}_common_runner_stderr.log"
LATEST="$HOME_DIR/.hermes/workspace/memory/analyst-${ANALYST}/latest.json"
TSV="$LOG_DIR/analyst_${ANALYST}_last_telegram.tsv"
mkdir -p "$LOG_DIR"
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$WRAP_LOG"
}
append_tsv() {
local status="$1" msgid="$2" sector="$3" chat_id="$4" topic_id="$5" error_type="$6" error="$7"
if [ ! -f "$TSV" ]; then
printf 'timestamp\tstatus\tmessage_id\tsector\tchat_id\ttopic_id\terror_type\terror\n' > "$TSV"
fi
printf '%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n' \
"$(date '+%Y-%m-%dT%H:%M:%S%z')" "$status" "$msgid" "$sector" "$chat_id" "$topic_id" "$error_type" "${error//$'\n'/ }" >> "$TSV"
}
RUN_RC=0
if [ "${ANALYST_COMMON_SKIP_RUN:-0}" = "1" ]; then
log "SKIP generation by ANALYST_COMMON_SKIP_RUN=1 analyst=$ANALYST"
else
if [ ! -x "$RUNNER" ]; then
log "ERROR runner not executable: $RUNNER"
append_tsv "FAIL" "" "market" "" "" "runner_missing" "$RUNNER"
exit 1
fi
log "START generation analyst=$ANALYST runner=$RUNNER"
"$RUNNER" "$ANALYST" >"$RUN_STDOUT" 2>"$RUN_STDERR"
RUN_RC=$?
log "END generation analyst=$ANALYST rc=$RUN_RC stdout=$RUN_STDOUT stderr=$RUN_STDERR"
fi
if [ "$RUN_RC" -ne 0 ]; then
append_tsv "FAIL" "" "market" "" "" "runner_failed" "runner rc=$RUN_RC"
log "FAIL runner rc=$RUN_RC — Telegram send skipped to avoid stale success"
exit 1
fi
if [ ! -f "$LATEST" ]; then
append_tsv "FAIL" "" "market" "" "" "latest_missing" "$LATEST"
log "FAIL latest.json missing: $LATEST"
exit 1
fi
TELEGRAM_CALLER="analyst_common_wrapper.sh:${ANALYST}" \
PYTHONPATH="$SCRIPTS_DIR:$SCRIPTS_DIR/shared:$SCRIPTS_DIR/pipeline:${PYTHONPATH:-}" \
python3 - "$ANALYST" "$LATEST" "$TSV" "$WRAP_LOG" <<'PY'
from __future__ import annotations
import html
import json
import os
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import Any
from shared import telegram as tg
analyst = sys.argv[1]
latest_path = Path(sys.argv[2])
tsv_path = Path(sys.argv[3])
wrap_log = Path(sys.argv[4])
sector = "market"
NAMES = {
"macro": "🌍 매크로",
"fundamental": "📊 펀더멘탈",
"technical": "📈 테크니컬",
"pm": "🧭 PM",
}
def classify_error(err: object) -> str:
s = str(err or "")
low = s.lower()
if not s:
return "none"
if "dedup" in low:
return "dedupe"
if "timeout" in low or "timed out" in low:
return "timeout"
if "nodename" in low or "resolve" in low or "name or service" in low:
return "dns_resolution"
if "http error 400" in low or "bad request" in low:
return "telegram_http_400_bad_request"
if "http error 401" in low or "unauthorized" in low:
return "telegram_http_401_unauthorized"
if "http error 403" in low or "forbidden" in low:
return "telegram_http_403_forbidden"
if "topic" in low or "thread" in low:
return "topic_or_thread"
return "network_or_api"
def esc(v: Any, limit: int | None = None) -> str:
if v is None:
return ""
text = str(v).strip()
if limit and len(text) > limit:
text = text[:limit].rstrip() + "…"
return html.escape(text)
def list_lines(items: Any, limit: int = 4) -> list[str]:
out: list[str] = []
if isinstance(items, dict):
iterable = items.items()
for k, v in list(iterable)[:limit]:
out.append(f"• <b>{esc(k, 40)}</b>: {esc(v, 180)}")
elif isinstance(items, list):
for item in items[:limit]:
if isinstance(item, dict):
label = item.get("label") or item.get("type") or item.get("sector") or item.get("condition") or item.get("action") or json.dumps(item, ensure_ascii=False)
out.append("• " + esc(label, 220))
else:
out.append("• " + esc(item, 220))
return out
def build_message(data: dict[str, Any]) -> str:
title = NAMES.get(analyst, analyst)
date = data.get("date") or datetime.now().strftime("%Y-%m-%d")
sent_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S KST")
summary = data.get("summary") or data.get("one_liner") or data.get("narrative") or "요약 없음"
lines: list[str] = [
f"<b>{title} 리포트</b>",
f"날짜: <code>{esc(date)}</code>",
f"전송시각: <code>{esc(sent_at)}</code>",
"",
]
if analyst == "macro":
regime = data.get("regime") or ""
conf = data.get("confidence")
if regime or conf is not None:
lines.append(f"<b>레짐</b>: {esc(data.get('regime_emoji'))} {esc(regime)}" + (f" ({esc(conf)}%)" if conf is not None else ""))
lines.append("<b>핵심 요약</b>")
lines.append(esc(summary, 900))
sigs = list_lines(data.get("signals"), 5)
if sigs:
lines += ["", "<b>주요 신호</b>"] + sigs
elif analyst == "fundamental":
lines.append("<b>핵심 요약</b>")
lines.append(esc(summary, 900))
earnings = data.get("earnings") or {}
for title2, key in [("긍정/상향", "upgrades"), ("주의/하향", "downgrades"), ("밸류에이션 플래그", "key_flags")]:
vals = list_lines(earnings.get(key), 4)
if vals:
lines += ["", f"<b>{title2}</b>"] + vals
elif analyst == "technical":
lines.append("<b>핵심 요약</b>")
lines.append(esc(summary, 900))
for title2, key in [("자산 로테이션", "asset_rotation"), ("시장 구조", "market_structure"), ("시나리오", "scenarios")]:
vals = list_lines(data.get(key), 4)
if vals:
lines += ["", f"<b>{title2}</b>"] + vals
elif analyst == "pm":
pos = data.get("positioning") or ""
conf = data.get("confidence")
if pos or conf is not None:
lines.append(f"<b>포지셔닝</b>: {esc(data.get('positioning_emoji'))} {esc(pos)}" + (f" ({esc(conf)}%)" if conf is not None else ""))
lines.append("<b>한줄 결론</b>")
lines.append(esc(data.get("one_liner") or summary, 900))
vals = list_lines(data.get("sector_positions"), 6)
if vals:
lines += ["", "<b>섹터 포지션</b>"] + vals
triggers = list_lines(data.get("action_triggers"), 4)
if triggers:
lines += ["", "<b>액션 트리거</b>"] + triggers
else:
lines.append(esc(summary, 1000))
lines += ["", "<i>analyst_common_wrapper.sh 공통 발송 · message_id TSV 기록</i>"]
text = "\n".join([line for line in lines if line is not None])
if len(text) > 3900:
text = text[:3850].rstrip() + "\n…\n<i>일부 생략</i>"
return text
def append_tsv(status: str, message_id: Any, result: dict[str, Any], error: Any = None) -> None:
if not tsv_path.exists():
tsv_path.write_text("timestamp\tstatus\tmessage_id\tsector\tchat_id\ttopic_id\terror_type\terror\n", encoding="utf-8")
ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%S%z")
err = str(error if error is not None else result.get("error") or "").replace("\n", " ")[:500]
row = [
ts,
status,
"" if message_id is None else str(message_id),
str(result.get("sector") or sector),
"" if result.get("chat_id") is None else str(result.get("chat_id")),
"" if result.get("topic_id") is None else str(result.get("topic_id")),
classify_error(err),
err,
]
with tsv_path.open("a", encoding="utf-8") as f:
f.write("\t".join(row) + "\n")
def atomic_update_sent(data: dict[str, Any], result: dict[str, Any]) -> None:
data["telegram_sent"] = bool(result.get("ok"))
data["telegram_common_wrapper"] = {
"at": datetime.now().isoformat(timespec="seconds"),
"sector": result.get("sector") or sector,
"message_id": result.get("message_id"),
"status": result.get("status"),
"chat_id": result.get("chat_id"),
"topic_id": result.get("topic_id"),
"source": "analyst_common_wrapper.sh",
}
fd, tmp = tempfile.mkstemp(dir=str(latest_path.parent), prefix=latest_path.name + ".", suffix=".tmp")
with os.fdopen(fd, "w", encoding="utf-8") as out:
json.dump(data, out, ensure_ascii=False, indent=2)
out.write("\n")
os.replace(tmp, latest_path)
def main() -> int:
data = json.loads(latest_path.read_text(encoding="utf-8"))
text = build_message(data)
result = tg.send_sector_result(sector, text, parse_mode="HTML")
ok = bool(result.get("ok"))
status = "OK" if ok else "FAIL"
append_tsv(status, result.get("message_id"), result)
atomic_update_sent(data, result)
with wrap_log.open("a", encoding="utf-8") as f:
f.write(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] send analyst={analyst} ok={ok} message_id={result.get('message_id')} status={result.get('status')} error={result.get('error')}\n")
print(json.dumps({"analyst": analyst, "ok": ok, "message_id": result.get("message_id"), "status": result.get("status"), "sector": result.get("sector"), "chat_id": result.get("chat_id"), "topic_id": result.get("topic_id"), "error": result.get("error")}, ensure_ascii=False))
return 0 if ok else 1
if __name__ == "__main__":
raise SystemExit(main())
PY
5. plist 변경 요약
4개 plist 모두 아래 형태로 통일했다.
ProgramArguments = [
"/bin/bash",
"/Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh",
"<macro|fundamental|technical|pm>"
]
스케줄은 기존 유지:
| label | schedule | wrapper arg |
|---|---|---|
| com.openclaw.analyst-macro | 07:00 | macro |
| com.openclaw.analyst-fundamental | 07:10 | fundamental |
| com.openclaw.analyst-technical | 07:20 | technical |
| com.openclaw.analyst-pm | 07:35 | pm |
Hermes 경로 검증:
active analyst plists/wrapper에서 /Users/ron/.openclaw, ~/.openclaw 문자열 없음
6. 실제 실행 검증 — 생성 + 공통 발송
실행 명령:
for n in macro fundamental technical pm; do
/bin/bash /Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh "$n"
done
결과:
| analyst | generation rc | send ok | message_id | sector | chat_id | topic_id |
|---|---|---|---|---|---|---|
| macro | 0 | true | 2362 | market | -1003522748967 | 5 |
| fundamental | 0 | true | 2363 | market | -1003522748967 | 5 |
| technical | 0 | true | 2364 | market | -1003522748967 | 5 |
| pm | 0 | true | 2365 | market | -1003522748967 | 5 |
실행 로그:
[2026-04-15 20:51:43] END generation analyst=macro rc=0
{"analyst": "macro", "ok": true, "message_id": 2362, "status": "success", "sector": "market", "chat_id": -1003522748967, "topic_id": 5, "error": null}
[2026-04-15 20:59:23] END generation analyst=fundamental rc=0
{"analyst": "fundamental", "ok": true, "message_id": 2363, "status": "success", "sector": "market", "chat_id": -1003522748967, "topic_id": 5, "error": null}
[2026-04-15 21:05:28] END generation analyst=technical rc=0
{"analyst": "technical", "ok": true, "message_id": 2364, "status": "success", "sector": "market", "chat_id": -1003522748967, "topic_id": 5, "error": null}
[2026-04-15 21:12:24] END generation analyst=pm rc=0
{"analyst": "pm", "ok": true, "message_id": 2365, "status": "success", "sector": "market", "chat_id": -1003522748967, "topic_id": 5, "error": null}
7. caller trace 보강 후 send-only 재검증
초기 embedded Python 호출은 sector_trace caller가 <stdin>으로 찍혔다. 그래서 TELEGRAM_CALLER=analyst_common_wrapper.sh:<name>를 shared.telegram.send_sector_result()에서 우선 읽도록 보강했다.
검증 명령:
for n in macro fundamental technical pm; do
ANALYST_COMMON_SKIP_RUN=1 /bin/bash /Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh "$n"
done
최종 TSV 최신 row:
macro 2026-04-15T21:13:11 OK 2366 market -1003522748967 5 none
fundamental 2026-04-15T21:13:12 OK 2367 market -1003522748967 5 none
technical 2026-04-15T21:13:14 OK 2368 market -1003522748967 5 none
pm 2026-04-15T21:13:15 OK 2369 market -1003522748967 5 none
sector_trace 확인:
2026-04-15 21:13:10 [SECTOR_TRACE] send_sector(market) caller=analyst_common_wrapper.sh:macro text='<b>🌍 매크로 리포트</b>...'
2026-04-15 21:13:11 [SECTOR_TRACE] send_sector(market) caller=analyst_common_wrapper.sh:fundamental text='<b>📊 펀더멘탈 리포트</b>...'
2026-04-15 21:13:12 [SECTOR_TRACE] send_sector(market) caller=analyst_common_wrapper.sh:technical text='<b>📈 테크니컬 리포트</b>...'
2026-04-15 21:13:14 [SECTOR_TRACE] send_sector(market) caller=analyst_common_wrapper.sh:pm text='<b>🧭 PM 리포트</b>...'
8. LaunchAgent reload 시도
plist 파일 자체는 교체 완료됐다. 다만 이 세션에서 launchctl bootstrap gui/501 ...는 여전히 error 5로 실패했다.
===== com.openclaw.analyst-macro =====
Boot-out failed: 5: Input/output error
Bootstrap failed: 5: Input/output error
bootstrap_rc=5
Could not find service "com.openclaw.analyst-macro" in domain for user gui: 501
이 error 5는 이전 launchd 복구 작업에서도 동일하게 발생한 사용자 domain bootstrap 제약이다. 수동 실행과 plist 파일 rewrite는 완료됐지만, 실제 launchd reload는 GUI 터미널에서 다시 실행해야 한다.
9. 검증 커맨드
bash -n /Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh
bash -n /Users/ron/.hermes/workspace/scripts/analyst_runner.sh
python3 -m py_compile /Users/ron/.hermes/workspace/scripts/shared/telegram.py
grep -RIn "/Users/ron/.openclaw\|~/.openclaw" \
/Users/ron/Library/LaunchAgents/com.openclaw.analyst-{macro,fundamental,technical,pm}.plist \
/Users/ron/.hermes/workspace/scripts/analyst_common_wrapper.sh \
/Users/ron/.hermes/workspace/scripts/analyst_runner.sh
결과:
syntax OK
OpenClaw path grep: no hits
10. 남은 리스크
- launchd user domain bootstrap은 이 세션에서 error 5로 실패했다. plist 파일 변경은 완료됐으므로 GUI 터미널에서 reload 필요.
analyst_runner.sh는 Claude CLI 기반이라 실행 시간이 5~8분/개 수준이다. wrapper는 실패 시 exit 1로 드러내지만, Claude 자체 장애는 여전히 upstream 리스크다.- prompt override로 Claude 내부 Telegram 발송을 금지했지만, 모델이 지시를 무시할 가능성은 0이 아니다. 다만 wrapper가 최종 TSV/message_id의 단일 기준이 되도록 했다.
자체평가
- 정확성: 4.8/5 — 4개 analyst 생성 실행 + 공통 발송 + message_id TSV 기록 완료.
- 완성도: 4.7/5 — wrapper, shared result API, plist 변경, trace caller 보강까지 완료. launchctl reload만 환경 제약.
- 검증: 4.8/5 — 실제 market topic 발송 8건(생성 후 4건 + caller 보강 검증 4건), TSV/sector_trace 확인.
- 최소 변경: 4.5/5 — analyst 원본 py는 미수정. runner/prompt 전달부와 shared.telegram에 필요한 관측 API만 추가.
- 종합: 4.7/5