In this post, Big Data Consultant Rakhi Guha shows how to get started with Python scripting to manage Service Bus Queue messages.
This article provides a detailed idea on the following operation using python scripting platform.
- Send messages to Service Bus Queue
- Resubmit messages from Dead letter queue to Main queue
- Manage latest message instance by pushing existing message to Dead letter queue before submission.
Send Message to Service Bus Queue:
from azure.servicebus import ServiceBusClient from azure.servicebus import QueueClient, Message # Create the QueueClient queue_client = QueueClient.from_connection_string("<Connection string>", "<QUEUE NAME>") # Send a test message to the queue msg = Message(b'{"MessageName":"DEMO","MessageText":"TestMessage"}') queue_client.send(msg)
Resubmit messages from Dead letter Queue to Main Queue:
from azure.servicebus import ServiceBusClient from azure.servicebus import ServiceBusClient, Message import json connectionString = "<Connection String>" serviceBusClient = ServiceBusClient.from_connection_string(connectionString) queueName = "<Queue Name>" queueClient = serviceBusClient.get_queue(queueName) messageToSend=[] with queueClient.get_deadletter_receiver(prefetch=5) as queueReceiver: messages = queueReceiver.fetch_next(timeout=100) for message in messages: body=next(message.body) messageToSend.append(body) message.complete() with queueClient.get_sender() as sender: for message in messageToSend: newmessage=Message(message) sender.send(newmessage)
Manage latest message instance by pushing existing message to Dead letter Queue before submission:
from azure.servicebus import ServiceBusClient from azure.servicebus import ServiceBusClient, Message import json MessageToSend='{"MessageName":"DEMO","MessageText":"TestMessage1"}' connectionString = "<Connection String>" serviceBusClient = ServiceBusClient.from_connection_string(connectionString) queueName = "<QUEUE NAME>" queueClient = serviceBusClient.get_queue(queueName) MessageTosendJson=json.loads(MessageToSend) with queueClient.get_receiver(prefetch=5) as queueReceiver: messages = queueReceiver.fetch_next(timeout=100) for message in messages: existingMessage=json.loads(str(message.body,'utf-8')) if MessageTosendJson["MessageName"] == existingMessage["MessageName"]: message.dead_letter() msg = Message(MessageToSend) queueClient.send(msg)
Note:
- azure.servicebus namespace contains the required code artifacts for all the operation and class instances to be required.
- In the above scripts, the place holder needs to be replaced with the appropriate value.
- Message format will be use case specific and based on the message format, the logic for message sanity check in script 3 will be changed.
0 comments