kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove deprecated valueTransformer.punctuate (#4993)
Date Thu, 10 May 2018 16:51:07 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa1702f  MINOR: Remove deprecated valueTransformer.punctuate (#4993)
fa1702f is described below

commit fa1702fece04c5fc50149fc9b05d77a459b7180b
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu May 10 09:50:59 2018 -0700

    MINOR: Remove deprecated valueTransformer.punctuate (#4993)
    
    Also removed the InternalValueTransformerWithKey / Supplier, which were used to mock away the deprecated punctuate function.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>
---
 .../kafka/streams/kstream/ValueTransformer.java    | 22 -----------
 .../streams/kstream/internals/AbstractStream.java  | 46 ++--------------------
 .../internals/InternalValueTransformerWithKey.java | 24 -----------
 .../InternalValueTransformerWithKeySupplier.java   | 21 ----------
 .../streams/kstream/internals/KStreamImpl.java     | 10 ++---
 .../kstream/internals/KStreamTransformValues.java  | 10 +++--
 .../kstream/internals/AbstractStreamTest.java      | 35 ++++++++--------
 .../internals/KStreamTransformValuesTest.java      | 16 ++------
 8 files changed, 34 insertions(+), 150 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
index 1da779e..c0da38a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java
@@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.To;
 
 /**
@@ -85,27 +84,6 @@ public interface ValueTransformer<V, VR> {
     VR transform(final V value);
 
     /**
-     * Perform any periodic operations if this processor {@link ProcessorContext#schedule(long)
schedule itself} with
-     * the context during {@link #init(ProcessorContext) initialization}.
-     * <p>
-     * It is not possible to return any new output records within {@code punctuate}.
-     * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object,
Object, To)}
-     * will result in an {@link StreamsException exception}.
-     * Furthermore, {@code punctuate} must return {@code null}.
-     * <p>
-     * Note, that {@code punctuate} is called base on <it>stream time</it> (i.e.,
time progress with regard to
-     * timestamps return by the used {@link TimestampExtractor})
-     * and not based on wall-clock time.
-     *
-     * @deprecated Please use {@link Punctuator} functional interface instead.
-     *
-     * @param timestamp the stream time when {@code punctuate} is being called
-     * @return must return {@code null}&mdash;otherwise, an {@link StreamsException exception}
will be thrown
-     */
-    @Deprecated
-    VR punctuate(final long timestamp);
-
-    /**
      * Close this processor and clean up any resources.
      * <p>
      * It is not possible to return any new output records within {@code close()}.
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index 497bdac..7bc7a15 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.ValueTransformer;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
@@ -84,19 +83,13 @@ public abstract class AbstractStream<K> {
         };
     }
 
-    static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR> toInternalValueTransformerSupplier(final
ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
+    static <K, V, VR> ValueTransformerWithKeySupplier<K, V, VR> toValueTransformerWithKeySupplier(final
ValueTransformerSupplier<V, VR> valueTransformerSupplier) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't
be null");
-        return new InternalValueTransformerWithKeySupplier<K, V, VR>() {
+        return new ValueTransformerWithKeySupplier<K, V, VR>() {
             @Override
-            public InternalValueTransformerWithKey<K, V, VR> get() {
+            public ValueTransformerWithKey<K, V, VR> get() {
                 final ValueTransformer<V, VR> valueTransformer = valueTransformerSupplier.get();
-                return new InternalValueTransformerWithKey<K, V, VR>() {
-                    @SuppressWarnings("deprecation")
-                    @Override
-                    public VR punctuate(final long timestamp) {
-                        return valueTransformer.punctuate(timestamp);
-                    }
-
+                return new ValueTransformerWithKey<K, V, VR>() {
                     @Override
                     public void init(final ProcessorContext context) {
                         valueTransformer.init(context);
@@ -115,35 +108,4 @@ public abstract class AbstractStream<K> {
             }
         };
     }
-
-    static <K, V, VR> InternalValueTransformerWithKeySupplier<K, V, VR> toInternalValueTransformerSupplier(final
ValueTransformerWithKeySupplier<K, V, VR> valueTransformerWithKeySupplier) {
-        Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerSupplier
can't be null");
-        return new InternalValueTransformerWithKeySupplier<K, V, VR>() {
-            @Override
-            public InternalValueTransformerWithKey<K, V, VR> get() {
-                final ValueTransformerWithKey<K, V, VR> valueTransformerWithKey = valueTransformerWithKeySupplier.get();
-                return new InternalValueTransformerWithKey<K, V, VR>() {
-                    @Override
-                    public VR punctuate(final long timestamp) {
-                        throw new StreamsException("ValueTransformerWithKey#punctuate should
not be called.");
-                    }
-
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        valueTransformerWithKey.init(context);
-                    }
-
-                    @Override
-                    public VR transform(final K readOnlyKey, final V value) {
-                        return valueTransformerWithKey.transform(readOnlyKey, value);
-                    }
-
-                    @Override
-                    public void close() {
-                        valueTransformerWithKey.close();
-                    }
-                };
-            }
-        };
-    }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
deleted file mode 100644
index 636e409..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-
-import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
-
-public interface InternalValueTransformerWithKey<K, V, VR> extends ValueTransformerWithKey<K,
V, VR> {
-    VR punctuate(final long timestamp);
-}
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
deleted file mode 100644
index 3418e71..0000000
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.kstream.internals;
-
-public interface InternalValueTransformerWithKeySupplier<K, V, VR> {
-    InternalValueTransformerWithKey<K, V, VR> get();
-}
\ No newline at end of file
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 2ddd5ff..b8195a0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -348,7 +348,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                final String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be
null");
 
-        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier),
stateStoreNames);
+        return doTransformValues(toValueTransformerWithKeySupplier(valueTransformerSupplier),
stateStoreNames);
     }
 
     @Override
@@ -356,13 +356,13 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                final String... stateStoreNames) {
         Objects.requireNonNull(valueTransformerSupplier, "valueTransformSupplier can't be
null");
 
-        return transformValues(toInternalValueTransformerSupplier(valueTransformerSupplier),
stateStoreNames);
+        return doTransformValues(valueTransformerSupplier, stateStoreNames);
     }
 
-    private <VR> KStream<K, VR> transformValues(final InternalValueTransformerWithKeySupplier<?
super K, ? super V, ? extends VR> internalValueTransformerWithKeySupplier,
-                                                final String... stateStoreNames) {
+    private <VR> KStream<K, VR> doTransformValues(final ValueTransformerWithKeySupplier<?
super K, ? super V, ? extends VR> valueTransformerWithKeySupplier,
+                                                  final String... stateStoreNames) {
         final String name = builder.newProcessorName(TRANSFORMVALUES_NAME);
-        builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(internalValueTransformerWithKeySupplier),
this.name);
+        builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerWithKeySupplier),
this.name);
         if (stateStoreNames != null && stateStoreNames.length > 0) {
             builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
index fb6af34..d45b7cf 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java
@@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -35,9 +37,9 @@ import java.util.Map;
 
 public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
{
 
-    private final InternalValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier;
+    private final ValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier;
 
-    public KStreamTransformValues(final InternalValueTransformerWithKeySupplier<K, V,
R> valueTransformerSupplier) {
+    KStreamTransformValues(final ValueTransformerWithKeySupplier<K, V, R> valueTransformerSupplier)
{
         this.valueTransformerSupplier = valueTransformerSupplier;
     }
 
@@ -48,10 +50,10 @@ public class KStreamTransformValues<K, V, R> implements ProcessorSupplier<K, V>
 
     public static class KStreamTransformValuesProcessor<K, V, R> implements Processor<K,
V> {
 
-        private final InternalValueTransformerWithKey<K, V, R> valueTransformer;
+        private final ValueTransformerWithKey<K, V, R> valueTransformer;
         private ProcessorContext context;
 
-        public KStreamTransformValuesProcessor(final InternalValueTransformerWithKey<K,
V, R> valueTransformer) {
+        KStreamTransformValuesProcessor(final ValueTransformerWithKey<K, V, R> valueTransformer)
{
             this.valueTransformer = valueTransformer;
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 1f9bcba..a37b6f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -47,29 +47,26 @@ public class AbstractStreamTest {
 
     @Test
     public void testToInternlValueTransformerSupplierSuppliesNewTransformers() {
-        final ValueTransformerSupplier vts = createMock(ValueTransformerSupplier.class);
-        expect(vts.get()).andReturn(null).times(3);
-        final InternalValueTransformerWithKeySupplier ivtwks =
-            AbstractStream.toInternalValueTransformerSupplier(vts);
-        replay(vts);
-        ivtwks.get();
-        ivtwks.get();
-        ivtwks.get();
-        verify(vts);
+        final ValueTransformerSupplier valueTransformerSupplier = createMock(ValueTransformerSupplier.class);
+        expect(valueTransformerSupplier.get()).andReturn(null).times(3);
+        final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier =
+            AbstractStream.toValueTransformerWithKeySupplier(valueTransformerSupplier);
+        replay(valueTransformerSupplier);
+        valueTransformerWithKeySupplier.get();
+        valueTransformerWithKeySupplier.get();
+        valueTransformerWithKeySupplier.get();
+        verify(valueTransformerSupplier);
     }
 
     @Test
     public void testToInternalValueTransformerSupplierSuppliesNewTransformers() {
-        final ValueTransformerWithKeySupplier vtwks =
-            createMock(ValueTransformerWithKeySupplier.class);
-        expect(vtwks.get()).andReturn(null).times(3);
-        final InternalValueTransformerWithKeySupplier ivtwks =
-            AbstractStream.toInternalValueTransformerSupplier(vtwks);
-        replay(vtwks);
-        ivtwks.get();
-        ivtwks.get();
-        ivtwks.get();
-        verify(vtwks);
+        final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = createMock(ValueTransformerWithKeySupplier.class);
+        expect(valueTransformerWithKeySupplier.get()).andReturn(null).times(3);
+        replay(valueTransformerWithKeySupplier);
+        valueTransformerWithKeySupplier.get();
+        valueTransformerWithKeySupplier.get();
+        valueTransformerWithKeySupplier.get();
+        verify(valueTransformerWithKeySupplier);
     }
 
     @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 419e6f1..807fb1f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -69,11 +69,6 @@ public class KStreamTransformValuesTest {
                         }
 
                         @Override
-                        public Integer punctuate(long timestamp) {
-                            return null;
-                        }
-
-                        @Override
                         public void close() {
                         }
                     };
@@ -143,15 +138,10 @@ public class KStreamTransformValuesTest {
     @Test
     public void shouldNotAllowValueTransformerToCallInternalProcessorContextMethods() {
         final BadValueTransformer badValueTransformer = new BadValueTransformer();
-        final KStreamTransformValues<Integer, Integer, Integer> transformValue = new
KStreamTransformValues<>(new InternalValueTransformerWithKeySupplier<Integer, Integer,
Integer>() {
+        final KStreamTransformValues<Integer, Integer, Integer> transformValue = new
KStreamTransformValues<>(new ValueTransformerWithKeySupplier<Integer, Integer, Integer>()
{
             @Override
-            public InternalValueTransformerWithKey<Integer, Integer, Integer> get()
{
-                return new InternalValueTransformerWithKey<Integer, Integer, Integer>()
{
-                    @Override
-                    public Integer punctuate(long timestamp) {
-                        throw new StreamsException("ValueTransformerWithKey#punctuate should
not be called.");
-                    }
-
+            public ValueTransformerWithKey<Integer, Integer, Integer> get() {
+                return new ValueTransformerWithKey<Integer, Integer, Integer>() {
                     @Override
                     public void init(final ProcessorContext context) {
                         badValueTransformer.init(context);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message