kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3411: Streams: stop using "job" terminology, rename job.id to application.id
Date Thu, 17 Mar 2016 17:41:53 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 9a836d015 -> 958e10c87


http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 8ff72bc..b3b6537 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -37,7 +37,7 @@ import java.util.Set;
 
 public abstract class AbstractTask {
     protected final TaskId id;
-    protected final String jobId;
+    protected final String applicationId;
     protected final ProcessorTopology topology;
     protected final Consumer consumer;
     protected final ProcessorStateManager stateMgr;
@@ -45,7 +45,7 @@ public abstract class AbstractTask {
     protected ProcessorContext processorContext;
 
     protected AbstractTask(TaskId id,
-                           String jobId,
+                           String applicationId,
                            Collection<TopicPartition> partitions,
                            ProcessorTopology topology,
                            Consumer<byte[], byte[]> consumer,
@@ -53,17 +53,17 @@ public abstract class AbstractTask {
                            StreamsConfig config,
                            boolean isStandby) {
         this.id = id;
-        this.jobId = jobId;
+        this.applicationId = applicationId;
         this.partitions = new HashSet<>(partitions);
         this.topology = topology;
         this.consumer = consumer;
 
         // create the processor state manager
         try {
-            File jobStateDir = StreamThread.makeStateDir(jobId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
-            File stateFile = new File(jobStateDir.getCanonicalPath(), id.toString());
+            File applicationStateDir = StreamThread.makeStateDir(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG));
+            File stateFile = new File(applicationStateDir.getCanonicalPath(), id.toString());
             // if partitions is null, this is a standby task
-            this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
+            this.stateMgr = new ProcessorStateManager(applicationId, id.partition, partitions, stateFile, restoreConsumer, isStandby);
         } catch (IOException e) {
             throw new ProcessorStateException("Error while creating the state manager", e);
         }
@@ -83,8 +83,8 @@ public abstract class AbstractTask {
         return id;
     }
 
-    public final String jobId() {
-        return jobId;
+    public final String applicationId() {
+        return applicationId;
     }
 
     public final Set<TopicPartition> partitions() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index c4acc01..f6e43d0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -27,15 +27,10 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.TaskId;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 
 public class ProcessorContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
-    private static final Logger log = LoggerFactory.getLogger(ProcessorContextImpl.class);
-
     private final TaskId id;
     private final StreamTask task;
     private final StreamsMetrics metrics;
@@ -84,8 +79,8 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public String jobId() {
-        return task.jobId();
+    public String applicationId() {
+        return task.applicationId();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index c8f289e..df8516c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -51,7 +51,7 @@ public class ProcessorStateManager {
     public static final String CHECKPOINT_FILE_NAME = ".checkpoint";
     public static final String LOCK_FILE_NAME = ".lock";
 
-    private final String jobId;
+    private final String applicationId;
     private final int defaultPartition;
     private final Map<String, TopicPartition> partitionForTopic;
     private final File baseDir;
@@ -65,8 +65,8 @@ public class ProcessorStateManager {
     private final boolean isStandby;
     private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
 
-    public ProcessorStateManager(String jobId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
-        this.jobId = jobId;
+    public ProcessorStateManager(String applicationId, int defaultPartition, Collection<TopicPartition> sources, File baseDir, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby) throws IOException {
+        this.applicationId = applicationId;
         this.defaultPartition = defaultPartition;
         this.partitionForTopic = new HashMap<>();
         for (TopicPartition source : sources) {
@@ -104,8 +104,8 @@ public class ProcessorStateManager {
         }
     }
 
-    public static String storeChangelogTopic(String jobId, String storeName) {
-        return jobId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
+    public static String storeChangelogTopic(String applicationId, String storeName) {
+        return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
     }
 
     public static FileLock lockStateDirectory(File stateDir) throws IOException {
@@ -154,7 +154,7 @@ public class ProcessorStateManager {
         // check that the underlying change log topic exist or not
         String topic;
         if (loggingEnabled)
-            topic = storeChangelogTopic(this.jobId, store.name());
+            topic = storeChangelogTopic(this.applicationId, store.name());
         else topic = store.name();
 
         // block until the partition is ready for this state changelog topic or time has elapsed
@@ -325,7 +325,7 @@ public class ProcessorStateManager {
                 for (String storeName : stores.keySet()) {
                     TopicPartition part;
                     if (loggingEnabled.contains(storeName))
-                        part = new TopicPartition(storeChangelogTopic(jobId, storeName), getPartition(storeName));
+                        part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName));
                     else
                         part = new TopicPartition(storeName, getPartition(storeName));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
index 82633b4..0bcae18 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java
@@ -25,17 +25,13 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 
 public class StandbyContextImpl implements ProcessorContext, RecordCollector.Supplier {
 
-    private static final Logger log = LoggerFactory.getLogger(StandbyContextImpl.class);
-
     private final TaskId id;
-    private final String jobId;
+    private final String applicationId;
     private final StreamsMetrics metrics;
     private final ProcessorStateManager stateMgr;
 
@@ -47,12 +43,12 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     private boolean initialized;
 
     public StandbyContextImpl(TaskId id,
-                              String jobId,
+                              String applicationId,
                               StreamsConfig config,
                               ProcessorStateManager stateMgr,
                               StreamsMetrics metrics) {
         this.id = id;
-        this.jobId = jobId;
+        this.applicationId = applicationId;
         this.metrics = metrics;
         this.stateMgr = stateMgr;
 
@@ -78,8 +74,8 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup
     }
 
     @Override
-    public String jobId() {
-        return jobId;
+    public String applicationId() {
+        return applicationId;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index da454cb..f19d5a3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -23,8 +23,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.processor.TaskId;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -36,15 +34,13 @@ import java.util.Map;
  */
 public class StandbyTask extends AbstractTask {
 
-    private static final Logger log = LoggerFactory.getLogger(StandbyTask.class);
-
     private final Map<TopicPartition, Long> checkpointedOffsets;
 
     /**
      * Create {@link StandbyTask} with its assigned partitions
      *
      * @param id                    the ID of this task
-     * @param jobId                 the ID of the job
+     * @param applicationId         the ID of the stream processing application
      * @param partitions            the collection of assigned {@link TopicPartition}
      * @param topology              the instance of {@link ProcessorTopology}
      * @param consumer              the instance of {@link Consumer}
@@ -53,17 +49,17 @@ public class StandbyTask extends AbstractTask {
      * @param metrics               the {@link StreamsMetrics} created by the thread
      */
     public StandbyTask(TaskId id,
-                       String jobId,
+                       String applicationId,
                        Collection<TopicPartition> partitions,
                        ProcessorTopology topology,
                        Consumer<byte[], byte[]> consumer,
                        Consumer<byte[], byte[]> restoreConsumer,
                        StreamsConfig config,
                        StreamsMetrics metrics) {
-        super(id, jobId, partitions, topology, consumer, restoreConsumer, config, true);
+        super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, true);
 
         // initialize the topology with its own context
-        this.processorContext = new StandbyContextImpl(id, jobId, config, stateMgr, metrics);
+        this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics);
 
         initializeStateStores();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 266df3e..a6b82af 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -117,7 +117,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         streamThread = (StreamThread) o;
         streamThread.partitionAssignor(this);
 
-        this.topicGroups = streamThread.builder.topicGroups(streamThread.jobId);
+        this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
 
         if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
             internalTopicManager = new InternalTopicManager(
@@ -445,7 +445,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
     /* For Test Only */
     public Set<TaskId> tasksForState(String stateName) {
-        return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.jobId, stateName));
+        return stateChangelogTopicToTaskIds.get(ProcessorStateManager.storeChangelogTopic(streamThread.applicationId, stateName));
     }
 
     public Set<TaskId> tasksForPartition(TopicPartition partition) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 4d66324..54a25c1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -61,7 +61,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * Create {@link StreamTask} with its assigned partitions
      *
      * @param id                    the ID of this task
-     * @param jobId                 the ID of the job
+     * @param applicationId         the ID of the stream processing application
      * @param partitions            the collection of assigned {@link TopicPartition}
      * @param topology              the instance of {@link ProcessorTopology}
      * @param consumer              the instance of {@link Consumer}
@@ -71,7 +71,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
      * @param metrics               the {@link StreamsMetrics} created by the thread
      */
     public StreamTask(TaskId id,
-                      String jobId,
+                      String applicationId,
                       Collection<TopicPartition> partitions,
                       ProcessorTopology topology,
                       Consumer<byte[], byte[]> consumer,
@@ -79,7 +79,7 @@ public class StreamTask extends AbstractTask implements Punctuator {
                       Consumer<byte[], byte[]> restoreConsumer,
                       StreamsConfig config,
                       StreamsMetrics metrics) {
-        super(id, jobId, partitions, topology, consumer, restoreConsumer, config, false);
+        super(id, applicationId, partitions, topology, consumer, restoreConsumer, config, false);
         this.punctuationQueue = new PunctuationQueue();
         this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index e9343e0..491c812 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -72,7 +72,7 @@ public class StreamThread extends Thread {
     private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
 
     public final PartitionGrouper partitionGrouper;
-    public final String jobId;
+    public final String applicationId;
     public final String clientId;
     public final UUID processId;
 
@@ -106,12 +106,12 @@ public class StreamThread extends Thread {
     private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
 
-    static File makeStateDir(String jobId, String baseDirName) {
+    static File makeStateDir(String applicationId, String baseDirName) {
         File baseDir = new File(baseDirName);
         if (!baseDir.exists())
             baseDir.mkdir();
 
-        File stateDir = new File(baseDir, jobId);
+        File stateDir = new File(baseDir, applicationId);
         if (!stateDir.exists())
             stateDir.mkdir();
 
@@ -150,12 +150,12 @@ public class StreamThread extends Thread {
 
     public StreamThread(TopologyBuilder builder,
                         StreamsConfig config,
-                        String jobId,
+                        String applicationId,
                         String clientId,
                         UUID processId,
                         Metrics metrics,
                         Time time) {
-        this(builder, config, null , null, null, jobId, clientId, processId, metrics, time);
+        this(builder, config, null , null, null, applicationId, clientId, processId, metrics, time);
     }
 
     StreamThread(TopologyBuilder builder,
@@ -163,17 +163,17 @@ public class StreamThread extends Thread {
                  Producer<byte[], byte[]> producer,
                  Consumer<byte[], byte[]> consumer,
                  Consumer<byte[], byte[]> restoreConsumer,
-                 String jobId,
+                 String applicationId,
                  String clientId,
                  UUID processId,
                  Metrics metrics,
                  Time time) {
         super("StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
 
-        this.jobId = jobId;
+        this.applicationId = applicationId;
         this.config = config;
         this.builder = builder;
-        this.sourceTopics = builder.sourceTopics(jobId);
+        this.sourceTopics = builder.sourceTopics(applicationId);
         this.clientId = clientId;
         this.processId = processId;
         this.partitionGrouper = config.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
@@ -194,7 +194,7 @@ public class StreamThread extends Thread {
         this.standbyRecords = new HashMap<>();
 
         // read in task specific config values
-        this.stateDir = makeStateDir(this.jobId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
+        this.stateDir = makeStateDir(this.applicationId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG));
         this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG);
         this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
         this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG);
@@ -224,7 +224,7 @@ public class StreamThread extends Thread {
     private Consumer<byte[], byte[]> createConsumer() {
         String threadName = this.getName();
         log.info("Creating consumer client for stream thread [" + threadName + "]");
-        return new KafkaConsumer<>(config.getConsumerConfigs(this, this.jobId, this.clientId + "-" + threadName),
+        return new KafkaConsumer<>(config.getConsumerConfigs(this, this.applicationId, this.clientId + "-" + threadName),
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer());
     }
@@ -580,9 +580,9 @@ public class StreamThread extends Thread {
     protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
+        ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
 
-        return new StreamTask(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
+        return new StreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, sensors);
     }
 
     private void addStreamTasks(Collection<TopicPartition> assignment) {
@@ -650,10 +650,10 @@ public class StreamThread extends Thread {
     protected StandbyTask createStandbyTask(TaskId id, Collection<TopicPartition> partitions) {
         sensors.taskCreationSensor.record();
 
-        ProcessorTopology topology = builder.build(jobId, id.topicGroupId);
+        ProcessorTopology topology = builder.build(applicationId, id.topicGroupId);
 
         if (!topology.stateStoreSuppliers().isEmpty()) {
-            return new StandbyTask(id, jobId, partitions, topology, consumer, restoreConsumer, config, sensors);
+            return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, sensors);
         } else {
             return null;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index aac4d85..4229f94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -57,7 +57,7 @@ public class StoreChangeLogger<K, V> {
     }
 
     protected StoreChangeLogger(String storeName, ProcessorContext context, int partition, Serdes<K, V> serialization, int maxDirty, int maxRemoved) {
-        this.topic = ProcessorStateManager.storeChangelogTopic(context.jobId(), storeName);
+        this.topic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName);
         this.context = context;
         this.partition = partition;
         this.serialization = serialization;

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f0276ab..83ebe48 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -41,7 +41,7 @@ public class StreamsConfigTest {
 
     @Before
     public void setUp() {
-        props.put(StreamsConfig.JOB_ID_CONFIG, "streams-config-test");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-config-test");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
         props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
@@ -59,9 +59,10 @@ public class StreamsConfigTest {
 
     @Test
     public void testGetConsumerConfigs() throws Exception {
-        Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-job", "client");
+        Map<String, Object> returnedProps =
+            streamsConfig.getConsumerConfigs(streamThreadPlaceHolder, "example-application", "client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
-        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-job");
+        assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 14cb493..1d0a969 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -183,12 +183,12 @@ public class ProcessorStateManagerTest {
     }
 
     private final Set<TopicPartition> noPartitions = Collections.emptySet();
-    private final String jobId = "test-job";
+    private final String applicationId = "test-application";
     private final String stateDir = "test";
     private final String persistentStoreName = "persistentStore";
     private final String nonPersistentStoreName = "nonPersistentStore";
-    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, persistentStoreName);
-    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(jobId, nonPersistentStoreName);
+    private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName);
+    private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName);
 
     @Test
     public void testLockStateDirectory() throws IOException {
@@ -197,7 +197,7 @@ public class ProcessorStateManagerTest {
             FileLock lock;
 
             // the state manager locks the directory
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
 
             try {
                 // this should not get the lock
@@ -226,7 +226,7 @@ public class ProcessorStateManagerTest {
         try {
             MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, new MockRestoreConsumer(), false);
             try {
                 stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
             } finally {
@@ -258,7 +258,7 @@ public class ProcessorStateManagerTest {
 
             MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
 
@@ -311,7 +311,7 @@ public class ProcessorStateManagerTest {
 
             MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 2, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 2, noPartitions, baseDir, restoreConsumer, false);
             try {
                 restoreConsumer.reset();
 
@@ -351,9 +351,9 @@ public class ProcessorStateManagerTest {
             String storeName2 = "store2";
             String storeName3 = "store3";
 
-            String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
-            String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
-            String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(jobId, storeName3);
+            String storeTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+            String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
+            String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3);
 
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset));
@@ -386,7 +386,7 @@ public class ProcessorStateManagerTest {
 
             // if there is an source partition, inherit the partition id
             Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 0, sourcePartitions, baseDir, restoreConsumer, true); // standby
             try {
                 restoreConsumer.reset();
 
@@ -425,7 +425,7 @@ public class ProcessorStateManagerTest {
 
             MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false);
             try {
                 stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
@@ -462,12 +462,12 @@ public class ProcessorStateManagerTest {
             HashMap<TopicPartition, Long> ackedOffsets = new HashMap<>();
             ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L);
             ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L);
-            ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(jobId, "otherTopic"), 1), 789L);
+            ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L);
 
             MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
             MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-            ProcessorStateManager stateMgr = new ProcessorStateManager(jobId, 1, noPartitions, baseDir, restoreConsumer, false);
+            ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, 1, noPartitions, baseDir, restoreConsumer, false);
             try {
                 // make sure the checkpoint file is deleted
                 assertFalse(checkpointFile.exists());

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index c8115b8..12210cc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -67,7 +67,7 @@ public class ProcessorTopologyTest {
         // Create a new directory in which we'll put all of the state for this test, enabling running tests in parallel ...
         File localState = StateTestUtils.tempDir();
         Properties props = new Properties();
-        props.setProperty(StreamsConfig.JOB_ID_CONFIG, "processor-topology-test");
+        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "processor-topology-test");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
         props.setProperty(StreamsConfig.STATE_DIR_CONFIG, localState.getAbsolutePath());
         props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 295f0dd..21bdaff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -57,11 +57,11 @@ public class StandbyTaskTest {
 
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
 
-    private final String jobId = "test-job";
+    private final String applicationId = "test-application";
     private final String storeName1 = "store1";
     private final String storeName2 = "store2";
-    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(jobId, storeName1);
-    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(jobId, storeName2);
+    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName1);
+    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2);
 
     private final TopicPartition partition1 = new TopicPartition(storeChangelogTopicName1, 1);
     private final TopicPartition partition2 = new TopicPartition(storeChangelogTopicName2, 1);
@@ -94,7 +94,7 @@ public class StandbyTaskTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
+                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
@@ -133,7 +133,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions()));
 
@@ -148,7 +148,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
@@ -167,7 +167,7 @@ public class StandbyTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
@@ -201,7 +201,7 @@ public class StandbyTaskTest {
 
             task.close();
 
-            File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
+            File taskDir = new File(StreamThread.makeStateDir(applicationId, baseDir.getCanonicalPath()), taskId.toString());
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             Map<TopicPartition, Long> offsets = checkpoint.read();
 
@@ -230,7 +230,7 @@ public class StandbyTaskTest {
             ));
 
             StreamsConfig config = createConfig(baseDir);
-            StandbyTask task = new StandbyTask(taskId, jobId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
+            StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null);
 
             restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions()));
 
@@ -299,7 +299,7 @@ public class StandbyTaskTest {
 
             task.close();
 
-            File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString());
+            File taskDir = new File(StreamThread.makeStateDir(applicationId, baseDir.getCanonicalPath()), taskId.toString());
             OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME));
             Map<TopicPartition, Long> offsets = checkpoint.read();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 7f37bda..a5990bd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -94,7 +94,7 @@ public class StreamPartitionAssignorTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-partition-assignor-test");
+                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-partition-assignor-test");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 1f401db..f2ade6b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -78,7 +78,7 @@ public class StreamTaskTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-task-test");
+                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "stream-task-test");
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
@@ -105,7 +105,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+            StreamTask task = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
@@ -156,7 +156,7 @@ public class StreamTaskTest {
         File baseDir = Files.createTempDirectory("test").toFile();
         try {
             StreamsConfig config = createConfig(baseDir);
-            StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
+            StreamTask task = new StreamTask(new TaskId(1, 1), "applicationId", partitions, topology, consumer, producer, restoreStateConsumer, config, null);
 
             task.addRecords(partition1, records(
                     new ConsumerRecord<>(partition1.topic(), partition1.partition(), 10, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index eaaf842..b201c07 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -60,7 +60,7 @@ import java.util.UUID;
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
-    private final String jobId = "stream-thread-test";
+    private final String applicationId = "stream-thread-test";
     private final UUID processId = UUID.randomUUID();
 
     private TopicPartition t1p1 = new TopicPartition("topic1", 1);
@@ -118,7 +118,7 @@ public class StreamThreadTest {
                 setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
                 setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor");
-                setProperty(StreamsConfig.JOB_ID_CONFIG, jobId);
+                setProperty(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
             }
@@ -129,14 +129,14 @@ public class StreamThreadTest {
         public boolean committed = false;
 
         public TestStreamTask(TaskId id,
-                              String jobId,
+                              String applicationId,
                               Collection<TopicPartition> partitions,
                               ProcessorTopology topology,
                               Consumer<byte[], byte[]> consumer,
                               Producer<byte[], byte[]> producer,
                               Consumer<byte[], byte[]> restoreConsumer,
                               StreamsConfig config) {
-            super(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, null);
+            super(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, null);
         }
 
         @Override
@@ -168,11 +168,11 @@ public class StreamThreadTest {
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId, processId, new Metrics(), new SystemTime()) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
             }
         };
 
@@ -271,12 +271,12 @@ public class StreamThreadTest {
 
             StreamsConfig config = new StreamsConfig(props);
 
-            File jobDir = new File(baseDir, jobId);
-            jobDir.mkdir();
-            File stateDir1 = new File(jobDir, task1.toString());
-            File stateDir2 = new File(jobDir, task2.toString());
-            File stateDir3 = new File(jobDir, task3.toString());
-            File extraDir = new File(jobDir, "X");
+            File applicationDir = new File(baseDir, applicationId);
+            applicationDir.mkdir();
+            File stateDir1 = new File(applicationDir, task1.toString());
+            File stateDir2 = new File(applicationDir, task2.toString());
+            File stateDir3 = new File(applicationDir, task3.toString());
+            File extraDir = new File(applicationDir, "X");
             stateDir1.mkdir();
             stateDir2.mkdir();
             stateDir3.mkdir();
@@ -290,7 +290,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId,  processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -299,7 +299,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                    return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 
@@ -412,7 +412,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, applicationId, clientId,  processId, new Metrics(), mockTime) {
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();
@@ -421,7 +421,7 @@ public class StreamThreadTest {
                 @Override
                 protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                     ProcessorTopology topology = builder.build("X", id.topicGroupId);
-                    return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
+                    return new TestStreamTask(id, applicationId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config);
                 }
             };
 
@@ -482,7 +482,7 @@ public class StreamThreadTest {
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
-        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId));
+        partitionAssignor.configure(config.getConsumerConfigs(thread, thread.applicationId, thread.clientId));
 
         Map<String, PartitionAssignor.Assignment> assignments =
                 partitionAssignor.assign(metadata, Collections.singletonMap("client", subscription));

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index 6cb45f3..063eafe 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -81,7 +81,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
     private static KafkaStreams createKafkaStreams(File stateDir, String kafka, String zookeeper) {
         Properties props = new Properties();
-        props.put(StreamsConfig.JOB_ID_CONFIG, "SmokeTest");
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index d597fd2..b463669 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -106,8 +106,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S
     }
 
     @Override
-    public String jobId() {
-        return "mockJob";
+    public String applicationId() {
+        return "mockApplication";
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index cf17dbe..a2948a2 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -126,7 +126,7 @@ public class ProcessorTopologyTestDriver {
 
     private final Serializer<byte[]> bytesSerializer = new ByteArraySerializer();
 
-    private final String jobId = "test-driver-job";
+    private final String applicationId = "test-driver-application";
 
     private final TaskId id;
     private final ProcessorTopology topology;
@@ -167,7 +167,7 @@ public class ProcessorTopologyTestDriver {
         }
 
         task = new StreamTask(id,
-            jobId,
+            applicationId,
             partitionsByTopic.values(),
             topology,
             consumer,
@@ -334,7 +334,7 @@ public class ProcessorTopologyTestDriver {
         };
         // For each store name ...
         for (String storeName : storeNames) {
-            String topicName = ProcessorStateManager.storeChangelogTopic(jobId, storeName);
+            String topicName = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
             // Set up the restore-state topic ...
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...


Mime
View raw message