http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java deleted file mode 100644 index a1456f6..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Window.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StateStore; - -import java.util.Iterator; - -public interface Window extends StateStore { - - void init(ProcessorContext context); - - Iterator find(K key, long timestamp); - - Iterator findAfter(K key, long timestamp); - - Iterator findBefore(K key, long timestamp); - - void put(K key, V value, long timestamp); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java deleted file mode 100644 index 46a2b9e..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/WindowSupplier.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -public interface WindowSupplier { - - String name(); - - Window get(); -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java deleted file mode 100644 index 54d44f0..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FilteredIterator.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import java.util.Iterator; - -public abstract class FilteredIterator implements Iterator { - - private Iterator inner; - private T nextValue = null; - - public FilteredIterator(Iterator inner) { - this.inner = inner; - - findNext(); - } - - @Override - public boolean hasNext() { - return nextValue != null; - } - - @Override - public T next() { - T value = nextValue; - findNext(); - - return value; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - private void findNext() { - while (inner.hasNext()) { - S item = inner.next(); - nextValue = filter(item); - if (nextValue != null) { - return; - } - } - nextValue = null; - } - - protected abstract T filter(S item); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java deleted file mode 100644 index 06083b3..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamBranch.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.kstream.Predicate; - -class KStreamBranch implements ProcessorSupplier { - - private final Predicate[] predicates; - - @SuppressWarnings("unchecked") - public KStreamBranch(Predicate ... predicates) { - this.predicates = predicates; - } - - @Override - public Processor get() { - return new KStreamBranchProcessor(); - } - - private class KStreamBranchProcessor extends AbstractProcessor { - @Override - public void process(K key, V value) { - for (int i = 0; i < predicates.length; i++) { - if (predicates[i].test(key, value)) { - // use forward with childIndex here and then break the loop - // so that no record is going to be piped to multiple streams - context().forward(key, value, i); - break; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java deleted file mode 100644 index 0b1f1e0..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFilter.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -class KStreamFilter implements ProcessorSupplier { - - private final Predicate predicate; - private final boolean filterOut; - - public KStreamFilter(Predicate predicate, boolean filterOut) { - this.predicate = predicate; - this.filterOut = filterOut; - } - - @Override - public Processor get() { - return new KStreamFilterProcessor(); - } - - private class KStreamFilterProcessor extends AbstractProcessor { - @Override - public void process(K key, V value) { - if (filterOut ^ predicate.test(key, value)) { - context().forward(key, value); - } - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java deleted file mode 100644 index 175a002..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMap.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -class KStreamFlatMap implements ProcessorSupplier { - - private final KeyValueMapper>> mapper; - - KStreamFlatMap(KeyValueMapper>> mapper) { - this.mapper = mapper; - } - - @Override - public Processor get() { - return new KStreamFlatMapProcessor(); - } - - private class KStreamFlatMapProcessor extends AbstractProcessor { - @Override - public void process(K1 key, V1 value) { - for (KeyValue newPair : mapper.apply(key, value)) { - context().forward(newPair.key, newPair.value); - } - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java deleted file mode 100644 index 9b4559b..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValues.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -class KStreamFlatMapValues implements ProcessorSupplier { - - private final ValueMapper> mapper; - - KStreamFlatMapValues(ValueMapper> mapper) { - this.mapper = mapper; - } - - @Override - public Processor get() { - return new KStreamFlatMapValuesProcessor(); - } - - private class KStreamFlatMapValuesProcessor extends AbstractProcessor { - @Override - public void process(K1 key, V1 value) { - Iterable newValues = mapper.apply(value); - for (V2 v : newValues) { - context().forward(key, v); - } - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java deleted file mode 100644 index 0986405..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ /dev/null @@ -1,227 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.kstream.ValueTransformerSupplier; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamWindowed; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.Predicate; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.kstream.WindowSupplier; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -import java.lang.reflect.Array; -import java.util.Collections; -import java.util.Set; - -public class KStreamImpl implements KStream { - - private static final String FILTER_NAME = "KAFKA-FILTER-"; - - private static final String MAP_NAME = "KAFKA-MAP-"; - - private static final String MAPVALUES_NAME = "KAFKA-MAPVALUES-"; - - private static final String FLATMAP_NAME = "KAFKA-FLATMAP-"; - - private static final String FLATMAPVALUES_NAME = "KAFKA-FLATMAPVALUES-"; - - private static final String TRANSFORM_NAME = "KAFKA-TRANSFORM-"; - - private static final String TRANSFORMVALUES_NAME = "KAFKA-TRANSFORMVALUES-"; - - private static final String PROCESSOR_NAME = "KAFKA-PROCESSOR-"; - - private static final String BRANCH_NAME = "KAFKA-BRANCH-"; - - private static final String BRANCHCHILD_NAME = "KAFKA-BRANCHCHILD-"; - - private static final String WINDOWED_NAME = "KAFKA-WINDOWED-"; - - private static final String SINK_NAME = "KAFKA-SINK-"; - - public static final String JOINTHIS_NAME = "KAFKA-JOINTHIS-"; - - public static final String JOINOTHER_NAME = "KAFKA-JOINOTHER-"; - - public static final String JOINMERGE_NAME = "KAFKA-JOINMERGE-"; - - public static final String SOURCE_NAME = "KAFKA-SOURCE-"; - - protected final KStreamBuilder topology; - protected final String name; - protected final Set sourceNodes; - - public KStreamImpl(KStreamBuilder topology, String name, Set sourceNodes) { - this.topology = topology; - this.name = name; - this.sourceNodes = sourceNodes; - } - - @Override - public KStream filter(Predicate predicate) { - String name = topology.newName(FILTER_NAME); - - topology.addProcessor(name, new KStreamFilter<>(predicate, false), this.name); - - return new KStreamImpl<>(topology, name, sourceNodes); - } - - @Override - public KStream filterOut(final Predicate predicate) { - String name = topology.newName(FILTER_NAME); - - topology.addProcessor(name, new KStreamFilter<>(predicate, true), this.name); - - return new KStreamImpl<>(topology, name, sourceNodes); - } - - @Override - public KStream map(KeyValueMapper> mapper) { - String name = topology.newName(MAP_NAME); - - topology.addProcessor(name, new KStreamMap<>(mapper), this.name); - - return new KStreamImpl<>(topology, name, null); - } - - @Override - public KStream mapValues(ValueMapper mapper) { - String name = topology.newName(MAPVALUES_NAME); - - topology.addProcessor(name, new KStreamMapValues<>(mapper), this.name); - - return new KStreamImpl<>(topology, name, sourceNodes); - } - - @Override - public KStream flatMap(KeyValueMapper>> mapper) { - String name = topology.newName(FLATMAP_NAME); - - topology.addProcessor(name, new KStreamFlatMap<>(mapper), this.name); - - return new KStreamImpl<>(topology, name, null); - } - - @Override - public KStream flatMapValues(ValueMapper> mapper) { - String name = topology.newName(FLATMAPVALUES_NAME); - - topology.addProcessor(name, new KStreamFlatMapValues<>(mapper), this.name); - - return new KStreamImpl<>(topology, name, sourceNodes); - } - - @Override - public KStreamWindowed with(WindowSupplier windowSupplier) { - String name = topology.newName(WINDOWED_NAME); - - topology.addProcessor(name, new KStreamWindow<>(windowSupplier), this.name); - - return new KStreamWindowedImpl<>(topology, name, sourceNodes, windowSupplier); - } - - @Override - @SuppressWarnings("unchecked") - public KStream[] branch(Predicate... predicates) { - String branchName = topology.newName(BRANCH_NAME); - - topology.addProcessor(branchName, new KStreamBranch(predicates.clone()), this.name); - - KStream[] branchChildren = (KStream[]) Array.newInstance(KStream.class, predicates.length); - for (int i = 0; i < predicates.length; i++) { - String childName = topology.newName(BRANCHCHILD_NAME); - - topology.addProcessor(childName, new KStreamPassThrough(), branchName); - - branchChildren[i] = new KStreamImpl<>(topology, childName, sourceNodes); - } - - return branchChildren; - } - - @Override - public KStream through(String topic, - Serializer keySerializer, - Serializer valSerializer, - Deserializer keyDeserializer, - Deserializer valDeserializer) { - String sendName = topology.newName(SINK_NAME); - - topology.addSink(sendName, topic, keySerializer, valSerializer, this.name); - - String sourceName = topology.newName(SOURCE_NAME); - - topology.addSource(sourceName, keyDeserializer, valDeserializer, topic); - - return new KStreamImpl<>(topology, sourceName, Collections.singleton(sourceName)); - } - - @Override - public KStream through(String topic) { - return through(topic, (Serializer) null, (Serializer) null, (Deserializer) null, (Deserializer) null); - } - - @Override - public void to(String topic) { - String name = topology.newName(SINK_NAME); - - topology.addSink(name, topic, this.name); - } - - @Override - public void to(String topic, Serializer keySerializer, Serializer valSerializer) { - String name = topology.newName(SINK_NAME); - - topology.addSink(name, topic, keySerializer, valSerializer, this.name); - } - - @Override - public KStream transform(TransformerSupplier> transformerSupplier, String... stateStoreNames) { - String name = topology.newName(TRANSFORM_NAME); - - topology.addProcessor(name, new KStreamTransform<>(transformerSupplier), this.name); - topology.connectProcessorAndStateStores(name, stateStoreNames); - - return new KStreamImpl<>(topology, name, null); - } - - @Override - public KStream transformValues(ValueTransformerSupplier valueTransformerSupplier, String... stateStoreNames) { - String name = topology.newName(TRANSFORMVALUES_NAME); - - topology.addProcessor(name, new KStreamTransformValues<>(valueTransformerSupplier), this.name); - topology.connectProcessorAndStateStores(name, stateStoreNames); - - return new KStreamImpl<>(topology, name, sourceNodes); - } - - @Override - public void process(final ProcessorSupplier processorSupplier, String... stateStoreNames) { - String name = topology.newName(PROCESSOR_NAME); - - topology.addProcessor(name, processorSupplier, this.name); - topology.connectProcessorAndStateStores(name, stateStoreNames); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java deleted file mode 100644 index 5e8186e..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamJoin.java +++ /dev/null @@ -1,93 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -import java.util.Iterator; - -class KStreamJoin implements ProcessorSupplier { - - private static abstract class Finder { - abstract Iterator find(K key, long timestamp); - } - - private final String windowName; - private final ValueJoiner joiner; - - KStreamJoin(String windowName, ValueJoiner joiner) { - this.windowName = windowName; - this.joiner = joiner; - } - - @Override - public Processor get() { - return new KStreamJoinProcessor(windowName); - } - - private class KStreamJoinProcessor extends AbstractProcessor { - - private final String windowName; - protected Finder finder; - - public KStreamJoinProcessor(String windowName) { - this.windowName = windowName; - } - - @SuppressWarnings("unchecked") - @Override - public void init(ProcessorContext context) { - super.init(context); - - final Window window = (Window) context.getStateStore(windowName); - - this.finder = new Finder() { - @Override - Iterator find(K key, long timestamp) { - return window.find(key, timestamp); - } - }; - } - - @Override - public void process(K key, V1 value) { - long timestamp = context().timestamp(); - Iterator iter = finder.find(key, timestamp); - if (iter != null) { - while (iter.hasNext()) { - context().forward(key, joiner.apply(value, iter.next())); - } - } - } - } - - public static ValueJoiner reverseJoiner(final ValueJoiner joiner) { - return new ValueJoiner() { - @Override - public R apply(T2 value2, T1 value1) { - return joiner.apply(value1, value2); - } - }; - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java deleted file mode 100644 index 3868318..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMap.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -class KStreamMap implements ProcessorSupplier { - - private final KeyValueMapper> mapper; - - public KStreamMap(KeyValueMapper> mapper) { - this.mapper = mapper; - } - - @Override - public Processor get() { - return new KStreamMapProcessor(); - } - - private class KStreamMapProcessor extends AbstractProcessor { - @Override - public void process(K1 key, V1 value) { - KeyValue newPair = mapper.apply(key, value); - context().forward(newPair.key, newPair.value); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java deleted file mode 100644 index 692b421..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamMapValues.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -class KStreamMapValues implements ProcessorSupplier { - - private final ValueMapper mapper; - - public KStreamMapValues(ValueMapper mapper) { - this.mapper = mapper; - } - - @Override - public Processor get() { - return new KStreamMapProcessor(); - } - - private class KStreamMapProcessor extends AbstractProcessor { - @Override - public void process(K1 key, V1 value) { - V2 newValue = mapper.apply(value); - context().forward(key, newValue); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java deleted file mode 100644 index 59a815b..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamPassThrough.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -class KStreamPassThrough implements ProcessorSupplier { - - @Override - public Processor get() { - return new KStreamPassThroughProcessor(); - } - - public class KStreamPassThroughProcessor extends AbstractProcessor { - @Override - public void process(K key, V value) { - context().forward(key, value); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java deleted file mode 100644 index 7ebab0e..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransform.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.Transformer; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -public class KStreamTransform implements ProcessorSupplier { - - private final TransformerSupplier> transformerSupplier; - - public KStreamTransform(TransformerSupplier> transformerSupplier) { - this.transformerSupplier = transformerSupplier; - } - - @Override - public Processor get() { - return new KStreamTransformProcessor(transformerSupplier.get()); - } - - public static class KStreamTransformProcessor implements Processor { - - private final Transformer> transformer; - private ProcessorContext context; - - public KStreamTransformProcessor(Transformer> transformer) { - this.transformer = transformer; - } - - @Override - public void init(ProcessorContext context) { - transformer.init(context); - this.context = context; - } - - @Override - public void process(K1 key, V1 value) { - KeyValue pair = transformer.transform(key, value); - context.forward(pair.key, pair.value); - } - - @Override - public void punctuate(long timestamp) { - transformer.punctuate(timestamp); - } - - @Override - public void close() { - transformer.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java deleted file mode 100644 index 6f989e6..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValues.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.ValueTransformer; -import org.apache.kafka.streams.kstream.ValueTransformerSupplier; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -public class KStreamTransformValues implements ProcessorSupplier { - - private final ValueTransformerSupplier valueTransformerSupplier; - - public KStreamTransformValues(ValueTransformerSupplier valueTransformerSupplier) { - this.valueTransformerSupplier = valueTransformerSupplier; - } - - @Override - public Processor get() { - return new KStreamTransformValuesProcessor(valueTransformerSupplier.get()); - } - - public static class KStreamTransformValuesProcessor implements Processor { - - private final ValueTransformer valueTransformer; - private ProcessorContext context; - - public KStreamTransformValuesProcessor(ValueTransformer valueTransformer) { - this.valueTransformer = valueTransformer; - } - - @Override - public void init(ProcessorContext context) { - valueTransformer.init(context); - this.context = context; - } - - @Override - public void process(K key, V value) { - context.forward(key, valueTransformer.transform(value)); - } - - @Override - public void punctuate(long timestamp) { - valueTransformer.punctuate(timestamp); - } - - @Override - public void close() { - valueTransformer.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java deleted file mode 100644 index 2923936..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindow.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.WindowSupplier; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -public class KStreamWindow implements ProcessorSupplier { - - private final WindowSupplier windowSupplier; - - KStreamWindow(WindowSupplier windowSupplier) { - this.windowSupplier = windowSupplier; - } - - public WindowSupplier window() { - return windowSupplier; - } - - @Override - public Processor get() { - return new KStreamWindowProcessor(); - } - - private class KStreamWindowProcessor extends AbstractProcessor { - - private Window window; - - @Override - public void init(ProcessorContext context) { - super.init(context); - this.window = windowSupplier.get(); - this.window.init(context); - } - - @Override - public void process(K key, V value) { - synchronized (this) { - window.put(key, value, context().timestamp()); - context().forward(key, value); - } - } - - @Override - public void close() { - window.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java deleted file mode 100644 index cb49873..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedImpl.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStreamWindowed; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.WindowSupplier; - -import java.util.HashSet; -import java.util.Set; - -public final class KStreamWindowedImpl extends KStreamImpl implements KStreamWindowed { - - private final WindowSupplier windowSupplier; - - public KStreamWindowedImpl(KStreamBuilder topology, String name, Set sourceNodes, WindowSupplier windowSupplier) { - super(topology, name, sourceNodes); - this.windowSupplier = windowSupplier; - } - - @Override - public KStream join(KStreamWindowed other, ValueJoiner valueJoiner) { - String thisWindowName = this.windowSupplier.name(); - String otherWindowName = ((KStreamWindowedImpl) other).windowSupplier.name(); - Set thisSourceNodes = this.sourceNodes; - Set otherSourceNodes = ((KStreamWindowedImpl) other).sourceNodes; - - if (thisSourceNodes == null || otherSourceNodes == null) - throw new KafkaException("not joinable"); - - Set allSourceNodes = new HashSet<>(sourceNodes); - allSourceNodes.addAll(((KStreamWindowedImpl) other).sourceNodes); - - KStreamJoin joinThis = new KStreamJoin<>(otherWindowName, valueJoiner); - KStreamJoin joinOther = new KStreamJoin<>(thisWindowName, KStreamJoin.reverseJoiner(valueJoiner)); - KStreamPassThrough joinMerge = new KStreamPassThrough<>(); - - String joinThisName = topology.newName(JOINTHIS_NAME); - String joinOtherName = topology.newName(JOINOTHER_NAME); - String joinMergeName = topology.newName(JOINMERGE_NAME); - - topology.addProcessor(joinThisName, joinThis, this.name); - topology.addProcessor(joinOtherName, joinOther, ((KStreamImpl) other).name); - topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); - topology.copartitionSources(allSourceNodes); - - return new KStreamImpl<>(topology, joinMergeName, allSourceNodes); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java deleted file mode 100644 index b54bcc9..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowSupport.java +++ /dev/null @@ -1,159 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.streams.processor.internals.Stamped; - -import java.util.Iterator; - -public class WindowSupport { - - public static class ValueList { - Value head = null; - Value tail = null; - Value dirty = null; - - public void add(int slotNum, V value, long timestamp) { - Value v = new Value<>(slotNum, value, timestamp); - if (tail != null) { - tail.next = v; - } else { - head = v; - } - tail = v; - if (dirty == null) dirty = v; - } - - public Value first() { - return head; - } - - public void removeFirst() { - if (head != null) { - if (head == tail) tail = null; - head = head.next; - } - } - - public boolean isEmpty() { - return head == null; - } - - public boolean hasDirtyValues() { - return dirty != null; - } - - public void clearDirtyValues() { - dirty = null; - } - - public Iterator> iterator() { - return new ValueListIterator(head); - } - - public Iterator> dirtyValueIterator() { - return new ValueListIterator(dirty); - } - - } - - private static class ValueListIterator implements Iterator> { - - Value ptr; - - ValueListIterator(Value start) { - ptr = start; - } - - @Override - public boolean hasNext() { - return ptr != null; - } - - @Override - public Value next() { - Value value = ptr; - if (value != null) ptr = value.next; - return value; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - } - - public static class Value extends Stamped { - public final int slotNum; - private Value next = null; - - Value(int slotNum, V value, long timestamp) { - super(value, timestamp); - this.slotNum = slotNum; - } - } - - - public static long getLong(byte[] bytes, int offset) { - long value = 0; - for (int i = 0; i < 8; i++) { - value = (value << 8) | bytes[offset + i]; - } - return value; - } - - public static int getInt(byte[] bytes, int offset) { - int value = 0; - for (int i = 0; i < 4; i++) { - value = (value << 8) | bytes[offset + i]; - } - return value; - } - - public static int putLong(byte[] bytes, int offset, long value) { - for (int i = 7; i >= 0; i--) { - bytes[offset + i] = (byte) (value & 0xFF); - value = value >> 8; - } - return 8; - } - - public static int putInt(byte[] bytes, int offset, int value) { - for (int i = 3; i >= 0; i--) { - bytes[offset + i] = (byte) (value & 0xFF); - value = value >> 8; - } - return 4; - } - - public static int puts(byte[] bytes, int offset, byte[] value) { - offset += putInt(bytes, offset, value.length); - System.arraycopy(bytes, offset, value, 0, value.length); - return 4 + value.length; - } - - - public static T deserialize(byte[] bytes, int offset, int length, String topic, Deserializer deserializer) { - byte[] buf = new byte[length]; - System.arraycopy(bytes, offset, buf, 0, length); - return deserializer.deserialize(topic, buf); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java b/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java deleted file mode 100644 index 01d0024..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/AbstractProcessor.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor; - -/** - * An abstract implementation of {@link Processor} that manages the {@link ProcessorContext} instance and provides default no-op - * implementations of {@link #punctuate(long)} and {@link #close()}. - * - * @param the type of keys - * @param the type of values - */ -public abstract class AbstractProcessor implements Processor { - - private ProcessorContext context; - - protected AbstractProcessor() { - } - - @Override - public void init(ProcessorContext context) { - this.context = context; - } - - /** - * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context - * during {@link #init(ProcessorContext) initialization}. - *

- * This method does nothing by default; if desired, subclasses should override it with custom functionality. - *

- * - * @param streamTime the stream time when this method is being called - */ - @Override - public void punctuate(long streamTime) { - // do nothing - } - - /** - * Close this processor and clean up any resources. - *

- * This method does nothing by default; if desired, subclasses should override it with custom functionality. - *

- */ - @Override - public void close() { - // do nothing - } - - /** - * Get the processor's context set during {@link #init(ProcessorContext) initialization}. - * - * @return the processor context; null only when called prior to {@link #init(ProcessorContext) initialization}. - */ - protected final ProcessorContext context() { - return this.context; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java deleted file mode 100644 index 7d2188a..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/DefaultPartitionGrouper.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.TopicPartition; - -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -public class DefaultPartitionGrouper extends PartitionGrouper { - - public Map> partitionGroups(Cluster metadata) { - Map> groups = new HashMap<>(); - - for (Map.Entry> entry : topicGroups.entrySet()) { - Integer topicGroupId = entry.getKey(); - Set topicGroup = entry.getValue(); - - int maxNumPartitions = maxNumPartitions(metadata, topicGroup); - - for (int partitionId = 0; partitionId < maxNumPartitions; partitionId++) { - Set group = new HashSet<>(topicGroup.size()); - - for (String topic : topicGroup) { - if (partitionId < metadata.partitionsForTopic(topic).size()) { - group.add(new TopicPartition(topic, partitionId)); - } - } - groups.put(new TaskId(topicGroupId, partitionId), Collections.unmodifiableSet(group)); - } - } - - return Collections.unmodifiableMap(groups); - } - - protected int maxNumPartitions(Cluster metadata, Set topics) { - int maxNumPartitions = 0; - for (String topic : topics) { - List infos = metadata.partitionsForTopic(topic); - - if (infos == null) - throw new KafkaException("topic not found :" + topic); - - int numPartitions = infos.size(); - if (numPartitions > maxNumPartitions) - maxNumPartitions = numPartitions; - } - return maxNumPartitions; - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java deleted file mode 100644 index 026ec89..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.streams.processor.internals.KafkaStreamingPartitionAssignor; - -import java.util.Map; -import java.util.Set; - -public abstract class PartitionGrouper { - - protected Map> topicGroups; - - private KafkaStreamingPartitionAssignor partitionAssignor = null; - - /** - * Returns a map of task ids to groups of partitions. - * - * @param metadata - * @return a map of task ids to groups of partitions - */ - public abstract Map> partitionGroups(Cluster metadata); - - public void topicGroups(Map> topicGroups) { - this.topicGroups = topicGroups; - } - - public void partitionAssignor(KafkaStreamingPartitionAssignor partitionAssignor) { - this.partitionAssignor = partitionAssignor; - } - - public Set taskIds(TopicPartition partition) { - return partitionAssignor.taskIds(partition); - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java deleted file mode 100644 index 3cade3a..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -/** - * A processor of messages. - * - * @param the type of keys - * @param the type of values - */ -public interface Processor { - - /** - * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology - * that contains it is initialized. - *

- * If this processor is to be {@link #punctuate(long) called periodically} by the framework, then this method should - * {@link ProcessorContext#schedule(long) schedule itself} with the provided context. - * - * @param context the context; may not be null - */ - void init(ProcessorContext context); - - /** - * Process the message with the given key and value. - * - * @param key the key for the message - * @param value the value for the message - */ - void process(K key, V value); - - /** - * Perform any periodic operations, if this processor {@link ProcessorContext#schedule(long) schedule itself} with the context - * during {@link #init(ProcessorContext) initialization}. - * - * @param timestamp the stream time when this method is being called - */ - void punctuate(long timestamp); - - /** - * Close this processor and clean up any resources. - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java deleted file mode 100644 index 88ac64e..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.StreamingMetrics; - -import java.io.File; - -public interface ProcessorContext { - - /** - * Returns the task id - * - * @return the task id - */ - TaskId id(); - - /** - * Returns the key serializer - * - * @return the key serializer - */ - Serializer keySerializer(); - - /** - * Returns the value serializer - * - * @return the value serializer - */ - Serializer valueSerializer(); - - /** - * Returns the key deserializer - * - * @return the key deserializer - */ - Deserializer keyDeserializer(); - - /** - * Returns the value deserializer - * - * @return the value deserializer - */ - Deserializer valueDeserializer(); - - /** - * Returns the state directory for the partition. - * - * @return the state directory - */ - File stateDir(); - - /** - * Returns Metrics instance - * - * @return StreamingMetrics - */ - StreamingMetrics metrics(); - - /** - * Registers and possibly restores the specified storage engine. - * - * @param store the storage engine - */ - void register(StateStore store, StateRestoreCallback stateRestoreCallback); - - StateStore getStateStore(String name); - - void schedule(long interval); - - void forward(K key, V value); - - void forward(K key, V value, int childIndex); - - void commit(); - - String topic(); - - int partition(); - - long offset(); - - long timestamp(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java deleted file mode 100644 index 719d3ac..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorSupplier.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -public interface ProcessorSupplier { - - Processor get(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java deleted file mode 100644 index 39decec..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreCallback.java +++ /dev/null @@ -1,27 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -/** - * Restoration logic for log-backed state stores upon restart, - * it takes one record at a time from the logs to apply to the restoring state. - */ -public interface StateRestoreCallback { - - void restore(byte[] key, byte[] value); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java deleted file mode 100644 index 9c085a5..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStore.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -/** - * A storage engine for managing state maintained by a stream processor. - * - *

- * This interface does not specify any query capabilities, which, of course, - * would be query engine specific. Instead it just specifies the minimum - * functionality required to reload a storage engine from its changelog as well - * as basic lifecycle management. - *

- */ -public interface StateStore { - - /** - * The name of this store. - * @return the storage name - */ - String name(); - - /** - * Initializes this state store - */ - void init(ProcessorContext context); - - /** - * Flush any cached data - */ - void flush(); - - /** - * Close the storage engine - */ - void close(); - - /** - * If the storage is persistent - */ - boolean persistent(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java deleted file mode 100644 index 11545c5..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StateStoreSupplier.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -public interface StateStoreSupplier { - - String name(); - - StateStore get(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java b/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java deleted file mode 100644 index 3d474fe..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -public class TaskId { - - public final int topicGroupId; - public final int partition; - - public TaskId(int topicGroupId, int partition) { - this.topicGroupId = topicGroupId; - this.partition = partition; - } - - public String toString() { - return topicGroupId + "_" + partition; - } - - public static TaskId parse(String string) { - int index = string.indexOf('_'); - if (index <= 0 || index + 1 >= string.length()) throw new TaskIdFormatException(); - - try { - int topicGroupId = Integer.parseInt(string.substring(0, index)); - int partition = Integer.parseInt(string.substring(index + 1)); - - return new TaskId(topicGroupId, partition); - } catch (Exception e) { - throw new TaskIdFormatException(); - } - } - - @Override - public boolean equals(Object o) { - if (o instanceof TaskId) { - TaskId other = (TaskId) o; - return other.topicGroupId == this.topicGroupId && other.partition == this.partition; - } else { - return false; - } - } - - @Override - public int hashCode() { - long n = ((long) topicGroupId << 32) | (long) partition; - return (int) (n % 0xFFFFFFFFL); - } - - public static class TaskIdFormatException extends RuntimeException { - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/2b382d6f/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java deleted file mode 100644 index 62098f2..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -/** - * An interface that allows the KStream framework to extract a timestamp from a key-value pair - */ -public interface TimestampExtractor { - - /** - * Extracts a timestamp from a message - * - * @param record ConsumerRecord - * @return timestamp - */ - long extract(ConsumerRecord record); -}