June 3rd, 2020

Manage Service Bus Queue messages with Python

Developer Support
Cloud Solution Architects

In this post, Big Data Consultant Rakhi Guha shows how to get started with Python scripting to manage Service Bus Queue messages.


This article provides a detailed idea on the following operation using python scripting platform.

  1. Send messages to Service Bus Queue
  2. Resubmit messages from Dead letter queue to Main queue
  3. Manage latest message instance by pushing existing message to Dead letter queue before submission.

Send Message to Service Bus Queue:

from azure.servicebus import ServiceBusClient
from azure.servicebus import QueueClient, Message
# Create the QueueClient
queue_client = QueueClient.from_connection_string("<Connection string>", "<QUEUE NAME>")
# Send a test message to the queue
msg = Message(b'{"MessageName":"DEMO","MessageText":"TestMessage"}')
queue_client.send(msg)

Resubmit messages from Dead letter Queue to Main Queue:

from azure.servicebus import ServiceBusClient
from azure.servicebus import ServiceBusClient, Message
import json
connectionString = "<Connection String>"
serviceBusClient = ServiceBusClient.from_connection_string(connectionString)
queueName = "<Queue Name>"
queueClient = serviceBusClient.get_queue(queueName)
messageToSend=[]
with queueClient.get_deadletter_receiver(prefetch=5) as queueReceiver:
	messages = queueReceiver.fetch_next(timeout=100)
	for message in messages:
		body=next(message.body)
		messageToSend.append(body)	
		message.complete()
with queueClient.get_sender() as sender:
	for message in messageToSend:
		newmessage=Message(message)
		sender.send(newmessage)

Manage latest message instance by pushing existing message to Dead letter Queue before submission:

from azure.servicebus import ServiceBusClient
from azure.servicebus import ServiceBusClient, Message
import json
MessageToSend='{"MessageName":"DEMO","MessageText":"TestMessage1"}'
connectionString = "<Connection String>"
serviceBusClient = ServiceBusClient.from_connection_string(connectionString)
queueName = "<QUEUE NAME>"
queueClient = serviceBusClient.get_queue(queueName)
MessageTosendJson=json.loads(MessageToSend)
with queueClient.get_receiver(prefetch=5) as queueReceiver:
	messages = queueReceiver.fetch_next(timeout=100)
	for message in messages:
		existingMessage=json.loads(str(message.body,'utf-8'))
		if MessageTosendJson["MessageName"] == existingMessage["MessageName"]:	message.dead_letter()

msg = Message(MessageToSend)
queueClient.send(msg)

Note:

  1. azure.servicebus namespace contains the required code artifacts for all the operation and class instances to be required.
  2. In the above scripts, the place holder needs to be replaced with the appropriate value.
  3. Message format will be use case specific and based on the message format, the logic for message sanity check in script 3 will be changed.

Author

Developer Support
Cloud Solution Architects

Microsoft Developer Support helps software developers rapidly build and deploy quality applications for Microsoft platforms.

0 comments

Discussion are closed.

Feedback