Spark data format: UnsafeRow
2022-07-26 17:48:00 【InfoQ】
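1. Brief introduction
UnsafeRow (org.apache.spark.sql.catalyst.expressions.UnsafeRow) is the binary row format Spark SQL uses internally. It is an InternalRow implementation and is distinct from the external row classes org.apache.spark.sql.Row / GenericRow / GenericRowWithSchema.
2. Class properties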
- private Object baseObject; // Holds the data of the whole row. Usually it is a byte array (byte[]); when the row points into off-heap memory, it is null and baseOffset is an absolute memory address.
- private long baseOffset; // Even when baseObject is an array, it is still a Java object, so its data does not start at byte 0. baseOffset records the size of the object header; for a byte array on a 64-bit JVM this is usually 16 (Platform.BYTE_ARRAY_OFFSET).
- private int numFields; // The number of fields in the row.
- private int sizeInBytes; // The number of bytes the row actually occupies: the capacity of baseObject minus the header (baseOffset) minus unused capacity. When the row has variable-length fields such as strings, the allocated buffer may be larger than sizeInBytes.
- private int bitSetWidthInBytes; // The number of bytes used to record which fields are null. Each field takes 1 bit, but the bitset is allocated in whole 8-byte words, so up to 64 fields need 8 bytes, 65-128 fields need 16 bytes, and so on.
- public static final Set<DataType> mutableFieldTypes; // Field types whose values can be updated in place in an UnsafeRow. Values of these types are stored at a fixed position with a fixed length inside baseObject, so they can be overwritten. The mutable types are: NullType, BooleanType, ByteType, ShortType, IntegerType, LongType, FloatType, DoubleType, DateType, TimestampType, DecimalType.
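To make the bitSetWidthInBytes rule concrete, here is a minimal Java sketch of a word-aligned null bitset. NullBitSetSketch and its methods are hypothetical (not Spark's API); the width formula is the same one UnsafeRow uses, and the bit addressing (word ordinal / 64, bit ordinal % 64) follows the same idea as Spark's bitset helpers. Little-endian byte order is an assumption, matching x86.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical helper that mimics how an UnsafeRow-style null bitset is sized
// and addressed: whole 64-bit words, one bit per field.
final class NullBitSetSketch {
    private NullBitSetSketch() {}

    // Round the field count up to a multiple of 64 bits, then convert words to bytes.
    static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
    }

    // Allocate a zeroed buffer holding only the bitset (assumed little-endian).
    static ByteBuffer newBitSet(int numFields) {
        return ByteBuffer.allocate(bitSetWidthInBytes(numFields)).order(ByteOrder.LITTLE_ENDIAN);
    }

    // Field `ordinal` lives in word ordinal / 64, at bit ordinal % 64 of that word.
    static void setNullAt(ByteBuffer row, int ordinal) {
        int wordOffset = (ordinal >> 6) * 8;
        long mask = 1L << (ordinal & 63);
        row.putLong(wordOffset, row.getLong(wordOffset) | mask);
    }

    static boolean isNullAt(ByteBuffer row, int ordinal) {
        int wordOffset = (ordinal >> 6) * 8;
        long mask = 1L << (ordinal & 63);
        return (row.getLong(wordOffset) & mask) != 0;
    }
}
```

Because the bitset always occupies whole words, the values region that follows it stays 8-byte aligned.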
3. Memory layout

- null bit set: marks which fields are null, one bit per field. Its total size is bitSetWidthInBytes = ((numFields + 63) / 64) * 8.
- values: each field occupies one 8-byte slot in this region, reserved when the row is initialized. For a field of a mutable (fixed-length) type (mutableFieldTypes), the value itself is stored in the slot. For a variable-length type, the slot stores only the field's offset (relative to baseOffset, i.e. to the start of the row, not to the base address of baseObject) and its size, packed into one long (high 32 bits = offset, low 32 bits = size); the actual value is stored in the variable length portion.
- variable length portion: the actual data of all variable-length fields is stored back to back here. Each value is padded to a multiple of 8 bytes, and there may be unused space left at the end.
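A small Java sketch of how the 8-byte slot of a field is located and how a variable-length field's offset and size share one long. OffsetAndSizeSketch is a hypothetical helper written for illustration; offsets are relative to the start of the row, as described above.

```java
// Hypothetical helper illustrating the "offset in the high 32 bits,
// size in the low 32 bits" encoding used for variable-length fields.
final class OffsetAndSizeSketch {
    private OffsetAndSizeSketch() {}

    // Pack a row-relative offset and a byte length into one long.
    static long pack(int relativeOffset, int size) {
        return ((long) relativeOffset << 32) | (size & 0xFFFFFFFFL);
    }

    static int offset(long offsetAndSize) {
        return (int) (offsetAndSize >>> 32);
    }

    static int size(long offsetAndSize) {
        return (int) offsetAndSize;
    }

    // Row-relative position of field `ordinal`'s slot:
    // the null bitset comes first, then one 8-byte slot per field.
    static int slotOffset(int bitSetWidthInBytes, int ordinal) {
        return bitSetWidthInBytes + ordinal * 8;
    }
}
```

For the Person example in the next section, id3's slot would hold pack(32, 20).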
4. How an UnsafeRow is created
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Person(id: Long, id2: Long, id3: String)
val e = Encoders.product[Person]
val personExprEncoder = e.asInstanceOf[ExpressionEncoder[Person]]
val person = Person(2, 7, "abcdefghijklmnopqrst")
// toRow is the Spark 2.x API (Spark 3.x replaced it with createSerializer())
val row = personExprEncoder.toRow(person) // an UnsafeRow whose baseObject is a byte[64]; why 64 is analyzed below
println(row.getLong(0))
println(row.getString(2))
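toRow puts the object into a one-field row and passes it to an UnsafeProjection generated by GenerateUnsafeProjection; the projection's apply method returns the UnsafeRow:
abstract class UnsafeProjection extends Projection {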
override def apply(row: InternalRow): UnsafeRow
}
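For Person, GenerateUnsafeProjection#create generates code like the following (abridged: some generated helpers and null-flag declarations, such as StaticInvoke_0 and isNull_0, are omitted):
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {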
private Object[] references;
private boolean resultIsNull_0;
private boolean globalIsNull_0;
private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] mutableStateArray_2 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[1];
private java.lang.String[] mutableStateArray_0 = new java.lang.String[1];
private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[1];
private UnsafeRow[] mutableStateArray_1 = new UnsafeRow[1];
public SpecificUnsafeProjection(Object[] references) {
this.references = references;
mutableStateArray_1[0] = new UnsafeRow(3); // create the UnsafeRow instance with 3 fields: id, id2, id3
mutableStateArray_2[0] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mutableStateArray_1[0], 32);
mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mutableStateArray_2[0], 3);
}
public UnsafeRow apply(InternalRow i) {
mutableStateArray_2[0].reset();
mutableStateArray_3[0].zeroOutNullBytes();
writeFields_0_0(i);
writeFields_0_1(i);
mutableStateArray_1[0].setTotalSize(mutableStateArray_2[0].totalSize());
return mutableStateArray_1[0];
}
// Write field id3
private void writeFields_0_1(InternalRow i) {
UTF8String value_13 = StaticInvoke_0(i);
if (globalIsNull_0) {
mutableStateArray_3[0].setNullAt(2);
} else {
mutableStateArray_3[0].write(2, value_13);
}
}
// Write fields id and id2
private void writeFields_0_0(InternalRow i) {
boolean isNull_3 = i.isNullAt(0);
com.test.scala.EncoderScala$Person value_3 = isNull_3 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));
long value_0 = value_3.id();
if (isNull_0) {
mutableStateArray_3[0].setNullAt(0);
} else {
mutableStateArray_3[0].write(0, value_0);
}
com.test.scala.EncoderScala$Person value_7 = isNull_7 ? null : ((com.test.scala.EncoderScala$Person) i.get(0, null));
long value_4 = value_7.id2();
if (isNull_4) {
mutableStateArray_3[0].setNullAt(1);
} else {
mutableStateArray_3[0].write(1, value_4);
}
}
}
The BufferHolder constructor allocates the backing byte array:
public BufferHolder(UnsafeRow row, int initialSize) {
int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
throw new UnsupportedOperationException(
"Cannot create BufferHolder for input UnsafeRow because there are " +
"too many fields (number of fields: " + row.numFields() + ")");
}
this.fixedSize = bitsetWidthInBytes + 8 * row.numFields(); // fixed-length part: null bitset + one 8-byte slot per field
this.buffer = new byte[fixedSize + initialSize]; // this array becomes UnsafeRow.baseObject
this.row = row;
this.row.pointTo(buffer, buffer.length);
}
GenerateUnsafeProjection#createCode sets initialSize to 32 bytes per variable-length field, so the initial buffer size is:
initial size = fixedSize + initialSize
= (bitSetWidthInBytes + 8 * numFields) + (numVarLenFields * 32)
= 8 + 8 * 3 + 1 * 32
= 64
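The same arithmetic as a tiny Java helper, assuming (as above) 32 bytes reserved per variable-length field. InitialSizeSketch is a hypothetical name, not a Spark class.

```java
// Hypothetical helper reproducing the initial buffer-size calculation.
final class InitialSizeSketch {
    private InitialSizeSketch() {}

    // Null bitset width: whole 8-byte words, one bit per field.
    static int bitSetWidthInBytes(int numFields) {
        return ((numFields + 63) / 64) * 8;
    }

    // Fixed-length part: null bitset + one 8-byte slot per field.
    static int fixedSize(int numFields) {
        return bitSetWidthInBytes(numFields) + 8 * numFields;
    }

    // Fixed part plus 32 bytes reserved for each variable-length field.
    static int initialBufferSize(int numFields, int numVarLenFields) {
        return fixedSize(numFields) + 32 * numVarLenFields;
    }
}
```

For Person (two longs and one string), initialBufferSize(3, 1) gives 8 + 24 + 32 = 64.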
Before any field is written, BufferHolder.reset() points cursor at baseOffset + fixedSize, the start of the variable length portion. SpecificUnsafeProjection#writeFields_0_0 / writeFields_0_1 then call UnsafeRowWriter#write for each field:
- For a mutable (fixed-length) field such as the 1st field id, the writer first computes the absolute offset offset = baseOffset + bitSetWidthInBytes + 0 * 8L and writes the value directly at that position, i.e. into the 1st 8-byte slot of the values region.
- For a variable-length field such as the 3rd field id3, the write proceeds as follows:
public void write(int ordinal, UTF8String input) {
final int numBytes = input.numBytes(); // byte length of id3: 20 ASCII letters take 20 bytes
final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes); // round up to a multiple of 8: 20 becomes 24, so 24 bytes are reserved for id3's value
holder.grow(roundedSize); // grow the buffer if needed; initialSize is 32 >= 24, so no growth is needed here
zeroOutPaddingBytes(numBytes); // zero the padding bytes so the row's bytes are deterministic (the buffer is reused across rows)
input.writeToMemory(holder.buffer, holder.cursor); // id3 is the first variable-length field, so cursor points at the start of the variable length portion: absolute offset 48 (baseOffset 16 + fixedSize 32)
setOffsetAndSize(ordinal, numBytes); // store id3's relative offset = cursor - baseOffset = 32 and size = numBytes = 20
holder.cursor += roundedSize; // advance cursor by 24 bytes, to where the next variable-length field will start
}
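To tie these steps together, here is a self-contained Java sketch that lays out Person(2, 7, "abcdefghijklmnopqrst") the same way. RowWriteSketch is a hypothetical, simplified stand-in for BufferHolder + UnsafeRowWriter: it uses a little-endian ByteBuffer, measures offsets from index 0 (so they correspond to "absolute offset minus baseOffset"), and does not implement grow(). With the 20 string bytes rounded up to 24, the finished row occupies 56 of the 64 allocated bytes.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified row writer; not Spark's implementation.
final class RowWriteSketch {
    private final int bitSetWidth;
    private final ByteBuffer buf;
    private int cursor; // next free byte in the variable-length portion

    RowWriteSketch(int numFields, int numVarLenFields) {
        this.bitSetWidth = ((numFields + 63) / 64) * 8;
        int fixedSize = bitSetWidth + 8 * numFields;
        this.buf = ByteBuffer.allocate(fixedSize + 32 * numVarLenFields)
                .order(ByteOrder.LITTLE_ENDIAN);
        this.cursor = fixedSize; // variable-length data starts right after the fixed part
    }

    // Round a byte count up to the next multiple of 8.
    static int roundToWord(int numBytes) {
        int remainder = numBytes & 7;
        return remainder == 0 ? numBytes : numBytes + (8 - remainder);
    }

    private int slot(int ordinal) {
        return bitSetWidth + ordinal * 8;
    }

    // Fixed-length field: the value goes straight into its 8-byte slot.
    void writeLong(int ordinal, long value) {
        buf.putLong(slot(ordinal), value);
    }

    // Variable-length field: append the bytes, then store offset|size in the slot.
    void writeString(int ordinal, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        int rounded = roundToWord(bytes.length);
        if (cursor + rounded > buf.capacity()) {
            throw new IllegalStateException("this sketch does not implement grow()");
        }
        // Padding bytes are already zero because ByteBuffer.allocate zero-fills.
        for (int i = 0; i < bytes.length; i++) {
            buf.put(cursor + i, bytes[i]);
        }
        buf.putLong(slot(ordinal), ((long) cursor << 32) | bytes.length);
        cursor += rounded;
    }

    int sizeInBytes() { return cursor; }
    int capacity() { return buf.capacity(); }
    long getLong(int ordinal) { return buf.getLong(slot(ordinal)); }

    String getString(int ordinal) {
        long offsetAndSize = buf.getLong(slot(ordinal));
        int offset = (int) (offsetAndSize >>> 32);
        int size = (int) offsetAndSize;
        byte[] out = new byte[size];
        for (int i = 0; i < size; i++) {
            out[i] = buf.get(offset + i);
        }
        return new String(out, StandardCharsets.UTF_8);
    }
}
```

Usage: new RowWriteSketch(3, 1), then writeLong(0, 2), writeLong(1, 7) and writeString(2, "abcdefghijklmnopqrst").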

5. Serialization
UnsafeRow implements Externalizable and KryoSerializable; the Java serialization path looks like this:
@Override
public void writeExternal(ObjectOutput out) throws IOException {
byte[] bytes = getBytes();
out.writeInt(bytes.length);
out.writeInt(this.numFields);
out.write(bytes);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.baseOffset = BYTE_ARRAY_OFFSET;
this.sizeInBytes = in.readInt();
this.numFields = in.readInt();
this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
this.baseObject = new byte[sizeInBytes];
in.readFully((byte[]) baseObject);
}
- During serialization, the UnsafeRow object does not need to be serialized field by field: only the length, the field count and the row's bytes (getBytes(), i.e. the contents of baseObject) are written to the stream.
- During deserialization, the byte array is likewise read directly from the input stream into the UnsafeRow's new baseObject.
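A minimal Java sketch of the same wire layout, [int size][int numFields][row bytes]. It uses DataOutputStream/DataInputStream instead of Spark's ObjectOutput/ObjectInput, and RowWireFormatSketch is a hypothetical class; it only illustrates that the row bytes travel as-is.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of the writeExternal/readExternal layout shown above.
final class RowWireFormatSketch {
    final byte[] bytes;
    final int numFields;

    RowWireFormatSketch(byte[] bytes, int numFields) {
        this.bytes = bytes;
        this.numFields = numFields;
    }

    byte[] serialize() throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(bytes.length); // sizeInBytes
        out.writeInt(numFields);
        out.write(bytes);           // row bytes copied as-is, no per-field encoding
        out.flush();
        return bos.toByteArray();
    }

    static RowWireFormatSketch deserialize(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        int sizeInBytes = in.readInt();
        int numFields = in.readInt();
        byte[] bytes = new byte[sizeInBytes];
        in.readFully(bytes);        // read straight into the new backing array
        return new RowWireFormatSketch(bytes, numFields);
    }
}
```

The serialized form is just 8 header bytes plus the row itself.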
6. Summary
- Data is stored in byte arrays rather than as many Java objects, which avoids per-object memory overhead such as object headers and references.
- Fewer Java objects also means lower GC cost.
- When shuffle data is transferred over the network, the row bytes are written as they are, so no separate per-object serialization and deserialization is needed, and the amount of data transferred is also considerably smaller.