kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-2652: integrate new group protocol into partition grouping
Date Mon, 26 Oct 2015 20:28:40 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 939c4244e -> 71399ffe4


http://git-wip-us.apache.org/repos/asf/kafka/blob/71399ffe/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
index 8fdbfff..4dfa9c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java
@@ -47,14 +47,14 @@ import java.util.Set;
  * <p>
  * <h2>Basic usage</h2>
  * This component can be used to help test a {@link KeyValueStore}'s ability to read and
write entries.
- * 
+ *
  * <pre>
  * // Create the test driver ...
  * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create();
  * KeyValueStore&lt;Integer, String> store = Stores.create("my-store", driver.context())
  *                                              .withIntegerKeys().withStringKeys()
  *                                              .inMemory().build();
- * 
+ *
  * // Verify that the store reads and writes correctly ...
  * store.put(0, "zero");
  * store.put(1, "one");
@@ -69,7 +69,7 @@ import java.util.Set;
  * assertEquals("five", store.get(5));
  * assertNull(store.get(3));
  * store.delete(5);
- * 
+ *
  * // Flush the store and verify all current entries were properly flushed ...
  * store.flush();
  * assertEquals("zero", driver.flushedEntryStored(0));
@@ -77,14 +77,14 @@ import java.util.Set;
  * assertEquals("two", driver.flushedEntryStored(2));
  * assertEquals("four", driver.flushedEntryStored(4));
  * assertEquals(null, driver.flushedEntryStored(5));
- * 
+ *
  * assertEquals(false, driver.flushedEntryRemoved(0));
  * assertEquals(false, driver.flushedEntryRemoved(1));
  * assertEquals(false, driver.flushedEntryRemoved(2));
  * assertEquals(false, driver.flushedEntryRemoved(4));
  * assertEquals(true, driver.flushedEntryRemoved(5));
  * </pre>
- * 
+ *
  * <p>
  * <h2>Restoring a store</h2>
  * This component can be used to test whether a {@link KeyValueStore} implementation properly
@@ -94,30 +94,30 @@ import java.util.Set;
  * To do this, create an instance of this driver component, {@link #addEntryToRestoreLog(Object,
Object) add entries} that will be
  * passed to the store upon creation (simulating the entries that were previously flushed
to the topic), and then create the store
  * using this driver's {@link #context() ProcessorContext}:
- * 
+ *
  * <pre>
  * // Create the test driver ...
  * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create(Integer.class,
String.class);
- * 
+ *
  * // Add any entries that will be restored to any store that uses the driver's context ...
  * driver.addRestoreEntry(0, "zero");
  * driver.addRestoreEntry(1, "one");
  * driver.addRestoreEntry(2, "two");
  * driver.addRestoreEntry(4, "four");
- * 
+ *
  * // Create the store, which should register with the context and automatically
  * // receive the restore entries ...
  * KeyValueStore&lt;Integer, String> store = Stores.create("my-store", driver.context())
  *                                              .withIntegerKeys().withStringKeys()
  *                                              .inMemory().build();
- * 
+ *
  * // Verify that the store's contents were properly restored ...
  * assertEquals(0, driver.checkForRestoredEntries(store));
- * 
+ *
  * // and there are no other entries ...
  * assertEquals(4, driver.sizeOf(store));
  * </pre>
- * 
+ *
  * @param <K> the type of keys placed in the store
  * @param <V> the type of values placed in the store
  */
@@ -163,7 +163,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * value serializers and deserializers. This can be used when the actual serializers
and deserializers are supplied to the
      * store during creation, which should eliminate the need for a store to depend on the
ProcessorContext's default key and
      * value serializers and deserializers.
-     * 
+     *
      * @return the test driver; never null
      */
     public static <K, V> KeyValueStoreTestDriver<K, V> create() {
@@ -181,7 +181,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * deserializers for the given built-in key and value types (e.g., {@code String.class},
{@code Integer.class},
      * {@code Long.class}, and {@code byte[].class}). This can be used when store is created
to rely upon the
      * ProcessorContext's default key and value serializers and deserializers.
-     * 
+     *
      * @param keyClass the class for the keys; must be one of {@code String.class}, {@code
Integer.class},
      *            {@code Long.class}, or {@code byte[].class}
      * @param valueClass the class for the values; must be one of {@code String.class}, {@code
Integer.class},
@@ -198,7 +198,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * {@link ProcessorContext#forward(Object, Object) forwarded} by the store and that provides
the specified serializers and
      * deserializers. This can be used when store is created to rely upon the ProcessorContext's
default key and value serializers
      * and deserializers.
-     * 
+     *
      * @param keySerializer the key serializer for the {@link ProcessorContext}; may not
be null
      * @param keyDeserializer the key deserializer for the {@link ProcessorContext}; may
not be null
      * @param valueSerializer the value serializer for the {@link ProcessorContext}; may
not be null
@@ -283,7 +283,7 @@ public class KeyValueStoreTestDriver<K, V> {
 
     /**
      * Set the directory that should be used by the store for local disk storage.
-     * 
+     *
      * @param dir the directory; may be null if no local storage is allowed
      */
     public void useStateDir(File dir) {
@@ -320,25 +320,25 @@ public class KeyValueStoreTestDriver<K, V> {
      * <p>
      * To create such a test, create the test driver, call this method one or more times,
and then create the
      * {@link KeyValueStore}. Your tests can then check whether the store contains the entries
from the log.
-     * 
+     *
      * <pre>
      * // Set up the driver and pre-populate the log ...
      * KeyValueStoreTestDriver&lt;Integer, String> driver = KeyValueStoreTestDriver.create();
      * driver.addRestoreEntry(1,"value1");
      * driver.addRestoreEntry(2,"value2");
      * driver.addRestoreEntry(3,"value3");
-     * 
+     *
      * // Create the store using the driver's context ...
      * ProcessorContext context = driver.context();
      * KeyValueStore&lt;Integer, String> store = ...
-     * 
+     *
      * // Verify that the store's contents were properly restored from the log ...
      * assertEquals(0, driver.checkForRestoredEntries(store));
-     * 
+     *
      * // and there are no other entries ...
      * assertEquals(3, driver.sizeOf(store));
      * </pre>
-     * 
+     *
      * @param key the key for the entry
      * @param value the value for the entry
      * @see #checkForRestoredEntries(KeyValueStore)
@@ -354,7 +354,7 @@ public class KeyValueStoreTestDriver<K, V> {
      * <p>
      * If the {@link KeyValueStore}'s are to be restored upon its startup, be sure to {@link
#addEntryToRestoreLog(Object, Object)
      * add the restore entries} before creating the store with the {@link ProcessorContext}
returned by this method.
-     * 
+     *
      * @return the processing context; never null
      * @see #addEntryToRestoreLog(Object, Object)
      */
@@ -365,7 +365,7 @@ public class KeyValueStoreTestDriver<K, V> {
     /**
      * Get the entries that are restored to a KeyValueStore when it is constructed with this
driver's {@link #context()
      * ProcessorContext}.
-     * 
+     *
      * @return the restore entries; never null but possibly a null iterator
      */
     public Iterable<Entry<K, V>> restoredEntries() {
@@ -375,7 +375,7 @@ public class KeyValueStoreTestDriver<K, V> {
     /**
      * Utility method that will count the number of {@link #addEntryToRestoreLog(Object,
Object) restore entries} missing from the
      * supplied store.
-     * 
+     *
      * @param store the store that is to have all of the {@link #restoredEntries() restore
entries}
      * @return the number of restore entries missing from the store, or 0 if all restore
entries were found
      * @see #addEntryToRestoreLog(Object, Object)
@@ -395,7 +395,7 @@ public class KeyValueStoreTestDriver<K, V> {
 
     /**
      * Utility method to compute the number of entries within the store.
-     * 
+     *
      * @param store the key value store using this {@link #context()}.
      * @return the number of entries
      */
@@ -410,7 +410,7 @@ public class KeyValueStoreTestDriver<K, V> {
 
     /**
      * Retrieve the value that the store {@link KeyValueStore#flush() flushed} with the given
key.
-     * 
+     *
      * @param key the key
      * @return the value that was flushed with the key, or {@code null} if no such key was
flushed or if the entry with this
      *         key was {@link #flushedEntryStored(Object) removed} upon flush
@@ -421,7 +421,7 @@ public class KeyValueStoreTestDriver<K, V> {
 
     /**
      * Determine whether the store {@link KeyValueStore#flush() flushed} the removal of the
given key.
-     * 
+     *
      * @param key the key
      * @return {@code true} if the entry with the given key was removed when flushed, or
{@code false} if the entry was not
      *         removed when last flushed
@@ -438,4 +438,4 @@ public class KeyValueStoreTestDriver<K, V> {
         flushedEntries.clear();
         flushedRemovals.clear();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/71399ffe/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index 761f5ce..16df9c5 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -83,12 +83,7 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
 
     @Override
     public int id() {
-        return -1;
-    }
-
-    @Override
-    public boolean joinable() {
-        return true;
+        return 0;
     }
 
     @Override
@@ -174,4 +169,4 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
         return this.timestamp;
     }
 
-}
\ No newline at end of file
+}


Mime
View raw message