kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From maniku...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7742; Fixed removing hmac entry for a token being removed from DelegationTokenCache
Date Thu, 20 Dec 2018 10:25:55 GMT
This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b23bf41  KAFKA-7742; Fixed removing hmac entry for a token being removed from DelegationTokenCache
b23bf41 is described below

commit b23bf41e84d468185aa4165bf171b24735b8fecd
Author: Satish Duggana <satishd@apache.org>
AuthorDate: Thu Dec 20 15:55:34 2018 +0530

    KAFKA-7742; Fixed removing hmac entry for a token being removed from DelegationTokenCache
    
    Author: Satish Duggana <satishd@apache.org>
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
    
    Closes #6037 from satishd/KAFKA-7742
---
 .../delegation/internals/DelegationTokenCache.java | 24 +++++++++++++++-----
 .../delegation/DelegationTokenManagerTest.scala    | 26 +++++++++++++++++++++-
 2 files changed, 43 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCache.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCache.java
index a74781f..9cc913f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCache.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/internals/DelegationTokenCache.java
@@ -32,10 +32,15 @@ import java.util.concurrent.ConcurrentHashMap;
 public class DelegationTokenCache {
 
     private CredentialCache credentialCache = new CredentialCache();
+
     //Cache to hold all the tokens
     private Map<String, TokenInformation> tokenCache = new ConcurrentHashMap<>();
+
     //Cache to hold hmac->tokenId mapping. This is required for renew, expire requests
-    private Map<String, String> hmacIDCache = new ConcurrentHashMap<>();
+    private Map<String, String> hmacTokenIdCache = new ConcurrentHashMap<>();
+
+    //Cache to hold tokenId->hmac mapping. This is required for removing entry from hmacTokenIdCache using tokenId.
+    private Map<String, String> tokenIdHmacCache = new ConcurrentHashMap<>();
 
     public DelegationTokenCache(Collection<String> scramMechanisms) {
         //Create caches for scramMechanisms
@@ -60,17 +65,21 @@ public class DelegationTokenCache {
         //Update Scram Credentials
         updateCredentials(tokenId, scramCredentialMap);
         //Update hmac-id cache
-        hmacIDCache.put(hmac, tokenId);
+        hmacTokenIdCache.put(hmac, tokenId);
+        tokenIdHmacCache.put(tokenId, hmac);
     }
 
-
     public void removeCache(String tokenId) {
         removeToken(tokenId);
-        updateCredentials(tokenId, new HashMap<String, ScramCredential>());
+        updateCredentials(tokenId, new HashMap<>());
+    }
+
+    public String tokenIdForHmac(String base64hmac) {
+        return hmacTokenIdCache.get(base64hmac);
     }
 
     public TokenInformation tokenForHmac(String base64hmac) {
-        String tokenId = hmacIDCache.get(base64hmac);
+        String tokenId = hmacTokenIdCache.get(base64hmac);
         return tokenId == null ? null : tokenCache.get(tokenId);
     }
 
@@ -81,7 +90,10 @@ public class DelegationTokenCache {
     public void removeToken(String tokenId) {
         TokenInformation tokenInfo = tokenCache.remove(tokenId);
         if (tokenInfo != null) {
-            hmacIDCache.remove(tokenInfo.tokenId());
+            String hmac = tokenIdHmacCache.remove(tokenInfo.tokenId());
+            if (hmac != null) {
+                hmacTokenIdCache.remove(hmac);
+            }
         }
     }
 
diff --git a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
index b8d4376..ed82f5e 100644
--- a/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/token/delegation/DelegationTokenManagerTest.scala
@@ -19,7 +19,7 @@ package kafka.security.token.delegation
 
 import java.net.InetAddress
 import java.nio.ByteBuffer
-import java.util.Properties
+import java.util.{Base64, Properties}
 
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.Acl.WildCardHost
@@ -190,6 +190,30 @@ class DelegationTokenManagerTest extends ZooKeeperTestHarness  {
   }
 
   @Test
+  def testRemoveTokenHmac():Unit = {
+    val config = KafkaConfig.fromProps(props)
+    val tokenManager = createDelegationTokenManager(config, tokenCache, time, zkClient)
+    tokenManager.startup
+
+    tokenManager.createToken(owner, renewer, -1 , createTokenResultCallBack)
+    val issueTime = time.milliseconds
+    val tokenId = createTokenResult.tokenId
+    val password = DelegationTokenManager.createHmac(tokenId, masterKey)
+    assertEquals(CreateTokenResult(issueTime, issueTime + renewTimeMsDefault,  issueTime + maxLifeTimeMsDefault, tokenId, password, Errors.NONE), createTokenResult)
+
+    // expire the token immediately
+    tokenManager.expireToken(owner, ByteBuffer.wrap(password), -1, renewResponseCallback)
+
+    val encodedHmac = Base64.getEncoder.encodeToString(password)
+    // check respective hmac map entry is removed for the expired tokenId.
+    val tokenInformation = tokenManager.tokenCache.tokenIdForHmac(encodedHmac)
+    assertNull(tokenInformation)
+
+    //check that the token is removed
+    assert(tokenManager.getToken(tokenId).isEmpty)
+  }
+
+  @Test
   def testDescribeToken(): Unit = {
 
     val config = KafkaConfig.fromProps(props)


Mime
View raw message