call 的调用流程分析.txt 5.37 KB
call 的调用流程分析

一个lua vm 中 任意时刻 只能运行一个 coroutine
导入重入的原因是,call 操作中 让出执行权,另一个coroutine 修改了某个值,则唤醒call的coroutine时,再来操作那个值,可能已经改变了

coroutine 理解:
---------------------------------------------------
function foo (a)
	print("foo", a)
	return coroutine.yield(2*a)
end

co = coroutine.create(function (a,b)
	print("co-body", a, b)
	local r = foo(a+1)
	print("co-body", r)
	local r, s = coroutine.yield(a+b, a-b)
	print("co-body", r, s)
	return b, "end"
end)

print("main", coroutine.resume(co, 1, 10))
print("main", coroutine.resume(co, "r"))
print("main", coroutine.resume(co, "x", "y"))
print("main", coroutine.resume(co, "x", "y"))
当你运行它,将产生下列输出:

co-body 1       10
foo     2
main    true    4
co-body r
main    true    11      -9
co-body x       y
main    true    10      end
main    false   cannot resume dead coroutine

coroutine.resume 将会返回 yield的参数
继续执行coroutine.resume,coroutine.yield将返回coroutine.resume除第一个参数的其他参数

coroutine.resume
开始或继续协程 co 的运行。 当你第一次延续一个协程,它会从主体函数处开始运行。
 val1, ... 这些值会以参数形式传入主体函数。 如果该协程被让出,resume 会重新启动它;
  val1, ... 这些参数会作为让出点的返回值。

如果协程运行起来没有错误, resume 返回 true 加上传给 yield 的所有值 (当协程让出),
 或是主体函数的所有返回值(当协程中止)。 如果有任何错误发生, resume 返回 false 加错误消息。
---------------------------------------------------
A call B

skynet.call(addr, typename, ...)

[A]
1. 根据 typename 找到 消息的 pack 和 unpack 类型
2. 将消息发送出去 将消息 pack 压入 对方的消息队列
3. 将当前coroutine 让出执行权,

考虑 skynet.call 所在coroutine
在dispatch 消息中
494: suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))
suspend(co, true, "CALL", session)
suspend(co, result, command, param, size)
result = true, command = "CALL", param = session

4. 初始化 session_id_coroutine[session] = co

[B]
1. 通过dispatch_message,处理完相关请求
2. 若中间出现错误,将错误信息转发至A, 正常情况下通过skynet.ret()返回

考虑处理消息所在coroutine
在dispatch 消息中 (记录 co->session, so->source)
494: suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...)))
suspend(co, true, "RETURN", msg, sz)
suspend(co, result, command, param, size)
result = true, command = "RETURN", param = msg, size = sz
c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size)
将消息发送回去

考虑在B coroutine中 发生错误
在 lua-skynet.c _cb函数中,对消息分发函数(lua function)作了pcall处理,若发生错误,将错误内容输出到logger服务中
在 skynet.lua 中 raw_dispatch_message 函数中
suspend(co, coroutine.resume(co, session, source, p.unpack(msg,sz, ...)))

若 coroutine.resume 失败
将返回 false, 以及错误信息
在 suspend 函数中  if not result then  判断下 将错误信息发送给A(这里有更多信息) 并掉用
error(debug.traceback(co,tostring(command)))
将错误堆栈 让 c层 捕获到 输出到 logger服务中

[A]
1. 从 session_id_coroutine 中 根据session找到coroutine, 并置空session_id_coroutine
2. 在raw_dispatch_message 中
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then
处理suspend(co, coroutine.resume(co, true, msg, sz))
这时候在yield_call中
local succ, msg, sz = coroutine_yield("CALL", session)
返回coroutine.resume 除第一个参数的其他参数
local succ, msg, sz = true, msg, sz

====================================================================
send 操作就比较简单,A端发送出去就不管了,B端发生错误也不会理睬,
它在 B 端 dispatch_message 阶段 suspend 中 走的 command == nil


重要的数据结构
在被调用的对象中,记录调用者信息
session_coroutine_id[co] = session
session_coroutine_address[co] = sourceAddr
因为 coroutine 是需要被复用的,所以coroutine退出时候,会清理这两个结构

调用者
watching_session[targetAddr] = session
在挂起当前协程的时候,记录 地址和session
1. 出错信息
2. 当前vm退出,可告知被call对方,出错信息

====================================================================

queue 的实现
当运行该coroutine的时候,调用skynet.wake,生成session,coroutine_yield("SLEEP", session)

在 suspend 中,设置
session_id_coroutine[session] = co
sleep_session[co] = session

dispatch_wakeup 中
coroutine.resume(co, false, "BREAK"))
继续运行
sleep_session[co] = nil
session_id_coroutine[session] = nil

function ()
	option {
		yield "sleep"
	}

	# A call B, 则 A 是 yield "call"; B 是 yield "return"
	operation {
		option yield "call"
		option yield "return"
	}

	yield "exit"
end

====================================================================

redirect 实现
skynet.redirect = function(dest, source, typename, ...)
	return c.redirect(dest, source, proto[typename].id, ...)
end

使用范例:
	skynet.redirect(agent, c.client or 0, "client", 0, msg, sz)

1. dest
2. source
3. typeId
4. session
5,6 skynet.pack(...)

====================================================================