From commits-return-9500-apmail-kafka-commits-archive=kafka.apache.org@kafka.apache.org Thu May 10 16:51:08 2018 Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id AD34D18614 for ; Thu, 10 May 2018 16:51:08 +0000 (UTC) Received: (qmail 16304 invoked by uid 500); 10 May 2018 16:51:08 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 16267 invoked by uid 500); 10 May 2018 16:51:08 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 16258 invoked by uid 99); 10 May 2018 16:51:08 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 10 May 2018 16:51:08 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id DA53D888B1; Thu, 10 May 2018 16:51:07 +0000 (UTC) Date: Thu, 10 May 2018 16:51:07 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: MINOR: Remove deprecated valueTransformer.punctuate (#4993) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152597106728.23438.16047538126220602948@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 830ee16d0d043bad281ba25ed7287f7d45ac9c09 X-Git-Newrev: fa1702fece04c5fc50149fc9b05d77a459b7180b X-Git-Rev: fa1702fece04c5fc50149fc9b05d77a459b7180b X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new fa1702f MINOR: Remove deprecated valueTransformer.punctuate (#4993) fa1702f is described below commit fa1702fece04c5fc50149fc9b05d77a459b7180b Author: Guozhang Wang AuthorDate: Thu May 10 09:50:59 2018 -0700 MINOR: Remove deprecated valueTransformer.punctuate (#4993) Also removed the InternalValueTransformerWithKey / Supplier which is used to mock away the deprecated punctuate function. Reviewers: Matthias J. Sax --- .../kafka/streams/kstream/ValueTransformer.java | 22 ----------- .../streams/kstream/internals/AbstractStream.java | 46 ++-------------------- .../internals/InternalValueTransformerWithKey.java | 24 ----------- .../InternalValueTransformerWithKeySupplier.java | 21 ---------- .../streams/kstream/internals/KStreamImpl.java | 10 ++--- .../kstream/internals/KStreamTransformValues.java | 10 +++-- .../kstream/internals/AbstractStreamTest.java | 35 ++++++++-------- .../internals/KStreamTransformValuesTest.java | 16 ++------ 8 files changed, 34 insertions(+), 150 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java index 1da779e..c0da38a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformer.java @@ -22,7 +22,6 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.To; /** @@ -85,27 +84,6 @@ public interface ValueTransformer { VR transform(final V value); /** - * Perform any periodic operations if this processor {@link ProcessorContext#schedule(long) schedule itself} with - * the context during {@link #init(ProcessorContext) initialization}. - *

- * It is not possible to return any new output records within {@code punctuate}. - * Using {@link ProcessorContext#forward(Object, Object)} or {@link ProcessorContext#forward(Object, Object, To)} - * will result in an {@link StreamsException exception}. - * Furthermore, {@code punctuate} must return {@code null}. - *

- * Note, that {@code punctuate} is called base on stream time (i.e., time progress with regard to - * timestamps return by the used {@link TimestampExtractor}) - * and not based on wall-clock time. - * - * @deprecated Please use {@link Punctuator} functional interface instead. - * - * @param timestamp the stream time when {@code punctuate} is being called - * @return must return {@code null}—otherwise, an {@link StreamsException exception} will be thrown - */ - @Deprecated - VR punctuate(final long timestamp); - - /** * Close this processor and clean up any resources. *

* It is not possible to return any new output records within {@code close()}. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java index 497bdac..7bc7a15 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueTransformer; import org.apache.kafka.streams.kstream.ValueTransformerWithKey; @@ -84,19 +83,13 @@ public abstract class AbstractStream { }; } - static InternalValueTransformerWithKeySupplier toInternalValueTransformerSupplier(final ValueTransformerSupplier valueTransformerSupplier) { + static ValueTransformerWithKeySupplier toValueTransformerWithKeySupplier(final ValueTransformerSupplier valueTransformerSupplier) { Objects.requireNonNull(valueTransformerSupplier, "valueTransformerSupplier can't be null"); - return new InternalValueTransformerWithKeySupplier() { + return new ValueTransformerWithKeySupplier() { @Override - public InternalValueTransformerWithKey get() { + public ValueTransformerWithKey get() { final ValueTransformer valueTransformer = valueTransformerSupplier.get(); - return new InternalValueTransformerWithKey() { - @SuppressWarnings("deprecation") - @Override - public VR punctuate(final long timestamp) { - return valueTransformer.punctuate(timestamp); - } - + return new ValueTransformerWithKey() { @Override public void init(final ProcessorContext context) { valueTransformer.init(context); @@ -115,35 +108,4 @@ public abstract class AbstractStream { } }; } - - static InternalValueTransformerWithKeySupplier toInternalValueTransformerSupplier(final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier) { - Objects.requireNonNull(valueTransformerWithKeySupplier, "valueTransformerSupplier can't be null"); - return new InternalValueTransformerWithKeySupplier() { - @Override - public InternalValueTransformerWithKey get() { - final ValueTransformerWithKey valueTransformerWithKey = valueTransformerWithKeySupplier.get(); - return new InternalValueTransformerWithKey() { - @Override - public VR punctuate(final long timestamp) { - throw new StreamsException("ValueTransformerWithKey#punctuate should not be called."); - } - - @Override - public void init(final ProcessorContext context) { - valueTransformerWithKey.init(context); - } - - @Override - public VR transform(final K readOnlyKey, final V value) { - return valueTransformerWithKey.transform(readOnlyKey, value); - } - - @Override - public void close() { - valueTransformerWithKey.close(); - } - }; - } - }; - } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java deleted file mode 100644 index 636e409..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKey.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - - -import org.apache.kafka.streams.kstream.ValueTransformerWithKey; - -public interface InternalValueTransformerWithKey extends ValueTransformerWithKey { - VR punctuate(final long timestamp); -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java deleted file mode 100644 index 3418e71..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalValueTransformerWithKeySupplier.java +++ /dev/null @@ -1,21 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -public interface InternalValueTransformerWithKeySupplier { - InternalValueTransformerWithKey get(); -} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 2ddd5ff..b8195a0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -348,7 +348,7 @@ public class KStreamImpl extends AbstractStream implements KStream extends AbstractStream implements KStream KStream transformValues(final InternalValueTransformerWithKeySupplier internalValueTransformerWithKeySupplier, - final String... stateStoreNames) { + private KStream doTransformValues(final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier, + final String... stateStoreNames) { final String name = builder.newProcessorName(TRANSFORMVALUES_NAME); - builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(internalValueTransformerWithKeySupplier), this.name); + builder.internalTopologyBuilder.addProcessor(name, new KStreamTransformValues<>(valueTransformerWithKeySupplier), this.name); if (stateStoreNames != null && stateStoreNames.length > 0) { builder.internalTopologyBuilder.connectProcessorAndStateStores(name, stateStoreNames); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java index fb6af34..d45b7cf 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java @@ -19,6 +19,8 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.kstream.ValueTransformerWithKey; +import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; import org.apache.kafka.streams.processor.Cancellable; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -35,9 +37,9 @@ import java.util.Map; public class KStreamTransformValues implements ProcessorSupplier { - private final InternalValueTransformerWithKeySupplier valueTransformerSupplier; + private final ValueTransformerWithKeySupplier valueTransformerSupplier; - public KStreamTransformValues(final InternalValueTransformerWithKeySupplier valueTransformerSupplier) { + KStreamTransformValues(final ValueTransformerWithKeySupplier valueTransformerSupplier) { this.valueTransformerSupplier = valueTransformerSupplier; } @@ -48,10 +50,10 @@ public class KStreamTransformValues implements ProcessorSupplier public static class KStreamTransformValuesProcessor implements Processor { - private final InternalValueTransformerWithKey valueTransformer; + private final ValueTransformerWithKey valueTransformer; private ProcessorContext context; - public KStreamTransformValuesProcessor(final InternalValueTransformerWithKey valueTransformer) { + KStreamTransformValuesProcessor(final ValueTransformerWithKey valueTransformer) { this.valueTransformer = valueTransformer; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java index 1f9bcba..a37b6f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java @@ -47,29 +47,26 @@ public class AbstractStreamTest { @Test public void testToInternlValueTransformerSupplierSuppliesNewTransformers() { - final ValueTransformerSupplier vts = createMock(ValueTransformerSupplier.class); - expect(vts.get()).andReturn(null).times(3); - final InternalValueTransformerWithKeySupplier ivtwks = - AbstractStream.toInternalValueTransformerSupplier(vts); - replay(vts); - ivtwks.get(); - ivtwks.get(); - ivtwks.get(); - verify(vts); + final ValueTransformerSupplier valueTransformerSupplier = createMock(ValueTransformerSupplier.class); + expect(valueTransformerSupplier.get()).andReturn(null).times(3); + final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = + AbstractStream.toValueTransformerWithKeySupplier(valueTransformerSupplier); + replay(valueTransformerSupplier); + valueTransformerWithKeySupplier.get(); + valueTransformerWithKeySupplier.get(); + valueTransformerWithKeySupplier.get(); + verify(valueTransformerSupplier); } @Test public void testToInternalValueTransformerSupplierSuppliesNewTransformers() { - final ValueTransformerWithKeySupplier vtwks = - createMock(ValueTransformerWithKeySupplier.class); - expect(vtwks.get()).andReturn(null).times(3); - final InternalValueTransformerWithKeySupplier ivtwks = - AbstractStream.toInternalValueTransformerSupplier(vtwks); - replay(vtwks); - ivtwks.get(); - ivtwks.get(); - ivtwks.get(); - verify(vtwks); + final ValueTransformerWithKeySupplier valueTransformerWithKeySupplier = createMock(ValueTransformerWithKeySupplier.class); + expect(valueTransformerWithKeySupplier.get()).andReturn(null).times(3); + replay(valueTransformerWithKeySupplier); + valueTransformerWithKeySupplier.get(); + valueTransformerWithKeySupplier.get(); + valueTransformerWithKeySupplier.get(); + verify(valueTransformerWithKeySupplier); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java index 419e6f1..807fb1f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java @@ -69,11 +69,6 @@ public class KStreamTransformValuesTest { } @Override - public Integer punctuate(long timestamp) { - return null; - } - - @Override public void close() { } }; @@ -143,15 +138,10 @@ public class KStreamTransformValuesTest { @Test public void shouldNotAllowValueTransformerToCallInternalProcessorContextMethods() { final BadValueTransformer badValueTransformer = new BadValueTransformer(); - final KStreamTransformValues transformValue = new KStreamTransformValues<>(new InternalValueTransformerWithKeySupplier() { + final KStreamTransformValues transformValue = new KStreamTransformValues<>(new ValueTransformerWithKeySupplier() { @Override - public InternalValueTransformerWithKey get() { - return new InternalValueTransformerWithKey() { - @Override - public Integer punctuate(long timestamp) { - throw new StreamsException("ValueTransformerWithKey#punctuate should not be called."); - } - + public ValueTransformerWithKey get() { + return new ValueTransformerWithKey() { @Override public void init(final ProcessorContext context) { badValueTransformer.init(context); -- To stop receiving notification emails like this one, please contact guozhang@apache.org.