call 的调用流程分析 一个lua vm 中 任意时刻 只能运行一个 coroutine 导入重入的原因是,call 操作中 让出执行权,另一个coroutine 修改了某个值,则唤醒call的coroutine时,再来操作那个值,可能已经改变了 coroutine 理解: --------------------------------------------------- function foo (a) print("foo", a) return coroutine.yield(2*a) end co = coroutine.create(function (a,b) print("co-body", a, b) local r = foo(a+1) print("co-body", r) local r, s = coroutine.yield(a+b, a-b) print("co-body", r, s) return b, "end" end) print("main", coroutine.resume(co, 1, 10)) print("main", coroutine.resume(co, "r")) print("main", coroutine.resume(co, "x", "y")) print("main", coroutine.resume(co, "x", "y")) 当你运行它,将产生下列输出: co-body 1 10 foo 2 main true 4 co-body r main true 11 -9 co-body x y main true 10 end main false cannot resume dead coroutine coroutine.resume 将会返回 yield的参数 继续执行coroutine.resume,coroutine.yield将返回coroutine.resume除第一个参数的其他参数 coroutine.resume 开始或继续协程 co 的运行。 当你第一次延续一个协程,它会从主体函数处开始运行。 val1, ... 这些值会以参数形式传入主体函数。 如果该协程被让出,resume 会重新启动它; val1, ... 这些参数会作为让出点的返回值。 如果协程运行起来没有错误, resume 返回 true 加上传给 yield 的所有值 (当协程让出), 或是主体函数的所有返回值(当协程中止)。 如果有任何错误发生, resume 返回 false 加错误消息。 --------------------------------------------------- A call B skynet.call(addr, typename, ...) [A] 1. 根据 typename 找到 消息的 pack 和 unpack 类型 2. 将消息发送出去 将消息 pack 压入 对方的消息队列 3. 将当前coroutine 让出执行权, 考虑 skynet.call 所在coroutine 在dispatch 消息中 494: suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...))) suspend(co, true, "CALL", session) suspend(co, result, command, param, size) result = true, command = "CALL", param = session 4. 初始化 session_id_coroutine[session] = co [B] 1. 通过dispatch_message,处理完相关请求 2. 若中间出现错误,将错误信息转发至A, 正常情况下通过skynet.ret()返回 考虑处理消息所在coroutine 在dispatch 消息中 (记录 co->session, so->source) 494: suspend(co, coroutine.resume(co, session,source, p.unpack(msg,sz, ...))) suspend(co, true, "RETURN", msg, sz) suspend(co, result, command, param, size) result = true, command = "RETURN", param = msg, size = sz c.send(co_address, skynet.PTYPE_RESPONSE, co_session, param, size) 将消息发送回去 考虑在B coroutine中 发生错误 在 lua-skynet.c _cb函数中,对消息分发函数(lua function)作了pcall处理,若发生错误,将错误内容输出到logger服务中 在 skynet.lua 中 raw_dispatch_message 函数中 suspend(co, coroutine.resume(co, session, source, p.unpack(msg,sz, ...))) 若 coroutine.resume 失败 将返回 false, 以及错误信息 在 suspend 函数中 if not result then 判断下 将错误信息发送给A(这里有更多信息) 并掉用 error(debug.traceback(co,tostring(command))) 将错误堆栈 让 c层 捕获到 输出到 logger服务中 [A] 1. 从 session_id_coroutine 中 根据session找到coroutine, 并置空session_id_coroutine 2. 在raw_dispatch_message 中 -- skynet.PTYPE_RESPONSE = 1, read skynet.h if prototype == 1 then 处理suspend(co, coroutine.resume(co, true, msg, sz)) 这时候在yield_call中 local succ, msg, sz = coroutine_yield("CALL", session) 返回coroutine.resume 除第一个参数的其他参数 local succ, msg, sz = true, msg, sz ==================================================================== send 操作就比较简单,A端发送出去就不管了,B端发生错误也不会理睬, 它在 B 端 dispatch_message 阶段 suspend 中 走的 command == nil 重要的数据结构 在被调用的对象中,记录调用者信息 session_coroutine_id[co] = session session_coroutine_address[co] = sourceAddr 因为 coroutine 是需要被复用的,所以coroutine退出时候,会清理这两个结构 调用者 watching_session[targetAddr] = session 在挂起当前协程的时候,记录 地址和session 1. 出错信息 2. 当前vm退出,可告知被call对方,出错信息 ==================================================================== queue 的实现 当运行该coroutine的时候,调用skynet.wake,生成session,coroutine_yield("SLEEP", session) 在 suspend 中,设置 session_id_coroutine[session] = co sleep_session[co] = session dispatch_wakeup 中 coroutine.resume(co, false, "BREAK")) 继续运行 sleep_session[co] = nil session_id_coroutine[session] = nil function () option { yield "sleep" } # A call B, 则 A 是 yield "call"; B 是 yield "return" operation { option yield "call" option yield "return" } yield "exit" end ==================================================================== redirect 实现 skynet.redirect = function(dest, source, typename, ...) return c.redirect(dest, source, proto[typename].id, ...) end 使用范例: skynet.redirect(agent, c.client or 0, "client", 0, msg, sz) 1. dest 2. source 3. typeId 4. session 5,6 skynet.pack(...) ====================================================================