kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-2622: Add Time logical type for Copycat.
Date Fri, 09 Oct 2015 18:55:17 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6a06e22ef -> 49822ff83


KAFKA-2622: Add Time logical type for Copycat.

Author: Ewen Cheslack-Postava <me@ewencp.org>

Reviewers: Guozhang Wang

Closes #285 from ewencp/kafka-2622-time-logical-type


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/49822ff8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/49822ff8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/49822ff8

Branch: refs/heads/trunk
Commit: 49822ff836e0fcd2a30bdf994d7b881a9de95cdf
Parents: 6a06e22
Author: Ewen Cheslack-Postava <me@ewencp.org>
Authored: Fri Oct 9 11:59:38 2015 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Fri Oct 9 11:59:38 2015 -0700

----------------------------------------------------------------------
 .../kafka/copycat/data/CopycatSchema.java       | 19 ++++-
 .../org/apache/kafka/copycat/data/Time.java     | 77 +++++++++++++++++++
 .../kafka/copycat/data/CopycatSchemaTest.java   | 29 +++++++
 .../org/apache/kafka/copycat/data/TimeTest.java | 80 ++++++++++++++++++++
 .../kafka/copycat/json/JsonConverter.java       | 18 +++++
 .../kafka/copycat/json/JsonConverterTest.java   | 31 ++++++++
 6 files changed, 253 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/49822ff8/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
index 6b77717..104abf1 100644
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
@@ -19,6 +19,7 @@ package org.apache.kafka.copycat.data;
 
 import org.apache.kafka.copycat.errors.DataException;
 
+import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.*;
 
@@ -27,6 +28,10 @@ public class CopycatSchema implements Schema {
      * Maps Schema.Types to a list of Java classes that can be used to represent them.
      */
     private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new HashMap<>();
+    /**
+     * Maps known logical types to a list of Java classes that can be used to represent them.
+     */
+    private static final Map<String, List<Class>> LOGICAL_TYPE_CLASSES = new HashMap<>();
 
     /**
      * Maps the Java classes to the corresponding Schema.Type.
@@ -54,6 +59,14 @@ public class CopycatSchema implements Schema {
             for (Class<?> schemaClass : schemaClasses.getValue())
                 JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
         }
+
+        LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, Arrays.asList((Class) BigDecimal.class));
+        LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
+        LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
+        LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
+        // We don't need to put these into JAVA_CLASS_SCHEMA_TYPES since that's only used to determine schemas for
+        // schemaless data and logical types will have ambiguous schemas (e.g. many of them use the same Java class) so
+        // they should not be used without schemas.
     }
 
     // The type of the field
@@ -195,7 +208,11 @@ public class CopycatSchema implements Schema {
                 return;
         }
 
-        final List<Class> expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+        List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
+
+        if (expectedClasses == null)
+                expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
+
         if (expectedClasses == null)
             throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/49822ff8/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java
new file mode 100644
index 0000000..76bf28c
--- /dev/null
+++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+
+import java.util.Calendar;
+import java.util.TimeZone;
+
+/**
+ * <p>
+ *     A time representing a specific point in a day, not tied to any specific date. The corresponding Java type is a
+ *     java.util.Date where only hours, minutes, seconds, and milliseconds can be non-zero. This effectively makes it a
+ *     point in time during the first day after the Unix epoch. The underlying representation is an integer
+ *     representing the number of milliseconds after midnight.
+ * </p>
+ */
+public class Time {
+    public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Time";
+
+    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+    /**
+     * Returns a SchemaBuilder for a Time. By returning a SchemaBuilder you can override additional schema settings such
+     * as required/optional, default value, and documentation.
+     * @return a SchemaBuilder
+     */
+    public static SchemaBuilder builder() {
+        return SchemaBuilder.int32()
+                .name(LOGICAL_NAME)
+                .version(1);
+    }
+
+    public static final Schema SCHEMA = builder().schema();
+
+    /**
+     * Convert a value from its logical format (Time) to its encoded format.
+     * @param schema the schema of the value; its name must be {@link #LOGICAL_NAME}
+     * @param value the logical value
+     * @return the encoded value, in milliseconds after midnight
+     * @throws DataException if the schema name does not match or the value is outside the first day after the epoch
+     */
+    public static int fromLogical(Schema schema, java.util.Date value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Time object but the schema does not match.");
+        Calendar calendar = Calendar.getInstance(UTC);
+        calendar.setTime(value);
+        long unixMillis = calendar.getTimeInMillis();
+        // Valid values lie in [0, MILLIS_PER_DAY); exactly MILLIS_PER_DAY is already midnight of the following day.
+        if (unixMillis < 0 || unixMillis >= MILLIS_PER_DAY) {
+            throw new DataException("Copycat Time type should not have any date fields set to non-zero values.");
+        }
+        return (int) unixMillis;
+    }
+
+    /**
+     * Convert a value from its encoded format (milliseconds after midnight) to its logical format (Time).
+     * @param schema the schema of the value; its name must be {@link #LOGICAL_NAME}
+     * @param value the encoded value, at least 0 and less than 86400000
+     * @return the logical value, a java.util.Date that many milliseconds after the Unix epoch
+     * @throws DataException if the schema name does not match or the value is out of range
+     */
+    public static java.util.Date toLogical(Schema schema, int value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Time object but the schema does not match.");
+        if (value < 0 || value >= MILLIS_PER_DAY)
+            throw new DataException("Time values must use number of milliseconds greater than or equal to 0 and less than 86400000");
+        return new java.util.Date(value);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/49822ff8/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
index 9e97cad..4976950 100644
--- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java
@@ -20,6 +20,8 @@ package org.apache.kafka.copycat.data;
 import org.apache.kafka.copycat.errors.DataException;
 import org.junit.Test;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.nio.CharBuffer;
 import java.util.Arrays;
@@ -97,6 +99,14 @@ public class CopycatSchemaTest {
         CopycatSchema.validateValue(STRUCT_SCHEMA, structValue);
     }
 
+    @Test
+    public void testValidateValueMatchingLogicalType() {
+        CopycatSchema.validateValue(Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2));
+        CopycatSchema.validateValue(Date.SCHEMA, new java.util.Date(0));
+        CopycatSchema.validateValue(Time.SCHEMA, new java.util.Date(0));
+        CopycatSchema.validateValue(Timestamp.SCHEMA, new java.util.Date(0));
+    }
+
     // To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible
     // to only include a single test for each type
 
@@ -204,6 +214,25 @@ public class CopycatSchemaTest {
         );
     }
 
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchDecimal() {
+        CopycatSchema.validateValue(Decimal.schema(2), new BigInteger("156"));
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchDate() {
+        CopycatSchema.validateValue(Date.SCHEMA, 1000L);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchTime() {
+        CopycatSchema.validateValue(Time.SCHEMA, 1000L);
+    }
+
+    @Test(expected = DataException.class)
+    public void testValidateValueMismatchTimestamp() {
+        CopycatSchema.validateValue(Timestamp.SCHEMA, 1000L);
+    }
 
     @Test
     public void testPrimitiveEquality() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/49822ff8/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java
new file mode 100644
index 0000000..8e54cb2
--- /dev/null
+++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.data;
+
+import org.apache.kafka.copycat.errors.DataException;
+import org.junit.Test;
+
+import java.util.Calendar;
+import java.util.GregorianCalendar;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+public class TimeTest {
+    private static final GregorianCalendar EPOCH = utcEpoch();
+    private static final GregorianCalendar EPOCH_PLUS_DATE_COMPONENT = utcEpoch();
+    private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_MILLIS = utcEpoch();
+
+    static {
+        EPOCH_PLUS_TEN_THOUSAND_MILLIS.add(Calendar.MILLISECOND, 10000);
+        EPOCH_PLUS_DATE_COMPONENT.add(Calendar.DATE, 10000);
+    }
+
+    /**
+     * Builds a calendar from the fields 1970-01-01 00:00:00 and then sets its time zone to UTC. The tests in this
+     * class use the result as the Unix epoch.
+     */
+    private static GregorianCalendar utcEpoch() {
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        return calendar;
+    }
+
+    // The predefined schema must carry the logical name and version 1.
+    @Test
+    public void testBuilder() {
+        final Schema timeSchema = Time.SCHEMA;
+        assertEquals(Time.LOGICAL_NAME, timeSchema.name());
+        assertEquals(1, (Object) timeSchema.version());
+    }
+
+    // Values within the first day encode to their millisecond offset from midnight.
+    @Test
+    public void testFromLogical() {
+        final int atEpoch = Time.fromLogical(Time.SCHEMA, EPOCH.getTime());
+        assertEquals(0, atEpoch);
+        final int tenSecondsIn = Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime());
+        assertEquals(10000, tenSecondsIn);
+    }
+
+    // A schema whose name is not the Time logical name is expected to be rejected.
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidSchema() {
+        final Schema wrongName = Time.builder().name("invalid").build();
+        Time.fromLogical(wrongName, EPOCH.getTime());
+    }
+
+    // A value 10000 days past the epoch is expected to be rejected.
+    @Test(expected = DataException.class)
+    public void testFromLogicalInvalidHasDateComponents() {
+        final Schema timeSchema = Time.SCHEMA;
+        Time.fromLogical(timeSchema, EPOCH_PLUS_DATE_COMPONENT.getTime());
+    }
+
+    // Encoded millisecond offsets decode to the matching instants.
+    @Test
+    public void testToLogical() {
+        final java.util.Date epoch = EPOCH.getTime();
+        assertEquals(epoch, Time.toLogical(Time.SCHEMA, 0));
+        final java.util.Date tenSecondsIn = EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime();
+        assertEquals(tenSecondsIn, Time.toLogical(Time.SCHEMA, 10000));
+    }
+
+    // Decoding with a schema whose name is not the Time logical name is expected to be rejected.
+    @Test(expected = DataException.class)
+    public void testToLogicalInvalidSchema() {
+        final Schema wrongName = Time.builder().name("invalid").build();
+        Time.toLogical(wrongName, 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/49822ff8/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
index 8910b27..ca8f029 100644
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
+++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
@@ -217,6 +217,15 @@ public class JsonConverter implements Converter {
             }
         });
 
+        TO_COPYCAT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof Integer))
+                    throw new DataException("Invalid type for Time, underlying representation should be int32 but was " + value.getClass());
+                return Time.toLogical(schema, (int) value);
+            }
+        });
+
         TO_COPYCAT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {
@@ -247,6 +256,15 @@ public class JsonConverter implements Converter {
             }
         });
 
+        TO_JSON_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
+            @Override
+            public Object convert(Schema schema, Object value) {
+                if (!(value instanceof java.util.Date))
+                    throw new DataException("Invalid type for Time, expected Date but was " + value.getClass());
+                return Time.fromLogical(schema, (java.util.Date) value);
+            }
+        });
+
         TO_JSON_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
             @Override
             public Object convert(Schema schema, Object value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/49822ff8/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
index 8a8e243..6b40046 100644
--- a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
+++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.copycat.data.Schema;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.data.SchemaBuilder;
 import org.apache.kafka.copycat.data.Struct;
+import org.apache.kafka.copycat.data.Time;
 import org.apache.kafka.copycat.data.Timestamp;
 import org.apache.kafka.copycat.errors.DataException;
 import org.junit.Before;
@@ -230,6 +231,20 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void timeToCopycat() {
+        Schema schema = Time.SCHEMA;
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        calendar.add(Calendar.MILLISECOND, 14400000);
+        java.util.Date reference = calendar.getTime();
+        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.copycat.data.Time\", \"version\": 1 }, \"payload\": 14400000 }";
+        SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes());
+        java.util.Date converted = (java.util.Date) schemaAndValue.value();
+        assertEquals(schema, schemaAndValue.schema());
+        assertEquals(reference, converted);
+    }
+
+    @Test
     public void timestampToCopycat() {
         Schema schema = Timestamp.SCHEMA;
         GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
@@ -455,6 +470,22 @@ public class JsonConverterTest {
     }
 
     @Test
+    public void timeToJson() throws IOException {
+        GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
+        calendar.add(Calendar.MILLISECOND, 14400000);
+        java.util.Date date = calendar.getTime();
+
+        JsonNode converted = parse(converter.fromCopycatData(TOPIC, Time.SCHEMA, date));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, \"name\": \"org.apache.kafka.copycat.data.Time\", \"version\": 1 }"),
+                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
+        assertTrue(payload.isInt());
+        assertEquals(14400000, payload.longValue());
+    }
+
+    @Test
     public void timestampToJson() throws IOException {
         GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
         calendar.setTimeZone(TimeZone.getTimeZone("UTC"));


Mime
View raw message