点击上方 Java后端,选择 设为星标
优质文章,及时送达
基于Redis实现的分布式锁
Spring Cloud 分布式环境下,同一个服务都是部署在不同的机器上,这种情况无法像单体架构下数据一致性问题采用加锁就实现数据一致性问题,在高并发情况下,对于分布式架构显然是不合适的,针对这种情况我们就需要用到分布式锁了。
哪些场景需要用分布式锁
场景一:比较敏感的数据比如金额修改,同一时间只能有一个人操作,想象下2个人同时修改金额,一个加金额一个减金额,为了防止同时操作造成数据不一致,需要锁,如果是数据库需要的就是行锁或表锁,如果是在集群里,多个客户端同时修改一个共享的数据就需要分布式锁。
场景二:比如多台机器都可以定时执行某个任务,如果限制任务每次只能被一台机器执行,不能重复执行,就可以用分布式锁来做标记。
场景三:比如秒杀场景,要求并发量很高,那么同一件商品只能被一个用户抢到,那么就可以使用分布式锁实现。
分布式锁实现方式:
1、基于数据库实现分布式锁
2、基于缓存(redis,memcached,tair)实现分布式锁
3、基于Zookeeper实现分布式锁
分布式锁应该满足要求
互斥性 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。
这把锁要是一把可重入锁(避免死锁)
不会发生死锁:
-
这把
锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
-
有高可用的获取锁和释放锁功能
-
获取锁和释放锁的性能要好
SETNX
和
SETEX
-
SETNX(SET If Not Exists):
当且仅当 Key 不存在时,则可以设置,否则不做任何动作。
当且仅当 key 不存在,将 key 的值设为 value ,并返回1;若给定的 key 已经存在,则 SETNX 不做任何动作,并返回0。
-
SETEX:
基于SETNX功能外,还可以设置超时时间,防止死锁。
setnx(set if not exists)
指令
> setnx lock:codehole true
OK
... do something xxxx... 数量减1
> del lock:codehole
(integer) 1
setnx 和 expire 两个指令构成一个原子操作
给锁加上一个过期时间
> setex lock:codehole true
OK
> expire lock:codehole 5
... do something xxxx ...
> del lock:codehole
(integer) 1
SETEX 实现原理
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
<scope>provided</scope>
</dependency>
server:
port: 8080
spring:
profiles: dev
data:
redis:
database: 0
host: 127.0.0.1
port: 6379
password:
@Data
public class Lock {
private String name;
private String value;
public Lock(String name, String value) {
this.name = name;
this.value = value;
}
}
@Slf4j
@Component
public class DistributedLockConfig {
private final static long LOCK_EXPIRE = 30 * 1000L;
private final static long LOCK_TRY_INTERVAL = 30L;
private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;
private RedisTemplate template;
public void setTemplate(RedisTemplate template) {
this.template = template;
}
public boolean tryLock(Lock lock) {
return getLock(lock, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
}
public boolean tryLock(Lock lock, long timeout) {
return getLock(lock, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE);
}
public boolean tryLock(Lock lock, long timeout, long tryInterval) {
return getLock(lock, timeout, tryInterval, LOCK_EXPIRE);
}
public boolean tryLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) {
return getLock(lock, timeout, tryInterval, lockExpireTime);
}
public boolean getLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) {
try {
if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) {
return false;
}
long startTime = System.currentTimeMillis();
do {
if (!template.hasKey(lock.getName())) {
ValueOperations<String, String> ops = template.opsForValue();
ops.set(lock.getName(), lock.getValue(), lockExpireTime, TimeUnit.MILLISECONDS);
return true;
} else {
log.debug("lock is exist!!!");
}
if (System.currentTimeMillis() - startTime > timeout) {
return false;
}
Thread.sleep(tryInterval);
}
while (template.hasKey(lock.getName()));
} catch (InterruptedException e) {
log.error(e.getMessage());
return false;
}
return false;
}
public Boolean getLockNoTime(Lock lock) {
if (!StringUtils.isEmpty(lock.getName())) {
return false;
}
boolean falg = template.opsForValue().setIfAbsent(lock.getName(), lock.getValue());
return false;
}
public void releaseLock(Lock lock) {
if (!StringUtils.isEmpty(lock.getName())) {
template.delete(lock.getName());
}
}
}
@RequestMapping("test")
public String index() {
distributedLockConfig.setTemplate(redisTemplate);
Lock lock = new Lock("test", "test");
if (distributedLockConfig.tryLock(lock)) {
try {
System.out.println("执行方法");
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
distributedLockConfig.releaseLock(lock);
}
return "hello world!";
}
1 这种方式性能问题很差,每次获取锁都要进行等待,很是浪费资源,
2 如果在判断锁是否存在这儿2个或者2个以上的线程都查到redis中存在key,同一时刻就无法保证一个客户端持有锁,不具有排他性。
Redlock
利用 Redlock
为什么选择红锁?
在集群中需要半数以上的节点同意才能获得锁,保证了数据的完整性,不会因为主节点数据存在,主节点挂了之后没有同步到从节点,导致数据丢失。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.7.0</version>
</dependency>
获取锁后需要处理的逻辑
public interface AquiredLockWorker<T> {
T invokeAfterLockAquire() throws Exception;
}
获取锁管理类
public interface DistributedLocker {
<T> T lock(String resourceName, AquiredLockWorker<T> worker) throws UnableToAquireLockException, Exception;
<T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception;
}
public class UnableToAquireLockException extends RuntimeException {
public UnableToAquireLockException() {
}
public UnableToAquireLockException(String message) {
super(message);
}
public UnableToAquireLockException(String message, Throwable cause) {
super(message, cause);
}
}
@Component
public class RedissonConnector {
RedissonClient redisson;
@PostConstruct
public void init(){
redisson = Redisson.create();
}
public RedissonClient getClient(){
return redisson;
}
}
@Component
public class RedisLocker implements DistributedLocker{
private final static String LOCKER_PREFIX = "lock:";
@Autowired
RedissonConnector redissonConnector;
@Override
public <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws InterruptedException, UnableToAquireLockException, Exception {
return lock(resourceName, worker, 100);
}
@Override
public <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception {
RedissonClient redisson= redissonConnector.getClient();
RLock lock = redisson.getLock(LOCKER_PREFIX + resourceName);
boolean success = lock.tryLock(100, lockTime, TimeUnit.SECONDS);
if (success) {
try {
return worker.invokeAfterLockAquire();
} finally {
lock.unlock();
}
}
throw new UnableToAquireLockException();
}
}
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);
for (int i = 0; i < 50; i++) {
scheduledExecutorService.execute(new Worker());
}
scheduledExecutorService.shutdown();
class Worker implements Runnable {
public Worker() {
}
@Override
public void run() {
try {
redisLocker.lock("tizz1100", new AquiredLockWorker<Object>() {
@Override
public Object invokeAfterLockAquire() {
doTask();
return null;
}
});
} catch (Exception e) {
}
}
void doTask() {
System.out.println(Thread.currentThread().getName() + " ---------- " + LocalDateTime.now());
System.out.println(Thread.currentThread().getName() + " start");
Random random = new Random();
int _int = random.nextInt(200);
System.out.println(Thread.currentThread().getName() + " sleep " + _int + "millis");
try {
Thread.sleep(_int);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " end");
}
}
https://blog.csdn.net/yue_2018/article/details/89784454
https://blog.csdn.net/weixin_34410662/article/details/85600084
代码地址:https://github.com/pomestyle/SpringBoot/tree/master/Springboot-Redis-SETEX
如果看到这里,说明你喜欢这篇文章,请 转发、点赞。同时 标星(置顶)本公众号可以第一时间接受到博文推送。
推荐阅读
1. 二维码会被人类扫完吗?
2. Java 收入不再最低
3. 再见 JSP !
4. 再战 Spring IOC!
《Java技术栈学习手册》
,覆盖了Java技术、面试题精选、Spring全家桶、Nginx、SSM、微服务、数据库、数据结构、架构等等。
获取方式:点“ 在看,关注公众号 Java后端 并回复 777 领取,更多内容陆续奉上。
喜欢文章,点个在看
本文分享自微信公众号 - Java后端(web_resource)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。