What data types are supported by Flink?

Contents

1. Supported data types
2. Tuple Type of Flink
3. Use of Tuple
4. POJO Type of Flink
5. Basic Types of Flink
6. General Types of Flink
7. Value Types of Flink
8. Hadoop Writable Classes of Flink
9. Special Types of Flink

1. Supported data types

Flink places some restrictions on the element types that a DataSet or DataStream may contain. The reason is that the system analyzes the types to determine an efficient execution strategy. Seven categories of types are supported:

1. Java Tuples and Scala case classes;

2. Java POJOs;

3. Basic types;

4. General classes;

5. Values;

6. Hadoop Writables;

7. Special types.

2. Tuple Type of Flink

Tuple is a very special type in Flink (a tuple type). It is an abstract class, and a total of 26 subclasses extend Tuple: Tuple0 through Tuple25.


package org.apache.flink.api.java.tuple;

import java.io.Serializable;
import org.apache.flink.annotation.Public;
import org.apache.flink.types.NullFieldException;

@Public
public abstract class Tuple implements Serializable {
    private static final long serialVersionUID = 1L;
    public static final int MAX_ARITY = 25;
    private static final Class<?>[] CLASSES = new Class[]{Tuple0.class, Tuple1.class, Tuple2.class, Tuple3.class, Tuple4.class, Tuple5.class, Tuple6.class, Tuple7.class, Tuple8.class, Tuple9.class, Tuple10.class, Tuple11.class, Tuple12.class, Tuple13.class, Tuple14.class, Tuple15.class, Tuple16.class, Tuple17.class, Tuple18.class, Tuple19.class, Tuple20.class, Tuple21.class, Tuple22.class, Tuple23.class, Tuple24.class, Tuple25.class};

    public Tuple() {
    }

    public abstract <T> T getField(int var1);

    public <T> T getFieldNotNull(int pos) {
        T field = this.getField(pos);
        if (field != null) {
            return field;
        } else {
            throw new NullFieldException(pos);
        }
    }

    public abstract <T> void setField(T var1, int var2);

    public abstract int getArity();

    public abstract <T extends Tuple> T copy();

    public static Class<? extends Tuple> getTupleClass(int arity) {
        if (arity >= 0 && arity <= 25) {
            return CLASSES[arity];
        } else {
            throw new IllegalArgumentException("The tuple arity must be in [0, 25].");
        }
    }

    public static Tuple newInstance(int arity) {
        switch(arity) {
        case 0:
            return Tuple0.INSTANCE;
        case 1:
            return new Tuple1();
        case 2:
            return new Tuple2();
        case 3:
            return new Tuple3();
        case 4:
            return new Tuple4();
        case 5:
            return new Tuple5();
        case 6:
            return new Tuple6();
        case 7:
            return new Tuple7();
        case 8:
            return new Tuple8();
        case 9:
            return new Tuple9();
        case 10:
            return new Tuple10();
        case 11:
            return new Tuple11();
        case 12:
            return new Tuple12();
        case 13:
            return new Tuple13();
        case 14:
            return new Tuple14();
        case 15:
            return new Tuple15();
        case 16:
            return new Tuple16();
        case 17:
            return new Tuple17();
        case 18:
            return new Tuple18();
        case 19:
            return new Tuple19();
        case 20:
            return new Tuple20();
        case 21:
            return new Tuple21();
        case 22:
            return new Tuple22();
        case 23:
            return new Tuple23();
        case 24:
            return new Tuple24();
        case 25:
            return new Tuple25();
        default:
            throw new IllegalArgumentException("The tuple arity must be in [0, 25].");
        }
    }
}

Looking at the source code, we can see the 26 subclasses Tuple0 through Tuple25.

Flink has generated template classes with 0 to 25 fields for us. Tuple0, for example, looks like this:


package org.apache.flink.api.java.tuple;

import java.io.ObjectStreamException;
import org.apache.flink.annotation.Public;

@Public
public class Tuple0 extends Tuple {
    private static final long serialVersionUID = 1L;
    public static final Tuple0 INSTANCE = new Tuple0();

    public Tuple0() {
    }

    public int getArity() {
        return 0;
    }

    public <T> T getField(int pos) {
        throw new IndexOutOfBoundsException(String.valueOf(pos));
    }

    public <T> void setField(T value, int pos) {
        throw new IndexOutOfBoundsException(String.valueOf(pos));
    }

    public Tuple0 copy() {
        return new Tuple0();
    }

    public String toString() {
        return "()";
    }

    public boolean equals(Object o) {
        return this == o || o instanceof Tuple0;
    }

    public int hashCode() {
        return 0;
    }

    private Object readResolve() throws ObjectStreamException {
        return INSTANCE;
    }
}

3. Use of Tuple

Mode 1: Construct tuples with Tuple.newInstance()

The static method newInstance constructs a tuple with a given number of slots;

e.g. newInstance(1) creates a Tuple1 with a single slot, so the only usable field is f0;

e.g. newInstance(2) creates a Tuple2 with two slots, so the usable fields are f0 and f1.

Specify the size of the tuple (understood as the number of fields):


Tuple tuple = Tuple.newInstance(1); // a tuple with a single slot, f0
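
Fields of a tuple built this way are read and written by position, using the setField and getField methods shown in the source above. A minimal sketch (the values and positions are chosen for illustration):

// build a 2-slot tuple and access its fields by index
Tuple tuple2 = Tuple.newInstance(2);
tuple2.setField("hello", 0);        // write slot f0
tuple2.setField(42, 1);             // write slot f1
String word = tuple2.getField(0);   // read slot f0
Integer count = tuple2.getField(1); // read slot f1
System.out.println(word + " -> " + count); // hello -> 42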

Mode 2: Construct tuples with TupleX.of()

Tuple.newInstance(xx) works, but accessing fields by raw index is error-prone: writing a wrong index only fails at runtime with an out-of-bounds exception, and the API is not very convenient. It is usually better to construct tuples with the TupleX.of(data...) factory methods:


Tuple3<String, String, String> tuple3 = Tuple3.of("test0", "test1", "test2");
System.out.println(tuple3.f0); // test0
System.out.println(tuple3.f1); // test1
System.out.println(tuple3.f2); // test2
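
Tuples are also the typical record type inside a Flink job. A minimal sketch, assuming the classic DataStream API in which tuple fields can be referenced by position (keyBy(0), sum(1)):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> counts = env.fromElements(
        Tuple2.of("hello", 1),
        Tuple2.of("world", 2),
        Tuple2.of("hello", 3));
counts.keyBy(0)  // key by tuple field f0
      .sum(1)    // aggregate tuple field f1
      .print();
env.execute();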

4. POJO Type of Flink

Flink treats Java and Scala classes as special POJO data types when the following conditions are met:

1. The class is public;

2. It has a public no-argument constructor;

3. All fields are accessible (either declared public, or exposed through get and set methods);

4. The types of the fields must be supported by Flink. Flink uses Avro to serialize arbitrary objects.

Flink analyzes the structure of POJO types, i.e. it learns about the fields of a POJO. As a result, POJO types are easier to use than general types, and Flink can process POJOs more efficiently than general types.


// a POJO: public class, public no-argument constructor, public fields
public class WordWithCount {
    public String word;
    public int count;

    public WordWithCount() {}

    public WordWithCount(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<WordWithCount> wordCounts = env.fromElements(
        new WordWithCount("hello", 1),
        new WordWithCount("world", 2));
wordCounts.keyBy("word"); // key by POJO field name
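
One way to check whether Flink actually recognizes a class as a POJO is to inspect the TypeInformation it derives for it; a minimal sketch (the exact string printed is an assumption, but a qualifying class should yield a PojoType rather than a GenericType):

TypeInformation<WordWithCount> typeInfo = TypeInformation.of(WordWithCount.class);
System.out.println(typeInfo); // a PojoType if the conditions above are met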

5. Basic Types of Flink

Flink supports all the basic data types of Java and Scala, such as Integer, String, and Double.
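
For example, sources built from basic types get their type information inferred automatically (a minimal sketch):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> numbers = env.fromElements(1, 2, 3);  // Integer elements
DataStream<String> words = env.fromElements("a", "b");    // String elements
DataStream<Double> doubles = env.fromElements(1.0, 2.5);  // Double elements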

6. General Types of Flink

Flink supports most Java and Scala classes (both library classes and custom ones). Classes that contain fields which cannot be serialized are supported only with some restrictions. Classes that follow the Java Beans conventions generally work well.

All classes that Flink cannot identify as POJOs are treated as generic classes. Flink handles these data types as black boxes; their contents are invisible to it, and they are serialized/deserialized with Kryo.
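
When a generic type ends up on the Kryo path, you can register it (or a custom Kryo serializer for it) on the execution config; a minimal sketch, where MyGenericClass and MyKryoSerializer are hypothetical placeholder names:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// register the class with Kryo so it gets a compact, stable tag
env.getConfig().registerKryoType(MyGenericClass.class);
// or plug in a custom Kryo serializer for the type
env.getConfig().registerTypeWithKryoSerializer(MyGenericClass.class, MyKryoSerializer.class);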

7. Value Types of Flink

Types that implement the org.apache.flink.types.Value interface provide custom serialization/deserialization code in their read and write methods, instead of going through a general-purpose serialization framework.

Flink's predefined value types correspond one-to-one to the native data types (ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue). These value types are mutable variants of the native data types: their values can be changed in place, which lets programs reuse objects and relieves pressure on the garbage collector.
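
A minimal sketch of the object-reuse idea with the predefined IntValue (the loop is purely illustrative):

IntValue counter = new IntValue(0);
for (int i = 0; i < 1000; i++) {
    // mutate the existing object instead of allocating a new Integer each iteration
    counter.setValue(counter.getValue() + 1);
}
System.out.println(counter.getValue()); // 1000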

8. Hadoop Writable Classes of Flink

Flink supports types that implement the org.apache.hadoop.io.Writable interface; their serialization logic is implemented in the write() and readFields() methods.
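
A minimal sketch of such a type, using the standard Hadoop Writable contract (MyWritable is a hypothetical name):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

public class MyWritable implements Writable {
    private int value;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(value);  // serialization logic lives here
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        value = in.readInt(); // deserialization logic lives here
    }
}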

9. Special Types of Flink

There are two special types in Flink:

1. Scala's Either, Option, and Try;

2. The Java API's own Either implementation.

Similar to Scala's Either, Flink's Java Either represents a value of one of two possible types, Left or Right. Either is useful for error handling and for operators that need to emit records of two different types.
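
A minimal sketch with the Java API's Either (org.apache.flink.types.Either), e.g. for separating parse errors from parsed values:

Either<String, Integer> ok = Either.Right(42);          // a successful value
Either<String, Integer> err = Either.Left("bad input"); // an error message

if (ok.isRight()) {
    System.out.println(ok.right());  // 42
}
if (err.isLeft()) {
    System.out.println(err.left());  // bad input
}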

Type erasure and type inference

The Java compiler discards much of the generic type information after compilation. This is known as type erasure in Java. It means that at runtime, an instance of an object no longer knows its generic type parameters.

For example, on the JVM, instances of DataStream<String> and DataStream<Long> look the same:


List<String> l1 = new ArrayList<String>();
List<Integer> l2 = new ArrayList<Integer>();
System.out.println(l1.getClass() == l2.getClass()); // true: both are ArrayList at runtime

Generics: more precisely, parameterizing a type, i.e. passing a type as an argument to a class or a method. Flink's Java API attempts to reconstruct (through type inference) the type information that was discarded, and stores it explicitly in the data sets and operations. You can retrieve the type via the DataStream.getType() method, which returns an instance of TypeInformation, Flink's internal representation of a type.
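
A minimal sketch of retrieving that reconstructed type information:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> words = env.fromElements("flink", "types");
TypeInformation<String> info = words.getType(); // Flink's internal representation of the element type
System.out.println(info); // String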

