Adrian


  • 首页

  • 关于

  • 标签

  • 分类

  • 归档

偏向锁、轻量级锁和重量级锁

发表于 2019-10-03 | 分类于 JVM

JVM的MarkWord存储结构

注:最后两位为锁标记位,倒数第三位是偏向标记,如果是1表示是偏向锁;合并单元格的位数就是 该字段的位数,例如hash code共25(23+2)位。

另外,对于偏向锁,如果Thread ID = 0,表示未加锁

Synchronized锁升级:偏向锁 → 轻量级锁 → 重量级锁

​ Synchronized 会从无锁升级为偏向锁,再升级为轻量级锁,最后升级为重量级锁,这里的轻量级锁就是一种自旋锁

​ 锁只能按照上述的顺序进行升级操作,锁只要升级之后,就不能降级

偏向锁

​ 初次执行到 Synchronized 代码块的时候,锁对象变成偏向锁(通过CAS修改对象头里的锁标志位),字面意思是 ”偏向于第一个获得它的线程“ 的锁

​ 偏向锁是 JDK 默认启动的选项,可以通过 -XX:-UseBiasedLocking 来关闭偏向锁。另外偏向锁默认不是立即就启动的,在程序启动后,通常有几秒的延迟,可以通过命令 -XX:BiasedLockingStartupDelay=0 来关闭延迟

​ 执行完同步代码块后,线程并不会主动释放偏向锁。当第二次到达同步代码块时,线程会判断此时持有锁的线程是否就是自己(持有锁的线程ID也在对象头里),如果是则正常往下执行。由于之前没有释放锁,这里也就不需要重新加锁。如果自始至终使用锁的线程只有一个,很明显偏向锁几乎没有额外开销,性能极高

​ 引入偏向锁的目的是为了没有多线程竞争的前提下,减少传统的重量级锁使用操作系统互斥量产生的性能消耗。偏向锁在无竞争的情况下会把整个同步都消除掉

偏向锁的加锁

​ 如果 JVM 支持偏向锁,那么在分配对象时,分配一个可偏向而未偏向的对象(Mark Word的最后3位 为101,并且Thread ID字段的值为0)

​ 然后,当一个线程访问同步块并获取锁时,将通过 CAS(Compare And Swap) 来尝试将对象头中的 Thread ID字段设置为自己的线程号,如果设置成功,则获得锁,那么以后线程再次进入和退出 同步块时,就不需要使用 CAS 来获取锁,只是简单的测试一个对象头中的Mark Word字段中是否存储着指向当前线程的偏向锁

​ 如果使用 CAS 设置失败时,说明存在锁的竞争,那么将执行偏向锁的撤销操作 (revoke bias),将偏向锁升级为轻量级锁

偏向锁升级轻量级锁

​ 当线程1访问代码块并获取锁对象时,会在 Java对象头和栈帧中记录偏向的锁的 Thread ID,因为偏向锁不会主动释放锁,因此以后线程1再次获取锁的时候,需要比较当前线程的 Thread ID 和Java对象头中的 Thread ID是否一致,如果一致(还是线程1获取锁对象),则无需使用 CAS 来加锁、解锁;如果不一致(其他线程,如线程2要竞争锁对象,而偏向锁不会主动释放因此还是存储的线程1的 Thread ID)

那么需要查看 Java对象头中记录的线程1是否存活:

  • 如果没有存活,那么锁对象被重置为无锁状态,其它线程(线程2)可以竞争将其设置为偏向锁
  • 如果存活,那么立刻查找该线程(线程1)的栈帧信息,如果还是需要继续持有这个锁对象,那么暂停当前线程1,撤销偏向锁,升级为轻量级锁,如果线程1 不再使用该锁对象,那么将锁对象状态设为无锁状态,重新偏向新的线程

简单来说:线程A第一次执行完同步代码块后,当线程B尝试获取锁的时候,发现是偏向锁,会判断线程A是否仍然存活

  • 如果线程A仍然存活,将线程A暂停,此时偏向锁升级为轻量级锁,之后线程A继续执行,线程B自旋(自旋超过一定的次数后,会膨胀成重量级锁)
  • 但是如果判断结果是线程A不存在了,则线程B持有此偏向锁,锁不升级
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias,
bool is_bulk, JavaThread* requesting_thread) {
markOop mark = obj->mark();
// 检查是否可偏向
if (!mark->has_bias_pattern()) {
return BiasedLocking::NOT_BIASED;
}
uint age = mark->age();
markOop biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);
markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);
JavaThread* biased_thread = mark->biased_locker();
if (biased_thread == NULL) {
// 可偏向但是未偏向的情况
// 可能的使用场景为:因计算hash code而撤销偏向
if (!allow_rebias) {
obj->set_mark(unbiased_prototype);
}
return BiasedLocking::BIAS_REVOKED;
}
// 判断对象现在偏向的线程是否还存在
// 即对象头中Mark Word中Thread ID字段指向的线程是否存在
bool thread_is_alive = false;
if (requesting_thread == biased_thread) {
// 请求的线程拥有偏向锁
thread_is_alive = true;
} else {
// 请求的线程不拥有偏向锁,递归查询
for (JavaThread* cur_thread = Threads::first();
cur_thread != NULL; cur_thread = cur_thread->next()) {
if (cur_thread == biased_thread) {
thread_is_alive = true;
break;
}
}
}
if (!thread_is_alive) {
if (allow_rebias) {
//退回可偏向但未偏向的状态
obj->set_mark(biased_prototype);
} else {
//偏向撤销,变为无锁状态
obj->set_mark(unbiased_prototype);
}
return BiasedLocking::BIAS_REVOKED;
}
// 拥有偏向锁的线程仍然存活
// 检查该线程是否拥有锁:
// 如果拥有锁,那么需要升级为轻量级锁,然后将displaced mark word复制到线程栈中;
// 如果不再拥有锁,如果允许重偏向,那么将mark word中的Thread ID 重新置0;
// 如果不允许重偏向,那么将mark work设置为无锁状态,即最后两位为01

// cached_monitor_info 是该线程拥有的锁对象的信息,按照从加锁顺序的逆序排列
GrowableArray<MonitorInfo*>* cached_monitor_info =
get_or_compute_monitor_info(biased_thread);
BasicLock* highest_lock = NULL;
for (int i = 0; i < cached_monitor_info->length(); i++) {
MonitorInfo* mon_info = cached_monitor_info->at(i);
if (mon_info->owner() == obj) {
// Assume recursive case and fix up highest lock later
markOop mark = markOopDesc::encode((BasicLock*) NULL);
highest_lock = mon_info->lock();
highest_lock->set_displaced_header(mark);
}
}
if (highest_lock != NULL) {
// 线程拥有锁
// Fix up highest lock to contain displaced header and point
// object at it
highest_lock->set_displaced_header(unbiased_prototype);
// Reset object header to point to displaced mark.
// Must release storing the lock address for platforms without TSO
// ordering (e.g. ppc).
obj->release_set_mark(markOopDesc::encode(highest_lock));
} else {
// 线程不再拥有锁
if (allow_rebias) {
obj->set_mark(biased_prototype);
} else {
obj->set_mark(unbiased_prototype);
}
}
return BiasedLocking::BIAS_REVOKED;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//针对上面的伪代码实现
// 撤销流程的伪代码,在全局检查点执行该操作
if mark word 存储的不是可偏向状态:
return; // 如果不是偏向锁,那么没有撤销偏向的必要
else:
if Thread ID 指向的线程不存活:
if 允许重偏向:
退回可偏向但未偏向的状态 // Thread ID为0
else:
偏向撤销,变为无锁状态
else:
if Thread ID 指向的线程,仍然拥有锁:
升级为轻量级锁,将mark word复制到线程栈中,然后stack pointer指向最老的相关锁记录
else:
if 允许重偏向:
退回可偏向但未偏向的状态 // Thread ID为0
else:
偏向撤销,变为无锁状态

小结:

​ 撤销偏向的操作需要在全局检查点执行。我们假设线程A曾经拥有锁(不确定是否释放锁), 线程B来竞争锁对象,如果当线程A不在拥有锁时或者死亡时,线程B直接去尝试获得锁(根据是否允许重偏向(rebiasing),获得偏向锁或者轻量级锁);如果线程A仍然拥有锁,那么锁升级为轻量级锁,线程B自旋请求获得锁

偏向锁撤销流程

轻量级锁

​ 轻量级锁不是使用操作系统互斥量来实现锁, 而是通过 CAS 操作来实现锁。当线程获得轻量级锁后,可以再次进入锁,即锁是可重入(Reentrance Lock)的

​ 在轻量级锁的加锁阶段,如果线程发现对象头中Mark Word已经存在指向自己栈帧的指针,即线程已经获得轻量级锁,那么只需要将0存储在自己的栈帧中(此过程称为递归加锁);在解锁的时候,如果发现锁记录的内容为0, 那么只需要移除栈帧中的锁记录即可,而不需要更新Mark Word

​ 在轻量级锁状态下继续锁竞争,没有抢到锁的线程将自旋(即不停地循环判断锁是否能够被成功获取)

忙等

​ 长时间的自旋操作是非常消耗资源的,一个线程持有锁,其他线程就只能在原地空耗CPU,执行不了任何有效的任务,这种现象叫做忙等(busy-waiting)

​ 如果多个线程用一个锁,但是没有发生锁竞争,或者发生了很轻微的锁竞争,那么 Synchronized 就用轻量级锁,允许短时间的忙等现象。短时间的忙等,换取线程在用户态和内核态之间切换的开销

轻量级锁加锁

  1. 线程在执行同步块之前,JVM 会先在当前的线程的栈帧中创建所记录的空间,用于存储对象头中的 Mark Word的拷贝
  2. 然后线程尝试使用 CAS 将对象头中的Mark Word替换为指向锁记录(Lock Record)的指针
  3. 如果成功,当前线程获得轻量级锁
  4. 如果失败,虚拟机先检查当前对象头的Mark Word 是否指向当前线程的栈帧
    • 如果指向,则说明当前线程已经拥有这个对象的锁,则可以直接进入同步块执行操作
    • 否则表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。当竞争线程的自旋次数达到界限值(threshold),轻量级锁将会膨胀为重量级锁

轻量级锁升级重量级锁

重量级锁

​ (有个计数器记录自旋次数,默认允许循环10次,可以通过虚拟机参数更改)

​ 如果锁竞争情况严重,某个达到最大自旋次数的线程,会将轻量级锁升级为重量级锁(依然是 CAS 修改锁标志位,但不修改持有锁的线程ID)

​ 当后续线程尝试获取锁时,发现被占用的锁是重量级锁,直接进入堵塞状态,此时不消耗CPU,然后等拥有锁的线程释放锁后,唤醒堵塞的线程, 然后线程再次竞争锁

在 JDK1.6 之前,Synchronized 直接加重量级锁,很明显现在得到了很好的优化

锁竞争

​ 如果多个线程轮流获取一个锁,但是每次获取锁的时候都很顺利,没有发生阻塞,那么就不存在锁竞争。只有当某线程尝试获取锁的时候,发现该锁已经被占用,只能等待其释放,这才发生了锁竞争

锁消除

​ 锁消除指的是虚拟机即使编译器在运行时,如果检测到那些共享数据不可能存在竞争,那么就执行锁消除。锁消除可以节省毫无意义的请求锁的时间

锁粗化

​ 如果一系列的连续操作都对同一个对象反复加锁和解锁,频繁的加锁操作就会导致性能损耗

​ 如果虚拟机探测到由这样的一串零碎的操作都对同一个对象加锁,将会把加锁的范围扩展(粗化)到整个操作序列的外部

​ 比如说有多个append方法,对每一个方法都加锁,此时会进行锁粗化,将第一个append直到最后一个append包起来,这样就只会进行一次的加锁操作而不是多次

小结

​ 一个锁只能按照 偏向锁、轻量级锁、重量级锁 的顺序逐渐升级(也叫锁膨胀),不允许降级

锁 优点 缺点 适用场景
偏向锁 加锁和解锁不需要额外的消耗,与执行非同步方法仅存在纳秒级的差距 如果线程间存在竞争,会带来额外的锁撤销的消耗 适用于只有一个线程访问同步块的情况
轻量级锁 竞争的线程不会堵塞,提高了程序的响应速度 始终得不到锁的线程,使用自旋会消耗CPU 追求响应时间,同步块执行速度非常块,只有两个线程竞争锁
重量级锁 线程竞争不使用自旋,不会消耗CPU 线程堵塞,响应时间缓慢 追求吞吐量,同步块执行速度比较慢,竞争锁的线程大于2个

中断锁

​ Java并没有提供任何直接中断某线程的方法,只提供了中断机制

何谓“中断机制”?

​ 线程A向线程B发出 “请你停止运行” 的请求(线程B也可以自己给自己发送此请求),但线程B并不会立刻停止运行,而是自行选择合适的时机以自己的方式响应中断,也可以直接忽略此中断

​ 也就是说,Java的中断不能直接终止线程,而是需要被中断的线程自己决定怎么处理

​ 如果线程A持有锁,线程B等待获取该锁。由于线程A持有锁的时间过长,线程B不想继续等待了,我们可以让线程B中断自己或者在别的线程里中断它,这种就是可中断锁。在Java中,synchronized就是不可中断锁,而Lock的实现类都是可中断锁

ThreadPoolExecutor

发表于 2019-07-14 | 分类于 Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

构造方法参数解析

  1. corePoolSize:核心线程数

  2. maximumPoolSize:最大线程数,线程池允许创建的最大线程数

  3. workQueue:任务队列,BlockingQueue 接口的某个实现(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)

  4. keepAliveTime:空闲线程的保活时间,如果某线程的空闲时间超过这个值都没有任务给它做,那么可以被关闭了

    注意:这个值并不会对所有线程起作用;

    1. 如果线程池中的线程数少于等于核心线程数 corePoolSize,那么这些线程不会因为空闲太长时间而被关闭,
    2. 可以通过调用 allowCoreThreadTimeOut(true) 使核心线程数内的线程也可以被回收
  5. TimeUnit:参数的时间单位;TimeYnit 是枚举类

  6. threadFactory:用于生成线程,一般我们可以用默认的就可以了

    通常,我们可以通过它将我们的线程的名字设置得比较可读一些

    如 Message-Thread-1, Message-Thread-2 类似这样

  7. handler:当线程池已经满了,但是又有新的任务提交的时候,该采取什么策略由这个来指定

    rejectedExecutionHandler 用于处理当线程池不能执行此任务时的情况

    主要的策略有:(默认策略是 抛出异常)

    1. 抛出 RejectedExecutionException 异常
    2. 忽略任务
    3. 使用提交任务的线程来执行此任务
    4. 将队列中等待最久的任务删除,然后提交此任务

任务策略

​ 任务策略执行的时期是线程池的线程达到 maximumPoolSize 的时候,此时新提交的任务就会按照指定策略进行操作

  • CallerRunsPolicy:只要线程池没有被关闭,那么由提交任务的线程自己来执行这个任务
  • AbortPolicy:不管怎样,直接抛出 RejectedExecutionException 异常(默认)
  • DiscardPolicy:不做任何处理,直接忽略掉这个任务
  • DiscardOldestPolicy:如果线程池没有被关闭的话,把队列队头的任务(也就是等待了最长时间的)直接扔掉,然后提交这个任务到等待队列中

线程池中的线程创建时机

  1. 如果当前线程数少于 corePoolSize,那么提交任务的时候创建一个新的线程,并由这个线程执行这个任务
  2. 如果当前线程数已经达到 corePoolSize,那么将提交的任务添加到队列中,等待线程池中的线程去队列中取任务
  3. 如果队列已满,那么创建新的线程来执行任务,需要保证池中的线程数不会超过 maximumPoolSize,如果此时线程数超过了 maximumPoolSize,那么执行拒绝策略

注意:如果将队列设置为无界队列,那么线程数达到 corePoolSize 后,其实线程数就不会再增长了

线程池的状态和状态间的转换

线程池状态

  • RUNNING(-1):这是正常的状态:接受新的任务,处理等待队列中的任务

  • SHUTDOWN(0):不接受新的任务提交,但是会继续处理等待队列中的任务

  • STOP(1):不接受新的任务提交,不再处理等待队列中的任务,中断正在执行任务的线程

  • TIDYING(2):所有的任务都销毁了,workCount 为 0。线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()

  • TERMINATED(3):terminated() 方法结束后,线程池的状态就会变成这个

线程池状态等于 0 的时候不能提交任务,大于 0 的话,连正在执行的任务也需要中断

状态间的切换

  • RUNNING -> SHUTDOWN:当调用了 shutdown() 后,会发生这个状态转换,这也是最重要的

  • (RUNNING or SHUTDOWN) -> STOP:当调用 shutdownNow() 后,会发生这个状态转换,这下要清楚 shutDown() 和 shutDownNow() 的区别了

  • SHUTDOWN -> TIDYING:当任务队列和线程池都清空后,会由 SHUTDOWN 转换为 TIDYING

  • STOP -> TIDYING:当任务队列清空后,发生这个转换

  • TIDYING -> TERMINATED:这个前面说了,当 terminated() 方法结束后

ThreadPoolExecutor总结

  1. corePoolSize 到 maximumPoolSize 之间的线程会被回收,corePoolSize 的线程也可以通过设置而得到回收(allowCoreThreadTimeOut(true))

  2. workQueue 用于存放任务,添加任务的时候,如果当前线程数超过了 corePoolSize,那么往该队列中插入任务,线程池中的线程会负责到队列中拉取任务

  3. 如果某个任务执行出现异常,那么执行任务的线程会被关闭,而不是继续接收其他任务。然后会启动一个新的线程来代替它

List总结

发表于 2019-06-17 | 分类于 集合

ArrayList

小结

  • ArrayList底层实现是数组,而且允许元素为null值
  • 增加和删除的操作效率低,因为有扩容和数组复制的开销存在(如果是根据下标去删除元素的会比较快)
  • 查询和修改的效率较高,可以直接通过下标去访问元素,时间复杂度为O(1)
  • ArrayList 插入元素需要复制的元素,所以 ArrayList 插入元素的位置越靠后效率越高
  • 多线程操作同一个ArrayList是不安全的

PS:如果事先知道要存储的元素个数,那么最好使用带int参数的构造函数,因为这样能够减少数组扩容的开销

大致结构

​ ArrayList是使用数组实现的列表,具有顺序性。ArrayList内部使用了transient Object[] elementData; 来存储列表中的元素(即数组的缓存区);ArrayList的容量就是该缓存区的数组长度

  1. private static final int DEFAULT_CAPACITY = 10 默认容量,只有在add的时候判断elementData是空数组的时候,就会初始化这个默认容量的数组赋值给elementData

  2. modCount 这个属性是用来记录List列表的表结构发生变化的次数(要对list的结构发生变化的操作,都会使得modCount的值进行加一;结构性操作指对列表长度进行修改的,即add/remove操作);这个属性比较重要

modCount其实是fail-fast 机制的实现,即当某一个线程A通过Iterator去遍历某集合的过程中,若该集合的内容被其他线程所改变了;那么线程A访问集合时,就会抛出ConcurrentModificationException异常,产生fail-fast事件

构造方法

  1. 无参构造方法
1
2
3
4
5
6
7
//无参构造方法
//无参的构造函数得到的是一个空数组
private static final Object[] DEFAULTCAPACITY_EMPTY_ELEMENTDATA = {};

public ArrayList() {
this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
}
  1. 带int参数的构造方法
1
2
3
4
5
6
7
8
9
10
public ArrayList(int initialCapacity) {
if (initialCapacity > 0) {
this.elementData = new Object[initialCapacity];
} else if (initialCapacity == 0) {
this.elementData = EMPTY_ELEMENTDATA;
} else {
throw new IllegalArgumentException("Illegal Capacity: "+
initialCapacity);
}
}
  • 参数大于0返回int大小的数组
  • 参数为0返回空的数组
  • 参数小于0则抛出异常IllegalArgumentException
  1. 构造一个包含指定元素的列表
1
2
3
4
5
6
7
8
9
10
11
public ArrayList(Collection<? extends E> c) {
elementData = c.toArray();
if ((size = elementData.length) != 0) {
// c.toArray might (incorrectly) not return Object[]
if (elementData.getClass() != Object[].class)
elementData = Arrays.copyOf(elementData, size, Object[].class);
} else {
// replace with empty array.
this.elementData = EMPTY_ELEMENTDATA;
}
}

​ 构造指定元素的列表,当指定的元素为空时返回的是一个空的数组;如果元素不为空,那么直接得到一个指定元素的数组

​ 这里有个官方的注释,这里的作用是:如果得到的element数组类型不是Obejct类型的数组,那么就要将其转换成Object类型的数组,至于这样做的原因是:

如果没有这个Object类型转换的情况会是怎么样的,我们模拟一下

1
2
3
> List<Object> list = new ArrayList<Object>(Arrays.asList("yes", "no"));
> list.set(0, new Object());
>
注意:此处模拟指没有类型转换的情况

执行上面的代码,会调用指定元素的构造方法(推荐debug看)

  1. 当执行elementData = c.toArray(); 之后,elementData会是 String[] 类型的数组,而不是 Object[] 类型的

  2. 那么执行完该构造方法之后,上面代码就会等价于

    1
    2
    3
    >    Object[] list = new String[]{"yes","no"};
    > list[0]=new Object();
    >
  1. 然后当你再执行list.set(0, new Object()); 的时候,就会抛出ArrayStoreException 异常了

根本原因是toArray()方法中调用的是copyOf 方法,该方法实际使用的是泛型,虽然返回的是 Object[] 类型的数组,但是实际的类型已经声明成 String[] 了

增删改查方法

System.arraycopy、Arrays.copyOf

了解这两个方法对看下面几个方法很有帮助

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* @param src 源数组
* @param srcPos 源数组中的起始位置
* @param dest 目标数组
* @param destPos 指定目标数据中的起始位置
* @param length 要复制的数组元素的数量.
*/
public static native void arraycopy(Object src, int srcPos,
Object dest, int destPos,
int length);

/**
* @param original 要复制的数组
* @param newLength 要返回的副本的长度
* @param newType 要返回的副本的类型
*/
public static <T,U> T[] copyOf(U[] original, int newLength, Class<? extends T[]> newType) {
@SuppressWarnings("unchecked")
T[] copy = ((Object)newType == (Object)Object[].class)
? (T[]) new Object[newLength]
: (T[]) Array.newInstance(newType.getComponentType(), newLength);
System.arraycopy(original, 0, copy, 0,
Math.min(original.length, newLength));
return copy;
}

add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//执行顺序1.
//size默认值是0
public boolean add(E e) {
ensureCapacityInternal(size + 1); // Increments modCount!!
elementData[size++] = e;
return true;
}

//2.其实是执行3和4的方法
private void ensureCapacityInternal(int minCapacity) {
ensureExplicitCapacity(calculateCapacity(elementData, minCapacity));
}

//3.
/**
* 这个方法是计算list的容量的:
* 上面add方法调用的时候会传入(size+1)到这个计算的方法中
* 如果当前的数组是空数组,那么就比较(size+1)与默认的容量(默认是10),取较大的值返回.
* 如果不是空数组,那么直接返回(size+1)
*/
private static int calculateCapacity(Object[] elementData, int minCapacity) {
if (elementData == DEFAULTCAPACITY_EMPTY_ELEMENTDATA) {
//默认容量DEFAULT_CAPACITY是10
return Math.max(DEFAULT_CAPACITY, minCapacity);
}
return minCapacity;
}

//4.
/**
* 因为是add操作,对列表结构有改动,需要modCount++
* 如果传入的(size+1)值大于当前数组的长度的时候,才对此数组进行扩容的操作
*/
private void ensureExplicitCapacity(int minCapacity) {
modCount++;
if (minCapacity - elementData.length > 0)
grow(minCapacity);
}

//5.如果是超过当前数组长度才需要进行数组扩容的操作
/**
* 如果新数组大小还是小于传入的容量的话,那就直接取传入的容量作为新数组的长度
* 如果新数组的长度要比最大定义的数组长的话
* (最大的数组长度是Integer.MAX_VALUE-8)
* 那么就直接使用Integer.MAX_VALUE作为数组的长度.
* 然后就将数组的元素拷贝到一个新的数组中
*/
private void grow(int minCapacity) {
int oldCapacity = elementData.length;
//右移1位,此处得到新容量是旧容量的1.5倍
int newCapacity = oldCapacity + (oldCapacity >> 1);
if (newCapacity - minCapacity < 0)
newCapacity = minCapacity;
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
// minCapacity is usually close to size, so this is a win:
elementData = Arrays.copyOf(elementData, newCapacity);
}

​ addAll(Collection<? extends E> c) 方法和普通add方法类似,主要使用了System.arraycopy方法来拷贝指定元素

Set

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//插入到index的下标的位置,然后返回旧的值
public E set(int index, E element) {
//判断传入的下标是否越界
rangeCheck(index);

E oldValue = elementData(index);
elementData[index] = element;
return oldValue;
}

private void rangeCheck(int index) {
if (index >= size)
throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}

Get

1
2
3
4
5
public E get(int index) {
//判断传入的下标是否越界
rangeCheck(index);
return elementData(index);
}

Remove

  1. 根据传入下标来移除列表中的元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public E remove(int index) {
//判断传入的下标是否越界
rangeCheck(index);
//删除操作修改了列表结构
modCount++;
E oldValue = elementData(index);
//需要进行移动的元素个数
int numMoved = size - index - 1;
if (numMoved > 0)
//移动数组的元素
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
//更新列表的元素个数,清除最后一个元素,让GC回收
elementData[--size] = null; // clear to let GC do its work

return oldValue;
}
  1. 根据传入的对象来删除元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public boolean remove(Object o) {
//移除null值元素
if (o == null) {
for (int index = 0; index < size; index++)
if (elementData[index] == null) {
fastRemove(index);
return true;
}
} else {
//for循环遍历列表,根据对象的equals方法来移除对应元素
for (int index = 0; index < size; index++)
if (o.equals(elementData[index])) {
fastRemove(index);
return true;
}
}
return false;
}

//此方法是移除列表中的元素,根据下标志移除
private void fastRemove(int index) {
modCount++;
int numMoved = size - index - 1;
if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
numMoved);
elementData[--size] = null; // clear to let GC do its work
}
  1. 移除包含指定元素的列表
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public boolean removeAll(Collection<?> c) {
//如果参数为null,那么抛出异常
Objects.requireNonNull(c);
return batchRemove(c, false);
}

private boolean batchRemove(Collection<?> c, boolean complement) {
final Object[] elementData = this.elementData;
int r = 0, w = 0;
boolean modified = false;
try {
//注意此处的complement是false
//所以就是将elementData中不存在c中的元素留下
for (; r < size; r++)
if (c.contains(elementData[r]) == complement)
elementData[w++] = elementData[r];
} finally {
//这里的判断的原因是因为 contains 方法可能会抛出异常
//如果抛出异常的情况下,就需要对 elementData 进行处理
if (r != size) {
System.arraycopy(elementData, r,
elementData, w,
size - r);
w += size - r;
}
//这里很简单,清除元素,修改 modCount 的值,然后返回true
if (w != size) {
for (int i = w; i < size; i++)
elementData[i] = null;
modCount += size - w;
size = w;
modified = true;
}
}
return modified;
}

ArrayList的迭代器

1
2
3
public Iterator<E> iterator() {
return new Itr();
}

ArrayList使用的迭代器是其内部类Itr

其内部类还有ListItr也是迭代器,只是这个迭代器可以在任意方向遍历列表,即可以向前/向后遍历。可以通过public ListIterator<E> listIterator()方法得到该迭代器

此处专注于迭代器Itr,先来查看该类的属性

  • int cursor :要返回的下一个元素的索引
  • int lastRet = -1:返回的最后一个元素的索引,如果没有返回-1
  • int expectedModCount = modCount:结构修改次数
  1. 遍历元素的方法

在遍历元素前列表都会比较结构修改次数,如果结构修改次数不对应,那么会抛出ConcurrentModificationException异常;所以在多线程对同个列表进行迭代和增删操作的时候,很大可能会抛出异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//是否还有下一个可遍历的元素
public boolean hasNext() {
return cursor != size;
}

public E next() {
checkForComodification();
int i = cursor;
//这里都是判断下标是否越界
if (i >= size)
throw new NoSuchElementException();
Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length)
throw new ConcurrentModificationException();
//游标+1
cursor = i + 1;
return (E) elementData[lastRet = i];
}
//在获取元素之前,需要先判断列表结构是否被修改过
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
  1. 移除元素
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void remove() {
//这个地方就是为什么需要先执行 next 方法后才能调用 remove 方法的原因
if (lastRet < 0)
throw new IllegalStateException();
//在移除元素之前,需要先判断列表结构是否被修改过
checkForComodification();

try {
//调用arrayList的remove方法
ArrayList.this.remove(lastRet);
//重置一下游标,因为 next 操作里面将游标+1了
cursor = lastRet;
lastRet = -1;
//更新结构修改次数,上面调用remove方法导致modCount增加了
expectedModCount = modCount;
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}

ArrayList的遍历方式及问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* List的遍历方式
*/
public void cycleList() {
Integer[] ints = {1,2,3,4,5,6,7,8,9};
// Arrays.asList(ints) 将数组转成List
List<Integer> lists = Arrays.asList(ints);
/**
* 1.for循环遍历
*
* foreach循环(增强for循环)
*/
for (int i = 0;i < lists.size();i++) {
System.out.println(lists.get(i));
}
for (Integer i : lists) {
System.out.println(i);
}


/**
* 2.迭代器遍历
*/
Iterator<Integer> iterator = lists.iterator();
while (iterator.hasNext()) {
System.out.println(iterator.next());
}

/**
* 3.JDK8的stream流方式
*/
lists.stream().forEach(System.out::println);
}

注意:

for循环遍历元素的时候,如果需要在for循环中删除元素,此时需要注意:

  • 使用foreach(即增强for循环)的话会有下标越界问题
  • for循环如果长度length是每次判断都重新获取的话是没有问题的
  • for循环如果是事先定义好长度length的话会发生下标越界问题

ArrayList元素去重

遍历列表去重

实现思路:

  1. 定义(new)一个新的List集合
  2. 遍历原集合中的每一个元素,然后判断新集合中是否包含了该元素
    • 包含就不添加到新集合
    • 不包含则加到新集合中
  3. 然后返回新的集合得到去重后的List

自定义对象去重

实现思路:跟遍历去重方式思路基本上一致,只是对于对象的遍历判断,是需要重写equals和hasCode方法来比较对象是否重复

JDK8的流方式去重

1
list.stream().distinct().forEach(System.out::println);
注意: 如果列表中是对象的话,一定要重写对象的equals() 和hasCode() 方法来定义对象是否重复的规则



### SubList方法的坑

SubList方法能够返回指定下标区间的父List的视图、视图、视图

1
2
3
4
5
6
7
8
9
10
11
12
13
//不包含toIndex下标的元素,即左闭右开
public List<E> subList(int fromIndex, int toIndex) {
subListRangeCheck(fromIndex, toIndex, size);
return new SubList(this, 0, fromIndex, toIndex);
}
SubList(AbstractList<E> parent,
int offset, int fromIndex, int toIndex) {
this.parent = parent;
this.parentOffset = fromIndex;
this.offset = offset + fromIndex;
this.size = toIndex - fromIndex;
this.modCount = ArrayList.this.modCount;
}


要注意的地方就是:调用这个方法并不是返回一个全新的列表,而是返回一个视图。所以在操作这个方法得到的视图会影响原列表的内容。简单来说就是subList后得到的列表跟原列表是同一个对象



## LinkedList

### 小结

- LinkedList底层的实现是链表(双向链表),可以允许元素为null值,LinkedList是有序集合
- 增加和删除的时候只需要修改节点上的指针即可,效率比较高
- 查找和修改的时候就只能从链表的头出发一直往下遍历找到该目标元素的节点才能进行操作,效率很低
- LinkedList内部查询的时候使用的size >> 1 的方式(即折半法查找)来查找index,从而提高查询的效率
- LinkedList 集合插入元素的位置影响 LinkedList 集合插入的效率,插入位置越靠前或者越靠后,效率越高;位置越中间效率低

大致结构

  1. transient Node<E> first 存放头结点
  2. transient Node<E> last 存放尾节点
  3. modCount:fail-fast机制
  4. Node节点;LinkedList内部使用了Node类的存放元素的,新增元素的时候,会将元素包装成Node节点来进行操作
1
2
3
4
5
6
7
8
9
10
11
private static class Node<E> {
E item; //元素
Node<E> next; //后继节点
Node<E> prev; //前置节点

Node(Node<E> prev, E element, Node<E> next) {
this.item = element;
this.next = next;
this.prev = prev;
}
}

构造方法

  1. 无参构造(真无参。。)
1
2
public LinkedList() {
}
  1. 指定元素列表的构造器
1
2
3
4
5
//调用addAll方法,这个下面详说
public LinkedList(Collection<? extends E> c) {
this();
addAll(c);
}

增删改查方法

add

LinkedList的增加方法的核心是linkBefore(E e, Node<E> succ)、linkFirst(E e)、linkLast(E e)和node(int index)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
//添加元素到链表末尾
//将元素包装成Node节点,然后就是链表的操作了
// 如果尾节点是空的,那么将该节点作为头和尾节点
// 如果尾节点非空,那么将该节点作为尾节点,并且加入到链表中
void linkLast(E e) {
final Node<E> l = last;
final Node<E> newNode = new Node<>(l, e, null);
last = newNode;
if (l == null)
first = newNode;
else
l.next = newNode;
size++;
modCount++;
}
//添加元素到链表头
private void linkFirst(E e) {
final Node<E> f = first;
final Node<E> newNode = new Node<>(null, e, f);
first = newNode;
if (f == null)
last = newNode;
else
f.prev = newNode;
size++;
modCount++;
}
//链表操作,将 e 插入到 succ 前面
void linkBefore(E e, Node<E> succ) {
// succ 不能为null
final Node<E> pred = succ.prev;
final Node<E> newNode = new Node<>(pred, e, succ);
succ.prev = newNode;
if (pred == null)
//即succ是头节点的情况
first = newNode;
else
pred.next = newNode;
size++;
modCount++;
}
/**
* 采用折半法得到index所在的位置(左/右区间)
* 然后采用for循环遍历的方式得到index位置上的元素
*/
Node<E> node(int index) {
if (index < (size >> 1)) {
Node<E> x = first;
for (int i = 0; i < index; i++)
x = x.next;
return x;
} else {
Node<E> x = last;
//注意:这里是从后往前遍历
for (int i = size - 1; i > index; i--)
x = x.prev;
return x;
}
}
  1. add(E e)
1
2
3
4
5
public boolean add(E e) {
//插入链表的末尾
linkLast(e);
return true;
}
  1. add(int index, E element)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void add(int index, E element) {
//检查下标是否越界
checkPositionIndex(index);

if (index == size)
//添加到链表最后,即调用上面的add(E e)方法
linkLast(element);
else
//插入node(index)节点之前
linkBefore(element, node(index));
}
//检查下标是否越界和index是否小于0
private void checkPositionIndex(int index) {
if (!isPositionIndex(index))
throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}
private boolean isPositionIndex(int index) {
return index >= 0 && index <= size;
}
  1. addAll方法,带参构造函数中也调用此方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public boolean addAll(Collection<? extends E> c) {
return addAll(size, c);
}
public boolean addAll(int index, Collection<? extends E> c) {
//检查下标是否越界
checkPositionIndex(index);
//得到指定插入的元素
Object[] a = c.toArray();
int numNew = a.length;
if (numNew == 0)
return false;
//获取插入位置的前驱和后继节点
Node<E> pred, succ;
if (index == size) {
succ = null;
pred = last;
} else {
succ = node(index);
pred = succ.prev;
}
//for循环插入指定元素,并且设置好上面的前驱和后继节点
for (Object o : a) {
E e = (E) o;
Node<E> newNode = new Node<>(pred, e, null);
if (pred == null)
first = newNode;
else
pred.next = newNode;
pred = newNode;
}
if (succ == null) {
last = pred;
} else {
pred.next = succ;
succ.prev = pred;
}
size += numNew;
modCount++;
return true;
}

Get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//返回头节点元素内容
public E getFirst() {
final Node<E> f = first;
if (f == null)
throw new NoSuchElementException();
return f.item;
}
//返回尾节点元素内容
public E getLast() {
final Node<E> l = last;
if (l == null)
throw new NoSuchElementException();
return l.item;
}
//获取指定下标的元素
public E get(int index) {
checkElementIndex(index);
//折半for循环查找指定下标的元素
return node(index).item;
}
//检查下标是否越界和index是否小于0
private void checkElementIndex(int index) {
if (!isElementIndex(index))
throw new IndexOutOfBoundsException(outOfBoundsMsg(index));
}
private boolean isElementIndex(int index) {
return index >= 0 && index < size;
}

Remove

LinkedList的删除方法的核心是unlink(Node<E> x)、unlinkFirst(Node<E> f)和unlinkLast(Node<E> l)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
//要求参数 x 不能为null
E unlink(Node<E> x) {
//得到需要删除节点的内容、前驱节点和后继节点
final E element = x.item;
final Node<E> next = x.next;
final Node<E> prev = x.prev;

if (prev == null) {
//该节点是头结点的情况
first = next;
} else {
prev.next = next;
x.prev = null;
}

if (next == null) {
//该节点是尾节点的情况
last = prev;
} else {
next.prev = prev;
x.next = null;
}
x.item = null;
size--;
modCount++;
return element;
}

private E unlinkFirst(Node<E> f) {
// 要求f是头节点并且不为null
final E element = f.item;
final Node<E> next = f.next;
f.item = null;
f.next = null; // help GC
first = next;
if (next == null)
//链表元素只有一个的情况
last = null;
else
next.prev = null;
size--;
modCount++;
return element;
}

private E unlinkLast(Node<E> l) {
// 要求l是尾节点并且不为null
final E element = l.item;
final Node<E> prev = l.prev;
l.item = null;
l.prev = null; // help GC
last = prev;
if (prev == null)
//链表元素只有一个的情况
first = null;
else
prev.next = null;
size--;
modCount++;
return element;
}
  1. remove()移除第一个节点,即当前头节点
  2. remove(int index)移除下标是index的节点,先检查下标是否越界,再利用node(int index)方法得到节点,最后用unlink(Node<E> x)移除
  3. removeFirst()和removeLast()方法。根据名称可知作用,如果头结点/尾节点为null的情况,即链表为空的情况会抛出NoSuchElementException,然后各自调用unlinkFirst和unlinkLast删除节点
  4. remove(Object o)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//删除指定元素的节点,利用for循环便利链表,找到第一个配对的元素删除
//注意:如果链表有多个相同的元素,该方法只会删除第一个
public boolean remove(Object o) {
if (o == null) {
for (Node<E> x = first; x != null; x = x.next) {
if (x.item == null) {
unlink(x);
return true;
}
}
} else {
for (Node<E> x = first; x != null; x = x.next) {
if (o.equals(x.item)) {
unlink(x);
return true;
}
}
}
return false;
}

Set

1
2
3
4
5
6
7
8
9
public E set(int index, E element) {
//检查下标是否越界或者index是否小于0
checkElementIndex(index);
//找到index下标的元素,更新该item内容后返回旧值
Node<E> x = node(index);
E oldVal = x.item;
x.item = element;
return oldVal;
}

pop、poll、peek、push、offer

  1. peek获取头结点;如果链表为空则返回null
  2. pop弹出头结点,头结点会被删除。如果链表为空,那么抛出NoSuchElementException
  3. poll弹出头结点,头结点会被删除。如果链表为空则返回null
  4. push插入元素作为头结点
  5. offer插入节点到链表末尾,相当于入队操作

peek和pop方法的区别是:peek方法和pop方法都是返回头节点的元素,但是peek方法不会删除头结点只返回null,而pop方法会删除头节点(相当于出栈)

ArrayList和LinkedList的区别

底层实现和线程安全问题

  • ArrayList
    • 底层数据结构是数组。线程不安全
  • LinkedList
    • 底层数据结构是链表。线程不安全
  • Vector
    • 底层数据结构是数组。线程安全(方法都加上了synchronized同步锁)

增删改查方面

总体来说:

  • ArrayList 查询和修改要比 LinkedList 快
  • LinkedList 增加和删除要比 ArrayList 快
注意:上面是总体而已,某些情况下不成立
  1. 如果删除元素是删除末尾元素的情况下,ArrayList 要比 LinkedList 快,因为 LinkedList 需要操作前后指针
  2. 如果删除的是中间的位置的元素,还是 ArrayList 快,因为 LinkedList 的折半查找法分别是从 头/尾 往中间找元素的,所以索引越靠近中间,LinkedList 的性能就越差
  3. 如果增加的元素一直都是在列表的最后增加, 那么 ArrayList 会比 LinkedList 快;此时的扩容操作占整体时间是很少的,而 LinkedList 还需要新建对象并且操作链表的前驱和后继节点

CopyOnWriteArrayList

​ CopyOnWriteArrayList是JUC包提供的线程安全的ArrayList,底层通过复制数组的方式来实现;CopyOnWriteArrayList在迭代器遍历的使用不会抛出ConcurrentModificationException异常(即没有快速失败机制),并且迭代器遍历的时候就不用额外加锁

​ copy-on-write是指写时复制;如果有多个调用者同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变

优点是如果调用者没有修改该资源,就不会有副本(private copy)被建立,因此多个调用者只是读取操作时可以共享同一份资源

大致结构

  1. final transient ReentrantLock lock = new ReentrantLock() 该列表通过 ReentrantLock 来实现加锁操作
  2. private transient volatile Object[] array CowArrayList用来存储数据的数组

对数组的修改操作

add

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public boolean add(E e) {
final ReentrantLock lock = this.lock;
//加锁操作
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
//复制一个新的数组,新增操作在新数组上完成
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len] = e;
//将新数组设置到 array 上
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}

CowArrayList通过复制一个新数组,并且增加操作在新数组上完成,最后将array指向到新数组中;跟ArrayList的区别就是:

  • CowArrayList有加锁操作
  • CowArrayList新增的操作在复制好的新数组上完成,而ArrayList是在原数组上完成

CowArrayList的set()、remove()操作都类似add()操作,都是在新数组上完成操作后将array指向新数组

剖析迭代器

​ 因为CopyOnWriteArrayList在迭代器遍历的使用不会抛出ConcurrentModificationException异常,并且迭代器遍历的时候就不用额外加锁;这个原因就在于CopyOnWriteArrayList在使用迭代器的时候,保存了一份原数组的副本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
static final class COWIterator<E> implements ListIterator<E> {
//原数组的副本
private final Object[] snapshot;
//游标
private int cursor;
//CopyOnWriteArrayList调用 iterator() 执行的构造函数
private COWIterator(Object[] elements, int initialCursor) {
cursor = initialCursor;
snapshot = elements;
}
public boolean hasNext() {
return cursor < snapshot.length;
}
public boolean hasPrevious() {
return cursor > 0;
}

public E next() {
if (! hasNext())
throw new NoSuchElementException();
return (E) snapshot[cursor++];
}
public E previous() {
if (! hasPrevious())
throw new NoSuchElementException();
return (E) snapshot[--cursor];
}
public int nextIndex() {
return cursor;
}
public int previousIndex() {
return cursor-1;
}
//CopyOnWriteArrayList的迭代器不支持一下方法
public void remove() {
throw new UnsupportedOperationException();
}
public void set(E e) {
throw new UnsupportedOperationException();
}
public void add(E e) {
throw new UnsupportedOperationException();
}
}

显而易见:

​ 在调用迭代器的方法时,迭代器会保存一份原数组的副本,即snapshot。该迭代器的所有操作都是对原数组进行操作,因此也就没有线程间的读写问题了,也就不需要快速失败机制和加锁操作了,所以CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性

CopyOnWriteArrayList小结

  • 如果 CopyOnWriteArrayList 需要经常增删改列表的数据,经常要执行add()、set()、remove()的话,那是比较耗费内存的(因为都需要复制一个新数组进行增删改)
  • 数据一致性:CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性
  • 相对于ArrayList和Vector来说,遍历数组时不会受其他线程对数组操作而造成影响(读的是原数组的副本)

Set 对应的是 CopyOnWriteArraySet

跨域请求

发表于 2019-05-21 | 分类于 分布式

跨域请求

​ 跨域请求就是指:当前发起请求的域与该请求指向的资源所在的域不一样。这里的域指的是这样的一个概念:我们认为如果协议 + 域名 + 端口号均相同,那么就是同域,否则则是跨域的

​ 跨域请求是由于浏览器的同源策略导致的,浏览器的同源策略是不能没有的(同源策略是浏览器最核心最基础的安全策略)

  1. 同源策略禁止 Ajax 直接发起跨域HTTP请求(其实可以发送请求,结果被浏览器拦截,不展示),同时 Ajax 请求不能携带与本网站不同源的 Cookie;如果没有同源策略别的域名就可以拿到你浏览器上其他的Cookie信息,这样会导致很多重要的信息泄露,例如不法的网站可能会利用你的cookie去登录一些网站,盗用你的信息等。
  2. DOM 层面的同源策略限制了来自不同源的Document对象或 JS 脚本,对当前document对象的读取或设置某些属性;没有同源策略,一些脚本就能获取到你的用户密码输入框的内容信息

JSONP

​ JSONP 是一种非官方的跨域数据交互协议;JSONP 本质上是利用

分布式锁及其实现

发表于 2019-05-13 | 分类于 分布式

为什么需要分布式锁

​ 在单机部署的项目中,多线程间的并发控制可以由Java相关的并发处理API来控制线程间的通信和互斥。但是在分布式集群的系统中,单机部署情况下的并发控制策略就会失效了,单纯的Java API是不具备分布式环境下的并发控制能力的;所以这就需要一种跨JVM的互斥机制来控制对共享资源的访问,这就是分布式锁要解决的问题了

​ 在分布式场景下,CAP理论已经证明了任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项;所以为了保证在分布式环境下的数据最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等

分布式锁的特性

  1. 在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行
  2. 高可用、高性能的获取锁与释放锁
  3. 具备可重入特性
  4. 具备锁失效机制,防止死锁
  5. 具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败

分布式锁的三种实现方案

数据库实现

​ 数据库实现分布式锁主要是依赖唯一索引

(唯一索引:不允许具有索引值相同的行,从而禁止重复的索引或键值。数据库会在创建该索引时检查是否有重复的键值,并在每次使用 INSERT 或 UPDATE 语句时进行检查)

​ 实现的思路:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,因为做了唯一索引,所以即使多个请求同时提交到数据库,都只会保证只有一个操作能够成功,插入成功则获取到该方法的锁,执行完成后删除对应的行数据释放锁

1
2
3
4
5
6
7
8
9
CREATE TABLE `distributed_lock` (
`id` int(11) NOT NULL COMMENT '主键',
`method_name` varchar(64) NOT NULL COMMENT '方法名(需要锁住的方法名)',
`desc` varchar(255) NOT NULL COMMENT '备注信息',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `index_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

数据库实现分布式锁的增强

  1. 该分布式锁依赖数据库的可用性,如果数据库是单点且挂掉,那么分布式锁功能失效
    • 解决方案:
      • 多机部署,数据同步,数据库主备切换
  2. 同一个线程在释放锁之前,行数据一直存在,无法再次插入数据;这种情况下该分布式锁不具备可重入性
    • 解决方案:在表中新增一列用于记录当前获取到锁的机器和线程信息,在该线程再次获取锁的时候,先查询表中机器和线程信息是否和当前机器和线程相同,若相同则直接获取锁
  3. 没有锁失效的机制可能会出现在获取锁之后,数据库宕机,对应的行数据没有被删除,等到数据库服务器恢复后,表中的数据仍然存在,从而无法再获取到锁;或者释放锁失败
    • 解决方案:
      • 在表中新增一列,用于记录失效时间,并且需要有定时任务清除这些失效的数据;此时也需要根据业务需求考虑定时任务的执行时间,不能过长或者过短
      • 多机部署,数据同步,数据库主备切换
  4. 阻塞锁特性,在代码逻辑中增加失败重试机制(while循环),根据业务需求多次去获取锁直到成功或者达到失败次数后返回等等

数据库实现分布式锁的问题

​ 虽然我们对method_name 使用了唯一索引,并且显示使用for update来使用行级锁。

​ 但是,MySql会对查询进行优化,即便在条件中使用了索引字段,但是否使用索引来检索数据是由 MySQL 通过判断不同执行计划的代价来决定的,如果 MySQL 认为全表扫效率更高,比如对一些很小的表,它就不会使用索引,这种情况下 InnoDB 将使用表锁,而不是行锁

Redis实现分布式锁

实现思路:

- setnx:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0

1. 获取锁的时候,使用setnx加锁,锁的value值可以是一个随机生成的UUID,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁
2. 获取锁的时候设置一个获取锁的超时时间,若超过这个时间则放弃获取锁
3. 释放锁的时候,通过随机生成的UUID去匹对锁的键值对是否对应,若是则执行delete释放锁



1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/**
* redis实现分布式锁
*/
@Component
public class DistributedLock {
@Autowired
private JedisPool jedisPool;

/**
* 加锁
* @param lockName 存放redis中的key
* @param acquireTimeOut 分布式锁的过期时间
* @param timeout 获取锁的超时时间
* @return
*/
public String lockWithTimeOut(String lockName, int acquireTimeOut, long timeout) {
/**
* 先setnx key是否成功;
* 成功则设置随机值(UUID),然后设置过期时间,返回随机值给释放锁用
*
* 失败则计算获取锁的超时时间,时间未到则自旋获取锁直到成功或者达到超时时间
*/
String identifier = UUID.randomUUID().toString().replaceAll("-", "");
timeout = System.currentTimeMillis() + timeout;
String reIdentifier = "";

Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.select(0);
//带超时时间的循环获取锁实现锁阻塞特性
while(System.currentTimeMillis() < timeout){
Long setnx = jedis.setnx(lockName, identifier);
if (setnx != null && setnx == 1){
//设置过期时间
jedis.expire(lockName, acquireTimeOut);
reIdentifier = identifier;
break;
}else {
//这一步很重要
//如果key已经存在,查看过期时间,如果该key无过期时间则重新设置过期时间,以免发生死锁
Long ttl = jedis.ttl(lockName);
if (ttl == -1){
jedis.expire(lockName, acquireTimeOut);
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
System.out.println("线程中断");
Thread.currentThread().interrupt();
}
}
}
} catch (Exception e) {
//TODO 处理异常
e.printStackTrace();
} finally {
if (jedis != null)
jedis.close();
}
return reIdentifier;
}

/**
* 释放锁
* @param lockName 锁的名称
* @param identifier 锁的标识(用来验证锁中的val是否一致)
* @return
*/
public boolean releaseLock(String lockName, String identifier){
Jedis jedis = null;
boolean flag = false;
try {
jedis = jedisPool.getResource();
jedis.select(0);
jedis.watch(lockName);
String result = jedis.get(lockName);
if (result != null && identifier.equals(result)){
Transaction multi = jedis.multi();
multi.del(lockName);
List<Object> exec = multi.exec();
if (exec != null && exec.size() > 0){
flag = true;
}
}
jedis.unwatch();
return flag;
} catch (Exception e) {
//TODO 处理异常
e.printStackTrace();
return false;
} finally {
if (jedis != null)
jedis.close();
}
}

}


#### Redis分布式锁的增强

1. 锁失效时间
- 锁失效的时间需要根据实际业务需求来设置一个合适的值
- 如果设置的失效时间太短,方法没等执行完,锁就自动释放了,那么就会产生并发问题
- 如果设置的时间太长,其他获取锁的线程就可能要平白的多等一段时间
2. 可利用while循环去获取锁,可以设置重试间隔时间和最大重试时间来实现锁阻塞特性
3. 不可重入
- 解决方案:
- 线程获取到锁之后,把当前主机信息和线程信息保存起来,下次再获取之前先检查自己是不是当前锁的拥有者;释放锁的时候将这些信息删除
4. 单点故障
- 解决方案:
- Redis集群,Redis主从



#### Redis实现分布式锁存在的问题

​ 这类最大的缺点就是它加锁时只作用在一个Redis节点上,即使Redis通过sentinel保证高可用,如果这个master节点由于某些原因发生了主从切换,那么就会出现锁丢失的情况:

1. 在Redis的master节点上拿到了锁
2. 但是这个加锁的key还没有同步到slave节点
3. master故障,发生故障转移,slave节点升级为master节点
4. 导致锁丢失



### Zookeeper实现分布式锁

实现思路:

​ 每个客户端对某个方法加锁时,在zookeeper上的与该方法对应的指定节点的目录下,生成一个唯一的瞬时有序节点(EPHEMERAL_SEQUENTIAL)

​ 使用Zookeeper可以实现的分布式锁是阻塞的,客户端可以通过在ZK中创建瞬时有序节点,并且在节点上绑定监听器,一旦节点发生变化,ZK会通知客户端,客户端可以检查自己创建的节点是不是当前所有节点中序号最小的,如果是那么自己就获取到锁,反之则继续等待

​ 当释放锁的时候,只需将这个瞬时节点删除即可。同时,其可以避免服务宕机导致的锁无法释放,而产生的死锁问题,因为瞬时节点在会话断开后就会自动删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
/**
* Zookeeper 实现分布式锁
*/
public class ZooKeeperLock implements Watcher {

// ZK对象
private ZooKeeper zk = null;
// 分布式锁的根节点
private String rootLockNode;
// 竞争资源,用来生成子节点名称
private String lockName;
// 当前锁
private String currentLock;
// 等待的锁(前一个锁)
private String waitLock;
// 计数器(用来在加锁失败时阻塞加锁线程)
private CountDownLatch countDownLatch;
// 超时时间
private int sessionTimeout = 30000;


/**
* 构造器中创建ZK链接,创建锁的根节点
*
* @param zkAddress ZK的地址
* @param rootLockNode 根节点名称
* @param lockName 子节点名称
*/
public ZooKeeperLock(String zkAddress, String rootLockNode, String lockName) {
this.rootLockNode = rootLockNode;
this.lockName = lockName;
try {
/**
* 创建连接,zkAddress格式为:IP:PORT
* watcher监听器为自身
*/
zk = new ZooKeeper(zkAddress, this.sessionTimeout, this);
/**
* 检测锁的根节点是否存在,不存在则创建
*/
Stat stat = zk.exists(rootLockNode, false);
if (null == stat) {
zk.create(rootLockNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 加锁方法,先尝试加锁,不能加锁则等待上一个锁的释放
*
* @return
*/
public boolean lock() {
if (this.tryLock()) {
System.out.println("线程【" + Thread.currentThread().getName() + "】加锁(" + this.currentLock + ")成功!");
return true;
} else {
return waitOtherLock(this.waitLock, this.sessionTimeout);
}
}

public boolean tryLock() {
// 分隔符
String split = "_lock_";
if (this.lockName.contains("_lock_")) {
throw new RuntimeException("lockName can't contains '_lock_' ");
}
try {
/**
* 创建锁节点(临时有序节点)并且得到节点名称
*
* path: 根节点/子锁名称+分隔符
*/
this.currentLock = zk.create(this.rootLockNode + "/" + this.lockName + split, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

System.out.println("线程【" + Thread.currentThread().getName()
+ "】创建锁节点(" + this.currentLock + ")成功,开始竞争...");

/**
* 获取所有子节点
*/
List<String> nodes = zk.getChildren(this.rootLockNode, false);
/**
* 获取所有正在竞争lockName的锁
*/
List<String> lockNodes = new ArrayList<String>();
for (String nodeName : nodes) {
if (nodeName.split(split)[0].equals(this.lockName)) {
lockNodes.add(nodeName);
}
}
Collections.sort(lockNodes);

/**
* 获取最小节点与当前锁节点比对加锁
*
* 比对最小节点的名称是否跟刚才创建的临时节点名称一致
* 一致则证明当前加锁成功
*/
String currentLockPath = this.rootLockNode + "/" + lockNodes.get(0);
if (this.currentLock.equals(currentLockPath)) {
return true;
}

/**
* 加锁失败,设置前一节点为等待锁节点
*/
String currentLockNode = this.currentLock.substring(this.currentLock.lastIndexOf("/") + 1);
int preNodeIndex = Collections.binarySearch(lockNodes, currentLockNode) - 1;
this.waitLock = lockNodes.get(preNodeIndex);

} catch (Exception e) {
e.printStackTrace();
}
return false;
}

/**
* 等待获取锁,带超时时间
*
* @param waitLock 当前节点的前一个锁
* @param sessionTimeout 等待获取锁的超时时间
* @return
*/
private boolean waitOtherLock(String waitLock, int sessionTimeout) {
boolean islock = false;
try {
// 监听等待锁节点
String waitLockNode = this.rootLockNode + "/" + waitLock;
Stat stat = zk.exists(waitLockNode, true);
if (null != stat) {
System.out.println("线程【" + Thread.currentThread().getName()
+ "】锁(" + this.currentLock + ")加锁失败,等待锁(" + waitLockNode + ")释放...");
/**
* 设置计数器,使用计数器阻塞线程,带超时时间
*/
this.countDownLatch = new CountDownLatch(1);
islock = this.countDownLatch.await(sessionTimeout, TimeUnit.MILLISECONDS);
this.countDownLatch = null;
if (islock) {
System.out.println("线程【" + Thread.currentThread().getName() + "】锁("
+ this.currentLock + ")加锁成功,锁(" + waitLockNode + ")已经释放");
} else {
System.out.println("线程【" + Thread.currentThread().getName() + "】锁("
+ this.currentLock + ")加锁失败...");
}
} else {
islock = true;
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
return islock;
}

/**
* 释放分布式锁
*
* @throws InterruptedException
*/
public void unlock() throws InterruptedException {
try {
Stat stat = zk.exists(this.currentLock, false);
if (null != stat) {
System.out.println("线程【" + Thread.currentThread().getName() + "】释放锁 " + this.currentLock);
zk.delete(this.currentLock, -1);
this.currentLock = null;
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
} finally {
zk.close();
}
}

/**
* 节点监听器回调
*
* @param watchedEvent
*/
@Override
public void process(WatchedEvent watchedEvent) {
/**
* 监听节点删除的事件
* 计数器减一,恢复线程操作
*/
if (null != this.countDownLatch && watchedEvent.getType() == Event.EventType.NodeDeleted) {
this.countDownLatch.countDown();
}
}
}

Curator的分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* Curator 实现的分布式锁:
* InterProcessMutex: 分布式可重入排它锁
* InterProcessSemaphoreMutex: 分布式排它锁
* InterProcessReadWriteLock: 分布式读写锁
* InterProcessMultiLock: 将多个锁作为单个实体管理的容器
*/
public class CuratorLock {

public static void main(String[] args) {
/**
* 设置重试策略,创建zk客户端
* curator链接zookeeper的策略:ExponentialBackoffRetry
* baseSleepTimeMs:初始sleep的时间
* maxRetries:最大重试次数
* maxSleepMs:最大重试时间
*/
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
// 启动客户端
client.start();
/**
* 创建分布式可重入排他锁,监听客户端为client,锁的根节点为/locks
*/
InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
try {
/**
* 加锁操作
* public boolean acquire(long time, TimeUnit unit)
* 第一个参数是超时时间
* 第二个参数是时间的单位
*/
mutex.acquire(3, TimeUnit.SECONDS);

/**
* 释放锁
*/
mutex.release();
} catch (Exception e) {
e.printStackTrace();
} finally {
client.close();
}
}
}

ZK实现分布式锁的问题

​ 使用Zookeeper也有可能带来并发问题:由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了,就可能产生并发问题。

​ 这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点(所以选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡)

三者的比较

  1. 从性能角度(从高到低)
    • 缓存 > Zookeeper >= 数据库
  2. 从可靠性角度(从高到低)
    • Zookeeper > 缓存 > 数据库

Linux操作系统的五种IO模型

发表于 2019-05-08 | 分类于 Linux

Linux操作系统的五种IO模型

Linux系统的IO

​ Java中常说的IO是指文件的输入和输出;而在操作系统中的一次IO可以简化成把数据从硬件(硬盘)中读取到用户空间中;详细分为两个阶段

  • 将数据从磁盘文件先加载到内核内存空间(缓存区),等待数据准备完成,此过程时间较长。(准备数据的阶段)
  • 然后将数据从内核缓冲区复制到用户空间的进程内存中,此过程时间较短(拷贝数据阶段)

在了解五种模型前先知道一些名词

  • 同步/异步:关注点是消息的通信机制

    • 同步:调用者等到被调用者返回消息,才能继续往下执行
    • 异步:被调用者通过状态、通知或者回调机制主动通知调用者当前被调用者的状态
  • 阻塞/非阻塞:关注点是调用者在等待结果返回之前所处的状态

    • 阻塞:指在IO操作彻底完成后才返回到用户空间,在获得调用结果之前被挂起
    • 非阻塞:指在发起IO操作后立即返回给用户一个状态值,无须等到IO操作彻底完成才返回;在调用结果返回之前,用户进程不会被挂起
  • 文件描述符FD

    ​ 文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。文件描述符在形式上是一个非负整数

    ​ 实际上它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符

    ​ 在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统

同步阻塞IO模型

​ 同步阻塞UI模型是最简单的IO模型,用户线程在内核进行IO操作的时候被阻塞;用户线程通过系统调用recvfrom 发起IO读操作,由用户空间转到内核空间。内核等到数据包到达后,再将接受的数据拷贝到用户空间,这才能完成一次读操作。

​ 但是用户在进行读操作的时候,由于内核还不能立刻准备好数据包,应用进程就会阻塞住,直到内核准备好数据包,recvfrom 完成数据报复制工作,应用进程才能结束阻塞状态;这就导致用户在发起IO请求时,不能再进行其他任何操作,这对CPU的资源利用率是很低的

同步阻塞IO模型

同步非阻塞IO模型

​ 用户线程在发起IO请求后立即返回,但是此时并没有读取到任何数据,用户线程需要不断的通过 recvfrom 调用去和内核交互,直到内核准备好数据(即轮询机制)。如果没有准备好,内核会返回error,应用进程在得到error后,过一段时间再发送recvfrom请求。在两次发送请求的时间段,进程可以先做别的事情

​ 整个IO请求的过程,虽然用户线程立即返回了,但是为了得到数据还是需要通过轮询机制去请求是否准备好数据;重复的请求会消耗大量的CPU资源,一般很少会直接使用这个模型

同步非阻塞IO模型

信号驱动IO模型

​ 用户进程可以通过sigaction系统调用注册一个信号处理程序,然后主程序返回不会阻塞;当有IO操作准备就绪时,由内核通知触发一个SIGIO信号处理程序执行,然后将用户进程再通过recvfrom将数据从内核空间拷贝到用户空间(数据拷贝是同步的)

​ 这种模型的优势在于等到数据包到达期间进程不会被阻塞,用户主程序可以继续执行,只要等待来自信号处理函数的通知即可(此模式实现复杂,不常用)

信号驱动IO模型

IO多路复用模型

​ 多个进程的IO可以注册到同一个select上,当用户进程调用该select,select会监听所有注册好的IO,如果所有被监听的IO需要的数据都没有准备好时,进程调用select后会阻塞;当任意一个IO所需的数据准备好之后,select调用就会返回,然后用户进程在通过recvfrom来进行数据拷贝

​ 虽然select可以同时监控多个IO操作,但是每个IO请求过程还是会阻塞(阻塞在select函数)的;进程在发出select后,要等到select监听的所有IO操作中至少有一个需要的数据准备好,才会有返回,并且也需要再次发送请求去进行文件的拷贝

​ 所以如果处理的连接数不是很高的话,使用IO多路复用不一定比使用多线程 + 阻塞 IO的性能更好,可能延迟还更大。IO多路复用的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接

IO多路复用模型

select、poll、epoll

select,poll,epoll 都是多路复用IO的函数;其三者的区别在于:

  1. select
    • select函数会无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长
    • select目前几乎在所有的平台上支持,其良好跨平台支持也是它的一个优点
    • 其缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上32位机默认是1024个,64位机默认是2048。也可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低
  2. poll
    • poll本质上和select没有区别, 但是它没有最大连接数的限制,原因是它是基于链表来存储的(select是数组存储)
  3. epoll
    • epoll是通过事件通知的方式(事件关联上fd),只要有可写的IO就会通过回调的方式告知用户进程
    • epoll的最大连接数也是没有限制的,其基于哈希表来存储

异步IO模型

​ 用户进程发起aio_read操作之后,给内核传递描述符、缓冲区指针、缓冲区大小等,告诉内核当整个操作完成时,如何通知进程,然后就立刻去做其他事情了。当内核收到aio_read后,会立刻返回,然后内核开始等待数据准备,数据准备好以后,直接把数据拷贝到用户空间,然后再通知进程本次IO已经完成

​ 相比于IO多路复用,异步IO并不常用,更多的服务程序会使用IO多路复用模型 + 多线程任务处理的架构来满足业务需求

异步IO与信号驱动最主要的区别是

  1. 信号驱动IO是由内核通知用户进程何时进行IO操作,而异步IO是内核把数据拷贝到用户空间后才通知用户进程IO操作何时完成
  2. 信号驱动还需要用户进程阻塞在从内核空间缓冲区拷贝数据到用户空间,而异步IO是内核直接把所有数据准备好并且拷贝到用户空间后才通知用户进程可以进行后续的操作;即数据准备阶段两者都是非阻塞的,而数据拷贝阶段,信号驱动是阻塞的而异步IO是非阻塞的

异步IO模型

正向代理和反向代理

发表于 2019-05-06 | 分类于 分布式

正向代理和反向代理

正向代理

​ 正向代理是一个位于客户端和原始服务器(origin server)之间的服务器,为了从原始服务器取得内容,客户端向代理发送一个请求并指定目标(原始服务器),然后代理向原始服务器转交请求并将获得的内容返回给客户端。

​ 正向代理,其实是”代理服务器”代理了”客户端”,去和”目标服务器”进行交互;客户端必须要进行一些特别的设置才能使用正向代理

​ 举个栗子:一个国内用户访问不了Google,但是他能访问到一个代理服务器,这个代理服务器能够访问Google,于是该用户需要先连上代理服务器,然后告诉代理服务器需要访问哪个网站的内容,代理服务器去将网站内容取回来,然后返回给用户。从网站的角度,只在代理服务器来取内容的时候有一次记录,有时候并不知道是用户的请求,也隐藏了用户的资料,这取决于代理告不告诉访问的网站

正向代理的用途

  1. 突破访问限制

    ​ 通过代理服务器,可以突破自身IP访问限制,访问国外网站等

  2. 提高访问速度

    ​ 通常代理服务器都设置一个较大的硬盘缓冲区,会将部分请求的响应保存到缓冲区中,当其他用户再访问相同的信息时, 则直接由缓冲区中取出信息,传给用户,以提高访问速度

  3. 隐藏客户端真实IP

    ​ 上网者也可以通过这种方法隐藏自己的IP,免受攻击

反向代理

​ 反向代理(Reverse Proxy)是指以代理服务器来接受Internet上的连接请求,然后将请求转发给内部网络上的服务器,并将从服务器上得到的结果返回给Internet上请求连接的客户端,此时代理服务器对外就表现为一个服务器

​ 反向代理,其实是”代理服务器”代理了”目标服务器”,去和”客户端”进行交互;通过反向代理服务器访问目标服务器时,客户端是不知道真正的目标服务器是谁的,甚至不知道自己访问的是一个代理

反向代理的用途

  1. 隐藏服务器真实IP

    ​ 使用反向代理,可以对客户端隐藏服务器的IP地址

  2. 负载均衡

    ​ 反向代理服务器可以做负载均衡

    ​ 根据所有真实服务器的负载情况,将客户端请求分发到不同的真实服务器上

  3. 提高访问速度

    反向代理服务器可以对于静态内容及短时间内有大量访问请求的动态内容提供缓存服务,提高访问速度

  4. 提供安全保障

    ​ 反向代理服务器可以作为应用层防火墙,为网站提供对基于Web的攻击行为(例如DoS/DDoS)的防护,更容易排查恶意软件等

    ​ 还可以为后端服务器统一提供加密和SSL加速(如SSL终端代理),提供HTTP访问认证等

正向代理和反向代理的区别

  1. 正向代理其实是客户端的代理,帮助客户端访问其无法访问的服务器资源

    反向代理则是服务器的代理,帮助服务器做负载均衡,安全防护等

  1. 正向代理一般是客户端架设的,比如在自己的机器上安装一个代理软件

    反向代理一般是服务器架设的,比如在自己的机器集群中部署一个反向代理服务器

  1. 正向代理中,服务器不知道真正的客户端到底是谁,以为访问自己的就是真实的客户端

    反向代理中,客户端不知道真正的服务器是谁,以为自己访问的就是真实的服务器

  1. 正向代理主要是用来解决访问限制问题

    反向代理则是提供负载均衡、安全防护等作用

图片来源:https://www.zhihu.com/question/24723688

代理

Java获取用户的IP地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 获取请求的真实ip地址
* @param request
* @return
*/
public String getRealIpAdrress(HttpServletRequest request) {
String ipAdrress = "";
String forwardFor = request.getHeader("X-Forwarded-For");
if (!T.isBlank(forwardFor) && !"unKnown".equalsIgnoreCase(forwardFor)) {
//多次反向代理后会有多个ip值,第一个ip才是真实ip
int index = forwardFor.indexOf(",");
if(index != -1){
ipAdrress = forwardFor.substring(0,index);
}else{
ipAdrress = forwardFor;
}
}
if (T.isBlank(ipAdrress) || "unKnown".equalsIgnoreCase(forwardFor)) {
ipAdrress = request.getHeader("Proxy-Client-IP");
}
if (T.isBlank(ipAdrress) || "unKnown".equalsIgnoreCase(forwardFor)) {
ipAdrress = request.getHeader("WL-Proxy-Client-IP");
}
if (T.isBlank(ipAdrress)) {
ipAdrress = request.getHeader("X-Real-IP");
}
if (T.isBlank(ipAdrress) || "unKnown".equalsIgnoreCase(forwardFor)) {
ipAdrress = request.getHeader("HTTP_CLIENT_IP");
}
if (T.isBlank(ipAdrress) || "unKnown".equalsIgnoreCase(forwardFor)) {
ipAdrress = request.getHeader("HTTP_X_FORWARDED_FOR");
}
if (T.isBlank(ipAdrress) || "unKnown".equalsIgnoreCase(forwardFor)) {
ipAdrress = request.getRemoteAddr();
}
return ipAdrress;
}

getRemoteAddr

​ request.getRemoteAddr()获取客户端的ip地址

​ 但是如果使用了反向代理软件,例如Nginx,用request.getRemoteAddr()方法获取的IP地址都会是反向代理服务器的ip,而并不是客户端的真实IP。
​ 经过代理以后,由于在客户端和服务之间增加了中间层,因此服务器无法直接拿到客户端的IP,服务器端应用也无法直接通过转发请求的地址返回给客户端。但是在转发请求的HTTP头信息中,增加了X-Forwarded-For信息。用以跟踪原有的客户端IP地址和原来客户端请求的服务器地址

X-Forwarded-For

​ X-Forwarded-For是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段

​ 如果有配置X-Forwarded-For设置的话,每次经过proxy转发都会有记录,格式就是:

(client1, proxy1, proxy2) 以逗号隔开各个地址(client指客户端IP,proxy指反向代理服务器的IP)

即设置了X-Forwarded-For,取逗号分隔的第一项即为客户端的IP

在代理转发及反向代理中经常使用X-Forwarded-For 字段

  1. 代理转发(正向代理)

    ​ 在代理转发的场景中,你可以通过内部代理链以及记录在网关设备上的IP地址追踪到网络中客户端的IP地址

    ​ 处于安全考虑,网关设备在把请求发送到外网(因特网)前,应该去除 X-Forwarded-For 字段里的所有信息

    ​ 这种情况下所有的信息都是在你的内部网络内生成,因此X-Forwarded-For字段中的信息应该是可靠的

  2. 反向代理

    ​ 在反向代理的情况下,你可以追踪到互联网上连接到你的服务器的客户端的IP地址,即使你的网络服务器和互联网在路由上是不可达的

    ​ 这种情况下你不应该信任所有X-Forwarded-For信息,其中有部分可能是伪造的。因此需要建立一个信任白名单来确保X-Forwarded-For中哪些IP地址对你是可信的

    ​ 最后一次代理服务器的地址并没有记录在代理链中,因此只记录 X-Forwarded-For 字段是不够的。完整起见,Web服务器应该记录请求来源的IP地址以及X-Forwarded-For 字段信息

X-Forwarded-For 和 X-Real-IP** 区别

​ X-Real-IP如果经过多级代理的情况下,其记录的IP不一定是真实的客户端IP;如果有多级代理,x-forwarded-for效果是大于x-real-ip的,可以记录完整的代理链路

zookeeper小结

发表于 2019-05-04 | 分类于 分布式

什么是Zookeeper?

​ Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目。

​ ZooKeeper是一个经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调服务

​ 分布式应用程序可以基于ZooKeeper实现数据发布与订阅、负载均衡、命名服务、分布式协调与通知、集群管理、Leader选举、分布式锁、分布式队列等功能。

zookeeper的一些特点

  1. 顺序一致性:从同一个client客户端发来的请求,会按其发送的顺序来执行
  2. 原子性:一次数据处理要么全部成功,要么全部失败
  3. 数据一致性:每个Server保存一份相同的数据,客户端无论连接到哪个Server,数据都是一致的
  4. 实时性:在一定时间范围内,客户端能够读取到最新的数据

Zookeeper的数据模型

​ ZK会维护一个具有层次关系的树状的数据结构,每个树节点称为一个ZNode。每个ZNode默认能够存储1MB的数据,每个ZNode都可以通过路径唯一标识

​ 一个ZNode既能在它下面创建子节点,作为路径标识的一部分,同时该节点也能存储数据;主要存放分布式应用的配置信息和状态信息等

​ 每个ZNode节点都有各自的版本号,当节点数据发生变化是,那该节点的版本号也会累加(乐观锁的机制)

节点类型

  • 持久(Persistent):客户端和服务器断开连接后,创建的节点不会被删除
  • 短暂(Ephemeral):客户端和服务器断开连接后,创建的节点会自动删除

​ 创建ZNode节点的时候可以设置顺序标识,ZNode名称后会附加一个顺序号,这个顺序号是单调递增的计数器,并且是由父节点来维护的

​ 注意:在分布式系统中,顺序号可以被用于所有事件的全局排序;客户端可以通过顺序号来推断事件的执行顺序

Zookeeper的应用场景

  1. 统一命名服务

    在分布式环境下,对应用/服务进行统一的命名,会便于识别

    对外只显示服务的名称,通过节点去访问对应IP的服务

  2. 统一配置管理

    ​ 集群中一般要求所有节点的配置信息是一致的,例如Kafka集群。并且对配置文件修改后,能够快速更新到各个节点上

    ​ 可以将配置信息写入ZNode中,各个客户端监听该配置信息的状态,一旦ZNode中的数据发生改变,可以及时通知各个客户端将最新的配置信息更新到系统中

  3. 统一集群管理

    ​ 服务节点动态上下线,当ZK中注册的服务下线时,客户端能够实时的得到下线通知;这里可以通过ZK的监听器去监听节点的动态新增/删除

  4. 分布式锁

  5. 软负载均衡

    ZK记录节点上的服务,可以让访问数最少的服务器去处理最新的客户端请求

ZK安装

​ 注意:下面操作没有设置环境变量,如果设置的环境变量,那么可以在全局环境下直接使用zkServer.sh或者zkCli.sh

设置方法:

1
2
3
4
> vim /etc/profile
> export ZOOKEEPER_HOME=/opt/zookeeper
> export PATH=$PATH:$ZOOKEEPER_HOME/bin
>

单机模式

  1. 解压tar.gz文件到指定目录下(/opt)

    1
    tar -zxvf zookeeper-3.4.10.tar.gz
  2. 复制conf下的zoo_sample.cfg为新文件zoo.cfg,并且在zookeeper的主目录下创建data文件夹,并在配置文件中设置data目录和dataLog目录

    1
    2
    3
    4
    5
    6
    7
    cd /opt/zookeeper/conf
    cp zoo_sample.cfg zoo.cfg
    cd /opt/zookeeper
    mkdir data
    vim /opt/zookeeper/conf/zoo.cfg
    dataDir=/opt/zookeeper/data
    dataLogDir=/opt/zookeeper/dataLog
  3. 启动zk

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    #启动zk
    bin/zkServer.sh start
    #关闭zk
    bin/zkServer.sh stop

    ##查看zk的状态
    bin/zkServer.sh status

    ##查看zk进程是否启动
    jps
    4020 Jps
    4001 QuorumPeerMain

分布式部署

  1. 在data目录下创建myid文件,在文件上添加ZK编号

    1
    2
    3
    touch myid
    1
    ##其他ZK的机子上需要添加不同的编号
  2. 修改 zoo.cfg 配置文件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    vim zoo.cfg
    #######################cluster##########################
    server.1=zk1:2888:3888
    server.2=zk2:2888:3888
    server.3=zk3:2888:3888

    #######################cluster##########################
    server.1=localhost:2881:3881
    server.2=localhost:2882:3882
    server.3=localhost:2883:3883

    配置文件解析:

    • server后面的数字就是 myid 文件制定的编号
    • zk1 是你服务器的 ip 地址
    • 2888 是zk集群的信息交换端口(不一定是2888,可自行指定)
    • 3888 是zk集群中Leader节点挂了之后重新选择Leader节点时进行通信的端口(同样可自行选择其他端口)

深入学习Zookeeper

ZK配置文件

  1. tickTime

    通信心跳数,Zookeeper服务器与客户端心跳时间,单位毫秒

    ​ Zookeeper使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime时间就会发送一个心跳,时间单位为毫秒

    ​ 它用于心跳机制,并且设置最小的session(会话)超时时间为两倍心跳时间(session的最小超时时间是2*tickTime)

  2. initLimit

    集群中主从服务器之间的初始通信时限

    ​ 集群中的Follower跟随者服务器与Leader领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的Zookeeper服务器连接到Leader的时限

  3. syncLimit

    集群中主从服务器之间的同步通信时限

    ​ 集群中Leader与Follower之间的最大响应时间单位,假如响应超过syncLimit * tickTime,Leader认为Follwer死掉,从服务器列表中删除Follwer

  4. dataDir

    数据文件目录+数据持久化路径

  5. dataLogDir

    日志文件目录,如果不配置则使用dataDir的目录进行日志的存放

  6. clientPort

    监听客户端连接的端口,默认是2181

ZK集群

选举机制中的基础概念

  1. 服务器ID

    即myid文件中的编号;编号越大,权重越大

  2. Zxid,数据ID

    服务器中存放的最大数据ID;值越大说明该数据越新,权重越大

  3. Epoch:逻辑时钟

    ​ 投票的次数(轮数),同一轮投票过程中的逻辑时钟值是相同的

    ​ 每投完一次票这个数据就会增加,然后与接收到的其它服务器返回的投票信息中的数值相比,根据不同的值做出不同的判断

  4. Server状态:选举状态

    • LOOKING,竞选状态
    • FOLLOWING,随从状态,同步leader状态,参与投票
    • OBSERVING,观察状态,同步leader状态,不参与投票
    • LEADING,领导者状态

选举简易流程

​ 目前有5台服务器,每台服务器均没有数据,它们的编号分别是1,2,3,4,5,按编号依次启动,它们的选择举过程如下:

  1. 服务器1启动,给自己投票,然后发投票信息,由于其它机器还没有启动所以它收不到反馈信息,服务器1的状态一直属于Looking(竞选状态)
  2. 服务器2启动,给自己投票,同时与之前启动的服务器1交换结果;由于服务器2的编号比服务器1的大,所以服务器2胜出;但此时投票数没有大于半数,所以两个服务器的状态依然是LOOKING
  3. 服务器3启动,给自己投票,同时与之前启动的服务器1和2交换信息,由于服务器3的编号最大,所以服务器3胜出,此时投票数正好大于半数,所以服务器3成为Leader,服务器1和2成为Follower,状态变成FOLLOWING
  4. 服务器4启动,给自己投票,同时与之前启动的服务器1,2,3交换信息,尽管服务器4的编号大,但服务器3的状态已经是Leading,所以服务器4也是Follower
  5. 服务器5启动,逻辑同服务器4

几种情况的选举

  1. 一台宕机重启的机器加入已有环境,如果已有环境中已经存在Leader,那么该机器会变成Follwoer

  2. 一台机器加入正在投票中的环境

    所有server都会接受优先级最高的投票,最高优先级最高的选票当选,选举结束

  3. 当集群中多数机器宕机重启

    ​ 存活的服务发现不满足多数派,改变状态为LOOKING,投票轮数+1,然后重新开始投票,会按照优先级的选举投票直至结束

    • 逻辑时钟小的选举结果被忽略,重新投票
    • 统一逻辑时钟后,数据 version 大的胜出
    • 数据 version 相同的情况下,server id 大的胜出

以上,只要有超过半数的机器存活,最终会完成投票

选举机制(半数机制)

​ 集群中半数以上机器存活,集群可用。所以Zookeeper适合安装奇数台服务器

​ Zookeeper虽然在配置文件中并没有指定Master和Slave。Zookeeper工作时只有一个节点为Leader,其他则为Follower,Leader是通过内部的选举机制临时产生的

zkClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#启动zk客户端
bin/zkCli.sh
##指定访问server
zkCli.sh -server 192.168.1.1:2181

###常用操作
create /dh "shuaige"
get /dh

##创建短暂节点
create -e /dh/fat "fat"
##创建带顺序号的持久节点
create -s /dh/handsome "handsome"
##修改节点的值
set /dh/fat "littlefat" 0
##删除节点
delete /dh/fat 0
##递归删除
rmr /dh

常用操作命令

命令基本语法 功能描述
help 显示所有操作命令
ls path [watch] 使用 ls 命令来查看当前znode中所包含的内容
ls2 path [watch] 查看当前节点数据并能看到更新次数等数据(详细数据)
create [选项] 普通创建一个zNode -s :含有序列 -e:临时(重启或者超时消失)
get path [watch] 获得节点的值
set path data [version] 设置(修改)节点的具体值,可根据版本号对节点的值进行修改(推荐使用版本号修改,乐观锁机制)
stat 查看节点状态
delete path data [version] 删除节点,可根据版本号对节点进行删除(推荐使用版本号删除,乐观锁机制)
rmr 递归删除节点

Stat结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
[zk: localhost:2181(CONNECTED) 1] ls2 /
[zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
  1. cZxid:创建节点的事务zxid

    ​ 每次修改ZooKeeper状态都会收到一个zxid形式的时间戳,也就是ZooKeeper事务ID

    ​ 事务ID是ZooKeeper中所有修改总的次序。每个修改都有唯一的zxid,如果zxid1小于zxid2,那么zxid1在zxid2之前发生

  2. ctime:znode被创建的毫秒数(从1970年开始)

  3. mzxid:znode最后更新的事务zxid

  4. mtime:znode最后修改的毫秒数(从1970年开始)

  5. pZxid:znode最后更新的子节点zxid

  6. cversion:znode子节点变化版本号,znode子节点修改次数

  7. dataversion:znode数据变化版本号

  8. aclVersion:znode访问控制列表的变化版本号

  9. ephemeralOwner:如果是临时节点,这个是znode拥有者的session id。如果不是临时节点则是0

  10. dataLength:znode的数据长度
  11. numChildren:znode子节点数量

watcher机制

​ watcher是zk中的监听器机制,父节点或者子节点的增删改操作都能够触发watcher事件

事件类型

  1. 父节点创建:NodeCreated
  2. 父节点数据修改:NodeDataChanged
  3. 父节点删除:NodeDeleted
  4. 创建了子节点:NodeChildrenChanged
  5. 删除子节点:NodeChildrenChanged
  6. 修改子节点不触发任何事件

watcher机制的使用场景

​ 统一的配置管理,可以监听配置信息的节点,当配置信息的节点数据发生变化的时候触发客户端更新配置的操作

ACL权限控制

​ ACL(access control lists),可以针对节点设置读写等权限,可以保障数据的安全性;如果没有权限,则会抛出异常

zk的acl通过 [scheme​ : id : ​permissions] 的形式来构成权限的列表

  • scheme:代表采用的某种权限机制
  • id:代表允许访问的用户
  • permissions:权限组合字符串(有crdwa)
    • c:CREATE,创建子节点
    • r:READ,获取节点/子节点
    • d:DELETE,删除子节点
    • w:WRITE,设置节点数据
    • a:ADMIN,设置权限

权限示例:

world:world:anyone:[permissions]

auth:auth:user:password:[permissions] 代表认证登录,需要注册的用户有操作权限即可

digest:digest:username:BASE64(SHA1(password)):[permissions] 表示需要对密码进行加密才可以访问

ip:ip:ip地址:[permissions] 可以限制指定ip才能访问该节点

ACL的命令行操作

  1. getAcl:获取某个节点的acl权限信息

  2. setAcl:设置某个节点的acl权限信息

    示例:(1和2是等价的)

    1. setAcl /path auth:dai:dai:cdrwa
    2. setAcl /path digest:dai:password:cdrwa

    上面两个操作后需要进行addauth操作后才能够对 /path 进行操作

    1. setAcl /path ip:192.168.1.1:cdrwa

    设置ip后,只有指定ip的客户端才有权限去访问该节点

  3. addauth:输入认证授权信息,注册时输入明文密码,在zk系统中,密码都是以加密的形式存在的

    参照2的示例: 执行 addauth digest:dai:dai 登录后能获取上面设置节点的操作权限

    ​ 注意:要使用 dai 用户前需要先注册 dai 用户才可以设置成功,注册用户同样是addAuth命令:addauth digest dai:dai

    ​ 注意:使用 digest 来设置权限时,查看加密后的password可以通过getAcl,比如:

    getAcl /dh

    ​ ‘digest,’dai:password(此处的password是加密后的显示)

Java使用ZK

原生ZK的API

引入POM

1
2
3
4
5
6
7
<!-- zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<!-- 版本与ZK版本一致 -->
<version>3.4.11</version>
</dependency>

连接ZK

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public class ZKDemo implements Watcher {

final static Logger log = LoggerFactory.getLogger(ZKConnect.class);

public static final String zkServerPath = "192.168.1.1:2181";
// public static final String zkServerPath = "192.168.1.1:2181,192.168.1.2:2182,192.168.1.3:2183";
public static final Integer timeout = 5000;

public static void main(String[] args) throws Exception {
/**
* 客户端和zk服务端链接是一个异步的过程
* 当连接成功后后,客户端会收的一个watch通知
*
* 参数:
* connectString:连接服务器的ip字符串,
* 比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"
* 可以是一个ip,也可以是多个ip,一个ip代表单机,多个ip代表集群
*
* sessionTimeout:超时时间,心跳收不到了,那就超时
*
* watcher:通知事件,如果有对应的事件触发,则会收到一个通知;如果不需要,那就设置为null
*
* canBeReadOnly:可读,当这个物理机节点断开后,还是可以读到数据的,只是不能写,
* 此时数据被读取到的可能是旧数据,此处建议设置为false,不推荐使用
*
* sessionId:会话的id
*
* sessionPasswd:会话密码
* 当会话丢失后,可以依据 sessionId 和 sessionPasswd 重新获取会话
*
*/
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKDemo());
log.warn("连接状态:{}", zk.getState());

// 开始会话重连
long sessionId = zk.getSessionId();
byte[] sessionPassword = zk.getSessionPasswd();
log.warn("开始会话重连...");
ZooKeeper zkSession = new ZooKeeper(zkServerPath,
timeout,
new ZKDemo(),
sessionId,
sessionPassword);
log.warn("重新连接状态zkSession:{}", zkSession.getState());
new Thread().sleep(1000);
log.warn("重新连接状态zkSession:{}", zkSession.getState());
}

@Override
public void process(WatchedEvent event) {
log.warn("接受到watch通知:{}", event);
}
}

ZK的节点操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class ZKNode implements Watcher {

private ZooKeeper zookeeper = null;
public static final String zkServerPath = "192.168.1.1:2181";
public static final Integer timeout = 5000;

public ZKNodeExist() {}

public ZKNodeExist(String connectString) {
try {
zookeeper = new ZooKeeper(connectString, timeout, new ZKNode());
} catch (IOException e) {
e.printStackTrace();
if (zookeeper != null) {
try {
zookeeper.close();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}

public static void main(String[] args) throws Exception {
ZKNode zkServer = new ZKNode(zkServerPath);
/**
*查询节点是否存在
* 参数:
* path:节点路径
* watch:watch
*/
Stat stat = zkServer.getZookeeper().exists("/dh-demo", true);
if (stat != null) {
System.out.println("查询的节点版本为dataVersion:" + stat.getVersion());
} else {
System.out.println("该节点不存在...");
}
/**
* 同步或者异步创建节点,都不支持子节点的递归创建,异步有一个callback函数
* 参数:
* path:创建的路径
* data:存储的数据的byte[]
* acl:控制权限策略
* Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
* CREATOR_ALL_ACL --> auth:user:password:cdrwa
* createMode:节点类型, 是一个枚举
* PERSISTENT:持久节点
* PERSISTENT_SEQUENTIAL:持久顺序节点
* EPHEMERAL:临时节点
* EPHEMERAL_SEQUENTIAL:临时顺序节点
*/
String ctx = "{'create':'success'}";
zookeeper.create("/dh-path", "data".get, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new CreateCallBack(), ctx);

/**
*获取一个节点的值
* 参数:
* path:节点路径
* watch:true或者false,注册一个watch事件
* stat:状态
*/
Stat stat = new Stat();
byte[] resByte = zkServer.getZookeeper().getData("/dh", true, stat);
String result = new String(resByte);
System.out.println("当前值:" + result);

/**
*设置节点的数据
* 参数:
* path:节点路径
* data:数据
* version:数据状态
*/
stat = zkServer.getZookeeper().setData("/dh-path", "data".getBytes(), stat.getVersion());
System.out.println(status.getVersion());

/**
*删除节点(带回调的)
* 参数:
* path:节点路径
* version:数据状态
*/
String ctx = "{'delete':'success'}";
zkServer.getZookeeper().delete("/dh-path", stat.getVersion(), new DeleteCallBack(), ctx);
}

/**
* 对节点的监听
*
* @param event
*/
@Override
public void process(WatchedEvent event) {
if (event.getType() == EventType.NodeCreated) {
System.out.println("节点创建");
countDown.countDown();
} else if (event.getType() == EventType.NodeDataChanged) {
System.out.println("节点数据改变");
countDown.countDown();
} else if (event.getType() == EventType.NodeDeleted) {
System.out.println("节点删除");
countDown.countDown();
}
}

public ZooKeeper getZookeeper() {
return zookeeper;
}
public void setZookeeper(ZooKeeper zookeeper) {
this.zookeeper = zookeeper;
}
}

CallBack回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//父节点的watcher机制回调
public class CreateCallBack implements StringCallback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
System.out.println("创建节点: " + path);
//ctx 就是create方法传入的ctx参数
System.out.println((String)ctx);
}
}
//子节点的watcher机制回调
public class ChildrenCallBack implements ChildrenCallback {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children) {
for (String s : children) {
System.out.println(s);
}
System.out.println("ChildrenCallback:" + path);
System.out.println((String)ctx);
}
}

ACL权限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public class ZKNodeAcl implements Watcher {
private ZooKeeper zookeeper = null;
public static final String zkServerPath = "192.168.1.1:2181";
public static final Integer timeout = 5000;
public ZKNodeAcl() {}

public ZKNodeAcl(String connectString) {
try {
zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeAcl());
} catch (IOException e) {
e.printStackTrace();
if (zookeeper != null) {
try {
zookeeper.close();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}

public void createZKNode(String path, byte[] data, List<ACL> acls) {
String result = "";
try {
result = zookeeper.create(path, data, acls, CreateMode.PERSISTENT);
System.out.println("创建节点:\t" + result + "\t成功...");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) throws Exception {
ZKNodeAcl zkServer = new ZKNodeAcl(zkServerPath);
/**
* ====================== 创建node start ======================
*/
// acl 任何人都可以访问
zkServer.createZKNode("/dh-acl", "data".getBytes(), Ids.OPEN_ACL_UNSAFE);

// 自定义用户认证访问
/**
* DigestAuthenticationProvider.generateDigest(String str)
* 这个方法是zk客户端提供的加密方式 BASE61(SHA1(password))
*/
List<ACL> acls = new ArrayList<ACL>();
Id acl1 = new Id("digest", DigestAuthenticationProvider.generateDigest("dai1:123456"));

Id acl2 = new Id("digest", DigestAuthenticationProvider.generateDigest("dai2:123456"));
acls.add(new ACL(Perms.ALL, acl1));
acls.add(new ACL(Perms.READ, acl2));
acls.add(new ACL(Perms.DELETE | Perms.CREATE, acl2));
zkServer.createZKNode("/dh-acl/test", "testdigest".getBytes(), acls);

// 注册过的用户必须通过addAuthInfo才能操作节点,参考命令行 addauth
zkServer.getZookeeper().addAuthInfo("digest", "dai1:123456".getBytes());
zkServer.createZKNode("/dh-acl/test/childtest", "childtest".getBytes(), Ids.CREATOR_ALL_ACL);
Stat stat = new Stat();
byte[] data = zkServer.getZookeeper().getData("/dh-acl/test", false, stat);
System.out.println(new String(data));
zkServer.getZookeeper().setData("/dh-acl/test", "data".getBytes(), stat.getVersion());

// ip方式的acl
List<ACL> aclsIP = new ArrayList<ACL>();
Id ipId = new Id("ip", "192.168.1.10");
aclsIP.add(new ACL(Perms.ALL, ipId));
zkServer.createZKNode("/dh-acl/iptest", "data".getBytes(), aclsIP);

// 验证ip是否有权限
zkServer.getZookeeper().getData("/dh-acl/test", false, stat);
zkServer.getZookeeper().setData("/dh-acl/iptest", "setdata".getBytes(), stat.getVersion());
byte[] data = zkServer.getZookeeper().getData("/aclimooc/iptest6", false, stat);
System.out.println(new String(data));
System.out.println(stat.getVersion());
}

public ZooKeeper getZookeeper() {
return zookeeper;
}
public void setZookeeper(ZooKeeper zookeeper) {
this.zookeeper = zookeeper;
}
@Override
public void process(WatchedEvent event) {
}
}

Apache Curator

ZK连接及节点操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
public class ZKCurator {

private static String zkServerPath = "127.0.0.1:2181";

public static void main(String[] args) throws Exception {
/**
* 同步创建zk示例,原生api是异步的
*
* curator链接zookeeper的策略:ExponentialBackoffRetry
* baseSleepTimeMs:初始sleep的时间
* maxRetries:最大重试次数
* maxSleepMs:最大重试时间
*/
RetryPolicy retryPolicy1 = new ExponentialBackoffRetry(1000, 5);

/**
* curator链接zookeeper的策略:RetryNTimes
* n:重试的次数
* sleepMsBetweenRetries:每次重试间隔的时间
*/
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);

/**
* curator链接zookeeper的策略:RetryOneTime
* sleepMsBetweenRetry:每次重试间隔的时间
*/
RetryPolicy retryPolicy2 = new RetryOneTime(3000);

/**
* curator链接zookeeper的策略:RetryUntilElapsed
* maxElapsedTimeMs:最大重试时间
* sleepMsBetweenRetries:每次重试间隔
* 重试时间超过maxElapsedTimeMs,就不再重试
*/
RetryPolicy retryPolicy3 = new RetryUntilElapsed(2000, 3000);

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();
//判断链接是否成功
boolean isZkCuratorStarted = client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接中" : "已关闭"));

/**
* 创建节点:
* creatingParentsIfNeeded : 开启递归的创建方式,不用一层一层的创建
* withMode : 节点的类型
* withACL : acl权限
* forPath : 节点路径和数据
*
*/
String nodePath = "/dai";
String str = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
.forPath(nodePath, "data".getBytes());

/**
* 获取节点数据:
* storingStatIn : 把服务器端获取的状态数据存储到stat对象
*
*/
Stat stat = new Stat();
byte[] data = client.getData()
.storingStatIn(stat)
.forPath(nodePath);
System.out.println("节点" + nodePath + "的数据为: " + new String(data));
System.out.println("该节点的版本号为: " + stat.getVersion());

/**
* 更新节点数据:
* withVersion : 数据版本
*
*/
client.setData()
.withVersion(stat.getVersion())
.forPath(nodePath, "update".getBytes());

/**
* 删除节点:
* guaranteed : 保障措施,只要客户端会话有效; 那么Curator会在后台持续进行删除操作,直到删除节点成功
* deletingChildrenIfNeeded : 递归删除,有子节点的情况下会将所有子节点也一并删除
* withVersion : 数据版本
*
*/
client.getData().storingStatIn(stat).forPath(nodePath);
client.delete()
.guaranteed()
.deletingChildrenIfNeeded()
.withVersion(stat.getVersion())
.forPath(nodePath);

/**
* 查询子节点
*
*/
List<String> childNodes = client.getChildren().forPath(nodePath);
System.out.println("开始打印子节点:");
for (String child : childNodes) {
System.out.println(child);
}

/**
* 判断节点是否存在,如果不存在则为空
*/
Stat statExist = client.checkExists().forPath(nodePath + "/exist");
System.out.println(statExist);

/**
* watcher 事件:
* 当使用usingWatcher的时候,监听只会触发一次,监听完毕后就销毁
*
*/
client.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);

/**
* watcher 事件:
*
* NodeCache: 监听数据节点的变更,会触发事件
*
*
*/
final NodeCache nodeCache = new NodeCache(client, nodePath);
/**
* 参数:
* buildInitial : 初始化的时候获取node的值并且缓存
* 只有开启这个缓存后,下面的getCurrentData方法才能拿到数据
*/
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
System.out.println("节点初始化数据为:" + new String(nodeCache.getCurrentData().getData()));
} else {
System.out.println("节点初始化数据为空...");
}
/**
* 添加节点数据监听器
*/
nodeCache.getListenable()
.addListener(new NodeCacheListener() {
public void nodeChanged() throws Exception {
if (nodeCache.getCurrentData() == null) {
System.out.println("节点数据为空");
return;
}
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("节点路径:" + nodeCache.getCurrentData().getPath() + " 数据:" + data);
}
});

/**
* 为子节点添加watcher事件
* PathChildrenCache: 监听数据节点的增删改,会触发事件
* cacheData: 是否设置缓存节点的数据状态
*
*/
final PathChildrenCache childrenCache = new PathChildrenCache(client, nodePath, true);
/**
* StartMode: 初始化方式
* POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
* NORMAL:异步初始化
* BUILD_INITIAL_CACHE:同步初始化
*/
childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

List<ChildData> childDataList = childrenCache.getCurrentData();
System.out.println("当前数据节点的子节点数据列表:");
for (ChildData child : childDataList) {
String childData = new String(child.getData());
System.out.println(childData);
}
/**
* 添加节点数据监听器
*/
childrenCache.getListenable()
.addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){
System.out.println("子节点初始化ok...");
}

else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
String path = event.getData().getPath();
if (path.equals("dh-add")) {
System.out.println("添加子节点:" + event.getData().getPath());
System.out.println("子节点数据:" + new String(event.getData().getData()));
} else if (path.equals("/super/imooc/e")) {
System.out.println("添加不正确...");
}

}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){
System.out.println("删除子节点:" + event.getData().getPath());
}else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
System.out.println("修改子节点路径:" + event.getData().getPath());
System.out.println("修改子节点数据:" + new String(event.getData().getData()));
}
}
});

Thread.sleep(100000);

//关闭zk客户端连接
if (client != null) {
client.close();
}
boolean isZkCuratorStarted2 = client.isStarted();
System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接中" : "已关闭"));
}

/**
* watcher
*/
public static class MyCuratorWatcher implements CuratorWatcher {
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
System.out.println("触发了watch 事件,节点路径:" + watchedEvent.getPath());
}
}
}

ACL权限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ZKCurator {

private static String zkServerPath = "127.0.0.1:2181";

public static void main(String[] args) throws Exception {
RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(zkServerPath)
.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
.namespace("workspace").build();
client.start();

List<ACL> aclList = new ArrayList<>();
/**
* Id 构造参数:
* scheme:认证方式
* world:默认方式,相当于全世界都能访问
* auth:代表已经认证通过的用户
* digest:即用户名:密码这种方式认证
* ip:使用Ip地址认证
*/
Id id1 = new Id("digest", DigestAuthenticationProvider.generateDigest("dai1:123456"));
Id id2 = new Id("digest", DigestAuthenticationProvider.generateDigest("dai2:123456"));
/**
* ACL 构造参数:
* perms:五种权限:
* CREATE: 能创建子节点
* READ:能获取节点数据和列出其子节点
* WRITE: 能设置节点数据
* DELETE: 能删除子节点
* ADMIN: 能设置权限
* Id:就是上面的Id类
*/
aclList.add(new ACL(Perms.ALL, id1));
aclList.add(new ACL(Perms.READ, id2));
aclList.add(new ACL(Perms.CREATE | Perms.WRITE, id2));
client.setACL().withACL(aclList).forPath("/dai/dh/hao");
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
/**
* withACL 第二个参数applyToParents :
* 如果设置为true,那么会将这些acl权限加到创建的父节点上(递归创建)
*/
.withACL(aclList, true)
.forPath("/dai/dh/hao", "data".getBytes());

//关闭zk客户端连接
if (client != null) {
client.close();
}
}

CAP理论

发表于 2019-04-14 | 分类于 分布式

CAP理论

什么是CAP理论

​ CAP理论指的是在一个分布式系统中,不能同时满足一致性,可用性和分区容错性

  1. C(Consistency):一致性

    在分布式系统中的所有数据备份,在同一时刻是否是同样的值

  2. A(Availabe):可用性

    ​ 可用性指在集群中一部分节点故障后,集群整体是否还能响应客户端的读写请求

    ​ 可用性是指系统提供的服务必须一直处于可用的状态,对于用户的每一个操作请求总是能够在有限的时间内返回结果。“有效的时间内”是指,对于用户的一个操作请求,系统必须能够在指定的时间(即响应时间)内返回对应的处理结果,如果超过了这个时间范围,那么系统就被认为是不可用的

  3. P(Partition Tolenrance):分区容错性

    ​ 以实际效果而言,分区相当于对通信的时限要求。系统如果不能在时限内达成数据一致性,就意味着发生了分区的情况,必须就当前操作在C和A之间做出选择,也就是说无论任何消息丢失,系统都可用

分区容错性

​ 分区容错性约束了一个分布式系统具有如下特性:分布式系统在遇到任何网络分区故障的时候,仍然需要能够保证对外提供满足一致性和可用性的服务,除非是整个网络环境都发生了故障

网络分区

​ 网络分区是指在分布式系统中,不同的节点分布在不同的子网络(机房或异地网络)中,由于一些特殊的原因导致这些子网络出现网络不连通的状况;但各个子网络的内部网络是正常的,从而导致整个系统的网络环境被切分成了若干个孤立的区域。

​ 需要注意的是,组成一个分布式系统的每个节点的加入与退出都可以看作是一个特殊的网络分区

Consistency一致性强弱

​ 一致性是指从系统外部读取系统内部的数据时,在一定约束条件下相同,即数据变动在系统内部各节点应该是同步的

根据一致性强弱可分为:

  1. 强一致性(Strong Consistency)任何时刻,任何用户都能读取到最近一次成功更新的数据
  2. 单调一致性(Monotonic Consistency)任何时刻,任何用户一旦读到某个数据在某次更新后的值,那么就不会再读到比这个值更旧的值。也就是说可获取的数据顺序必是单调递增的
  3. 会话一致性(Session Consistency)任何用户在某次会话中,一旦读到某个数据在某次更新后的值,那么在本次会话中就不会再读到比这个值更旧的值。会话一致性是在单调一致性的基础上进一步放松约束,只保证单个用户单个会话内的单调性,在不同用户或同一用户不同会话间则没有保障
  4. 最终一致性(Eventual Consistency)用户只能读到某次更新后的值,但系统保证数据将最终达到完全一致的状态,只是所需时间不能保障
  5. 弱一致性(Weak Consistency)用户无法在确定时间内读到最新更新的值

CAP的抉择

选 择 说 明
CA 放弃分区容错性,加强一致性和可用性,其实就是传统的单机数据库的选择
AP 放弃一致性(这里说的一致性是强一致性),追求分区容错性和可用性,这是很多分布式系统设计时的选择,例如很多NoSQL系统就是如此
CP 放弃可用性,追求一致性和分区容错性,基本不会选择,网络问题会直接让整个系统不可用

简单的CAP例子

​ 一个DB服务 搭建在两个机房(北京,广州),两个DB实例同时提供写入和读取

  1. 假设DB的更新操作是同时写北京和广州的DB都成功才返回成功

    在没有出现网络故障的时候,满足CA原则

    • C 即我的任何一个写入,更新操作成功并返回客户端完成后,分布式的所有节点在同一时间的数据完全一致
    • A 即我的读写操作都能够成功
    但是当出现网络故障时,我不能同时保证CA,即P条件无法满足
  1. 假设DB的更新操作是只写本地机房成功就返回,通过binlog/oplog回放方式同步至侧边机房

    ​ 这种操作保证了在出现网络故障时,双边机房都是可以提供服务的,且读写操作都能成功,意味着他满足了AP

    ​ 但是它不满足C,因为更新操作返回成功后,双边机房的DB看到的数据会存在短暂数据不一致,且在网络故障时,不一致的时间差会很大(仅能保证最终一致性)

  1. 假设DB的更新操作是同时写北京和广州的DB都成功才返回成功且网络故障时提供降级服务

    降级服务,如停止写入,只提供读取功能,这样能保证数据是一致的,且网络故障时能提供服务,满足CP原则,但是这无法满足可用性原则

选择权衡

​ 对于一个分布式系统而言,分区容错性是一个最基本的要求。因为既然是一个分布式系统,那么分布式系统中的组件必然需要被部署到不同的节点,因此必然出现子网络。而对于分布式系统而言,网络问题又是一个必定会出现的异常情况,因此分区容错性也就成为了一个分布式系统必然需要面对和解决的问题。因此系统往往需要根据业务特点在C(一致性)和A(可用性)之间寻求平衡

​ 对于大多数互联网应用来说,因为机器数量庞大,部署节点分散,网络故障是常态,可用性是必须需要保证的,所以只有舍弃强一致性(可以保证最终一致性)来保证服务的AP

​ 但是对于需要确保强一致性的场景,如银行业务,通常会权衡CA和CP模型,CA模型网络故障时完全不可用,CP模型具备部分可用性,实际的选择需要通过业务场景来权衡(并不是所有情况CP都好于CA,只能查看信息不能更新信息有时候从产品层面还不如直接拒绝服务)

Base理论

​ BASE理论是对CAP中一致性和可用性权衡的结果,其来源于对大规模互联网系统分布式实践的总结,是基于CAP定理逐步演化而来的。

​ BASE理论的核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性

  1. Basically Available(基本可用)

    基本可用是指分布式系统在出现不可预知故障的时候,允许损失部分可用性

    注意,这绝不等价于系统不可用。如:

    • 响应时间上的损失。正常情况下,一个在线搜索引擎需要在0.5秒之内返回给用户相应的查询结果,但由于出现故障,查询结果的响应时间增加了1~2秒

    • 系统功能上的损失:正常情况下,在一个电子商务网站上进行购物的时候,消费者几乎能够顺利完成每一笔订单,但是在一些节日大促购物高峰的时候,由于消费者的购物行为激增,为了保护购物系统的稳定性,部分消费者可能会被引导到一个降级页面

  2. Soft state(软状态)

    ​ 软状态指允许系统中的数据存在中间状态,并认为该中间状态的存在不会影响系统的整体可用性,即允许系统在不同节点的数据副本之间进行数据同步的过程存在延时

  3. Eventually consistent(最终一致性)

    ​ 最终一致性强调的是所有的数据副本,在经过一段时间的同步之后,最终都能够达到一个一致的状态

    ​ 因此,最终一致性的本质是需要系统保证最终数据能够达到一致,而不需要实时保证系统数据的强一致性

最终一致性变种

  1. 因果一致性:

    ​ 因果一致性是指:如果进程A在更新完某个数据项后通知了进程B,那么进程B之后对该数据项的访问都应该能够获取到进程A更新后的最新值,并且如果进程B要对该数据项进行更新操作的话,务必基于进程A更新后的最新值,即不能发生丢失更新情况。与此同时,与进程A无因果关系的进程C的数据访问则没有这样的限制

  2. 读己之所写:

    ​ 读己之所写是指:进程A更新一个数据项之后,它自己总是能够访问到更新过的最新值,而不会看到旧值。

    ​ 也就是说,对于单个数据获取者而言,其读取到的数据一定不会比自己上次写入的值旧。因此,读己之所写也可以看作是一种特殊的因果一致性

  3. 会话一致性:

    ​ 会话一致性将对系统数据的访问过程框定在了一个会话当中:系统能保证在同一个有效的会话中实现“读己之所写”的一致性

    ​ 也就是说,执行更新操作之后,客户端能够在同一个会话中始终读取到该数据项的最新值

  4. 单调读一致性:

    ​ 单调读一致性是指:如果一个进程从系统中读取出一个数据项的某个值后,那么系统对于该进程后续的任何数据访问都不应该返回更旧的值

  5. 单调写一致性:

    单调写一致性是指:一个系统需要能够保证来自同一个进程的写操作被顺序地执行

​ 总的来说:BASE理论面向的是大型高可用可扩展的分布式系统,和传统事务的ACID特性使相反的,它完全不同于ACID的强一致性模型,而是通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终达到一致状态。

​ 在实际的分布式场景中,不同业务单元和组件对数据一致性的要求是不同的,因此在具体的分布式系统架构设计过程中,ACID特性与BASE理论往往又会结合在一起使用

JVM垃圾收集

发表于 2019-04-14 | 分类于 JVM

本文基于JDK7 HotSpot VM

判定对象无用的根据

​ 通过可达性算法判断对象是否不再使用。即GC Roots,将GC Roots作为起始点向下搜索引用,还被GC Roots引用的对象就是可用的,而当一个对象没有到GC Roots有任何引用链的即为可回收的对象。

可作为GC Roots的对象:

  • 当前各线程执行方法中的局部变量(包括形参)引用的对象
  • 已被加载的类的 static 域引用的对象
  • 方法区中常量引用的对象
  • JNI (Native方法)引用的对象

内存的分配和回收

对象内存布局

​ 在堆为对象分配内存空间之前,应该先知道对象在内存中存储的布局;对象在内存中的存储布局主要分为:对象头,实例数据和对齐填充。

  • 对象头:主要用于存储对象自身运行时的数据和类型指针
    • 运行时数据包括:哈希码(HashCode)、GC分代年龄、锁状态标志
    • 类型指针是指向类元数据的指针
  • 实例数据:对象中的字段数据和父类继承过来的字段数据等
  • 对齐填充:只是起到占位符的作用。因为HotSpotVM要求对象的起始地址必须是8字节的整数倍,当一个对象的实例数据(对象头是32bit或者64bit)不是8字节的整数倍时,就需要对齐填充来补全了

对象内存分配

​ 对象的内存分配主要在新生区的Eden空间上,当Eden区没有足够的连续空间时会触发一次MinorGC;但是一些需要连续大量内存空间的对象(很长的字符串或者数组)就会导致Eden区容纳不下这个对象(垃圾清理后仍然放不下),这个时候会直接在老年代中为其分配内存。

​ 同时虚拟机也提供了-XX:PretenureSizeThreshold的参数,当对象的大小大于这个设置值的时候,会直接在老年代中进行内存分配,这样也可以避免新生代频繁的进行垃圾收集(新生代的垃圾收集是复制算法)

内存分配方式

​ 了解了对象内存分配的位置,那么就需要知道内存分配的方式了。

  1. 有大片连续内存空间时(常用于带压缩算法的收集器,如Serial,ParNew)

​ 当存在大片连续的内存可用于分配给新对象时,可以采用指针碰撞的方式(每次分配对象空间只要检测一下是否有足够的空间,如果有那么指针往前移动 N 位就分配好空间了,然后就可以初始化这个对象了)

​ 对于多线程应用,对象分配必须要保证线程安全性,如果使用全局锁,那么分配空间将成为瓶颈并降低程序性能。HotSpot 使用了称之为Thread-Local Allocation Buffers (TLABs) 的技术,该技术能改善多线程空间分配的吞吐量。TLABs首先给予每个线程一部分内存作为缓存区,每个线程都在自己的缓存区中进行指针碰撞,这样就不用获取全局锁了。只有当一个线程使用完了它的 TLAB,它才需要使用同步锁定来获取一个新的缓冲区。

​ HotSpot 使用了多项技术来降低 TLAB 对于内存的浪费。比如,TLAB 的平均大小被限制在 Eden 区大小的 1% 之内。TLABs 和使用指针碰撞的线性分配结合,使得内存分配非常简单高效。

可用-XX:+UseTLAB 来启用TLAB技术

  1. 内存空间碎片较多时(如CMS收集器,使用Mark-Sweep算法)

​ 当已经使用的内存和空闲内存相互交错的时候,就不能直接的进行指针碰撞来分配内存了;这个时候虚拟机就会维护一个 “空闲列表” 来记录可用的内存块,在分配内存的时候会再列表中找到足够大的空间划分给对象,并且会更新列表上的记录

内存分代思想

​ 虚拟机采用了分代收集的思想去管理堆内存;分代思想指的是虚拟机根据对象的存活周期来划分内存,一般将堆内存划分为年轻代(新生代)和老年代;年轻代是在垃圾收集的时候会有大量的对象死去的内存区域,而老年代中的对象普遍是存活率高的

​ 年轻代因为每次都有大量的对象死去,所以一般会采用复制算法(下文介绍,因为此算法只需要复制少量的存活对象,成本较低);虚拟机在对象创建的时候会给对象定义一个对象年龄的计数器,对象在年轻代(Eden区)被创建并且经历过一次Minor GC(年轻代垃圾收集)并且存活,将会被移到Survivor区并且设置对象年龄为1,该对象每熬过一次MinorGC,对象年龄就会加一,当对象年龄达到一定程度就会晋升到老年代中(默认是15,可以通过-XX:MaxTenuringThreshold=15来设置)

​ 但是如果每个对象都需要等到晋升年龄的话,Survivor区不一定能够容纳下这么多的对象。所以为了应对这种内存状况,虚拟机的解决办法是:如果Survivor区中相同年龄的所有对象大小的总和大于Survivor空间的一半,那么对象年龄大于或等于该值的对象就直接进入老年代,不需要达到晋升年龄

什么时候触发垃圾收集

  1. 当年轻代被填满后(一般新对象在Eden区申请内存空间失败的时候),会进行一次年轻代垃圾收集(也叫做 Minor GC)
  2. Full GC(通常也叫Major GC)会对整个堆进行垃圾收集,触发的情况:
    • 老年代或者永久代被填满的时候触发
    • 当新生代对象晋升到老年代担保失败的时候触发(即年轻代垃圾收集后会有一部分晋升到老年代的对象,当老年代不能容纳这些晋升的对象时会触发)
  3. CMS垃圾收集器的触发时机;CMS垃圾收集器不会等到老年代或者永久代满了才开始进行垃圾收集(CMS只收集老年代old Gen),会有一个预设占用率(initiating occupancy)
    • 老年代的使用率达到阈值(通过JVM参数:–XX:CMSInitiatingOccupancyFraction=n设定,JDK8默认为92%)
    • 永久代的使用率到达阈值(–XX:CMSInitiatingPermOccupancyFraction=n设定,JDK8默认为92%)
    • 当新生代对象晋升到老年代担保失败的时候触发

注意: 只有CMS的concurrent collection是只收集old Gen的,其他的老年代垃圾收集器在满足条件后会触发Full GC,此时Full GC会收集整个GC堆,会先进行young GC(即使用年轻代的垃圾收集算法去收集年轻代的垃圾),然后使用老年代算法去收集老年代和方法区。

​ 这里还有个值得注意的地方,当要进行一次young GC时,如果发现统计数据说之前young GC的平均晋升大小比目前old gen剩余的空间大,则不会进行young GC,而是会采用老年代的垃圾收集算法对整个堆进行垃圾收集(除了CMS)

​ 为什么说除了CMS呢,因为CMS垃圾收集器不能收集年轻代的垃圾。而且CMS有预设占用率会触发老年代的垃圾收集,所以CMS一般不会发生Full GC,但是当CMS发生concurrent mode failure的时候是会退化到发生Full GC,这个时候会变成使用备选的Serial Old收集器来对老年代进行垃圾收集(也有可能对整个堆进行垃圾收集哦,也就是发生上面一段话所说的情况)

空间分配担保机制

​ 分配担保机制指的是:在MinorGC后新生代还有大量的对象存活,并且Survivor区不能存放所有的存活对象的情况下,会将无法容纳的对象直接晋升到老年代中

​ 分配担保机制触发是在老年代中的内存还有能容纳晋升上来的对象的空间,而且在最坏的情况下是整个Eden区的对象都是存活的,所以在发生MinorGC之前,虚拟机都会先检查老年代的最大可用的连续内存空间是否大于新生代所有对象的总和,如果条件成立,那么可以直接进行MinorGC。

​ 否则会根据是否设置了担保失败机制来选择下一步

  • 如果允许出现担保失败,则继续检查老年代中的最大可用连续内存空间是否大于之前每次垃圾回收晋升到老年代对象容量的平均值大小,如果大于则进行MinorGC,否则会进行Full GC来让老年代腾出更大的内存空间;比较平均值是一种动态手段,这种情况下如果出现上面最坏的情况,存活的对象远远高于平均值,是会出现担保失败的情况的(Handle Promotion Failure);出现这种情况,会重新的进行一次Full GC去腾出更多空间。

    -XX:+HandlePromotionFailure 允许担保失败

    -XX:-HandlePromotionFailure 不允许担保失败

  • 如果不允许出现担保失败,则直接进行Full GC腾出空间。

注:推荐允许担保失败,因为这样能够避免频繁的Full GC

垃圾收集算法

三种常用的垃圾收集算法的思想:

  1. 标记清除算法

    • 首先标记出需要进行回收的对象,然后统一回收这些被标记的对象
    • 这个算法回收完对象之后会产生大量的不连续的内存空间,这些内存碎片容易导致在后续分配大对象的时候无法找到足够的连续的内存空间
  2. 标记复制算法

    • 这里以年轻代为例子说明:将内存划分为一个Eden区和两个Survivor区,每次使用Eden区和其中一个Survivor区(为了内存利用率),当回收垃圾的时候,将Eden区和Survivor区存活的对象复制到另外一块Survivor区中,然后再将Eden区和用过的Survivor区的内存空间清理掉
    • 虚拟机会给对象定义一个对象年龄的计数器,当对象复制到Survivor区的时候,对象的年龄增加1,当达到一定程度的时候会晋升到老年代中
    • 当复制过去的对象在Survivor区中不够大小存放的时候(内存不足),此时会触发分配担保机制,直接将这些对象晋升到老年代中

    MinorGC Start

    MinorGC After

  3. 标记整理算法(标记 -> 清除 -> 压缩)

    • 这个算法跟标记清除算法差不多一样,只是当回收完被标记的对象后,会对内存空间进行压缩,将所有活的对象移到一边,然后会剩下一大片连续的内存空间,当产生新的对象时就很容易进行内存分配了

垃圾收集器

在了解垃圾收集器之前需要先认识几个概念

  • 并行:多个垃圾回收线程同时工作,此时用户线程处于等待状态。

  • 并发:垃圾回收线程和应用程序线程同时工作,应用程序不需要挂起。

  • Stop-the-world:在垃圾收集时,需要将应用程序完全挂起的事件称为stop-the-world。

    ​ 这是因为GC需要在垃圾收集前分析堆中对象的引用关系并且保存准确性所导致的结果(如果不停顿应用线程的执行,那么会导致在分析过程中还会出现引用关系变化的情况,从而会让可达性算法分析的结果变得更加复杂和不准确)。

    ​ 应用stop-the-world的垃圾收集器会简单很多,应为在应用程序停顿的时候,堆内存空间是不会发生变化了,此时就能够准确的标记无用的对象。但是在web应用环境下,它挂起应用线程的时间是不能被接受的。

  • 吞吐量:应用程序执行时间 / (应用程序执行时间 + 垃圾收集时间)

串行垃圾收集器

​ 年轻代和老年代都使用单线程的方式来进行垃圾收集(在多核CPU下也只会使用一个CPU进行垃圾收集),收集过程需要stop-the-world。

Serial收集器

​ 用于年轻代的垃圾收集,主要采用了标记复制算法。

SerialOld收集器

​ 用于老年代的垃圾收集,主要采用了标记整理算法(标记 -> 清除 -> 压缩算法)

串行垃圾收集的优点
  • 简单高效
  • 对于单个CPU的环境下,串行收集器没有线程间交互的时间开销
  • 适用于运行在client模式下的程序

可以使用-XX:+UseSerialGC来使用Serial + SerialOld的组合进行垃圾收集

并行垃圾收集器

​ 并行垃圾收集器利用多核的优势,垃圾收集的工作将分配给多个线程在不同的 CPU 上同时进行。并行的收集器仍然会stop-the-world,只不过使用多核的优势并行执行,降低停顿的时间;同时降低垃圾收集的时间,从而提高吞吐量。

​ 并行垃圾收集器能够通过 –XX:ParallelGCThreads=n来控制垃圾收集线程的个数,合理利用CPU的资源能够有效的降低垃圾收集的停顿时间,但是上面说过了串行收集器在单CPU环境下是没有线程间交互的开销的,所以在单CPU的环境下,可能效率会比串行收集器更低。

ParNew收集器

​ 多线程版本的Serial收集器,它能够配合CMS收集器一起工作(CMS是老年代的收集器;因为是Serial的多线程版本,所以Serial收集器也能和CMS收集器配合工作)。和Serial一样采用标记复制算法,不过是并行的复制算法,需要stop-the-world。

​ 可以用-XX:+UseConcMarkSweepGC来使用CMS + ParNew组合的垃圾收集器(这里会有个情况,当CMS出现concurrent mode failure的时候会使用Serial Old收集器来手机老年代的对象);还可以用-XX:+UseParNewGC来使用Serial Old + ParNew的组合进行内存回收。

Parallel Scavenge收集器

​ 吞吐量收集器,可以这样形容这个收集器。显而易见,这是能够控制吞吐量的垃圾收集器,同样的也是采用复制算法(需要stop-the-world)。此收集器提供了两个参数来控制吞吐量(控制GC的停顿时间),分别是:-XX:MaxGCPauseMillis和-XX:GCTimeRatio(MaxGCPauseMillis优先度更高)

  1. MaxGCPauseMillis是设置最大的垃圾收集停顿时间

    • 过分的降低停顿时间的话(设置停顿时间很短的情况下),是会降低整体的吞吐量的。因为设置了很低的停顿时间,可能会使得虚拟机对新生代的空间进行调整(调整Eden和Survivor区的比例),当Eden区的空间变小了,会导致更频繁的进行垃圾收集。垃圾收集的停顿时间确实会因为Eden区变小从而降低了,但是更频繁的垃圾收集也会导致整体吞吐量降低。
  2. GCTimeRatio是用来设置吞吐量的。此参数的值是:垃圾收集时间与应用程序运行时间的比值;例如

    • -XX:GCTimeRatio=99 (同时也是默认值)此时相当于 垃圾收集时间:程序运行时间=1:99,则吞吐量=1/(1+99)=1%

      个人观点(勿喷):周志明老师JVM书上的描述有点不对,书中描述GCTimeRatio设置的值是吞吐量的倒数,比如默认值99,如果是吞吐量的倒数的话,那么吞吐量就是1/99了,这样看来是不对的。所以我认为描述为吞吐量倒数应该是不正确的。应该是垃圾收集时间与应用程序运行时间的比值

  3. 当我们能够通过设置上面两个参数来控制吞吐量,但是不知道什么样的情况下是最好的(即不知道该如何设置年轻代内存大小和各个区之间的比值等);这种情况下可以使用 -XX:+UseAdaptiveSizePolicy 来动态的调整虚拟机相关细节参数来达到设置好的停顿时间或者吞吐量,这中调节方式被称为GC自适应调节策略(建议使用该收集器时开启这个参数)

可以使用-XX:+UseParallelOldGC来使用Parallel Scavenge + Parallel Old的组合回收内存

Parallel Old收集器

​ Parallel Old是Parallel Scavenge的老年代版本。采用的是并行标记整理算法

12
Adrian Dai

Adrian Dai

看板娘赛高!

13 日志
7 分类
32 标签
GitHub E-Mail CSDN
© 2019 Adrian Dai