Source code for cli_proton_python.formatter
#
# Copyright 2017 Red Hat Inc.
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
""" Clients output formatter module """
from __future__ import absolute_import
import sys
import json
import ast
import proton
if sys.version_info > (3,):
# define long for Python 3.x (int and long were unified)
long = int # pylint: disable=redefined-builtin,invalid-name
[docs]class Formatter(object):
""" Output formatter class for clients """
def __init__(self, message):
""" Formatter constructor
:param message: message to be printed
:type message: object
"""
self.msg = message
[docs] def print_message(self):
"""
prints message in default upstream format
:return: message to be printed in upstream format
:rtype: str
"""
args = []
props = self.msg.__dict__["properties"]
optsstring = ""
sorted_keys = sorted(props)
for k in sorted_keys:
optsstring = "%s'%s': '%s', " % (optsstring, str(k), str(props[k]))
args.append("%s=%s" % ("properties", "{" + optsstring[:-2] + "}"))
if self.msg.body is not None:
if args:
args.append("content='%s'" % self.msg.body)
else:
args.append(repr(self.msg.body))
else:
args.append("content=''")
return "Message(%s)" % (", ".join(args))
[docs] @staticmethod
def format_int(in_data):
"""
formats integer value
:param in_data: input data
:type in_data: int, long
:return: input data string formated as int
:rtype: str
"""
int_res = "%d" % in_data
return int_res
[docs] @staticmethod
def format_float(in_data):
"""
formats float value
:param in_data: input data
:type in_data: float
:return: input data string formated as float
:rtype: str
"""
int_res = "%f" % in_data
return int_res
[docs] @staticmethod
def format_dict(in_data):
"""
formats dictionary
:param in_data: input data
:type in_data: dict
:return: input data string formated as dict
:rtype: str
"""
int_res = '{'
list_data = []
for key, val in in_data.items():
list_data.append("'%s': %s" % (key, Formatter.format_object(val)))
int_res += ', '.join(list_data)
int_res += '}'
return int_res
[docs] @staticmethod
def format_list(in_data):
"""
formats list
:param in_data: input data
:type in_data: list
:return: input data string formated as list
:rtype: str
"""
int_res = '['
list_data = []
for val in in_data:
list_data.append("%s" % (Formatter.format_object(val)))
int_res += ', '.join(list_data)
int_res += ']'
return int_res
[docs] @staticmethod
def format_string(in_data):
"""
formats string
:param in_data: input data
:type in_data: str, unicode, bytes
:return: input data string formated as string
:rtype: str
"""
if isinstance(in_data, bytes):
in_data = in_data.decode()
int_res = "'%s'" % (Formatter.quote_string_escape(in_data))
return int_res
[docs] @staticmethod
def format_object(in_data):
"""
formats general object
:param in_data: input data
:type in_data: None, bool, int, long, float, dict, list, str, unicode, bytes
:return: input data converted to string
:rtype: str
"""
if in_data is None:
return "None"
elif isinstance(in_data, bool):
return str(in_data)
elif isinstance(in_data, (int, long)):
return Formatter.format_int(in_data)
elif isinstance(in_data, float):
return Formatter.format_float(in_data)
elif isinstance(in_data, dict):
return Formatter.format_dict(in_data)
elif isinstance(in_data, list):
return Formatter.format_list(in_data)
return Formatter.format_string(in_data)
[docs] def print_message_as_dict(self):
"""
prints message in python dictionary form
:return: message to be printed in dictionary format
:rtype: str
"""
if isinstance(self.msg, proton.Message):
int_list_keys = ['address', 'annotations', 'content', 'content_encoding',
'content_type', 'correlation_id', 'creation_time',
'delivery_count', 'durable', 'expiry_time',
'first_acquirer', 'group_id', 'group_sequence',
'id', 'inferred', 'instructions', 'priority', 'properties',
'reply_to', 'reply_to_group_id', 'subject', 'ttl', 'user_id']
else:
raise TypeError("Unable to detect message format", type(self.msg))
int_result = "{"
for k in int_list_keys:
if k == 'content':
int_value = getattr(self.msg, 'body')
else:
int_value = getattr(self.msg, k)
# fix output to conform to other clients
if k == 'expiry_time':
k = 'expiration' # rename field
int_value = int(int_value * 1000) # convert to milliseconds
if k == 'ttl':
int_value = int(int_value * 1000) # convert to milliseconds
int_result += "'%s': %s, " % (k, Formatter.format_object(int_value))
# last comma remove
int_result = int_result[0:-2]
int_result += "}"
return int_result
[docs] def print_message_as_interop(self):
"""
Print message in AMQP interoperable format
:return: message to be printed in interoperable format
:rtype: str
"""
if isinstance(self.msg, proton.Message):
int_list_keys = ['address', 'content', 'content_encoding',
'content_type', 'correlation_id', 'creation_time',
'delivery_count', 'durable', 'expiry_time',
'first_acquirer', 'group_id', 'group_sequence',
'id', 'priority', 'properties', 'reply_to',
'reply_to_group_id', 'subject', 'ttl', 'user_id']
else:
raise TypeError("Unable to detect message format", type(self.msg))
int_result = "{"
for k in int_list_keys:
if k == 'content':
int_value = getattr(self.msg, 'body')
else:
int_value = getattr(self.msg, k)
# fix output to conform to other clients
if k == 'expiry_time':
k = 'absolute-expiry-time' # rename field
int_value = int(int_value * 1000) # convert to milliseconds
if k == 'ttl':
int_value = int(int_value * 1000) # convert to milliseconds
if k == 'creation_time':
int_value = int(int_value * 1000) # convert to milliseconds
if k == 'user_id' and (int_value is None or int_value == ''):
int_value = None
if (k == 'content_type'
and (int_value is None or int_value == '' or int_value == 'None')):
int_value = None
if (k == 'content_encoding'
and (int_value is None or int_value == '' or int_value == 'None')):
int_value = None
if k in ['id', 'correlation_id']:
if int_value is not None and str(int_value).startswith("ID:"):
int_value = int_value[3:]
int_result += "'%s': %s, " % (k.replace("_", "-"), Formatter.format_object(int_value))
# last comma remove
int_result = int_result[0:-2]
int_result += "}"
return int_result
[docs] def print_message_as_json(self):
"""
Print message in JSON form
:return: message to be printed in JSON form
:rtype: str
"""
return json.dumps(ast.literal_eval(self.print_message_as_interop()))
[docs] def print_stats(self):
"""
print statistics information
:return: prefix message with string indicating statistics
:rtype: str
"""
return "STATS %s" % self.msg
[docs] def print_error(self):
"""
print error information
:return: prefix message with string indicating error
:rtype: str
"""
return "ERROR {'cause' :'%s'}" % self.msg
# ------ Support formatting functions ------ #
[docs] @staticmethod
def quote_string_escape(in_data):
"""
escapes quotes in given string
:param in_data: input string
:type in_data: str, unicode
:return: input string with quotes escaped
:rtype: str, unicode
"""
return in_data.replace("'", "\\'")