Just Do Java

Java 's Blog


  • 首页

  • 分类

  • 作者

  • 归档

  • 关于

分布式锁原来实现起来这么简单

发表于 2021-06-04 | 分类于 Java

阿粉最近迷上了 Redis,为什么呢?感觉 Redis 确实功能很强大呀,一个基于内存的 Key-Value 存储的数据库,竟然有这么多的功能,而阿粉也要实实在在的把 Redis 来弄一下,毕竟面试的时候,Redis 可以说是一个非常不错的加分项。

阅读全文 »

分布式全局唯一ID方案这么多?

发表于 2021-06-02 | 分类于 Java

前段时间阿粉想着如何去优化我们公司中已经存在的分布式中的唯一ID,而提起唯一的ID,相信如果不是从事传统行业的人,肯定都有所了解,分布式架构下,唯一ID生成方案,是我们在设计一个系统,尤其是数据库使用分库分表的时候常常会遇见的问题,尤其是当我们进行了分库分表之后,对这个唯一ID的要求也就越来越高。那么唯一ID方案都有哪些呢?

<–more–>

分布式全局唯一ID

往往一谈分布式,总是会 色变,因为在很多面试的时候,都会问你,会不会分布式?你们项目的架构是怎么做的,做的如何?你们既然使用了分布式,那么你们的分布式事务是怎么处理的,你们分布式全局唯一 ID 使用的是什么算法来实现的?

往往谈到这个的时候,很多面试的朋友就会很尴尬,我都是直接用的,我好像完全没有注意过。当你意识到这一点的时候,往往接下来的问题,你回答的就会开始磕磕绊绊,于是面试凉了。

并发越大的系统,数据就越大,数据越大就越需要分布式,而大量的分布式数据就越需要唯一标识来识别它们,而这些唯一标识,我们就称之为分布式全局唯一的ID。

Redis实现全局唯一ID

阿粉的项目说实话,还不是特别的差劲。于是阿粉就开始想着,这分布式的全局唯一ID,为啥生成的时候都是使用 UUID ,要么就是自增主键呢?

于是阿粉准备使用 Redis 来生成分布式全局唯一ID。

Redis实现全局唯一ID原理

因为 Redis 的所有命令是单线程的,所以可以利用 Redis 的原子操作 INCR 和 INCRBY,来生成全局唯一的ID。 方式一:StringRedisTemplate

1
2
3
4
5
public class Activity {
    private Long id;
    private String name;
    private BigDecimal price;
}

上面是我们的活动的实体类,马上就要 618 了,各位做电商的是不是开始准备搞事情了?可以学习一下用一下试试,我们活动中有 id ,活动的名称 name ,还有对应活动设置好的价格 price 等等,字段可能还会有很多,我们需要的暂时就列出这么多。

1
2
3
4
5
6
7
8
9
public class IdGeneratorService {
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private static final String ID_KEY = "id:generator:activity";

    public Long incrementId() {
        return stringRedisTemplate.opsForValue().increment(ID_KEY);
    }
1
long id = idGeneratorService.incrementId(); 调用生成

但是看起来是不是总是感觉好像有点 low ,我们是不是就要准备来整的高大上一点,毕竟代码就像一个程序员的内裤,虽然自己看着有洞感觉没啥,但是别人看到是不是就很不爽了,那就整个他们看起来比较高大上一点的。

方式二:

为什么会有方案二,那是因为我们的 Redis 很多时候都不是只有一个 Redis,都是搭建的集群,既然是集群,我们就要开始合理的利用上集群。

那么我们就要开始考虑到集群方面的知识了,那么我们的思路就有了。于是出现了:集群中每个节点预生成生成ID;然后与redis的已经存在的ID做比较。如果大于,则取节点生成的ID;小于的话,取Redis中最大ID自增。

这个时候我们还需要一段 lua 脚本来保证我们实现的ID是唯一的,这才是真正的本质,不然我们实现的ID在高端,不唯一,有个锤子用

核心脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
local function get_max_seq()
    local key = tostring(KEYS[1])
    local incr_amoutt = tonumber(KEYS[2])
    local seq = tostring(KEYS[3])
    local month_in_seconds = 24 * 60 * 60 * 30
    if (1 == redis.call(

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

setnx

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

, key, seq)) then redis.call(

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

expire

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

, key, month_in_seconds) return seq else local prev_seq = redis.call(

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

get

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

, key) if (prev_seq < seq) then redis.call(

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

set

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

, key, seq) return seq else --[[ 不能直接返回redis.call(

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

incr

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

, key),因为返回的是number浮点数类型,会出现不精确情况。 注意: 类似"16081817202494579"数字大小已经快超时lua和reids最大数值,请谨慎的增加seq的位数 --]] redis.call(

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

incrby

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

, key, incr_amoutt) return redis.call(

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

get

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

, key) end end end return get_max_seq()

以上的 lua 的脚本来自于Ydoing,一个博客的大佬,我们现在既然会使用他生成全局唯一的ID,那么是不是就得搞清楚为什么会选择 Redis 来实现分布式全局唯一的ID。

Redis 的所有命令是单线程的

上一段开头,阿粉就说 Redis 的命令都是单线程的,相信如果你在面试官面前这么说,面试官肯定会问你一句,为什么 Redis 是单线程而不是多线程的呢?

Redis 基于 Reactor 模式开发了网络事件处理器,这个处理器被称为文件事件处理器。它的组成结构为4部分:多个套接字、IO多路复用程序、文件事件分派器、事件处理器。因为文件事件分派器队列的消费是单线程的,所以 Redis 才叫单线程模型。

当你说到这个 Reactor 模式的时候,如果大家深入研究过 Netty 的模型,就会发现,这个模式在 Netty 中也是有使用的,我们这时候是不是就得需要去官网上去瞅瞅看,为什么这么说。

什么是Reactor模型

Reactor模型实际上都知道,就是一个多路复用I/O模型,主要用于在高并发、高吞吐量的环境中进行I/O处理。

而这种多路复用的模型所依赖的永远都是那么几个内容,事件分发器,事件处理器,还有调用的客户端,

Reactor模型是一个同步的I/O多路复用模型,我们还是先把这个同步的I/O多路复用模型给弄清楚了再看其他的。

这个相信大家肯定不是很熟悉,而阿粉在之前也给大家说了关于Netty中的Channel,文章地址发给大家,用Socket编程?我还是选择了Netty,在文章中,我们已经给大家说了关于Channel,而这种单线程的模型是什么样子的呢?

图已经给大家画出来了,丑是丑了点,但是意思还是表达出来了。

这种模型也就是说:Redis 单线程指的是网络请求模块使用了一个线程(所以不需考虑并发安全性),即一个线程处理所有网络请求,其他模块仍用了多个线程。

而面试官还会有一种问法,为什么使用 Redis 就会快。

这个相信大家肯定能回答出来,因为 Redis 是一种基于内存的存储数据,为什么内存快?

因为这种快速是针对存储在磁盘上的数据来说的,因为内存中的数据,断电之后,消失了,你下次来的时候,不还是需要从磁盘读取出来,然后保存,所以说在Redis速度快。扯远了,回来继续说 Redis的单线程。

我们来看看官网给我们的解释:

1
2
3
4
5
6
7
8
Redis is single threaded. How can I exploit multiple CPU / cores?
It's not very frequent that CPU becomes your bottleneck with Redis, as usually Redis is either memory or network bound. For instance, using pipelining Redis running on an average Linux system can deliver even 1 million requests per second, so if your application mainly uses O(N) or O(log(N)) commands, it is hardly going to use too much CPU.

However, to maximize CPU usage you can start multiple instances of Redis in the same box and treat them as different servers. At some point a single box may not be enough anyway, so if you want to use multiple CPUs you can start thinking of some way to shard earlier.

You can find more information about using multiple Redis instances in the Partitioning page.

However with Redis 4.0 we started to make Redis more threaded. For now this is limited to deleting objects in the background, and to blocking commands implemented via Redis modules. For future releases, the plan is to make Redis more and more threaded.

其实翻译过来大致就是说,当我们使用 Redis 的时候,CPU 成为瓶颈的情况并不常见,因为 Redis 通常是内存或网络受限的。

其实说白了,官网就是说我们 Redis 就是这么的快,并且正是由于在单线程模式的情况下已经很快了,就没有必要在使用多线程了。这整的是不是就有点恶心了。阿粉也说说自己的见解,毕竟这官网的话有点糊弄人的意思。

其实 Redis 使用单个 CPU 绑定一个内存,针对内存的处理就是单线程的,而我们使用多个 CPU 模拟出多个线程来,光在多个 CPU 之间的切换,然后再操作 Redis ,实际上就不如直接从内存中拿出来,毕竟耗时在这里摆着。

你认为的 Redis 为什么是单线程的?

阅读全文 »

你知道 Redis 服务器接收到一条命令是如何执行的吗?

发表于 2021-05-30 | 分类于 Java

Hello 大家好,我是阿粉,Redis 作为工作中不可缺少的缓存组件,相信很多小伙伴都会使用到,我们日常使用的时候都是通过代码或者客户端去链接 Redis 服务器来操作数据的。那么一条简单的set name ziyou 命令是如何执行的,中间都经历了哪些过程想必很少会有人去了解。今天阿粉就带大家看一下一条简单的set name ziyou 命令是如何执行的。

我们可以看到在执行set name ziyou 这个命令过后,先显示一个OK 在终端里面。下面我们看下这整个过程都经历了哪些步骤。

命令的整个执行分为下面几个步骤,我们先看流程,在仔细分析:

  1. 客户端发送命令请求;
  2. 服务端读取命令请求;
  3. 命令执行器进行操作
    1. 命令执行器查找命令实现函数;
    2. 命令执行器执行预备操作;
    3. 命令执行器调用命令的实现函数;
    4. 命令执行器执行后续工作;
  4. 服务端将命令回复发送给客户端;
  5. 客户端接收并打印命令回复内容;

客户端发送命令请求

首先当客户端和服务端建立好了链接过后,当我们输入命令 set name ziyou 命令请求的时候,客户端会将这个命令进行协议转换,然后通过连接将转换后的协议发送到服务端。

比如当我们输入命令set name ziyou 的时候,客户端会将这个原始命令转换成*3\r\n$3\r\nset\r\n$4\r\nname\r\n$5\r\nziyou,这个协议大家应该比较眼熟,就是 Redis 管道的文件格式。简单解释下这个协议的意思,前面的*3 表示这个命令总共有三个参数,其中的$3,$4,$5 表示相应参数的长度。

服务端读取命令请求

当服务端收到该客户端的数据时,就会调用命令请求处理器来处理对应的消息。这块主要涉及到三个操作,第一个是保存命令,也就是会将命名的请求信息读取出来保存到对应客户端的输入缓冲区里面;保存完了过后会对输入缓冲区里面的内容进行解析,也就是对上面转换后的协议进行解析,解析出要执行的命令和对应的参数,将参数内容和参数个数保存到客户端的对应参数里面;第三步是调用命令执行器来执行命令。执行的命令和参数保存在RedisClient 结构的 argv 参数中,如下图所示,命令分析完成后,第三步才能更好的进行执行操作:

命令执行器

命令执行器查找实现函数

思考一个问题,我们这里 argv[0] 参数中的命令的是进行set 操作,在这里是个 set 字符串,那么 Redis 服务器是如何进行执行的呢?我们可以想到的是需要根据这个字符串找到对应的函数来进行操作,Redis 在内部有个的命令表,是一个字典结果,key 就是对应的命令名字,字典的值就是一个个 RedisCommand 结构,记录了命令的实现信息。

结构如下,简单来说就是通过 argv[0] 中的命令名称找到命令表中对应的redisCommand 结构,然后根据 proc 指针找到对应的执行命令。这里说明一下,命令名称的大小写没有任何影响,我们在输入的时候不用关心命令名称的大小写问题。

命令执行器执行预备操作

在 Redis 服务器执行相关命令之前,为了保证命令能够正确的执行,还需要进行相关的预备处理,部分预操作如下:

  1. 检查命令的参数和输入的参数个数是否一致,不一致则直接返回错误;
  2. 检查客户端是否通过身份验证,未通过身份验证则只能执行 AUTH 命令进行身份验证;
  3. 检查服务器的内容使用情况,为了保证命令执行成功,可能会需要进行内容回收;

除了上面的功能之外还有很多需要预备执行的动作,而且根据服务器部署的情况不一样,单机还是集群需要执行的操作还有不同。只有当所有的 预备操作都执行成功过后,才会真正的执行用户的命令。

由此可见 Redis 的性能是真正的高效,在有这么做操作流程的情况下还能保住命令执行的如此快速,不得不说真的很优秀。

命令执行器调用命令的实现函数

当前面的预备操作都完成过后,命令执行器就会调用对应的实现函数,在我们这里的例子就是调用 setCommand(redisClient *c) 函数进行数据写入操作,具体的 key 值和 value 值在 redisClient 结构中已经保存了,所以只要传递一个指针进去就可以了。setCommand() 命令执行后会返回一个OK\r\n ,这个返回会被保存到客户端的输出缓冲区当中,输出缓冲区的内容后续会被返回到客户端,给用户展示出来,如前面的图片显示的内容。

命令执行器执行后续工作

当命令执行器调用具体的实现函数过后,服务器还会有相应的一些操作要做,比如如果开启了慢日志功能,会检查是否要写入慢日志;如果开启了 AOF 则需要将刚刚执行的命令写入 AOF 的缓冲区中;以及如果有服务器备份或者监听的时候,会把刚刚执行的命令广播过去。

服务端将命令回复发送给客户端

实现函数执行完过后会将执行结果保存到客户端的输出缓冲区中,此时服务器的命令回复处理器会将缓冲区中的命令回复发送给客户端。命令回复处理器发送完数据过后会将客户端的输出缓冲区清理,方便后续的命令存入数据,同样回复的数据也是经过协议转换的。

客户端接收并打印命令回复内容

客户端收到回复数据过后就数据转换成可读的形式,输出到控制台。这样就得到了我们第一张图片的结果。

总结

通过上面所有的过程,我们可以看到,就是一个简单的set name ziyou 这样的语句,整个执行的过程也还是很复杂的,Redis 服务器在设计的时候要考虑很多东西,安全,性能等等方面。

引用

《Redis 设计与实现第二版》

阅读全文 »

你知道 Redis 服务器接收到一条命令是如何执行的吗?

发表于 2021-05-30 | 分类于 Java

Hello 大家好,我是阿粉,Redis 作为工作中不可缺少的缓存组件,相信很多小伙伴都会使用到,我们日常使用的时候都是通过代码或者客户端去链接 Redis 服务器来操作数据的。那么一条简单的set name ziyou 命令是如何执行的,中间都经历了哪些过程想必很少会有人去了解。今天阿粉就带大家看一下一条简单的set name ziyou 命令是如何执行的。

阅读全文 »

用了这么久的Mybatis,结果面试官问的问题,我竟然还犹豫了

发表于 2021-05-17 | 分类于 Java

前段时间阿粉的一个朋友和阿粉吃饭,在吃饭的时候和阿粉疯狂的吐槽面试官,说面试官问的问题都是些什么问题呀,我一个干了三四年的开发,也不说问点靠谱的,阿粉很好奇,问题问完基础的,一般不都是根据你自己的简历进行提问么?而接下来他说的出来的问题,阿粉表示,阿粉需要继续学习了。

<–more–>

Mybatis是什么?

说到这个,读者大人们肯定心想,阿粉是在开玩笑么?你一个 Java 程序员,你不知道Mybatis是啥么?不就是个持久层的框架么,这东西有啥好说的呢?但是阿粉还是要给大家说。

Mybatis是一个半自动 ORM(对象关系映射)框架,它内部封装了JDBC,加载驱动、创建连接、创建 statement 等繁杂的过程,我们开发的时候只需要关注如何编写 SQL 语句,而不用关心其他的。

为什么说 Mybatis 是一个半自动 ORM 的框架呢?

ORM,是Object和Relation之间的映射,而Mybatis 在查询关联对象或关联集合对象时,需要手动编写 sql 来完成,所以,称之为半自动 ORM 框架,而Hibernate 属于全自动 ORM 映射工具,使用 Hibernate 查询关联对象或者关联集合对象时,可以根据对象关系模型直接获取,所以它是全自动的。

这也是为什么有些面试官在面试初级程序员的时候,很喜欢说,你觉得 Mybatis , 和 Hibernate 都有什么优缺点,为啥你们选择使用的 Mybatis 而不选择使用 Hibernate 呢?

我们都说了 Mybatis是什么了,接下来肯定需要说说面试官都问了什么问题,能让阿粉的朋友变得非常犹豫。

Mybatis的一级、二级缓存是什么你了解么?

Mybatis 的一级缓存

我们先说 Mybatis 的一级缓存,因为这是如果不手动配置,他是自己默认开启的一级缓存,一级缓存只是相对于同一个 SqlSession 而言,参数和SQL完全一样的情况下,我们使用同一个SqlSession对象调用一个Mapper方法,往往只执行一次SQL,因为使用SelSession第一次查询后,MyBatis会将其放在缓存中,以后再查询的时候,如果没有声明需要刷新,并且缓存没有超时的情况下,SqlSession都会取出当前缓存的数据,而不会再次发送SQL到数据库。

当我们面试的时候,说完这个,一般情况下,面试官一定会追问下去,毕竟技术就是要问到你的知识盲区才会停止。

那我们就来画个图表示一下一级缓存

那面试官肯定会说,直接从数据库查不就行了,为啥要一级缓存呢?

当我们使用MyBatis开启一次和数据库的会话时, MyBatis 会创建出一个 SqlSession 对象表示一次与数据库之间的信息传递,在我们执行 SQL 语句的过程中,们可能会反复执行完全相同的查询语句,如果不采取一些措施,我们每一次查询都会查询一次数据库,而如果在极短的时间内做了很多次相同的查询操作,那么这些查询返回的结果很可能相同。

也就是说,如果我们在短时间内,频繁的去执行一条 SQL ,查询返回的结果本来应该是改变了,但是我们查询出来的时候,会出现结果一致的情况,正是为了解决这种问题,也为了减轻数据库的开销,所以 Mybatis 默认开启了一级缓存。

Mybatis 的二级缓存

Mybatis 的二级缓存一般如果你不对他进行设置,他是不会开启的,而二级缓存是什么呢? Mybatis 中的二级缓存实际上就是 mapper 级别的缓存,而这时候肯定会有人说,那么不同之间的 Mapper 是同一个缓存么?

答案是否定的,他不是一个,Mapper 级别的缓存实际上就是相同的 Mapper 使用的是一个二级缓存,但是在二级缓存中,又有多个不同的 SqlSession ,而不同的 Mapper 之间的二级缓存也就是互相不会影响的。

就类似下面的图

这二级缓存是不是就看起来有点意思了?

那怎么能够开启二级缓存呢?

1.MyBatis 配置文件

1
2
3
<settings>
	<setting name = "cacheEnabled" value = "true" />
</settings>

2.MyBatis 要求返回的 POJO 必须是可序列化的

3.Mapper 的 xml 配置文件中加入 标签

既然我们想要了解这个二级缓存,那么必然,我们还得知道它里面的配置都有哪些含义。

我们先从标签看起,然后从源码里面看都有哪些配置信息提供给我们使用:

blocking : 直译就是调度,而在 Mybatis 中,如果缓存中找不到对应的 key ,是否会一直 blocking ,直到有对应的数据进入缓存。

eviction : 缓存回收策略

而缓存回收策略,在源码中是有直接体现的,那么他们分别都对应了什么形式呢?

1
2
3
4
5
typeAliasRegistry.registerAlias("PERPETUAL", PerpetualCache.class);
typeAliasRegistry.registerAlias("FIFO", FifoCache.class);
typeAliasRegistry.registerAlias("LRU", LruCache.class);
typeAliasRegistry.registerAlias("SOFT", SoftCache.class);
typeAliasRegistry.registerAlias("WEAK", WeakCache.class);
  • PERPETUAL : 选择 PERPETUAL 来命名缓存,暗示这是一个最底层的缓存,数据一旦存储进来,永不清除.好像这种缓存不怎么受待见。

  • FIFO : 先进先出:按对象进入缓存的顺序来移除它们

  • LRU : 最近最少使用的:移除最长时间不被使用的对象。

  • SOFT : 软引用:移除基于垃圾回收器状态和软引用规则的对象。

  • WEAK : 弱引用:更积极地移除基于垃圾收集器状态和弱引用规则的对象。

大家虽然看着 PERPETUAL 排在了第一位,但是它可不是默认的,在 Mybatis 的缓存策略里面,默认的是 LRU 。

PERPETUAL :

源代码如下:

1
2
3
4
5
6
public class PerpetualCache implements Cache {
  private final String id;
  private Map<Object, Object> cache = new HashMap<>();
  public PerpetualCache(String id) {
    this.id = id;
  }

恩?看着是不是有点眼熟,它怎么就只是包装了 HashMap ? 你还别奇怪,他还真的就是使用的 HashMap ,不得不说,虽然人家是使用的 HashMap ,但是那可是比咱们写的高端多了。

既然使用 HashMap ,那么必然就会有Key,那么他们的Key是怎么设计的?

CacheKey:

1
2
3
4
5
6
7
8
9
10
11
12
public class CacheKey implements Cloneable, Serializable {
  private static final long serialVersionUID = 1146682552656046210L;
  public static final CacheKey NULL_CACHE_KEY = new NullCacheKey();
  private static final int DEFAULT_MULTIPLYER = 37;
  private static final int DEFAULT_HASHCODE = 17;
  private final int multiplier;
  private int hashcode; //用于表示CacheKey的哈希码
  private long checksum; //总和校验,当出现复合key的时候,分布计算每个key的哈希码,然后求总和
  private int count;//当出现复合key的时候,计算key的总个数
  // 8/21/2017 - Sonarlint flags this as needing to be marked transient.  While true if content is not serializable, this is not always true and thus should not be marked transient.
  private List<Object> updateList;//当出现复合key的时候,保存每个key
  

确实牛逼,至于内部如何初始化,如何进行操作,大家有兴趣的可以去阅读一下源码,导入个源码包,打开自己看一下。

FIFO: 先进先出缓冲淘汰策略

1
2
3
4
5
6
7
8
9
10
11
12
public class FifoCache implements Cache {

  private final Cache delegate; //被装饰的Cache对象
  private final Deque<Object> keyList;//用于记录key 进入缓存的先后顺序
  private int size;//记录了缓存页的上限,超过该值需要清理缓存(FIFO)

  public FifoCache(Cache delegate) {
    this.delegate = delegate;
    this.keyList = new LinkedList<>();
    this.size = 1024;
  }

在 FIFO 淘汰策略中使用了 Java 中的 Deque,而 Deque 一种常用的数据结构,可以将队列看做是一种特殊的线性表,该结构遵循的先进先出原则。Java中,LinkedList实现了Queue接口,因为LinkedList进行插入、删除操作效率较高。

当你看完这个源码的时候,是不是就感觉源码其实也没有那么难看懂,里面都是我们已经掌握好的知识,只不过中间做了一些操作,进行了一些封装。

LRU : 最近最少使用的缓存策略

而 LUR 算法,阿粉之前都说过,如果对这个算法感兴趣的话,文章地址给大家送上,经典的 LRU 算法,你真的了解吗?

而我们需要看的源码则是在 Mybatis 中的源码,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LruCache implements Cache {

  private final Cache delegate;
  private Map<Object, Object> keyMap;
  private Object eldestKey;//记录最少被使用的缓存项key

  public LruCache(Cache delegate) {
    this.delegate = delegate;
    setSize(1024);//重新设置缓存的大小,会重置KeyMap 字段 如果到达上限 则更新eldestKey
  }
    public void putObject(Object key, Object value) {
      delegate.putObject(key, value);
      // 删除最近未使用的key
      cycleKeyList(key);
    }

SOFT: 基于垃圾回收器状态和软引用规则的对象

在看到基于垃圾回收器的时候,阿粉就已经开始兴奋了,竟然有GC的事情,那还不赶紧看看,这如此高大上(装杯)的事情,来瞅瞅吧!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class SoftCache implements Cache {
  //在SoftCache 中,最近使用的一部分缓存项不会被GC回收,这就是通过将其value添加到
  private final Deque<Object> hardLinksToAvoidGarbageCollection;
  //引用队列,用于记录GC回收的缓存项所对应的SoftEntry对象
  private final ReferenceQueue<Object> queueOfGarbageCollectedEntries;
  //底层被修饰的Cache 对象
  private final Cache delegate;
  //连接的个数,默认是256
  private int numberOfHardLinks;

  public SoftCache(Cache delegate) {
    this.delegate = delegate;
    this.numberOfHardLinks = 256;
    this.hardLinksToAvoidGarbageCollection = new LinkedList<>();
    this.queueOfGarbageCollectedEntries = new ReferenceQueue<>();
  }
  
  public void putObject(Object key, Object value) {
      // 清除被GC回收的缓存项
      removeGarbageCollectedItems();
      // 向缓存中添加缓存项
      delegate.putObject(key, new SoftEntry(key, value, queueOfGarbageCollectedEntries));
    }
   public Object getObject(Object key) {
       Object result = null;
       // 查找对应的缓存项
       @SuppressWarnings("unchecked") // assumed delegate cache is totally managed by this cache
       SoftReference<Object> softReference = (SoftReference<Object>) delegate.getObject(key);
       if (softReference != null) {
         result = softReference.get();
         // 已经被GC 回收
         if (result == null) {
           // 从缓存中清除对应的缓存项
           delegate.removeObject(key);
         } else {
           // See #586 (and #335) modifications need more than a read lock 
           synchronized (hardLinksToAvoidGarbageCollection) {
             hardLinksToAvoidGarbageCollection.addFirst(result);
             if (hardLinksToAvoidGarbageCollection.size() > numberOfHardLinks) {
               hardLinksToAvoidGarbageCollection.removeLast();
             }
           }
         }
       }
       return result;
     }
    public void clear() {
        synchronized (hardLinksToAvoidGarbageCollection) {
          // 清理强引用集合
          hardLinksToAvoidGarbageCollection.clear();
        }
        // 清理被GC回收的缓存项
        removeGarbageCollectedItems();
        delegate.clear();
      }
    //其中指向key的引用是强引用,而指向value的引用是弱引用
    private static class SoftEntry extends SoftReference<Object> {
      private final Object key;
  
      SoftEntry(Object key, Object value, ReferenceQueue<Object> garbageCollectionQueue) {
        super(value, garbageCollectionQueue);
        this.key = key;
      }
    }

WEAK : 基于垃圾收集器状态和弱引用规则的对象

1
2
3
4
5
6
7
8
9
10
11
12
public class WeakCache implements Cache {
  private final Deque<Object> hardLinksToAvoidGarbageCollection;
  private final ReferenceQueue<Object> queueOfGarbageCollectedEntries;
  private final Cache delegate;
  private int numberOfHardLinks;

  public WeakCache(Cache delegate) {
    this.delegate = delegate;
    this.numberOfHardLinks = 256;
    this.hardLinksToAvoidGarbageCollection = new LinkedList<>();
    this.queueOfGarbageCollectedEntries = new ReferenceQueue<>();
  }

WeakCache在实现上与SoftCache几乎相同,只是把引用对象由SoftReference软引用换成了WeakReference弱引用。

在这里阿粉也就不再多说了,关于 Mybatis 的二级缓存,你了解了么?下次遇到面试官问这个的时候,你应该知道怎么成功(装杯)不被打了吧。

阅读全文 »

笔试必刷的动态规划进阶题

发表于 2021-05-09 | 分类于 Java

Hello 大家好,我是阿粉,前面有篇文章给大家介绍了动态规划,并通过两个案例给大家演示,后台很多小伙伴也提供了很多建议,没看过的小伙伴可以去看下什么是动态规划——从青蛙跳台阶开始了解。今天再给大家介绍两个案例,帮助大家更好的掌握也顺便回顾一下。

案例 1

问:给定一个包含非负整数的 m x n 网格,请找出一条从左上角到右下角的路径,使得路径上的数字总和为最小,其中 arr[m][n] 表示具体的值。每次只能向下或者向右移动一步。这个问题是上篇文章中的案例的进阶篇。

思考

根据上篇文章提供的思路,我们依次进行相关的步骤:

  1. 定义变量:我们定义从左上角走到(i, j) 这个位置时,最小的路径和是 dp[i - ][j - 1]。那么,dp[m-1] [n-1] 就是我们要的答案;
  2. 寻找关系:dp[i][j] = min(dp[i-1][j],dp[i][j-1]) + arr[i][j]; arr[i][j] 表示网格中的数值,到达当前格子的最小路径等于左边或者上边中较小的路径加上格子本身的数值;
  3. 定义初始值:dp[i][0] = dp[i-1][0] + arr[i][0];,dp[0][i] = dp[0][i-1] + arr[0][i];;第一行或者第一列的时候就是整行和整列的数值累加。

编码

上面的分析可以想到,那么接下来我们就需要用代码来实现了,对于需要使用到之前的记录,我们可以考虑用二维数组来记录,所以就有了下面的这段代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public int dp(int[][] arr) {
    int m = arr.length;
  	int n = arr[0].length;
    if (m <=0 || n <= 0) {
        return 0;
    }
    int[][] dp = new int[m][n];
    // 初始化
  	dp[0][0] = arr[0][0];
  	// 初始化最左边的列
  	for(int i = 1; i < m; i++){
      dp[i][0] = dp[i-1][0] + arr[i][0];
    }
  	// 初始化最上边的行
  	for(int i = 1; i < n; i++){
      dp[0][i] = dp[0][i-1] + arr[0][i];
    }
    
    for (int i = 1; i < m; i++) {
        for (int j = 1; j < n; j++) {
            dp[i][j] = Math.min(dp[i-1][j],dp[i][j-1]) + arr[i][j];
        }
    }
    return dp[m - 1][n - 1];
}

解释下上面的代码,首先我们创建了一个二维数组 dp[m][n],用于记录到达位置的最短路径,由于当前的路径是由左边或者上边的最小路径加上当前格子的数值得到。这里我们需要找到对应关系,也就是dp[i][j] = min(dp[i-1][j],dp[i][j-1]) + arr[i][j],这里我们需要取相邻的最小值再加上当前格子的数值。

案例 2

问:给定不同面额的硬币 coins 和一个总金额 amount。编写一个函数来计算可以凑成总金额所需的最少的硬币个数。如果没有任何一种硬币组合能组成总金额,返回 -1。你可以认为每种硬币的数量是无限的。 Leetcode 322. 零钱兑换。

思考

  1. 定义变量:定义 dp[i] 表示凑成金额 i ,所需要的最少硬币个数,即 dp[amount] 则是我们需要求解的;
  2. 寻找关系:假设我们有三种硬币a,b,c,兑换的金币数为 m,我们可以推出 dp[m] = min(dp[m - a], dp[m - b], dp[m - c]) + 1,因为如果我们是需要求 m 金额的最少硬币个数,如果我们求出了 m - a 金额需要的硬币个数,在加上一个 a 就可以得到 m,硬币个数只要加 1。其实 b,c 同理。并且我们需要取所有硬币种类的最小数。
  3. 定义初始值:dp[0] = 0,没有金额当时也不需要硬币个数,dp[i - coins[j] 需要有解;

编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public int dp(int[] coins, int amount) {

        int[] dp = new int[amount + 1];
        int size = coins.length;
        int i = 0;
        int j = 0;
  			# 定义初始值
        dp[0] = 0;
        for (i = 1; i <= amount; i++) {
            #赋值,当不能组合和输出 -1 判断使用
            dp[i] = Integer.MAX_VALUE;
          	# 遍历 coins 中的硬币种类
            for (j = 0; j < size; j++) {
                if (i >= coins[j] && (dp[i - coins[j]]) != Integer.MAX_VALUE) {
                    dp[i] = Math.min(dp[i - coins[j]] + 1, dp[i]);
                }
            }
        }
        if (dp[amount] == Integer.MAX_VALUE) {
            dp[amount] = -1 ;
        }
        return dp[amount];
    }

总结

动规划的题目在 LeetCode 上面有很多,大家可以根据上面提供的思路去多刷几道题,慢慢就会有感觉了,刷完动态规划的题目,相信对大家工作或者找工作肯定有很大的帮助。https://leetcode-cn.com/problemset/all/?topicSlugs=dynamic-programming

阅读全文 »

当你发现你同事是培训出来的,你会怎么做

发表于 2021-05-09 | 分类于 Java

大家看到这个标题的时候,是不是感到有些惊讶,为什么阿粉能够这么敏锐,这都发现同事是培训出来的,但是还不好戳穿人家,毕竟人家面试都能通过,说明了技术是不是已经上升到这个水平了,阿粉今天就来给大家说说这是怎么一回事。

<–more–>

兄弟,这块代码应该怎么写?

阿粉相信大家肯定都遇到过这种情况,当一个同事新入职之后,一般领导给出的活都是第一天安装环境,第二天熟悉数据库和代码,第三天一般还是会让你熟悉公司的业务逻辑,可能一个礼拜之后,领导就会给你一点比较简单的工作交给你来实现,而这个时候,也相当于是对你的第一次考验开始了,这个时候一般人都是会先把需求问清楚,然后不明白的问题一般都会直接去问领导或者带你的那个人,但是,阿粉遇到了一个极其奇葩的哥们,上来问了一句,兄弟,这块的代码应该怎么写?

What?

你在逗我么?

阿粉心中已经开始有点崩溃了,我要是告诉你这块代码怎么写,那阿粉都胜过自己去写了,但是本着人道主义的精神,阿粉还是比较仗义的,把需求给他重新讲了一次,然后告诉他代码需要放到什么位置去,至于代码如何启动,没有特别的地方,无非就是自己装个环境,然后把代码down下来,然后改一下配置让代码运行起来就可以了。

于是第一幕结束,阿粉也没有在多说什么,毕竟新来的同事,还是需要给他温暖的问候的。而之后的两个事情,阿粉就从他的一系列的迷之操作中,判断出了他确实是培训出来的。

代码提交引发的崩溃

在阿粉的印象中,无论是工作一年还是工作2年的程序员,相信如果之前使用的不管是svn还是git,只要是属于在工作中使用过其中的任意一种,那么肯定会在工作之余会主动的去学习另外一种代码管理的工具,毕竟对于这代码提交,现在很多稍微新一点的公司都很少选择使用svn,除了一些比较老的公司,成立时间比较早,所以使用的还是svn,而阿粉目前的公司使用的是git。

所以阿粉问这个小伙子用过git吗?得到的回复是肯定的,阿粉这时候已经很放心了,觉得毕竟之前是使用过git的人,这样的话合并代码,冲突什么的应该都能解决。于是阿粉在他写完代码之后,阿粉说可以提交了,毕竟可以去提交给测试进行测试了。

结果相信大家都能猜到了,冲突了,但是不会解决,我的天,冲突代码不会解决,这是一个有三年开发经验的程序员么?

毕竟第一次,阿粉也没说啥,而在一个月里面,他就单纯的提交代码的问题,出现了四五次,只有一次没有问题,相信大家肯定也都知道为什么没问题,因为那一个类是他自己完整写的,不是和别人写的同一段代码。

单纯的代码提交,让阿粉觉得这个同事可能实际的工作经验好像没有三年,反而像是培训出来的,而阿粉也不能直接问,“同学,你是从哪个培训机构出来的?”

代码写的质量确实不是很好

说实话,阿粉虽然开发年限不多,但是也算是看过比较多的人的代码了,毕竟之前的项目组做过CodeReview,因为领导说他如果提交代码的话,你稍微看一下,别影响了其他的功能。因为领导发话了,于是阿粉就不得已看了人家的代码,于是就在看代码的过程中让阿粉感到了这个哥们确实是培训出来的。

上面这个是网图,代码实际上公司有要求不能外露,大家见谅,但是效果差不多。

代码混乱,而且判断语句非常多,相同类型的条件可以用Or来表示的时候,他会再后面重新在判断一遍,同样的方法会从其他的方法里面粘贴过来,不会去封装一下,然后进行调用。

阿粉对培训出来的朋友没有任何偏见,毕竟也算是经过了系统的学习,然后走上工作的道路,但是阿粉一直保持的是一个信条,有多大能力拿多少工资,问心无愧,而很多朋友之前就说,进公司,拿的多和拿的少的干的活都一样,为啥不多要点?

实际上这句话阿粉感觉不太合适,因为有些机构出来的学生有少部分简历造假,简历统一写着各种电商项目,当你准备和他细谈电商的技术架构时,一问三不知,因为有些东西不是你在培训机构里面能够完整学到的,比如你们之前的开发流程,大型的项目也没有那么的简单,况且现在的电商项目都已经被几家大公司给做了,京东,淘宝,唯品会,这些都是一些典型的电商项目,如果你做项目不是为了盈利,那你做项目是用来玩的么?

就像之前微笑哥说的一样:我不以有过培训经历为耻,也不会以此为荣,这不过是我过去的一段经历,仅此而已。

作为新人如何能够快速适应新公司的开发和工作节奏呢?

  1. 与老员工一起吃饭

说实在的,当你刚刚入职的时候,很多时候都有老员工带着你一起吃饭,如果很多人去食堂或外面小饭馆吃,这个时候主动一点,跟他们一起,如果有人邀请你和他一起点外卖,那其实就可以和他一起点外卖,吃饭时,大家都会聊聊非工作但又属于公司的事情,例如哪个部门美女多,公司什么时候有年会了,什么时候有聚餐,而且也非常容易能够让你和公司的老员工们快速的认识,但是有些敏感的话题,听着就行,最好不要参与到讨论的阶段,因为不排除“有心人”。

  1. 不懂的问题要自己考虑后再问

首先程序员开发,肯定会遇到各种各样的问题,但是这些问题呢,很多情况下都是可以百度解决,尤其是技术上的问题,Google和BaiDu都会给你答案,如果是业务上的问题,自己可以先熟悉一段时间,然后如果最后业务实在是真的没有办法理解,记录下来,和同事沟通,适当的发表自己的看法,这样才会更快的上手自己公司的项目和模块。

  1. 问问题不要很low的问题都要问

首先对于提交代码这块,我不相信一个开发了两三年的程序员对于提交代码这块,每次还需要去找熟人去问,然后说怎么解决冲突,怎么提交代码,如果你不知道代码写在什么位置,什么包结构里面,那么你可以问问,但是你提交代码这块,最好还是不要问了,如果你问了,那么你只能说自己在开发这块的业务上就没达标。

关于对新人的建议,阿粉就说这么多啦。如果有需要可以好好想想自己该怎么办。

所以阿粉想问大家,如果你发现你的同事是培训出来的,你会怎么做呢?欢迎大家积极留言。

阅读全文 »

阿粉告诉你如何在前端上监听到RabbitMQ发送消息,完成数据监控呢?

发表于 2021-05-04 | 分类于 Java

之前还记得阿粉给大家讲了关于RabbitMQ的经典实用还有整合到SpringBoot项目中的案例么?最近一段时间,阿粉的朋友问我说,公司安排他让他研究一下如何在前端实现对RabbitMQ发送消息的实时监控,而这也涉及到了阿粉的知识盲区,于是阿粉就开始了学习的道路,接下来就跟着阿粉一起来学习一下这关于如何在前端监听到RabbitMQ发送消息,以便实现自己项目中的功能吧。

<–more–>

RabbitMQ支持的协议

  1. stomp协议

stomp协议即Simple (or Streaming) Text Orientated Messaging Protocol,简单(流)文本定向消息协议,它提供了一个可互操作的连接格式,允许STOMP客户端与任意STOMP消息代理(Broker)进行交互。STOMP协议由于设计简单,易于开发客户端,因此在多种语言和多种平台上得到广泛地应用。

而我们在接下来的文章里面主要讲stomp如何对RabbitMQ实现监听。

stomp协议的前身是TTMP协议(一个简单的基于文本的协议),专为消息中间件设计。

这句话就说出了,专门为了消息中间件设计的,其实他并不是针对RabbitMQ在前端使用的,而是针对整个消息中间件的使用。

2.mqtt协议

还有一种经常使用的,就是mqtt协议了,mqtt协议全称(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(Publish/Subscribe)模式的轻量级通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布,目前最新版本为v3.1.1。

mqtt协议是属于在应用层协议的,这样也就是说只要是支持TCP/IP协议栈的地方,都可以使用mqtt.

RabbitMQ开通stomp协议

安装RabbitMQ的教程阿粉就不再给大家讲了,毕竟百度上有很多文章来告诉大家如何去安装RabbitMQ,不管是Linux还是Windows的,大家只要注意的一点就是,首先先安装erlang 语言支持,不然你安装RabbitMQ是安装不上的。

开通Stomp协议:

1
2
3
4
5
rabbitmq-plugins enable rabbitmq_web_stomp
rabbitmq-plugins enable rabbitmq_web_stomp_examples
#重启
service rabbitmq-server stop && service rabbitmq-server start

当我们开启之后,在我们的RabbitMQ中使能够看到的,如图:

大家可以看到,我们正确开启之后,在RabbitMQ的控制台上,我们能够看到http/web-stomp 的端口是15674。

接下来我们就要开始写一个案例进行测试。

前端Stomp监听RabbitMQ

如果这个时候我们发送一条消息到消息队列,那么接下来他就会在页面上展示出我们需要的内容。

我们看看代码是怎么写的吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 if (typeof WebSocket == 'undefined') {
        console.log('不支持websocket')
    }

    // 初始化 ws 对象

    var ws = new WebSocket('ws://localhost:15674/ws');

    // 获得Stomp client对象
    var client = Stomp.over(ws);

    // 定义连接成功回调函数
    var on_connect = function(x) {
        //data.body是接收到的数据
        client.subscribe("/Fanout_Exchange/testMessage", function(data) {
            var msg = data.body;
            alert("收到数据:" + msg);
        });
    };

    // 定义错误时回调函数
    var on_error =  function() {
        console.log('连接错误,请重试');
    };

    // 连接RabbitMQ
    client.connect('guest', 'guest', on_connect, on_error, '/');
    console.log(">>>RabbitMQ已连接,测试正式开始");

而这里面写的内容就比较有意思了,因为之前很多人都会发现,不管怎么写,都是不行,那是因为没有完全的理解,阿粉最后总结了一下关于Stomp的使用。

总结

1./exchange/(exchangeName)

  • 对于 SUBCRIBE frame,destination 一般为/exchange/(exchangeName)/[/pattern] 的形式。该 destination 会创建一个唯一的、自动删除的、名为(exchangeName)的 queue,并根据 pattern 将该 queue 绑定到所给的 exchange,实现对该队列的消息订阅。

  • 对于 SEND frame,destination 一般为/exchange/(exchangeName)/[/routingKey] 的形式。这种情况下消息就会被发送到定义的 exchange 中,并且指定了 routingKey。

2./queue/(queueName)

  • 对于 SUBCRIBE frame,destination 会定义(queueName)的共享 queue,并且实现对该队列的消息订阅。

  • 对于 SEND frame,destination 只会在第一次发送消息的时候会定义(queueName)的共享 queue。该消息会被发送到默认的 exchange 中,routingKey 即为(queueName)。

3./amq/queue/(queueName)

  • 这种情况下无论是 SUBCRIBE frame 还是 SEND frame 都不会产生 queue。但如果该 queue 不存在,SUBCRIBE frame 会报错。

  • 对于 SUBCRIBE frame,destination 会实现对队列(queueName)的消息订阅。

  • 对于 SEND frame,消息会通过默认的 exhcange 直接被发送到队列(queueName)中。

4./topic/(topicName)

  • 对于 SUBCRIBE frame,destination 创建出自动删除的、非持久的 queue 并根据 routingkey 为(topicName)绑定到 amq.topic exchange 上,同时实现对该 queue 的订阅。

  • 对于 SEND frame,消息会被发送到 amq.topic exchange 中,routingKey 为(topicName)。

关于如何在前端监听RabbitMQ消息,你学会了么?

阅读全文 »

从青蛙跳台阶开始了解什么是动态规划

发表于 2021-04-26 | 分类于 Java

Hello 大家好,我是阿粉,动态规划(Dynamic Programming),简称 DP 相信大家在日常的工作或者学习的过程中都遇到过这个词,而且动态规划也是面试过程中最喜欢被问到的题目,阿粉在经历的不多的几场面试中都被问到了,实在是苦不堪言,不过好在阿粉还是有学过的,一些简单的套路阿粉还是懂的。下面就从一个很多人应该都不陌生的题目讲起。

案例 1

问:一只青蛙一次可以跳上 1 级台阶,也可以跳上 2 级,求该青蛙跳上一个 n 级的台阶总共有多少种跳法?

思考

刚开始看到这个题目的时候可能没什么思路,不过我们可以一点点的想下去,我们假设青蛙跳上一个 n 级的台阶总共有多少种跳法 f(n) 种跳法,那当 n = 0 时,f(0) = 0,没有台阶当然没有跳法。n = 1,f(1) = 1;只有一个台阶的时候,只能跳 1 个;n = 2,f(2) = 2,当有两个台阶的时候,可以有 2 种跳法,一个一个跳和一下跳 2 个,那如果我们有三个台阶的话,是不是将一个台阶和两个台阶的总和加起来就可以了呢?所以我们就可以想到 f(3) = f(2) + f(1),所以我们能推导出 f(n) = f(n - 1) + f(n - 2);

编码

上面的分析可以想到,那么接下来我们就需要用代码来实现了,对于需要使用到之前的记录,我们可以考虑用一维数组来记录,所以就有了下面的这段代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public int dp(int n) {
    if (n <= 0) {
        return 0;
    }
    int[] dp = new int[n + 1];
    dp[0] = 0;
    dp[1] = 1;
    dp[2] = 2;
   // 之所以要从 3 开始,是因为 2 不符合下面的规则
    for (int i = 3; i <= n; i++) {
        dp[i] = dp[i - 1] + dp[i - 2];
    }
    return dp[n];
}

解释下上面的代码,首先我们创建了一个一维数组 dp,用于记录每个台阶有的跳法,然后从索引三开始遍历,运用公式f(n) = f(n - 1) + f(n - 2); 进行赋值,结果直接输出 dp[n] 对应的数值即可。

分析

通过上面的案例,我们思考一下对于动态规划的题目我们需要怎么做,我们一开始定义了 n 级台阶有 f(n) 种跳法,然后通过模拟的方式计算出 f(0),f(1),f(2),接着我们找到了 f(n) = f(n - 1) + f(n - 2); 的关系。按照这种思路我们可以总结出三个步骤,分别是

  1. 定义变量:把已知的和需要求解的,定义出变量,如上面的 n 和 f(n);
  2. 寻找表达式:找到 f(n) 和 f(n - 1) 以及 f(n - 2),等情况的表达式,如上面的 f(n) = f(n - 1) + f(n - 2),这一步往往是最难的;
  3. 寻找初始值:确保找到所有的临界条件,如上面的 f(0) = 0, f(1) = 1, f(2) = 2;

上面的步骤是通用步骤,往往在第一步的时候我们设置的 f(n) 是一个数组,根据具体的场景可能是一维数组也有可能是二维数组,上面的例子我们定义的就是一维数组,而且往往我们需要求解什么就定义什么数组。

下面我们通过这种方式再看一道 LeetCode 的原题

案例 2

问:一个机器人位于一个 m x n 网格的左上角 (起始点在下图中标记为“Start” )。机器人每次只能向下或者向右移动一步。机器人试图达到网格的右下角(在下图中标记为“Finish”)。问总共有多少条不同的路径?

img

根据上面的三个步骤,我们依次来解决,既然是 m x n 的网格,很显然我们需要用二维数组来解决问题,所以我们

  1. 定义 d[m][n] 表示在 m x n 网格上移到右下角需要的总步数;
  2. 因为机器人只能向右和向下移动,所以到达下一个格子只能是从左边或者上面,所以达到 m x n 的步数等于(m - 1) x n + m x (n - 1),也就是 d[m][n] = d[m - 1][n] + d[m][n - 1];
  3. 定义初始值d[0][n] = 1,d[n][0] = 1,也就是只有一行或者一列的时候只有一种方法,全部向下或者向右移动;

编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public int dp(int m, int n) {
    if (m <=0 || n <= 0) {
        return 0;
    }
    int[][] dp = new int[m][n];
    //只有一列的时候
    for (int i = 0; i < m; i++) {
        d[i][0] = 1;
    }
    //只有一行的时候
     for (int i = 0; i < n; i++) {
        d[0][i] = 1;
    }
   
    for (int i = 1; i < m; i++) {
        for (int j = 1; j < n; j++) {
            d[i][j] = d[i][j - 1] + d[i - 1][j];
        }
    }
    //数组的下标从 0 开始
    return d[m - 1][n - 1];
}

通过上面的三个步骤,我们可以完美的解决问题,动态规划的问题难点就在于找寻规律和初始值,有点时候如果我们找不到规律就没办法了,而且如果初始值找的不完全也会有问题,这个只能多多练习了。

总结

动规划的题目在 LeetCode 上面有很多,大家可以根据上面提供的思路去多刷几道题,慢慢就会有感觉了,刷完动态规划的题目,相信对大家工作或者找工作肯定有很大的帮助。

阅读全文 »

自己动手开发了一个 SpringMVC 框架,用起来太香了

发表于 2021-04-25 | 分类于 java

看完本文,你一定会有所收获

阅读全文 »

你要的不固定列excel导入导出,它来啦!

发表于 2021-04-18 | 分类于 java

看完本文,你一定会有所收获

阅读全文 »

Springboot 项目集成 Nacos 实现服务注册发现与配置管理

发表于 2021-04-12 | 分类于 Java

Hello 大家好,我是阿粉,前面的文章给大家介绍了一下如何在本地搭建微服务环境下的服务注册中心和配置管理中心 Nacos,今天通过

我们通过使用 SpringBoot 项目集成 Nacos 来给大家演示一下是如何使用 Nacos 来实现服务发现和配置管理的。

启动 Nacos 服务

启动完本地搭建的 Nacos 服务后,我们可以看到,目前的服务管理下面的服务列表里面在三个命名空间下都没有服务,这是正常的,因为目前我们还没有服务接入 Nacos。

Nacos 服务启动成功后,我们再创建两个 SpringBoot 项目,一个用于接入 Nacos 服务注册与发现和配置中心作为服务提供者 Producer,另一个只接入 Nacos的服务注册与发现,调用 Producer 获取配置中心的参数,我们叫做 Consumer。

服务提供者 Producer

  1. 我们首先创建一个 SpringBoot 的项目,bootstrap.properties 文件内容如下:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
     spring.application.name=producer
    
     #######################配置中心配置#################################
     # 指定的命名空间,只会在对应的命名空间下查找对应的配置文件
     spring.cloud.nacos.config.namespace=caeser-adsys-naming
     spring.cloud.nacos.config.file-extension=properties
     # 配置的分组名称
     spring.cloud.nacos.config.group=TEST1
     # 配置文件,数组形式,可以多个,依次递增
     spring.cloud.nacos.config.ext-config[0].data-id=com.example.properties
     spring.cloud.nacos.config.ext-config[0].group=TEST1
     # 配置中心的地址
     spring.cloud.nacos.config.server-addr=127.0.0.1:8848
     #启用自动刷新对应的配置文件
     spring.cloud.nacos.config.ext-config[0].refresh=true
     ######################服务注册发现配置##################################
    
     # 服务集群名称
     spring.cloud.nacos.discovery.cluster-name=TEST1_GROUP
     # 服务注册中心的地址
     spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
     # 服务的命名空间
     spring.cloud.nacos.discovery.namespace=caeser-adsys-naming
    
    
  2. application.properties 的文件内容如下,主要就是一个端口,其他配置根据情况自行添加或删除就好:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
     # 服务启动的端口
     server.port=8080
     spring.main.allow-bean-definition-overriding=true
     # tomcat 配置
     server.tomcat.max-threads=500
     spring.mvc.servlet.load-on-startup=1
     spring.servlet.multipart.max-file-size=40MB
     spring.servlet.multipart.max-request-size=100MB
     # 日志配置
     logging.level.root=info
     logging.level.com.alibaba=error
     logging.pattern.console=%clr{[%level]}{green} [%d{yyyy-MM-dd HH:mm:ss}] %clr{[${PID:-}]}{faint} %clr{[%thread]}{magenta} %clr{[%-40.40logger{80}:%line]}{cyan} %msg%n
    
    
  3. 在启动类上面增加如下注解

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    package com.ziyou.nacos.demo.producer;
       
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
       
    @SpringBootApplication(scanBasePackages = "com.ziyou.nacos")
    @EnableDiscoveryClient
    @EnableCaching
    public class ProducerApplication {
       
        public static void main(String[] args) {
            SpringApplication.run(ProducerApplication.class, args);
        }
    }
    
  4. pom.xml 文件内容如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    <?xml version="1.0" encoding="UTF-8"?>
       
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
       
      <parent>
        <groupId>org.example</groupId>
        <artifactId>nacos-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
      </parent>
       
      <artifactId>producer</artifactId>
      <version>1.0-SNAPSHOT</version>
       
      <name>producer Maven Webapp</name>
      <!-- FIXME change it to the project's website -->
      <url>http://www.example.com</url>
       
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <spring.maven.plugin.version>2.2.2.RELEASE</spring.maven.plugin.version>
      </properties>
       
      <dependencies>
        <!-- Spring Boot -->
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter</artifactId>
          <exclusions>
            <exclusion>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-log4j2</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
       
        <!-- nacos 配置中心 -->
        <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <!-- nacos 注册发现 -->
        <dependency>
          <groupId>com.alibaba.cloud</groupId>
          <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
      </dependencies>
       
      <build>
        <!--指定下面的目录为资源文件-->
        <resources>
          <!--设置自动替换-->
          <resource>
            <directory>src/main/resources</directory>
            <filtering>true</filtering>
            <includes>
              <include>**/**</include>
            </includes>
          </resource>
        </resources>
        <finalName>producer</finalName>
        <plugins>
          <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <version>${spring.maven.plugin.version}</version>
            <executions>
              <execution>
                <goals>
                  <goal>repackage</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>
    
  5. 在 Producer 侧提供一个获取配置里面内容的接口,代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    package com.ziyou.nacos.demo.producer.controller;
       
    import com.ziyou.nacos.demo.producer.config.UserConfig;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
       
    /**
     * <br>
     * <b>Function:</b><br>
     * <b>Author:</b>@author ziyou<br>
     * <b>Date:</b>2021-04-11 19:59<br>
     * <b>Desc:</b>无<br>
     */
    @RestController
    @RequestMapping(value = "producer")
    public class ProducerController {
       
        private UserConfig userConfig;
       
        @GetMapping("/getUsername")
        private String getUsername() {
            String result = userConfig.getUsername() + "-" + userConfig.getPassword();
            System.out.println(result);
            return result;
        }
       
        @Autowired
        public void setUserConfig(UserConfig userConfig) {
            this.userConfig = userConfig;
        }
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    package com.ziyou.nacos.demo.producer.config;
       
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.cloud.context.config.annotation.RefreshScope;
    import org.springframework.stereotype.Component;
       
    /**
     * <br>
     * <b>Function:</b><br>
     * <b>Author:</b>@author ziyou<br>
     * <b>Date:</b>2021-04-11 20:39<br>
     * <b>Desc:</b>无<br>
     */
    @RefreshScope
    @Component
    public class UserConfig {
        @Value("${username}")
        private String username;
        @Value("${password}")
        private String password;
       
        public String getUsername() {
            return username;
        }
       
        public void setUsername(String username) {
            this.username = username;
        }
       
        public String getPassword() {
            return password;
        }
       
        public void setPassword(String password) {
            this.password = password;
        }
    }
       
    
  6. 启动 Producer,并且手动调用接口,启动 Producer 过后,我们在 Nacos 的服务注册列表可以看如下所示的内容,在 test1 的命名空间下,已经有了我们创建的 Producer 服务。

  7. 通过手动调用 Producer 的接口 http://127.0.0.1:8080/producer/getUsername 显示如下内容并且我们看下此时 Nacos 的配置中心里面配置文件com.example.properties 里面的内容正是这个,这个时候我们手动把配置里面password 参数的值改成JavaGeek666,再次访问接口,我们会发现接口的输出也自动改变了。

    修改配置内容如下:

    再次访问结果如下:

服务调用者 Consumer

前面我们已经完成了 Producer 的服务注册与配置动态生效的功能,这个时候基本已经可以使用了,不过我们还需要更进一步通过 Nacos 来实现服务发现,接下来我们创建 Consumer 的 SpringBoot 的项目,配置文件和pom.xml 文件基本一致,只要修改端口以及对应地方,下面贴一下不一样的地方

  1. boostrap.properties 内容如下,因为这里我们只调用Producer 的接口,不需要接入 Nacos 的配置中心,所以这里只配置发服务注册与发现

    1
    2
    3
    4
    5
    6
    spring.application.name=consumer
       
    ######################服务注册发现配置##################################
    spring.cloud.nacos.discovery.cluster-name=TEST1_GROUP
    spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848
    spring.cloud.nacos.discovery.namespace=caeser-adsys-naming
    
  2. 启动类,配置上 feignClient 需要扫描的包路径

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    package com.ziyou.nacos.demo.consumer;
       
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cache.annotation.EnableCaching;
    import org.springframework.cloud.openfeign.EnableFeignClients;
       
    /**
     * <br>
     * <b>Function:</b><br>
     * <b>Author:</b>@author ziyou<br>
     * <b>Date:</b>2021-04-11 17:07<br>
     * <b>Desc:</b>无<br>
     */
    @SpringBootApplication(scanBasePackages = "com.ziyou.nacos")
    @EnableFeignClients(basePackages = {"com.ziyou.nacos.demo.consumer.rpc"})
    @EnableCaching
    public class ConsumerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }
    }
       
    
  3. 编写调用 Producer 的接口,FeignClient 里面的 value 就是 Producer 的应用名称

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    package com.ziyou.nacos.demo.consumer.rpc;
       
    import org.springframework.cloud.openfeign.FeignClient;
    import org.springframework.stereotype.Component;
    import org.springframework.web.bind.annotation.GetMapping;
       
    /**
     * <br>
     * <b>Function:</b><br>
     * <b>Author:</b>@author ziyou<br>
     * <b>Date:</b>2021-04-11 20:01<br>
     * <b>Desc:</b>无<br>
     */
    @FeignClient(value = "producer")
    @Component
    public interface IProducerFeign {
        /**
         * 获取生产者名称接口
         *
         * @return
         */
        @GetMapping("/producer/getUsername")
        String getUsername();
       
    }
       
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    package com.ziyou.nacos.demo.consumer.controller;
       
    import com.ziyou.nacos.demo.consumer.rpc.IProducerFeign;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
       
    /**
     * <br>
     * <b>Function:</b><br>
     * <b>Author:</b>@author ziyou<br>
     * <b>Date:</b>2021-04-11 19:59<br>
     * <b>Desc:</b>无<br>
     */
    @RestController
    @RequestMapping(value = "consumer")
    public class TestNacosController {
       
        private IProducerFeign iProducerFeign;
       
        @GetMapping("/testNacos")
        private String testNacos() {
            return iProducerFeign.getUsername();
        }
       
        @Autowired
        public void setiProducerFeign(IProducerFeign iProducerFeign) {
            this.iProducerFeign = iProducerFeign;
        }
    }
       
    
  4. 启动 Consumer,我们可以看到在 Nacos 如下图所示

  5. 调用 Consumer 的接口consumer/testNacos,结果如下图所示,同样的如果此时更改了 Nacos 配置文件中的内容,Consumer 这边也是可以实时更新的,感兴趣的小伙伴可以自己试试。

今天主要给大家介绍了一下如何通过 SpringBoot 项目来接入 Naocs 实现服务注册与发现,以及配置管理和动态刷新,相关的代码已经上传到 GitHub 了,公众号回复【源码】获取地址。

阅读全文 »

深度掌握java stream 流操作,让你的代码高出一个逼格!

发表于 2021-04-11 | 分类于 java

看完本文,你一定会有所收获

阅读全文 »

使用uuid作为数据库主键,被技术总监怼了一顿!

发表于 2021-04-11 | 分类于 java

看完本文,你一定会有所收获

阅读全文 »

手把手教你在本地搭建 Nacos 服务注册和配置管理中心

发表于 2021-04-09 | 分类于 Java

Hello 大家好,我是阿粉,现如今微服务早就成为每个互联网公司必备的架构,在微服务中服务的注册发现和统一配置中心是不可或缺的重要角色。现有的服务注册中心主要有 ZooKeeper、Eureka、Consul ,Etcd、Nacos,而统一配置中心主要有 Apollo,Spring Cloud Config,Disconf,Nacos 可以看到服务注册中心和统一配置管理有很多,而且每一种都是网上比较流行的,使用人数也偏多,但是我们可以发现只有 Nacos 是集两者于一身的。这篇文章主要给大家介绍一下 Naocs 的基本信息以及安装步骤,后面的文章再带大家更深入的了解一下Nacos。

Nacos 是什么

打开网站 Nacos 的首页,我们可以看到下面的内容,写到 Nacos 是一个更易于构建云原生应用的动态服务发现、配置管理和服务管理平台。

Nacos 致力于帮助我们发现、配置和管理微服务,提供了一组简单易用的特性集,帮助我们快速实现动态服务发现、服务配置、服务元数据及流量管理。

Nacos 本地搭建

Nacos 的安装可以通过安装包或者源码来时候,这里我们为了以后方便调试,所以选择采用源码来安装。

  1. 第一步在 GitHub 上面下载源码,这里可以使用 HTTP 协议也可以使用 SSH,阿粉这里为了简单就直接采用 HTTP 协议来下载了:git clone https://github.com/alibaba/nacos.git;
  2. 下载完成过后,我们使用 IDEA 的 File》Open 打开项目,如下图
  3. 刚打开后我们会看到有很多个模块,从模块的命名我们大概可以猜到有些模块是干嘛的,比如有 auth 权限模块,config 配置模块等这篇文章我们主要看安装和使用,后面慢慢深入了解具体的细节。
  4. 拉取下来过后,我们需要创建数据库,我们找到console 模块下的数据库脚本和配置文件,如下图,在 MySQL 数据库中创建一个名为nacos 的数据库,然后执行schema.sql 脚本,创建相应的表结构,同时在 application.properties 文件中找到对应配置数据源的地方,配置好账号密码。这里对于 MySQL 的数据库的搭建和创建不在这篇文章的讨论范围里面就不提了。
  5. 配置好数据库过后,在控制台执行 mvn 命令 mvn -Prelease-nacos -Dmaven.test.skip=true -Drat.skip=true clean install -U,编译一下,然后执行 console 模块下面的 Nacos.java 中的 main 方法,启动后我们会看到下图,显示的是集群模式启动,因为我们是本地单机启动,所以可以增加一个 JVM 参数-Dnacos.standalone=true 来设置单机模式。按照下图设置好过后,重新启动即可。
  6. 启动成功过后,我们可以访问地址http://127.0.0.1:8848/nacos ,我们可以看到如下页面,包含配置管理、服务管理、以及权限,命名空间和集群的管理。阿粉这里搭建的是最新的版本,旧版本是没有权限管理的。我们可以看到整个页面非常的简洁,其中配置管理服务管理基本上覆盖了我们常用的所有功能。

核心角色

上面我们已经搭建好了 Nacos 平台,现在给大家介绍一下,Nacos 的几个核心角色。

  1. 命名空间Namespace :用于进行租户粒度的配置隔离,不同的命名空间下,可以存在相同的 Group 或 Data ID 的配置。Namespace 的常用场景之一是不同环境的配置的区分隔离,例如开发测试环境和生产环境的资源(如配置、服务)隔离等。
  2. 分组group:每个命名空间下还可以进行分组,相同集群的服务可以设置在同一个分组里面,如果没有手动指定默认分组为DEFAULT_GROUP
  3. 配置集 Data ID:用于唯一区分不同配置文件,可以根据配置的信息不同,分别设置,比如可以单独配置 Redis 的配置文件,以及 MySQL 的配置文件,可以尽量细分复用,这样在有调整的时候只需要改一处即可。
  4. 更详细的名词解释可以参考官网https://nacos.io/zh-cn/docs/concepts.html

使用

上面介绍了 Nacos 的本地搭建,搭建好了过后我们就可以使用了,因为 Nacos 不仅支持服务发现也支持统一配置管理,这篇文章我们先创建两个命名空间 test1 和 test2,然后分别添加两个配置文件com.example.properties 和com.example2.properties,创建的步骤比较简单。首先点击命名空间菜单,然后在右上角新增命名空间,接着输入相关信息就好了,这里新版本的 Nacos 支持手动填入命名空间的 ID,旧版本是不支持的,会随机生成一个字符串,这里建议手动填入具有含义的ID。

创建完命名空间过后,就可以在此命名空间下创建相应的配置文件了,这篇文章主要给大家介绍一下 Nacos 的本地部署过程以及简单的系统使用,后面的文章结合代码来实现一下服务的注册发现和配置的动态更新,敬请期待。

阅读全文 »

一点点的给你分析RabbiMQ的几种消息模型

发表于 2021-04-07 | 分类于 抓包工具

<–more–>

1.RabbitMQ的交换机

Direct Exchange(直连交换机), Fanout Exchange(扇型交换机), Topic Exchange(主题交换机)与 Headers Exchange(头交换机),RabbitMQ的交换机是这四种,而他们也是各有特点,我们这个不能看其他的文章,首先就得从RabbitMQ的官网上去看这块的内容,毕竟,官网才是正道。

为什么会有交换机,不是直接有队列呢?官网给了我们一个最经典的解释:

The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn’t even know if a message will be delivered to any queue at all.

意思就是:RabbitMQ中消息传递模型的核心思想是生产者从不将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。

也就是说,RabbitMQ在传递消息的时候,是先从生产者发送给交换机,然后这个时候,由交换机发送给队列,我们在生产消息的时候,实际上如果不关注交换机和队列的绑定的话,我们都不知道消息是发送到哪一个队列。

1.Direct Exchange(直连交换机)

直连型交换机其实是属于最简单的一种交换模式,在这里队列使用路由密钥K绑定到交换机,当具有路由键R的新消息到达直接交换时,如果K=R,则交换会将其路由到队列。

直连型交换机通常用于以轮循方式在多个工作程序(同一应用程序的实例)之间分配任务。

以上都是官网上给我们的解释,大家可以翻译一下他的英文注释即可。

2.Fanout Exchange(扇型交换机)

扇形交换机将消息路由到与其绑定的所有队列,并且路由键将被忽略。如果将N个队列绑定到扇出交换机,则将新消息发布到该交换机时,该消息的副本将传递到所有N个队列。

大家划重点,文档中是不是说明了路由键将被忽略,也就是说路由键在扇形交换机里没有作用,故消息队列绑定扇形交换机时,路由键可为空。而他实际上就相当于一个广播的性质。

3.Topic Exchange(主题交换机)

主题交换机实际上就是一个发布/订阅,根据消息路由键和用于将队列绑定到交换机的模式之间的匹配将消息路由到一个或多个队列。主题交换类型通常用于实现各种发布/订阅模式变体。主题交换通常用于消息的多播路由。

主体交换机官方案例没有图,但是我们可以画一个图来理解一下

图中,如果你要绑定的时候使用了通配符了,那么你再给交换机传递消息的时候,注意,RoutingKey开头是你绑定的时候设置的通配符才可以,符号“#”匹配一个或多个词。

4.Headers Exchange(头交换机)

头交换机不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。

既然我们已经说明了这么多的交换机,那么就应该给大家写一个配置,然后让大家更快的能够实际运用到项目中。

2.DemoConfig

Direct类型绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
    //这里先定义Queue和交换机以及RoutingKey

    /**
     * 消息的routing key与队列的binding key相同的队列
     */
    public static final String DIRECT_QUEUE_NAME = "Direct_Queue";
    /**
     * direct 交换机
     */
    public static final String DIRECT_EXCHANGE_NAME = "Direct_Exchange";
    /**
     * routing key
     */
    public static final String DIRECT_ROUTING_KEY_NAME = "Direct_Routing_Key";
    
    //创建队列+队列交换机绑定
    /**
     * Direct交换机
     * @return
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DIRECT_EXCHANGE_NAME);
    }
    /**
     * 创建一条持久化的、非排他的、非自动删除的队列
     * @return
     */
    @Bean
    public Queue directQueue(){
        return new Queue(DIRECT_QUEUE_NAME);
    }

    /**
     * Binding,将该routing key的消息通过交换机转发到该队列 匹配键:DIRECT_ROUTING_KEY_NAME
     * @return
     */
    @Bean
    public Binding directBinding(){
        return BindingBuilder.bind(directQueue()).to(directExchange()).with(DIRECT_ROUTING_KEY_NAME);
    }

Fanout类型绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

    /**
     * 与fanout绑定的第一个队列
     */
    public static final String FIRST_FANOUT_QUEUE_NAME = "First_Fanout_Queue";
    
    /**
     * 与fanout交换机绑定的第二个队列
     */
    public static final String SECOND_FANOUT_QUEUE_NAME = "Send_Fanout_Queue";
    
    /**
     * fanout 交换机
     */
    public static final String FANOUT_EXCHANGE_NAME = "Fanout_Exchange";


     @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE_NAME);
    }

    @Bean
    public Queue firstFanoutQueue() {
        return new Queue(FIRST_FANOUT_QUEUE_NAME);
    }

    @Bean
    public Queue secondFanoutQueue() {
        return new Queue(SECOND_FANOUT_QUEUE_NAME);
    }

    @Bean
    public Binding firstFanoutBinding() {
        return BindingBuilder.bind(firstFanoutQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding secondFanoutBinding() {
        return BindingBuilder.bind(secondFanoutQueue()).to(fanoutExchange());
    }

Header类型绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    /**
     * 处理对象的MQ队列
     */
    public static final String HANDLER_OBJECT_QUEUE_NAME = "Handler_Object_Queue";

    /**
     * 处理对象的MQ队列
     */
    public static final String HANDLER_OBJECT_EXCHANGE = "Handler_Object_Exchange";
    
    @Bean
    public Queue handleObjectQueue() {
        return new Queue(HANDLER_OBJECT_QUEUE_NAME);
    }

    @Bean
    public HeadersExchange headersExchange(){
        return new HeadersExchange(HANDLER_OBJECT_EXCHANGE);
    }
    @Bean

    public Binding headerBinding(){
        Map<String, Object> map = new HashMap<>();
        map.put("header1", "value1");
        map.put("header2", "value2");
        return BindingBuilder.bind(handleObjectQueue()).to(headersExchange()).whereAll(map).match();
    }

Topic类型绑定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
 /**
     * 与topic绑定的第一个队列
     *
     */
    public final static String FIRST_TOPIC_QUEUE = "First_Topic_Queue";
    /**
     * 与topic绑定的第二个队列
     *
     */
    public final static String SECOND_TOPIC_QUEUE = "Second_Topic_Queue";
    /**
     * 与topic绑定的第二个队列
     *
     */
    public final static String TOPIC_EXANGE_NAME = "Topic_Exchange";
    
    /**
     *  Topic型交换机模型:将routingKey与binding key做通配符匹配,转发消息到匹配的队列
     *  “#”匹配一个词或多个词,“*”只匹配一个词。
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXANGE_NAME);
    }
    @Bean
    public Queue firstTopicQueue() { return new Queue(FIRST_TOPIC_QUEUE);}

    @Bean
    public Queue secondTopicQueue() {
        return new Queue(SECOND_TOPIC_QUEUE);
    }

    /**
     * 将FirstQueue和TopicExchange绑定,绑定的键为FIRST_TOPIC_QUEUE
     */
    @Bean
    public Binding firstTopicBinding() {
        return BindingBuilder.bind(firstTopicQueue()).to(topicExchange()).with(FIRST_TOPIC_QUEUE);
    }
    /**
     * 将SecondQueue和TopicExchange绑定,绑定的键为通配符使用的 Topic_#
     * 消息携带的路由键是以Topic.开头,都会分发到该队列
     */
    @Bean
    public Binding secondTopicBinding() {
        return BindingBuilder.bind(secondTopicQueue()).to(topicExchange()).with("Topic.#");
    }

至于发送消息,相信大家肯定也都会,

1
 rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXANGE_NAME, routingKey, message,new CorrelationData(UUID.randomUUID().toString()));

image

第一个参数:交换机配置

第二个参数: RoutingKey

第三个参数:消息

第四个参数:对于使用消息确认的时候,需要的一个唯一ID。

发送成功之后,我们同样需要看是否发送成功,这时候就有了confirm,而这个confirm,就是用来确认消息是否发送成功,而这块也是从你在配置RabbitMQ的时候配置决定的,是否手动确认消息发送成功。

1
2
3
4
5
    #是否支持发布确认
    publisher-confirms: true
    #是否支持发布返回
    publisher-returns: true

相对应的,你就应该在消息发出的时候加入一些代码,比如:

1
2
3
        //ACK确认机制
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.convertAndSend(RabbitMQConfig.SIMPLE_QUEUE_NAME, message);

而ConfirmCallback里面的参数呢,是从下面来的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
confirm确认 此处的confirm确认是发送的ACK确认机制
    RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                System.out.println(correlationData);
                //如果confirm返回成功 则进行更新
                System.out.println("confirm确认消息已经发出");
            } else {
                //失败则进行具体的后续操作:重试 或者补偿等手段
                System.err.println("异常处理...");
            }
        }
    };

以上的代码可以直接和上一篇阿粉讲的RabbitMQ整合Springboot的案例直接放入就可以使用。

《RabbitMQ官方文档》 《RabbitMQ实战》

阅读全文 »

手把手教你学会SpringBoot整合RabbitMQ

发表于 2021-03-31 | 分类于 Java

阿粉之前已经是教给大家如何安装RabbitMQ,如何写一个生产者,如何写一个消费者,而接下来的这篇文章,详细讲解一下如何使用队列和交换机进行不同的发布消息以及消费消息,以及怎么整合SpringBoot和RabbitMQ。

<–more–>

1. 简单消息模式

下面阿粉就只用一个项目把所有类型的交换机全部都加入到一个SpringBoot项目中来,我们首先需要创建的就是一个Config类,而这个Config类中将会把所有的工作模式集中在这里。

创建个SpringBoot项目,然后配置pom.xml文件,在中间加入我们的依赖Jar。

1
2
3
4
5
6
7
8
9
<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.mq-amqp</groupId>
            <artifactId>mq-amqp-client</artifactId>
            <version>1.0.5</version>
        </dependency>

加入依赖环境,配置启动配置。

然后我们就开始写个简单消息模式的配置

简单发消息的Producer和Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Component
public class Producer {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    /**
     * 发生消息到RabbitMQ,使用SpringBoot默认的交换机
     *
     * @param message
     */
    public void sendMessage(String message) {
        
        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.convertAndSend(RabbitMQConfig.SIMPLE_QUEUE_NAME, message);
    }

    //回调函数: confirm确认 此处的confirm确认是发送的ACK确认
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if(ack){
                //如果confirm返回成功 则进行更新
                System.out.println("confirm确认消息已经发出");
            } else {
                //失败则进行具体的后续操作:重试 或者补偿等手段
                System.err.println("异常处理...");
            }
        }
    };
}

阿粉不知不觉的调用了竟然有7次,

这么多了,那么咱们赶紧都给他消费了吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
    @Autowired
    private RabbitTemplate rabbitTemplate; 
    //记住这个,这Template是SpringBoot里面整合RabbitMQ自带的,不是我自己专门写的哈,大家不要一看到这个注入,就寻思是个自己写的。

    /**
     * 普通默认点对点的消息消费者
     * @param message
     */
    @RabbitListener(queues = {RabbitMQConfig.SIMPLE_QUEUE_NAME})
    @RabbitHandler
    public void receiveMessage(String message, Channel channel, @Headers Map<String,Object> headers) { //
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                //处理监听得到的消息
                String message = null;
                try {
                    message = new String(body, "UTF-8");
                    //消息处理逻辑
                    sendMessage(message);
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    channel.abort();
                } finally {
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        boolean autoAck = false;
        //消息消费完成确认
        try {
            channel.basicConsume(RabbitMQConfig.SIMPLE_QUEUE_NAME, autoAck, consumer);
        }catch (Exception e){

        }
        System.out.println("消费者进行消息消费:消费主体是-----"+ message);
    }
    

这一口气怎么都给我消费光了呢?是不是有点不厚道。

@RabbitListener注解指定目标方法来作为消费消息的方法,通过注解参数指定所监听的队列,而我们给的这个队列名字也很简单SIMPLE_QUEUE_NAME,发现里面有消息,我知道了,我就要开始消费了,你给我一条,我给你消费一条。

上图也是官网上给出的简单模式的消息队列,就是你发一条,我消费一条。

Work(工作)模式

工作模式这个其实更简单,就是,一个生产者,然后整出了2个消费者去消费,大家可以在消费者里面输出内容,然后循环调用生产者,一口气给他生成个100条,然后看消费者是怎么调用的,阿粉相信大家对这个模式肯定理解是非常到位的,因为,确实是随机的,如果你不相信,大家可以看看。

1
2
3
4
5
        //消息发送
        for (int i = 0;i < 100 ;i++){
            producer.sendMessage("生产消息消息"+i);
        }
        消费者一模一样的复制出一份,在消费消息的时候打印一下就可以啦

在工作模式的时候,大家一定要注意一点,那就是一个队列中一条消息,只能被一个消费者消费

发布订阅模式

接下来我们先吧配置给大家放上,大家一定是希望看到这个内容,毕竟拿过来就可以用的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

/**
     * Fanout型交换机MQ模型:订阅模式,消息到达交换机会转发到与该交换机绑定的队列
     */

    /**
     * 与fanout绑定的第一个队列
     */
    public static final String FIRST_FANOUT_QUEUE_NAME = "First_Fanout_Queue";
    /**
     * 与fanout交换机绑定的第二个队列
     */
    public static final String SECOND_FANOUT_QUEUE_NAME = "Send_Fanout_Queue";
    /**
     * fanout 交换机
     */
    public static final String FANOUT_EXCHANGE_NAME = "Fanout_Exchange";

    /**
     * FanoutExchange,持久化、非自动删除
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(FANOUT_EXCHANGE_NAME);
    }

    @Bean
    public Queue firstFanoutQueue() {
        return new Queue(FIRST_FANOUT_QUEUE_NAME);
    }

    @Bean
    public Queue secondFanoutQueue() {
        return new Queue(SECOND_FANOUT_QUEUE_NAME);
    }

    @Bean
    public Binding firstFanoutBinding() {
        return BindingBuilder.bind(firstFanoutQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding secondFanoutBinding() {
        return BindingBuilder.bind(secondFanoutQueue()).to(fanoutExchange());
    }

这种模式比较有意思,和之前的工作模式是不一样的,因为发布了一条消息,两个消费者那是都能消费的,为什么呢?那就是上面的一个bind方法,在这个方法里面,实际上相当于把消息发给了交换机,而交换机帮我们做了一件事情,那就是根据绑定来发送,我们再来试试看。

代码写出来

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  /**
     * Fanout型交换机的MQ模型消费消息
     * @param message
     */
    @RabbitListener(queues = {RabbitMQConfig.FIRST_FANOUT_QUEUE_NAME})
    @RabbitHandler
    public void receiveFanoutMessage(String message) {
        System.out.println("11111Fanout消费者进行消息消费:消费主体是-----"+ message);
    }

    /**
     * Fanout型交换机的MQ模型消费消息
     * @param message
     */
    @RabbitListener(queues = {RabbitMQConfig.SECOND_FANOUT_QUEUE_NAME})
    @RabbitHandler
    public void receiveFanoutMessage2(String message) {
        System.out.println("2222Fanout消费者进行消息消费:消费主体是-----"+ message);
    }

Producer

1
2
3
4
5
6
7
8
9

    /**
     * 发送消息至fanout交换机,由于fanout只关注订阅关系,所以routing key随便指定都可以
     * @param message
     */
    public void sendFanoutMessage(String message){
        rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE_NAME, "------", message);
    }

大家肯定也都很好奇,为什么呢?之前阿粉还专门说了一句话一个队列中一条消息,只能被一个消费者消费,但是你现在仔细看一下,这种发布订阅模式,他发给了谁?是不是交换机,那交换机中你自定义绑定了什么?是不是两个不同的队列,一个是队列First,一个是队列Second,所以,对于生产者来说,他只是发布了一条消息,但是他把消息发布到了交换机中,而交换机是根据你绑定队列数来进行消息的消费的,这样想的话,是不是就很明确了。

今天阿粉也不给大家讲后三种了,因为一次学的太多了,敲代码容易记不住,阿粉下一篇文章会继续带大家认识路由模式,Topic模式,还有RPC模式,如果大家迫不及待的想学习,欢迎大家来点个赞。

文献参考

《RabbitMQ官方文档》 《如何学习RabbitMQ的六种工作模式》

阅读全文 »

手把手教你学会RabbitMQ

发表于 2021-03-29 | 分类于 Java

前几天阿粉在看关于如何处理分布式事务的解决方案,于是就看到了关于使用最大努力通知来处理分布式事务的问题,而这其中最不可或缺的就是消息中间件了,那么什么是消息中间件呢?

<–more–>

为什么有消息中间件

前几天阿粉在看关于如何处理分布式事务的解决方案,于是就看到了关于使用最大努力通知来处理分布式事务的问题,而这其中最不可或缺的就是消息中间件了,那么什么是消息中间件呢?

1. 什么是消息中间件

在百度百科给出的解释是:消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统。

大家看上图,实际上就是生产者发给消息中间件一点东西,然后提供给消费者去消费,这样理解是不是就比百度百科要简单了很多了。

2. 消息中间件的种类

我们在这里先不讨论消息中间件的组成,下面会继续讲解,我们先看看都有哪些消息中间件,以及他们之间都有什么特点

1.ActiveMQ

ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。毕竟是Apache来维护的,功能还是非常强大的,

  • 支持Java消息服务(JMS)
  • 集群 (Clustering)
  • 协议支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP。

但是ActiveMQ的缺点一样也是很明显的,版本更新很缓慢。集群模式需要依赖Zookeeper实现。虽然现在有了Apollo,号称下一代ActiveMQ,目前案例那是少的可怜。

2.RabbitMQ

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)

生态丰富,使用者众,有很多人在前面踩坑。AMQP协议的领导实现,支持多种场景

3.RocketMQ

RocketMQ是阿里开源的消息中间件,目前在Apache孵化,使用纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点,如果大家使用过Kafka的话,那么你就会发现RocketMQ其实和Kafka很相似,但是绝对不是单纯的摘出来了一块内容那么简单,目前在阿里集团被广泛应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景,支撑了阿里多次双十一活动。

4.Kafka

Kafka是LinkedIn于2010年12月开发并开源的一个分布式流平台,现在是Apache的顶级项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,以Pull的形式消费消息。Kafka自身服务与消息的生产和消费都依赖于Zookeeper,使用Scala语言开发。学习成本有时候会非常的大,不过阿粉也是相信大家对这个东西肯定很好奇,因为毕竟他是大数据生态系统中不可缺少的一环,以后阿粉会陆陆续续的去带着大家学习这块的内容。

说完了区分,那么我们就得开始正儿八经的学习RabbitMQ了,安装,使用,整合到项目中,一气呵成。

3. RabbitMQ的安装

关于安装的教程,阿粉就不再给大家一一的去说了,毕竟网上有的是教程,官网也有指定的教程,【https://www.rabbitmq.com/】 官网在这里,不过大家需要注意一件事情,RabbitMQ如果你想要安装Windows版本的话,那么你一定得先装一个环境,那就是erlang语言的环境,否则,你是装不到Windows上的。

安装完成之后,登录上他的后台,IP/port,

默认登录进去就是这个样子滴。

4.RabbitMQ的使用

上一阶段,我们已经完全的把RabbitMQ进行了安装,接下来我们就要看他的使用了,来,继续找官网的文档。

大家看到了这个Java案例了么?来打开了瞅瞅,先把生产者和消费者确定一下,生产者Producer,消费者Consumer,生产者提供消息,然后把消息发送到消息队列,消费者监听到消息之后,对消息进行消费。

我们也手把手写一个,

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
 private final static String QUEUE_NAME = "Test_queue";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" 我生产了一条消息 " + message );
        }catch (Exception e){

        }
    }
1
2
  我生产了一条消息Hello World

这个时候,我们就能在后端的控制台上看到内容了,

看到了我们的total,还有Ready,证明我们发送消息已经成功了,现在已经有一条叫做“Hello World”的消息在我们的消息队列里面了。

既然作为生产者的我们,消息已经发送了,这时候是不是就得开始写个消费者了?来,我们再写一个最最基础的消费者

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        DeliverCallback deliverCallback = (consumerTag, deliver) -> {
            String message = new String(deliver.getBody(), "UTF-8");
            System.out.println(" 我是来消费消息的" + message );
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
    }

1
 我是来消费消息的'Hello World'

大家看,这是不是就把你刚才生产的消息给消费掉了呢?

实际上Demo就是这么简单,但是不能这么简单的去写,今天阿粉先来讲讲这个RabbitMQ里面的一些内容,之后再继续给大家整合一下SpringBoot,SpringCloud这些内容。

5.RabbitMQ的一些重点基础

5.1 Channel(通道)

多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道。

5.2 Queue(消息队列)

在这里阿粉就要给大家开始划重点了,Queue是RabbitMQ 的内部对象,用于存储消息。RabbitMQ中消息都只能存储在队列中,队列的特性是先进先出。

我们刚才也看到了图片,发送的消息都是在Queue中存储的,可以理解成一个存储数据的结构,我们把“Hello World”发送到Queue中,然后提供给消费者消费。

至于生产者Producer和消费者Consumer,阿粉肯定也不用说了,一个是生产消息,一个消费消息。

5.3 Exchange: 交换器

Exchange交换机扮演着接收生产者生产的消息的角色。同时Exchange交换机还需要将接收到的消息传递给queue队列进行存储或消费,不同类型的exchange配合着routing_key就能按照exchange类型对应的路有规则将消息传递到指定的某个或者某些queue队列。

说到交换器,那肯定就想到了路由,而在这里也是有这个名字的,不过不是单纯的路由,而是RoutingKey。

RoutingKey: 相当于一个路由键,一般是用来指定路由规则的。而他经常搭配和Binding(绑定键)一起使用。

生产者将消息发送给交换器时,需要一个RoutingKey,当BindingKey和 RoutingKey相匹配时,消息会被路由到对应的队列中。

Exchange分类:

  • Direct

  • Fanout

  • Topic

  • Headers

对于Direct类型的交换器,在接收到生产者发送的消息时会将消息路由给与该exchange绑定的且与该消息的routing_key同名的queue队列。如果exchange上未绑定与routing_key同名的queue,消息将会被抛弃。

对于Fanout类型的交换器,将接收到的消息路由投递到所有与其绑定的queue队列上,此种类型exchange消息路由与routing_key无关。

对于Topic类型的交换器,采用模糊匹配的方式,可以通过通配符满足一部分规则,这就和Direct不一样了,Direct是完全匹配BindingKey和RoutingKey。

对于Headers类型的交换器,不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配.这时候我们需要在代码里面去设置Headers中的信息(键值对),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列 。

他们这些也是各有特点,大家记住了么?到这里文章就告一段落了,接下来的文章会深入讲解一下怎么去配置这些交换器类型,还有整合SpringBoot,大家快去赶紧动手安装并且写个测试吧。

文章参考

《RabbitMQ官方文档》 《RabbitMQ入门》

阅读全文 »

大白话告诉你 TCP 为什么需要三次握手四次挥手

发表于 2021-03-28 | 分类于 网络

Hello 大家好,我是阿粉,关于 TCP 的三次握手和四次挥手相比大家早就烂熟于心了,毕竟这也是一个高频的面试题,但是很多小伙伴只是照本宣科,并没有真正的理解其中的原理,这篇文章,阿粉用通俗易懂的大白话带你们重新熟悉一下,已经掌握的小伙伴可以在回味一下,没有掌握的小伙伴刚好可以查漏补缺。

三次握手

再解释三次握手之前我们先从生活中想一个场景,假如你现在需要给领导打个电话汇报一下工作,为了保证双方能正常交流,那么在电话接通的那一刻想象一下会不会是这样的场景。

我们:领导好,我是 xx,可以听到我说话吗?有个工作需要向您汇报下。

领导:嗯,可以的,能听到我说话吧。

我们:可以。

我们:…..(汇报工作)。

通过上面的场景我们可以看出,其实跟 TCP 的三次握手是一个意思,只不过在我们日常生活中并不是每次都这么严谨,但是 TCP 不一样,TCP 是通过程序实现的,可靠的,面向连接的协议。而且程序是严谨的,每一次的建立连接都会进行这样的步骤。放一张经典的 TCP 三次握手图。

我们先用大白话解释一下为什么需要三次握手,首先我们要知道建立连接的目的是什么,我们是为了可靠的传输数据。那既然是可靠的传输数据,我们必须保证客户端和服务端都能正常的发送和接收数据,如果某一方不能正常的发送或者接收数据,那整个数据的传输就不能成功也就不可靠了。知道这个我们看下,为什么会需要三次握手,而不是两次握手。

  1. 刚开始客户端和服务端都是处于关闭的状态,而且服务器 B 端一直处于监听的状态,时刻监听是否有建立连接的请求;
  2. 当有客户端需要建立连接的时候就会发送一个确定连接的报文,此报文是同步报文SYN = 1,并且会生成一个随机的序号 seq = x,这是第一次握手;
  3. 当服务端接收到请求连接报文的时候,会发送一个同步报文确认报文,此报文 SYN = 1,并且 ACK = 1,同时服务端也会随机生成一个 seq = y,并将 ack 设置成 x + 1,回传给客户端,这是第二次握手;
  4. 当客户端接收到服务端的 ACK 报文后,会回复一个 ACK 确认报文,用于确认确认报文已经收到,此报文 ACK = 1,seq = x + 1, ack = y + 1,这是第三次握手;

这里有个点说明一下:大写的 ACK 表示报文的类型是确认报文,小写的 ack 是报文里面的确认号,这个确认号是上一次握手对方的 seq 值加 1 得到。

上面是整个三次握手的过程,现在我们分析一下为什么三次握手可以可靠的确定客户端和服务端都能支持的发送和接收数据。

第一次握手:第一次握手是客户端发送同步报文到服务端,这个时候客户端是知道自己具备发送数据的能力的,但是不知道服务端是否有接收和发送数据的能力;

第二次握手:当服务端接收到同步报文后,回复确认同步报文,此时服务端是知道客户端具有发送报文的能力,并且知道自己具有接收和发送数据的能力,但是并不知道客户端是否有接收数据的能力;

第三次握手:当客户端收到服务端的确认报文后,知道服务端具备接收和发送数据的能力,但是此时服务端并不知道自己具有接收的能力,所以还需要发送一个确认报文,告知服务端自己是具有接收能力的。

当整个三次握手结束过后,客户端和服务端都知道自己和对方具备发送和接收数据的能力,随后整个连接建立就完成了,可以进行后续数据的传输了。

看到这里,如果大家理解了就会知道很明显,两次握手是不行的,因为服务端并不知道客户端是具备接收数据的能力,所以就不能成为面向连接的可靠的传输协议。就像我们上面提到的打电话的例子,也是为了双方能够正常的进行交流,只不过我们现实生活中不会那么严谨,并不是每次都这样,但是程序是不一样的。

四次挥手

三次握手是为了建立可靠的数据传输通道,四次挥手则是为了保证等数据完成的被接收完再关闭连接。既然提到需要保证数据完整的传输完,那就需要保证双方都达到关闭连接的条件才能断开。

img

从上图中我们可以看到,

  1. 客户端发起 FIN 断开连接的报文,携带随机生成的 seq 值 u,发送给服务端,并且自己处于 FIN-WSIT 状态,这是第一次挥手;
  2. 服务端接收到 FIN 报文后,回复一个确认报文,其中 ACK = 1,随机生成一个 seq,以及 ack = u + 1,这是第二次挥手;
  3. 当服务端数据发送完了过后,再发送一个 FIN 报文给客户端,通知客户端,服务端准备关闭连接了,此报文 FIN = 1,ACK = 1,ack = u + 1,seq = w,这是第三次挥手;
  4. 当客户端收到 FIN 确认报文时再发送一个FIN 的确认报文,其中 ACK = 1,seq = u + 1,ack = w + 1,并进入 TIME-WAIT 状态,当等待 2MSL 后关闭连接,这是第四次挥手。

第一次挥手客户端发起关闭连接的请求给服务端;

第二次挥手:服务端收到关闭请求的时候可能这个时候数据还没发送完,所以服务端会先回复一个确认报文,表示自己知道客户端想要关闭连接了,但是因为数据还没传输完,所以还需要等待;

第三次挥手:当数据传输完了,服务端会主动发送一个 FIN 报文,告诉客户端,表示数据已经发送完了,服务端这边准备关闭连接了。

第四次挥手:当客户端收到服务端的 FIN 报文过后,会回复一个 ACK 报文,告诉服务端自己知道了,再等待一会就关闭连接。

疑问

  1. 为什么握手要三次,挥手却要四次呢?

    那是因为握手的时候并没有数据传输,所以服务端的 SYN 和 ACK 报文可以一起发送,但是挥手的时候有数据在传输,所以 ACK 和 FIN 报文不能同时发送,需要分两步,所以会比握手多一步。

  2. 为什么客户端在第四次挥手后还会等待 2MSL?

    等待 2MSL 是因为保证服务端接收到了 ACK 报文,因为网络是复杂了,很有可能 ACK 报文丢失了,如果服务端没接收到 ACK 报文的话,会重新发送 FIN 报文,只有当客户端等待了 2MSL 都没有收到重发的 FIN 报文时就表示服务端是正常收到了 ACK 报文,那么这个时候客户端就可以关闭了。

总结

TCP 协议是面向连接的可靠的传输层协议,它的拥塞控制,失败重传等机制在互联网数据传输中是不可或缺的。当下互联网行业中,基于 TCP 实现的程序数不胜数。对于我们程序员来说,很多时候如果不是参与底层项目的话,很少有机会会主动编写 TCP 相关的代码,但是理解 TCP 的实现原理,对我们来说是很有帮助的。

阅读全文 »

讲讲它的构建生命周期和拉取 jar 包流程

发表于 2021-03-25 | 分类于 Java

我们都知道, Maven 是一款非常优秀的软件项目管理 & 自动构建的工具,相信各位在项目中都多多少少接触过

阅读全文 »
1 … 4 5 6 … 32
Java Geek Tech

Java Geek Tech

一群热爱 Java 的技术人

633 日志
116 分类
24 作者
RSS
GitHub 知乎
Links
  • 纯洁的微笑
  • 沉默王二
  • 子悠
  • 江南一点雨
  • 炸鸡可乐
  • 郑璐璐
  • 程序通事
  • 懿
© 2019 - 2022 Java Geek Tech
由 Jekyll 强力驱动
主题 - NexT.Mist