kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove magic number and extract Pattern instance from method as class field (#4799)
Date Sun, 08 Apr 2018 18:54:27 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 37efc79  MINOR: Remove magic number and extract Pattern instance from method as class field (#4799)
37efc79 is described below

commit 37efc79eb82e0f290d0d2a17f24001f2cbc922cc
Author: Benedict Jin <1571805553@qq.com>
AuthorDate: Mon Apr 9 02:54:22 2018 +0800

    MINOR: Remove magic number and extract Pattern instance from method as class field (#4799)
    
    * Remove magic number
    * Extract Pattern instance from method as class field
    * Add @Override declare
    
    Reviewers: Randall Hauch <rhauch@gmail.com>
---
 .../src/main/java/org/apache/kafka/connect/data/ConnectSchema.java  | 1 +
 .../src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java  | 2 ++
 .../main/java/org/apache/kafka/connect/header/ConnectHeaders.java   | 1 +
 .../api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java   | 1 +
 .../src/main/java/org/apache/kafka/connect/source/SourceTask.java   | 1 +
 .../main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java | 1 +
 .../java/org/apache/kafka/connect/runtime/WorkerSourceTask.java     | 1 +
 .../apache/kafka/connect/runtime/distributed/DistributedHerder.java | 6 ++++--
 .../apache/kafka/connect/runtime/distributed/WorkerCoordinator.java | 3 +++
 .../kafka/connect/runtime/isolation/DelegatingClassLoader.java      | 1 +
 .../main/java/org/apache/kafka/connect/runtime/rest/RestServer.java | 4 ++--
 .../apache/kafka/connect/runtime/standalone/StandaloneHerder.java   | 2 ++
 .../org/apache/kafka/connect/storage/FileOffsetBackingStore.java    | 1 +
 .../main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java  | 1 +
 .../main/java/org/apache/kafka/connect/util/ReflectionsUtil.java    | 2 ++
 15 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
index 85357fe..ff82716 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java
@@ -180,6 +180,7 @@ public class ConnectSchema implements Schema {
         return fields;
     }
 
+    @Override
     public Field field(String fieldName) {
         if (type != Type.STRUCT)
             throw new DataException("Cannot look up fields on non-struct type");
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
index a9064b8..fdcf05a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
@@ -336,12 +336,14 @@ public class SchemaBuilder implements Schema {
      * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
      * @return the list of fields for this Schema
      */
+    @Override
     public List<Field> fields() {
         if (type != Type.STRUCT)
             throw new DataException("Cannot list fields on non-struct type");
         return new ArrayList<>(fields.values());
     }
 
+    @Override
     public Field field(String fieldName) {
         if (type != Type.STRUCT)
             throw new DataException("Cannot look up fields on non-struct type");
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeaders.java b/connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeaders.java
index e3b5e72..185ba65 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeaders.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeaders.java
@@ -505,6 +505,7 @@ public class ConnectHeaders implements Headers {
             this.key = key;
         }
 
+        @Override
         protected Header makeNext() {
             while (original.hasNext()) {
                 Header header = original.next();
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
index 1406b30..5d308c4 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -169,5 +169,6 @@ public abstract class SinkTask implements Task {
      * commit has completed. Implementations of this method should only need to perform final cleanup operations, such
      * as closing network connections to the sink system.
      */
+    @Override
     public abstract void stop();
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index 5507f56..8767a62 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -83,6 +83,7 @@ public abstract class SourceTask implements Task {
      * could set a flag that will force {@link #poll()} to exit immediately and invoke
      * {@link java.nio.channels.Selector#wakeup() wakeup()} to interrupt any ongoing requests.
      */
+    @Override
     public abstract void stop();
 
     /**
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index fd05af5..6a90310 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -97,6 +97,7 @@ public class ConnectorConfig extends AbstractConfig {
             super(configDef, props);
         }
 
+        @Override
         public Object get(String key) {
             return super.get(key);
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6ef5ae3..f2cef5a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -135,6 +135,7 @@ class WorkerSourceTask extends WorkerTask {
         }
     }
 
+    @Override
     protected void close() {
         producer.close(30, TimeUnit.SECONDS);
         transformationChain.close();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index c6e6a47..5e9707a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -109,6 +109,8 @@ import java.util.concurrent.atomic.AtomicLong;
 public class DistributedHerder extends AbstractHerder implements Runnable {
     private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
 
+    private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
+    private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
     private static final int START_STOP_THREAD_POOL_SIZE = 8;
 
@@ -404,9 +406,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             forwardRequestExecutor.shutdown();
             startAndStopExecutor.shutdown();
 
-            if (!forwardRequestExecutor.awaitTermination(10000L, TimeUnit.MILLISECONDS))
+            if (!forwardRequestExecutor.awaitTermination(FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))
                 forwardRequestExecutor.shutdownNow();
-            if (!startAndStopExecutor.awaitTermination(1000L, TimeUnit.MILLISECONDS))
+            if (!startAndStopExecutor.awaitTermination(START_AND_STOP_SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS))
                 startAndStopExecutor.shutdownNow();
         } catch (InterruptedException e) {
             // ignore
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 6007728..60407c1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -93,6 +93,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         this.rejoinRequested = false;
     }
 
+    @Override
     public void requestRejoin() {
         rejoinRequested = true;
     }
@@ -315,12 +316,14 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
             this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";
 
             Measurable numConnectors = new Measurable() {
+                @Override
                 public double measure(MetricConfig config, long now) {
                     return assignmentSnapshot.connectors().size();
                 }
             };
 
             Measurable numTasks = new Measurable() {
+                @Override
                 public double measure(MetricConfig config, long now) {
                     return assignmentSnapshot.tasks().size();
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
index 37b7951..c67dfb5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
@@ -238,6 +238,7 @@ public class DelegatingClassLoader extends URLClassLoader {
         // implementing the java.sql.Driver interface.
         AccessController.doPrivileged(
                 new PrivilegedAction<Void>() {
+                    @Override
                     public Void run() {
                         ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
                                 Driver.class,
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 77f6cdf..8d7803e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -62,6 +62,7 @@ import java.util.regex.Pattern;
 public class RestServer {
     private static final Logger log = LoggerFactory.getLogger(RestServer.class);
 
+    private static final Pattern LISTENER_PATTERN = Pattern.compile("^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");
     private static final long GRACEFUL_SHUTDOWN_TIMEOUT_MS = 60 * 1000;
 
     private static final String PROTOCOL_HTTP = "http";
@@ -118,8 +119,7 @@ public class RestServer {
      * Creates Jetty connector according to configuration
      */
     public Connector createConnector(String listener) {
-        Pattern listenerPattern = Pattern.compile("^(.*)://\\[?([0-9a-zA-Z\\-%._:]*)\\]?:(-?[0-9]+)");
-        Matcher listenerMatcher = listenerPattern.matcher(listener);
+        Matcher listenerMatcher = LISTENER_PATTERN.matcher(listener);
 
         if (!listenerMatcher.matches())
             throw new ConfigException("Listener doesn't have the right format (protocol://hostname:port).");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 41609cb..96f8e87 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -67,12 +67,14 @@ public class StandaloneHerder extends AbstractHerder {
         configBackingStore.setUpdateListener(new ConfigUpdateListener());
     }
 
+    @Override
     public synchronized void start() {
         log.info("Herder starting");
         startServices();
         log.info("Herder started");
     }
 
+    @Override
     public synchronized void stop() {
         log.info("Herder stopping");
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
index 9961a7c..35e4855 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
@@ -88,6 +88,7 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
         }
     }
 
+    @Override
     protected void save() {
         try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) {
             Map<byte[], byte[]> raw = new HashMap<>();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
index e87851c..95b49af 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/SchemaSourceTask.java
@@ -81,6 +81,7 @@ public class SchemaSourceTask extends SourceTask {
         .field("seqno", Schema.INT64_SCHEMA)
         .build();
 
+    @Override
     public String version() {
         return new SchemaSourceConnector().version();
     }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java
index 61d0e35..f7dfe7b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ReflectionsUtil.java
@@ -52,6 +52,7 @@ public class ReflectionsUtil {
             this.endings = endings;
         }
 
+        @Override
         public boolean matches(URL url) {
             final String protocol = url.getProtocol();
             final String externalForm = url.toExternalForm();
@@ -66,6 +67,7 @@ public class ReflectionsUtil {
             return false;
         }
 
+        @Override
         public Dir createDir(final URL url) throws Exception {
             return emptyVfsDir(url);
         }

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message