这里有一个Python服务,它提供机器学习模型在线服务,用户只要将模型文件和加载、使用模型的的代码文件打包上传,该服务就可以将该模型上线。
无论是那种机器学习模型,要进行使用,肯定逃不了这两个套路:
- 模型的加载(初始化)
- 模型的调用(预测)
也就是说我们可以在平台层面进行一定的约束,基于我们的规范实现,然后我们提供对应的服务。
规范制定好了之后,首先面临的问题就是,如何对用户上传的py文件进行动态加载,再者模型在迭代的过程中会存在多个版本同时在线的场景,也就是说还得支持同名module的动态加载。
这里其实还隐藏一个问题,我们制定的规范就是一个python类,也就是说,我们只需要对用户上传的实现该规范的类进行加载,但是呢用户可能会为该类添加其它一些依赖module,也就是说会上传多个py文件,也正是因为该场景的存在,对module的多版本动态加载带来了一点点困难,后面我们具体分析。
你可能需要知道的
在进入正题之前,我们再说一些Python中动态加载的先验知识。
什么是module
python中的module实例包含了类、函数、变量,每一个.py结尾的文件被加载完之后就是一个module,区别就是.py文件时一个静态文件,module是.py加载完的对象,如果不吹毛求疵的话,我们也可能将.py叫作module。
上面我们说.py结尾的文件就是一个个module,那么在python中还有一类特殊的module,那就是package,我们都知道我们为python工程创建package时,在每个package下都会生成一个__init__.py文件,其实该文件就对应了package的module,虽然这个module中不包含类、函数、和变量,但是一旦这个module加载完后,python需要为这个module指定__path__
和__package__
属性,即路径地址和包名。__path__
属性比较重要,因为package下还有子module,该属性在子module查找时需要用到。
python的import机制
当python解释器遇到一个import指令后,如import tensorflow as tf
,会首先查找tensorflow这个module在缓存在有没有存在,如果存在则之后返回该module,如果不存在则进入import的核心机制,即交给Finder和Loader进行后续工作,Finder负责搜索定位一个module,定位到之后由Loader进行module进行加载。
几个系统变量
-
sys.meta_path
元路径查找器对象集合,当我们调用一个import语句后,解释器先遍历该集合中的查找器进行module查找。 -
sys.path
系统搜索module的路径集合,例如:import tensorflow as tf
,为什么系统能找到该module,就是因为在sys.path中包含了这个一个路径:C:\\ProgramData\\Anaconda3\\lib\\site-packages
-
sys.path_hooks
路径解析钩子函数集合,sys.path集合中的路径都会尝试用该钩子函数集合中函数尝试解析,如果该路径能够使用该钩子函数解析,则该钩子函数返回对应的module查找器。 -
sys.modules
解释器在遇到import指令后,会优先检查sys.modules字典中有没有需要导入的module,如果有则直接使用,没有的话才会采用Finder和Loader机制,经过Finder和Loader机制加载完后,该module会添加至sys.modules字典。
动态加载
//导入加载module(动态加载)
module = importlib.import_module(moduleName)
//获取moudle中的class
clazz = getattr(module, className)
//调用class构造方法
ins = clazz()
上面就是加载module,并获取module中属性,最后动态创建类实例。
多版本多module动态加载
在我们的背景中,假如平台提供的接口规范如下:
class Model():
def load(self, modelDir):
pass
def predict(self, inputs):
pass
举一个简单的例子,假如用户有一个四则运算的模型(很low是吧,但是不要紧,只是举栗子),用户上传的module结构如下:
|-/CalcModel.py
|-/utils
|-/utils/Calc.py
CalcModel.py
为基于平台的接口规范实现:
#FmsModel为平台的提供的规范,可以忽略,肯定可以import到
from FmsModel import FmsModel
import utils.Calc as calc
class ModelDemo(FmsModel):
def load(self, modelDir):
self.modelDir = modelDir
def predict(self, inputs):
x = int(inputs[0])
y = int(inputs[1])
return {'modelDir': self.modelDir, 'result': calc.cacl(x, y)}
Calc.py
的实现如下,就一个简单的加法运算函数:
def cacl(x, y):
return x + y
这个模型代码上传到我们服务实例上时,我们将其部署在某个目录下,当然为了支持多版本,我们需要在目录结构上做一定的隔离,即创建模型别名和版本进行约束,实际部署上来的目录结构如下:
|-/server-home
|-/server-home/cacl_model
|-/server-home/cacl_model/001/
|-/server-home/cacl_model/001/CalcModel.py
|-/server-home/cacl_model/001/utils
|-/server-home/cacl_model/001/utils/Calc.py
|-/server-home/cacl_model/002/
|-/server-home/cacl_model/002/CalcModel.py
|-/server-home/cacl_model/002/utils
|-/server-home/cacl_model/002/utils/Calc.py
版本002的Calc.py代码发生了变更,即将加法变换成了减法:
def cacl(x, y):
return x - y
要实现动态加载,从本质上有两个方案:
- 一个是借助系统的加载机制,即不自定义Finder和Loader
- 还有一种就是自定义Finder和Loader,改写module的查找和加载逻辑
借助系统加载机制
在不自定义Finder和Loader的情况下,我们要实现module的查找和加载,必须保证原有Finder能够找到对应的module,也就是说我们指定加载的module能够通过遍历sys.path中地址可以找到。所以,对于上面的例子,我们需要先将/server-home
append至sys.path中。
sys.path.append('/server-home')
之后当用户上传模型之后,我们执行如下代码:
......
module = importlib.import_module('cacl_model.001.CalcModel')
clazz = getattr(module, className)
ins = clazz()
......
......
print(ins.predict([10,5]))
这里有一个问题,基于/server-home
找cacl_model.001.CalcModel
是没有问题的,但是CalcModel
这个module还有一个依赖module:import utils.Calc as calc
,按照系统的Finder去找utils.Calc
肯定是找不到,也就是这行不通。
那如果上面我们append至sys.path中是/server-home/cacl_model/001
,然后importlib.import_module('CalcModel')
,这就没有问题了,给定参数10和5打印出15,非常完美。
好,用户这个时候上传了第二个版本,也就是说我们要继续动态加载版本002的module,我们仍然要做的事情,将版本002的路径加入sys.path:
sys.path.append('/server-home')
动态加载版本002对应的module:
......
module = importlib.import_module('CalcModel')
clazz = getattr(module, className)
ins = clazz()
......
......
print(ins.predict([10,5]))
我们一看输出,懵了,居然是15,也就是版本001对应的结果,为什么会这样,其实就是我们上节提到的sys.modules在作怪,怎么办呢,答案很明显,加载前,先从sys.modules剔除CalcModel
、utils
、utils.Calc
这三个module,但是作为一个服务平台用户上传上来的module我们是未知的,我们哪知道用户会提供几个module,每个module是怎么命名的,所以在这种方案下从sys.modules剔除已经加载的module,是不可能的。
所以该方案无法做到多版本多module的动态加载。
实现Finder和Loader
上面说到了,多版本加载中,面临一个问题,就是如何有选择的从sys.modules中剔除已经加载的module,转换一下思路,就是在加载module时将所有module全部记录下来。
在sys.modules没有module时,解释器肯定会将module的加载丢给Finder和Loader,那么我们只需要提供我们自定义的Finder和Loader就可以记录加载了哪些module。
有两种方式可以让python解释器调用我们提供的Finder和Loader:
- 添加自定义的元路径查找器加入sys.meta_path(原路径查找器集合)
- 添加路径解析钩子函数至sys.path_hooks
理论上这两种方案都是可行的,但是我这边至给出第二种方案的实现。
首先我们定义一个钩子函数,并添加至系统钩子函数集合中:
def handle_path(path):
if path.startswith('/server-home'):
log.debug('handle path:%s', path)
if path in self._url_path_cache:
finder = self._url_path_cache[path]
else:
finder = LocalPathFinder(path, self.packageInfo)
self._url_path_cache[path] = finder
return finder
else:
#使得后续的钩子函数可以介入处理
raise ImportError('can not hand path:%s' % path)
将该函数添加至系统的钩子函数集合中,同时在sys.path中添加该钩子函数可以处理的路径:
# 注意一定要加在第一个,否则/server-home会被前面的钩子函数处理掉
sys.path_hooks.insert(0, handle_path)
sys.path.append('/server-home')
下面就是定义我们的Finder和Loader,这里我们重点看一下Finder的实现:
class LocalPathFinder(importlib.abc.PathEntryFinder):
def __init__(self, base, packInfo):
self._locals = {}
self._base = base
self._packInfo = packInfo
self._loadedModules = set()
def _get_pys(self, dir_path):
locals = set()
names = os.listdir(dir_path)
for name in names:
if not name == ('__pycache__'):
locals.add(name)
return locals
def find_loader(self, fullname):
log.debug('find_loader: [%s],base is:[%s]' % (fullname, self._base))
self._loadedModules.add(fullname)
parts = fullname.split('.')
basename = parts[-1]
# Check link cache
# 说明目前正在加载子包,路径中已经含有模型名称和版本前缀
if self._base.startswith(self._packInfo.base) and self._base != self._packInfo.base:
base = self._base
else:
base = self._base + '/' + self._packInfo.prefix
if base not in self._locals:
self._locals[base] = self._get_pys(base)
# Check if it's a package
if basename in self._locals[base]:
full = base + '/' + basename
# Attempt to load the package (which accesses __init__.py)
loader = LocalPackageLoader(full)
try:
loader.load_module(fullname)
except ImportError:
loader = None
return loader, [full]
# A normal module
filename = basename + '.py'
if filename in self._locals[base]:
return LocalModuleLoader(base), []
else:
return None, []
def invalidate_caches(self):
log.debug('clear LocalPathFinder cache :%s', self._loadedModules)
self._locals = {}
for key in self._loadedModules:
if key in sys.modules:
del sys.modules[key]
主要逻辑就是首先记录要加载的module,之后判断要加载的module是package还是正常的.py文件,如果是package则采用LocalPackageLoader进行包的加载,否则使用LocalModuleLoader进行.py文件的加载。
这段代码中,我们发现对于package的加载,我们实际是主动调用loader.load_module(fullname)
的,我们先看一下load_module的实现:
def load_module(self, fullname):
code = self.get_code(fullname)
# mod = sys.modules.setdefault(fullname, imp.new_module(fullname))
mod = imp.new_module(fullname)
mod.__file__ = self.get_filename(fullname)
mod.__loader__ = self
if self.is_package(fullname):
mod.__package__ = fullname
mod.__path__ = [self._base]
else:
mod.__package__ = fullname.rpartition('.')[0]
with _installed_safely(mod):
exec(code, mod.__dict__)
return mod
其实为什么要在exec之前将module加入sys.modules?如果我们要加载的package(init.py)中含有代码逻辑,init.py中指定import该package下的其它module(我们暂且称作X),那么因为exec,python转而去对X加载,X的查找加载过程是先去查找并加载父module的,我们看一下系统的查找加载的部分逻辑:
# _bootstrap._find_and_load_unlocked()
def _find_and_load_unlocked(name, import_):
path = None
parent = name.rpartition('.')[0]
if parent:
if parent not in sys.modules:
_call_with_frames_removed(import_, parent)
# Crazy side-effects!
if name in sys.modules:
return sys.modules[name]
parent_module = sys.modules[parent]
try:
path = parent_module.__path__
except AttributeError:
msg = (_ERR_MSG + '; {!r} is not a package').format(name, parent)
raise ModuleNotFoundError(msg, name=name) from None
spec = _find_spec(name, path)
....
上面代码清晰的给出,查找当前module时,会去查找父module有没有加载,如果未加载,那则会转去加载父module,那么问题来了,X的父module就是__init__.py
,__init__.py
中又需要import X,那么就陷入了死循环,所以我们要先将父module加入sys.modules。
解决了这个问题,我们看一下相同module名文件(多版本)的加载。
LocalPathFinder中有一个函数invalidate_caches
,该函数主要进行缓存的清理,主要就是被加载的至sys.modules中module清理,那么这个函数在什么时候调用呢,看一下该方案对应的实例动态创建:
def create_ins(self, prefix, moduleName, className):
try:
self._value_lock.acquire()
self.packageInfo.setPrefix(prefix)
moudle = importlib.import_module(moduleName)
clazz = getattr(moudle, className)
return clazz()
finally:
importlib.invalidate_caches()
self._value_lock.release()
也就是说,每次加载完实例我们都调用importlib.invalidate_caches(),这将触发Finder类中的invalidate_caches方法的调用,这样,该module再有其他版本上传,我们一样可以进行动态加载。
到这里就完成了python的多版本多module的加载,详细代码参见我的github:pyloader