azure-service-bus
Azure Service Bus Skill
Build reliable messaging solutions with Azure Service Bus for enterprise integration.
Triggers
Use this skill when you see:
- azure service bus, service bus, message queue
- service bus queue, service bus topic
- message subscription, dead letter queue
- message session, message broker
Instructions
Create Service Bus Resources
# Create namespace
az servicebus namespace create \
--name myservicebus \
--resource-group mygroup \
--location eastus \
--sku Premium
# Create queue
az servicebus queue create \
--name myqueue \
--namespace-name myservicebus \
--resource-group mygroup \
--max-size 5120 \
--default-message-time-to-live P14D \
--lock-duration PT1M \
--enable-dead-lettering-on-message-expiration true
# Create topic
az servicebus topic create \
--name mytopic \
--namespace-name myservicebus \
--resource-group mygroup \
--max-size 5120 \
--default-message-time-to-live P14D
# Create subscription
az servicebus topic subscription create \
--name mysubscription \
--topic-name mytopic \
--namespace-name myservicebus \
--resource-group mygroup \
--max-delivery-count 10 \
--default-message-time-to-live P7D
# Get connection string
az servicebus namespace authorization-rule keys list \
--namespace-name myservicebus \
--resource-group mygroup \
--name RootManageSharedAccessKey
Python SDK - Queue Operations
from azure.servicebus import ServiceBusClient, ServiceBusMessage
from azure.identity import DefaultAzureCredential
import json
# Using connection string
connection_string = "Endpoint=sb://..."
client = ServiceBusClient.from_connection_string(connection_string)
# Using managed identity
credential = DefaultAzureCredential()
client = ServiceBusClient(
fully_qualified_namespace="myservicebus.servicebus.windows.net",
credential=credential
)
# Send message
with client.get_queue_sender("myqueue") as sender:
message = ServiceBusMessage(
body=json.dumps({"orderId": "12345", "amount": 99.99}),
content_type="application/json",
subject="order-created",
application_properties={"priority": "high"}
)
sender.send_messages(message)
# Send batch
with client.get_queue_sender("myqueue") as sender:
batch = sender.create_message_batch()
for i in range(10):
try:
batch.add_message(ServiceBusMessage(f"Message {i}"))
except ValueError:
# Batch is full
sender.send_messages(batch)
batch = sender.create_message_batch()
batch.add_message(ServiceBusMessage(f"Message {i}"))
sender.send_messages(batch)
# Receive messages
with client.get_queue_receiver("myqueue") as receiver:
messages = receiver.receive_messages(max_message_count=10, max_wait_time=5)
for message in messages:
print(f"Received: {str(message)}")
# Process message
receiver.complete_message(message)
# Receive with peek lock (manual completion)
with client.get_queue_receiver("myqueue", receive_mode="peek_lock") as receiver:
messages = receiver.receive_messages()
for message in messages:
try:
process_message(message)
receiver.complete_message(message)
except Exception:
receiver.abandon_message(message) # Return to queue
# or receiver.dead_letter_message(message) # Move to DLQ
TypeScript SDK - Queue Operations
import { ServiceBusClient, ServiceBusMessage } from "@azure/service-bus";
const connectionString = "Endpoint=sb://...";
const client = new ServiceBusClient(connectionString);
// Send message
const sender = client.createSender("myqueue");
await sender.sendMessages({
body: { orderId: "12345", amount: 99.99 },
contentType: "application/json",
subject: "order-created",
applicationProperties: { priority: "high" }
});
// Send batch
const batch = await sender.createMessageBatch();
for (let i = 0; i < 100; i++) {
if (!batch.tryAddMessage({ body: `Message ${i}` })) {
await sender.sendMessages(batch);
batch = await sender.createMessageBatch();
batch.tryAddMessage({ body: `Message ${i}` });
}
}
await sender.sendMessages(batch);
await sender.close();
// Receive messages
const receiver = client.createReceiver("myqueue");
const messages = await receiver.receiveMessages(10, { maxWaitTimeInMs: 5000 });
for (const message of messages) {
console.log(`Received: ${message.body}`);
await receiver.completeMessage(message);
}
await receiver.close();
// Subscribe to messages (continuous processing)
receiver.subscribe({
processMessage: async (message) => {
console.log(`Received: ${message.body}`);
// No need to complete - done automatically
},
processError: async (err) => {
console.error(`Error: ${err}`);
}
});
Topic/Subscription Operations
# Send to topic
with client.get_topic_sender("mytopic") as sender:
message = ServiceBusMessage(
body=json.dumps({"event": "order.created"}),
subject="orders",
application_properties={"region": "us-east"}
)
sender.send_messages(message)
# Receive from subscription
with client.get_subscription_receiver(
topic_name="mytopic",
subscription_name="mysubscription"
) as receiver:
messages = receiver.receive_messages(max_message_count=10)
for message in messages:
print(f"Received: {str(message)}")
receiver.complete_message(message)
Subscription Filters
# Create subscription with SQL filter
az servicebus topic subscription create \
--name orders-us \
--topic-name mytopic \
--namespace-name myservicebus \
--resource-group mygroup
az servicebus topic subscription rule create \
--name us-filter \
--subscription-name orders-us \
--topic-name mytopic \
--namespace-name myservicebus \
--resource-group mygroup \
--filter-sql-expression "region = 'us'"
# Correlation filter
az servicebus topic subscription rule create \
--name priority-filter \
--subscription-name priority-orders \
--topic-name mytopic \
--namespace-name myservicebus \
--resource-group mygroup \
--correlation-filter correlation-id=high-priority
Sessions (Ordered Processing)
# Send messages with session
with client.get_queue_sender("session-queue") as sender:
for i in range(10):
message = ServiceBusMessage(
body=f"Message {i}",
session_id="order-12345" # Group messages by session
)
sender.send_messages(message)
# Receive from session
with client.get_queue_receiver(
"session-queue",
session_id="order-12345"
) as receiver:
messages = receiver.receive_messages()
for message in messages:
# Messages arrive in order within session
receiver.complete_message(message)
# Accept next available session
with client.get_queue_receiver(
"session-queue",
session_id=None # Accept any session
) as receiver:
# Process messages from the accepted session
pass
Dead Letter Queue
# Receive from dead letter queue
dlq_receiver = client.get_queue_receiver(
"myqueue",
sub_queue="deadletter"
)
with dlq_receiver:
messages = dlq_receiver.receive_messages(max_message_count=10)
for message in messages:
print(f"DLQ Message: {str(message)}")
print(f"Dead letter reason: {message.dead_letter_reason}")
print(f"Dead letter description: {message.dead_letter_error_description}")
# Reprocess or log
dlq_receiver.complete_message(message)
Scheduled Messages
from datetime import datetime, timedelta
# Schedule message for future delivery
with client.get_queue_sender("myqueue") as sender:
scheduled_time = datetime.utcnow() + timedelta(hours=1)
message = ServiceBusMessage("Scheduled message")
sequence_number = sender.schedule_messages(message, scheduled_time)
# Cancel scheduled message
sender.cancel_scheduled_messages(sequence_number)
Message Deferral
# Defer message for later processing
with client.get_queue_receiver("myqueue") as receiver:
messages = receiver.receive_messages()
for message in messages:
if not ready_to_process(message):
# Defer the message
sequence_number = message.sequence_number
receiver.defer_message(message)
# Store sequence_number for later retrieval
# Receive deferred message
with client.get_queue_receiver("myqueue") as receiver:
deferred_message = receiver.receive_deferred_messages([sequence_number])
receiver.complete_message(deferred_message[0])
Best Practices
- Message Size: Keep messages small; use claim-check pattern for large payloads
- Sessions: Use sessions for ordered processing within groups
- Dead Letter: Always monitor and handle dead letter messages
- Batching: Use batch operations for throughput
- Retry: Implement exponential backoff for transient failures
Common Workflows
Message Processing Pipeline
- Create queue with dead-letter enabled
- Send messages with correlation IDs
- Process with peek-lock mode
- Complete or abandon based on success
- Monitor DLQ for failures
Pub/Sub Pattern
- Create topic for events
- Create subscriptions with filters
- Publishers send to topic
- Subscribers receive from filtered subscriptions
- Scale independently
More from housegarofalo/claude-code-base
mqtt-iot
Configure MQTT brokers (Mosquitto, EMQX) for IoT messaging, device communication, and smart home integration. Manage topics, QoS levels, authentication, and bridging. Use when setting up IoT messaging, smart home communication, or device-to-cloud connectivity. (project)
22devops-engineer-agent
Infrastructure and DevOps specialist. Manages Docker, Kubernetes, CI/CD pipelines, and cloud deployments. Expert in GitHub Actions, Azure DevOps, Terraform, and container orchestration. Use for deployment automation, infrastructure setup, or CI/CD optimization.
6postgresql
Design, optimize, and manage PostgreSQL databases. Covers indexing, pgvector for AI embeddings, JSON operations, full-text search, and query optimization. Use when working with PostgreSQL, database design, or building data-intensive applications.
6home-assistant
Ultimate Home Assistant skill - complete administration, wireless protocols (Zigbee/ZHA/Z2M, Z-Wave JS, Thread, Matter), ESPHome device building, advanced troubleshooting, performance optimization, security hardening, custom integration development, and professional dashboard design. Covers configuration, REST API, automation debugging, database optimization, SSL/TLS, Jinja2 templating, and HACS custom cards. Use for any HA task.
6testing
Comprehensive testing skill covering unit, integration, and E2E testing with pytest, Jest, Cypress, and Playwright. Use for writing tests, improving coverage, debugging test failures, and setting up testing infrastructure.
5react-typescript
Build modern React applications with TypeScript. Covers React 18+ patterns, hooks, component architecture, state management (Zustand, Redux Toolkit), server components, and best practices. Use for React development, TypeScript integration, component design, and frontend architecture.
5