#!/usr/bin/env python
#
# Copyright 2017 Red Hat Inc.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" Proton reactive API python sender client """
from __future__ import absolute_import, print_function, division
import re
import sys
import proton
import proton.reactor
import proton.handlers
from cli_proton_python import coreclient, options, utils
[docs]class Send(coreclient.CoreClient):
""" Proton reactive API python sender client
implements various handler methods for reactor events triggered by proton.reactor
"""
def __init__(self, opts):
"""
Send constructor
:param opts: sender client options
:type opts: optparse.Values instance
"""
reactor_opts = {}
if opts.reactor_auto_settle_off:
reactor_opts['auto_settle'] = False
if opts.reactor_peer_close_is_error:
reactor_opts['peer_close_is_error'] = True
super(Send, self).__init__(opts, reactor_opts)
self.msg_sent_cnt = 0
self.msg_confirmed_cnt = 0
self.msg_total_cnt = opts.count
self.msg_content_fmt = False
self.msg = None
self.msg_content = None
self.event = None # sendable event
self.link_opts = self.parse_link_options()
self.delay_before = self.set_delay_before()
self.delay_after = self.set_delay_after()
[docs] @staticmethod
def prepare_content_from_file(filename):
"""
reads and returns file contents
:param filename: path to file to be opened and read
:type filename: str
:return: contents of filename as unicode string
:rtype: str (unicode) or None
"""
try:
content_file = open(filename, "r")
content = content_file.read()
content_file.close()
except IOError as exc:
utils.dump_error(exc)
return None
content = content.rstrip()
try:
# python2.x convert to unicode
return content.decode(sys.getfilesystemencoding())
except AttributeError:
return content
[docs] def prepare_list_content(self):
"""
prepares list content
:return: list constructed from options list items
:rtype: list
"""
content = []
if self.opts.msg_list_items != ['']:
for item in self.opts.msg_list_items:
if self.opts.content_type:
if item.startswith('~'):
item = item[1:]
content.append(utils.retype_content(item, self.opts.content_type))
elif item.startswith('~'):
content.append(utils.hard_retype(item[1:]))
else:
content.append(item)
return content
[docs] def prepare_map_content(self):
"""
prepares map content
:return: flat map constructed from options map items
:rtype: dict
"""
content = {}
if self.opts.msg_map_items != ['']:
content = utils.prepare_flat_map(self.opts.msg_map_items, self.opts.content_type)
return content
[docs] def prepare_string_content(self, content):
"""
prepares string content
re-types content accoding content-type given,
enables message sequence numbering if formatting string (%[ 0-9]*d) is found
:param content: message content string
:type content: str (unicode)
:return: string message content
:rtype: str (unicode)
"""
if content is not None and re.search('%[ 0-9]*d', content) is not None:
self.msg_content_fmt = True
if self.opts.content_type:
return utils.retype_content(content, self.opts.content_type)
return content
[docs] def prepare_content(self):
"""
prepares the content depending on type
.. note::
* if self.opts.msg_list_items are set amqp/map content is constructed,
* elif self.opts.msg_map_items are set amqp/list content is constructed,
* else the content is considered as text/plain
:return: string message content
:rtype: str (unicode) or list or dict
"""
if self.opts.msg_content:
content = self.opts.msg_content
elif self.opts.msg_content_from_file:
content = self.prepare_content_from_file(self.opts.msg_content_from_file)
else:
content = None
if self.opts.msg_list_items:
return self.prepare_list_content(), "amqp/list"
elif self.opts.msg_map_items:
return self.prepare_map_content(), "amqp/map"
return self.prepare_string_content(content), "text/plain"
[docs] def prepare_message(self):
"""
compose and return the message
:return: message to be sent
:rtype: proton.Message
"""
msg = proton.Message()
msg_content, msg_content_type = self.prepare_content()
if self.opts.msg_durable.lower() == "yes" or self.opts.msg_durable.lower() == "true":
msg.durable = True
if self.opts.msg_priority is not None:
msg.priority = self.opts.msg_priority
if self.opts.msg_id is not None:
msg.id = self.opts.msg_id
if self.opts.msg_correlation_id is not None:
msg.correlation_id = self.opts.msg_correlation_id
if self.opts.msg_user_id is not None:
msg.user_id = self.opts.msg_user_id.encode('utf-8')
if self.opts.msg_group_id is not None:
msg.group_id = self.opts.msg_group_id.encode('utf-8')
if self.opts.msg_group_seq:
msg.group_sequence = self.opts.msg_group_seq
if self.opts.msg_reply_to is not None:
msg.reply_to = self.opts.msg_reply_to
if self.opts.msg_subject is not None:
msg.subject = self.opts.msg_subject
if self.opts.msg_ttl is not None:
msg.ttl = self.opts.msg_ttl / 1000
if self.opts.msg_content_type is not None:
msg.content_type = self.opts.msg_content_type
if self.opts.msg_address is not None:
msg.address = self.opts.msg_address
else:
msg.content_type = msg_content_type
msg.properties = utils.prepare_flat_map(self.opts.msg_properties)
msg.body = msg_content
return msg
[docs] def send_message(self):
""" sends a message """
# close the connection if nothing to send
if self.msg_total_cnt == 0:
self.event.connection.close()
if self.msg_content_fmt:
self.msg.body = self.msg_content % self.msg_sent_cnt
self.event.sender.send(self.msg)
self.msg_sent_cnt += 1
self.print_message(self.msg)
if self.opts.log_stats == 'endpoints':
utils.dump_event(self.event)
if self.opts.link_at_most_once and self.msg_sent_cnt == self.msg_total_cnt:
self.tear_down(self.event)
[docs] def on_start(self, event):
"""
called when the event loop starts, creates a sender for given url
:param event: reactor event
:type event: proton.Event
"""
if len([(opt, val) for opt, val in self.opts.__dict__.items()
if opt.startswith('conn') and val is not None]) != 0:
# some connection options were given
self.set_up_ssl(event)
conn_opts = self.parse_connection_options()
conn = event.container.connect(self.url, **conn_opts)
event.container.create_sender(conn, self.url.path, options=self.link_opts)
else:
event.container.create_sender(self.url, options=self.link_opts)
[docs] def on_sendable(self, event):
"""
called when sending can proceed
:param event: reactor event
:type event: proton.Event
"""
if self.event is None:
self.msg = self.prepare_message()
self.msg_content = self.msg.body
self.event = event
# we are sending first message or first after we previously ran out of credit
if not self.tearing_down and self.msg_sent_cnt < self.msg_total_cnt:
if self.msg_sent_cnt != 0:
# we previously ran out of credit
self.next_task = event.reactor.schedule(0, self)
else:
self.next_task = event.reactor.schedule(self.delay_before, self)
[docs] def on_accepted(self, event):
"""
called when the remote peer accepts an outgoing message
:param event: reactor event
:type event: proton.Event
"""
self.msg_confirmed_cnt += 1
if self.msg_confirmed_cnt == self.msg_total_cnt:
self.tear_down(event)
[docs] def on_rejected(self, event):
"""
called when the remote peer rejects an outgoing message
:param event: reactor event
:type event: proton.Event
"""
self.tear_down(event)
[docs] def on_settled(self, event):
"""
called when the remote peer has settled the outgoing message,
this is the point at which it should never be retransmitted
:param event: reactor event
:type event: proton.Event
"""
if not self.auto_settle and self.msg_confirmed_cnt == self.msg_total_cnt:
self.tear_down(event, settled=True)
[docs] def on_disconnected(self, event):
"""
called when the socket is disconnected
:param event: reactor event
:type event: proton.Event
"""
if not self.opts.link_at_most_once:
self.msg_sent_cnt = self.msg_confirmed_cnt
[docs] def on_timer_task(self, _):
"""
next send action scheduler
the Send object itself is shecduled to perform next send action, that is why it contains
timer_task method method
incoming event object does not contain many fields, that is why self.event is used
"""
# can send message
if self.event.sender.credit and self.msg_sent_cnt < self.msg_total_cnt:
self.send_message()
else:
self.event = None
return
# should send more messages
if not self.tearing_down and self.msg_sent_cnt < self.msg_total_cnt:
self.next_task = self.event.reactor.schedule(self.delay_before + self.delay_after, self)
else:
self.event.reactor.schedule(self.delay_after, coreclient.DelayedNoop())
[docs]class TxSend(Send, proton.handlers.TransactionHandler):
"""
Proton reactive API python transactional sender client
implements various handler methods for reactor events triggered by proton.reactor
"""
def __init__(self, opts):
"""
TxSend constructor
:param opts: sender client options
:type opts: optparse.Values instance
"""
super(TxSend, self).__init__(opts)
if not self.auto_settle:
raise NotImplementedError("Manual settling not supported in TX mode")
self.current_batch = 0
self.msg_processed_cnt = 0
self.sender = None
[docs] def transaction_process(self, event):
"""
transactionally send a message, process reporting and control options
:param event: reactor event
:type event: proton.Event
"""
msg = self.prepare_message()
if self.msg_content_fmt:
msg_content = msg.body
while event.transaction and self.sender.credit and (
self.msg_processed_cnt + self.current_batch) < self.msg_total_cnt:
if self.msg_content_fmt:
msg.body = msg_content % (self.msg_processed_cnt + self.current_batch)
if self.opts.duration != 0 and self.opts.duration_mode == 'before-send':
utils.sleep4next(self.start_tm, self.msg_total_cnt, self.opts.duration,
self.msg_processed_cnt + self.current_batch + 1)
event.transaction.send(self.sender, msg)
self.current_batch += 1
if self.opts.log_stats == 'endpoints':
utils.dump_event(event)
self.print_message(msg)
if self.opts.duration != 0 and self.opts.duration_mode == 'after-send':
utils.sleep4next(self.start_tm, self.msg_total_cnt, self.opts.duration,
self.msg_processed_cnt + self.current_batch)
self.transaction_finish(event)
[docs] def transaction_finish(self, event):
"""
finish transaction, do tranaction action, process reporting and control options
:param event: reactor event
:type event: proton.Event
"""
if self.current_batch == self.opts.tx_size:
if self.opts.tx_action == 'commit':
event.transaction.commit()
elif self.opts.tx_action == 'rollback':
event.transaction.abort()
event.transaction = None
if self.opts.tx_action == 'none':
if self.msg_processed_cnt + self.current_batch == self.msg_total_cnt:
self.tear_down(event)
else:
self.msg_processed_cnt += self.current_batch
self.current_batch = 0
event.reactor.declare_transaction(event.connection, handler=self)
if self.opts.duration != 0 and self.opts.duration_mode == 'after-send-tx-action':
utils.sleep4next(self.start_tm, self.msg_total_cnt, self.opts.duration,
self.msg_processed_cnt + self.current_batch)
elif self.msg_processed_cnt + self.current_batch == self.msg_total_cnt:
if self.opts.tx_endloop_action == 'commit':
event.transaction.commit()
elif self.opts.tx_endloop_action == 'rollback':
event.transaction.abort()
else:
self.tear_down(event)
[docs] def on_start(self, event):
"""
called when the event loop starts, creates a transactional sender for given url
:param event: reactor event
:type event: proton.Event
"""
conn = event.container.connect(self.url)
self.sender = event.container.create_sender(conn, self.url.path, options=self.link_opts)
event.container.declare_transaction(conn, handler=self)
[docs] def on_transaction_declared(self, event):
"""
called when the transaction is declared
:param event: reactor event
:type event: proton.Event
"""
self.transaction_process(event)
[docs] def on_transaction_committed(self, event):
"""
called when the transaction is committed
:param event: reactor event
:type event: proton.Event
"""
self.msg_processed_cnt += self.current_batch
if self.msg_processed_cnt == self.msg_total_cnt:
self.tear_down(event)
else:
self.current_batch = 0
event.reactor.declare_transaction(event.connection, handler=self)
[docs] def on_transaction_aborted(self, event):
"""
called when the transaction is aborted
:param event: reactor event
:type event: proton.Event
"""
self.msg_processed_cnt += self.current_batch
if self.msg_processed_cnt == self.msg_total_cnt:
self.tear_down(event)
else:
self.current_batch = 0
event.reactor.declare_transaction(event.connection, handler=self)
[docs] def on_sendable(self, event):
"""
suppressed in transactional, no actions are performed
:param event: reactor event
:type event: proton.Event
"""
pass
[docs] def on_accepted(self, event):
"""
suppressed in transactional, no actions are performed
:param event: reactor event
:type event: proton.Event
"""
pass
[docs] def on_settled(self, event):
"""
suppressed in transactional, no actions are performed
:param event: reactor event
:type event: proton.Event
"""
pass
[docs] def on_disconnected(self, event):
"""
suppressed in transactional, no actions are performed
:param event: reactor event
:type event: proton.Event
"""
self.current_batch = 0
[docs] def on_timer_task(self, _):
"""
suppressed in transactional, no actions are performed
:param event: reactor event
:type event: proton.Event
"""
pass
[docs]def main():
""" main loop """
ecode = 0
parser = options.SenderOptions()
opts, _ = parser.parse_args()
if opts.log_lib is not None:
utils.set_up_client_logging(opts.log_lib)
try:
# main loop
if opts.tx_size or opts.tx_endloop_action is not None:
container = proton.reactor.Container(TxSend(opts))
else:
container = proton.reactor.Container(Send(opts))
super(proton.reactor.Container, container).global_handler.add(
coreclient.ErrorsHandler(opts.conn_reconnect))
container.run()
except Exception: # pylint: disable=broad-except
raise
sys.exit(ecode)
if __name__ == '__main__':
main()