JMS
This section contains documentation of the jms module.
client
Module client provides a common way to initialize a message queue client for any supported protocol. It implements the factory design pattern; each client module provides class JMSClient. Unit tests are available at hydratk/lib/network/jms/client/01_methods_ut.jedi
Following engines are supported:
- JMS - module jms_client
- STOMP - module stomp_client
- AMQP - module amqp_client
- MQTT - module mqtt_client
Methods :
- JMSClient
Creates a JMSClient instance for the given engine (use the engine name, case is ignored). Any constructor parameters can be passed as args, kwargs. When the engine is not supported, NotImplementedError is raised.
    from hydratk.lib.network.jms.client import JMSClient

    c1 = JMSClient('STOMP')
    c2 = JMSClient('JMS', verbose=True, jvm_path='path/to/jvm')
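The factory behaviour described above can be pictured with a minimal sketch. The names ENGINES, resolve_engine and make_client are hypothetical and do not come from hydratk; the package path used for the lazy import is also an assumption based on the module names listed above.

```python
import importlib

# Hypothetical engine-to-module table mirroring the list of supported engines.
ENGINES = {
    'JMS': 'jms_client',
    'STOMP': 'stomp_client',
    'AMQP': 'amqp_client',
    'MQTT': 'mqtt_client',
}


def resolve_engine(engine):
    """Return the module name for an engine, ignoring case."""
    key = engine.upper()
    if key not in ENGINES:
        raise NotImplementedError('Not supported engine: {0}'.format(engine))
    return ENGINES[key]


def make_client(engine, *args, **kwargs):
    """Import the engine module lazily and build its JMSClient.

    The package path below is an assumption, not confirmed by the source.
    """
    module = importlib.import_module(
        'hydratk.lib.network.jms.' + resolve_engine(engine))
    return module.JMSClient(*args, **kwargs)
```

Importing the engine module only when it is requested keeps optional dependencies (for example a Java bridge) from being loaded when another engine is used.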
jms_client
Module provides class JMSClient, which implements a client for JMS using the Java bridge. Unit tests are available at hydratk/lib/network/jms/jms_client/01_methods_ut.jedi. It requires a JMS driver for the message queue in use, stored in /var/local/hydratk/java; drivers are not bundled with hydratk.
When PyPy is used, method JMSClient raises NotImplementedError. The external module JPype1 is not compatible with PyPy and there is no alternative.
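Code that has to run on several interpreters may want to check for PyPy before requesting the JMS engine. The sketch below relies only on the standard library call platform.python_implementation(); the helper names are hypothetical and not part of hydratk.

```python
import platform


def jpype_supported(implementation=None):
    """Return False on PyPy, where the Java bridge is unavailable."""
    impl = implementation or platform.python_implementation()
    return impl != 'PyPy'


def ensure_jms_engine_available(implementation=None):
    """Raise the same exception type the documentation mentions for PyPy."""
    if not jpype_supported(implementation):
        raise NotImplementedError('JMS engine is not available on PyPy')
```

The optional implementation argument exists only to make the check testable without switching interpreters.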
Attributes :
- _mh - MasterHead reference
- _bridge - Java bridge instance
- _client - JMSClient Java class instance
- _verbose - verbose mode, disabled by default
- _connection_factory - JMS connection factory
- _properties - JMS properties
- _is_connected - bool, set to True/False after successful connect/disconnect. Some methods are disabled if not connected.
Properties (Getters) :
- bridge - returns _bridge
- client - returns _client
- verbose - returns _verbose
- connection_factory - returns _connection_factory
- properties - returns _properties
- is_connected - returns _is_connected
Methods :
- __init__
Constructor called by JMSClient method. Provides parameters verbose, jvm_path, classpath, options. See the Java bridge documentation for more details; usually the parameters need not be provided because they are determined from the default configuration. Initializes the Java bridge, starts the JVM and initializes the JMSClient object (JMSClient.class in /var/local/hydratk, additional files JMSMessage.class, javaee.jar). Parameter verbose enables debug messages in the JMSClient class (source code in java/JMSClient.java).
- close
Stops JVM.
- connect
Connects to message queue (specified via connection_factory, properties). Parameters are queue specific, check appropriate documentation. First fires event jms_before_connect where parameters can be rewritten. Calls Java method connect. After successful connection fires event jms_after_connect and returns bool. Connection timeout can’t be specified, it is handled by javax.jms.
    from hydratk.lib.network.jms.client import JMSClient

    c = JMSClient('JMS')

    # activemq-all-5.13.0.jar is stored in /var/local/hydratk/java, it is automatically added to classpath
    connection_factory = 'ConnectionFactory'
    properties = {'provider_url': 'tcp://127.0.0.1:61616',
                  'initial_context_factory': 'org.apache.activemq.jndi.ActiveMQInitialContextFactory'}
    res = c.connect(connection_factory, properties)
Java method JMSClient.connect initializes InitialContext, looks up ConnectionFactory, connects to the message queue (createConnection, createSession) and returns bool.
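The before/after event flow that recurs throughout this module (an event fired first so that parameters can be rewritten, the actual call, then a second event on success) can be sketched as follows. This is a toy model only: HookedClient, register and _do_connect are hypothetical names, and the real hydratk event API is not shown here.

```python
class HookedClient(object):
    """Toy client illustrating the before/after event flow; not hydratk code."""

    def __init__(self):
        self._hooks = {}
        self._is_connected = False
        self.last_params = None

    def register(self, event, callback):
        """Attach a callback that receives and returns the parameter dict."""
        self._hooks.setdefault(event, []).append(callback)

    def _fire(self, event, params):
        # Each callback may rewrite the parameters before the next one runs.
        for callback in self._hooks.get(event, []):
            params = callback(params)
        return params

    def _do_connect(self, params):
        # Stand-in for the real connection call (for example the Java method).
        self.last_params = params
        return True

    def connect(self, connection_factory, properties):
        params = self._fire('jms_before_connect', {
            'connection_factory': connection_factory,
            'properties': dict(properties),
        })
        self._is_connected = self._do_connect(params)
        if self._is_connected:
            self._fire('jms_after_connect', params)
        return self._is_connected


def force_local_broker(params):
    """Example hook: redirect every connection to a local broker."""
    params['properties']['provider_url'] = 'tcp://127.0.0.1:61616'
    return params
```

Registering force_local_broker for jms_before_connect would make every later connect call use the local URL, whatever the caller passed in.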
- disconnect
Disconnects from queue using Java method disconnect and returns bool. Method JMSClient.disconnect closes connection and returns bool.
res = c.disconnect()
- send
Sends message to queue. First fires event jms_before_send where parameters (destination_name, message, destination_type, headers) can be rewritten. Method transforms headers to Java HashMap. Calls Java method send. After successful send fires event jms_after_send and returns bool.
    # queue destination
    queue = 'dynamicQueues/HydraQueue'
    msg = 'test'
    res = c.send(queue, msg, 'queue')

    # message with headers
    headers = {'JMSCorrelationID': '1234', 'JMSType': 'test_msg'}
    res = c.send(queue, msg, 'queue', headers)

    # topic destination
    topic = 'dynamicTopics/HydraTopic'
    res = c.send(topic, msg, 'topic')
Java method JMSClient.send initializes a producer (for queue or topic), prepares the message, sets JMS headers using the specific setter methods and sends the message.
Supported headers: JMSCorrelationID, JMSDeliveryMode, JMSDestination, JMSExpiration, JMSMessageID, JMSPriority, JMSRedelivered, JMSReplyTo, JMSTimestamp, JMSType
- receive
Receives messages from queue (messages are deleted). First fires event jms_before_receive where parameters (destination_name, cnt) can be rewritten. Method calls Java method receive. After that fires event jms_after_receive and returns a list of dictionaries (keys JMSCorrelationID, JMSDeliveryMode, JMSDestination, JMSExpiration, JMSMessageID, JMSPriority, JMSRedelivered, JMSReplyTo, JMSTimestamp, JMSType, message).
    # single message
    queue = 'dynamicQueues/HydraQueue'
    res = c.receive(queue, 1)

    # multiple messages
    res = c.receive(queue, 10)
Method JMSClient.receive initializes queue consumer (queue only, topic not supported). Receives messages from queue (up to count or all), extracts specified JMS headers (JMSCorrelationID, JMSDeliveryMode, JMSDestination, JMSExpiration, JMSMessageID, JMSPriority, JMSRedelivered, JMSReplyTo, JMSTimestamp, JMSType) and returns ArrayList.
- browse
Browses message queue (messages are not deleted). First fires event jms_before_browse where parameters (destination_name, cnt, jms_correlation_id, jms_type) can be rewritten. Method calls Java method browse. After that fires event jms_after_browse and returns a list of dictionaries (same format as in receive).
    # full queue
    queue = 'dynamicQueues/HydraQueue'
    res = c.browse(queue)

    # message filter
    jms_id, jms_type = '1234', 'test_msg'
    res = c.browse(queue, jms_correlation_id=jms_id, jms_type=jms_type)
Method JMSClient.browse initializes queue browser (queue only, topic not supported). Gets messages from queue (possible filter for JMSCorrelationID, JMSType), extracts specified JMS headers (JMSCorrelationID, JMSDeliveryMode, JMSDestination, JMSExpiration, JMSMessageID, JMSPriority, JMSRedelivered, JMSReplyTo, JMSTimestamp, JMSType) and returns ArrayList.
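Since receive and browse return a list of dictionaries keyed by JMS header names, the effect of the JMSCorrelationID and JMSType filter can be reproduced on the Python side. The following is a minimal client-side sketch; filter_messages is a hypothetical helper, and the Java class may well apply the filter differently (for example with a JMS selector).

```python
def filter_messages(messages, jms_correlation_id=None, jms_type=None):
    """Keep only messages matching the given header values.

    A filter set to None is ignored, so calling the function without
    filters returns every message.
    """
    result = []
    for msg in messages:
        if (jms_correlation_id is not None and
                msg.get('JMSCorrelationID') != jms_correlation_id):
            continue
        if jms_type is not None and msg.get('JMSType') != jms_type:
            continue
        result.append(msg)
    return result


# Made-up sample data in the documented result format.
sample = [
    {'JMSCorrelationID': '1234', 'JMSType': 'test_msg', 'message': 'first'},
    {'JMSCorrelationID': '5678', 'JMSType': 'test_msg', 'message': 'second'},
]
matching = filter_messages(sample, jms_correlation_id='1234')
```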
stomp_client
Module provides class JMSClient, which implements a client for the STOMP protocol using external module stompest in version >= 2.2.5. When Python 2.6 is used, version 2.1.6 is installed. Unit tests are available at hydratk/lib/network/jms/stomp_client/01_methods_ut.jedi
Attributes :
- _mh - MasterHead reference
- _client - stompest client instance
- _host - server hostname (or IP address)
- _port - port number (default 61613)
- _user - username
- _passw - password
- _verbose - verbose mode, disabled by default
- _is_connected - bool, set to True/False after successful connect/disconnect. Some methods are disabled if not connected.
Properties (Getters) :
- client - returns _client
- host - returns _host
- port - returns _port
- user - returns _user
- passw - returns _passw
- verbose - returns _verbose
- is_connected - returns _is_connected
Methods :
- __init__
Constructor called by JMSClient method. Provides parameter verbose. Sets MasterHead instance and turns on verbose mode if enabled.
- connect
Connects to message queue (specified via host, port, user, passw). First fires event jms_before_connect where parameters can be rewritten. Sets _client to stompest client instance (constructor Stomp) and connects to message queue using stompest method connect. After successful connection fires event jms_after_connect and returns bool. Connection timeout is 10s by default (parameter timeout).
    from hydratk.lib.network.jms.client import JMSClient

    c = JMSClient('STOMP')
    res = c.connect(host='127.0.0.1', port=61613, user='admin', passw='password')
- disconnect
Disconnects from queue using stompest methods disconnect, close and returns bool.
res = c.disconnect()
- send
Sends message to queue. First fires event jms_before_send where parameters (destination_name, message, destination_type, headers) can be rewritten. Method transforms JMS headers (to be common with jms_client) to STOMP specific headers. Sends message using stompest method send. After successful send fires event jms_after_send and returns bool.
Supported headers: JMSCorrelationID -> correlation-id, JMSExpiration -> expires, JMSDeliveryMode -> persistent, JMSPriority -> priority, JMSReplyTo -> reply-to, JMSType -> type, JMSMessageID -> message-id, JMSDestination -> destination, JMSTimestamp -> timestamp, JMSRedelivered -> redelivered
    # queue destination
    queue = 'dynamicQueues/HydraQueue'
    msg = 'test'
    res = c.send(queue, msg, 'queue')

    # message with headers
    headers = {'JMSCorrelationID': '1234', 'JMSType': 'test_msg'}
    res = c.send(queue, msg, 'queue', headers)

    # topic destination
    topic = 'dynamicTopics/HydraTopic'
    res = c.send(topic, msg, 'topic')
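The header translation listed above is a plain name mapping, so it can be sketched with a dictionary and its inverse. The function names are hypothetical, and dropping headers that are not in the table is an assumption of this sketch; value conversion (for example a delivery mode turned into a persistence flag) is deliberately left out.

```python
# JMS header name -> STOMP header name, as listed in the documentation.
JMS_TO_STOMP = {
    'JMSCorrelationID': 'correlation-id',
    'JMSExpiration': 'expires',
    'JMSDeliveryMode': 'persistent',
    'JMSPriority': 'priority',
    'JMSReplyTo': 'reply-to',
    'JMSType': 'type',
    'JMSMessageID': 'message-id',
    'JMSDestination': 'destination',
    'JMSTimestamp': 'timestamp',
    'JMSRedelivered': 'redelivered',
}

# Inverse table, used when received frames are translated back.
STOMP_TO_JMS = {stomp: jms for jms, stomp in JMS_TO_STOMP.items()}


def to_stomp_headers(headers):
    """Rename JMS headers to STOMP names; unknown names are dropped."""
    return {JMS_TO_STOMP[name]: value
            for name, value in headers.items() if name in JMS_TO_STOMP}


def to_jms_headers(headers):
    """Rename STOMP headers to JMS names; unknown names are dropped."""
    return {STOMP_TO_JMS[name]: value
            for name, value in headers.items() if name in STOMP_TO_JMS}
```

Keeping both directions in one table means that send and receive cannot drift apart when a header is added.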
- receive
Receives messages from queue (messages are deleted). First fires event jms_before_receive where parameters (destination_name, cnt) can be rewritten. Method subscribes to queue using stompest method subscribe, receives each message using method receiveFrame and deletes it using ack. Headers are translated to JMS header names. After that fires event jms_after_receive and returns a list of dictionaries (keys message, JMS header1, JMS header2, ...).
    # single message
    queue = 'dynamicQueues/HydraQueue'
    res = c.receive(queue, 1)

    # multiple messages
    res = c.receive(queue, 10)
- browse
Browses message queue (messages are not deleted). First fires event jms_before_browse where parameters (destination_name, cnt, jms_correlation_id, jms_type) can be rewritten. Method subscribes to queue using stompest method subscribe and receives each message using method receiveFrame. Messages can be filtered by headers correlation-id and type. Headers are translated to JMS header names. After that fires event jms_after_receive and returns a list of dictionaries (keys message, JMS header1, JMS header2, ...).
    # full queue
    queue = 'dynamicQueues/HydraQueue'
    res = c.browse(queue)

    # message filter
    jms_id, jms_type = '1234', 'test_msg'
    res = c.browse(queue, jms_correlation_id=jms_id, jms_type=jms_type)
amqp_client
Module provides class JMSClient, which implements a client for the AMQP protocol using external module python-qpid-proton in version >= 0.10. Unit tests are available at hydratk/lib/network/jms/amqp_client/01_methods_ut.jedi
Attributes :
- _mh - MasterHead reference
- _client - proton client instance
- _host - server hostname (or IP address)
- _port - port number (default 5672)
- _user - username
- _passw - password
- _verbose - verbose mode, disabled by default
- _is_connected - bool, set to True/False after successful connect/disconnect. Some methods are disabled if not connected.
Properties (Getters) :
- client - returns _client
- host - returns _host
- port - returns _port
- user - returns _user
- passw - returns _passw
- verbose - returns _verbose
- is_connected - returns _is_connected
Methods :
- __init__
Constructor called by JMSClient method. Provides parameter verbose. Sets MasterHead instance and turns on verbose mode if enabled.
- connect
Connects to message queue (specified via host, port, user, passw). First fires event jms_before_connect where parameters can be rewritten. Sets _client to proton client instance (constructor BlockingConnection). After successful connection fires event jms_after_connect and returns bool. Connection timeout is 10s by default (parameter timeout).
    from hydratk.lib.network.jms.client import JMSClient

    c = JMSClient('AMQP')
    res = c.connect(host='127.0.0.1', port=5672, user='admin', passw='password')
- disconnect
Disconnects from queue using proton method close and returns bool.
res = c.disconnect()
- send
Sends message to queue. First fires event jms_before_send where parameters (destination_name, message, destination_type, headers) can be rewritten. Method transforms JMS headers (to be common with jms_client) to AMQP-specific headers. Sends message using proton methods create_sender, send. After successful send fires event jms_after_send and returns bool.
Supported headers: JMSDeliveryMode -> header.durable, JMSPriority -> header.priority, JMSExpiration -> header.ttl, JMSType -> message-annotations.x-opt-jms-type, JMSMessageID -> properties.message-id, JMSDestination -> properties.to, JMSReplyTo -> properties.reply-to, JMSCorrelationID -> properties.correlation-id, JMSTimestamp -> properties.creation-time
    # queue destination
    queue = 'dynamicQueues/HydraQueue'
    msg = 'test'
    res = c.send(queue, msg, 'queue')

    # message with headers
    headers = {'JMSCorrelationID': '1234', 'JMSType': 'test_msg'}
    res = c.send(queue, msg, 'queue', headers)

    # topic destination
    topic = 'dynamicTopics/HydraTopic'
    res = c.send(topic, msg, 'topic')
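Unlike STOMP, the AMQP mapping above targets three different message sections (header, properties, message-annotations). A minimal sketch of grouping JMS headers by section follows; to_amqp_sections is a hypothetical helper, and it only produces plain dictionaries rather than calling any proton API.

```python
# JMS header name -> (AMQP section, field), as listed in the documentation.
JMS_TO_AMQP = {
    'JMSDeliveryMode': ('header', 'durable'),
    'JMSPriority': ('header', 'priority'),
    'JMSExpiration': ('header', 'ttl'),
    'JMSType': ('message-annotations', 'x-opt-jms-type'),
    'JMSMessageID': ('properties', 'message-id'),
    'JMSDestination': ('properties', 'to'),
    'JMSReplyTo': ('properties', 'reply-to'),
    'JMSCorrelationID': ('properties', 'correlation-id'),
    'JMSTimestamp': ('properties', 'creation-time'),
}


def to_amqp_sections(headers):
    """Group JMS headers by target AMQP section; unknown names are dropped."""
    sections = {'header': {}, 'properties': {}, 'message-annotations': {}}
    for name, value in headers.items():
        if name in JMS_TO_AMQP:
            section, field = JMS_TO_AMQP[name]
            sections[section][field] = value
    return sections
```

Note that JMSRedelivered has no entry in the documented AMQP list, so this sketch drops it along with any other unknown header.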
- receive
Receives messages from queue (messages are deleted). First fires event jms_before_receive where parameters (destination_name, cnt) can be rewritten. Method subscribes to queue using proton method create_receiver, receives each message using method receive and deletes it using accept. Headers are translated to JMS header names. After that fires event jms_after_receive and returns a list of dictionaries (keys message, JMS header1, JMS header2, ...).
    # single message
    queue = 'dynamicQueues/HydraQueue'
    res = c.receive(queue, 1)

    # multiple messages
    res = c.receive(queue, 10)
- browse
Browses message queue (messages are not deleted). First fires event jms_before_browse where parameters (destination_name, cnt, jms_correlation_id, jms_type) can be rewritten. Method subscribes to queue using proton method create_receiver, receives each message using method receive and accepts it using accept (copied from queue, not deleted). Headers are translated to JMS header names. After that fires event jms_after_receive and returns a list of dictionaries (keys message, JMS header1, JMS header2, ...).
    # full queue
    queue = 'dynamicQueues/HydraQueue'
    res = c.browse(queue)

    # message filter
    jms_id, jms_type = '1234', 'test_msg'
    res = c.browse(queue, jms_correlation_id=jms_id, jms_type=jms_type)
mqtt_client
Module provides class JMSClient, which implements a client for the MQTT protocol using external module paho-mqtt in version >= 1.2. Unit tests are available at hydratk/lib/network/jms/mqtt_client/01_methods_ut.jedi
Attributes :
- _mh - MasterHead reference
- _client - paho client instance
- _host - server hostname (or IP address)
- _port - port number (default 1883)
- _user - username
- _passw - password
- _verbose - verbose mode, disabled by default
- _is_connected - bool, set to True/False after successful connect/disconnect. Some methods are disabled if not connected.
- _messages - auxiliary storage (it has no getter)
Properties (Getters) :
- client - returns _client
- host - returns _host
- port - returns _port
- user - returns _user
- passw - returns _passw
- verbose - returns _verbose
- is_connected - returns _is_connected
Methods :
- __init__
Constructor called by JMSClient method. Provides parameter verbose. Sets _client to paho client instance (constructor Client) and turns on verbose mode if enabled.
- connect
Connects to message queue (specified via host, port, user, passw). First fires event jms_before_connect where parameters can be rewritten. Connects to queue using paho method connect (authentication using username_pw_set). After successful connection fires event jms_after_connect and returns bool. Connection timeout is 10s by default (parameter timeout).
    from hydratk.lib.network.jms.client import JMSClient

    c = JMSClient('MQTT')
    res = c.connect(host='127.0.0.1', port=1883, user='admin', passw='password')
- disconnect
Disconnects from queue using paho method disconnect and returns bool.
res = c.disconnect()
- send
Sends message to topic. First fires event jms_before_send where parameters (destination_name, message) can be rewritten. Method sends message using paho method publish. After successful send fires event jms_after_send and returns bool.
res = c.send('HydraTopic', 'test')
- receive
Receives messages from topic. First fires event jms_before_receive where parameters (destination_name, cnt, timeout) can be rewritten. Method subscribes to topic using paho method subscribe, then checks the topic for incoming messages (up to count or timeout, asynchronous). After that fires event jms_after_receive and returns a list of strings.
    # single message
    res = c.receive('HydraTopic')

    # multiple messages
    res = c.receive('HydraTopic', 2)
- _on_message
Auxiliary method with asynchronous message receiver.
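The interplay between an asynchronous message callback, an auxiliary message store and a receive call bounded by count or timeout can be sketched without a broker. TopicCollector is a hypothetical class: its on_message takes the payload directly, which is a simplification of the real paho callback signature, and the polling interval is an arbitrary choice.

```python
import threading
import time


class TopicCollector(object):
    """Toy collector: a callback fills a buffer, receive() drains it."""

    def __init__(self):
        self._messages = []            # auxiliary storage filled by the callback
        self._lock = threading.Lock()  # the callback may run in another thread

    def on_message(self, payload):
        """Stand-in for the asynchronous message callback."""
        with self._lock:
            self._messages.append(payload)

    def receive(self, cnt=1, timeout=10.0, poll=0.01):
        """Wait until cnt messages arrived or timeout (seconds) expired."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            with self._lock:
                if len(self._messages) >= cnt:
                    break
            time.sleep(poll)
        with self._lock:
            # Hand out at most cnt messages and keep the rest buffered.
            received = self._messages[:cnt]
            self._messages = self._messages[cnt:]
        return received
```

With this shape, a receive call returns early as soon as enough messages have been buffered and returns a shorter (possibly empty) list when the timeout expires first.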
simplejms
Module provides a simplified wrapper to jms_client with the possibility to send messages from a template. Unit tests are available at hydratk/lib/network/jms/simplejms/01_methods_ut.jedi
Classes :
- JMSClient
Attributes : _request, _response
Properties (Getters) : request, response
Properties (Setters) : request
Methods : send - uses method jms_client.send, parameter jms_correlation_id
    from hydratk.lib.network.jms.simplejms import JMSClient, JMSRequest, JMSRequestMessage

    c = JMSClient()
    msg = '<readCustomer><id>0</id></readCustomer>'
    rqm = JMSRequestMessage(msg, 'str')
    rq = JMSRequest('dynamicQueues/HydraQueue', 'test')
    rq.message = rqm
    c.request = rq

    properties = {'provider_url': 'tcp://127.0.0.1:61616',
                  'initial_context_factory': 'org.apache.activemq.jndi.ActiveMQInitialContextFactory'}
    c.connect('ConnectionFactory', properties)
    res = c.send('1234')
- JMSRequest
Attributes : _msg, _destination_queue, _jms_type
Properties (Getters) : destination_queue, jms_type, msg, message
Properties (Setters) : destination_queue, jms_type, msg, message
Methods : __init__ - sets _destination_queue, _jms_type
- JMSRequestMessage
Attributes : _bind_lchr, _bind_rchr, _content
Properties (Getters) : content
Properties (Setters) : content
Methods : __init__ - sets _content, it is possible to load message from file; load_from_file - sets _content with file content; bind_var - fills message template with given data passed as args, kwargs
    msg = '<readCustomer><id>[id]</id><name>[name]</name></readCustomer>'

    # bind args
    c = JMSRequestMessage(msg, 'str')
    id, name = '1', 'Charlie Bowman'
    c.bind_var({'id': id}, {'name': name})

    # bind kwargs
    c.bind_var({'id': id, 'name': name})
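How template binding with args and kwargs might work can be illustrated with a small stand-alone class. MessageTemplate is hypothetical and is not the hydratk implementation; the attribute names _bind_lchr, _bind_rchr and _content are borrowed from this section, and the '[' and ']' delimiters are inferred from the example above.

```python
class MessageTemplate(object):
    """Toy template holder that replaces [name] placeholders."""

    def __init__(self, content, bind_lchr='[', bind_rchr=']'):
        self._content = content
        self._bind_lchr = bind_lchr
        self._bind_rchr = bind_rchr

    @property
    def content(self):
        return self._content

    def bind_var(self, *args, **kwargs):
        """Fill placeholders from dictionaries (args) and keywords (kwargs)."""
        for mapping in args + (kwargs,):
            for name, value in mapping.items():
                placeholder = self._bind_lchr + name + self._bind_rchr
                self._content = self._content.replace(placeholder, str(value))


template = MessageTemplate(
    '<readCustomer><id>[id]</id><name>[name]</name></readCustomer>')
# One value passed as a dictionary, the other as a keyword argument.
template.bind_var({'id': '1'}, name='Charlie Bowman')
```

Placeholders without a matching variable are left untouched in this sketch, so a later bind_var call can still fill them.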