Messaging & RPC
Phlow supports asynchronous messaging through AMQP (RabbitMQ) and RPC (Remote Procedure Call) patterns for building distributed systems.
Basic AMQP Producer
Send messages to a RabbitMQ queue:
amqp-producer.phlow
name: AMQP Producer
version: 1.0.0
description: Send messages to RabbitMQ queue
modules:
- module: amqp
version: latest
with:
host: localhost
port: 5672
username: guest
password: guest
vhost: /
steps:
- payload: !phs {
messages: [
{ id: 1, text: "Hello from Phlow!", priority: "high" },
{ id: 2, text: "Processing order #1234", priority: "medium" },
{ id: 3, text: "User registration completed", priority: "low" }
]
}
# Send each message
- payload: !phs {
...payload,
sent_messages: []
}
# Send message 1
- amqp:
action: publish
exchange: ""
routing_key: "task_queue"
message: !phs JSON.stringify(payload.messages[0])
properties:
delivery_mode: 2 # Make message persistent
priority: 3
- payload: !phs {
...payload,
sent_messages: [...payload.sent_messages, payload.messages[0]]
}
# Send message 2
- amqp:
action: publish
exchange: ""
routing_key: "task_queue"
message: !phs JSON.stringify(payload.messages[1])
properties:
delivery_mode: 2
priority: 2
- payload: !phs {
...payload,
sent_messages: [...payload.sent_messages, payload.messages[1]]
}
# Send message 3
- amqp:
action: publish
exchange: ""
routing_key: "task_queue"
message: !phs JSON.stringify(payload.messages[2])
properties:
delivery_mode: 2
priority: 1
- payload: !phs {
...payload,
sent_messages: [...payload.sent_messages, payload.messages[2]]
}
- return: !phs {
total_sent: payload.sent_messages.length,
messages: payload.sent_messages
}
AMQP Consumer
Consume messages from a RabbitMQ queue:
amqp-consumer.phlow
name: AMQP Consumer
version: 1.0.0
description: Consume messages from RabbitMQ queue
modules:
- module: amqp
version: latest
with:
host: localhost
port: 5672
username: guest
password: guest
vhost: /
steps:
# Declare queue
- amqp:
action: declare_queue
queue: "task_queue"
durable: true
# Consume messages
- amqp:
action: consume
queue: "task_queue"
auto_ack: false
prefetch_count: 1
# Process message
- payload: !phs {
received_message: JSON.parse(payload.message),
delivery_tag: payload.delivery_tag,
processed_at: new Date().toISOString()
}
- log:
message: !phs `Processing message: ${payload.received_message.text}`
# Simulate processing time based on priority
- sleep:
seconds: !phs payload.received_message.priority === "high" ? 1 : (payload.received_message.priority === "medium" ? 2 : 3)
# Acknowledge message
- amqp:
action: ack
delivery_tag: !phs payload.delivery_tag
- return: !phs {
processed: payload.received_message,
processing_time: payload.received_message.priority === "high" ? 1 : (payload.received_message.priority === "medium" ? 2 : 3),
processed_at: payload.processed_at
}
RPC Server
Create an RPC server that handles remote procedure calls:
rpc-server.phlow
name: RPC Server
version: 1.0.0
description: Handle RPC calls for mathematical operations
modules:
- module: rpc
version: latest
with:
host: localhost
port: 8080
methods:
- name: add
description: Add two numbers
- name: multiply
description: Multiply two numbers
- name: factorial
description: Calculate factorial
- name: fibonacci
description: Calculate Fibonacci number
steps:
- payload: !phs {
method: main.method,
params: main.params,
request_id: main.request_id
}
# Handle add method
- assert: !phs payload.method == "add"
then:
- payload: !phs {
...payload,
result: payload.params.a + payload.params.b
}
- return: !phs {
jsonrpc: "2.0",
result: payload.result,
id: payload.request_id
}
# Handle multiply method
- assert: !phs payload.method == "multiply"
then:
- payload: !phs {
...payload,
result: payload.params.a * payload.params.b
}
- return: !phs {
jsonrpc: "2.0",
result: payload.result,
id: payload.request_id
}
# Handle factorial method
- assert: !phs payload.method == "factorial"
then:
- payload: !phs {
...payload,
n: payload.params.n,
result: 1
}
- payload: !phs {
...payload,
result: Array.from({length: payload.n}, (_, i) => i + 1)
.reduce((acc, val) => acc * val, 1)
}
- return: !phs {
jsonrpc: "2.0",
result: payload.result,
id: payload.request_id
}
# Handle fibonacci method
- assert: !phs payload.method == "fibonacci"
then:
- payload: !phs {
...payload,
n: payload.params.n
}
- payload: !phs {
...payload,
result: payload.n <= 1 ? payload.n :
(() => {
let a = 0, b = 1;
for (let i = 2; i <= payload.n; i++) {
[a, b] = [b, a + b];
}
return b;
})()
}
- return: !phs {
jsonrpc: "2.0",
result: payload.result,
id: payload.request_id
}
# Handle unknown method
- return: !phs {
jsonrpc: "2.0",
error: {
code: -32601,
message: "Method not found"
},
id: payload.request_id
}
RPC Client
Create an RPC client to call remote procedures:
rpc-client.phlow
name: RPC Client
version: 1.0.0
description: Call remote procedures
main: cli
modules:
- module: cli
version: latest
with:
args:
- name: method
description: RPC method to call
index: 1
type: string
required: true
choices: ["add", "multiply", "factorial", "fibonacci"]
- name: params
description: JSON parameters for the method
index: 2
type: string
required: true
- name: server_url
long: server
short: s
description: RPC server URL
type: string
default: "http://localhost:8080/rpc"
- module: rpc
version: latest
- module: log
version: latest
steps:
- payload: !phs {
method: main.method,
params: JSON.parse(main.params),
server_url: main.server_url,
request_id: Math.random().toString(36).substr(2, 9)
}
- log:
message: !phs `Calling ${payload.method} with params: ${JSON.stringify(payload.params)}`
# Make RPC call
- rpc:
url: !phs payload.server_url
method: !phs payload.method
params: !phs payload.params
id: !phs payload.request_id
- log:
message: !phs `Response: ${JSON.stringify(payload)}`
- return: !phs payload
Usage Examples
# Add two numbers
phlow rpc-client.phlow add '{"a": 5, "b": 3}'
# Multiply numbers
phlow rpc-client.phlow multiply '{"a": 4, "b": 6}'
# Calculate factorial
phlow rpc-client.phlow factorial '{"n": 5}'
# Calculate Fibonacci
phlow rpc-client.phlow fibonacci '{"n": 10}'
Work Queue Pattern
Implement a work queue for distributed task processing:
work-queue.phlow
name: Work Queue
version: 1.0.0
description: Distribute tasks across multiple workers
main: cli
modules:
- module: cli
version: latest
with:
args:
- name: role
description: Role to run (producer or consumer)
index: 1
type: string
required: true
choices: ["producer", "consumer"]
- name: task_count
description: Number of tasks to produce
index: 2
type: number
default: 10
- module: amqp
version: latest
with:
host: localhost
port: 5672
username: guest
password: guest
vhost: /
steps:
- payload: !phs {
role: main.role,
task_count: main.task_count || 10
}
# Producer role
- assert: !phs payload.role == "producer"
then:
- log:
message: !phs `Starting producer - will create ${payload.task_count} tasks`
# Declare queue
- amqp:
action: declare_queue
queue: "work_queue"
durable: true
# Create tasks
- payload: !phs {
...payload,
tasks: Array.from({length: payload.task_count}, (_, i) => ({
id: i + 1,
task_type: ["email", "image_processing", "data_analysis"][Math.floor(Math.random() * 3)],
complexity: Math.floor(Math.random() * 5) + 1,
created_at: new Date().toISOString()
}))
}
# Send tasks to queue
- payload: !phs {
...payload,
sent_tasks: []
}
# Send each task (simplified - in real implementation, you'd loop)
- amqp:
action: publish
exchange: ""
routing_key: "work_queue"
message: !phs JSON.stringify(payload.tasks[0])
properties:
delivery_mode: 2
- return: !phs {
message: `Successfully sent ${payload.tasks.length} tasks to queue`,
tasks: payload.tasks
}
# Consumer role
- assert: !phs payload.role == "consumer"
then:
- log:
message: "Starting consumer - waiting for tasks"
# Declare queue
- amqp:
action: declare_queue
queue: "work_queue"
durable: true
# Consume task
- amqp:
action: consume
queue: "work_queue"
auto_ack: false
prefetch_count: 1
# Process task
- payload: !phs {
task: JSON.parse(payload.message),
delivery_tag: payload.delivery_tag,
worker_id: `worker_${Math.random().toString(36).substr(2, 5)}`
}
- log:
message: !phs `${payload.worker_id} processing task ${payload.task.id}: ${payload.task.task_type}`
# Simulate work based on complexity
- sleep:
seconds: !phs payload.task.complexity
# Acknowledge task completion
- amqp:
action: ack
delivery_tag: !phs payload.delivery_tag
- return: !phs {
message: `Task ${payload.task.id} completed by ${payload.worker_id}`,
task: payload.task,
processing_time: payload.task.complexity,
completed_at: new Date().toISOString()
}
Pub/Sub Pattern
Implement publish-subscribe messaging:
pubsub.phlow
name: Pub/Sub Pattern
version: 1.0.0
description: Publish-subscribe messaging pattern
main: cli
modules:
- module: cli
version: latest
with:
args:
- name: mode
description: Mode to run (publisher or subscriber)
index: 1
type: string
required: true
choices: ["publisher", "subscriber"]
- name: topic
description: Topic to publish/subscribe to
index: 2
type: string
default: "notifications"
- name: subscriber_name
description: Subscriber name
index: 3
type: string
default: "subscriber1"
- module: amqp
version: latest
with:
host: localhost
port: 5672
username: guest
password: guest
vhost: /
steps:
- payload: !phs {
mode: main.mode,
topic: main.topic,
subscriber_name: main.subscriber_name
}
# Publisher mode
- assert: !phs payload.mode == "publisher"
then:
- log:
message: !phs `Publishing to topic: ${payload.topic}`
# Declare exchange
- amqp:
action: declare_exchange
exchange: !phs payload.topic
type: "fanout"
# Create messages
- payload: !phs {
...payload,
messages: [
{ type: "user_registered", user_id: 123, email: "user@example.com" },
{ type: "order_created", order_id: 456, total: 99.99 },
{ type: "system_alert", level: "warning", message: "High CPU usage detected" }
]
}
# Publish messages
- amqp:
action: publish
exchange: !phs payload.topic
routing_key: ""
message: !phs JSON.stringify(payload.messages[0])
- return: !phs {
message: `Published ${payload.messages.length} messages to ${payload.topic}`,
messages: payload.messages
}
# Subscriber mode
- assert: !phs payload.mode == "subscriber"
then:
- log:
message: !phs `Subscribing to topic: ${payload.topic} as ${payload.subscriber_name}`
# Declare exchange
- amqp:
action: declare_exchange
exchange: !phs payload.topic
type: "fanout"
# Declare exclusive queue for this subscriber
- amqp:
action: declare_queue
queue: !phs `${payload.topic}_${payload.subscriber_name}`
exclusive: true
# Bind queue to exchange
- amqp:
action: bind_queue
queue: !phs `${payload.topic}_${payload.subscriber_name}`
exchange: !phs payload.topic
routing_key: ""
# Consume messages
- amqp:
action: consume
queue: !phs `${payload.topic}_${payload.subscriber_name}`
auto_ack: true
# Process message
- payload: !phs {
...payload,
received_message: JSON.parse(payload.message),
received_at: new Date().toISOString()
}
- log:
message: !phs `${payload.subscriber_name} received: ${payload.received_message.type}`
- return: !phs {
subscriber: payload.subscriber_name,
message: payload.received_message,
received_at: payload.received_at
}
Testing Messaging and RPC
Test messaging and RPC functionality:
messaging-tests.phlow
name: Messaging Tests
version: 1.0.0
description: Test messaging and RPC functionality
modules:
- module: rpc
version: latest
- module: amqp
version: latest
with:
host: localhost
port: 5672
username: guest
password: guest
vhost: /
tests:
# Test RPC add method
- main:
method: "add"
params: { a: 5, b: 3 }
payload: null
assert: !phs payload.result == 8
# Test RPC multiply method
- main:
method: "multiply"
params: { a: 4, b: 6 }
payload: null
assert: !phs payload.result == 24
# Test message queue
- main:
queue: "test_queue"
message: "Hello Test"
payload: null
assert: !phs payload.sent == true
steps:
# Test RPC functionality
- assert: !phs main.method && main.params
then:
- payload: !phs {
method: main.method,
params: main.params,
request_id: Math.random().toString(36).substr(2, 9)
}
# Handle add method
- assert: !phs payload.method == "add"
then:
return: !phs {
jsonrpc: "2.0",
result: payload.params.a + payload.params.b,
id: payload.request_id
}
# Handle multiply method
- assert: !phs payload.method == "multiply"
then:
return: !phs {
jsonrpc: "2.0",
result: payload.params.a * payload.params.b,
id: payload.request_id
}
# Test AMQP functionality
- assert: !phs main.queue && main.message
then:
- amqp:
action: declare_queue
queue: !phs main.queue
durable: false
- amqp:
action: publish
exchange: ""
routing_key: !phs main.queue
message: !phs main.message
- return: !phs { sent: true, queue: main.queue, message: main.message }
Key Features Demonstrated
- AMQP Messaging: Publish and consume messages with RabbitMQ
- RPC Communication: Remote procedure calls with JSON-RPC
- Work Queue Pattern: Distribute tasks across multiple workers
- Pub/Sub Pattern: Publish-subscribe messaging for event-driven systems
- Message Acknowledgment: Reliable message processing
- Queue Management: Declare queues, exchanges, and bindings
- Error Handling: Proper error handling for distributed systems
- Testing: Comprehensive testing of messaging functionality
These examples show how to build distributed systems using Phlow's messaging and RPC capabilities.