Integrating Apache Nifi with IBM MQ
This is a continuation of the IBM MQ and Hadoop integration article I first posted a few years ago. It explains how to integrate IBM MQ with Apache Nifi or Hortonworks HDF. IBM MQ is extremely important when integrating new technologies with legacy environments, specifically mainframe environments, where a lot of useful business data resides.
How to Integrate with Apache Nifi and Hadoop using IBM MQ
To integrate IBM MQ with Apache Nifi or Hortonworks HDF, first download Apache Nifi or Hortonworks HDF, then download IBM MQ Advanced Developer Edition (https://developer.ibm.com/messaging/mq-downloads/), which is free for developers or users to try out. Once Hortonworks HDF or Apache Nifi is installed and configured, proceed to install and configure IBM MQ.
In the example below I install and configure a local QMGR in bindings mode to interoperate with Apache Nifi's ConsumeJMS processor, and use the PutHDFS processor to send the messages to HDFS.
A question that will probably come up is why to use bindings rather than client connections with MQ. In bindings mode the application connects directly to a queue manager on the same server instead of going over the network, which provides better performance (https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_9.0.0/com.ibm.mq.dev.doc/q030520_.htm). IBM even states that bindings provide about a 30% performance improvement over clients; see https://www.ibm.com/support/knowledgecenter/en/SSAW57_9.0.0/com.ibm.websphere.nd.multiplatform.doc/ae/ucli_pqcfm.html and http://bjansen.github.io/java/2018/03/04/high-performance-mq-jms.html
Installing IBM MQ Local QMGR and Samples
Installing IBM MQ on Linux:
[root@hdpclient tmp]# tar -xvf mqadv_dev80_linux_x86-64.tar.gz
[root@hdpclient tmp]# cd MQServer
[root@hdpclient MQServer]# ./mqlicense.sh
[root@hdpclient MQServer]# yum install ./MQSeriesRuntime-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesServer-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesGSKit-8.0.0-4.x86_64.rpm MQSeriesJRE-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesJava-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesClient-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesAMQP-8.0.0-4.x86_64.rpm
[root@hdpclient MQServer]# yum install MQSeriesSamples-8.0.0-4.x86_64.rpm
Configuring your Local QMGR with IBM MQ 8.0+
#Switch to MQM User
su - mqm
# Source Local QMGR Environment Variables
. /opt/mqm/bin/setmqenv
# Create a Local QMGR if it doesn’t already exist
crtmqm GSSMQP1
# Start the Local QMGR if it’s not already started
strmqm GSSMQP1
# Create Local Queues if none are defined for testing
runmqsc GSSMQP1
define qlocal(GSS.REQUEST.REPLY.QUEUE)
end
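The interactive runmqsc session above can also be driven from standard input, which is handy when scripting the setup. A minimal sketch, assuming the same queue manager and queue names as above; the helper name queue_mqsc is hypothetical, and the display step is an addition for checking the result:

```shell
# Hypothetical helper: print the MQSC commands that define one local queue
# and then display its current depth as a quick sanity check.
queue_mqsc() {
    queue="$1"
    printf 'define qlocal(%s)\n' "$queue"
    printf 'display qlocal(%s) curdepth\n' "$queue"
    printf 'end\n'
}

# Usage (as the mqm user, after sourcing setmqenv):
#   queue_mqsc GSS.REQUEST.REPLY.QUEUE | runmqsc GSSMQP1
```

Keeping the command text in a function makes it easy to review what will be sent to the queue manager before piping it in.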
Configuring IBM MQ to allow Nifi to read messages
Configure the local QMGR to authorize the "nifi" group, or whichever user ID you use to run the local Nifi process, on the same server as your IBM MQ QMGR.
#Switch to the mqm user:
su - mqm
#Source Local QMGR Environment Variables
. /opt/mqm/bin/setmqenv
#ACL the Local QMGR and the Queue(s) being used by Nifi
setmqaut -m GSSMQP1 -t qmgr -g nifi +connect +inq +dsp
setmqaut -m GSSMQP1 -t queue -n GSS.REQUEST.REPLY.QUEUE -g nifi +inq +browse +get +put +dsp
#Refresh Security on the Local QMGR
runmqsc GSSMQP1
refresh security
end
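To confirm that the grants took effect, dspmqaut can show what the nifi group holds on the queue. A minimal sketch, assuming dspmqaut prints one authorization per line beneath a header line; the helper name missing_auths is hypothetical:

```shell
# Hypothetical helper: read dspmqaut output on stdin and print each required
# authorization (passed as arguments) that does not appear in it.
missing_auths() {
    # Trim surrounding whitespace so each authorization sits alone on its line.
    granted="$(sed 's/^[[:space:]]*//; s/[[:space:]]*$//')"
    for auth in "$@"; do
        # -x matches whole lines only, -F treats the name as a fixed string.
        printf '%s\n' "$granted" | grep -qxF "$auth" || printf '%s\n' "$auth"
    done
}

# Usage (as the mqm user, after sourcing setmqenv):
#   dspmqaut -m GSSMQP1 -t queue -n GSS.REQUEST.REPLY.QUEUE -g nifi \
#       | missing_auths inq browse get put dsp
```

No output means every listed authorization was found; any name printed is one that still needs a setmqaut grant.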
Set up the Nifi server's nifi-bootstrap.conf with the following additional java.args:
java.arg.18=-Djavax.security.auth.useSubjectCredsOnly=true
java.arg.19=-Djava.library.path=/opt/mqm/java/lib64/
java.arg.20=-Dcom.ibm.mq.cfg.jmqi.libpath=/opt/mqm/java/lib64
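Each JVM argument in the bootstrap configuration is keyed by its java.arg.N index, so the entries above should not reuse an index that is already taken in your file; a reused key would presumably cause one entry to shadow the other. A minimal sketch of a check, assuming the path to the file is passed as the first argument; the helper name dup_java_args is hypothetical:

```shell
# Hypothetical helper: print every java.arg.N key that is defined more than
# once in the given file. Commented-out lines (starting with #) are ignored.
dup_java_args() {
    grep -E '^java\.arg\.[^=]+=' "$1" | cut -d= -f1 | sort | uniq -d
}

# Usage (the conf path is an assumption, adjust to your install):
#   dup_java_args /usr/nifi/conf/bootstrap.conf
```

An empty result means no index is used twice; otherwise renumber the new entries to unused indices.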
One strange requirement turned up during testing: disabling and re-enabling the IBMMQJMSController in bindings mode causes a stack trace to be thrown. I've raised the issue as NIFI-5184 with the Apache Nifi project.
A solution to this problem is to force the IBM MQ libs to load with the system classloader. Since Nifi uses a RunNifi class to build this classpath, you must either copy the jars into nifi/lib or symlink them in nifi/lib:
- cd /usr/nifi/lib
- for mqjar in /opt/mqm/java/lib/*.jar; do ln -fns "$mqjar" .; done
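Because these are symlinks, an MQ upgrade or reinstall that moves or renames the jars would leave dangling links in nifi/lib. A minimal sketch for spotting them, assuming a find that supports -maxdepth (GNU and BSD find both do); the helper name dangling_links is hypothetical:

```shell
# Hypothetical helper: list symlinks directly inside the given directory
# whose targets no longer exist.
dangling_links() {
    # -type l selects symlinks; "test -e" follows the link and fails when
    # the target is gone, so the negation prints only broken links.
    find "$1" -maxdepth 1 -type l ! -exec test -e {} \; -print
}

# Usage:
#   dangling_links /usr/nifi/lib
```

If anything is printed, rerun the symlink loop above against the current MQ jar directory and remove the stale links.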
NIFI IBM MQ Setup to take messages from MQ Queue and Put to HDFS:
1) Create a ConsumeJMS Processor and set it up to consume from IBM MQ with a Local QMGR using MQ Bindings:
- Create a new Controller Service using JMSConnectionFactoryProvider from within the ConsumeJMS Processor
- Settings JMSConnectionFactoryProvider controller service:
- Name -> IBMMQConnectionFactory
- Properties IBMMQConnectionFactory controller service:
- MQ ConnectionFactory Implementation -> com.ibm.mq.jms.MQConnectionFactory
- MQ Client Libraries path (i.e., /usr/jms/lib) -> /opt/mqm/java/lib/
- Broker URI -> mqm
- SSL Context Service -> Not Set
- queueManager -> GSSMQP1
- transportType -> 0
- Setup the ConsumeJMS Processor:
- Settings ConsumeJMS:
- Name Processor -> IBMMQConsumeJMS
- Properties IBMMQConsumeJMS:
- Connection Factory Service -> IBMMQConnectionFactory
- Destination Name -> GSS.REQUEST.REPLY.QUEUE
- Destination Type -> QUEUE
- Session Cache size -> 1
- Acknowledgement Mode -> 2
2) Create a PutHDFS Processor:
- Settings:
- Name Processor -> JMSPut2HDFS
- Hadoop Configuration Resources -> /etc/hadoop/conf/core-site.xml,/etc/hadoop/conf/hdfs-site.xml
- Kerberos Principal -> nifi/hdpclient.hdp.senia.org@HDP.SENIA.ORG
- Kerberos Keytab -> /etc/security/keytabs/nifi.service.keytab
- Directory -> /tmp/nifi_jms
Sending Sample Messages to the Local Queue on QMGR using Sample Put Command
- cd /opt/mqm
- . ./bin/setmqenv
- ./samp/bin/amqsput GSS.REQUEST.REPLY.QUEUE GSSMQP1
Sample AMQSPUT0 start
target queue is GSS.REQUEST.REPLY.QUEUE
GSS TEST END MESSAGE SEND MESSAGE
end
Sample AMQSPUT0 end
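amqsput reads one message per line from standard input and stops at an empty line, so test messages can also be piped in rather than typed. A minimal sketch; the helper name mq_test_input and the message texts are hypothetical:

```shell
# Hypothetical helper: print each argument as one message line, followed by
# the empty line that tells amqsput to stop reading.
mq_test_input() {
    for msg in "$@"; do
        printf '%s\n' "$msg"
    done
    printf '\n'
}

# Usage (as a user authorized to put to the queue, after sourcing setmqenv):
#   mq_test_input "GSS TEST MESSAGE 1" "GSS TEST MESSAGE 2" \
#       | /opt/mqm/samp/bin/amqsput GSS.REQUEST.REPLY.QUEUE GSSMQP1
# Once the Nifi flow is running, the output directory can be listed with:
#   hdfs dfs -ls /tmp/nifi_jms
```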
Screenshots showing the Nifi Configuration
Nifi IBM MQJMS to HDFS
Nifi Flow Controller for IBMMQConnectionFactory
Nifi IBMMQConnectionFactory Properties
Nifi ConsumeJMS IBMMQ Properties
Nifi PutHDFS Processor