#!/usr/bin/env python
# -*- coding: utf-8 -*-
# blocking publisher for testing

import argparse
import errno
import getpass
import os
import sys

import pika


def confirm_callback(frame):
    """Report the outcome carried by a confirm-related frame.

    Depending on the type of ``frame.method``:

    * ``Confirm.SelectOk`` -- print that the channel is in confirm mode.
    * ``Basic.Nack`` -- print the delivery tag as lost and exit with 1.
    * ``Basic.Ack`` -- print the delivery tag as delivered and exit with 0.

    Any other method type is ignored.

    NOTE(review): nothing in this file registers this callback; it looks
    like a leftover from a non-blocking variant -- confirm before removing.
    """
    method = frame.method
    if isinstance(method, pika.spec.Confirm.SelectOk):
        print('The channel is now in confirm mode')
    elif isinstance(method, pika.spec.Basic.Nack):
        print('Message {0} lost!'.format(method.delivery_tag))
        sys.exit(1)
    elif isinstance(method, pika.spec.Basic.Ack):
        print('Message {0} delivered!'.format(method.delivery_tag))
        sys.exit(0)


def _build_parser():
    """Return the argument parser describing the publisher's CLI."""
    parser = argparse.ArgumentParser(
        description='The following options are available')
    parser.add_argument(
        'message', metavar='MESSAGE', type=str,
        help='Message string / text-file containing the message')
    parser.add_argument(
        '-e', '--exchange', metavar='EXCHANGE', type=str,
        dest='exchange', default='amqp_exchange',
        help='Exchange (default: amqp_exchange)')
    parser.add_argument(
        '-H', '--hostname', metavar='HOSTNAME/IP', type=str,
        dest='hostname', default='localhost',
        help='MQ hostname / IP address (default: localhost)')
    parser.add_argument(
        '-k', '--routing-key', metavar='KEY', type=str,
        dest='routing_key', default='amqp_key',
        help='Routing key (default: amqp_key)')
    parser.add_argument(
        '-P', '--port', metavar='PORT', type=int,
        dest='port', default=5672,
        help='AMQP port (default: 5672)')
    parser.add_argument(
        '-p', '--password', metavar='PASSWORD[FILE]', type=str,
        dest='password', default='',
        help='BEINC taget-password / text-file containing the target password'
             ' (default & recommended: prompt for passwd)')
    parser.add_argument(
        '-s', '--ssl', action='store_true',
        dest='ssl', default=False,
        help='Use TLS/SSL')
    parser.add_argument(
        '-t', '--content-type', metavar='TYPE', type=str,
        dest='content_type', default='text/plain',
        help='Content type (default: text/plain)')
    parser.add_argument(
        '-u', '--username', metavar='USERNAME', type=str,
        dest='username', default='amqp_user',
        help='Subscriber username (default: amqp_user)')
    parser.add_argument(
        '-V', '--vhost', metavar='VHOST', type=str,
        dest='vhost', default='/',
        help='Vhost (default: /)')
    return parser


def _resolve_password(password):
    """Turn the ``--password`` option value into the password to use.

    * Empty value: prompt on the terminal; if the prompt fails, write
      'Prompt terminated' to stderr and exit with ``errno.EACCES``.
    * Value naming an existing file: use the stripped first line of that
      file. If that line is blank the original value (the path) is kept.
      A read failure is reported on stderr and exits with 1.
    * Anything else: the value itself is the password.
    """
    if not password:
        try:
            return getpass.getpass()
        except Exception:
            sys.stderr.write('Prompt terminated\n')
            sys.exit(errno.EACCES)

    if os.path.isfile(password):
        try:
            with open(password, 'r') as fp:
                first_line = fp.readline().strip()
        except Exception as e:
            sys.stderr.write('Unable to open password file: {0}'.format(e))
            sys.exit(1)
        if first_line:
            return first_line

    return password


def _resolve_message(message):
    """Return the message body for the positional MESSAGE argument.

    If ``message`` names an existing file, its full contents are returned;
    otherwise ``message`` is returned unchanged. A read failure is reported
    on stderr and exits with 1.
    """
    if not os.path.isfile(message):
        return message
    try:
        with open(message, 'r') as fp:
            return fp.read()
    except Exception as e:
        sys.stderr.write('Unable to open message file: {0}'.format(e))
        sys.exit(1)


def main():
    """Parse the command line, publish one message and exit.

    Connects to the broker with a blocking connection, publishes the message
    as persistent (``delivery_mode=2``) with ``mandatory=True``, then prints
    'SUCCESS' and exits with 0 when ``basic_publish`` returns a truthy value,
    or prints 'FAILURE' and exits with 1 otherwise. The connection is closed
    before exiting.
    """
    args = _build_parser().parse_args()
    args.password = _resolve_password(args.password)
    args.message = _resolve_message(args.message)

    creds_broker = pika.PlainCredentials(args.username, args.password)
    conn_params = pika.ConnectionParameters(args.hostname,
                                            args.port,
                                            virtual_host=args.vhost,
                                            ssl=args.ssl,
                                            credentials=creds_broker)
    conn_broker = pika.BlockingConnection(conn_params)
    try:
        channel = conn_broker.channel()
        print('channel: {0}'.format(type(channel)))
        # Publisher confirms are left disabled, as in the original script:
        # channel.confirm_delivery()  # RabbitMQ extension
        msg_properties = pika.BasicProperties(
            content_type=args.content_type,
            delivery_mode=2  # persistent
        )
        print('Sending message...')
        result = channel.basic_publish(
            body=args.message,
            exchange=args.exchange,
            properties=msg_properties,
            routing_key=args.routing_key,
            mandatory=True,  # return an unroutable message with a Return method
            immediate=False  # do not require an immediate consumer
        )
    finally:
        # Release the broker connection on every path; skip the call when
        # the connection is already down so a publish error is not masked.
        if conn_broker.is_open:
            conn_broker.close()

    if result:
        print('SUCCESS')
        sys.exit(0)
    print('FAILURE')
    sys.exit(1)


if __name__ == '__main__':
    main()