kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kkaranta...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale. (#9320)
Date Tue, 06 Oct 2020 01:31:46 GMT
This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new c7e1c3d  KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale.
(#9320)
c7e1c3d is described below

commit c7e1c3d209d6975c2bcbf74e2f2f2b7ab2202b49
Author: Alex Diachenko <sansanichfb@gmail.com>
AuthorDate: Mon Oct 5 17:24:44 2020 -0700

    KAFKA-10439: Connect's Values to parse BigInteger as Decimal with zero scale. (#9320)
    
    The `org.apache.kafka.connect.data.Values#parse` method parses integers that are larger
than `Long.MAX_VALUE` as `double` with `Schema.FLOAT64_SCHEMA`.
    
    That means we are losing precision for these larger integers.
    
    For example:
    `SchemaAndValue schemaAndValue = Values.parseString("9223372036854775808");`
    returns:
    `SchemaAndValue{schema=Schema{FLOAT64}, value=9.223372036854776E18}`
    
    Also, this method parses values that could be represented as `FLOAT32` as `FLOAT64` instead.
    
    This PR changes the parsing logic to use `FLOAT32`/`FLOAT64` only for numbers that have a
fractional part (`decimal.scale() != 0`), and to use an arbitrary-precision `org.apache.kafka.connect.data.Decimal`
otherwise.
    Also, it updates the method to parse numbers that can be represented as `float` as `FLOAT32` rather than `FLOAT64`.
    
    Added unit tests that cover parsing `BigInteger`, `Byte`, `Short`, `Integer`, `Long`,
`Float`, `Double` types.
    
    Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
---
 .../java/org/apache/kafka/connect/data/Values.java |   8 +-
 .../org/apache/kafka/connect/data/ValuesTest.java  | 128 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index bf0c528..fbd3a8e 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -839,8 +839,14 @@ public class Values {
                 } catch (ArithmeticException e) {
                     // continue
                 }
+                float fValue = decimal.floatValue();
+                if (fValue != Float.NEGATIVE_INFINITY && fValue != Float.POSITIVE_INFINITY
+                    && decimal.scale() != 0) {
+                    return new SchemaAndValue(Schema.FLOAT32_SCHEMA, fValue);
+                }
                 double dValue = decimal.doubleValue();
-                if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY)
{
+                if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY
+                    && decimal.scale() != 0) {
                     return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue);
                 }
                 Schema schema = Decimal.schema(decimal.scale());
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
index dcfa3cf..c9a86b6 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.connect.data.Values.Parser;
 import org.apache.kafka.connect.errors.DataException;
 import org.junit.Test;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -331,6 +333,132 @@ public class ValuesTest {
     public void canConsume() {
     }
 
+    @Test
+    public void shouldParseBigIntegerAsDecimalWithZeroScale() {
+        BigInteger value = BigInteger.valueOf(Long.MAX_VALUE).add(new BigInteger("1"));
+        SchemaAndValue schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Decimal.schema(0), schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof BigDecimal);
+        assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue());
+        value = BigInteger.valueOf(Long.MIN_VALUE).subtract(new BigInteger("1"));
+        schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Decimal.schema(0), schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof BigDecimal);
+        assertEquals(value, ((BigDecimal) schemaAndValue.value()).unscaledValue());
+    }
+
+    @Test
+    public void shouldParseByteAsInt8() {
+        Byte value = Byte.MAX_VALUE;
+        SchemaAndValue schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Byte);
+        assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue());
+        value = Byte.MIN_VALUE;
+        schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.INT8_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Byte);
+        assertEquals(value.byteValue(), ((Byte) schemaAndValue.value()).byteValue());
+    }
+
+    @Test
+    public void shouldParseShortAsInt16() {
+        Short value = Short.MAX_VALUE;
+        SchemaAndValue schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Short);
+        assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue());
+        value = Short.MIN_VALUE;
+        schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.INT16_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Short);
+        assertEquals(value.shortValue(), ((Short) schemaAndValue.value()).shortValue());
+    }
+
+    @Test
+    public void shouldParseIntegerAsInt32() {
+        Integer value = Integer.MAX_VALUE;
+        SchemaAndValue schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Integer);
+        assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue());
+        value = Integer.MIN_VALUE;
+        schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Integer);
+        assertEquals(value.intValue(), ((Integer) schemaAndValue.value()).intValue());
+    }
+
+    @Test
+    public void shouldParseLongAsInt64() {
+        Long value = Long.MAX_VALUE;
+        SchemaAndValue schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Long);
+        assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue());
+        value = Long.MIN_VALUE;
+        schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.INT64_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Long);
+        assertEquals(value.longValue(), ((Long) schemaAndValue.value()).longValue());
+    }
+
+    @Test
+    public void shouldParseFloatAsFloat32() {
+        Float value = Float.MAX_VALUE;
+        SchemaAndValue schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Float);
+        assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0);
+        value = -Float.MAX_VALUE;
+        schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.FLOAT32_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Float);
+        assertEquals(value.floatValue(), ((Float) schemaAndValue.value()).floatValue(), 0);
+    }
+
+    @Test
+    public void shouldParseDoubleAsFloat64() {
+        Double value = Double.MAX_VALUE;
+        SchemaAndValue schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Double);
+        assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(),
0);
+        value = -Double.MAX_VALUE;
+        schemaAndValue = Values.parseString(
+            String.valueOf(value)
+        );
+        assertEquals(Schema.FLOAT64_SCHEMA, schemaAndValue.schema());
+        assertTrue(schemaAndValue.value() instanceof Double);
+        assertEquals(value.doubleValue(), ((Double) schemaAndValue.value()).doubleValue(),
0);
+    }
+
     protected void assertParsed(String input) {
         assertParsed(input, input);
     }


Mime
View raw message