kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2474: Add caching of JSON schema conversions to JsonConverter
Date Tue, 06 Oct 2015 22:27:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 23f9afb70 -> 174a43cd0


KAFKA-2474: Add caching of JSON schema conversions to JsonConverter

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Ismael Juma, Guozhang Wang

Closes #250 from ewencp/kafka-2474-cache-json-schema-conversions


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/174a43cd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/174a43cd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/174a43cd

Branch: refs/heads/trunk
Commit: 174a43cd09d3a2a2785daf0cfa5ada1646d8bfcc
Parents: 23f9afb
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Tue Oct 6 15:31:28 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Oct 6 15:31:28 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/cache/Cache.java    | 53 +++++++++++
 .../org/apache/kafka/common/cache/LRUCache.java | 57 ++++++++++++
 .../kafka/common/cache/SynchronizedCache.java   | 51 +++++++++++
 .../apache/kafka/common/cache/LRUCacheTest.java | 93 ++++++++++++++++++++
 .../kafka/copycat/json/JsonConverter.java       | 41 +++++++--
 .../kafka/copycat/json/JsonConverterTest.java   | 47 ++++++++++
 6 files changed, 336 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/clients/src/main/java/org/apache/kafka/common/cache/Cache.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/cache/Cache.java b/clients/src/main/java/org/apache/kafka/common/cache/Cache.java
new file mode 100644
index 0000000..6678e40
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/cache/Cache.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.common.cache;
+
+/**
+ * Interface for caches, semi-persistent maps which store key-value mappings until either an eviction criterion is met
+ * or the entries are manually invalidated. Caches are not required to be thread-safe, but some implementations may be.
+ */
+public interface Cache<K, V> {
+
+    /**
+     * Look up a value in the cache.
+     * @param key the key to look up
+     * @return the cached value, or null if it is not present.
+     */
+    V get(K key);
+
+    /**
+     * Insert an entry into the cache, replacing any existing value for the same key.
+     * @param key the key to insert
+     * @param value the value to insert
+     */
+    void put(K key, V value);
+
+    /**
+     * Manually invalidate a key, clearing its entry from the cache.
+     * @param key the key to remove
+     * @return true if the key existed in the cache and the entry was removed or false if it was not present
+     */
+    boolean remove(K key);
+
+    /**
+     * Get the number of entries in this cache. If this cache is used by multiple threads concurrently, the returned
+     * value will only be approximate.
+     * @return the number of entries in the cache
+     */
+    long size();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
new file mode 100644
index 0000000..89e6e87
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/cache/LRUCache.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.common.cache;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/**
+ * A cache implementing a least recently used policy.
+ * <p>
+ * Entries are kept in access order, so both {@link #get(Object)} and {@link #put(Object, Object)} mark an entry as
+ * most recently used. When an insertion takes the number of entries above {@code maxSize}, the least recently used
+ * entry is evicted. This class is not thread-safe; wrap it in a {@link SynchronizedCache} for concurrent use.
+ */
+public class LRUCache<K, V> implements Cache<K, V> {
+    private final LinkedHashMap<K, V> cache;
+
+    /**
+     * Create an empty cache.
+     * @param maxSize the maximum number of entries to retain; if it is not positive, no entries are retained
+     */
+    public LRUCache(final int maxSize) {
+        // accessOrder = true orders the map (and therefore eviction) by recency of access rather than insertion
+        cache = new LinkedHashMap<K, V>(16, .75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+                // Called by LinkedHashMap after a new entry is inserted; evict once we exceed the bound.
+                return size() > maxSize;
+            }
+        };
+    }
+
+    @Override
+    public V get(K key) {
+        return cache.get(key);
+    }
+
+    @Override
+    public void put(K key, V value) {
+        cache.put(key, value);
+    }
+
+    @Override
+    public boolean remove(K key) {
+        // Check containsKey rather than comparing the removed value to null, so that a key mapped to a null value is
+        // still reported as present and removed, as the Cache#remove contract requires.
+        if (!cache.containsKey(key))
+            return false;
+        cache.remove(key);
+        return true;
+    }
+
+    @Override
+    public long size() {
+        return cache.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/clients/src/main/java/org/apache/kafka/common/cache/SynchronizedCache.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/cache/SynchronizedCache.java b/clients/src/main/java/org/apache/kafka/common/cache/SynchronizedCache.java
new file mode 100644
index 0000000..0e88aa3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/cache/SynchronizedCache.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.common.cache;
+
+/**
+ * Wrapper for caches that adds simple synchronization to provide a thread-safe cache. Note that this simply adds
+ * synchronization around each cache method on the underlying unsynchronized cache. It does not add any support for
+ * atomically checking for existence of an entry and computing and inserting the value if it is missing.
+ * <p>
+ * Every method, including {@link #get(Object)}, locks on this wrapper: an underlying cache may change its internal
+ * state on reads (for example {@link LRUCache} updates its access order), so reads are serialized as well. Because
+ * the lock is the wrapper instance itself, callers needing an atomic check-then-insert can synchronize on it.
+ */
+public class SynchronizedCache<K, V> implements Cache<K, V> {
+    private final Cache<K, V> underlying;
+
+    /**
+     * Wrap the given cache.
+     * @param underlying the cache to guard; accessing it directly afterwards would bypass this wrapper's lock
+     */
+    public SynchronizedCache(Cache<K, V> underlying) {
+        this.underlying = underlying;
+    }
+
+    @Override
+    public synchronized V get(K key) {
+        return underlying.get(key);
+    }
+
+    @Override
+    public synchronized void put(K key, V value) {
+        underlying.put(key, value);
+    }
+
+    @Override
+    public synchronized boolean remove(K key) {
+        return underlying.remove(key);
+    }
+
+    @Override
+    public synchronized long size() {
+        return underlying.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java b/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java
new file mode 100644
index 0000000..4cf130c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/cache/LRUCacheTest.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.common.cache;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests for {@link LRUCache}: insertion and lookup, explicit removal, and least-recently-used eviction.
+ */
+public class LRUCacheTest {
+
+    /**
+     * Put alternating key/value arguments into the cache, in argument order.
+     */
+    private static void putPairs(Cache<String, String> target, String... keysAndValues) {
+        for (int i = 0; i < keysAndValues.length; i += 2)
+            target.put(keysAndValues[i], keysAndValues[i + 1]);
+    }
+
+    @Test
+    public void testPutGet() {
+        Cache<String, String> lru = new LRUCache<>(4);
+        putPairs(lru, "a", "b", "c", "d", "e", "f", "g", "h");
+
+        // Filled exactly to capacity, so every entry is still present.
+        assertEquals(4, lru.size());
+        assertEquals("b", lru.get("a"));
+        assertEquals("d", lru.get("c"));
+        assertEquals("f", lru.get("e"));
+        assertEquals("h", lru.get("g"));
+    }
+
+    @Test
+    public void testRemove() {
+        Cache<String, String> lru = new LRUCache<>(4);
+        putPairs(lru, "a", "b", "c", "d", "e", "f");
+        assertEquals(3, lru.size());
+
+        // Removing a present key succeeds and leaves the remaining entries untouched.
+        assertEquals(true, lru.remove("a"));
+        assertEquals(2, lru.size());
+        assertNull(lru.get("a"));
+        assertEquals("d", lru.get("c"));
+        assertEquals("f", lru.get("e"));
+
+        // Removing an absent key reports failure.
+        assertEquals(false, lru.remove("key-does-not-exist"));
+
+        assertEquals(true, lru.remove("c"));
+        assertEquals(1, lru.size());
+        assertNull(lru.get("c"));
+        assertEquals("f", lru.get("e"));
+
+        // Removing the last entry empties the cache.
+        assertEquals(true, lru.remove("e"));
+        assertEquals(0, lru.size());
+        assertNull(lru.get("e"));
+    }
+
+    @Test
+    public void testEviction() {
+        Cache<String, String> lru = new LRUCache<>(2);
+        putPairs(lru, "a", "b", "c", "d");
+        assertEquals(2, lru.size());
+
+        // A third insertion evicts the least recently used entry, "a".
+        lru.put("e", "f");
+        assertEquals(2, lru.size());
+        assertNull(lru.get("a"));
+        assertEquals("d", lru.get("c"));
+        assertEquals("f", lru.get("e"));
+
+        // Reading "c" leaves "e" as the least recently used entry, so "e" rather than "c" is evicted next.
+        lru.get("c");
+        lru.put("g", "h");
+        assertEquals(2, lru.size());
+        assertNull(lru.get("e"));
+        assertEquals("d", lru.get("c"));
+        assertEquals("h", lru.get("g"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
index 1841640..5b37f27 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
@@ -21,6 +21,9 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.cache.Cache;
+import org.apache.kafka.common.cache.LRUCache;
+import org.apache.kafka.common.cache.SynchronizedCache;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.copycat.data.*;
 import org.apache.kafka.copycat.errors.DataException;
@@ -28,7 +31,11 @@ import org.apache.kafka.copycat.storage.Converter;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 /**
  * Implementation of Converter that uses JSON to store schemas and objects.
@@ -36,6 +43,8 @@ import java.util.*;
 public class JsonConverter implements Converter {
     private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
     private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
+    private static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size";
+    private static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;

     private static final HashMap<Schema.Type, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS = new HashMap<>();

@@ -188,6 +197,13 @@ public class JsonConverter implements Converter {
     }

     private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
+    private int cacheSize = SCHEMAS_CACHE_SIZE_DEFAULT;
+    // Created eagerly with the default size so schema conversion does not throw a NullPointerException when
+    // configure() has not been called; configure() replaces them with caches of the configured size.
+    private Cache<Schema, ObjectNode> fromCopycatSchemaCache =
+            new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(SCHEMAS_CACHE_SIZE_DEFAULT));
+    private Cache<JsonNode, Schema> toCopycatSchemaCache =
+            new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(SCHEMAS_CACHE_SIZE_DEFAULT));

     private final JsonSerializer serializer = new JsonSerializer();
     private final JsonDeserializer deserializer = new JsonDeserializer();
@@ -200,6 +216,13 @@ public class JsonConverter implements Converter {

         serializer.configure(configs, isKey);
         deserializer.configure(configs, isKey);
+
+        // The size may arrive as a number or as its string form (e.g. from a properties file), so parse its text.
+        Object cacheSizeVal = configs.get(SCHEMAS_CACHE_SIZE_CONFIG);
+        if (cacheSizeVal != null)
+            cacheSize = Integer.parseInt(cacheSizeVal.toString());
+        fromCopycatSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize));
+        toCopycatSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize));
     }

     @Override
@@ -247,10 +270,15 @@ public class JsonConverter implements Converter {
         return new SchemaAndValue(schema, convertToCopycat(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
     }

-    private static ObjectNode asJsonSchema(Schema schema) {
+    private ObjectNode asJsonSchema(Schema schema) {
         if (schema == null)
             return null;

+        // The cached node is returned as-is and may be shared between envelopes, so it must not be mutated.
+        ObjectNode cached = fromCopycatSchemaCache.get(schema);
+        if (cached != null)
+            return cached;
+
         final ObjectNode jsonSchema;
         switch (schema.type()) {
             case BOOLEAN:
@@ -313,14 +341,20 @@ public class JsonConverter implements Converter {
         if (schema.defaultValue() != null)
             jsonSchema.set(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.defaultValue()));

+        fromCopycatSchemaCache.put(schema, jsonSchema);
         return jsonSchema;
     }


-    private static Schema asCopycatSchema(JsonNode jsonSchema) {
+    private Schema asCopycatSchema(JsonNode jsonSchema) {
         if (jsonSchema.isNull())
             return null;

+        // Keyed on the JSON encoding, so equivalent schemas encoded differently occupy separate entries.
+        Schema cached = toCopycatSchemaCache.get(jsonSchema);
+        if (cached != null)
+            return cached;
+
         JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
         if (schemaTypeNode == null || !schemaTypeNode.isTextual())
             throw new DataException("Schema must contain 'type' field");
@@ -409,7 +443,9 @@ public class JsonConverter implements Converter {
         if (schemaDefaultNode != null)
             builder.defaultValue(convertToCopycat(builder, schemaDefaultNode));

-        return builder.build();
+        Schema result = builder.build();
+        toCopycatSchemaCache.put(jsonSchema, result);
+        return result;
     }


@@ -420,11 +456,11 @@ public class JsonConverter implements Converter {
      * @param value the value
      * @return JsonNode-encoded version
      */
-    private static JsonNode convertToJsonWithEnvelope(Schema schema, Object value) {
+    private JsonNode convertToJsonWithEnvelope(Schema schema, Object value) {
         return new JsonSchema.Envelope(asJsonSchema(schema), convertToJson(schema, value)).toJsonNode();
     }

-    private static JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) {
+    private JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) {
         return convertToJson(schema, value);
     }


http://git-wip-us.apache.org/repos/asf/kafka/blob/174a43cd/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
index 214f9ce..96f8544 100644
--- a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
+++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
@@ -21,12 +21,16 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.kafka.common.cache.Cache;
 import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.data.SchemaBuilder;
 import org.apache.kafka.copycat.data.Struct;
 import org.apache.kafka.copycat.errors.DataException;
+import org.junit.Before;
 import org.junit.Test;
+import org.powermock.reflect.Whitebox;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
@@ -44,6 +48,11 @@ public class JsonConverterTest {
     ObjectMapper objectMapper = new ObjectMapper();
     JsonConverter converter = new JsonConverter();

+    @Before
+    public void setUp() {
+        converter.configure(Collections.EMPTY_MAP, false);
+    }
+
     // Schema metadata

     @Test
@@ -206,6 +215,27 @@ public class JsonConverterTest {
         assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
     }

+
+    @Test
+    public void testCacheSchemaToCopycatConversion() {
+        Cache<JsonNode, Schema> cache = Whitebox.getInternalState(converter, "toCopycatSchemaCache");
+        assertEquals(0, cache.size());
+
+        converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes());
+        assertEquals(1, cache.size());
+
+        converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes());
+        assertEquals(1, cache.size());
+
+        // Different schema should also get cached
+        converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": true }".getBytes());
+        assertEquals(2, cache.size());
+
+        // An equivalent schema with a different JSON encoding should still get a separate cache entry
+        converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false }, \"payload\": true }".getBytes());
+        assertEquals(3, cache.size());
+    }
+
     // Schema types

     @Test
@@ -428,6 +458,23 @@ public class JsonConverterTest {
         assertEquals(true, converted.booleanValue());
     }

+    @Test
+    public void testCacheSchemaToJsonConversion() {
+        Cache<Schema, ObjectNode> cache = Whitebox.getInternalState(converter, "fromCopycatSchemaCache");
+        assertEquals(0, cache.size());
+
+        // Repeated conversion of an equal schema, even when it is a different Schema object, should reuse the
+        // existing cache entry rather than add a new one
+        converter.fromCopycatData(TOPIC, SchemaBuilder.bool().build(), true);
+        assertEquals(1, cache.size());
+
+        converter.fromCopycatData(TOPIC, SchemaBuilder.bool().build(), true);
+        assertEquals(1, cache.size());
+
+        // A similar but different (optional) schema should get its own cache entry.
+        converter.fromCopycatData(TOPIC, SchemaBuilder.bool().optional().build(), true);
+        assertEquals(2, cache.size());
+    }


     private JsonNode parse(byte[] json) {


Mime
View raw message