Knowledge Nodes (Knowledge Ontology Execution Nodes)

Build Intelligent Data Integrations with Enterprise-Grade Governance

Introduction

Welcome to the KOEL (Knowledge Ontology Execution Layer) Reasoning Flow Developer Guide. This documentation will teach you how to build custom data integrations and processing logic inside ARPIA Workshop that power Knowledge Ontology nodes.

The Power of KOEL Reasoning Flows

KOEL Reasoning Flows represent a paradigm shift in enterprise data integration. Instead of building separate APIs, microservices, or ETL pipelines for each data source, you can now create intelligent, schema-driven integrations that automatically benefit from:

  • 🔐 Enterprise Security: Built-in bearer token authentication, ISO 42001 compliance, and complete audit trails
  • 🎯 Dynamic Query Execution: Filter, sort, paginate, and aggregate data on-the-fly without hardcoding logic
  • 🔄 Universal Data Access: Seamlessly integrate MySQL, PostgreSQL, REST APIs, GraphQL, cloud services, and legacy systems
  • 📊 Dual Report Modes: Generate both detailed row-level data (DETAIL) and aggregated analytics (GROUP) from a single codebase
  • 🚀 Zero Infrastructure: Write Python in the IDE, deploy instantly—no servers, containers, or DevOps overhead
  • 📝 Automatic Documentation: Your column schema becomes self-documenting API contracts
  • 🔍 Full Traceability: Every execution is logged with payload, response, and timing for compliance and debugging
  • 🧩 Composable Architecture: Chain multiple reasoning flows, cache results, and build complex data pipelines
  • 🤖 AI-Powered Reasoning: Full integration with ARPIA's AI governance framework for intelligent decision-making and autonomous agents

The result? What traditionally takes weeks of backend development, API design, authentication setup, and deployment pipelines can now be accomplished in hours. You write focused Python logic that solves your specific integration challenge, and KOEL handles everything else—validation, transformation, pagination, security, and governance.

Native Integration with ARPIA AI Platform

Your KOEL nodes are first-class citizens in the ARPIA ecosystem. Once deployed, they automatically become available across the entire platform:

  • 🎨 App Studio: Use your nodes as data sources for visual applications, dashboards, and user interfaces—no additional configuration needed
  • 🔌 MCP (Model Context Protocol): Expose your nodes as MCP tools that AI agents (Claude, GPT-4, etc.) can discover and invoke autonomously
  • 🌐 API Collections: Your nodes are instantly available as REST APIs with auto-generated OpenAPI specs for external integrations
  • 🧠 AI Reasoning Governance: Every node execution participates in ARPIA's AI governance framework with:
    • Reasoning Audit Trails: Track which AI agents accessed what data and why
    • Policy Enforcement: Apply data access policies, rate limits, and compliance rules automatically
    • Context Management: AI agents receive proper context about data schemas, business rules, and constraints
    • Decision Lineage: Trace AI decisions back through data sources and reasoning steps

This means your reasoning flow becomes:

  • A visual component in App Studio (drag-and-drop data widgets)
  • An AI agent tool via MCP (Claude can query your CRM data)
  • A REST API endpoint (external systems can consume)
  • A governed data node (full compliance and audit)

All from a single Python file in Workshop. No duplicate code, no synchronization issues, no deployment complexity.

Real-World Impact

Whether you're connecting to external CRM systems, processing real-time IoT data, aggregating financial reports, or building ML-powered predictions, KOEL Reasoning Flows give you enterprise power with startup speed.

Imagine:

  • An AI assistant that autonomously queries your Salesforce data via MCP
  • A dashboard that visualizes that same data in App Studio
  • An external partner consuming it via REST API
  • All governed by the same security, validation, and audit framework

That's the power of KOEL: write once, deploy everywhere, govern consistently.


What is a Reasoning Flow?

A Reasoning Flow is a Python-based Workshop Project that acts as the "brain" behind a Knowledge Ontology node. When users query a KOEL node through the API, your reasoning flow:

  1. Receives a structured payload with filters, sorting, and pagination parameters
  2. Processes the request by querying databases, calling APIs, or computing results
  3. Returns a properly formatted dataset that matches your defined schema

Think of it as a serverless function that bridges external data sources with ARPIA's Knowledge Ontology system.

Your Role as a Developer

You write Python code in the Workshop IDE (index.py) that:

  • ✅ Reads the incoming request payload
  • ✅ Applies filters, sorting, and pagination to your queries
  • ✅ Integrates with external systems (MySQL, REST APIs, etc.)
  • ✅ Returns data using column names defined in your schema

What This Guide Covers

This comprehensive guide includes:

  • Quick Start: Get running in 5 minutes with a minimal example
  • Column Schema: How to define and use your data structure
  • Payload Structure: Understanding filters, sorting, pagination, and report types
  • Response Format: How to structure your output correctly
  • Report Types: DETAIL (raw rows) vs GROUP (aggregations) with examples
  • Filters & Sorting: Complete guide to handling all filter types and sorting
  • Integration Examples: Real-world code for MySQL, REST APIs, Slack, and more
  • Advanced Patterns: Production-ready patterns for error handling, caching, and performance
  • Troubleshooting: Common errors and how to fix them

Before You Start

Prerequisites:

  • Basic Python knowledge
  • Understanding of SQL (for database integrations)
  • Familiarity with REST APIs (for web service integrations)
  • Access to ARPIA Workshop IDE

Tools Available:

  • Repository database connection via dkconnect.connect()
  • Variable storage/retrieval via ap_var(dbl, 'var_name', value)
  • Custom Python packages via requirements.txt
  • Full Python 3.x standard library

Creating Your Reasoning Flow

Before writing any code, create your Flow in Workshop:

  1. In Reasoning Flows, press New Flow.
  2. Enter a Name.
  3. Set Project Type — for most KOEL integrations use Batch Process. Use API Calls if your node needs to run on-demand via REST.
  4. Set Project Category to KOEL.
  5. Add an optional Description.
  6. Press Create.

Your flow's index.py opens in the Workshop IDE — ready for development.

📘 Project Category is organizational only: it does not change how the flow executes. It groups your KOEL flows in the workspace for navigation and filtering. For the full reference of all Project Types and Categories, see Reasoning Flows.

Image placeholder — New Flow dialog showing Name, Project Type, and Project Category = KOEL.


Quick Start

Minimal Example

# index.py
from dkconnect import connect

# Connect to repository database
dbl = connect()

# 1. Read payload
payload = ap_var(dbl, 'payload')

# 2. Process data (example: simple query)
cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT customer_id, customer_name FROM customers LIMIT 10")
rows = cursor.fetchall()

# 3. Return response using Column Ids from schema
response = [
    {
        "customer_id": row["customer_id"],
        "customer_name": row["customer_name"]
    }
    for row in rows
]

# 4. Save response
ap_var(dbl, 'response', response)

# Clean up
cursor.close()
dbl.close()

That's it! KOEL handles column transformation and pagination metadata in the API response.


Column Schema Configuration

Defining Columns in Workshop UI

Navigate to the Column Schema tab and define your columns:

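Name             | Column Id        | Orientation | Type
-----------------|------------------|-------------|-----------
Customer Id      | customer_id      | Left        | Normal
Customer Name    | customer_name    | Left        | Normal
Creation Date    | creation_date    | Left        | Date
Total Sales      | total_sales      | Right       | Currency
Churn Prediction | churn_prediction | Right       | Percentage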

✅ Critical Rule: Use Column Id as Response Keys

Your Python code MUST return dictionaries using Column Id as keys:

# ✅ CORRECT - using Column Id
response = [
    {
        "customer_id": "12345",
        "customer_name": "Acme Corp",
        "creation_date": "2024-01-15",
        "total_sales": 50000.00,
        "churn_prediction": 0.75
    }
]

# ❌ WRONG - using Name instead of Column Id
response = [
    {
        "Customer Id": "12345",  # Wrong!
        "Customer Name": "Acme Corp"  # Wrong!
    }
]

Return ALL Columns in Schema

Always return all columns defined in your schema. KOEL will filter columns based on Detail_Columns in the payload.
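
When a source omits some fields, one way to keep every row complete is to build it from the schema's Column Ids rather than from whatever keys the source returns. The sketch below assumes a hand-maintained list, SCHEMA_COLUMN_IDS, and a helper named complete_row; both names are illustrative and not part of KOEL.

```python
# Hypothetical list of Column Ids; keep it in sync with the Column Schema tab.
SCHEMA_COLUMN_IDS = [
    "customer_id",
    "customer_name",
    "creation_date",
    "total_sales",
    "churn_prediction",
]


def complete_row(row, column_ids=SCHEMA_COLUMN_IDS):
    """Return a dict with exactly one key per Column Id.

    Columns the source did not provide are set to None; keys that are
    not in the schema are dropped.
    """
    return {col: row.get(col) for col in column_ids}


partial = {"customer_id": "12345", "customer_name": "Acme Corp", "internal_flag": 1}
full = complete_row(partial)
print(full)
```

Because the row is rebuilt from the schema list, stray keys such as internal_flag never reach the response.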


Reading the Payload

Payload Structure

The payload is automatically saved by KOEL before your code runs. Access it with:

payload = ap_var(dbl, 'payload')

Payload Contents

{
    "reportType": "DETAIL",  # or "GROUP"
    "Column_Format": "col_name",
    "page": 1,
    "page_limit_rows": 100,
    "Detail_Columns": ["customer_id", "customer_name"],
    "GroupByCol": ["customer_name"],
    "GroupByValues": ["total_sales"],
    "GroupFilter": {
        "filter_group_1": {
            "filter_1": {
                "Filter_Union": "AND",
                "Filter_Plugin": "M",
                "Filter_Column": "customer_id",
                "Filter_Type": "IN",
                "Filter_Value": ["12345", "12346"]
            }
        }
    },
    "order_by": [
        {"field": "customer_name", "direction": "ASC"}
    ],
    "debug": False
}

Extracting Values

report_type = payload.get('reportType', 'DETAIL')
filters = payload.get('GroupFilter', {})
order_by = payload.get('order_by', [])
group_by_cols = payload.get('GroupByCol', [])
group_by_values = payload.get('GroupByValues', [])
debug_mode = payload.get('debug', False)
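
If several flows repeat the same payload.get calls, it may help to collect them in one place. The following is a minimal sketch, assuming the payload keys shown above; KoelRequest and parse_payload are hypothetical names, and rejecting unknown report types is a design choice of this sketch rather than documented KOEL behavior.

```python
from dataclasses import dataclass, field

VALID_REPORT_TYPES = {"DETAIL", "GROUP"}


@dataclass
class KoelRequest:  # hypothetical container, not a KOEL class
    report_type: str = "DETAIL"
    page: int = 1
    filters: dict = field(default_factory=dict)
    order_by: list = field(default_factory=list)
    group_by_cols: list = field(default_factory=list)
    group_by_values: list = field(default_factory=list)
    debug: bool = False


def parse_payload(payload):
    """Copy the payload keys into a KoelRequest, applying the same defaults as above."""
    payload = payload or {}  # assumption: treat a missing payload as empty
    report_type = payload.get("reportType", "DETAIL")
    if report_type not in VALID_REPORT_TYPES:
        raise ValueError(f"Unsupported reportType: {report_type!r}")
    return KoelRequest(
        report_type=report_type,
        page=int(payload.get("page", 1)),
        filters=payload.get("GroupFilter") or {},
        order_by=payload.get("order_by") or [],
        group_by_cols=payload.get("GroupByCol") or [],
        group_by_values=payload.get("GroupByValues") or [],
        debug=bool(payload.get("debug", False)),
    )


request = parse_payload({"reportType": "GROUP", "GroupByCol": ["customer_name"]})
print(request.report_type, request.group_by_cols, request.page)
```

In a flow you would call parse_payload(ap_var(dbl, 'payload')) once and pass the result to the rest of your code.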

Response Format

Basic Response Structure

response = [
    {
        "column_id_1": value1,
        "column_id_2": value2,
    }
]

ap_var(dbl, 'response', response)

Data Type Handling

response = [
    {
        "customer_id": "12345",           # String
        "customer_name": "Acme Corp",     # String
        "creation_date": "2024-01-15",    # String (YYYY-MM-DD)
        "total_sales": 50000.00,          # Float (Currency)
        "order_count": 25,                # Integer
        "churn_prediction": 0.75,         # Float (0-1 for percentage)
        "is_active": True,                # Boolean
        "metadata": {"key": "value"}      # Dict/Object
    }
]

Handling Null Values

response = [
    {
        "customer_id": row.get("id") or "",
        "customer_name": row.get("name") or "Unknown",
        "total_sales": float(row.get("sales") or 0.0),
        "last_order_date": row.get("last_order") or None
    }
    for row in rows
]
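
Database drivers often return Decimal, date, and datetime objects rather than the plain floats and strings listed under Data Type Handling. A small converter can normalize them in one pass. This is a sketch with a hypothetical helper name, to_response_value; keeping the time part for datetime values is an assumption modeled on the timestamp format used in the Slack example.

```python
from datetime import date, datetime
from decimal import Decimal


def to_response_value(value):
    """Convert common database types to plain response values; leave others unchanged."""
    if isinstance(value, Decimal):
        return float(value)
    # datetime is a subclass of date, so it must be checked first.
    if isinstance(value, datetime):
        return value.strftime("%Y-%m-%d %H:%M:%S")
    if isinstance(value, date):
        return value.strftime("%Y-%m-%d")
    return value


row = {
    "total_sales": Decimal("50000.00"),
    "creation_date": date(2024, 1, 15),
    "customer_name": None,
}
clean = {key: to_response_value(val) for key, val in row.items()}
print(clean)
```

None passes through unchanged, so null defaults such as the ones above can still be applied afterwards.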

Report Types

DETAIL Report

Returns raw rows — no aggregation. Apply filters, apply sorting, return individual rows.

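You don't need to handle column selection (KOEL filters on Detail_Columns) or pagination metadata (KOEL adds it to the API response). The example below still applies LIMIT and OFFSET from the payload so that the query fetches only the requested page.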

Example: DETAIL Report with Pagination

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

page = payload.get('page', 1)
page_size = payload.get('page_limit_rows', 100)
offset = (page - 1) * page_size

where_clauses = []
params = []

filters = payload.get('GroupFilter', {})
for group_key, group_filters in filters.items():
    for filter_key, filter_obj in group_filters.items():
        col = filter_obj['Filter_Column']
        op = filter_obj['Filter_Type']
        val = filter_obj['Filter_Value']

        if op == 'IN':
            placeholders = ','.join(['%s'] * len(val))
            where_clauses.append(f"{col} IN ({placeholders})")
            params.extend(val)
        elif op == '=':
            where_clauses.append(f"{col} = %s")
            params.append(val)
        elif op == 'LIKE':
            where_clauses.append(f"{col} LIKE %s")
            params.append(f"%{val}%")

order_clauses = [f"{o['field']} {o['direction']}" for o in payload.get('order_by', [])]

query = "SELECT customer_id, customer_name, creation_date, total_sales FROM customers"
if where_clauses:
    query += " WHERE " + " AND ".join(where_clauses)
if order_clauses:
    query += " ORDER BY " + ", ".join(order_clauses)
query += " LIMIT %s OFFSET %s"
params.extend([page_size, offset])

cursor = dbl.cursor(dictionary=True)
cursor.execute(query, params)
rows = cursor.fetchall()

response = [
    {
        "customer_id": row["customer_id"],
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

ap_var(dbl, 'response', response)
cursor.close()
dbl.close()

GROUP Report

Returns aggregated data. Use GroupByCol for dimensions, GroupByValues for aggregations.
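
When the data does not come from SQL (for example, rows fetched from a REST API), the same grouping can be done in Python. The sketch below assumes SUM as the aggregation, matching the SQL example in this section; group_rows is a hypothetical helper name.

```python
from collections import defaultdict


def group_rows(rows, group_by_cols, group_by_values):
    """Sum each value column per distinct combination of the group-by columns."""
    totals = defaultdict(lambda: defaultdict(float))
    for row in rows:
        key = tuple(row.get(col) for col in group_by_cols)
        sums = totals[key]  # creates the group even if there are no value columns
        for col in group_by_values:
            sums[col] += float(row.get(col) or 0)
    return [
        {
            **dict(zip(group_by_cols, key)),
            **{col: sums[col] for col in group_by_values},
        }
        for key, sums in totals.items()
    ]


orders = [
    {"customer_name": "Acme", "total_sales": 100.0},
    {"customer_name": "Globex", "total_sales": 50.0},
    {"customer_name": "Acme", "total_sales": 25.5},
]
grouped = group_rows(orders, ["customer_name"], ["total_sales"])
print(grouped)
```

Groups appear in the order their first row was seen; apply sorting afterwards if the payload requests it.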

Example: GROUP Report with Pagination

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

page = payload.get('page', 1)
page_size = payload.get('grp_page_limit_rows', 50)
offset = (page - 1) * page_size

group_by_cols = payload.get('GroupByCol', [])
group_by_values = payload.get('GroupByValues', [])

group_cols_sql = ', '.join(group_by_cols)
agg_sql = [f"SUM({col}) as {col}" for col in group_by_values]

where_clauses = []
params = []

filters = payload.get('GroupFilter', {})
for group_key, group_filters in filters.items():
    for filter_key, filter_obj in group_filters.items():
        col = filter_obj['Filter_Column']
        op = filter_obj['Filter_Type']
        val = filter_obj['Filter_Value']
        if op == '>=':
            where_clauses.append(f"{col} >= %s")
            params.append(val)

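# Join dimensions and aggregates as one list so that an empty GroupByCol
# does not produce "SELECT , SUM(...)" or an empty GROUP BY clause.
select_cols = group_by_cols + agg_sql
query = f"SELECT {', '.join(select_cols)} FROM orders"
if where_clauses:
    query += " WHERE " + " AND ".join(where_clauses)
if group_by_cols:
    query += f" GROUP BY {group_cols_sql}"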

order_clauses = [f"{o['field']} {o['direction']}" for o in payload.get('order_by', [])]
if order_clauses:
    query += " ORDER BY " + ", ".join(order_clauses)

query += " LIMIT %s OFFSET %s"
params.extend([page_size, offset])

cursor = dbl.cursor(dictionary=True)
cursor.execute(query, params)
rows = cursor.fetchall()

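# Build each row from the requested columns instead of hardcoding
# customer_name and total_sales, which fails for any other grouping.
response = [
    {
        **{col: row[col] for col in group_by_cols},
        **{col: float(row[col] or 0) for col in group_by_values}
    }
    for row in rows
]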

ap_var(dbl, 'response', response)
cursor.close()
dbl.close()

Filters & Sorting

Filter Types Reference

Filter_Type | SQL Equivalent         | Value Type       | Example
------------|------------------------|------------------|----------------
=           | column = value         | Single           | "ACTIVE"
!=          | column != value        | Single           | "INACTIVE"
>           | column > value         | Single           | 1000
<           | column < value         | Single           | 500
>=          | column >= value        | Single           | 100
<=          | column <= value        | Single           | 5000
IN          | column IN (...)        | Array            | ["A", "B", "C"]
NOT IN      | column NOT IN (...)    | Array            | ["X", "Y"]
LIKE        | column LIKE %value%    | String           | "Acme"
BETWEEN     | column BETWEEN x AND y | Array [min, max] | [100, 500]
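
For sources that cannot run SQL, the same filter types can be evaluated against rows already in memory. This is a minimal sketch, not KOEL's own implementation: matches and apply_filters are hypothetical names, LIKE is treated as a case-insensitive substring match (an assumption), and missing values never match a comparison, mirroring SQL NULL behavior. Groups are combined the same way as in the SQL filter handler in this guide: AND within a group, OR across groups.

```python
import operator

_COMPARISONS = {
    "=": operator.eq, "!=": operator.ne,
    ">": operator.gt, "<": operator.lt,
    ">=": operator.ge, "<=": operator.le,
}


def matches(row, filter_obj):
    """Evaluate one filter object against one row dict."""
    value = row.get(filter_obj["Filter_Column"])
    op = filter_obj["Filter_Type"]
    target = filter_obj["Filter_Value"]
    if op == "IN":
        return value in target
    if value is None:
        return False  # like SQL, NULL satisfies no other comparison
    if op in _COMPARISONS:
        return _COMPARISONS[op](value, target)
    if op == "NOT IN":
        return value not in target
    if op == "LIKE":
        return str(target).lower() in str(value).lower()
    if op == "BETWEEN":
        low, high = target
        return low <= value <= high
    raise ValueError(f"Unsupported Filter_Type: {op!r}")


def apply_filters(rows, filters):
    """Keep rows that satisfy every filter of at least one non-empty group."""
    groups = [g for g in (filters or {}).values() if g]
    if not groups:
        return list(rows)
    return [
        row for row in rows
        if any(all(matches(row, f) for f in group.values()) for group in groups)
    ]


customers = [
    {"customer_id": "12345", "customer_name": "Acme Corp", "total_sales": 50000.0},
    {"customer_id": "99999", "customer_name": "Globex", "total_sales": 10.0},
]
group_filter = {
    "filter_group_1": {
        "filter_1": {
            "Filter_Column": "customer_id",
            "Filter_Type": "IN",
            "Filter_Value": ["12345", "12346"],
        }
    }
}
print(apply_filters(customers, group_filter))
```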

Filter Handler Function

def build_where_clause(filters, allowed_columns=None):
    # Values are bound as parameters, but column names cannot be, so pass
    # allowed_columns (your schema's Column Ids) to reject anything else.
    where_clauses = []
    params = []

    for group_key, group_filters in filters.items():
        group_conditions = []
        for filter_key, filter_obj in group_filters.items():
            col = filter_obj['Filter_Column']
            op = filter_obj['Filter_Type']
            val = filter_obj['Filter_Value']

            if allowed_columns and col not in allowed_columns:
                raise ValueError(f"Unknown filter column: {col!r}")

            if op in ['=', '!=', '>', '<', '>=', '<=']:
                group_conditions.append(f"{col} {op} %s")
                params.append(val)
            elif op == 'IN':
                placeholders = ','.join(['%s'] * len(val))
                group_conditions.append(f"{col} IN ({placeholders})")
                params.extend(val)
            elif op == 'NOT IN':
                placeholders = ','.join(['%s'] * len(val))
                group_conditions.append(f"{col} NOT IN ({placeholders})")
                params.extend(val)
            elif op == 'LIKE':
                group_conditions.append(f"{col} LIKE %s")
                params.append(f"%{val}%")
            elif op == 'BETWEEN':
                group_conditions.append(f"{col} BETWEEN %s AND %s")
                params.extend(val)

        # Filters within a group are combined with AND
        if group_conditions:
            where_clauses.append(f"({' AND '.join(group_conditions)})")

    # Groups are combined with OR
    where_sql = ' OR '.join(where_clauses) if where_clauses else ""
    return where_sql, params

Sorting Handler

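def build_order_clause(order_by, allowed_columns=None):
    # Identifiers cannot be bound as parameters, so validate them before interpolating.
    order_parts = []
    for o in order_by or []:
        direction = str(o.get('direction', 'ASC')).upper()
        if direction not in ('ASC', 'DESC') or (allowed_columns and o['field'] not in allowed_columns):
            raise ValueError(f"Invalid sort spec: {o!r}")
        order_parts.append(f"{o['field']} {direction}")
    return "ORDER BY " + ", ".join(order_parts) if order_parts else ""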
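
When rows come from an API rather than a SQL query, the order_by list can be applied in Python instead. The sketch below uses a hypothetical helper, sort_rows, and relies on the stability of Python's sort: applying the keys from last to first gives the first key the highest priority. Placing missing values first in ascending order is an assumption chosen to resemble MySQL's NULL ordering.

```python
def sort_rows(rows, order_by):
    """Return a new list sorted by the payload's order_by entries."""
    result = list(rows)
    # Stable sort: apply the lowest-priority key first, the highest-priority key last.
    for spec in reversed(order_by or []):
        field = spec["field"]
        descending = str(spec.get("direction", "ASC")).upper() == "DESC"
        # The leading boolean keeps None values from being compared with real values.
        result.sort(
            key=lambda r: (r.get(field) is not None, r.get(field)),
            reverse=descending,
        )
    return result


unsorted_rows = [
    {"customer_name": "Beta", "total_sales": 10},
    {"customer_name": "Acme", "total_sales": 20},
    {"customer_name": "Acme", "total_sales": 30},
]
order_by = [
    {"field": "customer_name", "direction": "ASC"},
    {"field": "total_sales", "direction": "DESC"},
]
sorted_rows = sort_rows(unsorted_rows, order_by)
print(sorted_rows)
```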

Integration Examples

Example 1: Local MySQL Query

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

cursor = dbl.cursor(dictionary=True)
cursor.execute("""
    SELECT
        customer_id,
        customer_name,
        DATE_FORMAT(creation_date, '%Y-%m-%d') as creation_date,
        total_sales
    FROM customers
    WHERE status = 'ACTIVE'
    ORDER BY total_sales DESC
    LIMIT 1000
""")

rows = cursor.fetchall()
response = [
    {
        "customer_id": str(row["customer_id"]),
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"],
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

ap_var(dbl, 'response', response)
cursor.close()
dbl.close()

Example 2: External MySQL Database

import mysql.connector
from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

external_db = mysql.connector.connect(
    host="external-db.example.com",
    port=3306,
    user="readonly_user",
    password="secure_password",
    database="crm_prod"
)

try:
    cursor = external_db.cursor(dictionary=True)
    cursor.execute("""
        SELECT cust_id, cust_name, DATE(created_at) as creation_date, revenue_total
        FROM external_customers
        WHERE status = 'ACTIVE'
        ORDER BY revenue_total DESC
    """)
    rows = cursor.fetchall()
    response = [
        {
            "customer_id": str(row["cust_id"]),
            "customer_name": row["cust_name"],
            "creation_date": str(row["creation_date"]),
            "total_sales": float(row["revenue_total"])
        }
        for row in rows
    ]
    ap_var(dbl, 'response', response)
    cursor.close()
finally:
    external_db.close()
    dbl.close()

requirements.txt:

mysql-connector-python==8.2.0

Example 3: REST API Integration

import requests
from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

api_response = requests.get(
    "https://api.example.com/v1/customers",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    params={"status": "active", "limit": 1000},
    timeout=30
)
api_response.raise_for_status()
api_data = api_response.json()

response = [
    {
        "customer_id": item["id"],
        "customer_name": item["name"],
        "creation_date": item["created_at"][:10],
        "total_sales": float(item["total_purchases"])
    }
    for item in api_data.get("customers", [])
]

ap_var(dbl, 'response', response)
dbl.close()

requirements.txt:

requests==2.31.0

Example 4: Slack Messages Integration

import requests
from datetime import datetime
from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

SLACK_TOKEN = "xoxb-your-slack-bot-token"
CHANNEL_ID = "C01234567"

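response_api = requests.get(
    "https://slack.com/api/conversations.history",
    headers={"Authorization": f"Bearer {SLACK_TOKEN}"},
    params={"channel": CHANNEL_ID, "limit": 100},
    timeout=30  # without a timeout, a stalled connection blocks the flow indefinitely
)
response_api.raise_for_status()
slack_data = response_api.json()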

if not slack_data.get("ok"):
    raise Exception(f"Slack API error: {slack_data.get('error')}")

response = [
    {
        "message_id": msg.get("ts", ""),
        "user_id": msg.get("user", ""),
        "message_text": msg.get("text", ""),
        "timestamp": datetime.fromtimestamp(float(msg.get("ts", 0))).strftime("%Y-%m-%d %H:%M:%S"),
        "reactions_count": len(msg.get("reactions", []))
    }
    for msg in slack_data.get("messages", [])
]

ap_var(dbl, 'response', response)
dbl.close()

Example 5: Combined Data Sources

import requests
from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT customer_id, customer_name FROM customers WHERE status = 'ACTIVE'")
local_customers = {row['customer_id']: row for row in cursor.fetchall()}
cursor.close()

api_response = requests.get(
    "https://api.analytics.example.com/sales",
    headers={"Authorization": "Bearer API_KEY"},
    timeout=30
)
api_response.raise_for_status()  # fail on HTTP errors instead of parsing an error body
sales_data = api_response.json()

response = []
for sale in sales_data.get("sales", []):
    cust_id = sale["customer_id"]
    if cust_id in local_customers:
        response.append({
            "customer_id": cust_id,
            "customer_name": local_customers[cust_id]["customer_name"],
            "creation_date": sale["order_date"],
            "total_sales": float(sale["amount"])
        })

ap_var(dbl, 'response', response)
dbl.close()

Advanced Patterns

Pattern 1: Dynamic Query Building

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

query_parts = ["SELECT customer_id, customer_name, creation_date, total_sales FROM customers"]
params = []

filters = payload.get('GroupFilter', {})
where_clauses = []
for group_key, group_filters in filters.items():
    for filter_key, filter_obj in group_filters.items():
        col = filter_obj['Filter_Column']
        op = filter_obj['Filter_Type']
        val = filter_obj['Filter_Value']
        if op == 'IN':
            placeholders = ','.join(['%s'] * len(val))
            where_clauses.append(f"{col} IN ({placeholders})")
            params.extend(val)
        elif op == 'LIKE':
            where_clauses.append(f"{col} LIKE %s")
            params.append(f"%{val}%")

if where_clauses:
    query_parts.append("WHERE " + " AND ".join(where_clauses))

order_by = payload.get('order_by', [])
if order_by:
    order_clauses = [f"{o['field']} {o['direction']}" for o in order_by]
    query_parts.append("ORDER BY " + ", ".join(order_clauses))

final_query = " ".join(query_parts)

cursor = dbl.cursor(dictionary=True)
cursor.execute(final_query, params)
rows = cursor.fetchall()

response = [
    {
        "customer_id": str(row["customer_id"]),
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

ap_var(dbl, 'response', response)
cursor.close()
dbl.close()

Pattern 2: Error Handling

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

try:
    cursor = dbl.cursor(dictionary=True)
    cursor.execute("SELECT * FROM customers")
    rows = cursor.fetchall()
    response = [
        {
            "customer_id": str(row["customer_id"]),
            "customer_name": row["customer_name"],
            "creation_date": str(row["creation_date"]),
            "total_sales": float(row["total_sales"])
        }
        for row in rows
    ]
    ap_var(dbl, 'response', response)
    cursor.close()
except Exception as e:
    print(f"❌ Error: {str(e)}")
    ap_var(dbl, 'response', [])
finally:
    dbl.close()
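
The try/except pattern above can be factored into a small wrapper so that every flow saves a response even when fetching fails. This is a sketch under stated assumptions: run_safely is a hypothetical helper, and the save step is passed in as a callable so the wrapper can be exercised outside Workshop.

```python
def run_safely(fetch, save_response, log=print):
    """Call fetch() and save its result, or save an empty list if it raises."""
    try:
        response = fetch()
    except Exception as exc:
        log(f"Error: {exc}")
        response = []
    save_response(response)
    return response


# Stand-ins for a real query and for ap_var(dbl, 'response', ...)
saved = []


def failing_fetch():
    raise RuntimeError("database unavailable")


run_safely(lambda: [{"customer_id": "1"}], saved.append)
run_safely(failing_fetch, saved.append)
print(saved)
```

Inside a flow, the save callable would be something like lambda data: ap_var(dbl, 'response', data), with dbl.close() kept in a finally block as shown above.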

Pattern 3: Handling Pagination

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

report_type = payload.get('reportType', 'DETAIL')
page = payload.get('page', 1)
page_size = payload.get('page_limit_rows', 100) if report_type == 'DETAIL' else payload.get('grp_page_limit_rows', 50)
offset = (page - 1) * page_size

cursor = dbl.cursor(dictionary=True)
cursor.execute("""
    SELECT customer_id, customer_name, creation_date, total_sales
    FROM customers
    WHERE status = 'ACTIVE'
    ORDER BY customer_id
    LIMIT %s OFFSET %s
""", (page_size, offset))
rows = cursor.fetchall()

response = [
    {
        "customer_id": str(row["customer_id"]),
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

ap_var(dbl, 'response', response)
cursor.close()
dbl.close()
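
The page and limit arithmetic above can be isolated in a helper that also guards against out-of-range values. This is a minimal sketch: page_window is a hypothetical name, and the max_page_size cap of 1000 is an assumption of this example, not a documented KOEL limit.

```python
def page_window(payload, max_page_size=1000):
    """Return (limit, offset) for the payload's report type and page."""
    report_type = payload.get("reportType", "DETAIL")
    if report_type == "DETAIL":
        size = payload.get("page_limit_rows", 100)
    else:
        size = payload.get("grp_page_limit_rows", 50)
    # Clamp both values so a bad payload cannot yield a negative offset
    # or an unbounded page.
    size = min(max(int(size), 1), max_page_size)
    page = max(int(payload.get("page", 1) or 1), 1)
    return size, (page - 1) * size


limit, offset = page_window({"page": 3, "page_limit_rows": 100})
print(limit, offset)
```

The returned pair can be passed directly as the parameters for LIMIT %s OFFSET %s.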

Pattern 4: Using Pandas for Data Processing

import pandas as pd
from dkconnect import connect, connect_engine

dbl = connect()
payload = ap_var(dbl, 'payload')

engine = connect_engine()
df = pd.read_sql("""
    SELECT customer_id, customer_name, creation_date, order_amount
    FROM orders
    WHERE status = 'COMPLETED'
""", engine)

df_grouped = df.groupby('customer_id').agg({
    'customer_name': 'first',
    'creation_date': 'min',
    'order_amount': 'sum'
}).reset_index()

response = [
    {
        "customer_id": str(row["customer_id"]),
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"].strftime("%Y-%m-%d"),
        "total_sales": float(row["order_amount"])
    }
    for _, row in df_grouped.iterrows()
]

ap_var(dbl, 'response', response)
dbl.close()

requirements.txt:

pandas==2.0.0
sqlalchemy==2.0.0

Pattern 5: Caching with Custom Variables

from dkconnect import connect
from datetime import datetime

dbl = connect()
payload = ap_var(dbl, 'payload')

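try:
    cache_data = ap_var(dbl, 'cached_customers')
    cache_time = ap_var(dbl, 'cache_timestamp')
    cache_dt = datetime.fromisoformat(cache_time)
    age_minutes = (datetime.utcnow() - cache_dt).total_seconds() / 60
    cache_is_fresh = age_minutes < 60
except Exception:
    # Missing or malformed cache entries fall through to a fresh query.
    cache_is_fresh = False

if cache_is_fresh:
    print(f"✅ Using cached data ({age_minutes:.1f} minutes old)")
    ap_var(dbl, 'response', cache_data)
    dbl.close()
    # Exit outside the try block: a bare "except:" also catches SystemExit,
    # which would let the script continue on a closed connection.
    raise SystemExit(0)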

cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT customer_id, customer_name, creation_date, total_sales FROM customers")
rows = cursor.fetchall()

response = [
    {
        "customer_id": str(row["customer_id"]),
        "customer_name": row["customer_name"],
        "creation_date": row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None,
        "total_sales": float(row["total_sales"])
    }
    for row in rows
]

ap_var(dbl, 'cached_customers', response)
ap_var(dbl, 'cache_timestamp', datetime.utcnow().isoformat())
ap_var(dbl, 'response', response)
cursor.close()
dbl.close()
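
The freshness check is easier to test when it is a pure function of the stored timestamp. The sketch below assumes timestamps written with datetime.utcnow().isoformat(), as in the pattern above, and therefore treats timestamps without a timezone as UTC; is_fresh is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def is_fresh(cached_at_iso, max_age_minutes=60, now=None):
    """Return True if the ISO timestamp is less than max_age_minutes old."""
    if not cached_at_iso:
        return False
    try:
        cached_at = datetime.fromisoformat(cached_at_iso)
    except (TypeError, ValueError):
        return False  # unreadable timestamp: treat the cache as stale
    if cached_at.tzinfo is None:
        # Assumption: naive timestamps were written in UTC.
        cached_at = cached_at.replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    age = now - cached_at
    # Timestamps in the future are treated as stale as well.
    return timedelta(0) <= age < timedelta(minutes=max_age_minutes)


reference = datetime(2024, 1, 15, 12, 0, tzinfo=timezone.utc)
print(is_fresh("2024-01-15T11:30:00", now=reference))
```

Passing now explicitly keeps the function deterministic in tests; in a flow you would omit it.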

Troubleshooting

Error: "No 'response' variable found"

Always call ap_var(dbl, 'response', dataset) before the script ends.

Error: "Column 'xyz' not in schema whitelist"

Only return Column Ids defined in your Column Schema tab.

Error: "Response validation failed"

Use Column Id (e.g. customer_id), not the display Name (e.g. Customer Id).
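
Both schema errors above can be caught before the response is saved by comparing each row's keys with your Column Ids. This is a sketch of a local pre-flight check, not KOEL's own validator; find_schema_violations and the column list are hypothetical.

```python
def find_schema_violations(response, column_ids):
    """Return a list of human-readable problems; an empty list means the shape looks valid."""
    if not isinstance(response, list):
        return ["response must be a list of dicts"]
    allowed = set(column_ids)
    problems = []
    for index, row in enumerate(response):
        if not isinstance(row, dict):
            problems.append(f"row {index}: expected dict, got {type(row).__name__}")
            continue
        unknown = sorted(set(row) - allowed)
        missing = sorted(allowed - set(row))
        if unknown:
            problems.append(f"row {index}: unknown keys {unknown}")
        if missing:
            problems.append(f"row {index}: missing keys {missing}")
    return problems


column_ids = ["customer_id", "customer_name"]
bad_response = [{"Customer Id": "12345", "customer_name": "Acme Corp"}]
for problem in find_schema_violations(bad_response, column_ids):
    print(problem)
```

Printing the problems while developing makes it clear whether a display Name was used where a Column Id was expected.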

Empty Response

Always return a list — even if empty:

ap_var(dbl, 'response', [])

Script Crashes

Wrap in try/except and always return a response:

try:
    response = fetch_data()
    ap_var(dbl, 'response', response)
except Exception as e:
    print(f"❌ Error: {str(e)}")
    ap_var(dbl, 'response', [])
finally:
    dbl.close()

Date Format Issues

Use ISO format (YYYY-MM-DD):

creation_date = row["creation_date"].strftime("%Y-%m-%d") if row["creation_date"] else None

Quick Reference

Minimal Template

from dkconnect import connect

dbl = connect()
payload = ap_var(dbl, 'payload')

cursor = dbl.cursor(dictionary=True)
cursor.execute("SELECT field1, field2 FROM your_table")
rows = cursor.fetchall()

response = [
    {
        "column_id_1": row["field1"],
        "column_id_2": row["field2"]
    }
    for row in rows
]

ap_var(dbl, 'response', response)
cursor.close()
dbl.close()

Key Functions

payload = ap_var(dbl, 'payload')       # Read payload
ap_var(dbl, 'response', dataset)       # Save response
ap_var(dbl, 'custom_var', value)       # Save custom variable
value = ap_var(dbl, 'custom_var')      # Read custom variable
dbl = connect()                         # Repository DB connection
engine = connect_engine()               # SQLAlchemy engine

Column Schema Mapping

UI Field  | Use In Python  | Example
----------|----------------|-----------------------
Name      | Display only   | "Customer Id"
Column Id | Response key   | "customer_id"
Type      | Data type hint | Normal, Date, Currency