极速赛车APP下载

IBM Websphere MQ 使用图解

电脑杂谈  发布时间:2019-08-31 13:10:35  来源:网络整理

ibm websphere mq下载_ibm websphere mq搭建_ibm websphere mq基础教程

本文链接:

安装和配置的内容,可参考

下面是安装配置后的图片:

IBM MQ 部署后截图

ibm websphere mq下载_ibm websphere mq基础教程_ibm websphere mq搭建

队列对应Buffer,通道对应Channel。

根据通道是本方还是我方,启用相应的通道(即服务)。

MQ队列

远程队列_remote发送到对方的本地队列_local。

ibm websphere mq搭建_ibm websphere mq下载_ibm websphere mq基础教程

比如ibm websphere mq搭建,在abc_remote上放入消息“123”,在cz_local队列上能够获得“123”。

更具体的理解请点击

import grp.pt.util.StringUtil;
import org.apache.log4j.Logger;
import com.ibm.mq.MQC;
import com.ibm.mq.MQEnvironment;
import com.ibm.mq.MQException;
import com.ibm。mq.MQGetMessageOptions;
import com.ibm。mq.MQMessage;
import com.ibm.mq.MQPutMessageOptions;
import com。ibm.mq。MQQueue;
import com。ibm.mq。MQQueueManager;
import com.ibm.mq.MQSimpleConnectionManager;
import com。river.common.UploadFileUtil;
/**
 * MQ配置 连接信息
 */
public class MqHandler {
	private static Logger log = Logger.getLogger(MqHandler。class);
	private static String MQ_IP = UploadFileUtil
			.getFromPro("mqconfig", "MQ_IP");
	private static int MQ_PORT = Integer.parseInt(UploadFileUtil.getFromPro(
			"mqconfig", "MQ_PORT"));
	// 队列管理器
	private static String MQ_QMANAGER = UploadFileUtil.getFromPro("mqconfig",
			"MQ_QMANAGER");
	// mq服务方通道
	private static String MQ_CHANNEL = UploadFileUtil。getFromPro("mqconfig",
			"MQ_CHANNEL");
	// mq通讯编码集
	private static int MQ_CCSID = Integer.parseInt(UploadFileUtil。getFromPro(
			"mqconfig", "MQ_CCSID"));
	// mq请求方发送队列
	private static String MQ_CLENT_SENDQUEUE = UploadFileUtil.getFromPro(
			"mqconfig", "MQ_CLENT_SENDQUEUE");
	// mq请求方接收队列
	private static String MQ_CLENT_RECQUEUE = UploadFileUtil.getFromPro(
			"mqconfig", "MQ_CLENT_RECQUEUE");
	
	// MQ自带连接池
	private static MQSimpleConnectionManager myConnMan = null;
	
	// 异步长连接接收队列管理器
	private MQQueueManager queueManager = null;
	// 异步发送队列
	private MQQueue SendQueue = null;
	// 异步接收队列
	public MQQueue ReceiveQueue = null;
	
	/**
	 * 构造方法
	 */
	public MqHandler() throws Exception {
		init();
	}
	/**
	 * 异步发送消息
	 * 
	 * @param msg
	 * @param messageId
	 * @param charSet
	 * @throws Exception
	 */
	public void sendMsg(String msg, String messageId, String charSet)
			throws Exception {
		try {
			if (StringUtil.isEmpty(charSet)) {
				charSet = "GBK";
			}
			byte[] msgId = StringUtil.isEmpty(messageId) ? null : messageId
					.getBytes(charSet);
			// 打开队列
			queueManager = new MQQueueManager(MQ_QMANAGER, myConnMan);
			SendQueue = queueManager。accessQueue(MQ_CLENT_SENDQUEUE,
					MQC。MQOO_OUTPUT | MQC.MQPMO_NEW_MSG_ID
							| MQC.MQOO_FAIL_IF_QUIESCING, null, null, null);
			// 初始化消息选项
			MQPutMessageOptions pmo = new MQPutMessageOptions();
			// 确保每次发送前为消息自动生成唯一的msgId
			pmo。options = pmo。options + MQC.MQPMO_NEW_MSG_ID;
			// 如果设置了该参数,则发送后必须调用commit功能。否则无法将消息发送出
			pmo。options = pmo.options + MQC。MQPMO_SYNCPOINT;
			// 创建消息对象
			MQMessage outMsg = new MQMessage();
			// 设置MQMD格式字段
			outMsg.format = MQC.MQFMT_STRING;
			outMsg.messageId = msgId == null ? MQC。MQMI_NONE : msgId;
			outMsg.encoding = MQ_CCSID;
			outMsg。characterSet = MQ_CCSID;
			// 消息发送时必须以字节流的方式发送
			outMsg。write(msg.getBytes(charSet));
			// 在队列上放置消息
			SendQueue.put(outMsg, pmo);
			// 和MQC.MQPMO_SYNCPOINT属性对应。如果设置了该属性,则发送后需要提交。
			queueManager.commit();
		} finally {
			close(SendQueue);
		}
	}
	/**
	 * 接收消息
	 * @param correlationId
	 * @param charSet
	 * @return
	 * @throws Exception
	 */
	public String receive(String correlationId, String charSet)
			throws Exception {
		try {
			if (StringUtil.isEmpty(charSet)) {
				charSet = "GBK";
			}
			
			queueManager = new MQQueueManager(MQ_QMANAGER, myConnMan);
			ReceiveQueue = queueManager.accessQueue(MQ_CLENT_RECQUEUE,
					MQC。MQOO_INPUT_AS_Q_DEF | MQC.MQOO_INQUIRE
							| MQC.MQOO_FAIL_IF_QUIESCING);
			MQMessage respMessage = new MQMessage();
			// 设置MQMD 格式字段
			respMessage.format = MQC。MQFMT_STRING;
			// 设置编码格式与MQ服务一致
			respMessage.encoding = MQ_CCSID;
			// 设置字符集与MQ服务一致
			respMessage.characterSet = MQ_CCSID;
			MQGetMessageOptions gmo = new MQGetMessageOptions();
			gmo.options = gmo.options + MQC.MQGMO_WAIT; // 如果设置了该参数,则当前线程将阻塞,直到等到回复的消息或超时
			gmo。waitInterval = MQC.MQWI_UNLIMITED;
			if (StringUtil.isNotEmpty(correlationId)) {
				// 如果设置了该参数,则根据消息的correlId去匹配对应的响应消息
				gmo.matchOptions = MQC。MQMO_MATCH_CORREL_ID;
				respMessage.correlationId = correlationId.getBytes(charSet);
			}
			ReceiveQueue.get(respMessage, gmo);
			byte[] msgBuffer = new byte[respMessage.getMessageLength()];
			respMessage.readFully(msgBuffer);
			String respMsg = new String(msgBuffer, charSet);
			return respMsg;
		} finally {
			close(ReceiveQueue);
		}
	}
	/**
	 * 初始化连接
	 * 
	 * @param type
	 * @throws MQException
	 */
	private void init() throws MQException {
		MQEnvironment.hostname = MQ_IP;
		MQEnvironment.port = MQ_PORT;
		MQEnvironment。CCSID = MQ_CCSID;
		MQEnvironment.channel = MQ_CHANNEL;
		MQEnvironment.properties.put(MQC。TRANSPORT_PROPERTY,
				MQC.TRANSPORT_MQSERIES_CLIENT);
		
		myConnMan = new MQSimpleConnectionManager();
        myConnMan。setActive(MQSimpleConnectionManager.MODE_AUTO);
        myConnMan.setTimeout(3600000);
        myConnMan.setMaxConnections(75);
        myConnMan.setMaxUnusedConnections(50);
        MQEnvironment.setDefaultConnectionManager(myConnMan);
        
		log.info("初始化队列管理器receiverQueueManager....." + MQ_QMANAGER);
	}
	private void close(MQQueue queue) {
		try {
			if (queue != null && queue.isOpen) {
				queue。close();
			}
			if (queueManager != null) {
				queueManager.disconnect();
			}
		} catch (MQException ex) {
			log。error("", ex);
		}
	}
}

网上类似的代码太多,下面对代码中的细节进行解释。

ibm websphere mq搭建_ibm websphere mq下载_ibm websphere mq基础教程

初始化

MQEnvironment的静态属性:hostname、port、CCSID等初始化后,MQQueueManager利用这种属性来构造实例对象。

MQSimpleConnectionManager是简单的连接池,当设定了连接池后MQEnvironment.setDefaultConnectionManager(myConnMan)ibm websphere mq搭建,再使用close方法时,并不会破坏连接,而是归还池中。

连接池详细内容可以参考官网的API

ibm websphere mq下载_ibm websphere mq搭建_ibm websphere mq基础教程

消息参数

更多参数参见官方表明

import org。jfree.util.Log;
import org.junit.Test;
import org.junit.runner。RunWith;
import org.junit。runners.JUnit4;
import com。ctjsoft。treasury.util。MqHandler;
@RunWith(JUnit4.class)
public class MqTest {
	
	@Test
	public void mqSend(){
		try {
			MqHandler mq = new MqHandler();
			mq。sendMsg("sq--send1", null, "gbk");
			mq.sendMsg("sq--send2", null, "gbk");
			mq.sendMsg("sq--send3", null, "gbk");
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e。printStackTrace();
		}
	}
	
	@Test
	public void mqReceive(){
		try {
			MqHandler mq = new MqHandler();
			String msg = mq.receive(null, "gbk");
			System。out.println(msg);
			String msg1 = mq.receive(null, "gbk");
			System.out.println(msg1);
			String msg2 = mq.receive(null, "gbk");
			System.out。println(msg2);
			System.out.println("任务完成");
		} catch (Exception e) {
			Log.error("", e);
		}
	}
}

测试过程中发生的问题举例:

感觉有用请点赞


本文来自电脑杂谈,转载请注明本文网址:
http://www.0531mai.com/a/jisuanjixue/article-121212-1.html

    相关阅读
    发表评论  请自觉遵守互联网相关的政策法规,严禁发布、暴力、反动的言论

    • 周浩
      周浩

      百万雄师过大海

    • 张炳亮
      张炳亮

      对比大豆的收购价我真不知道食用油售价低于5元

    • 常涵予
      常涵予

      应该是个不错的选择

    极速赛车APP 极速赛车手机版下载 极速赛车APP下载 极速赛车双面盘 极速赛车手机版下载 极速赛车手机官网 极速赛车手机官网 极速赛车手机官网 极速赛车APP下载 极速赛车双面盘