Takopi Architecture & Lifecycle¶
Layer Diagram¶
flowchart TB
subgraph CLI["CLI Layer"]
cli[cli.py]
cli_desc["Entry point, config loading, lock file"]
end
subgraph Plugins["Plugin Layer"]
entrypoints[plugins.py<br/>entrypoint discovery]
engines[engines.py]
transports[transports.py]
commands[commands.py]
api[api.py<br/>public plugin API]
end
subgraph Orchestration["Orchestration Layer"]
router[AutoRouter<br/>router.py]
scheduler[ThreadScheduler<br/>scheduler.py]
projects[ProjectsConfig<br/>config.py]
runtime[TransportRuntime<br/>transport_runtime.py]
end
subgraph Bridge["Bridge Layer"]
tg_bridge[telegram/bridge.py<br/>run_main_loop]
runner_bridge[runner_bridge.py<br/>handle_message]
end
subgraph Runner["Runner Layer"]
runner_proto[Runner Protocol<br/>runner.py]
runners[runners/<br/>claude, codex, opencode, pi]
schemas[schemas/<br/>JSONL decoders]
end
subgraph Transport["Transport Layer"]
transport[Transport Protocol]
presenter[Presenter Protocol]
tg_client[telegram/client.py]
tg_render[telegram/render.py]
markdown[markdown.py]
end
subgraph External["External"]
agent_clis[Agent CLIs<br/>claude, codex, pi]
telegram_api[Telegram Bot API]
end
cli --> router
cli --> scheduler
cli --> projects
cli --> engines
cli --> transports
cli --> commands
engines --> entrypoints
transports --> entrypoints
commands --> entrypoints
router --> runtime
projects --> runtime
router --> tg_bridge
scheduler --> tg_bridge
runtime --> tg_bridge
tg_bridge --> commands
tg_bridge --> runner_bridge
runner_bridge --> runner_proto
runner_proto --> runners
runners --> schemas
runners --> agent_clis
runner_bridge --> transport
runner_bridge --> presenter
transport --> tg_client
presenter --> tg_render
presenter --> markdown
tg_client --> telegram_api
Plugin Architecture¶
Takopi discovers plugins via Python entrypoints and keeps loading lazy:
- Engine backends (
takopi.engine_backends) - Transport backends (
takopi.transport_backends) - Command backends (
takopi.command_backends)
Entrypoint names become plugin IDs, are validated up front (reserved names, regex),
and are only loaded when needed. The public surface for plugin authors lives in
takopi.api, while transports and commands interact with core routing via
TransportRuntime.
Domain Model¶
classDiagram
class ResumeToken {
+engine: EngineId
+value: str
}
class Action {
+id: str
+kind: ActionKind
+title: str
+detail: dict
}
class StartedEvent {
+type: "started"
+engine: EngineId
+resume: ResumeToken
+title: str?
}
class ActionEvent {
+type: "action"
+engine: EngineId
+action: Action
+phase: started|updated|completed
+ok: bool?
+message: str?
}
class CompletedEvent {
+type: "completed"
+engine: EngineId
+ok: bool
+answer: str
+resume: ResumeToken?
+usage: dict?
}
StartedEvent --> ResumeToken
ActionEvent --> Action
CompletedEvent --> ResumeToken
note for Action "ActionKind: command | tool | file_change |\nweb_search | subagent | note | turn | warning | telemetry"
Message Lifecycle¶
sequenceDiagram
participant User
participant Telegram
participant Bridge as telegram/bridge.py
participant Scheduler as ThreadScheduler
participant RunnerBridge as runner_bridge.py
participant Runner
participant AgentCLI as Agent CLI
participant Command as Command Plugin
User->>Telegram: Send message
Telegram->>Bridge: poll_incoming()
Bridge->>Bridge: Parse slash command
alt Command plugin
Bridge->>Command: handle(ctx)
Command->>RunnerBridge: run_one/run_many (optional)
RunnerBridge->>Telegram: Send progress/final
else Default routing
Bridge->>Bridge: Parse directives<br/>(/engine, /project, @branch)
Bridge->>Bridge: Extract resume token<br/>from reply
Bridge->>Bridge: Resolve worktree<br/>(if @branch)
Bridge->>Scheduler: enqueue(ThreadJob)
Scheduler->>RunnerBridge: handle_message()
RunnerBridge->>Telegram: Send progress message
RunnerBridge->>Runner: run(prompt, resume)
end
Runner->>AgentCLI: Spawn subprocess
loop JSONL Stream
AgentCLI-->>Runner: JSONL event
Runner-->>RunnerBridge: TakopiEvent
RunnerBridge->>Telegram: Edit progress message
end
AgentCLI-->>Runner: Completed
Runner-->>RunnerBridge: CompletedEvent
RunnerBridge->>Telegram: Send final answer
RunnerBridge->>Telegram: Delete progress message
Runner Execution Flow¶
flowchart TD
A[runner.run\nprompt, resume_token] --> B[Acquire Session Lock<br/>SessionLockMixin]
B --> C[Build Command]
C --> D{Engine?}
D -->|Claude| D1["claude --print --output-format stream-json<br/>[--resume id] prompt"]
D -->|Codex| D2["codex exec --json<br/>[resume <token>] -"]
D -->|Pi| D3["pi --print --mode json<br/>--session <id> <prompt>"]
D -->|OpenCode| D4["opencode run --format json<br/>[--session id] -- <prompt>"]
D1 --> E[Spawn Subprocess<br/>anyio.open_process]
D2 --> E
D3 --> E
D4 --> E
E --> F[Stream JSONL from stdout]
F --> G[Decode with msgspec]
G --> H[Translate to TakopiEvent]
H --> I[yield event]
I --> F
F -->|EOF| J[Return]
Resume Token Flow¶
sequenceDiagram
participant User
participant Bridge
participant Runner
participant CLI as Agent CLI
Note over User,CLI: New Conversation
User->>Bridge: "fix the bug"
Bridge->>Runner: run(prompt, None)
Runner->>CLI: claude "fix the bug"
CLI-->>Runner: StartedEvent(resume=abc123)
Runner-->>Bridge: Stream events
Bridge->>User: Final message with:<br/>claude --resume abc123<br/>ctx: project @branch
Note over User,CLI: Resume Conversation
User->>Bridge: Reply: "now add tests"
Bridge->>Bridge: extract_resume(reply_text)<br/>→ ResumeToken(claude, abc123)
Bridge->>Bridge: parse_ctx_line()<br/>→ project, branch
Bridge->>Runner: run("now add tests", token)
Runner->>CLI: claude --resume abc123 "now add tests"
CLI-->>Runner: Continues session
Runner-->>Bridge: Stream events
Bridge->>User: Final message
Component Dependencies¶
flowchart TD
cli[cli.py] --> config[config.py]
cli --> engines[engines.py]
cli --> transports[transports.py]
cli --> commands[commands.py]
cli --> lockfile[lockfile.py]
engines --> plugins[plugins.py]
transports --> plugins
commands --> plugins
engines --> backends[backends.py]
backends --> runners[runners/]
backends --> runner[runner.py]
subgraph runners[runners/]
claude[claude.py]
codex[codex.py]
opencode[opencode.py]
pi[pi.py]
end
subgraph schemas[schemas/]
claude_s[claude.py]
codex_s[codex.py]
opencode_s[opencode.py]
pi_s[pi.py]
end
claude --> claude_s
codex --> codex_s
opencode --> opencode_s
pi --> pi_s
cli --> router[router.py]
tg_bridge --> runtime[transport_runtime.py]
runtime --> router
runtime --> config
tg_bridge --> commands
runner --> runner_bridge[runner_bridge.py]
runner_bridge --> tg_bridge
tg_bridge --> client[telegram/client.py]
tg_bridge --> render[telegram/render.py]
client --> transport[transport.py]
runner_bridge --> progress[progress.py]
runner_bridge --> events[events.py]
render --> presenter[presenter.py]
presenter --> markdown[markdown.py]
Configuration Structure¶
flowchart LR
subgraph Config["~/.takopi/"]
toml[takopi.toml]
lock[takopi.lock]
end
subgraph toml_contents["takopi.toml"]
direction TB
global["transport<br/>default_engine<br/>default_project"]
telegram_cfg["[transports.telegram]<br/>bot_token = ...<br/>chat_id = ..."]
plugins_cfg["[plugins]<br/>enabled = [...]"]
plugins_extra["[plugins.mycommand]<br/>setting = ..."]
claude_cfg["[claude]<br/>model = ..."]
codex_cfg["[codex]<br/>model = ..."]
projects_cfg["[projects.alias]<br/>path = ...<br/>worktrees_dir = ...<br/>default_engine = ..."]
end
toml --> toml_contents
Thread Scheduling¶
flowchart TD
subgraph Incoming[Incoming Messages]
m1[Message 1<br/>new thread]
m2[Message 2<br/>reply to thread A]
m3[Message 3<br/>reply to thread A]
m4[Message 4<br/>new thread]
end
subgraph Scheduler[ThreadScheduler]
direction TB
q1[Thread A Queue]
q2[Thread B Queue]
q3[Thread C Queue]
end
subgraph Workers[Worker Tasks]
w1[Worker A]
w2[Worker B]
w3[Worker C]
end
m1 --> q2
m2 --> q1
m3 --> q1
m4 --> q3
q1 --> w1
q2 --> w2
q3 --> w3
w1 --> runner1[Runner.run]
w2 --> runner2[Runner.run]
w3 --> runner3[Runner.run]
note1[Jobs in same thread<br/>execute sequentially]
note2[Different threads<br/>execute in parallel]
Summary¶
| Layer | Components | Responsibility |
|---|---|---|
| CLI | cli.py |
Entry point, config, lock |
| Plugins | plugins.py, engines.py, transports.py, commands.py, api.py |
Entrypoint discovery, plugin loading, public API boundary |
| Orchestration | router.py, scheduler.py, config.py |
Engine selection, job queuing, project config |
| Bridge | telegram/bridge.py, runner_bridge.py |
Message handling, execution coordination |
| Runner | runner.py, runners/*.py, schemas/*.py |
Agent CLI subprocess, JSONL parsing, event translation |
| Transport | transport.py, presenter.py, telegram/client.py |
Telegram API, message rendering |
| Domain | model.py, progress.py, events.py |
Event types, action tracking |
| Utils | worktrees.py, utils/*.py, markdown.py |
Git worktrees, formatting, paths |