利用HanLP与Flink/Spark做实时中文分词
这两天又开始忙起来了,继续写小短文。
最近刚刚接了一个实时舆情监控的任务,大体思路是实时接入并处理各微信群内用户的发言。一是从中发现规律,为各群用户生成画像,方便做推广;二是及时将用户的不满与抱怨及时上报给业务方。显然,千里之行的第一步就是将用户的发言做词法分析——通俗来讲就是“分词”。
HanLP(主页见这里,项目repo见这里)是目前Java环境下非常流行的中文NLP工具集。要与Spark或Flink等分布式计算框架一起使用,我们首先得把词典和模型放到共享的文件系统——也就是HDFS上。根据项目readme的描述:
下载data.zip,然后将解压的内容上传到HDFS。如果有自定义词典或模型的话,就一并上传上去。
unzip data-for-1.7.5.zip
hdfs dfs -mkdir /hanlp
hdfs dfs -copyFromLocal ./data /hanlp
然后下载hanlp-release.zip,将解压出来的JAR包放到项目中的特定文件夹(如lib)内,再将配置文件hanlp.properties移入项目的resources文件夹。
HanLP默认的I/O适配器是基于普通文件系统的。要想让它能读取HDFS存储的词典和模型数据,需要自定义新的I/O适配器。
public class HadoopFileIOAdapter implements IIOAdapter {
@Override
public InputStream open(String path) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(path), conf);
return fs.open(new Path(path));
}
@Override
public OutputStream create(String path) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(path), conf);
return fs.create(new Path(path));
}
}
再修改hanlp.properties文件,指定数据文件夹的路径与新的文件适配器类名。
root=hdfs://our-cluster-123/hanlp/
IOAdapter=com.xyz.bigdata.nlp.adapter.HadoopFileIOAdapter
最后在pom文件里加入JAR包的依赖。
<dependency>
<groupId>com.hankcs</groupId>
<artifactId>hanlp</artifactId>
<version>${hanlp.version}</version>
<type>jar</type>
<scope>system</scope>
<systemPath>${basedir}/lib/hanlp-${hanlp.version}.jar</systemPath>
</dependency>
接下来就可以写程序了,以Flink为例,输出每句话的分词结果与没每个词的词性:
dataStream.map(str -> {
List<Term> terms = HanLP.segment(str);
List<Tuple2<String, String>> result = new ArrayList<>();
for (Term term : terms) {
result.add(new Tuple2<>(term.word, term.nature.toString()));
}
return result;
});
Spark的代码类似,将map()算子换成mapPartitions()就行。
特别需要注意,HanLP类的静态代码块里初始化了大量对象(词典、模型),并且它们都没有实现Serializable接口。所以,我们不要在transformation算子外面调用HanLP相关的任何逻辑,以免出现Driver向Executor无法序列化传输数据的问题。虽然这样每个Executor都会持有全部的HanLP相关数据,但毕竟处于大数据环境,对这点开销是不太敏感的。
晚安。