Study Notes TF065: TensorFlowOnSpark

The Hadoop big-data ecosystem is built from YARN (scheduling), HDFS (storage), and the MapReduce compute framework. By analogy, distributed TensorFlow plays the role of the MapReduce compute framework, and Kubernetes plays the role of the YARN scheduler. TensorFlowOnSpark (TFoS) is Yahoo's open-source project (https://github.com/yahoo/TensorFlowOnSpark) that fuses deep learning with big data: it supports distributed TensorFlow training and inference on Apache Spark clusters, and can use Remote Direct Memory Access (RDMA) to speed up data transfer between nodes. TensorFlowOnSpark provides a bridge: each Spark executor launches a corresponding TensorFlow process, and the two interact via remote procedure calls (RPC).

TensorFlowOnSpark architecture. The TensorFlow training program runs on a Spark cluster; the steps for managing the cluster are: Reservation — reserve a port for each TensorFlow process that will run on an executor, and start a listener for data and control messages. Startup — launch the TensorFlow main function on the executors. Data ingestion — either TensorFlow's Readers and QueueRunners read data files directly from HDFS, with Spark never touching the data, or Feeding, where Spark RDD data is sent to the TensorFlow nodes and passed into the TensorFlow graph through the feed_dict mechanism. Shutdown — stop the TensorFlow compute nodes and parameter-server nodes on the executors. The flow is: Spark Driver -> Spark Executor -> parameter server -> TensorFlow Core -> gRPC/RDMA -> HDFS dataset. http://yahoohadoop.tumblr.com/post/157196317141/open-sourcing-tensorflowonspark-distributed-deep .
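As a driver-side sketch of these four steps against the TFCluster API (a minimal skeleton under stated assumptions: my_map_fun and the one-record dataRDD are placeholders, and the cluster size of 2 executors plus 1 parameter server is illustrative):

from pyspark import SparkContext, SparkConf
from tensorflowonspark import TFCluster

def my_map_fun(args, ctx):
  pass  # the TensorFlow main function; a fuller sketch appears later

sc = SparkContext(conf=SparkConf().setAppName("tfos_lifecycle"))

# Steps 1 and 2: reserve a port on every executor, then launch my_map_fun there
cluster = TFCluster.run(sc, my_map_fun, None, 2, 1, False, TFCluster.InputMode.SPARK)

# Step 3 (Feeding): push RDD partitions into the workers' feed_dict queues;
# with InputMode.TENSORFLOW the workers would read HDFS directly instead
dataRDD = sc.parallelize([([0.0] * 784, [0.0] * 10)])
cluster.train(dataRDD, num_epochs=1)

# Step 4: shut down the TensorFlow compute nodes and parameter servers
cluster.shutdown()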

TensorFlowOnSpark MNIST. https://github.com/yahoo/TensorFlowOnSpark/wiki/GetStarted_standalone . This uses a standalone-mode Spark cluster on a single machine. Install Spark and Hadoop, and deploy a Java 1.8.0 JDK. Download Spark 2.1.0 from http://spark.apache.org/downloads.html and Hadoop 2.7.3 from http://hadoop.apache.org/#Download+Hadoop . TensorFlow 0.12.1 is the best-supported version. Edit the configuration files, set the environment variables, and start Hadoop with $HADOOP_HOME/sbin/start-all.sh. Then check out the TensorFlowOnSpark source code:

git clone --recurse-submodules https://github.com/yahoo/TensorFlowOnSpark.git
cd TensorFlowOnSpark
git submodule init
git submodule update --force
git submodule foreach --recursive git clean -dfx

Package the source code so it can be shipped along with submitted jobs:

cd TensorflowOnSpark/src
zip -r ../tfspark.zip *

Set an environment variable for the TensorFlowOnSpark root directory:

cd TensorFlowOnSpark
export TFoS_HOME=$(pwd)

Start the Spark master:

${SPARK_HOME}/sbin/start-master.sh

Configure two worker instances, connecting them to the master through its spark:// master URL:

export MASTER=spark://$(hostname):7077
export SPARK_WORKER_INSTANCES=2
export CORES_PER_WORKER=1
export TOTAL_CORES=$((CORES_PER_WORKER * SPARK_WORKER_INSTANCES))
${SPARK_HOME}/sbin/start-slave.sh -c ${CORES_PER_WORKER} -m 3G ${MASTER}

Submit a job to convert the MNIST zip file into RDD datasets on HDFS:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} --conf spark.ui.port=4048 --verbose \
${TFoS_HOME}/examples/mnist/mnist_data_setup.py \
--output examples/mnist/csv \
--format csv

Inspect the processed datasets:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv

Inspect the saved image and label vectors, for example the training labels:

hadoop fs -ls hdfs://localhost:9000/user/libinggen/examples/mnist/csv/train/labels

The training set and test set are each saved as RDD data. Source: https://github.com/yahoo/TensorFlowOnSpark/blob/master/examples/mnist/mnist_data_setup.py .

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy
import tensorflow as tf
from array import array
from tensorflow.contrib.learn.python.learn.datasets import mnist
def toTFExample(image, label):
  """Serializes an image/label as a TFExample byte string"""
  example = tf.train.Example(
    features=tf.train.Features(
      feature={
        'label': tf.train.Feature(int64_list=tf.train.Int64List(value=label.astype("int64"))),
        'image': tf.train.Feature(int64_list=tf.train.Int64List(value=image.astype("int64")))
      }
    )
  )
  return example.SerializeToString()

def fromTFExample(bytestr):
  """Deserializes a TFExample from a byte string"""
  example = tf.train.Example()
  example.ParseFromString(bytestr)
  return example

def toCSV(vec):
  """Converts a vector/array into a CSV string"""
  return ','.join([str(i) for i in vec])

def fromCSV(s):
  """Converts a CSV string to a vector/array"""
  return [float(x) for x in s.split(',') if len(s) > 0]

def writeMNIST(sc, input_images, input_labels, output, format, num_partitions):
  """Writes MNIST image/label vectors into parallelized files on HDFS"""
  # load the MNIST gzip files into memory
  with open(input_images, 'rb') as f:
    images = numpy.array(mnist.extract_images(f))
  with open(input_labels, 'rb') as f:
    if format == "csv2":
      labels = numpy.array(mnist.extract_labels(f, one_hot=False))
    else:
      labels = numpy.array(mnist.extract_labels(f, one_hot=True))
  shape = images.shape
  print("images.shape: {0}".format(shape))          # 60000 x 28 x 28
  print("labels.shape: {0}".format(labels.shape))   # 60000 x 10

  # create RDDs of vectors
  imageRDD = sc.parallelize(images.reshape(shape[0], shape[1] * shape[2]), num_partitions)
  labelRDD = sc.parallelize(labels, num_partitions)

  output_images = output + "/images"
  output_labels = output + "/labels"

  # save the RDDs in the requested format
  if format == "pickle":
    imageRDD.saveAsPickleFile(output_images)
    labelRDD.saveAsPickleFile(output_labels)
  elif format == "csv":
    imageRDD.map(toCSV).saveAsTextFile(output_images)
    labelRDD.map(toCSV).saveAsTextFile(output_labels)
  elif format == "csv2":
    imageRDD.map(toCSV).zip(labelRDD).map(lambda x: str(x[1]) + "|" + x[0]).saveAsTextFile(output)
  else: # format == "tfr":
    tfRDD = imageRDD.zip(labelRDD).map(lambda x: (bytearray(toTFExample(x[0], x[1])), None))
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD.saveAsNewAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileOutputFormat",
                                 keyClass="org.apache.hadoop.io.BytesWritable",
                                 valueClass="org.apache.hadoop.io.NullWritable")
#  Note: this creates TFRecord files w/o requiring a custom Input/Output format
#  else: # format == "tfr":
#    def writeTFRecords(index, iter):
#      output_path = "{0}/part-{1:05d}".format(output, index)
#      writer = tf.python_io.TFRecordWriter(output_path)
#      for example in iter:
#        writer.write(example)
#      return [output_path]
#    tfRDD = imageRDD.zip(labelRDD).map(lambda x: toTFExample(x[0], x[1]))
#    tfRDD.mapPartitionsWithIndex(writeTFRecords).collect()
def readMNIST(sc, output, format):
  """Reads/verifies previously created output"""
  output_images = output + "/images"
  output_labels = output + "/labels"
  imageRDD = None
  labelRDD = None

  if format == "pickle":
    imageRDD = sc.pickleFile(output_images)
    labelRDD = sc.pickleFile(output_labels)
  elif format == "csv":
    imageRDD = sc.textFile(output_images).map(fromCSV)
    labelRDD = sc.textFile(output_labels).map(fromCSV)
  else: # format.startswith("tf"):
    # requires: --jars tensorflow-hadoop-1.0-SNAPSHOT.jar
    tfRDD = sc.newAPIHadoopFile(output, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                                keyClass="org.apache.hadoop.io.BytesWritable",
                                valueClass="org.apache.hadoop.io.NullWritable")
    imageRDD = tfRDD.map(lambda x: fromTFExample(str(x[0])))

  num_images = imageRDD.count()
  num_labels = labelRDD.count() if labelRDD is not None else num_images
  samples = imageRDD.take(10)
  print("num_images: ", num_images)
  print("num_labels: ", num_labels)
  print("samples: ", samples)
if __name__ == "__main__":
  import argparse
  from pyspark.context import SparkContext
  from pyspark.conf import SparkConf

  parser = argparse.ArgumentParser()
  parser.add_argument("-f", "--format", help="output format", choices=["csv","csv2","pickle","tf","tfr"], default="csv")
  parser.add_argument("-n", "--num-partitions", help="Number of output partitions", type=int, default=10)
  parser.add_argument("-o", "--output", help="HDFS directory to save examples in parallelized format", default="mnist_data")
  parser.add_argument("-r", "--read", help="read previously saved examples", action="store_true")
  parser.add_argument("-v", "--verify", help="verify saved examples after writing", action="store_true")
  args = parser.parse_args()
  print("args:", args)

  sc = SparkContext(conf=SparkConf().setAppName("mnist_parallelize"))

  if not args.read:
    # Note: these files are inside the mnist.zip file
    writeMNIST(sc, "mnist/train-images-idx3-ubyte.gz", "mnist/train-labels-idx1-ubyte.gz",
               args.output + "/train", args.format, args.num_partitions)
    writeMNIST(sc, "mnist/t10k-images-idx3-ubyte.gz", "mnist/t10k-labels-idx1-ubyte.gz",
               args.output + "/test", args.format, args.num_partitions)

  if args.read or args.verify:
    readMNIST(sc, args.output + "/train", args.format)
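As a quick local sanity check of the CSV helpers above (no Spark needed; toCSV and fromCSV are the functions from the listing), each MNIST image becomes one comma-separated line of 784 pixel values:

row = [0, 128, 255]
s = toCSV(row)                            # '0,128,255'
assert fromCSV(s) == [0.0, 128.0, 255.0]  # note: fromCSV always returns floats
print(s)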

Submit the training job to start training; it writes mnist_model to HDFS:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/train/images \
--labels examples/mnist/csv/train/labels \
--format csv \
--mode train \
--model mnist_model

mnist_dist.py builds the distributed TensorFlow job. It defines the main function of the distributed task (map_fun, launched as the TensorFlow main function on each executor) and uses Feeding as the data-ingestion mode. The TensorFlow cluster and server instances are obtained with:

cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

TFNode here is the TFNode.py module shipped inside tfspark.zip.
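A condensed sketch of what such a map_fun looks like (a simplification, not the official file: the softmax model, DataFeed loop, and stopping logic below are illustrative, while args.rdma, args.mode, and args.batch_size come from the driver's argparse flags; the real mnist_dist.py adds a Supervisor, summaries, and checkpointing):

import tensorflow as tf
from tensorflowonspark import TFNode

def map_fun(args, ctx):
  cluster, server = TFNode.start_cluster_server(ctx, 1, args.rdma)

  if ctx.job_name == "ps":
    server.join()  # parameter servers only host variables
  elif ctx.job_name == "worker":
    # pin variables to the ps, ops to this worker
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d" % ctx.task_index, cluster=cluster)):
      x = tf.placeholder(tf.float32, [None, 784])
      y_ = tf.placeholder(tf.float32, [None, 10])
      w = tf.Variable(tf.zeros([784, 10]))
      b = tf.Variable(tf.zeros([10]))
      y = tf.nn.softmax(tf.matmul(x, w) + b)
      loss = -tf.reduce_sum(y_ * tf.log(tf.clip_by_value(y, 1e-10, 1.0)))
      train_op = tf.train.GradientDescentOptimizer(0.5).minimize(loss)

    # InputMode.SPARK: batches arrive from the RDD through a DataFeed queue
    tf_feed = TFNode.DataFeed(ctx.mgr, args.mode == "train")
    with tf.Session(server.target) as sess:
      sess.run(tf.global_variables_initializer())
      while not tf_feed.should_stop():
        batch = tf_feed.next_batch(args.batch_size)
        if len(batch) > 0:
          xs = [item[0] for item in batch]  # image vectors
          ys = [item[1] for item in batch]  # one-hot labels
          sess.run(train_op, feed_dict={x: xs, y_: ys})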

mnist_spark.py is the main training driver; it walks through the TensorFlowOnSpark deployment steps:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
import argparse
import os
import numpy
import sys
import tensorflow as tf
import threading
import time
from datetime import datetime
from tensorflowonspark import TFCluster
import mnist_dist
sc = SparkContext(conf=SparkConf().setAppName("mnist_spark"))
executors = sc._conf.get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
num_ps = 1
parser = argparse.ArgumentParser()
parser.add_argument("-b", "--batch_size", help="number of records per batch", type=int, default=100)
parser.add_argument("-e", "--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("-f", "--format", help="example format: (csv|pickle|tfr)", choices=["csv","pickle","tfr"], default="csv")
parser.add_argument("-i", "--images", help="HDFS path to MNIST images in parallelized format")
parser.add_argument("-l", "--labels", help="HDFS path to MNIST labels in parallelized format")
parser.add_argument("-m", "--model", help="HDFS path to save/load model during train/inference", default="mnist_model")
parser.add_argument("-n", "--cluster_size", help="number of nodes in the cluster", type=int, default=num_executors)
parser.add_argument("-o", "--output", help="HDFS path to save test/inference output", default="predictions")
parser.add_argument("-r", "--readers", help="number of reader/enqueue threads", type=int, default=1)
parser.add_argument("-s", "--steps", help="maximum number of steps", type=int, default=1000)
parser.add_argument("-tb", "--tensorboard", help="launch tensorboard process", action="store_true")
parser.add_argument("-X", "--mode", help="train|inference", default="train")
parser.add_argument("-c", "--rdma", help="use rdma connection", default=False)
args = parser.parse_args()
print("args:",args)
print("{0} ===== Start".format(datetime.now().isoformat()))
if args.format == "tfr":
  images = sc.newAPIHadoopFile(args.images, "org.tensorflow.hadoop.io.TFRecordFileInputFormat",
                               keyClass="org.apache.hadoop.io.BytesWritable",
                               valueClass="org.apache.hadoop.io.NullWritable")
  def toNumpy(bytestr):
    example = tf.train.Example()
    example.ParseFromString(bytestr)
    features = example.features.feature
    image = numpy.array(features['image'].int64_list.value)
    label = numpy.array(features['label'].int64_list.value)
    return (image, label)
  dataRDD = images.map(lambda x: toNumpy(str(x[0])))
else:
  if args.format == "csv":
    images = sc.textFile(args.images).map(lambda ln: [int(x) for x in ln.split(',')])
    labels = sc.textFile(args.labels).map(lambda ln: [float(x) for x in ln.split(',')])
  else: # args.format == "pickle":
    images = sc.pickleFile(args.images)
    labels = sc.pickleFile(args.labels)
  print("zipping images and labels")
  dataRDD = images.zip(labels)
# 1.+2. Reserve a port for each executor's TensorFlow process and launch the
# TensorFlow main function there (TFCluster.run performs both steps)
cluster = TFCluster.run(sc, mnist_dist.map_fun, args, args.cluster_size, num_ps, args.tensorboard, TFCluster.InputMode.SPARK)
if args.mode == "train":
  # 3. Training
  cluster.train(dataRDD, args.epochs)
else:
  # 3. Inference
  labelRDD = cluster.inference(dataRDD)
  labelRDD.saveAsTextFile(args.output)
# 4. Shut down the executors' TensorFlow compute nodes and parameter-server nodes
cluster.shutdown()
print("{0} ===== Stop".format(datetime.now().isoformat()))

The inference command:

${SPARK_HOME}/bin/spark-submit \
--master ${MASTER} \
--py-files ${TFoS_HOME}/examples/mnist/spark/mnist_dist.py \
--conf spark.cores.max=${TOTAL_CORES} \
--conf spark.task.cpus=${CORES_PER_WORKER} \
--conf spark.executorEnv.JAVA_HOME="$JAVA_HOME" \
${TFoS_HOME}/examples/mnist/spark/mnist_spark.py \
--cluster_size ${SPARK_WORKER_INSTANCES} \
--images examples/mnist/csv/test/images \
--labels examples/mnist/csv/test/labels \
--mode inference \
--format csv \
--model mnist_model \
--output predictions
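Once inference finishes, predictions is a plain-text RDD on HDFS. A quick way to peek at a few records (a sketch: it reads the output path back with a fresh SparkContext, and the path is relative to the HDFS user directory):

from pyspark import SparkContext, SparkConf

sc = SparkContext(conf=SparkConf().setAppName("peek_predictions"))
for line in sc.textFile("predictions").take(5):
  print(line)
sc.stop()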

TensorFlowOnSpark can also run on Amazon EC2, and on Hadoop clusters in YARN mode.

Reference: 《TensorFlow技术解析与实战》

Recommendations for machine-learning jobs in Shanghai are welcome; my WeChat: qingxingfengzi.

Reposted from: https://my.oschina.net/u/3482787/blog/1572538

