kafka-commits mailing list archives

From j...@apache.org
Subject kafka git commit: KAFKA-5600; Fix group loading regression causing stale metadata/offset cache
Date Mon, 17 Jul 2017 18:53:48 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 973f9d2b2 -> 3a515505e


KAFKA-5600; Fix group loading regression causing stale metadata/offset cache

The while loop was too big and needed to be closed earlier.
To see the actual fix, view the diff ignoring whitespace, since most of the change is re-indentation.

This bug was introduced by commit 5bd06f1d542e6b588a1d402d059bc24690017d32.
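
For intuition, a minimal, runnable Scala sketch of the failure mode, with stand-in names rather than the actual GroupMetadataManager code: the per-chunk read loop also enclosed the publish step, and since the cache here keeps the first entry it sees (an assumption matching the put-if-absent behavior of the group cache), everything after the first chunk was ignored.

    import scala.collection.mutable

    object LoopScopeSketch extends App {
      // the offsets topic is read in chunks; later chunks override earlier offsets
      val chunks = Seq(Map("foo-0" -> 23L), Map("foo-0" -> 33L))

      val cache = mutable.Map[String, Long]()          // stand-in for the group/offset cache
      val loadedOffsets = mutable.Map[String, Long]()  // accumulated while reading

      // put-if-absent: an entry already in the cache is not replaced
      def publish(offsets: Map[String, Long]): Unit =
        offsets.foreach { case (tp, off) => cache.getOrElseUpdate(tp, off) }

      for (chunk <- chunks) {
        loadedOffsets ++= chunk
        publish(loadedOffsets.toMap) // the bug: publishing inside the loop freezes chunk 1
      }
      println(cache("foo-0")) // prints 23 (stale); publishing after the loop yields 33
    }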

Author: Jan Burkhardt <jan.burkhardt@just.social>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Jason Gustafson <jason@confluent.io>

Closes #3538 from bjrke/trunk

(cherry picked from commit e2fe19d22a42525001c5a66f21f02f49c051ecb5)
Signed-off-by: Jason Gustafson <jason@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a515505
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a515505
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a515505

Branch: refs/heads/0.11.0
Commit: 3a515505e07fb4ccc961afcf52b5bad5fbf57e0f
Parents: 973f9d2
Author: Jan Burkhardt <jan.burkhardt@just.social>
Authored: Mon Jul 17 11:49:50 2017 -0700
Committer: Jason Gustafson <jason@confluent.io>
Committed: Mon Jul 17 11:53:45 2017 -0700

----------------------------------------------------------------------
 .../group/GroupMetadataManager.scala            | 86 ++++++++++----------
 .../group/GroupMetadataManagerTest.scala        | 66 ++++++++++++++-
 2 files changed, 105 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3a515505/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index a8419fd..9322ff2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -550,57 +550,57 @@ class GroupMetadataManager(brokerId: Int,
             }
             currOffset = batch.nextOffset
           }
+        }
 
+        val (groupOffsets, emptyGroupOffsets) = loadedOffsets
+          .groupBy(_._1.group)
+          .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
+          .partition { case (group, _) => loadedGroups.contains(group) }
 
-          val (groupOffsets, emptyGroupOffsets) = loadedOffsets
+        val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
+        pendingOffsets.foreach { case (producerId, producerOffsets) =>
+          producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
+          producerOffsets
            .groupBy(_._1.group)
-            .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
-            .partition { case (group, _) => loadedGroups.contains(group) }
-
-          val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
-          pendingOffsets.foreach { case (producerId, producerOffsets) =>
-            producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
-            producerOffsets
-              .groupBy(_._1.group)
-              .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)})
-              .foreach { case (group, offsets) =>
-                val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
-                val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
-                groupProducerOffsets ++= offsets
-              }
-          }
+            .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)})
+            .foreach { case (group, offsets) =>
+              val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+              val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
+              groupProducerOffsets ++= offsets
+            }
+        }
 
-          val (pendingGroupOffsets, pendingEmptyGroupOffsets) = pendingOffsetsByGroup
-            .partition { case (group, _) => loadedGroups.contains(group)}
+        val (pendingGroupOffsets, pendingEmptyGroupOffsets) = pendingOffsetsByGroup
+          .partition { case (group, _) => loadedGroups.contains(group)}
 
-          loadedGroups.values.foreach { group =>
-            val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
-            val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
-            debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
-            loadGroup(group, offsets, pendingOffsets)
-            onGroupLoaded(group)
-          }
+        loadedGroups.values.foreach { group =>
+          val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
+          val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+          debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
+          loadGroup(group, offsets, pendingOffsets)
+          onGroupLoaded(group)
+        }
 
-          // load groups which store offsets in kafka, but which have no active members and thus no group
-          // metadata stored in the log
-          (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) =>
-            val group = new GroupMetadata(groupId)
-            val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
-            val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
-            debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
-            loadGroup(group, offsets, pendingOffsets)
-            onGroupLoaded(group)
-          }
+        // load groups which store offsets in kafka, but which have no active members and thus no group
+        // metadata stored in the log
+        (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) =>
+          val group = new GroupMetadata(groupId)
+          val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
+          val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+          debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
+          loadGroup(group, offsets, pendingOffsets)
+          onGroupLoaded(group)
+        }
 
-          removedGroups.foreach { groupId =>
-            // if the cache already contains a group which should be removed, raise an error. Note that it
-            // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
-            // offset storage (i.e. by "simple" consumers)
-            if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
-              throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
-                s"loading partition $topicPartition")
-          }
+        removedGroups.foreach { groupId =>
+          // if the cache already contains a group which should be removed, raise an error. Note that it
+          // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
+          // offset storage (i.e. by "simple" consumers)
+          if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
+            throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
+              s"loading partition $topicPartition")
        }
+
    }
  }
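
An aside on the hunk above: the first re-scoped statement turns the flat loadedOffsets map into per-group offset maps and then partitions them into groups with metadata in the log versus groups known only by their offsets. A self-contained sketch of that groupBy/mapValues/partition idiom, using simplified stand-in types instead of Kafka's:

    object SplitOffsetsSketch extends App {
      case class GroupTopicPartition(group: String, topicPartition: String)

      val loadedOffsets = Map(
        GroupTopicPartition("g1", "foo-0") -> 23L,
        GroupTopicPartition("g1", "foo-1") -> 455L,
        GroupTopicPartition("g2", "bar-0") -> 8992L)
      val loadedGroups = Set("g1") // groups whose metadata records were also found

      val (groupOffsets, emptyGroupOffsets) = loadedOffsets
        .groupBy(_._1.group)                                                      // group -> its entries
        .mapValues(_.map { case (gtp, offset) => (gtp.topicPartition, offset) })  // re-key by partition
        .partition { case (group, _) => loadedGroups.contains(group) }

      println(groupOffsets)      // Map(g1 -> Map(foo-0 -> 23, foo-1 -> 455))
      println(emptyGroupOffsets) // Map(g2 -> Map(bar-0 -> 8992))
    }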
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a515505/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 6245e85..a8ce17e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -610,6 +610,51 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testLoadGroupAndOffsetsFromDifferentSegments(): Unit = {
+    val startOffset = 15L
+    val tp0 = new TopicPartition("foo", 0)
+    val tp1 = new TopicPartition("foo", 1)
+    val tp2 = new TopicPartition("bar", 0)
+    val tp3 = new TopicPartition("xxx", 0)
+
+    val logMock =  EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
+
+    val segment1MemberId = "a"
+    val segment1Offsets = Map(tp0 -> 23L, tp1 -> 455L, tp3 -> 42L)
+    val segment1Records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(segment1MemberId)): _*)
+    val segment1End = expectGroupMetadataLoad(logMock, startOffset, segment1Records)
+
+    val segment2MemberId = "b"
+    val segment2Offsets = Map(tp0 -> 33L, tp2 -> 8992L, tp3 -> 10L)
+    val segment2Records = MemoryRecords.withRecords(segment1End, CompressionType.NONE,
+      createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(segment2MemberId)): _*)
+    val segment2End = expectGroupMetadataLoad(logMock, segment1End, segment2Records)
+
+    EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(segment2End))
+
+    EasyMock.replay(logMock, replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Stable, group.currentState)
+
+    assertEquals("segment2 group record member should be elected", segment2MemberId, group.leaderId)
+    assertEquals("segment2 group record member should be only member", Set(segment2MemberId), group.allMembers)
+
+    // offsets of segment1 should be overridden by segment2 offsets of the same topic partitions
+    val committedOffsets = segment1Offsets ++ segment2Offsets
+    assertEquals(committedOffsets.size, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+
+  @Test
   def testAddGroup() {
     val group = new GroupMetadata("foo")
     assertEquals(group, groupMetadataManager.addGroup(group))
@@ -1303,20 +1348,33 @@ class GroupMetadataManagerTest {
   private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
                                       startOffset: Long,
                                       records: MemoryRecords): Unit = {
-    val endOffset = startOffset + records.records.asScala.size
     val logMock =  EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
+    val endOffset = expectGroupMetadataLoad(logMock, startOffset, records)
+    EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
+    EasyMock.replay(logMock)
+  }
+
+  /**
+   * mock records into a mocked log
+   *
+   * @return the calculated end offset to be mocked into [[ReplicaManager.getLogEndOffset]]
+   */
+  private def expectGroupMetadataLoad(logMock: Log,
+                                      startOffset: Long,
+                                      records: MemoryRecords): Long = {
+    val endOffset = startOffset + records.records.asScala.size
     val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
 
-    EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
-    EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
       EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
     EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
       .andReturn(records.buffer)
+    EasyMock.replay(fileRecordsMock)
 
-    EasyMock.replay(logMock, fileRecordsMock)
+    endOffset
   }
 
   private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],


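A closing note on the new test: its committedOffsets assertion leans on the semantics of Scala's Map ++, where entries from the right-hand operand win on duplicate keys, which is exactly how offsets committed in a later segment override earlier ones. A quick self-contained illustration reusing the test's values:

    object SegmentOverrideSketch extends App {
      val segment1Offsets = Map("foo-0" -> 23L, "foo-1" -> 455L, "xxx-0" -> 42L)
      val segment2Offsets = Map("foo-0" -> 33L, "bar-0" -> 8992L, "xxx-0" -> 10L)

      // on duplicate keys, the right-hand map wins, like records in later segments
      val committedOffsets = segment1Offsets ++ segment2Offsets

      assert(committedOffsets("foo-0") == 33L)  // overridden by segment 2
      assert(committedOffsets("foo-1") == 455L) // only in segment 1, kept
      assert(committedOffsets.size == 4)        // foo-0, foo-1, xxx-0, bar-0
    }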