黑马程序员技术交流社区

标题: 【上海校区】Kafka2.0服务端启动源码 [打印本页]

作者: 梦缠绕的时候    时间: 2019-7-18 10:20
标题: 【上海校区】Kafka2.0服务端启动源码
Kafka2.0服务端启动源码
  Kafka 服务端通过Kafka.scala的主函数main方法启动。KafkaServerStartable类提供读取配置文件、启动/停止服务的方法。而启动/停止服务最终调用的是KafkaServer的startup/shutdown方法。
启动流程请求处理流程
详细源码分析Acceptor 线程def run() {  serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT) // 注册接收事件  startupComplete() // 通知 Acceptor 线程  var currentProcessor = 0  while (isRunning) {    val ready = nioSelector.select(500) // 轮询事件    if (ready > 0) {      val keys = nioSelector.selectedKeys()      val iter = keys.iterator()      while (iter.hasNext && isRunning) {        val key = iter.next        iter.remove()        if (key.isAcceptable) { // 有可接受事件          val processor = synchronized {            currentProcessor = currentProcessor % processors.size            processors(currentProcessor) // 缓存 Processor           }          accept(key, processor) // 将 SocketChannel 缓存到队列        }      }    }  }}Processor 线程override def run() {  startupComplete() // CountDownLatch.countDown 唤醒 Acceptor 线程。  while (isRunning) {    configureNewConnections() // 从缓存队列取出 SocketChannel,绑定到 KafkaChannel    processNewResponses() // 处理返回客户端的响应    poll() // Kafka.Selector 轮询读取/写入事件    processCompletedReceives() // 处理客户端的请求,放到阻塞队列    processCompletedSends() // 处理返回客户端响应后的回调    processDisconnected() // 断开连接后的处理  }}KafkaRequestHandler 线程阻塞队列def run() {  while (!stopped) {    val startSelectTime = time.nanoseconds    // 从阻塞队列拉取请求    val req = requestChannel.receiveRequest(300)     req match {      case request: RequestChannel.Request =>        try {          apis.handle(request) // 调用`KafkaApis.handle`方法,进行集中处理请求。        }    }  }}KSelector
  参考客户端源码分析。











作者: 梦缠绕的时候    时间: 2019-7-18 10:20
有任何问题欢迎在评论区留言
作者: 梦缠绕的时候    时间: 2019-7-18 10:20
或者添加学姐微信
DKA-2018




欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2