Spark data format: UnsafeRow
2022-07-26 17:48:00 【InfoQ】
1. Brief introduction
org.apache.spark.sql.Row / GenericRow / GenericRowWithSchema
2. Class properties
- private Object baseObject; // The whole row is stored in this object. It is usually a byte array (byte[]); in which cases is it some other type?
- private long baseOffset; // Although baseObject is an array, it is still a Java object. baseOffset records the space taken by the object header of baseObject, which for array objects on a 64-bit JVM is generally 16 bytes
- private int numFields; // The number of fields in the row
- private int sizeInBytes; // The number of bytes the row actually occupies = total capacity of baseObject - baseOffset - unused capacity. With variable-length fields such as strings, the allocated memory may be larger than what is actually used
- private int bitSetWidthInBytes; // The number of bytes used to record null fields. Each field takes 1 bit, and the bitset is allocated in whole 8-byte words, so up to 64 fields take 8 bytes, 65-128 fields take 16 bytes, and so on
- public static final Set<DataType> mutableFieldTypes; // Field types that can be updated in place in an UnsafeRow. Values of these types are stored at a fixed position with a fixed length in baseObject, so they can be overwritten. The mutable types are: NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DateType, TimestampType, DecimalType (DecimalType is accepted through the isMutable check rather than being a member of the set)
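The bitSetWidthInBytes rule is easier to see with numbers. Below is a minimal Java sketch, assuming nothing beyond the formula given in this article. NullBitSetSketch and its methods are hypothetical and not Spark's code. The sketch models the bitset as a long[] of 64-bit words, whereas UnsafeRow keeps those words at the start of baseObject, and it reproduces only the width formula and the per-field bit arithmetic.

```java
// Hypothetical sketch (not Spark's code): mirrors how an UnsafeRow-style row
// sizes its null bitset and sets/tests a field's null bit. The bitset is
// modeled as a long[] of 64-bit words instead of bytes inside baseObject.
final class NullBitSetSketch {
    // One bit per field, rounded up to whole 8-byte words.
    static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
    }

    // Word index = ordinal / 64, bit position inside the word = ordinal % 64.
    static void setNullAt(long[] words, int ordinal) {
        words[ordinal >> 6] |= 1L << (ordinal & 63);
    }

    static boolean isNullAt(long[] words, int ordinal) {
        return (words[ordinal >> 6] & (1L << (ordinal & 63))) != 0;
    }

    public static void main(String[] args) {
        for (int n : new int[] {3, 64, 65, 128, 129}) {
            System.out.println(n + " fields -> " + bitSetWidthInBytes(n) + " bytes");
        }
        long[] words = new long[bitSetWidthInBytes(3) / 8]; // a single word for 3 fields
        setNullAt(words, 2); // mark the third field (id3) as null
        System.out.println(isNullAt(words, 2) + " " + isNullAt(words, 0)); // true false
    }
}
```

Running main prints 8, 8, 16, 16 and 24 bytes for 3, 64, 65, 128 and 129 fields. The bitset therefore grows one 8-byte word at a time rather than one byte at a time.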
3. Memory layout

- null bit set: marks which fields are null, using 1 bit per field. Its total size is bitSetWidthInBytes = ((number of fields + 63) / 64) * 8 bytes.
- values: each field occupies one 8-byte slot in this region, allocated when the row is initialized. For a field of a mutable type (mutableFieldTypes), the slot stores the field's value directly. For a field of an immutable type, the slot stores only the offset (relative to baseOffset, not to the start address of baseObject) and the size, packed into one long (high 32 bits: offset, low 32 bits: size); the actual value is stored in the variable length portion.
- variable length portion: the actual values of all immutable-type fields are stored here one after another, and some space may be left unused.
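The slot arithmetic of this layout can be sketched in a few lines of Java. LayoutSketch and its methods are hypothetical names, not Spark's API. Offsets are relative to baseOffset, as in the description above, and the packing follows the stated rule: the offset goes in the high 32 bits and the size in the low 32 bits.

```java
// Hypothetical helpers (not Spark's API) for an UnsafeRow-style fixed-length
// region. All offsets are relative to baseOffset.
final class LayoutSketch {
    static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
    }

    // Each field owns one 8-byte slot right after the null bitset.
    static long fieldOffset(int numFields, int ordinal) {
        return bitSetWidthInBytes(numFields) + ordinal * 8L;
    }

    // A variable-length field stores (offset << 32) | size in its slot.
    static long packOffsetAndSize(int offset, int size) {
        return ((long) offset << 32) | (size & 0xFFFFFFFFL);
    }

    static int unpackOffset(long offsetAndSize) {
        return (int) (offsetAndSize >>> 32); // high 32 bits
    }

    static int unpackSize(long offsetAndSize) {
        return (int) offsetAndSize; // low 32 bits
    }

    public static void main(String[] args) {
        // Person(id, id2, id3): 3 fields, so the slots follow an 8-byte bitset.
        for (int i = 0; i < 3; i++) {
            System.out.println("field " + i + " slot at +" + fieldOffset(3, i));
        }
        long packed = packOffsetAndSize(32, 20); // id3: data at +32, 20 bytes long
        System.out.println(unpackOffset(packed) + " " + unpackSize(packed)); // 32 20
    }
}
```

For a 3-field row the slots sit at +8, +16 and +24. The fixed-length part therefore ends at +32, which is where the first variable-length value can start.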
4. UnsafeRow creation process
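```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Person(id: Long, id2: Long, id3: String)

val e = Encoders.product[Person]
val personExprEncoder = e.asInstanceOf[ExpressionEncoder[Person]]
val person = Person(2, 7, "abcdefghijklmnopqrst")
// toRow (Spark 2.x API; Spark 3.x replaced it with createSerializer()) returns an UnsafeRow
// whose baseObject is a byte[64]; why 64 is analyzed below
val row = personExprEncoder.toRow(person)
println(row.getLong(0))   // 2
println(row.getString(2)) // abcdefghijklmnopqrst
```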
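Internally, toRow runs an UnsafeProjection over the object. UnsafeProjection is declared as:

```scala
abstract class UnsafeProjection extends Projection {
  override def apply(row: InternalRow): UnsafeRow
}
```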
GenerateUnsafeProjection#create generates the concrete projection class at runtime. For Person, the generated code is shown below. It is abridged: declarations such as isNull_0, isNull_4 and isNull_7, and the StaticInvoke_0 helper, are not shown.

```java
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
  private Object[] references;
  private boolean resultIsNull_0;
  private boolean globalIsNull_0;
  private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] mutableStateArray_2 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
  private java.lang.String[] mutableStateArray_0 = new java.lang.String[1];
  private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
  private UnsafeRow[] mutableStateArray_1 = new UnsafeRow[1];

  public SpecificUnsafeProjection(Object[] references) {
    this.references = references;
    mutableStateArray_1[0] = new UnsafeRow(3); // create the UnsafeRow instance with 3 fields: id, id2, id3
    // initialSize = 32: one variable-length field (id3) * 32
    mutableStateArray_2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mutableStateArray_1[0], 32);
    mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray_2[0], 3);
  }

  public UnsafeRow apply(InternalRow i) {
    mutableStateArray_2[0].reset();            // rewind the cursor to the start of the variable-length portion
    mutableStateArray_3[0].zeroOutNullBytes(); // clear the null bitset
    writeFields_0_0(i);
    writeFields_0_1(i);
    mutableStateArray_1[0].setTotalSize(mutableStateArray_2[0].totalSize());
    return mutableStateArray_1[0];             // the same UnsafeRow instance is returned on every call
  }

  // write field id3 (ordinal 2)
  private void writeFields_0_1(InternalRow i) {
    UTF8String value_13 = StaticInvoke_0(i);
    if (globalIsNull_0) {
      mutableStateArray_3[0].setNullAt(2);
    } else {
      mutableStateArray_3[0].write(2, value_13);
    }
  }

  // write fields id and id2 (ordinals 0 and 1)
  private void writeFields_0_0(InternalRow i) {
    boolean isNull_3 = i.isNullAt(0);
    com.test.scala.EncoderScala$Person value_3 = isNull_3 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));
    long value_0 = value_3.id();
    if (isNull_0) {
      mutableStateArray_3[0].setNullAt(0);
    } else {
      mutableStateArray_3[0].write(0, value_0);
    }
    com.test.scala.EncoderScala$Person value_7 = isNull_7 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));
    long value_4 = value_7.id2();
    if (isNull_4) {
      mutableStateArray_3[0].setNullAt(1);
    } else {
      mutableStateArray_3[0].write(1, value_4);
    }
  }
}
```
The BufferHolder constructor allocates the backing byte array:

```java
public BufferHolder(UnsafeRow row, int initialSize) {
  int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
  if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
    throw new UnsupportedOperationException(
      "Cannot create BufferHolder for input UnsafeRow because there are " +
      "too many fields (number of fields: " + row.numFields() + ")");
  }
  this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); // fixed-length part: null bitset + one 8-byte slot per field
  this.buffer = new byte[fixedSize + initialSize];          // this array becomes UnsafeRow.baseObject
  this.row = row;
  this.row.pointTo(buffer, buffer.length);
}
```
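GenerateUnsafeProjection#createCode sets initialSize to (number of variable-length fields) * 32. For Person the initial buffer size is therefore:
initial size = fixedSize + initialSize
= (bitSetWidthInBytes + 8 * total number of fields) + (number of variable-length fields * 32)
= 8 + 8 * 3 + 1 * 32
= 64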
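The same sizing arithmetic can be expressed as a tiny Java sketch. InitialSizeSketch and its methods are hypothetical. The reservation of 32 bytes per variable-length field is taken from the text above, and the sketch does not reproduce BufferHolder's too-many-fields check.

```java
// Hypothetical sketch of the initial buffer sizing described above
// (fixedSize from BufferHolder, initialSize = variable-length fields * 32).
final class InitialSizeSketch {
    static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
    }

    // Fixed part: null bitset plus one 8-byte slot per field.
    static int fixedSize(int numFields) {
        return bitSetWidthInBytes(numFields) + 8 * numFields;
    }

    static int initialBufferLength(int numFields, int numVarLenFields) {
        int initialSize = numVarLenFields * 32; // 32 bytes reserved per variable-length field
        return fixedSize(numFields) + initialSize;
    }

    public static void main(String[] args) {
        // Person(id: Long, id2: Long, id3: String): 3 fields, 1 of them variable-length.
        System.out.println(fixedSize(3));              // 32
        System.out.println(initialBufferLength(3, 1)); // 64
    }
}
```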
After reset(), the holder's cursor points at baseOffset + fixedSize, the start of the variable length portion. The fields are then written through SpecificUnsafeProjection#writeFields_0_0 / writeFields_0_1 -> UnsafeRowWriter#write:
- For a mutable-type field such as the 1st field id, the writer first computes the absolute offset offset = baseOffset + bitSetWidthInBytes + 0 * 8L. It then writes the value directly at that position, i.e. into the 1st 8-byte slot of the values region.
- For an immutable-type field such as the 3rd field id3, the writing process is as follows:
```java
public void write(int ordinal, UTF8String input) {
  final int numBytes = input.numBytes(); // byte length of id3: 20 ASCII letters = 20 bytes
  // round up to a multiple of 8 (a whole word): 20 -> 24, so 24 bytes are reserved for id3
  final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
  holder.grow(roundedSize); // grow the buffer dynamically; initialSize (32) already covers 24 bytes, so no growth this time
  zeroOutPaddingBytes(numBytes); // zero the padding bytes that follow the string data
  // id3 is the first variable-length field, so cursor points at the start of the variable length portion: 48 (= baseOffset 16 + fixedSize 32)
  input.writeToMemory(holder.buffer, holder.cursor);
  setOffsetAndSize(ordinal, numBytes); // relative offset = cursor - baseOffset = 32, size = numBytes = 20
  holder.cursor += roundedSize; // advance the cursor by 24 bytes, to where the next variable-length field will be written
}
```
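Combining the layout with the two write paths, the following Java sketch writes Person(2, 7, "abcdefghijklmnopqrst") into a plain byte[]. MiniRowWriter is a hypothetical class, not Spark's UnsafeRowWriter. It rests on three assumptions. First, ByteBuffer index 0 plays the role of baseOffset, so every position is a relative offset. Second, byte order is fixed to little-endian, whereas Unsafe uses the platform's native order, which is little-endian on x86. Third, buffer growth is left out. The sketch should reproduce the numbers from the analysis: a 64-byte buffer, 20 string bytes rounded up to 24, a relative offset of 32, and 56 bytes actually used.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical mini row writer (not Spark's UnsafeRowWriter). Index 0 of the
// ByteBuffer stands for baseOffset, so all positions are relative offsets.
final class MiniRowWriter {
    final int bitSetWidthInBytes;
    final byte[] buffer; // plays the role of baseObject
    final ByteBuffer buf;
    int cursor;          // where the next variable-length value will be written

    MiniRowWriter(int numFields, int numVarLenFields) {
        bitSetWidthInBytes = ((numFields + 63) / 64) * 8;
        int fixedSize = bitSetWidthInBytes + 8 * numFields;
        buffer = new byte[fixedSize + numVarLenFields * 32];
        buf = ByteBuffer.wrap(buffer).order(ByteOrder.LITTLE_ENDIAN); // assumed byte order
        cursor = fixedSize; // the variable-length portion starts right after the fixed part
    }

    // Round up to a multiple of 8 bytes, e.g. 20 -> 24.
    static int roundToWord(int numBytes) {
        int remainder = numBytes & 0x07;
        return remainder == 0 ? numBytes : numBytes + (8 - remainder);
    }

    // Fixed-length field: write the value straight into its 8-byte slot.
    void writeLong(int ordinal, long value) {
        buf.putLong(bitSetWidthInBytes + ordinal * 8, value);
    }

    // Variable-length field: zero the rounded area, copy the bytes at the cursor,
    // store (relative offset << 32 | size) in the slot, then advance the cursor.
    void writeString(int ordinal, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        int roundedSize = roundToWord(bytes.length);
        if (cursor + roundedSize > buffer.length) {
            throw new IllegalStateException("growing the buffer is not implemented in this sketch");
        }
        Arrays.fill(buffer, cursor, cursor + roundedSize, (byte) 0);
        System.arraycopy(bytes, 0, buffer, cursor, bytes.length);
        buf.putLong(bitSetWidthInBytes + ordinal * 8, ((long) cursor << 32) | bytes.length);
        cursor += roundedSize;
    }

    // Bytes actually used, i.e. what the row would report as sizeInBytes.
    int totalSize() {
        return cursor;
    }

    public static void main(String[] args) {
        MiniRowWriter w = new MiniRowWriter(3, 1);
        w.writeLong(0, 2L);
        w.writeLong(1, 7L);
        w.writeString(2, "abcdefghijklmnopqrst");
        long slot = w.buf.getLong(w.bitSetWidthInBytes + 2 * 8);
        System.out.println("buffer=" + w.buffer.length + " size=" + w.totalSize()
            + " offset=" + (slot >>> 32) + " len=" + (int) slot);
        // buffer=64 size=56 offset=32 len=20
    }
}
```

The gap between the 64-byte buffer and the 56 bytes in use is the unused capacity mentioned for sizeInBytes.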

5. Serialization
UnsafeRow implements both Externalizable and KryoSerializable. Its Externalizable methods are:

```java
@Override
public void writeExternal(ObjectOutput out) throws IOException {
  byte[] bytes = getBytes();   // baseObject itself if it is an exact-size byte[], otherwise a copy of the used bytes
  out.writeInt(bytes.length);  // sizeInBytes
  out.writeInt(this.numFields);
  out.write(bytes);            // raw row bytes, no per-field encoding
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
  this.baseOffset = BYTE_ARRAY_OFFSET;
  this.sizeInBytes = in.readInt();
  this.numFields = in.readInt();
  this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
  this.baseObject = new byte[sizeInBytes];
  in.readFully((byte[]) baseObject); // read the raw bytes straight back into baseObject
}
```
- During serialization, the UnsafeRow object itself does not need to be serialized into a binary stream. Its byte array (baseObject) is written to the stream directly.
- During deserialization, the byte array is likewise read directly from the input stream into the UnsafeRow object.
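The byte framing that writeExternal/readExternal produce can be reproduced with plain java.io streams. This is a hypothetical sketch, not Spark's code. ObjectOutput/ObjectInput are swapped for DataOutputStream/DataInputStream; these implement DataOutput/DataInput, which ObjectOutput/ObjectInput extend, so the same writeInt/readInt/write/readFully calls are available. Only the fields written by the two methods are shown, not any framing that an ObjectOutputStream adds around them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch (not Spark's code) of the framing used by
// writeExternal/readExternal: [int sizeInBytes][int numFields][raw row bytes].
final class RowWireFormatSketch {
    static final class Decoded {
        final int numFields;
        final byte[] bytes; // would become the new row's baseObject

        Decoded(int numFields, byte[] bytes) {
            this.numFields = numFields;
            this.bytes = bytes;
        }
    }

    static byte[] write(byte[] rowBytes, int numFields) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeInt(rowBytes.length); // sizeInBytes
            out.writeInt(numFields);
            out.write(rowBytes);           // row bytes copied as-is
            out.flush();
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Decoded read(byte[] wire) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire));
            int sizeInBytes = in.readInt();
            int numFields = in.readInt();
            byte[] bytes = new byte[sizeInBytes];
            in.readFully(bytes);
            return new Decoded(numFields, bytes);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] row = new byte[56]; // arbitrary 56-byte array standing in for a row
        row[8] = 2;
        byte[] wire = write(row, 3);
        Decoded d = read(wire);
        System.out.println(wire.length + " " + d.numFields + " " + d.bytes[8]); // 64 3 2
    }
}
```

The payload is just 8 bytes of header plus the row's bytes. No per-field encoding or object graph is written, which is what the summary below relies on.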
6. Summary
- Data is stored in byte arrays, which reduces the number of Java objects and therefore the extra memory overhead of object headers and references.
- Fewer Java objects also means lower GC cost.
- When shuffle sends data over the network, rows need no per-field serialization or deserialization because their bytes are written as-is. The transferred data is also much smaller than with Java object serialization.