SynapSynapDocs
DevelopmentExtending

Hybrid Plugins

Hybrid Plugins

Combine Data Pod schema with remote processing


When to Use

✅ Need custom data types (requires schema)
✅ Need event sourcing (requires events)
✅ But processing should be external (AI, scaling)

Perfect for: AI features that need persistent data.


How It Works

Split responsibilities:

Data Pod provides:

  • Database tables
  • Typed events
  • Webhook endpoints
  • Event handlers

Remote Service provides:

  • AI/ML processing
  • Business logic
  • External integrations

Architecture

┌─────────────────────────────────────────┐
│ Data Pod                                │
│                                         │
│ ┌─────────┐  ┌────────┐  ┌──────────┐ │
│ │ Schema  │  │ Events │  │ Webhooks │ │
│ └─────────┘  └────────┘  └──────────┘ │
│      │            │            │       │
│      └────────────┴────────────┘       │
│                   │                    │
└───────────────────┼────────────────────┘

              ┌─────▼─────┐
              │   HTTP    │
              └─────┬─────┘

┌───────────────────▼────────────────────┐
│ Remote Service                         │
│                                        │
│ ┌──────────┐  ┌──────────┐           │
│ │ AI Logic │  │ Callback │           │
│ └──────────┘  └──────────┘           │
└────────────────────────────────────────┘

Step-by-Step Tutorial

Step 1: Add to Data Pod

Add Schema

// packages/database/src/schema/my-feature.ts
export const myFeatureItems = pgTable('my_feature_items', {
  id: uuid('id').defaultRandom().primaryKey(),
  userId: text('user_id').notNull(),
  content: text('content').notNull(),
  status: text('status').default('pending'),
  aiAnalysis: jsonb('ai_analysis'),  // Results from remote service
});

Define Events

// packages/events/src/domain-events.ts
export type MyFeatureReceivedEvent = BaseEvent<
  'myfeature.received',
  'myfeature_item',
  {
    content: string;
    metadata: Record<string, unknown>;
  }
>;
 
export type MyFeatureAnalyzedEvent = BaseEvent<
  'myfeature.analyzed',
  'myfeature_item',
  {
    analysis: {
      sentiment: 'positive' | 'negative' | 'neutral';
      keywords: string[];
    };
  }
>;

Create Webhooks

// apps/api/src/webhooks/my-feature.ts
import { publishEvent, createMyFeatureReceivedEvent } from '@synap/events';
 
export const myFeatureWebhook = new Hono();
 
// Ingestion endpoint
myFeatureWebhook.post('/receive', async (c) => {
  const { content } = await c.req.json();
  const itemId = generateId();
  
  // Publish event
  const event = createMyFeatureReceivedEvent(itemId, {
    content,
    metadata: {}
  });
  
  await publishEvent(event, {
    userId: c.get('userId'),
    source: 'webhook'
  });
  
  return c.json({ success: true, itemId });
});
 
// Callback endpoint (from remote service)
myFeatureWebhook.post('/callback', async (c) => {
  const { itemId, analysis } = await c.req.json();
  
  // Publish analyzed event
  const event = createMyFeatureAnalyzedEvent(itemId, { analysis });
  
  await publishEvent(event, {
    userId: 'system',
    source: 'ai-service'
  });
  
  return c.json({ success: true });
});

Create Event Handlers

// packages/api/src/event-handlers/my-feature-storage.ts  
export async function handleMyFeatureReceived(
  event: MyFeatureReceivedEvent
) {
  // Store in database
  await db.insert(myFeatureItems).values({
    id: event.subjectId,
    userId: event.userId,
    content: event.data.content,
    status: 'pending'
  });
}
 
// packages/api/src/event-handlers/my-feature-intelligence.ts
export async function handleMyFeatureIntelligence(
  event: MyFeatureReceivedEvent
) {
  // Find registered service
  const services = await db
    .select()
    .from(intelligenceServices)
    .where(eq(intelligenceServices.status, 'active'));
  
  const aiService = services.find(s => 
    s.capabilities.includes('my-feature-analysis')
  );
  
  if (!aiService) return;
  
  // Call service
  await fetch(aiService.webhookUrl, {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${aiService.apiKey}`
    },
    body: JSON.stringify({
      itemId: event.subjectId,
      content: event.data.content,
      callbackUrl: `${process.env.API_URL}/webhooks/my-feature/callback`
    })
  });
}
 
// packages/api/src/event-handlers/my-feature-analysis.ts
export async function handleMyFeatureAnalyzed(
  event: MyFeatureAnalyzedEvent
) {
  // Update with analysis
  await db
    .update(myFeatureItems)
    .set({
      aiAnalysis: event.data.analysis,
      status: 'analyzed'
    })
    .where(eq(myFeatureItems.id, event.subjectId));
}

Step 2: Build Remote Service

// my-ai-service/src/index.ts
import { Hono } from 'hono';
 
const app = new Hono();
 
app.post('/webhook', async (c) => {
  const { itemId, content, callbackUrl } = await c.req.json();
  
  // AI processing
  const analysis = await analyzeContent(content);
  
  // Call back
  await fetch(callbackUrl, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      itemId,
      analysis: {
        sentiment: analysis.sentiment,
        keywords: analysis.keywords
      }
    })
  });
  
  return c.json({ success: true });
});
 
// Register on startup
await registerService({
  serviceId: 'my-feature-ai',
  name: 'My Feature AI',
  webhookUrl: process.env.SERVICE_URL + '/webhook',
  capabilities: ['my-feature-analysis']
});

Complete Flow

1. Data arrives → /webhooks/my-feature/receive

2. Publish: myfeature.received event

3. Storage handler → Write to database

4. Intelligence handler → Call AI service

5. AI service processes → Calls back

6. Callback endpoint → Publishes myfeature.analyzed

7. Analysis handler → Update database with results

8. ✅ Frontend sees analyzed item

Real Example: Life Feed

Life Feed is the perfect hybrid plugin example:

Data Pod side:

  • Schema: inbox_items table
  • Events: InboxItemReceivedEvent, InboxItemAnalyzedEvent
  • Webhooks: /webhooks/n8n/inbox, /webhooks/intelligence/callback
  • Handlers: storage, intelligence, analysis

Remote Service side:

  • Service: synap-intelligence-service
  • Registration: Registers with Intelligence Registry
  • Analysis: AI-powered inbox analysis
  • Callback: Returns priority/tags/summary

See full implementation →


Benefits

Clean separation: Data in Data Pod, logic external
Type safety: Full TypeScript types throughout
Scalable: Scale AI service independently
Event sourcing: Can rebuild state from events
Flexible: Update AI logic without Data Pod changes


Next Steps

On this page