前言

OkIo 其实通过分析 OkIo 的源码, 无非是对 IO 操作的一层封装。 Okhttp 中使用到的, 很基础, 使用起来也很方便, 我觉得对于像网络或者其他读取写入速度不匹配的情况下使用 OkIo 还是挺有用呢, 数据是环形链表串联起来的。 如果仅仅读取本地一个文件, 读多少处理多少的话, 不是贪图其封装好了使用方便, 其作用也不大。 下面就对最简单的文件读取或者写文件进行简单分析, 让我们可以更清晰的认识和使用它。

简单的使用

对源码的分析, 必须要有一个入口呢。 对于 OkIo 来说无非读取文件和写文件。 那么好, 先写出最简单的读取和写入, 然后一个一个的分析。

读取文件

1
2
3
4
5
6
7
8
9
val file = File("./xiaoxige")
val source = file.source()
source.use {
val buffer = Buffer()
val readLength = 1024L
while (source.read(buffer, readLength) > 0) {
println(buffer.readByteString().utf8())
}
}

写入文件

1
2
3
4
val sink = file.appendingSink()
val buffer = Buffer()
buffer.writeUtf8("xiaoxige")
sink.write(buffer, buffer.size)

源码分析

通过上面最简单的读取文件和写文件, 可以看到数据都是以 Buffer 这个类作为载体呢, 通过名字也可以看的出来。 那么我们就先看看这个 Buffer。

1
2
3
4
5
6
actual class Buffer : BufferedSource, BufferedSink, Cloneable, ByteChannel {
@JvmField internal actual var head: Segment? = null
actual var size: Long = 0L
internal set
// ......
}

Segment

Buffer 里肯定有数据存的地方呢。 其中 Segment 类型的就是啦。 可以从变量名称 head 可以看出它是一个头部, 应该是一个链式的存储结构, 通过后面我们可以看出它是一个首位相接的单链表。 数据都是存在 Segment 中, 不管是读取还是写入 Segment 就是一个数据载体, 所以我们看看它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
internal class Segment {
@JvmField val data: ByteArray
@JvmField var pos: Int = 0
@JvmField var limit: Int = 0
@JvmField var shared: Boolean = false
@JvmField var owner: Boolean = false
@JvmField var next: Segment? = null
@JvmField var prev: Segment? = null

constructor() {
this.data = ByteArray(SIZE)
this.owner = true
this.shared = false
}

constructor(data: ByteArray, pos: Int, limit: Int, shared: Boolean, owner: Boolean) {
this.data = data
this.pos = pos
this.limit = limit
this.shared = shared
this.owner = owner
}

fun sharedCopy(): Segment {
shared = true
return Segment(data, pos, limit, true, false)
}

fun unsharedCopy() = Segment(data.copyOf(), pos, limit, false, true)

fun pop(): Segment? {
val result = if (next !== this) next else null
prev!!.next = next
next!!.prev = prev
next = null
prev = null
return result
}

fun push(segment: Segment): Segment {
segment.prev = this
segment.next = next
next!!.prev = segment
next = segment
return segment
}

fun split(byteCount: Int): Segment {
require(byteCount > 0 && byteCount <= limit - pos) { "byteCount out of range" }
val prefix: Segment
if (byteCount >= SHARE_MINIMUM) {
prefix = sharedCopy()
} else {
prefix = SegmentPool.take()
data.copyInto(prefix.data, startIndex = pos, endIndex = pos + byteCount)
}

prefix.limit = prefix.pos + byteCount
pos += byteCount
prev!!.push(prefix)
return prefix
}

fun compact() {
check(prev !== this) { "cannot compact" }
if (!prev!!.owner) return // Cannot compact: prev isn't writable.
val byteCount = limit - pos
val availableByteCount = SIZE - prev!!.limit + if (prev!!.shared) 0 else prev!!.pos
if (byteCount > availableByteCount) return // Cannot compact: not enough writable space.
writeTo(prev!!, byteCount)
pop()
SegmentPool.recycle(this)
}

fun writeTo(sink: Segment, byteCount: Int) {
check(sink.owner) { "only owner can write" }
if (sink.limit + byteCount > SIZE) {
if (sink.shared) throw IllegalArgumentException()
if (sink.limit + byteCount - sink.pos > SIZE) throw IllegalArgumentException()
sink.data.copyInto(sink.data, startIndex = sink.pos, endIndex = sink.limit)
sink.limit -= sink.pos
sink.pos = 0
}

data.copyInto(
sink.data, destinationOffset = sink.limit, startIndex = pos,
endIndex = pos + byteCount
)
sink.limit += byteCount
pos += byteCount
}

companion object {
/** The size of all segments in bytes. */
const val SIZE = 8192

/** Segments will be shared when doing so avoids `arraycopy()` of this many bytes. */
const val SHARE_MINIMUM = 1024
}
}

上面的整个类其实代码不多, 本想删除不重要的部分, 但发现都是有必要写出来的。
我们先看下变量, data 这个就是存储元数据的空间, pos 是当前 Segment 的当前数据头部, 可以理解为数据段的起点, limit 可以理解为当前数据的有效结束点。 next 和 prev 是两个指针, 如果有前后数据的话, 就指向前后两个数据的地址, 如果当前就仅仅有当前 Segment 那么 next 和 prev 都指向自己, 还需要注意的是, 该结构是一个环形的, 头尾相连, 所以第一个数据的 head 的 prev 如果不是自己的话, 那就是最后一个 Segment 数据的地址。
我们看下方法(具体逻辑很简单, 这里简单的说下其作用): pop, 其作用就是把当前的节点从当前链中去掉, 切断和重新链接保证链的正确性。 push, 在当前节点后面添加一个新的节点, 当前也是要切断和重新链接保证链的正确性。 split, 分割, 把当前节点分割成两个节点。 compact, 合并两个节点。 writeTo, 把当前的数据复制到指定的节点中。

Source

我们来看看读取文件, 为了方便我把上面简易的读取复制下来。

1
2
3
4
5
6
7
8
9
val file = File("./xiaoxige")
val source = file.source()
source.use {
val buffer = Buffer()
val readLength = 1024L
while (source.read(buffer, readLength) > 0) {
println(buffer.readByteString().utf8())
}
}

我们看看 source 是什么。

1
2
3
4
5
interface Source : Closeable {
fun read(sink: Buffer, byteCount: Long): Long
fun timeout(): Timeout
override fun close()
}

可以看出 source 是一个协议, 我们想读取各种规则的文件可以直接复写规则即可。
我们可以看到 file.source() 返回的是 InputStreamSource。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
fun File.source(): Source = InputStreamSource(inputStream(), Timeout.NONE)

public inline fun File.inputStream(): FileInputStream {
return FileInputStream(this)
}

private open class InputStreamSource(
private val input: InputStream,
private val timeout: Timeout
) : Source {

override fun read(sink: Buffer, byteCount: Long): Long {
if (byteCount == 0L) return 0L
require(byteCount >= 0L) { "byteCount < 0: $byteCount" }
try {
timeout.throwIfReached()
val tail = sink.writableSegment(1)
val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit).toInt()
val bytesRead = input.read(tail.data, tail.limit, maxToCopy)
if (bytesRead == -1) {
if (tail.pos == tail.limit) {
// We allocated a tail segment, but didn't end up needing it. Recycle!
sink.head = tail.pop()
SegmentPool.recycle(tail)
}
return -1
}
tail.limit += bytesRead
sink.size += bytesRead
return bytesRead.toLong()
} catch (e: AssertionError) {
if (e.isAndroidGetsocknameError) throw IOException(e)
throw e
}
}

override fun close() = input.close()

override fun timeout() = timeout

override fun toString() = "source($input)"
}

其中文件的流操作还是代理了 FileInputStream, 然后读取文件时, 运行 read 方法, 不管怎么存储吧, 最后还是交给 文件流 FileInputStream 去读取了文件数据。 当前中间我们读取的存储还是很有必要看看呢。 主要关注下 Segment 的逻辑处理。
我们看下 sink.writableSegment(1) 方法, 最终调用到:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
internal inline fun Buffer.commonWritableSegment(minimumCapacity: Int): Segment {
require(minimumCapacity >= 1 && minimumCapacity <= Segment.SIZE) { "unexpected capacity" }

if (head == null) {
val result = SegmentPool.take() // Acquire a first segment.
head = result
result.prev = result
result.next = result
return result
}

var tail = head!!.prev
if (tail!!.limit + minimumCapacity > Segment.SIZE || !tail.owner) {
tail = tail.push(SegmentPool.take()) // Append a new empty segment to fill up.
}
return tail
}

我们可以通过上面的代码可以看到, 如果 head 为空的话, 没有头结点, 说明还没有数据呢, 直接新建一个节点, 在这里可以看到如果只有 head 节点的话, next 和 prev 都是指向自己呢。 当不为空的话, 那么找到尾节点, 如果尾节点的剩余空间已经不能再装了, 或者不是当前节点的权限(owner), 那么就新增一个节点并链接到尾部, 返回出去一个尾节点, 可以看到返回出去的节点第一肯定是有空间的, 第二如果当前尾节点有足够的空间就会返回出去, 节约了空间。
这里我们有一个问题需要思考, 为什么传入了 byteCount, 为啥每次都是去拿 Segment 传入的 minimumCapacity 都是 1 呢。 这里大家可以好好想想呢, 这样主要是为了可以解决空间, 当然可能循环的次数会增加一点点。 我们每次传入 1, 只要尾节点还存在 1 个空间就会返回当前的尾节点, 而非重新产生一个新的尾节点, 保证了一个空间都不浪费。

Sink

为了方便把上面简单的写入文件复制下来

1
2
3
4
val sink = file.appendingSink()
val buffer = Buffer()
buffer.writeUtf8("xiaoxige")
sink.write(buffer, buffer.size)

我们先看看 Sink 的协议:

1
2
3
4
5
6
7
8
9
10
11
12
actual interface Sink : Closeable, Flushable {
@Throws(IOException::class)
actual fun write(source: Buffer, byteCount: Long)

@Throws(IOException::class)
actual override fun flush()

actual fun timeout(): Timeout

@Throws(IOException::class)
actual override fun close()
}

通过 Source 的分析, 可以猜测和点进去 appendingSink 可以看到, 其仅仅也是代理了 FileOutputStream 而已, 我们进到 write 方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
override fun write(source: Buffer, byteCount: Long) {
checkOffsetAndCount(source.size, 0, byteCount)
var remaining = byteCount
while (remaining > 0) {
timeout.throwIfReached()
val head = source.head!!
val toCopy = minOf(remaining, head.limit - head.pos).toInt()
out.write(head.data, head.pos, toCopy)

head.pos += toCopy
remaining -= toCopy
source.size -= toCopy

if (head.pos == head.limit) {
source.head = head.pop()
SegmentPool.recycle(head)
}
}
}

是不是很简单, 就是从 head 开始一个一个的读取, 如果当前节点已经读取完成了, 就进行回收。

总结

source 是读取操作, sink 是写入操作, 仅仅是对文件流操作的一个封装。 其特点可能需要我们对 Segment 的数据载体进行分析和学习。 source 会把读取到的数据放入到 Segment 环形链表中, 然后 sink 会通过 head 节点开始读取数据进行写入操作。 仅此而已。 可以闭上眼睛去想想, 前面一直在生产并构造和写入链表中, 然后后面一直按顺序进行从链中读取并消费, 反反复复。