`

Glance源码架构探秘(二)

阅读更多

    上一篇文章,介绍了Glance服务的对外启动接口/bin/glance-api,其中最重要的部分就是通过server = eventlet.wsgi.Server()生成了一个http server,并通过server.start()启动了一个WSGI程序。

    WSGI(web server gateway interface)web服务器网关接口。简单来说,WSGI的作用就是将client发给web server的请求转发给实际处理这个请求的程序。

    WSGI在python中官方的参考实现wsgiref,#module-wsgiref。

 from wsgiref.simple_server import make_server 
 # Every WSGI application must have an application object - a callable 
 # object that accepts two arguments. For that purpose, we're going to 
 # use a function (note that you're not limited to a function, you can 
 # use a class for example). The first argument passed to the function 
 # is a dictionary containing CGI-style envrironment variables and the 
 # second variable is the callable object (see PEP 333). 
 def hello_world_app(environ, start_response): 
     status = '200 OK' 
     # HTTP Status 
     headers = [('Content-type', 'text/plain')] 
     # HTTP Headers 
     start_response(status, headers) 
     # The returned object is going to be printed return ["Hello World"] 
     httpd = make_server('', 8000, hello_world_app) 
     print "Serving on port 8000..." 
     # Serve until process is killed ()      

 参考流程大概就是server中定义一个start_response()函数,返回值为一个write()函数,用来返回给client的响应。application函数要实现两个接口参数,environ和start_response(),前者就是服务器server传递过来的request请求,application控制后者将程序的返回值发回给web server。

    下面分析OpenStack中wsgi接口的实现,/glance/common/wsgi.py

class Server(object): 
     """Server class to manage multiple WSGI sockets and applications.""" 
     def __init__(self, threads=1000): 
         self.threads = threads 
         self.children = [] 
         self.running = True def 
         start(self, application, default_port): 
         """ Run a WSGI server with the given application.
          : param application: The application to be run in the WSGI server 
         :param default_port: Port to bind to if none is specified in conf """ 
         def kill_children(*args): 
             """Kills the entire process group.""" 
             sel(_('SIGTERM or SIGINT received')) 
             signal.signal(signal.SIGTERM, signal.SIG_IGN) 
             signal.signal(signal.SIGINT, signal.SIG_IGN) 
             self.running = False 
             os.killpg(0, signal.SIGTERM) 
             def hup(*args): 
                 """ Shuts down the server, but allows running requests to complete """ 
                 sel(_('SIGHUP received')) 
                 signal.signal(signal.SIGHUP, signal.SIG_IGN) 
                 self.running = False 
                 self.application = application 
                 self.sock = get_socket(default_port) 
                 os.umask(027) 
                 # ensure files are created with the correct privileges 
                 self.logger = os_logging.getLogger('eventlet.wsgi.server')
                 if CONF.workers == 0: 
                     # Useful for profiling, test, debug etc. 
                     self.pool = self.create_pool() 
                     self.pool.spawn_n(self._single_run, self.application, self.sock) 
                     return 
                 else:  
                     sel(_("Starting %d workers") % CONF.workers) 
                     signal.signal(signal.SIGTERM, kill_children) 
                     signal.signal(signal.SIGINT, kill_children) 
                     signal.signal(signal.SIGHUP, hup) 
                     while len(self.children) < CONF.workers: 
                         self.run_child() 
                     def create_pool(self): 
                         eventlet.patcher.monkey_patch(all=False, socket=True) 
                         return eventlet.GreenPool(size=self.threads) 
                     def wait_on_children(self): 
                         while self.running: 
                             try: 
                                 pid, status = os.wait() 
                                 if os.WIFEXITED(status) or os.WIFSIGNALED(status): 
                                     sel(_('Removing dead child %s') % pid) 
                                     self.children.remove(pid) 
                                     if os.WIFEXITED(status) and os.WEXITSTATUS(status) != 0: 
                                         self.logger.error(_('Not respawning child %d, cannot ' 'recover from termination') % pid) 
                                         if not self.children: 
                                             sel( _('All workers have terminated. Exiting')) 
                                             self.running = False else 
                                             self.run_child() 
                             except OSError, err: 
                                 if err.errno not in (errno.EINTR, errno.ECHILD): 
                                     raise 
                             except KeyboardInterrupt:
                                  sel(_('Caught keyboard interrupt. Exiting.')) 
                                  break 
                              eventlet.greenio.shutdown_safe(self.sock) 
                              self.sock.close() 
                              self.logger.debug(_('Exited')) 
                              def wait(self): 
                                  """Wait until all servers have completed running.""" 
                                  try: 
                                      if self.children: 
                                          self.wait_on_children() 
                                      else: 
                                          self.pool.waitall() 
                                  except KeyboardInterrupt: 
                                      pass 
                                  def run_child(self): 
                                      pid = os.fork() 
                                      if pid == 0: 
                                          signal.signal(signal.SIGHUP, signal.SIG_DFL) 
                                          signal.signal(signal.SIGTERM, signal.SIG_DFL) 
                                          # ignore the interrupt signal to avoid a race whereby 
                                          # a child worker receives the signal before the parent 
                                          # and is respawned unneccessarily as a result 
                                          signal.signal(signal.SIGINT, signal.SIG_IGN) 
                                          self.run_server() sel(_('Child %d exiting normally') % os.getpid()) 
                                          # self.pool.waitall() has been called by run_server, so 
                                          # its safe to exit here 
                                          sys.exit(0) 
                                      else: 
                                          sel(_('Started child %s') % pid) 
                                          self.children.append(pid) 
                                          def run_server(self): 
                                              """Run a WSGI server.""" 
                                              if cfg.CONF.pydev_worker_debug_host: 
                                                  utils.setup_remote_pydev_debug(cfg.CONF.pydev_worker_debug_host, cfg.CONF.pydev_worker_debug_port) 
                                                  eventlet.wsgi.HttpProtocol.default_request_version = "HTTP/1.0" 
                                                  try: 
                                                      eventlet.hubs.use_hub('poll') 
                                                  except Exception: 
                                                      msg = _("eventlet 'poll' hub is not available on this platform") 
                                                      raise  exception.WorkerCreationFailure(reason=msg) 
                                                     self.pool = self.create_pool() 
                                                  try: 
                                                      eventlet.wsgi.server(self.sock, self.application, log=WritableLogger(self.logger), custom_pool=self.pool) 
                                                  except socket.error, err: 
                                                      if err[0] != errno.EINVAL: 
                                                          raise self.pool.waitall()

 大家可能看了代码之后很乱,前面许多行是设置log写入程序,打开conf读取ip,port等信息,最重要的内容是代码最后几行:

self.pool = self.create_pool()

eventlet.wsgi.server(self.sock, self.application, log = WritableLogger(self.logger), custom_pool=self.pool)

刚才提到了server和application之间的通讯接口WSGI,现在讲讲server。OpenStack并没有使用Python标准库中的BaseHTTPServer,而是使用了在网络并发等领域处理效率非常优异的eventlet库。

     eventlet提供了一套API以实现“协程”(coroutines)。所谓的“协程”可以简单地看做是“假线程”,他可以实现线程的非阻塞异步IO调用的功能,但是每个协程都有自己独立的堆栈,这和线程之间公用堆栈是有区别的。eventlet会维护一个协程“池”,用来存放所有创建的协程。但是不同于线程,协程同时只能有一个实例在运行,其他的协程要运行,必须等待当前协程显式地被挂起。不同于线程的执行顺序随机,协程的执行时按调用顺序的。

    OpenStack的服务,没有使用市面上常见的web server的原因,大概就是其处理并发无非就是使用多线程或IO复用等。然而,当多客户端并发访问时,OpenStack内部的一些共享资源,并不能十分安全地利用互斥锁等方法进行线程共享资源的互斥。为了防止并发出现资源死锁,简化架构设计流程,采用“协程”是个非常不错的选择。并且,线程间的切换需要大量的时间和空间的开销,而协程可以有效地避免这个问题。

import eventlet 
def handle(client): 
    while True: 
        c = client.recv(1) 
        if not c: 
            break 
        client.sendall(c) 
        server = eventlet.listen(('0.0.0.0', 6000)) 
        pool = eventlet.GreenPool(10000) 
        while True: 
            new_sock, address = server.accept() 
            pool.spawn_n(handle, new_sock)

 上面是eventlet一个简单服务器端的示例,首先用eventlet.GreenPool(1000)生成一个最大容量为1000的“协程”缓冲池,server.accept()等待,服务器端server端收到一个客户端的连接请求,就用pool.spawn_n()启动一个“协程”进行处理响应。

    回到glance中,OpenStack将python原版的CGIHTTPServer进行“绿化”,提供了eventlet.wsgi.server进行http的响应,其内部实现结构和作用和上面的代码相似,同样都是来一个http请求,就会启动一个协程server进行响应。参数custom_pool就是我们上面刚刚申请的GreenPool协程池。参数self.application为WSGI程序入口。这样我们就成功运行了一个WSGI服务程序。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics