<samp id="wckti"></samp>
<var id="wckti"></var>
  • <rt id="wckti"></rt>

    <rt id="wckti"><bdo id="wckti"></bdo></rt><strong id="wckti"><noscript id="wckti"></noscript></strong>
  • 當前位置:聯(lián)升科技 > 技術(shù)資訊 > 開(kāi)發(fā)技術(shù) >

    經(jīng)常用Redis,這些坑你知道嗎?

    2020-12-24    作者:二馬讀書(shū)    來(lái)源:楊建榮的學(xué)習筆記    閱讀: 次
    近些年,Redis憑借在性能、穩定性和高可擴展性上的卓越表現,基本上已經(jīng)成了互聯(lián)網(wǎng)行業(yè)緩存中間件的標配,甚至很多傳統行業(yè)也在使用Redis。那么我們在使用Redis等緩存中間件時(shí),要注意哪些問(wèn)題呢?本文咱們就來(lái)聊聊,我們使用緩存中間件過(guò)程中曾經(jīng)遇到的坑!
    緩存穿透
    先看一個(gè)常見(jiàn)的緩存使用方式。請求來(lái)了,先查緩存,緩存有值就直接返回;緩存沒(méi)值,查數據庫,然后把數據庫的值存到緩存,再返回。
    假如緩存沒(méi)查到某個(gè)值,查數據庫也沒(méi)這個(gè)值,也就是說(shuō)要查的值根本不存在,這樣就會(huì )導致每次對這個(gè)值的查詢(xún)請求都會(huì )穿透到數據庫。這就是所謂的“緩存穿透”。
    如何避免緩存穿透?
    如果從數據庫中沒(méi)查到值,可以在緩存中記錄一個(gè)空值,來(lái)避免“緩存穿透”。并且要給這個(gè)空值設置一個(gè)較短的過(guò)期時(shí)間。
    比如說(shuō),我們經(jīng)常會(huì )把用戶(hù)信息緩存到Redis。如果調用方傳了一個(gè)不存在的UserID,在緩存中就查不到這個(gè)用戶(hù)信息,然后去DB也查不到。這樣就會(huì )導致,每次根據這個(gè)UserID查用戶(hù)信息,都會(huì )穿透到數據庫,給數據庫造成了壓力。為了避免緩存穿透,當數據庫查不到時(shí),我們可以在緩存中記錄一條空數據,比如userID做為key,空json做為值,如果程序獲得這個(gè)空json,就按用戶(hù)不存在處理。再給這個(gè)key設置一個(gè)很短的過(guò)期時(shí)間,比如30秒。
    緩存雪崩
    我們經(jīng)常會(huì )遇到需要初始化緩存的情況。比如說(shuō)用戶(hù)系統重構,表結構發(fā)生了變化,緩存信息也要變,上線(xiàn)前需要初始化緩存,將用戶(hù)信息批量存入緩存。假如我們給這些用戶(hù)信息設置相同的過(guò)期時(shí)間,到過(guò)期時(shí)間點(diǎn)所有用戶(hù)信息的緩存記錄就會(huì )同時(shí)集中失效,導致大量請求瞬間打到數據庫,數據庫很可能會(huì )被搞掛。這種緩存集中失效,導致大量請求同時(shí)穿透到數據庫的情況,就是所謂的“雪崩效應”。
    所以,當我們向緩存初始化數據時(shí),要保證每個(gè)緩存記錄過(guò)期時(shí)間的離散性??梢圆捎靡粋€(gè)較大的固定值加上一個(gè)較小的隨機值。比如過(guò)期時(shí)間可以是:10小時(shí) + 0到3600秒的隨機值。
    緩存并發(fā)
    當系統并發(fā)很高,緩存數據尤其是熱點(diǎn)數據過(guò)期后,可能會(huì )出現多個(gè)請求同時(shí)訪(fǎng)問(wèn)數據庫并設置緩存的情況,不但給數據庫帶來(lái)壓力,而且會(huì )有緩存頻繁更新的問(wèn)題。
    我們可以通過(guò)加鎖來(lái)避免緩存并發(fā)問(wèn)題。如果從緩存查不到數據,對查詢(xún)數據加分布式鎖,然后查數據庫并把數據庫查詢(xún)結果放入緩存。其他線(xiàn)程等待鎖釋放后,直接從緩存取值。
    比如,電商系統會(huì )緩存商品SKU價(jià)格,一些熱點(diǎn)商品的并發(fā)訪(fǎng)問(wèn)會(huì )非常高。當緩存過(guò)期失效后,訪(fǎng)問(wèn)請求從緩存查不到記錄,此時(shí)可以用商品SKU ID為Key加分布式鎖,然后從數據庫查詢(xún)價(jià)格并把價(jià)格放入緩存,最后解鎖。解鎖后其他請求就可以從緩存直接取值了。從而避免了數據庫的壓力。
    分布式鎖
    以我們之前做過(guò)的5人拼團為例。如果有用戶(hù)參加團購,我們需要先校驗參團人數是否達到了上限5人。如果沒(méi)達到5人,用戶(hù)才可以參團。偽代碼如下:
    //根據拼團ID獲取目前參團成員數量 
    int numOfMembers = pinTuanService.getNumOfMembersById(pinTuanID); 
    if(numOfMembers < 5) { 
      pinTuanService.pintuan();//執行,加入拼團,生單等邏輯 
    }  
    高并發(fā)場(chǎng)景下,上面的代碼會(huì )有很?chē)乐氐膯?wèn)題。如果某個(gè)團當前的參團人數是4,這時(shí)有兩個(gè)用戶(hù)同時(shí)參團,用戶(hù)A和用戶(hù)B的請求同時(shí)進(jìn)入上面的代碼塊,A和B的請求同時(shí)執行到第2行代碼,獲取的numOfMembers都是4,表達式 numOfMembers < 5 成立,所以?xún)蓚€(gè)用戶(hù)都能執行到第4行代碼,就是說(shuō)A用戶(hù)和B用戶(hù)都能成功參加拼團。于是,參團人數就超過(guò)了5人的上限。所以我們就需要加鎖來(lái)避免這個(gè)問(wèn)題。synchronized行嗎?不行。因為我們的服務(wù)是多節點(diǎn)部署的,所以要加分布式鎖。代碼如下:
    boolean aquired = distributedLock.aquireLock(pinTuanID, 3000); 
    if(aquired == true) { 
      try{ 
        //根據拼團ID獲取目前參團成員數量 
        int numOfMembers = pinTuanService.getNumOfMembersById(pinTuanID); 
        if(numOfMembers < 5) { 
          pinTuanService.pintuan();//執行,加入拼團,生單等邏輯 
        }  
      } finally { 
        distributedLock.releaseLock(pinTuanID); 
      } 
    這樣就好多啦!接下來(lái)我們看看基于Redis分布式鎖的實(shí)現,以及特別要注意的問(wèn)題。一般我們會(huì )基于setnx實(shí)現Redis分布式鎖。setnx命令可以檢查key是否存在,如果key不存在,就在Redis中創(chuàng )建一個(gè)鍵值對(操作成功),如果key已經(jīng)存在就放棄執行(操作失敗)。
    先看一段基于Springboot實(shí)現的加鎖和釋放鎖的代碼:
    @Component 
    public class DistributedLock { 
     
     @Autowired 
     private StringRedisTemplate redisTemplate; 
      
     /** 
     * 加鎖 
     * lockKey,redis的key 
     * expireTime,過(guò)期時(shí)間,單位是毫秒 
     * 注:setIfAbsent方法就使用了redis的setnx 
     */ 
      public boolean aquireLock(String lockKey, long expireTime) { 
       long waitTime = 0; 
       boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, "distributedLock", 
                         expireTime, TimeUnit.MILLISECONDS); 
       if(success == true){ 
          return success;    
       } else { 
         //如果加鎖失敗,循環(huán)重試加鎖 
         while(success != true && waitTime < 5000L ) { 
           success = redisTemplate.opsForValue().setIfAbsent(lockKey, "distributedLock", 
                           expireTime, TimeUnit.MILLISECONDS); 
           sleep 100毫秒;                 
           waitTime += 100L; 
         } 
       } 
        
       return success; 
     } 
      
     /** 
     * 釋放鎖 
     * lockKey,redis的key 
     */ 
     public void releaseLock(String lockKey) { 
       redisTemplate.delete(lockKey); 
     }  
      
    上面的代碼。乍一看,好像沒(méi)什么問(wèn)題!加鎖失敗有循環(huán)重試加鎖,過(guò)期時(shí)間設置了,而且也保證了創(chuàng )建Key-Value鍵值對和設置過(guò)期時(shí)間的原子性,這樣當程序沒(méi)有正常釋放鎖時(shí),也能保證過(guò)期后鎖自動(dòng)釋放(注意:redis較老的版本不支持 setnx 和設置過(guò)期時(shí)間的原子操作,不過(guò)可以利用Lua腳本來(lái)保證原子性)。
    我們再仔細思考一下,一般場(chǎng)景我們會(huì )對Key設置一個(gè)很短的過(guò)期時(shí)間,當一次操作因為網(wǎng)絡(luò )等原因耗費了較長(cháng)時(shí)間,操作還沒(méi)完成key就過(guò)期失效了。這樣會(huì )產(chǎn)生什么問(wèn)題呢?我們還是以拼團為例加以說(shuō)明,先看看下面這張圖:

    如上圖,用戶(hù)A和用戶(hù)B同時(shí)參加同一團,團ID為 001,我們以團ID作為分布式鎖的Key,"distributedLock" 作為固定的Value,過(guò)期時(shí)間是5秒。A先獲取分布式鎖,但是由于網(wǎng)絡(luò )等原因A的拼團操作在5秒內沒(méi)完成,這時(shí)Key過(guò)期并從Redis清除掉,A的分布式鎖失效。此時(shí)用戶(hù)B拿到分布式鎖,Key也同樣是團ID 001。在用戶(hù)B的拼團邏輯執行完之前,用戶(hù)A的邏輯先執行完了,緊接著(zhù)A就把鎖給釋放了。不過(guò)A的鎖早已經(jīng)過(guò)期失效了,B持有鎖的Key和A又完全一樣,所以此時(shí)A釋放的其實(shí)是B的鎖。這樣一來(lái)整個(gè)拼團還是有可能會(huì )超員。怎么解決呢?
    我們可以把分布式鎖的Value設成可以區分的值,比如拼團的場(chǎng)景Value可以設置為userID,在釋放鎖的時(shí)候根據key和value來(lái)判斷當前的鎖是不是自己的,只有Redis中userID和自己的userID相同才釋放鎖。
    改進(jìn)后的代碼如下:
    @Component 
    public class DistributedLock { 
     
     @Autowired 
     private StringRedisTemplate redisTemplate; 
      
     /** 
     * 加鎖 
     * lockKey,redis的key 
     * expireTime,過(guò)期時(shí)間,單位是毫秒 
     * 注:setIfAbsent方法就使用了redis的setnx 
     */ 
      public boolean aquireLock(String lockKey, String userID, long expireTime) { 
       long waitTime = 0; 
       boolean success = redisTemplate.opsForValue().setIfAbsent(lockKey, userID, 
                         expireTime, TimeUnit.MILLISECONDS); 
       if(success == true){ 
          return success;    
       } else { 
         //如果加鎖失敗,循環(huán)重試加鎖 
         while(success != true && waitTime < 5000L ) { 
           success = redisTemplate.opsForValue().setIfAbsent(lockKey, userID, 
                           expireTime, TimeUnit.MILLISECONDS); 
           sleep 100毫秒;                 
           waitTime += 100L; 
         } 
       } 
        
       return success; 
     } 
      
     /** 
     * 釋放鎖 
     * lockKey,redis的key 
     */ 
     public void releaseLock(String lockKey, String userID) { 
       String userIDFromRedis = redisTemplate.get(lockKey); 
       if( userID.equals(userIDFromRedis) ) { 
         redisTemplate.delete(lockKey); 
       } 
     }  
      
    還有一種場(chǎng)景需要考慮。當Redis master發(fā)生故障,主備切換時(shí)往往會(huì )造成數據丟失,包括分布式鎖的Key-Value 也可能丟失。這樣就會(huì )導致操作還沒(méi)執行完,鎖就被其他請求拿到了。Redis官方提供了Redlock算法,以及相應的開(kāi)源實(shí)現 Redisson。用到分布式鎖的場(chǎng)景,大家可以直接使用 Redisson,非常方便。如果系統對可靠性要求很高,如需用到分布式鎖,建議使用 Zookeeper,etcd 等。


    相關(guān)文章

    我們很樂(lè )意傾聽(tīng)您的聲音!
    即刻與我們取得聯(lián)絡(luò )
    成為日后肩并肩合作的伙伴。

    行業(yè)資訊

    聯(lián)系我們

    13387904606

    地址:新余市仙女湖區仙女湖大道萬(wàn)商紅A2棟

    手機:13755589003
    QQ:122322500
    微信號:13755589003

    江西新余網(wǎng)站設計_小程序制作_OA系統開(kāi)發(fā)_企業(yè)ERP管理系統_app開(kāi)發(fā)-新余聯(lián)升網(wǎng)絡(luò )科技有限公司 贛ICP備19013599號-1   贛公網(wǎng)安備 36050202000267號   

    微信二維碼
    五月天婷婷在线观看历史|国产欧美日韩免费一区二区|亚洲成a∨人片在无码|欧美日韩不卡一区二区三区中文字|每日更新国产精品视频|成年人在线观看视频免费|亚洲A片一区二区三区在线观看

    
    

    <samp id="wckti"></samp>
    <var id="wckti"></var>
  • <rt id="wckti"></rt>

    <rt id="wckti"><bdo id="wckti"></bdo></rt><strong id="wckti"><noscript id="wckti"></noscript></strong>