MXNet Based on ps-kafka
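Configuration
- Modified MXNet's Makefile
- Modified ps-kafka's Makefile and make/*.mk
- Modified small parts of MXNet and dmlc-core so they can parse the --brokers option
- Modified ps-lite into ps-kafka, a Kafka-based version of it
- Modified the Kafka configuration: in our experiments a single MXNet message can reach 1,600,000 bytes, while the rdkafka default message.max.bytes is 1,000,000, so the limits must be raised: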
  - In the broker, set message.max.bytes=104857600.
  - In the broker, socket.request.max.bytes must be at least message.max.bytes.
  - In the broker, fetch.message.max.bytes must be at least message.max.bytes.
  - In the rdkafka producer and consumer, set message.max.bytes to the broker's message.max.bytes.
  - In the rdkafka consumer, set fetch.message.max.bytes >= message.max.bytes.
  - The number of partitions of a Kafka topic must be <= the number of brokers.
  - Restart Kafka.
See the rdkafka docs and the Kafka docs.
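The size rules above are easy to get subtly wrong across several files, so a quick check can help. The sketch below is a hypothetical helper (check_message_sizes and LARGEST_MXNET_MESSAGE are illustrative names, not part of ps-kafka); it treats "larger than" as "at least", since the example broker config sets these limits to the same value.

```python
# Hypothetical helper: checks the Kafka/rdkafka message-size rules listed above.
# All values are in bytes; "larger than" is treated as ">=" because the
# example broker config sets these limits to the same value.

LARGEST_MXNET_MESSAGE = 1_600_000  # largest single MXNet message seen in our runs


def check_message_sizes(broker: dict, client: dict) -> list:
    """Return human-readable rule violations (an empty list means all rules hold)."""
    problems = []
    max_msg = broker["message.max.bytes"]
    if max_msg < LARGEST_MXNET_MESSAGE:
        problems.append(f"broker message.max.bytes={max_msg} < {LARGEST_MXNET_MESSAGE}")
    for key in ("socket.request.max.bytes", "fetch.message.max.bytes"):
        if broker[key] < max_msg:
            problems.append(f"broker {key}={broker[key]} < message.max.bytes={max_msg}")
    # `client` stands for both the rdkafka producer and consumer settings.
    if client["message.max.bytes"] != max_msg:
        problems.append("rdkafka message.max.bytes differs from the broker value")
    if client["fetch.message.max.bytes"] < max_msg:
        problems.append("rdkafka fetch.message.max.bytes < message.max.bytes")
    return problems


limit = 104_857_600  # 100 MiB, the value used in the broker config of this README
broker = {"message.max.bytes": limit, "socket.request.max.bytes": limit,
          "fetch.message.max.bytes": limit}
client = {"message.max.bytes": limit, "fetch.message.max.bytes": limit}
print(check_message_sizes(broker, client))  # prints []
```

Keeping the broker and client settings in two dicts mirrors the split between the broker's server.properties and the rdkafka configuration used by ps-kafka.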
- Example broker configuration (config/server.properties):
broker.id=0
# Replace with this host's IP or hostname, and use that same address consistently everywhere
listeners=PLAINTEXT://your.host.ip.or.hostname:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
#########################################################
delete.topic.enable=true
fetch.message.max.bytes=104857600
message.max.bytes=104857600
#auto.create.topics.enable=true
num.partitions=3
default.replication.factor=1
# 2 weeks (a .properties file only treats '#' as a comment at the start of a line)
connections.max.idle.ms=1209600000
socket.request.max.bytes=104857600
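Because the file above sets num.partitions twice and mixes comments with values, a tiny reader can show which values the broker would actually see. This is a simplified sketch, not Kafka's own parser: it mimics two java.util.Properties rules (comments only at the start of a line; a later duplicate key wins), so the later num.partitions=3 overrides num.partitions=1, and a trailing "# ..." after a value stays part of the value, which Kafka would most likely reject when parsing a number.

```python
def parse_properties(text: str) -> dict:
    """Simplified reader for key=value lines in a .properties file.

    Assumption: no ':' separators, escapes, or line continuations.
    Mirrors two java.util.Properties rules that matter here:
    '#' or '!' starts a comment only at the beginning of a line, and when
    a key appears twice the later line wins.
    """
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue  # blank line or whole-line comment
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()  # a later duplicate overwrites an earlier one
    return props


sample = """num.partitions=1
#auto.create.topics.enable=true
num.partitions=3
connections.max.idle.ms=1209600000 #2 weeks
"""
props = parse_properties(sample)
print(props["num.partitions"])           # prints 3
print(props["connections.max.idle.ms"])  # prints 1209600000 #2 weeks
```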
Build and Install
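- All nodes need gcc upgraded to 5.4 for C++11 support, with libstdc++ updated so that it provides GLIBCXX_3.4.21.
  Reference: https://www.jianshu.com/p/f7cd0e2416b9
- If building bin/im2rec fails, comment it out in MXNet's Makefile.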
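To confirm that a node's libstdc++ really provides GLIBCXX_3.4.21 after the upgrade (the same check as running strings on the library and grepping for GLIBCXX), a small sketch like this can scan the library file. The path /usr/lib64/libstdc++.so.6 is an assumption for 64-bit CentOS and may differ on other systems.

```python
import re
from pathlib import Path

# Assumed location on 64-bit CentOS; adjust for your system.
LIBSTDCXX = Path("/usr/lib64/libstdc++.so.6")


def glibcxx_versions(blob: bytes) -> set:
    """Return all GLIBCXX_x.y.z version tags embedded in a libstdc++ binary."""
    return {m.decode() for m in re.findall(rb"GLIBCXX_\d+(?:\.\d+)+", blob)}


def has_glibcxx(blob: bytes, wanted: str = "GLIBCXX_3.4.21") -> bool:
    """True if the exact version tag `wanted` is present."""
    return wanted in glibcxx_versions(blob)


if __name__ == "__main__" and LIBSTDCXX.exists():
    print(has_glibcxx(LIBSTDCXX.read_bytes()))
```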
Build
sudo yum groupinstall "Development Tools"
sudo yum install git
sudo yum install lapack-devel openblas-devel opencv-devel
# sudo apt-get install -y build-essential libatlas-base-dev libopencv-dev graphviz
git clone --recursive https://github.com/gbxu/mxnet-kafka.git
cd mxnet-kafka
# cmake -D USE_CUDA=OFF .
make clean_all
make -j $(nproc) USE_OPENCV=1 USE_BLAS=openblas USE_DIST_KVSTORE=1 USE_PROFILER=1
Update mxnet and its submodules
git pull
# Either cd into ps-kafka and run git fetch followed by git merge origin/master, or:
git submodule update --remote 3rdparty/ps-kafka
Install
sudo pip uninstall mxnet
cd python
sudo pip install --upgrade pip
sudo pip install -e .
Run
Start the Kafka brokers
- Create the three topics; the command format is:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TOSCHEDULER
- To delete a topic:
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic "TOWORKERS"
- To add partitions later:
./kafka-topics.sh --zookeeper node14:2181 --alter --topic TOWORKERS --partitions 5
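- Topics and the number of partitions each one needs (-s and -n are the server and worker counts passed to launch.py):

| topic | partitions |
|---|---|
| TOSERVERS | -s + 1 |
| TOWORKERS | -n + 1 |
| TOSCHEDULER | 1 |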
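The partition counts above can be turned into the three create commands mechanically. The sketch below uses hypothetical helper names (topic_partitions, create_topic_commands) and also enforces this README's rule that a topic should have no more partitions than there are brokers; the ZooKeeper address defaults to localhost:2181 as in the example command.

```python
def topic_partitions(num_servers: int, num_workers: int) -> dict:
    """Partitions per topic, following the table above (-s and -n of launch.py)."""
    return {
        "TOSERVERS": num_servers + 1,
        "TOWORKERS": num_workers + 1,
        "TOSCHEDULER": 1,
    }


def create_topic_commands(num_servers, num_workers, num_brokers, zookeeper="localhost:2181"):
    """Build kafka-topics.sh --create commands, rejecting partitions > brokers."""
    commands = []
    for topic, parts in topic_partitions(num_servers, num_workers).items():
        if parts > num_brokers:
            raise ValueError(f"{topic} needs {parts} partitions but only {num_brokers} brokers")
        commands.append(
            f"./kafka-topics.sh --create --zookeeper {zookeeper} "
            f"--replication-factor 1 --partitions {parts} --topic {topic}"
        )
    return commands


# Example: one server and one worker (-s 1 -n 1) on the three brokers node14-node16.
for cmd in create_topic_commands(num_servers=1, num_workers=1, num_brokers=3):
    print(cmd)
```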
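Run MXNet-kafka
Follow the MXNet run guide (distributed + shared library): copy the library into the mxnet folder under example, and set up the environment on the other nodes.
The final launch command becomes: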
python ../../tools/launch.py -s 1 -n 1 --launcher ssh -H hosts --brokers node14:9092,node15:9092,node16:9092 --sync-dst-dir /home/xugb/kafka_test/ python train_mnist.py --network lenet --kv-store dist_sync
Alternatively, add one line to the Python script to set the environment variable:
import os
os.environ['BROKERS'] = 'node14'
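When several brokers are involved, a small helper can build the same comma-separated host:port list that the manual-test commands export. This is a hypothetical sketch (set_brokers is not part of MXNet or ps-kafka), and it assumes ps-kafka reads BROKERS when the distributed kvstore starts, so it should run before the dist_sync kvstore is created.

```python
import os


def set_brokers(hosts, default_port=9092):
    """Export BROKERS as a comma-separated host:port list.

    Hypothetical helper; hosts given without a port get `default_port`.
    Call it before the dist_sync kvstore is created.
    """
    value = ",".join(h if ":" in h else f"{h}:{default_port}" for h in hosts)
    os.environ["BROKERS"] = value
    return value


print(set_brokers(["node14", "node15", "node16:9092"]))
# prints node14:9092,node15:9092,node16:9092
```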
Save the output:
Put the command above into out_1_1.sh, make it executable (chmod +x out_1_1.sh), then run:
nohup ./out_1_1.sh >/home/xugb/out/out14/kfk_s1n1_1.txt &
Check the latest output:
tail -n 2 kfk_s1n1_1.txt
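Instead of reading the last two lines by eye, the latest validation metric can be pulled out of the log. The sketch assumes the log contains MXNet Module.fit lines of the form "Epoch[3] Validation-accuracy=0.987200" (possibly after a timestamp/Node prefix); last_validation is a hypothetical helper name.

```python
import os
import re

# Assumed log format from MXNet's Module.fit, e.g. "... Epoch[3] Validation-accuracy=0.987200".
VAL_RE = re.compile(r"Epoch\[(\d+)\] Validation-([\w-]+)=([0-9.eE+-]+)")


def last_validation(lines):
    """Return (epoch, metric, value) for the last validation line, or None."""
    result = None
    for line in lines:
        m = VAL_RE.search(line)
        if m:
            result = (int(m.group(1)), m.group(2), float(m.group(3)))
    return result


if __name__ == "__main__" and os.path.exists("kfk_s1n1_1.txt"):
    with open("kfk_s1n1_1.txt") as log:
        print(last_validation(log))
```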
==========
Manual test (start the worker, server, and scheduler separately, one command per role):
export DMLC_PS_ROOT_URI=10.0.0.14; export DMLC_ROLE=worker; export DMLC_PS_ROOT_PORT=9091; export BROKERS=node14:9092,node15:9092,node16:9092; export DMLC_NUM_WORKER=1; export DMLC_NUM_SERVER=1; cd /home/xugb/ic_kafka_test/; python train_mnist.py --network lenet --kv-store dist_sync
export DMLC_PS_ROOT_URI=10.0.0.14; export DMLC_ROLE=server; export DMLC_PS_ROOT_PORT=9091; export BROKERS=node14:9092,node15:9092,node16:9092; export DMLC_NUM_WORKER=1; export DMLC_NUM_SERVER=1; cd /home/xugb/ic_kafka_test/; python train_mnist.py --network lenet --kv-store dist_sync
export DMLC_PS_ROOT_URI=10.0.0.14; export DMLC_ROLE=scheduler; export DMLC_PS_ROOT_PORT=9091; export BROKERS=node14:9092,node15:9092,node16:9092; export DMLC_NUM_WORKER=1; export DMLC_NUM_SERVER=1;cd /home/xugb/ic_kafka_test/; python train_mnist.py --network lenet --kv-store dist_sync
==============
Local (single machine):
export DMLC_PS_ROOT_URI=127.0.0.1; export DMLC_PS_ROOT_PORT=8000; export DMLC_NUM_SERVER=1; export DMLC_NUM_WORKER=1; export BROKERS=localhost:9092; export DMLC_ROLE=scheduler;
export DMLC_PS_ROOT_URI=127.0.0.1;export DMLC_PS_ROOT_PORT=8000;export DMLC_NUM_SERVER=1;export DMLC_NUM_WORKER=1;export BROKERS=localhost:9092;export DMLC_NODE_HOST=127.0.0.1;export DMLC_ROLE=server;
export DMLC_PS_ROOT_URI=127.0.0.1;export DMLC_PS_ROOT_PORT=8000;export DMLC_NUM_SERVER=1;export DMLC_NUM_WORKER=1;export BROKERS=localhost:9092;export DMLC_NODE_HOST=127.0.0.1;export DMLC_ROLE=worker;
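The three local export lines differ only in DMLC_ROLE (and DMLC_NODE_HOST, which the scheduler line omits), so they can be generated in one place. This is a hypothetical sketch: role_env and launch_local are illustrative names, and launch_local assumes it runs from the directory containing train_mnist.py with the same arguments as the manual test.

```python
import os
import subprocess

# Shared settings, copied from the local export lines above.
BASE_ENV = {
    "DMLC_PS_ROOT_URI": "127.0.0.1",
    "DMLC_PS_ROOT_PORT": "8000",
    "DMLC_NUM_SERVER": "1",
    "DMLC_NUM_WORKER": "1",
    "BROKERS": "localhost:9092",
}


def role_env(role: str) -> dict:
    """Environment for one local process, mirroring the export lines above."""
    if role not in ("scheduler", "server", "worker"):
        raise ValueError(f"unknown role: {role}")
    env = dict(os.environ, **BASE_ENV, DMLC_ROLE=role)
    if role != "scheduler":
        env["DMLC_NODE_HOST"] = "127.0.0.1"
    return env


def launch_local(cmd=("python", "train_mnist.py", "--network", "lenet", "--kv-store", "dist_sync")):
    """Start scheduler, server and worker as background processes (hypothetical helper)."""
    return [subprocess.Popen(list(cmd), env=role_env(r)) for r in ("scheduler", "server", "worker")]
```

A caller would typically run procs = launch_local() and then wait on each process; starting the scheduler first matches the order of the export lines.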