这里有一个Python服务,它提供机器学习模型在线服务,用户只要将模型文件和加载、使用模型的的代码文件打包上传,该服务就可以将该模型上线。
无论是那种机器学习模型,要进行使用,肯定逃不了这两个套路:

  • 模型的加载(初始化)
  • 模型的调用(预测)
    也就是说我们可以在平台层面进行一定的约束,基于我们的规范实现,然后我们提供对应的服务。

规范制定好了之后,首先面临的问题就是,如何对用户上传的py文件进行动态加载,再者模型在迭代的过程中会存在多个版本同时在线的场景,也就是说还得支持同名module的动态加载。

这里其实还隐藏一个问题,我们制定的规范就是一个python类,也就是说,我们只需要对用户上传的实现该规范的类进行加载,但是呢用户可能会为该类添加其它一些依赖module,也就是说会上传多个py文件,也正是因为该场景的存在,对module的多版本动态加载带来了一点点困难,后面我们具体分析。

你可能需要知道的

在进入正题之前,我们再说一些Python中动态加载的先验知识。

什么是module

python中的module实例包含了类、函数、变量,每一个.py结尾的文件被加载完之后就是一个module,区别就是.py文件时一个静态文件,module是.py加载完的对象,如果不吹毛求疵的话,我们也可能将.py叫作module。

上面我们说.py结尾的文件就是一个个module,那么在python中还有一类特殊的module,那就是package,我们都知道我们为python工程创建package时,在每个package下都会生成一个__init__.py文件,其实该文件就对应了package的module,虽然这个module中不包含类、函数、和变量,但是一旦这个module加载完后,python需要为这个module指定__path____package__属性,即路径地址和包名。__path__属性比较重要,因为package下还有子module,该属性在子module查找时需要用到。

python的import机制

当python解释器遇到一个import指令后,如import tensorflow as tf,会首先查找tensorflow这个module在缓存在有没有存在,如果存在则之后返回该module,如果不存在则进入import的核心机制,即交给Finder和Loader进行后续工作,Finder负责搜索定位一个module,定位到之后由Loader进行module进行加载。

几个系统变量

  1. sys.meta_path
    元路径查找器对象集合,当我们调用一个import语句后,解释器先遍历该集合中的查找器进行module查找。

  2. sys.path
    系统搜索module的路径集合,例如:import tensorflow as tf,为什么系统能找到该module,就是因为在sys.path中包含了这个一个路径:C:\\ProgramData\\Anaconda3\\lib\\site-packages

  3. sys.path_hooks
    路径解析钩子函数集合,sys.path集合中的路径都会尝试用该钩子函数集合中函数尝试解析,如果该路径能够使用该钩子函数解析,则该钩子函数返回对应的module查找器。

  4. sys.modules
    解释器在遇到import指令后,会优先检查sys.modules字典中有没有需要导入的module,如果有则直接使用,没有的话才会采用Finder和Loader机制,经过Finder和Loader机制加载完后,该module会添加至sys.modules字典。

动态加载

//导入加载module(动态加载)
module = importlib.import_module(moduleName)
//获取moudle中的class
clazz = getattr(module, className)
//调用class构造方法
ins = clazz()

上面就是加载module,并获取module中属性,最后动态创建类实例。

多版本多module动态加载

在我们的背景中,假如平台提供的接口规范如下:

class Model():
	def load(self, modelDir):
    	pass
    def predict(self, inputs):
    	pass

举一个简单的例子,假如用户有一个四则运算的模型(很low是吧,但是不要紧,只是举栗子),用户上传的module结构如下:

|-/CalcModel.py
|-/utils
|-/utils/Calc.py

CalcModel.py为基于平台的接口规范实现:

#FmsModel为平台的提供的规范,可以忽略,肯定可以import到
from FmsModel import FmsModel
import utils.Calc as calc

class ModelDemo(FmsModel):
    def load(self, modelDir):
        self.modelDir = modelDir
    
    def predict(self, inputs):
        x = int(inputs[0])
        y = int(inputs[1])
        return {'modelDir': self.modelDir, 'result': calc.cacl(x, y)}

Calc.py的实现如下,就一个简单的加法运算函数:

def cacl(x, y):
    return x + y

这个模型代码上传到我们服务实例上时,我们将其部署在某个目录下,当然为了支持多版本,我们需要在目录结构上做一定的隔离,即创建模型别名和版本进行约束,实际部署上来的目录结构如下:

|-/server-home
|-/server-home/cacl_model
|-/server-home/cacl_model/001/
|-/server-home/cacl_model/001/CalcModel.py
|-/server-home/cacl_model/001/utils
|-/server-home/cacl_model/001/utils/Calc.py
|-/server-home/cacl_model/002/
|-/server-home/cacl_model/002/CalcModel.py
|-/server-home/cacl_model/002/utils
|-/server-home/cacl_model/002/utils/Calc.py

版本002的Calc.py代码发生了变更,即将加法变换成了减法:

def cacl(x, y):
    return x - y

要实现动态加载,从本质上有两个方案:

  • 一个是借助系统的加载机制,即不自定义Finder和Loader
  • 还有一种就是自定义Finder和Loader,改写module的查找和加载逻辑

借助系统加载机制

在不自定义Finder和Loader的情况下,我们要实现module的查找和加载,必须保证原有Finder能够找到对应的module,也就是说我们指定加载的module能够通过遍历sys.path中地址可以找到。所以,对于上面的例子,我们需要先将/server-home append至sys.path中。

sys.path.append('/server-home')

之后当用户上传模型之后,我们执行如下代码:

......
module = importlib.import_module('cacl_model.001.CalcModel')
clazz = getattr(module, className)
ins = clazz()
......
......
print(ins.predict([10,5]))

这里有一个问题,基于/server-homecacl_model.001.CalcModel是没有问题的,但是CalcModel这个module还有一个依赖module:import utils.Calc as calc,按照系统的Finder去找utils.Calc肯定是找不到,也就是这行不通。

那如果上面我们append至sys.path中是/server-home/cacl_model/001,然后importlib.import_module('CalcModel'),这就没有问题了,给定参数10和5打印出15,非常完美。

好,用户这个时候上传了第二个版本,也就是说我们要继续动态加载版本002的module,我们仍然要做的事情,将版本002的路径加入sys.path:

sys.path.append('/server-home')

动态加载版本002对应的module:

......
module = importlib.import_module('CalcModel')
clazz = getattr(module, className)
ins = clazz()
......
......
print(ins.predict([10,5]))

我们一看输出,懵了,居然是15,也就是版本001对应的结果,为什么会这样,其实就是我们上节提到的sys.modules在作怪,怎么办呢,答案很明显,加载前,先从sys.modules剔除CalcModelutilsutils.Calc这三个module,但是作为一个服务平台用户上传上来的module我们是未知的,我们哪知道用户会提供几个module,每个module是怎么命名的,所以在这种方案下从sys.modules剔除已经加载的module,是不可能的。

所以该方案无法做到多版本多module的动态加载。

实现Finder和Loader

上面说到了,多版本加载中,面临一个问题,就是如何有选择的从sys.modules中剔除已经加载的module,转换一下思路,就是在加载module时将所有module全部记录下来。

在sys.modules没有module时,解释器肯定会将module的加载丢给Finder和Loader,那么我们只需要提供我们自定义的Finder和Loader就可以记录加载了哪些module。

有两种方式可以让python解释器调用我们提供的Finder和Loader:

  1. 添加自定义的元路径查找器加入sys.meta_path(原路径查找器集合)
  2. 添加路径解析钩子函数至sys.path_hooks

理论上这两种方案都是可行的,但是我这边至给出第二种方案的实现。
首先我们定义一个钩子函数,并添加至系统钩子函数集合中:

def handle_path(path):
    if path.startswith('/server-home'):
    	log.debug('handle path:%s', path)
    	if path in self._url_path_cache:
   			finder = self._url_path_cache[path]
    	else:
    		finder = LocalPathFinder(path, self.packageInfo)
    		self._url_path_cache[path] = finder
    		return finder
    else:
    	#使得后续的钩子函数可以介入处理
    	raise ImportError('can not hand path:%s' % path)

将该函数添加至系统的钩子函数集合中,同时在sys.path中添加该钩子函数可以处理的路径:

# 注意一定要加在第一个,否则/server-home会被前面的钩子函数处理掉
sys.path_hooks.insert(0, handle_path)
sys.path.append('/server-home')

下面就是定义我们的Finder和Loader,这里我们重点看一下Finder的实现:

class LocalPathFinder(importlib.abc.PathEntryFinder):
    def __init__(self, base, packInfo):
        self._locals = {}
        self._base = base
        self._packInfo = packInfo
        self._loadedModules = set()

    def _get_pys(self, dir_path):
        locals = set()
        names = os.listdir(dir_path)
        for name in names:
            if not name == ('__pycache__'):
                locals.add(name)
        return locals

    def find_loader(self, fullname):
        log.debug('find_loader: [%s],base is:[%s]' % (fullname, self._base))
        self._loadedModules.add(fullname)
        parts = fullname.split('.')
        basename = parts[-1]
        # Check link cache
        # 说明目前正在加载子包,路径中已经含有模型名称和版本前缀
        if self._base.startswith(self._packInfo.base) and self._base != self._packInfo.base:
            base = self._base
        else:
            base = self._base + '/' + self._packInfo.prefix

        if base not in self._locals:
            self._locals[base] = self._get_pys(base)

        # Check if it's a package
        if basename in self._locals[base]:
            full = base + '/' + basename
            # Attempt to load the package (which accesses __init__.py)
            loader = LocalPackageLoader(full)
            try:
                loader.load_module(fullname)
            except ImportError:
                loader = None
            return loader, [full]

        # A normal module
        filename = basename + '.py'
        if filename in self._locals[base]:
            return LocalModuleLoader(base), []
        else:
            return None, []

    def invalidate_caches(self):
        log.debug('clear LocalPathFinder cache :%s', self._loadedModules)
        self._locals = {}
        for key in self._loadedModules:
            if key in sys.modules:
                del sys.modules[key]

主要逻辑就是首先记录要加载的module,之后判断要加载的module是package还是正常的.py文件,如果是package则采用LocalPackageLoader进行包的加载,否则使用LocalModuleLoader进行.py文件的加载。

这段代码中,我们发现对于package的加载,我们实际是主动调用loader.load_module(fullname)的,我们先看一下load_module的实现:

def load_module(self, fullname):
        code = self.get_code(fullname)
        # mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
        mod = imp.new_module(fullname)
        mod.__file__ = self.get_filename(fullname)
        mod.__loader__ = self
        if self.is_package(fullname):
            mod.__package__ = fullname
            mod.__path__ = [self._base]
        else:
            mod.__package__ = fullname.rpartition('.')[0]
        with _installed_safely(mod):
            exec(code, mod.__dict__)
        return mod

其实为什么要在exec之前将module加入sys.modules?如果我们要加载的package(init.py)中含有代码逻辑,init.py中指定import该package下的其它module(我们暂且称作X),那么因为exec,python转而去对X加载,X的查找加载过程是先去查找并加载父module的,我们看一下系统的查找加载的部分逻辑:

# _bootstrap._find_and_load_unlocked()

def _find_and_load_unlocked(name, import_):
    path = None
    parent = name.rpartition('.')[0]
    if parent:
        if parent not in sys.modules:
            _call_with_frames_removed(import_, parent)
        # Crazy side-effects!
        if name in sys.modules:
            return sys.modules[name]
        parent_module = sys.modules[parent]
        try:
            path = parent_module.__path__
        except AttributeError:
            msg = (_ERR_MSG + '; {!r} is not a package').format(name, parent)
            raise ModuleNotFoundError(msg, name=name) from None
    spec = _find_spec(name, path)
    ....

上面代码清晰的给出,查找当前module时,会去查找父module有没有加载,如果未加载,那则会转去加载父module,那么问题来了,X的父module就是__init__.py,__init__.py中又需要import X,那么就陷入了死循环,所以我们要先将父module加入sys.modules。

解决了这个问题,我们看一下相同module名文件(多版本)的加载。
LocalPathFinder中有一个函数invalidate_caches,该函数主要进行缓存的清理,主要就是被加载的至sys.modules中module清理,那么这个函数在什么时候调用呢,看一下该方案对应的实例动态创建:

def create_ins(self, prefix, moduleName, className):
        try:
            self._value_lock.acquire()
            self.packageInfo.setPrefix(prefix)
            moudle = importlib.import_module(moduleName)
            clazz = getattr(moudle, className)
            return clazz()
        finally:
            importlib.invalidate_caches()
            self._value_lock.release()

也就是说,每次加载完实例我们都调用importlib.invalidate_caches(),这将触发Finder类中的invalidate_caches方法的调用,这样,该module再有其他版本上传,我们一样可以进行动态加载。


到这里就完成了python的多版本多module的加载,详细代码参见我的github:pyloader