建站模板免费下载,wordpress 表单展示,上海信用网企业查询,wordpress凭密码什么是Ray之前花了大概两到三天把Ray相关的论文#xff0c;官网文档看了一遍#xff0c;同时特意去找了一些中文资料看Ray当前在国内的发展情况(以及目前国内大部分人对Ray的认知程度)。先来简单介绍下我对Ray的认知。首先基因很重要#xff0c;所以我们先需要探查下Ray最初…什么是Ray之前花了大概两到三天把Ray相关的论文官网文档看了一遍同时特意去找了一些中文资料看Ray当前在国内的发展情况(以及目前国内大部分人对Ray的认知程度)。先来简单介绍下我对Ray的认知。首先基因很重要所以我们先需要探查下Ray最初是为了解决什么问题而产生的。Ray的论文显示它最早是为了解决增强学习的挑战而设计的。增强学习的难点在于它是一个需要边学习边做实时做预测的应用场景这意味会有不同类型的tasks同时运行并且他们之间存在复杂的依赖关系tasks会在运行时动态产生产生新的tasks现有的一些计算模型肯定是没办法解决的。如果Ray只是为了解决RL事情可能没有那么复杂但是作者希望它不仅仅能跑增强学习相关的希望是一个通用的分布式机器学习框架这就意味着Ray必然要进行分层抽象了也就是至少要分成系统层和应用层。系统层面既然是分布式的应用那么肯定需要有一个应用内的resource/task调度和管理。首先是Yarn,K8s等资源调度框架是应用程序级别的的调度Ray作为一个为了解决具体业务问题的应用应该要跑在他们上面而不是取代他们而像Spark/Flink虽然也是基于task级别的资源调度框架但是因为他们在设计的时候是为了解决一个比较具体的抽象问题所以系统对task/资源都做了比较高的封装一般用户是面向业务编程很难直接操控task以及对应的resource。我们以Spark为例用户定义好了数据处理逻辑至于如何将这些逻辑分成多少个JobStageTask,最后占用多少Resource (CPU,GPU,Memory,Disk)等等都是由框架自行决定而用户无法染指。这也是我一直诟病Spark的地方。所以Ray在系统层面是一个通用的以task为调度级别的同时可以针对每个task控制资源粒度的一个通用的分布式task执行系统。记住在Ray里你需要明确定义Task以及Task的依赖并且为每个task指定合适(数量资源类型)的资源。比如你需要用三个task处理一份数据那么你就需要自己启动三个task并且指定这些task需要的资源(GPU,CPU)以及数量(可以是小数或者整数)。而在Spark,Flink里这是不大可能的。Ray为了让我们做这些事情默认提供了Python的语言接口你可以像使用Numpy那样去使用Ray。实际上也已经有基于Ray做Backend的numpy实现了当然它属于应用层面的东西了。Ray系统层面很简单也是典型的master-worker模式。类似spark的driver-executor模式不同的是Ray的worker类似yarn的worker,是负责Resource管理的具体任务它会启动Python worker去执行你的代码而spark的executor虽然也会启动Python worker执行python代码但是对应的executor也执行业务逻辑和python worker有数据交换和传输。应用层面你可以基于Ray的系统进行编程因为Ray默认提供了Python的编程接口所以你可以自己实现增强学习库(RLLib),也可以整合已有的算法框架比如tensorflow,让tensorflow成为Ray上的一个应用并且轻松实现分布式。我记得知乎上有人说Ray其实就是一个Python的分布式RPC框架这么说是对的但是显然会有误导因为这很可能让人以为他只是“Python分布式RPC框架”。如何和Spark协作根据前面我讲述的我们是可以完全基于Ray实现Spark的大部分API的只是是Ray backend而非Spark core backend。实际上Ray目前正在做流相关的功能他们现在要做的就是要兼容Flink的API。虽然官方宣称Ray是一个新一代的机器学习分布式框架但是他完全可以cover住当前大数据和AI领域的大部分事情但是任重道远还需要大量的事情。所以对我而言我看中的是它良好的Python支持以及系统层面对资源和task的控制这使得1.我们可以轻易的把我们的单机Python算法库在Ray里跑起来(虽然算法自身不是分布式的)但是我们可以很好的利用Ray的资源管理和调度功能从而解决AI平台的资源管理问题。2.Ray官方提供了大量的机器学习算法的实现以及对当前机器学习框架如Tensorflow,Pytorch的整合而分布式能力则比这些库原生提供的模式更靠谱和易用。毕竟对于这些框架而言支持他们分布式运行的那些辅助库(比如TensorFlow提供parameter servers)相当简陋。但是我们知道数据处理它自身有一个很大的生态比如你的用户画像数据都在数据湖里你需要把这些数据进行非常复杂的计算才能作为特征喂给你的机器学习算法。而如果这个时候你还要面向资源编程(或者使用一个还不够成熟的上层应用)而不是面向“业务”编程这就显得很难受了比如我就想用SQL处理数据我只关注处理的业务逻辑这个当前Ray以及之上的应用显然还是做不到如Spark那么便利的(毕竟Spark就是为了数据处理而生的)所以最好的方式是数据的获取和加工依然是在Spark之上但是数据准备好了就应该丢给用户基于Ray写的代码处理了。Ray可以通过Arrow项目读取HDFS上Spark已经处理好的数据然后进行训练然后将模型保存为HDFS。当然对于预测Ray可以自己消化掉或者丢给其他系统完成。我们知道Spark 在整合Python生态方面做出了非常多的努力比如他和Ray一样也提供了python 编程接口所以spark也较为容易的整合譬如Tensorflow等框架但是没办法很好的管控资源(比如GPU),而且spark 的executor 会在他所在的服务器上启动python worker,而spark一般而言是跑在yarn上的这就对yarn造成了很大的管理麻烦而且通常yarn 和hdfs之类的都是在一起的python环境还有资源(CPU/GPU)除了管理难度大以外还有一个很大的问题是可能会对yarn的集群造成比较大的稳定性风险。所以最好的模式是按如下步骤开发一个机器学习应用写一个python脚本在数据处理部分使用pyspark在程序的算法训练部分使用rayspark 运行在yarn(k8s)上ray运行在k8s里好处显而易见用户完全无感知他的应用其实是跑在两个集群里的对他来说就是一个普通python脚本。从架构角度来讲复杂的python环境管理问题都可以丢给ray集群来完成spark只要能跑基本的pyspark相关功能即可数据衔接通过数据湖里的表(其实就是一堆parquet文件)即可。当然如果最后结果数据不大也可以直接通过client完成pyspark到ray的交互。Spark和Ray的架构和部署现在我们来思考一个比较好的部署模式架构图大概类似这样首先大家可以理解为k8s已经解决一切了我们spark,ray都跑在K8s上。但是如果我们希望一个spark 是实例多进程跑的时候我们并不希望是像传统的那种方式所有的节点都跑在K8s上而是将executor部分放到yarn cluster. 在我们的架构里spark driver 是一个应用我们可以启动多个pod从而获得多个spark driver实例对外提供负载均衡roll upgrade/restart 等功能。也就是k8s应该是面向应用的。但是复杂的计算我们依然希望留给Yarn尤其是还涉及到数据本地性计算和存储放到一起(yarn和HDFS通常是在一起的)避免k8s和HDFS有大量数据交换。因为Yarn对Java/Scala友好但是对Python并不友好尤其是在yarn里涉及到Python环境问题会非常难搞(主要是Yarn对docker的支持还是不够优秀对GPU支持也不好)而机器学习其实一定重度依赖Python以及非常复杂的本地库以及Python环境并且对资源调度也有比较高的依赖因为算法是很消耗机器资源的必须也有资源池所以我们希望机器学习部分能跑在K8s里。但是我们希望整个数据处理和训练过程是一体的算法的同学应该无法感知到k8s/yarn的区别。为了达到这个目标用户依然使用pyspark来完成计算然后在pyspark里使用ray的API做模型训练和预测数据处理部分自动在yarn中完成而模型训练部分则自动被分发到k8s中完成。并且因为ray自身的优势算法可以很好的控制自己需要的资源比如这次训练需要多少GPU/CPU/内存支持所有的算法库在做到对算法最少干扰的情况下算法的同学们有最好的资源调度可以用。下面展示一段MLSQL代码片段展示如何利用上面的架构-- python 训练模型的代码set py_trainimport rayray.init()ray.remote(num_cpus2, num_gpus1)def f(x): return x * xfutures [f.remote(i) for i in range(4)]print(ray.get(futures));load script.py_train as py_train;-- 设置需要的python环境描述set py_env;load script.py_env as py_env;-- 加载hive的表load hive.db1.table1 as table1;-- 对Hive做处理比如做一些特征工程select features,label from table1 as data;-- 提交Python代码到Ray里,此时是运行在k8s里的train data as PythonAlg./tmp/tf/modelwhere scriptspy_trainand entryPointpy_trainand condaFilepy_envand keepVersiontrueand fitParam.0.fileFormatjson -- 还可以是parquetand fitParam.0.psNum1下面是PySpark的示例代码from pyspark.ml.linalg import Vectors, SparseVectorfrom pyspark.sql import SparkSessionimport loggingimport rayfrom pyspark.sql.types import StructField, StructType, BinaryType, StringType, ArrayType, ByteTypefrom sklearn.naive_bayes import GaussianNBimport osfrom sklearn.externals import joblibimport pickleimport scipy.sparse as spfrom sklearn.svm import SVCimport ioimport codecsos.environ[PYSPARK_PYTHON] /Users/allwefantasy/deepavlovpy3/bin/python3logger logging.getLogger(__name__)base_dir /Users/allwefantasy/CSDNWorkSpace/spark-deep-learning_latestspark SparkSession.builder.master(local[*]).appName(example).getOrCreate()data spark.read.format(libsvm).load(base_dir /data/mllib/sample_libsvm_data.txt)## 广播数据dataBr spark.sparkContext.broadcast(data.collect())## 训练模型 这部分代码会在spark executor里的python worker执行def train(row): import ray ray.init() train_data_id ray.put(dataBr.value) ## 这个函数的python代码会在K8s里的Ray里执行 ray.remote def ray_train(x): X [] y [] for i in ray.get(train_data_id): X.append(i[features]) y.append(i[label]) if row[model] SVC: gnb GaussianNB() model gnb.fit(X, y) # 为什么还需要encode一下 pickled codecs.encode(pickle.dumps(model), base64).decode() return [row[model], pickled] if row[model] BAYES: svc SVC() model svc.fit(X, y) pickled codecs.encode(pickle.dumps(model), base64).decode() return [row[model], pickled] result ray_train.remote(row) ray.get(result) ##训练模型 将模型结果保存到HDFS上rdd spark.createDataFrame([[SVC], [BAYES]], [model]).rdd.map(train)spark.createDataFrame(rdd, schemaStructType([StructField(namemodelType, dataTypeStringType()), StructField(namemodelBinary, dataTypeStringType())])).write. format(parquet). mode(overwrite).save(/tmp/wow)这是一个标准的Python程序只是使用了pyspark/ray的API我们就完成了上面所有的工作同时训练两个模型并且数据处理的工作在spark中模型训练的在ray中。完美结合最重要的是解决了资源管理的问题作者祝威廉本文为阿里云原创内容未经允许不得转载。