手机app,微服务异步架构—MQ之RocketMQ,网

频道:新闻调查 日期: 浏览:244

咱们咱们都知道把一个微服务架构变成一个异步架构只需求加一个MQ,现在市道上有许多MQ的开源结构。究竟挑选哪一个MQ的开源结构才适宜呢?”

1

什么是MQ?MQ的原理是什么?

MQ便是音讯行列,是Message Queue的缩写。音讯行列是一种通讯办法。音讯的实质便是一种数据结构。由于MQ把项目中的音讯集中式的处理和存储,所以MQ首要有解耦,并发,和削峰的功用。

1,解耦:

MQ的音讯生产者和顾客相互不关心对方是否存在,经过MQ这臧志中个中间件的存在,使整个体系抵达解耦的效果。

假如服务之间用RPC通讯,当一个服务跟几百个服务通讯时,假如那个服务的通讯接口改动,那么几百个服务的通讯接口都的跟着变化,这是十分头疼的一件事。

可是选用MQ之后,不管是生产者或许顾客都能够独自改动自己。他们的改动不会影单玉柱响到其他服务。然后抵达解耦的意图。为什么要解耦呢?说白了便是便利,削减不必要的工作量。

2,并发

MQ有生产者集群和顾客集群,所以客户端是亿级用户时,他们都是并行的。然后大大提高响应速度。

3,削峰

由于MQ能存储的音讯量很大,所以他能够把许多的音讯恳求先存下了,然后再并发的办法渐渐处理。

假如选用RPC通讯,每一次恳求用调用RPC接口,当恳求量巨大的时分,由于RPC的恳求是很耗资源的,所以巨大的恳求一定会压垮服务器。

削峰的意图是用户体会变好,并且使整个体系安稳。能承受许多恳求音讯。

2

现在市道上有什么MQ,

关键介绍RocketMQ

现在市道上的MQ有许多,首要有RabbitMQ,ActiveMQ,ZeroMQ,RocketMQ,Kafka等等,这些都是开源的MQ产品。曾经许多人引荐运用RabbitMQ,他也是十分好用的MQ产品,这儿不做过多的介绍。Kafka也是高吞吐量的老迈,咱们这儿也不介绍。

咱们关键介绍一下RocketMQ,RocketMQ是阿里巴巴在2012年开源的分布式音讯中间件,现在现已捐赠给Apache软件基金会,并于并于2017年9月25日成为 Apache 的尖端项目。

作为经历过屡次阿里巴巴双十一这种“超级工程”的洗礼并有安稳超卓体现的国产中间件,以其高功能、低延时和高牢靠等特性近年来现已也被越来越多的国内企业运用。

功用概览图

能够看见RocketMQ支撑守时和延时音讯,这是RabbitMQ所没有的才干。

RocketMQ的物理结构

从这儿能够看出,RocketMQ涉及到四大集群,producer,Name Server,Consumer,Broker。

Producer集群:

是生产者集群,担任发生音讯,向顾客发送由事务运用程序体系生成的音讯,RocketMQ供给三种办法发送音讯:同步,异步,单向。

一,一般音讯wto姐妹会

1,同步原理图

同步音讯要害代码

try {
SendResult sendResult = producer.send(msg);
// 同步发送音讯,只需不抛反常便是成功
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " msgId is: " + sendResult.getMessageId());
}
catch (Exception e) {
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}
}

2,异步原理图

异步音讯要害代码

producer.sendAsync(msg, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
// 消费发送成功
System.out.println("send message success. topic=" + sendResult.getTopic() + ", msgId=" + sendResult.getMessageId());
}

@Override
public void onException(OnExceptionContext context) {
System.out.println("boxsend message failed. topic=" + context.getTopic() + ", msgId=" + context.getMessageId());
}
});

3,单向(Oneway)发送原理图

单向只发送,不等候回来,所以速度最快,一般在微秒级,但或许丢掉

单向(Oneway)发送音讯要害代码

producer.sendOneway(msg);

三种发送音讯具体代码请参阅文档:https://help.aliyun.com/document_detail/29547.html?spm=a2芙丽芳丝c4g.11186623.6.566.7e49793fuueSlB

二,守时音讯和延时音讯

发送肝癌晚期症状守时音讯要害代码

try {
// 守时音讯,单位毫秒(ms),在指守时刻戳(当时时刻之后)进行投递,例如 2016-03-07 16:21:00 投递。假如被设置成当时时刻戳之前的某个时刻,音讯将马上投递给顾客。
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
msg.setStartDeliverTime(timeStamp);
// 发送音讯,只需不抛反常便是成功
SendResult sendResult = producer.send(msg);
System.out.println("MessageId:"+sendResult.getMessageId());
}
catch (Exception e) {
// 音讯发送失利,需求进行重试处理,可从头发送这条音讯或耐久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}

发送延时音讯要害代码

try {

// 延时音讯,单位毫秒(ms),在指定延迟时刻(当时时刻之后)进行投递,例如音讯在 3 秒后投递
long delayTime = System.currentTimeMillis() + 3000;
// 设置音讯需求被投递的时刻 msg.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(msg);
// 同步发送音讯,只需不抛反常便是成功
if (sendResult != null) {
System.out.println(new Date() + " Send mq message success. Topic is:" + msg.getTopic() + " 黑客杜天禹msgId is: " + sendResult.getMessageId());
}

} catch (Exception e) {

// 音讯发送失利,需求进行重试处理,可从头发送这条音讯或耐久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}

注意事项

1,守时和延时音讯的 msg.setStartDeliverTime 参数需求设朴丽芬置成当时时刻戳之后的某个时刻(单位毫秒)。假如被设置成当时时刻戳之前的某个时刻,音讯将马上投递给顾客。

2,守时和延时音讯的 msg.setStartDeliverTime 参数可设置40天内的任何时刻(单位毫秒),超越40天音讯发送将失利。

3,StartDeliverTime 是服务端开端向消费端投递的时刻。 假如顾客当时有音讯堆积,那么守时和延时音讯会排在堆积音讯后边,将不能严厉依照装备的时刻进行投递。

4,由于客户端和服务端或许存在时刻差,音讯的实践投递时刻与客户端设置的投递时刻之间或许存在误差。

5,设置守时和延时音讯的投递时刻后,仍然受 3 天的音讯保存时长约束。例如,设置守时音讯 5 天后才干被消费,假如第 5 天后一向没被消费,那么这条音讯将在第8天被删去。

6,除 Java 言语支撑延时音讯外,其他来不及说我喜欢你言语都不支撑延时音讯。

发布音讯原理图

三,事务音讯

RocketMQ供给相似X/Open XA的分布式事务功用来确保事务发送方和MQ音讯的终究一致性,其实质是经过半音讯的办法把分布式事务放在MQ端来处理。

原理图

其间:

​ 1,发送方向音讯行列 手机app,微服务异步架构—MQ之RocketMQ,网RocketMQ 服务端发送音讯。

​ 2,服务端将音讯耐久化成功之后,向发送方 ACK 承认音讯现已发送成功,此刻音讯为半音讯。

​ 3,发送方开端履行本地事务逻辑。

​ 4,发送方依据本地事务履行成果向服务端提交二次承认(Commit 或是 Rollback),服务端收到 Commit 状况则将半音讯标记为可投递,订阅方终究将收到该音讯;服务端收到 Rollback 状况则删去半音讯,订阅方将不会承受该音讯。

​ 5,在断网或许是运用景甜性感重启的特别情况下,上述过程 4 提交的二次承认终究未抵达服务端,经过固守时刻后服务端将对该音讯建议音讯回查。

​ 6,发送方收到音讯回查后,需求查看对应音讯的本地事务履行的终究成果。

​ 7,发送方依据查看得到的本地手机app,微服务异步架构—MQ之RocketMQ,网事务的终究状况再次提交二次承认,服务端仍依照过程 4 对半音讯进行操作。手机app,微服务异步架构—MQ之RocketMQ,网

RocketMQ的半音讯机制的注意事项是

1,依据第六步能够看出他要求发送方供给事务回查接口。

2,不能确保发送方的音讯幂等,在ack没有回来的情况下,或许存在重复音讯

3,消费方要做幂等处理。

中心代码

final BusinessService businessService = new BusinessService(); 手机app,微服务异步架构—MQ之RocketMQ,网// 本地事务

TransactionProducer producer = ONSFactory.createTransactionProducer(properties,
new LocalTransactionCheckerImpl());
producer.start();
Message msg = new Message("Topic", "TagA", "Hello MQ transaction===".getBytes());
try {

SendResult sendResult = producer.send(msg, new LocalTransactionExecuter() {

@Override
public TransactionStatus execute(Message msg, Object arg) {

// 音讯 ID(有或许音讯体相同,但音讯 ID 不相同,当时音讯 ID 在控制台无法查询)
String msgId = msg.getMsgID();

// 音讯体内容进行 crc32,也能够运用其它的如 MD5
long crc32Id = HashUtil.crc32Code(msg.getBody());

// 音讯 ID 和 crc32id 首要是用来避免音讯重复
// 假如事务本身是幂等的,能够疏忽,不然需求运用 msgId 或 crc32Id 来做幂等
// 假如要求音讯绝对不重复,引荐做法是对音讯体 body 运用 crc32 或 MD5 来避免重复音讯
Object businessServiceArgs = new Object();

Transa手机app,微服务异步架构—MQ之RocketMQ,网ctionStatus transactionStatus =TransactionStatus.Unknow;

try {

boolean isCommit = businessService.execbusinessService(businessServiceArgs);

if (isCommit) {

// 本地事务成功则提交音讯 transactionStatus = TransactionStatus.CommitTransaction;

} else {

// 本地事务失利则回滚音讯 transactionStatus = TransactionStatus.RollbackTransaction;

}

} catch (Exception e) {log.error("Message Id:{}", msgId, e);

}

System.out.println(msg.getMsgID());log.warn("Message Id:{}transactionStatus:{}", msgId, transactionStatus.name());

return transactionStatus;
}
}, null);
}
catch (Exception e) {

// 音讯发送失利,需求进行重试处理,可从头发送这条音讯或耐久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
e.printStackTrace();
}

具体代码参阅文档:https://help.aliyun.com/document_detail/29548.html?spm=a2c4g.11186623.6.570.5玲玲解忧d5738a49FJl1t

一切音讯发布原理图

producer彻底无状况,能够集群布置。

Name Server集群:

NameServer是一个简直无状况的节点,可集群布置,节点之间无任完美假妻168何信息同步,NameServer很像注册中心的功用。

传闻阿里之前的NameServer 是用ZooKeeper做的,或许由于Zookeeper不能满意大规模并发的要求,所以之后NameServer 是阿里自研的。

NameServer其实便是一个路由表,他办理Producer和Comsumer之间的发现和注册。

Broker集群:

Broker布置相对杂乱,Broker分为Master与Slave,一个Master能够对应多个Slaver,可是一个Slaver只能对应一个Master,Master与Slaver的对应联系经过指定相同的BrokerName。

不同的BrokerId来界说,BrokerId为0表明Master,非0表明Slaver。Master能够布置多个。每个Broker与NameServer集群中的一切节点建手机app,微服务异步架构—MQ之RocketMQ,网立长衔接,守时注册Topic信息到一切的NameServer。

Consumer集群:

订阅办法

音讯行列 RocketMQ 支撑以下两种订阅办法:

集群订阅:同一个 Group ID 所标识的一切 Consumer 均匀分摊消费音讯。 例如某个 Topic 有 9 条音讯,一个 Group ID 有 3 个 Consumer 实例,那么在集群消费形式下每个实例均匀分摊,只消费其间的 3 条音讯。

// 集群订阅办法设置(不设置的情况下,默以为集群订阅办法)
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);

播送订阅:同一个 Group ID 所标识的一切 Consumer 都会各自消费某条音讯一次精算师。 例如某个 Topic 有 9 条音讯,一个 Group ID 有 3 个 Consumer 实例,那么在播送消费形式下每个实例都会各自消费 9 条音讯。

// 播送订阅办法设置
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

订阅音讯要害代码:

Consum手机app,微服务异步架构—MQ之RocketMQ,网er consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("TopicTestMQ", "TagA||TagB", **new** MessageListener() { //订阅多个 Tag
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receiv守望妻子e: " + message);
return Action.CommitMessage;
}
});
//订阅别的一个 Topic
consumer.subscribe("TopicTestMQ-Other", "*", **new** MessageListener() { //订阅全器宗武神部 Tag
public Action consume(Message message, ConsumeContext context) {
System.out.println("Receive: " + message);
return Action.CommitMessage;
}
});
consumer.start();

注意事项:

消费端要做幂等处理,一切MQ基本上都不会做幂等处理,需求事务端处理,原因是假如在MQ端做幂等处理睬带来MQ的杂乱度,并且严重影响MQ的功能。

音讯收发模型

主子账号创立

创立主子账号的原因是权限问题。下面是主账号创立流程图

具体前田香操作地址:http小三被扒s://help.aliyun.com/document_detail/3我就这样离别山下的家4411.html日本童贞?spm=a2c4g.11186623.6.555.38c57f91JXUK7o

子账号流程图

具体操作地址:https://help.aliyun.com/document_detail/96402.html?spm=a2c4g.11186623.6.556.60194fedfSkxIB

3

MQ是微服务架构

十分重要的部分

MQ的诞生把本来的同步架构思想转变到异步架构思想供给一种办法,为大规模,高并发的事务场景的安稳性完成供给了很好的处理思路。

Martin Fowler着重:分布式调用的榜首准则便是不要分布式。这句话看似颇具道理,然而就企业运用体系而言,只需整个体系在不停地演化,并有多个子体系一起存在时,这条准则就会被逼打破。

Martin Fowler提出的这条准则,一方面是期望设计者能够审慎地对待分布式调用,另一方面却也是分布式体系本身存在的缺点所造成的。

所以微服务并不是全能药,合适的架构才是最好的架构。

需求的Java架构师方面的材料能够重视之后私信哈,回复“材料”收取免费架构视频材料,记住关键赞转发噢!!!

热门
最新
推荐
标签