Yeahia Sarker

GraphBit's Rust Core Modules: Workflow Engine, Graph Management, and Agent System Overview

Overview

Below is an implementation‑grounded inventory of GraphBit’s core modules based on the current Rust source files I examined (not docs). For each, I outline purpose, implementation status, key components, integration points, and Python exposure where visible or inferable. I include brief, verified code excerpts as evidence.

Note: The Rust core is well‑structured and functional for many workflow use cases (agent, condition, transform, delay, document loading). Split/Join/HttpRequest/Custom are declared but not executed by the engine yet. Tool calling is implemented as a two‑phase orchestration with execution delegated to the Python layer via a structured “tool_calls_required” result.

Workflow Engine

Module: core/src/workflow.rs

  • Purpose
    • Orchestrates validated graphs in dependency‑aware batches with concurrent execution, per‑node retries, and per‑agent circuit breakers; resolves LLM config per node; auto‑registers missing agents with hierarchical config resolution; injects implicit context from parent node outputs into prompts; and orchestrates tool calls in two phases.
  • Implementation Status
    • Implemented and functional for Agent, Condition, Transform, Delay, DocumentLoader nodes.
    • Tool orchestration: partial (returns structured tool calls; delegates execution to Python).
    • Split/Join/HttpRequest/Custom: declared (in graph) but rejected at execution time (unsupported).
  • Key Components
    • Workflow, WorkflowBuilder, WorkflowExecutor (profiles: default/high‑throughput/low‑latency/memory‑optimized)
    • Dependency batching + parallel task spawning
    • Retry logic (types::RetryConfig), Circuit breaker per agent (types::CircuitBreaker)
    • Implicit parent‑output preamble and Context JSON for agent prompts
    • Tool orchestration via node_config["tool_schemas"] → returns "tool_calls_required"
  • Integration Points
    • Uses graph::WorkflowGraph for structure/validation
    • Uses agents::AgentTrait, llm::* (via agents)
    • Uses types::{RetryConfig, CircuitBreaker, ConcurrencyManager, WorkflowContext, NodeExecutionResult}
    • Calls document_loader::DocumentLoader (for DocumentLoader node)
  • Python API Exposure
    • The engine is designed to surface tool calls back to Python for execution. I didn’t open the Python bindings in this pass; earlier analysis indicated PyO3 bindings expose workflow construction/execution. Confidence: medium.

Evidence: two‑phase tool orchestration and implicit parent context injection:

Ok(serde_json::json!({
    "type": "tool_calls_required",
    "content": llm_response.content,
    "tool_calls": tool_calls_json,
    "original_prompt": prompt,
    "message": "Tool execution should be handled by Python layer..."
}))

let implicit_preamble = if sections.is_empty() && context_json_block.is_empty() {
    "".to_string()
} else {
    let directive_line = "Instruction: You MUST use the [Context JSON]...";
    ...
    format!("Context from prior nodes (auto-injected):\n{sections_block}{directive_line}\n{context_json_block}")
};

Unsupported node types handled explicitly:

_ => Err(GraphBitError::workflow_execution(format!(
    "Unsupported node type: {:?}", node.node_type
))),
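
As context for the dependency batching noted under Key Components, here is a minimal, self-contained sketch of Kahn-style topological layering with simplified string node IDs. It illustrates the technique only and is not GraphBit's actual code (graph.rs validates acyclicity before execution):

use std::collections::HashMap;

// Group nodes into batches where every dependency sits in an earlier
// batch; nodes within one batch are safe to run concurrently.
fn dependency_batches<'a>(nodes: &[&'a str], edges: &[(&'a str, &'a str)]) -> Vec<Vec<String>> {
    let mut in_degree: HashMap<&str, usize> = nodes.iter().map(|n| (*n, 0)).collect();
    let mut dependents: HashMap<&str, Vec<&str>> = HashMap::new();
    for &(from, to) in edges {
        *in_degree.get_mut(to).unwrap() += 1;
        dependents.entry(from).or_default().push(to);
    }
    let mut batches: Vec<Vec<String>> = Vec::new();
    let mut ready: Vec<&str> = in_degree.iter().filter(|(_, d)| **d == 0).map(|(n, _)| *n).collect();
    while !ready.is_empty() {
        let mut next = Vec::new();
        for node in &ready {
            for dep in dependents.get(node).cloned().unwrap_or_default() {
                let d = in_degree.get_mut(dep).unwrap();
                *d -= 1;
                if *d == 0 { next.push(dep); }
            }
        }
        batches.push(ready.iter().map(|n| n.to_string()).collect());
        ready = next;
    }
    batches
}

fn main() {
    // a -> b, a -> c, b -> d, c -> d yields batches [a], [b, c], [d]
    // (order within a batch may vary).
    let batches = dependency_batches(&["a", "b", "c", "d"], &[("a", "b"), ("a", "c"), ("b", "d"), ("c", "d")]);
    println!("{batches:?}");
}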

Graph Management

Module: core/src/graph.rs

  • Purpose
    • Defines DAG structure, nodes/edges, caching of dependencies/dependents, validation (cycles; node integrity; supported DocumentLoader types; optional unique names), and enforces unique agent_id across agent nodes.
  • Implementation Status
    • Implemented and functional. Validation is comprehensive; adjacency caches speed readiness checks and next‑node selection.
  • Key Components
    • WorkflowGraph (petgraph DiGraph; caches), WorkflowNode (config/schemas/retry/timeout), NodeType (Agent, Condition, Transform, Split, Join, Delay, HttpRequest, Custom, DocumentLoader), WorkflowEdge and EdgeType
  • Integration Points
    • Consumed by WorkflowExecutor; NodeType drives execution handler choice; RetryConfig moved from types.rs; DocumentLoader validation plugs into supported types.
  • Python API Exposure
    • Typically exposed via PyO3 wrappers for node/edge creation; not directly verified in this pass. Confidence: medium.

Evidence: unique agent_id enforcement and DocumentLoader validation:

if !duplicates.is_empty() {
    return Err(GraphBitError::graph(format!(
        "Duplicate agent_id detected... Conflicts: {}", parts.join("; ")
    )));
}

let supported_types = ["pdf", "txt", "docx", "json", "csv", "xml", "html"];
if !supported_types.contains(&document_type.to_lowercase().as_str()) {
    return Err(GraphBitError::graph(format!("Unsupported document type: {document_type}...")));
}

NodeType coverage:

pub enum NodeType {
    Agent { agent_id: AgentId, prompt_template: String },
    Condition { expression: String },
    Transform { transformation: String },
    Split,
    Join,
    Delay { duration_seconds: u64 },
    HttpRequest { url: String, method: String, headers: HashMap<String, String> },
    Custom { function_name: String },
    DocumentLoader { document_type: String, source_path: String, encoding: Option<String> },
}
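
Because WorkflowGraph is built on petgraph's DiGraph, cycle validation can lean directly on petgraph's toposort, which fails on cyclic input. A minimal sketch with toy string nodes, not the module's actual validate() body:

use petgraph::algo::toposort;
use petgraph::graph::DiGraph;

fn main() {
    // Tiny DAG: load -> summarize -> publish.
    let mut graph: DiGraph<&str, ()> = DiGraph::new();
    let load = graph.add_node("load");
    let summarize = graph.add_node("summarize");
    let publish = graph.add_node("publish");
    graph.add_edge(load, summarize, ());
    graph.add_edge(summarize, publish, ());

    // toposort returns Err(Cycle) on cyclic input, which is exactly
    // the success/failure shape a validation pass needs.
    match toposort(&graph, None) {
        Ok(order) => {
            let names: Vec<_> = order.iter().map(|ix| graph[*ix]).collect();
            println!("valid DAG, topological order: {names:?}");
        }
        Err(cycle) => eprintln!("cycle detected involving node {:?}", cycle.node_id()),
    }
}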

Agent System

Module: core/src/agents.rs

  • Purpose
    • Defines AgentTrait and a standard LLM‑based Agent implementation that builds LLM requests, validates provider config at construction, processes messages, produces structured JSON output where possible, and supports output validation.
  • Implementation Status
    • Implemented and functional; Agents use the LlmProvider wrapper; validates API config by a test call during creation.
  • Key Components
    • AgentConfig, Agent, AgentTrait, AgentBuilder; build_llm_request(), execute(), process_message(), validate_output()
  • Integration Points
    • Used by WorkflowExecutor; obtains LlmProvider via LlmProviderFactory (llm/mod.rs)
  • Python API Exposure
    • Typically exposed via bindings for constructing agents and their configs. Not verified in this pass. Confidence: medium.

Evidence: provider validation on agent creation:

let test_request = LlmRequest::new("test").with_max_tokens(1);
if let Err(e) = llm_provider.complete(test_request).await {
    return Err(GraphBitError::config(format!(
        "LLM configuration validation failed: {e}"
    )));
}
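
For orientation, here is the rough shape AgentTrait plausibly takes, inferred from the call sites above. The method signatures, the async-trait usage, and the placeholder aliases are all my assumptions, not the verified definition:

use async_trait::async_trait;

// Placeholder aliases standing in for GraphBit's real error/output types.
type GraphBitResult<T> = Result<T, String>;
type AgentOutput = serde_json::Value;

// Method names follow the inventory above (the concrete Agent also has
// build_llm_request() and process_message()); everything else is a guess.
#[async_trait]
trait AgentTrait: Send + Sync {
    // Render the prompt template and run one completion against the provider.
    async fn execute(&self, prompt: &str) -> GraphBitResult<AgentOutput>;

    // Check structured output against the node's schema, if one is attached.
    fn validate_output(&self, output: &AgentOutput) -> GraphBitResult<()>;
}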

LLM Abstraction and Providers

Module: core/src/llm/mod.rs

  • Purpose
    • Unifies provider API behind LlmProviderTrait and a factory; defines request/response/message types and tool metadata structures (LlmTool, LlmToolCall).
  • Implementation Status
    • Implemented and functional as an abstraction; factory covers many providers (openai, anthropic, deepseek, huggingface, ollama, perplexity, openrouter, fireworks, xai). Streaming and function calling are trait hooks whose support flags default to false.
    • Actual provider implementations are referenced via submodules; I didn’t open each concrete file in this pass, but the factory wiring and module declarations are present. Confidence: medium for specific provider completeness.
  • Key Components
    • LlmRequest/LlmMessage/LlmTool/LlmToolCall; LlmProviderTrait; LlmProvider wrapper; LlmProviderFactory with robust config routing; LlmResponse/LlmUsage
  • Integration Points
    • Used by agents::Agent; tool orchestration in workflow.rs constructs tools via LlmTool
  • Python API Exposure
    • Config payloads (LlmConfig) are serde‑serializable and can be passed from Python; bindings likely expose config creation helpers. Not verified in this pass.

Evidence: provider factory wiring:

match config {
    LlmConfig::OpenAI { api_key, model, .. } =>
        Ok(Box::new(openai::OpenAiProvider::new(api_key, model)?)),
    LlmConfig::Anthropic { api_key, model, .. } =>
        Ok(Box::new(anthropic::AnthropicProvider::new(api_key, model)?)),
    LlmConfig::Unconfigured { message } =>
        Err(GraphBitError::config(format!("LLM provider not configured: {}", message))),
}
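
To make the request/tool types concrete, here is a self-contained sketch using placeholder mirrors of LlmRequest and LlmTool. new() and with_max_tokens() appear in the excerpts in this post; with_tools() and the exact fields are my assumptions:

use serde_json::{json, Value};

// Placeholder mirrors of the core types, just enough to show the flow;
// the real definitions live in core/src/llm/mod.rs.
struct LlmTool { name: String, description: String, parameters: Value }
struct LlmRequest { prompt: String, max_tokens: Option<u32>, tools: Vec<LlmTool> }

impl LlmRequest {
    fn new(prompt: &str) -> Self {
        Self { prompt: prompt.into(), max_tokens: None, tools: Vec::new() }
    }
    fn with_max_tokens(mut self, n: u32) -> Self { self.max_tokens = Some(n); self }
    fn with_tools(mut self, tools: Vec<LlmTool>) -> Self { self.tools = tools; self }
}

fn main() {
    // Tools carry provider-agnostic JSON Schema parameter descriptions.
    let weather = LlmTool {
        name: "get_weather".into(),
        description: "Look up current weather for a city".into(),
        parameters: json!({
            "type": "object",
            "properties": { "city": { "type": "string" } },
            "required": ["city"]
        }),
    };
    let request = LlmRequest::new("What's the weather in Dhaka?")
        .with_max_tokens(256)
        .with_tools(vec![weather]);
    println!(
        "prompt={:?}, max_tokens={:?}, first tool={} ({}), schema={}",
        request.prompt, request.max_tokens,
        request.tools[0].name, request.tools[0].description, request.tools[0].parameters
    );
}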

Module: core/src/llm/providers.rs

  • Purpose
    • Strongly typed LlmConfig enum across vendors; LlmProviderTrait; LlmProvider wrapper including optional stream interface.
  • Implementation Status
    • Implemented and functional; default implementations return “not supported” for streaming/function‑calling unless providers override.
  • Key Components
    • LlmConfig builders (openai(), anthropic(), ollama(), etc.); LlmProviderTrait; LlmProvider wrapper with logging
  • Integration Points
    • Used by LlmProviderFactory and agents::Agent to route calls
  • Python API Exposure
    • LlmConfig is serde‑friendly and used in node config fields in workflow; likely exposed to Python. Not verified in this pass.

Evidence: default unsupported streaming:

async fn stream(&self, _request: LlmRequest)
    -> GraphBitResult<Box<dyn futures::Stream<Item = GraphBitResult<LlmResponse>> + Unpin + Send>>
{
    Err(GraphBitError::config("Streaming not supported by this provider"))
}
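
Since LlmConfig is described as serde-friendly and passed from Python as plain data, a simplified stand-in shows the round trip. The variants, field names, and tagging scheme here are illustrative assumptions, not the real enum:

use serde::{Deserialize, Serialize};

// Simplified stand-in for the real LlmConfig, to show why serde makes
// it easy to hand configs across the Python boundary as plain JSON.
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "provider", rename_all = "lowercase")]
enum LlmConfig {
    OpenAI { api_key: String, model: String },
    Ollama { model: String },
}

fn main() -> Result<(), serde_json::Error> {
    // A Python caller can hand over plain JSON like this...
    let raw = r#"{ "provider": "ollama", "model": "llama3.1" }"#;
    // ...and the Rust side gets a typed config back.
    let config: LlmConfig = serde_json::from_str(raw)?;
    println!("{config:?}");
    Ok(())
}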

Resilience and Concurrency

Module: core/src/types.rs (Retry, Circuit Breaker, Concurrency)

  • Purpose
    • Provides RetryConfig with exponential backoff + jitter and retryable error classification (a backoff sketch follows this list); CircuitBreaker with a Closed/Open/HalfOpen state machine; a per‑node‑type ConcurrencyManager using atomics and notify queues (no global semaphore bottleneck); and execution stats and workflow context types.
  • Implementation Status
    • Implemented and functional. Concurrency acquisition uses atomic CAS; stats tracked.
  • Key Components
    • RetryConfig, RetryableErrorType, CircuitBreakerConfig/CircuitBreaker, ConcurrencyConfig/ConcurrencyManager/ConcurrencyPermits, ConcurrencyStats
  • Integration Points
    • Consumed by WorkflowExecutor for retries and circuit breaking; per‑agent breaker keyed by AgentId; concurrency for agent nodes
  • Python API Exposure
    • Exposed indirectly via workflow executor options; not directly verified.
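
A minimal sketch of the exponential backoff + jitter pattern referenced above, using the rand crate; the function name, full-jitter strategy, and cap values are illustrative, not RetryConfig's actual internals:

use rand::Rng;
use std::time::Duration;

// Exponential backoff with full jitter: double the base per attempt,
// cap it, then sample uniformly in [0, cap] to de-correlate retries.
fn backoff_delay(attempt: u32, base_ms: u64, max_ms: u64) -> Duration {
    let exp = base_ms.saturating_mul(2u64.saturating_pow(attempt));
    let capped = exp.min(max_ms);
    let jittered = rand::thread_rng().gen_range(0..=capped);
    Duration::from_millis(jittered)
}

fn main() {
    for attempt in 0..5 {
        println!("attempt {attempt}: wait {:?}", backoff_delay(attempt, 100, 10_000));
    }
}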

Evidence: atomic concurrency acquisition:

loop {
    let current = current_count.load(Ordering::Acquire);
    if current < max_concurrent {
        if current_count.compare_exchange(current, current + 1, Ordering::AcqRel, Ordering::Acquire).is_ok() {
            break;
        }
    }
    wait_queue.notified().await;
}
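
The release half that pairs with this acquisition loop is a decrement plus a wakeup. This is my sketch of the pattern (names assumed; the real ConcurrencyManager keeps one counter per node type), not the verified code:

use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Notify;

// Return a permit: drop the counter, then wake one task parked on
// `notified().await`, which will retry its compare_exchange.
fn release(current_count: &AtomicUsize, wait_queue: &Notify) {
    current_count.fetch_sub(1, Ordering::AcqRel);
    wait_queue.notify_one();
}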

Tool Orchestration

  • Where Implemented
    • In workflow.rs agent execution path, enabled when node.config contains "tool_schemas".
  • Purpose
    • Converts tool schemas to LlmTool and calls the provider; if tool calls are returned, the engine returns a structured JSON payload indicating “tool_calls_required” for execution in Python (keeps Rust core provider‑agnostic and lets Python manage tool registry/execution).
  • Implementation Status
    • Partial but functional for round‑trip signaling; tool execution is not performed in Rust.
  • Key Components
    • Node config key "tool_schemas"; LlmTool; execute_agent_with_tools() orchestration
  • Integration Points
    • Expects Python side to read the returned payload, execute tools, and potentially resume
  • Python API Exposure
    • Designed for Python tool registry/execution; bindings should expose workflow execution results transparently. Not verified in this pass.

Evidence: detection of tool_schemas:

let has_tools = node_config.contains_key("tool_schemas");
if has_tools {
    let result = Self::execute_agent_with_tools(...).await;
    return result;
}
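
A hedged sketch of the conversion step this detection gates: turning a JSON tool_schemas array into LlmTool values. The assumed schema shape ({name, description, parameters}) and the stand-in struct are mine, not the engine's actual code:

use serde_json::Value;

// Simplified stand-in for core's LlmTool.
struct LlmTool { name: String, description: String, parameters: Value }

// Parse node_config["tool_schemas"], skipping malformed entries.
fn tools_from_config(tool_schemas: &Value) -> Vec<LlmTool> {
    tool_schemas
        .as_array()
        .map(|schemas| {
            schemas
                .iter()
                .filter_map(|s| {
                    Some(LlmTool {
                        name: s.get("name")?.as_str()?.to_string(),
                        description: s
                            .get("description")
                            .and_then(Value::as_str)
                            .unwrap_or_default()
                            .to_string(),
                        parameters: s.get("parameters").cloned().unwrap_or(Value::Null),
                    })
                })
                .collect()
        })
        .unwrap_or_default()
}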

Document Loading

  • Where Implemented
    • Validation in graph.rs; execution path in workflow.rs delegates to crate::document_loader::DocumentLoader (module file not opened in this pass, but invocation is wired).
  • Purpose
    • Node to load documents by type (pdf/txt/docx/json/csv/xml/html) from a source path; makes outputs available to downstream nodes and implicit agent preambles.
  • Implementation Status
    • Node type is validated; engine has a static execution handler that calls DocumentLoader. Confidence: high for node handling; medium for full breadth of loaders without inspecting the module file.
  • Integration Points
    • WorkflowExecutor::execute_document_loader_node_static(); parent output flows into agent prompts
  • Python API Exposure
    • Outputs are just JSON in context; typically surfaced to Python automatically.

Evidence: node execution path (call site):

NodeType::DocumentLoader { document_type, source_path, .. } => {
    Self::execute_document_loader_node_static(document_type, source_path, context.clone()).await
}
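
For intuition, a toy dispatch of the kind a DocumentLoader needs. Only the plain-text arm does real I/O here, and none of this is the actual module (which I did not open in this pass):

use std::fs;
use std::path::Path;

// Route by declared document type; graph.rs already rejects types
// outside the supported list at validation time.
fn load_document(document_type: &str, source_path: &str) -> Result<String, String> {
    match document_type.to_lowercase().as_str() {
        "txt" | "csv" | "json" | "xml" | "html" => fs::read_to_string(Path::new(source_path))
            .map_err(|e| format!("read failed for {source_path}: {e}")),
        "pdf" | "docx" => Err(format!(
            "{document_type} extraction needs a dedicated parser crate (stubbed here)"
        )),
        other => Err(format!("Unsupported document type: {other}")),
    }
}

fn main() {
    match load_document("txt", "notes.txt") {
        Ok(text) => println!("loaded {} bytes", text.len()),
        Err(e) => eprintln!("{e}"),
    }
}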

Embeddings and Text Processing

  • Where Implemented
    • Indicated by earlier analysis (core/src/embeddings.rs and Python bindings for text splitting). I didn’t re‑open these files in this pass.
  • Purpose
    • Unified EmbeddingService with multiple providers (e.g., OpenAI, HuggingFace); text splitters for RAG‑style preprocessing.
  • Implementation Status
    • Previously observed as implemented (Rust) and exposed (Python) with batch support and similarity helpers (a similarity sketch follows below). Confidence: medium (not re‑verified here).
  • Integration Points
    • Complements LLM agents and DocumentLoader for RAG workflows; accessible from Python.
  • Python API Exposure
    • Exposed via python/src/embeddings and python/src/text_splitter. Confidence: medium.
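
The similarity helpers mentioned above typically reduce to cosine similarity; this standalone version is my sketch, not the module's code:

// Cosine similarity between two embedding vectors: dot product over
// the product of the vectors' L2 norms.
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    assert_eq!(a.len(), b.len(), "embedding dimensions must match");
    let dot: f32 = a.iter().zip(b).map(|(x, y)| x * y).sum();
    let norm_a = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    if norm_a == 0.0 || norm_b == 0.0 { 0.0 } else { dot / (norm_a * norm_b) }
}

fn main() {
    let a = [0.1, 0.9, 0.2];
    let b = [0.2, 0.8, 0.1];
    println!("similarity: {:.3}", cosine_similarity(&a, &b));
}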

Python Bindings and Utilities

  • Where Implemented
    • Via PyO3 modules (earlier analysis of python/src/lib.rs and submodules like python/src/workflow/, python/src/llm/, python/src/embeddings/).
  • Purpose
    • Expose Workflow composition/execution, LLM configs, embeddings, text splitters, and tool registry/execution helpers to Python developers (a minimal PyO3 pattern is sketched after this list).
  • Implementation Status
    • Previously observed as present and wired; not re‑validated in this pass. Confidence: medium.
  • Integration Points
    • Tool orchestration expects Python to execute tool calls; workflow results are JSON‑friendly.
  • Testing/Packaging
    • Prior work indicated maturin develop in test pipeline; not re‑checked here.
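
For readers new to PyO3, this is the minimal module/function pattern bindings like these are built from (PyO3 0.20-style API). It is a generic illustration, not GraphBit's actual lib.rs:

use pyo3::prelude::*;

// A trivial stand-in function: count distinct node names in an edge list.
#[pyfunction]
fn node_count(edges: Vec<(String, String)>) -> PyResult<usize> {
    let mut names: Vec<&String> = edges.iter().flat_map(|(a, b)| [a, b]).collect();
    names.sort();
    names.dedup();
    Ok(names.len())
}

// The module entry point; maturin builds this into an importable
// Python extension (hypothetical module name).
#[pymodule]
fn graphbit_sketch(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(node_count, m)?)?;
    Ok(())
}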

What’s Fully Working vs. Partial/Planned

  • Fully Working (Rust engine paths I verified)
    • Graph build/validate (cycles, node validation, unique agent_id)
    • Workflow execution for: Agent, Condition, Transform, Delay, DocumentLoader
    • Per‑node retries + backoff + jitter; per‑agent circuit breaker
    • Per‑node‑type concurrency; dependency‑aware batching; implicit parent context injection into prompts
    • Multi‑provider LLM abstraction; factory wiring for many providers (OpenAI/Anthropic/Ollama as best‑supported by design)
  • Partial
    • Tool orchestration: structured tool call return to Python (no in‑Rust tool execution)
    • Streaming/function calling: trait hooks exist; provider‑specific support not confirmed here
  • Declared but not executed
    • Split, Join, HttpRequest, Custom (return “Unsupported node type” at execution)

Specific Use Case Advantages (from architecture)

  • Multi‑step agent workflows where parent outputs must be reliably injected into downstream prompts without boilerplate
  • Production reliability under flaky networks or provider hiccups (retries + CB)
  • High‑throughput parallel agent tasks with per‑type concurrency control without GIL bottlenecks
  • Mixed cloud/local model usage with a single LLM abstraction (e.g., OpenAI + Ollama)
  • Workflows that prefer Python‑side tool registries but a fast Rust orchestrator (two‑phase tool calls)
