Manage Service Bus Queue messages with Python

Premier Developer

Premier

In this post, Big Data Consultant Rakhi Guha shows how to get started with Python scripting to manage Service Bus Queue messages.


This article provides a detailed idea on the following operation using python scripting platform.

  1. Send messages to Service Bus Queue
  2. Resubmit messages from Dead letter queue to Main queue
  3. Manage latest message instance by pushing existing message to Dead letter queue before submission.

Send Message to Service Bus Queue:

from azure.servicebus import ServiceBusClient
from azure.servicebus import QueueClient, Message
# Create the QueueClient
queue_client = QueueClient.from_connection_string("<Connection string>", "<QUEUE NAME>")
# Send a test message to the queue
msg = Message(b'{"MessageName":"DEMO","MessageText":"TestMessage"}')
queue_client.send(msg)

Resubmit messages from Dead letter Queue to Main Queue:

from azure.servicebus import ServiceBusClient
from azure.servicebus import ServiceBusClient, Message
import json
connectionString = "<Connection String>"
serviceBusClient = ServiceBusClient.from_connection_string(connectionString)
queueName = "<Queue Name>"
queueClient = serviceBusClient.get_queue(queueName)
messageToSend=[]
with queueClient.get_deadletter_receiver(prefetch=5) as queueReceiver:
	messages = queueReceiver.fetch_next(timeout=100)
	for message in messages:
		body=next(message.body)
		messageToSend.append(body)	
		message.complete()
with queueClient.get_sender() as sender:
	for message in messageToSend:
		newmessage=Message(message)
		sender.send(newmessage)

Manage latest message instance by pushing existing message to Dead letter Queue before submission:

from azure.servicebus import ServiceBusClient
from azure.servicebus import ServiceBusClient, Message
import json
MessageToSend='{"MessageName":"DEMO","MessageText":"TestMessage1"}'
connectionString = "<Connection String>"
serviceBusClient = ServiceBusClient.from_connection_string(connectionString)
queueName = "<QUEUE NAME>"
queueClient = serviceBusClient.get_queue(queueName)
MessageTosendJson=json.loads(MessageToSend)
with queueClient.get_receiver(prefetch=5) as queueReceiver:
	messages = queueReceiver.fetch_next(timeout=100)
	for message in messages:
		existingMessage=json.loads(str(message.body,'utf-8'))
		if MessageTosendJson["MessageName"] == existingMessage["MessageName"]:	message.dead_letter()

msg = Message(MessageToSend)
queueClient.send(msg)

Note:

  1. azure.servicebus namespace contains the required code artifacts for all the operation and class instances to be required.
  2. In the above scripts, the place holder needs to be replaced with the appropriate value.
  3. Message format will be use case specific and based on the message format, the logic for message sanity check in script 3 will be changed.

0 comments

Leave a comment