第三章、ActiveMQ发送消息过程分析【源码】
2019-02-17 本文已影响10人
YQ_1024
1.ActiveMQ基本使用
(1)创建JMS连接工厂
//获取jms连接工程
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.2.102:61616");
(2)通过连接工厂获取连接
connection = connectionFactory.createConnection();
(3)通过连接获取会话
//在JMS中session用于获取消息发送者,消息接受者,是消息发送接收这一过程。与Connection不同,Connection是在建立会话之前,只有建立了连接才可能会话
Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
(4)通过session创建Destination
Destination destination = session.createQueue("myQueue");
(5)通过Session创建发送者
MessageProducer producer = session.createProducer(destination);
(6)发送消息
TextMessage message = session.createTextMessage("Hello Worlds" );
producer.send(message);
2.消息发送过程分析
(1)调用ActiveMQMessageProducer的send(Message message)方法,这里调用的其父类中抽象类ActiveMQMessageProducerSupport方法
//父类中调用子类的send方法,典型的模版设计模式
public void send(Message message) throws JMSException {
this.send(this.getDestination(),
message,
this.defaultDeliveryMode,
this.defaultPriority,
this.defaultTimeToLive);
}
(2) ActiveMQMessageProducer中的覆盖方法
@Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
this.send(destination, message, deliveryMode, priority, timeToLive, null);
}
//不管参数多少个,都统一调用这个方法
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
checkClosed();
if (destination == null) {
if (info.getDestination() == null) {
throw new UnsupportedOperationException("A destination must be specified.");
}
throw new InvalidDestinationException("Don't understand null destinations");
}
ActiveMQDestination dest;
//向上提升ActiveMQDestination
if (destination.equals(info.getDestination())) {
dest = (ActiveMQDestination)destination;
} else if (info.getDestination() == null) {
dest = ActiveMQDestination.transform(destination);
} else {
throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
}
if (dest == null) {
throw new JMSException("No destination specified");
}
if (transformer != null) {
//对消息进行处理,封装相关属性
Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
if (producerWindow != null) {
try {
//判断是否应该阻塞
producerWindow.waitForSpace();
} catch (InterruptedException e) {
throw new JMSException("Send aborted due to thread interrupt.");
}
}
//调用session的send方法
this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);
stats.onMessage();
}
(3)调用ActiveMQSession中的方法
ActiveMQProducer和ActiveMQSession是组合关系,ActiveMQProducer的send方法能力是ActiveMQSession提供的。
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {
throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
}
//同一个session对象,发送消息是同步处理的,因为ActiveMQsender对象持有一个创建该对象的session对象,因此同一个session的会话是同步的处理的。
synchronized (sendMutex) {
// tell the Broker we are about to start a new transaction
doStartTransaction();
TransactionId txid = transactionContext.getTransactionId();
long sequenceNumber = producer.getMessageSequence();
//Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
message.setJMSDeliveryMode(deliveryMode);
long expiration = 0L;
if (!producer.getDisableMessageTimestamp()) {
long timeStamp = System.currentTimeMillis();
message.setJMSTimestamp(timeStamp);
if (timeToLive > 0) {
expiration = timeToLive + timeStamp;
}
}
message.setJMSExpiration(expiration);
message.setJMSPriority(priority);
message.setJMSRedelivered(false);
// transform to our own message format here
ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
msg.setDestination(destination);
msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
// Set the message id.
if (msg != message) {
message.setJMSMessageID(msg.getMessageId().toString());
// Make sure the JMS destination is set on the foreign messages too.
message.setJMSDestination(destination);
}
//clear the brokerPath in case we are re-sending this message
msg.setBrokerPath(null);
msg.setTransactionId(txid);
if (connection.isCopyMessageOnSend()) {
msg = (ActiveMQMessage)msg.copy();
}
msg.setConnection(connection);
msg.onSend();
msg.setProducerId(msg.getMessageId().getProducerId());
if (LOG.isTraceEnabled()) {
LOG.trace(getSessionId() + " sending message: " + msg);
}
//判断是同步发送消息还是异步发送消息?
if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
//调用ActiveMQ的异步处理
this.connection.asyncSendPacket(msg);
if (producerWindow != null) {
// Since we defer lots of the marshaling till we hit the
// wire, this might not
// provide and accurate size. We may change over to doing
// more aggressive marshaling,
// to get more accurate sizes.. this is more important once
// users start using producer window
// flow control.
int size = msg.getSize();
producerWindow.increaseUsage(size);
}
} else {
if (sendTimeout > 0 && onComplete==null) {
this.connection.syncSendPacket(msg,sendTimeout);
}else {
this.connection.syncSendPacket(msg, onComplete);
}
}
}
}
(4)ActiveMQConnection的asyncSendPacket方法
public void asyncSendPacket(Command command) throws JMSException {
if (isClosed()) {
throw new ConnectionClosedException();
} else {
doAsyncSendPacket(command);
}
}
private void doAsyncSendPacket(Command command) throws JMSException {
try {
//调用协议的oneway方法
this.transport.oneway(command);
} catch (IOException e) {
throw JMSExceptionSupport.create(e);
}
}
(5)transport对象分析
- 从哪里来的?
回到最初创建connection对象分析
connection = connectionFactory.createConnection();
ActiveMQConnectionFactory 的createConnection()方法
@Override
public Connection createConnection() throws JMSException {
return createActiveMQConnection();
}
protected ActiveMQConnection createActiveMQConnection() throws JMSException {
return createActiveMQConnection(userName, password);
}
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
if (brokerURL == null) {
throw new ConfigurationException("brokerURL not set.");
}
ActiveMQConnection connection = null;
try {
//创建协议
Transport transport = createTransport();
//将协议组合到createActiveMQConnection中
connection = createActiveMQConnection(transport, factoryStats);
connection.setUserName(userName);
connection.setPassword(password);
configureConnection(connection);
transport.start();
if (clientID != null) {
connection.setDefaultClientID(clientID);
}
return connection;
} catch (JMSException e) {
// Clean up!
try {
connection.close();
} catch (Throwable ignore) {
}
throw e;
} catch (Exception e) {
// Clean up!
try {
connection.close();
} catch (Throwable ignore) {
}
throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
}
}
(6)ActiveMQConnectionFactory 的createTransport方法分析
protected Transport createTransport() throws JMSException {
try {
URI connectBrokerUL = brokerURL;
String scheme = brokerURL.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
}
if (scheme.equals("auto")) {
//我们的案例应该是创建这样一个tcp connectBrokerUL
connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
} else if (scheme.equals("auto+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
} else if (scheme.equals("auto+nio")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
} else if (scheme.equals("auto+nio+ssl")) {
connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
}
//调用TransportFactory.connect创建protocol,工厂方法
return TransportFactory.connect(connectBrokerUL);
} catch (Exception e) {
throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
}
}
(7)TransportFactory 的connect反法
public static Transport connect(URI location) throws Exception {
//TcpTransportFactory
TransportFactory tf = findTransportFactory(location);
//这里调用的是TcpTransportFactory的父亲类(TransportFactory)中的方法
return tf.doConnect(location);
}
//这里应该是根据location适配协议工厂类,META-INF/services/org/apache/activemq/transport/应该配置有工厂的实现
public static TransportFactory findTransportFactory(URI location) throws IOException {
String scheme = location.getScheme();
if (scheme == null) {
throw new IOException("Transport not scheme specified: [" + location + "]");
}
TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
if (tf == null) {
// Try to load if from a META-INF property.
try {
//private static final FactoryFinder TRANSPORT_FACTORY_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/");
tf = (TransportFactory)TRANSPORT_FACTORY_FINDER.newInstance(scheme);
TRANSPORT_FACTORYS.put(scheme, tf);
} catch (Throwable e) {
throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
}
}
return tf;
}
(8)TransportFactory中doConnect 方法
public Transport doConnect(URI location) throws Exception {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
if( !options.containsKey("wireFormat.host") ) {
options.put("wireFormat.host", location.getHost());
}
WireFormat wf = createWireFormat(options);
//创建协议正在的地方,这里调用的是TcpTransportFactory中的方法
Transport transport = createTransport(location, wf);
//对TcpTransport对象进形包装
Transport rc = configure(transport, wf, options);
//remove auto
IntrospectionSupport.extractProperties(options, "auto.");
if (!options.isEmpty()) {
throw new IllegalArgumentException("Invalid connect parameters: " + options);
}
return rc;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
}
}
//对tcpTransport进行包装
@SuppressWarnings("rawtypes")
public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
transport = compositeConfigure(transport, wf, options);
transport = new MutexTransport(transport);
transport = new ResponseCorrelator(transport);
return transport;
}
(9)TcpTransportFactory createTransport方法
@Override
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
URI localLocation = null;
String path = location.getPath();
// see if the path is a local URI location
if (path != null && path.length() > 0) {
int localPortIndex = path.indexOf(':');
try {
Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
String localString = location.getScheme() + ":/" + path;
localLocation = new URI(localString);
} catch (Exception e) {
LOG.warn("path isn't a valid local location for TcpTransport to use", e.getMessage());
if(LOG.isDebugEnabled()) {
LOG.debug("Failure detail", e);
}
}
}
//创建socket
SocketFactory socketFactory = createSocketFactory();
//将socket包装在TCP协议中,创建了一个TcpTransport对象
return createTcpTransport(wf, socketFactory, location, localLocation);
}
(10)TcpTransport 中的oneway方法
*/
@Override
public void oneway(Object command) throws IOException {
checkStarted();
wireFormat.marshal(command, dataOut);
dataOut.flush();
}