kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rha...@apache.org
Subject [kafka] branch 2.5 updated: KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time and timestamp literal (#7568)
Date Tue, 04 Feb 2020 17:30:38 GMT
This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new a3b5b39  KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time and timestamp literal (#7568)
a3b5b39 is described below

commit a3b5b398e255a70421c36e2deb24d1ccdddb2aad
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Tue Feb 4 11:15:04 2020 -0600

    KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time and timestamp literal (#7568)
    
    * KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time and timestamp literal
    
    Time and timestamp literal strings contain a `:` character, but the internal parser used in the `Values.parseString(String)` method tokenizes on the colon character to tokenize and parse map entries. The colon could be escaped, but then the backslash character used to escape the colon is not removed and the parser fails to match the literal as a time or timestamp value.
    
    This fix corrects the parsing logic to properly parse timestamp and time literal strings whose colon characters are either escaped or unescaped. Additional unit tests were added to first verify the incorrect behavior and then to validate the correction.
    
    Author: Randall Hauch <rhauch@gmail.com>
    Reviewers: Chris Egerton <chrise@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Jason Gustafson <jason@confluent.io>
---
 .../java/org/apache/kafka/connect/data/Values.java | 133 +++++++++++++-----
 .../org/apache/kafka/connect/data/ValuesTest.java  | 155 +++++++++++++++++++++
 2 files changed, 257 insertions(+), 31 deletions(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index 93c320a..d99fbca 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -30,13 +30,17 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.text.StringCharacterIterator;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Base64;
 import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Set;
 import java.util.TimeZone;
 import java.util.regex.Pattern;
 
@@ -70,9 +74,18 @@ public class Values {
     private static final String FALSE_LITERAL = Boolean.FALSE.toString();
     private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
     private static final String NULL_VALUE = "null";
-    private static final String ISO_8601_DATE_FORMAT_PATTERN = "yyyy-MM-dd";
-    private static final String ISO_8601_TIME_FORMAT_PATTERN = "HH:mm:ss.SSS'Z'";
-    private static final String ISO_8601_TIMESTAMP_FORMAT_PATTERN = ISO_8601_DATE_FORMAT_PATTERN + "'T'" + ISO_8601_TIME_FORMAT_PATTERN;
+    static final String ISO_8601_DATE_FORMAT_PATTERN = "yyyy-MM-dd";
+    static final String ISO_8601_TIME_FORMAT_PATTERN = "HH:mm:ss.SSS'Z'";
+    static final String ISO_8601_TIMESTAMP_FORMAT_PATTERN = ISO_8601_DATE_FORMAT_PATTERN + "'T'" + ISO_8601_TIME_FORMAT_PATTERN;
+    private static final Set<String> TEMPORAL_LOGICAL_TYPE_NAMES =
+            Collections.unmodifiableSet(
+                    new HashSet<>(
+                            Arrays.asList(Time.LOGICAL_NAME,
+                            Timestamp.LOGICAL_NAME,
+                            Date.LOGICAL_NAME
+                            )
+                    )
+            );
 
     private static final String QUOTE_DELIMITER = "\"";
     private static final String COMMA_DELIMITER = ",";
@@ -467,6 +480,9 @@ public class Values {
                                 int days = (int) (millis / MILLIS_PER_DAY); // truncates
                                 return Date.toLogical(toSchema, days);
                             }
+                        } else {
+                            // There is no fromSchema, so no conversion is needed
+                            return value;
                         }
                     }
                     long numeric = asLong(value, fromSchema, null);
@@ -492,6 +508,9 @@ public class Values {
                                 calendar.set(Calendar.DAY_OF_MONTH, 1);
                                 return Time.toLogical(toSchema, (int) calendar.getTimeInMillis());
                             }
+                        } else {
+                            // There is no fromSchema, so no conversion is needed
+                            return value;
                         }
                     }
                     long numeric = asLong(value, fromSchema, null);
@@ -523,6 +542,9 @@ public class Values {
                             if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
                                 return value;
                             }
+                        } else {
+                            // There is no fromSchema, so no conversion is needed
+                            return value;
                         }
                     }
                     long numeric = asLong(value, fromSchema, null);
@@ -755,7 +777,14 @@ public class Values {
                     }
                     sb.append(parser.next());
                 }
-                return new SchemaAndValue(Schema.STRING_SCHEMA, sb.toString());
+                String content = sb.toString();
+                // We can parse string literals as temporal logical types, but all others
+                // are treated as strings
+                SchemaAndValue parsed = parseString(content);
+                if (parsed != null && TEMPORAL_LOGICAL_TYPE_NAMES.contains(parsed.schema().name())) {
+                    return parsed;
+                }
+                return new SchemaAndValue(Schema.STRING_SCHEMA, content);
             }
         }
 
@@ -838,7 +867,9 @@ public class Values {
                     }
 
                     if (!parser.canConsume(ENTRY_DELIMITER)) {
-                        throw new DataException("Map entry is missing '=': " + parser.original());
+                        throw new DataException("Map entry is missing '" + ENTRY_DELIMITER
+                                                + "' at " + parser.position()
+                                                + " in " + parser.original());
                     }
                     SchemaAndValue value = parse(parser, true);
                     Object entryValue = value != null ? value.value() : null;
@@ -855,7 +886,7 @@ public class Values {
                 throw new DataException("Map is missing terminating '}': " + parser.original());
             }
         } catch (DataException e) {
-            LOG.debug("Unable to parse the value as a map or an array; reverting to string", e);
+            LOG.trace("Unable to parse the value as a map or an array; reverting to string", e);
             parser.rewindTo(startPosition);
         }
 
@@ -867,6 +898,27 @@ public class Values {
 
         char firstChar = token.charAt(0);
         boolean firstCharIsDigit = Character.isDigit(firstChar);
+
+        // Temporal types are more restrictive, so try them first
+        if (firstCharIsDigit) {
+            // The time and timestamp literals may be split into 5 tokens since an unescaped colon
+            // is a delimiter. Check these first since the first of these tokens is a simple numeric
+            int position = parser.mark();
+            String remainder = parser.next(4);
+            if (remainder != null) {
+                String timeOrTimestampStr = token + remainder;
+                SchemaAndValue temporal = parseAsTemporal(timeOrTimestampStr);
+                if (temporal != null) {
+                    return temporal;
+                }
+            }
+            // No match was found using the 5 tokens, so rewind and see if the current token has a date, time, or timestamp
+            parser.rewindTo(position);
+            SchemaAndValue temporal = parseAsTemporal(token);
+            if (temporal != null) {
+                return temporal;
+            }
+        }
         if (firstCharIsDigit || firstChar == '+' || firstChar == '-') {
             try {
                 // Try to parse as a number ...
@@ -901,37 +953,42 @@ public class Values {
                 // can't parse as a number
             }
         }
-        if (firstCharIsDigit) {
-            // Check for a date, time, or timestamp ...
-            int tokenLength = token.length();
-            if (tokenLength == ISO_8601_DATE_LENGTH) {
-                try {
-                    return new SchemaAndValue(Date.SCHEMA, new SimpleDateFormat(ISO_8601_DATE_FORMAT_PATTERN).parse(token));
-                } catch (ParseException e) {
-                    // not a valid date
-                }
-            } else if (tokenLength == ISO_8601_TIME_LENGTH) {
-                try {
-                    return new SchemaAndValue(Time.SCHEMA, new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN).parse(token));
-                } catch (ParseException e) {
-                    // not a valid date
-                }
-            } else if (tokenLength == ISO_8601_TIMESTAMP_LENGTH) {
-                try {
-                    return new SchemaAndValue(Timestamp.SCHEMA, new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(token));
-                } catch (ParseException e) {
-                    // not a valid date
-                }
-            }
-        }
         if (embedded) {
             throw new DataException("Failed to parse embedded value");
         }
-        // At this point, the only thing this can be is a string. Embedded strings were processed above,
-        // so this is not embedded and we can use the original string...
+        // At this point, the only thing this non-embedded value can be is a string.
         return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original());
     }
 
+    private static SchemaAndValue parseAsTemporal(String token) {
+        if (token == null) {
+            return null;
+        }
+        // If the colons were escaped, we'll see the escape chars and need to remove them
+        token = token.replace("\\:", ":");
+        int tokenLength = token.length();
+        if (tokenLength == ISO_8601_TIME_LENGTH) {
+            try {
+                return new SchemaAndValue(Time.SCHEMA, new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN).parse(token));
+            } catch (ParseException e) {
+              // not a valid date
+            }
+        } else if (tokenLength == ISO_8601_TIMESTAMP_LENGTH) {
+            try {
+                return new SchemaAndValue(Timestamp.SCHEMA, new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(token));
+            } catch (ParseException e) {
+              // not a valid date
+            }
+        } else if (tokenLength == ISO_8601_DATE_LENGTH) {
+            try {
+                return new SchemaAndValue(Date.SCHEMA, new SimpleDateFormat(ISO_8601_DATE_FORMAT_PATTERN).parse(token));
+            } catch (ParseException e) {
+                // not a valid date
+            }
+        }
+        return null;
+    }
+
     protected static Schema commonSchemaFor(Schema previous, SchemaAndValue latest) {
         if (latest == null) {
             return previous;
@@ -1088,6 +1145,7 @@ public class Values {
         public void rewindTo(int position) {
             iter.setIndex(position);
             nextToken = null;
+            previousToken = null;
         }
 
         public String original() {
@@ -1112,6 +1170,19 @@ public class Values {
             return previousToken;
         }
 
+        public String next(int n) {
+            int current = mark();
+            int start = mark();
+            for (int i = 0; i != n; ++i) {
+                if (!hasNext()) {
+                    rewindTo(start);
+                    return null;
+                }
+                next();
+            }
+            return original.substring(current, position());
+        }
+
         private String consumeNextToken() throws NoSuchElementException {
             boolean escaped = false;
             int start = iter.getIndex();
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index a5909f3..c437e46 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.connect.data.Values.Parser;
 import org.apache.kafka.connect.errors.DataException;
 import org.junit.Test;
 
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -383,6 +384,146 @@ public class ValuesTest {
         assertEquals(str, result.value());
     }
 
+    @Test
+    public void shouldParseTimestampStringAsTimestamp() throws Exception {
+        String str = "2019-08-23T14:34:54.346Z";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.INT64, result.schema().type());
+        assertEquals(Timestamp.LOGICAL_NAME, result.schema().name());
+        java.util.Date expected = new SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(str);
+        assertEquals(expected, result.value());
+    }
+
+    @Test
+    public void shouldParseDateStringAsDate() throws Exception {
+        String str = "2019-08-23";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.INT32, result.schema().type());
+        assertEquals(Date.LOGICAL_NAME, result.schema().name());
+        java.util.Date expected = new SimpleDateFormat(Values.ISO_8601_DATE_FORMAT_PATTERN).parse(str);
+        assertEquals(expected, result.value());
+    }
+
+    @Test
+    public void shouldParseTimeStringAsDate() throws Exception {
+        String str = "14:34:54.346Z";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.INT32, result.schema().type());
+        assertEquals(Time.LOGICAL_NAME, result.schema().name());
+        java.util.Date expected = new SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(str);
+        assertEquals(expected, result.value());
+    }
+
+    @Test
+    public void shouldParseTimestampStringWithEscapedColonsAsTimestamp() throws Exception {
+        String str = "2019-08-23T14\\:34\\:54.346Z";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.INT64, result.schema().type());
+        assertEquals(Timestamp.LOGICAL_NAME, result.schema().name());
+        String expectedStr = "2019-08-23T14:34:54.346Z";
+        java.util.Date expected = new SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(expectedStr);
+        assertEquals(expected, result.value());
+    }
+
+    @Test
+    public void shouldParseTimeStringWithEscapedColonsAsDate() throws Exception {
+        String str = "14\\:34\\:54.346Z";
+        SchemaAndValue result = Values.parseString(str);
+        assertEquals(Type.INT32, result.schema().type());
+        assertEquals(Time.LOGICAL_NAME, result.schema().name());
+        String expectedStr = "14:34:54.346Z";
+        java.util.Date expected = new SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(expectedStr);
+        assertEquals(expected, result.value());
+    }
+
+    @Test
+    public void shouldParseDateStringAsDateInArray() throws Exception {
+        String dateStr = "2019-08-23";
+        String arrayStr = "[" + dateStr + "]";
+        SchemaAndValue result = Values.parseString(arrayStr);
+        assertEquals(Type.ARRAY, result.schema().type());
+        Schema elementSchema = result.schema().valueSchema();
+        assertEquals(Type.INT32, elementSchema.type());
+        assertEquals(Date.LOGICAL_NAME, elementSchema.name());
+        java.util.Date expected = new SimpleDateFormat(Values.ISO_8601_DATE_FORMAT_PATTERN).parse(dateStr);
+        assertEquals(Collections.singletonList(expected), result.value());
+    }
+
+    @Test
+    public void shouldParseTimeStringAsTimeInArray() throws Exception {
+        String timeStr = "14:34:54.346Z";
+        String arrayStr = "[" + timeStr + "]";
+        SchemaAndValue result = Values.parseString(arrayStr);
+        assertEquals(Type.ARRAY, result.schema().type());
+        Schema elementSchema = result.schema().valueSchema();
+        assertEquals(Type.INT32, elementSchema.type());
+        assertEquals(Time.LOGICAL_NAME, elementSchema.name());
+        java.util.Date expected = new SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(timeStr);
+        assertEquals(Collections.singletonList(expected), result.value());
+    }
+
+    @Test
+    public void shouldParseTimestampStringAsTimestampInArray() throws Exception {
+        String tsStr = "2019-08-23T14:34:54.346Z";
+        String arrayStr = "[" + tsStr + "]";
+        SchemaAndValue result = Values.parseString(arrayStr);
+        assertEquals(Type.ARRAY, result.schema().type());
+        Schema elementSchema = result.schema().valueSchema();
+        assertEquals(Type.INT64, elementSchema.type());
+        assertEquals(Timestamp.LOGICAL_NAME, elementSchema.name());
+        java.util.Date expected = new SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr);
+        assertEquals(Collections.singletonList(expected), result.value());
+    }
+
+    @Test
+    public void shouldParseMultipleTimestampStringAsTimestampInArray() throws Exception {
+        String tsStr1 = "2019-08-23T14:34:54.346Z";
+        String tsStr2 = "2019-01-23T15:12:34.567Z";
+        String tsStr3 = "2019-04-23T19:12:34.567Z";
+        String arrayStr = "[" + tsStr1 + "," + tsStr2 + ",   " + tsStr3 + "]";
+        SchemaAndValue result = Values.parseString(arrayStr);
+        assertEquals(Type.ARRAY, result.schema().type());
+        Schema elementSchema = result.schema().valueSchema();
+        assertEquals(Type.INT64, elementSchema.type());
+        assertEquals(Timestamp.LOGICAL_NAME, elementSchema.name());
+        java.util.Date expected1 = new SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr1);
+        java.util.Date expected2 = new SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr2);
+        java.util.Date expected3 = new SimpleDateFormat(Values.ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(tsStr3);
+        assertEquals(Arrays.asList(expected1, expected2, expected3), result.value());
+    }
+
+    @Test
+    public void shouldParseQuotedTimeStringAsTimeInMap() throws Exception {
+        String keyStr = "k1";
+        String timeStr = "14:34:54.346Z";
+        String mapStr = "{\"" + keyStr + "\":\"" + timeStr + "\"}";
+        SchemaAndValue result = Values.parseString(mapStr);
+        assertEquals(Type.MAP, result.schema().type());
+        Schema keySchema = result.schema().keySchema();
+        Schema valueSchema = result.schema().valueSchema();
+        assertEquals(Type.STRING, keySchema.type());
+        assertEquals(Type.INT32, valueSchema.type());
+        assertEquals(Time.LOGICAL_NAME, valueSchema.name());
+        java.util.Date expected = new SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(timeStr);
+        assertEquals(Collections.singletonMap(keyStr, expected), result.value());
+    }
+
+    @Test
+    public void shouldParseTimeStringAsTimeInMap() throws Exception {
+        String keyStr = "k1";
+        String timeStr = "14:34:54.346Z";
+        String mapStr = "{\"" + keyStr + "\":" + timeStr + "}";
+        SchemaAndValue result = Values.parseString(mapStr);
+        assertEquals(Type.MAP, result.schema().type());
+        Schema keySchema = result.schema().keySchema();
+        Schema valueSchema = result.schema().valueSchema();
+        assertEquals(Type.STRING, keySchema.type());
+        assertEquals(Type.INT32, valueSchema.type());
+        assertEquals(Time.LOGICAL_NAME, valueSchema.name());
+        java.util.Date expected = new SimpleDateFormat(Values.ISO_8601_TIME_FORMAT_PATTERN).parse(timeStr);
+        assertEquals(Collections.singletonMap(keyStr, expected), result.value());
+    }
+
     /**
      * This is technically invalid JSON, and we don't want to simply ignore the blank elements.
      */
@@ -433,6 +574,20 @@ public class ValuesTest {
     }
 
     @Test
+    public void shouldConsumeMultipleTokens() {
+        String value = "a:b:c:d:e:f:g:h";
+        Parser parser = new Parser(value);
+        String firstFive = parser.next(5);
+        assertEquals("a:b:c", firstFive);
+        assertEquals(":", parser.next());
+        assertEquals("d", parser.next());
+        assertEquals(":", parser.next());
+        String lastEight = parser.next(8); // only 7 remain
+        assertNull(lastEight);
+        assertEquals("e", parser.next());
+    }
+
+    @Test
     public void shouldParseStringsWithoutDelimiters() {
         //assertParsed("");
         assertParsed("  ");


Mime
View raw message