本帖最后由 上海分校-小影 于 2018-5-3 09:44 编辑
Redis分布式锁的实现背景 目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们:"任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。"所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。
在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。在单机环境中,Java中其实提供了很多并发处理相关的API,但是这些API在分布式场景中就无能为力了。也就是说单纯的Java Api并不能提供分布式锁的能力。所以针对分布式锁的实现目前有多种方案。
分布式锁 分布式锁是一个在很多环境中非常有用的原语,它是不同进程互斥操作共享资源的唯一方法。 针对分布式锁的实现,目前比较常用的有以下几种方案: 基于数据库实现分布式锁 基于缓存(redis,memcached,tair)实现分布式锁 基于Zookeeper实现分布式锁
Redis分分布式锁的代码实现1.定义锁接口[AppleScript] 纯文本查看 复制代码 /**
* Redis分布式锁接口
* Created by hetiewei on 2017/4/7.
*/
public interface RedisDistributionLock {
/**
* 加锁成功,返回加锁时间
* @param lockKey
* @param threadName
* @return
*/
public long lock(String lockKey, String threadName);
/**
* 解锁, 需要更新加锁时间,判断是否有权限
* @param lockKey
* @param lockValue
* @param threadName
*/
public void unlock(String lockKey, long lockValue, String threadName);
/**
* 多服务器集群,使用下面的方法,代替System.currentTimeMillis(),获取redis时间,避免多服务的时间不一致问题!!!
* @return
*/
public long currtTimeForRedis();
}
##### 2.定义锁实现
import com.jay.service.redis.RedisDistributionLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.dao.DataAccessException; import org.springframework.data.redis.connection.RedisConnection; import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.serializer.RedisSerializer; import java.util.concurrent.TimeUnit;
[AppleScript] 纯文本查看 复制代码 /**
Created by hetiewei on // */ public class RedisLockImpl implements RedisDistributionLock {
``` //加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象 private static final long LOCK_TIMEOUT = 5*1000;
private static final Logger LOG = LoggerFactorygetLogger(RedisLockImpl.class);
private StringRedisTemplate redisTemplate;
public RedisLockImpl(StringRedisTemplate redisTemplate) {
thisredisTemplate = redisTemplate;
}
@Override
public synchronized long lock(String lockKey, String threadName) {
LOG.info(threadName+"开始执行加锁");
while (true){ //循环获取锁
//锁时间
Long lock_timeout = currtTimeForRedis()+ LOCK_TIMEOUT +1;
if (redisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection redisConnection) throws DataAccessException {
//定义序列化方式
RedisSerializer<String> serializer = redisTemplate.getStringSerializer();
byte[] value = serializer.serialize(lock_timeout.toString());
boolean flag = redisConnection.setNX(lockKey.getBytes(), value);
return flag;
}
})){
//如果加锁成功
LOG.info(threadName +"加锁成功 ++++ 111111");
//设置超时时间,释放内存
redisTemplate.expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
return lock_timeout;
}else {
//获取redis里面的时间
String result = redisTemplate.opsForValue().get(lockKey);
Long currt_lock_timeout_str = result==null?null:Long.parseLong(result);
//锁已经失效
if (currt_lock_timeout_str != null && currt_lock_timeout_str < System.currentTimeMillis()){
//判断是否为空,不为空时,说明已经失效,如果被其他线程设置了值,则第二个条件判断无法执行
//获取上一个锁到期时间,并设置现在的锁到期时间
Long old_lock_timeout_Str = Long.valueOf(redisTemplate.opsForValue().getAndSet(lockKey, lock_timeout.toString()));
if (old_lock_timeout_Str != null && old_lock_timeout_Str.equals(currt_lock_timeout_str)){
//多线程运行时,多个线程签好都到了这里,但只有一个线程的设置值和当前值相同,它才有权利获取锁
LOG.info(threadName + "加锁成功 ++++ 22222");
//设置超时间,释放内存
redisTemplate.expire(lockKey, LOCK_TIMEOUT, TimeUnit.MILLISECONDS);
//返回加锁时间
return lock_timeout;
}
}
}
[AppleScript] 纯文本查看 复制代码 try {
LOG.info(threadName +"等待加锁, 睡眠100毫秒");
```
// TimeUnit.MILLISECONDS.sleep(100); TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } }
[align=left] ``` /** * 解锁 * @param lockKey * @param lockValue * @param threadName */ @Override public synchronized void unlock(String lockKey, long lockValue, String threadName) { LOG.info(threadName + "执行解锁==========");//正常直接删除 如果异常关闭判断加锁会判断过期时间 //获取redis中设置的时间 String result = redisTemplate.opsForValue().get(lockKey); Long currt_lock_timeout_str = result ==null?null:Long.valueOf(result);
//如果是加锁者,则删除锁, 如果不是,则等待自动过期,重新竞争加锁
[AppleScript] 纯文本查看 复制代码 if (currt_lock_timeout_str !=null && currt_lock_timeout_str == lockValue){
redisTemplate.delete(lockKey);
LOG.info(threadName + "解锁成功------------------");
}
}
/**
* 多服务器集群,使用下面的方法,代替System.currentTimeMillis(),获取redis时间,避免多服务的时间不一致问题!!!
[AppleScript] 纯文本查看 复制代码 * @return
*/
@Override
public long currtTimeForRedis(){
return redisTemplate.execute(new RedisCallback<Long>() {
@Override
public Long doInRedis(RedisConnection redisConnection) throws DataAccessException {
return redisConnection.time();
}
});
}
}
##### 分布式锁验证
@RestController @RequestMapping("/distribution/redis") public class RedisLockController { ``` private static final String LOCK_NO = "redis_distribution_lock_no_"; private static int i = 0; private ExecutorService service; @Autowired private StringRedisTemplate redisTemplate; /** * 模拟1000个线程同时执行业务,修改资源 * * 使用线程池定义了20个线程 * */ @GetMapping("lock1") public void testRedisDistributionLock1(){
s[AppleScript] 纯文本查看 复制代码 ervice = Executors.newFixedThreadPool(20);
for (int i=0;i<1000;i++){
service.execute(new Runnable() {
@Override
public void run() {
task(Thread.currentThread().getName());
}
});
} [AppleScript] 纯文本查看 复制代码 } @GetMapping("/{key}") public String getValue(@PathVariable("key") String key){ Serializable result = redisTemplate.opsForValue().get(key); return result.toString(); } private void task(String name) { // System.out.println(name + "任务执行中"+(i++)); //创建一个redis分布式锁 RedisLockImpl redisLock = new RedisLockImpl(redisTemplate); //加锁时间 Long lockTime; if ((lockTime = redisLock.lock((LOCK_NO+1)+"", name))!=null){ //开始执行任务 System.out.println(name + "任务执行中"+(i++)); //任务执行完毕 关闭锁 redisLock.unlock((LOCK_NO+1)+"", lockTime, name); } } [/align] }
|