博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka实战-Flume到Kafka (转)
阅读量:6573 次
发布时间:2019-06-24

本文共 2232 字,大约阅读时间需要 7 分钟。

原文链接:

1.概述

  前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据。下面是今天要分享的目录:

  • 数据来源
  • Flume到Kafka
  • 数据源加载
  • 预览

  下面开始今天的分享内容。

2.数据来源

  Kafka生产的数据,是由Flume的Sink提供的,这里我们需要用到Flume集群,通过Flume集群将Agent的日志收集分发到 Kafka(供实时计算处理)和HDFS(离线计算处理)。关于Flume集群的Agent部署,这里就不多做赘述了,不清楚的同学可以参考《》一文中的介绍,下面给大家介绍数据来源的流程图,如下图所示:

  这里,我们使用Flume作为日志收集系统,将收集到的数据输送到Kafka中间件,以供Storm去实时消费计算,整个流程从各个Web节点 上,通过Flume的Agent代理收集日志,然后汇总到Flume集群,在由Flume的Sink将日志输送到Kafka集群,完成数据的生产流程。

3.Flume到Kafka

  从图,我们已经清楚了数据生产的流程,下面我们来看看如何实现Flume到Kafka的输送过程,下面我用一个简要的图来说明,如下图所示:

  这个表达了从Flume到Kafka的输送工程,下面我们来看看如何实现这部分。

  首先,在我们完成这部分流程时,需要我们将Flume集群和Kafka集群都部署完成,在完成部署相关集群后,我们来配置Flume的Sink数据流向,配置信息如下所示:

  • 首先是配置spooldir方式,内容如下所示:
producer.sources.s.type = spooldirproducer.sources.s.spoolDir = /home/hadoop/dir/logdfs
  • 当然,Flume的数据发送方类型也是多种类型的,有:Console、Text、HDFS、RPC等,这里我们系统所使用的是Kafka中间件来接收,配置内容如下所示:
producer.sinks.r.type = org.apache.flume.plugins.KafkaSinkproducer.sinks.r.metadata.broker.list=dn1:9092,dn2:9092,dn3:9092producer.sinks.r.partition.key=0 producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition producer.sinks.r.serializer.class=kafka.serializer.StringEncoder producer.sinks.r.request.required.acks=0 producer.sinks.r.max.message.size=1000000 producer.sinks.r.producer.type=sync producer.sinks.r.custom.encoding=UTF-8 producer.sinks.r.custom.topic.name=test

  这样,我们就在Flume的Sink端配置好了数据流向接受方。

4.数据加载

  在完成配置后,接下来我们开始加载数据,首先我们在Flume的spooldir端生产日志,以供Flume去收集这些日志。然后,我们通过Kafka的KafkaOffsetMonitor监控工具,去监控数据生产的情况,下面我们开始加载。

  • 启动ZK集群,内容如下所示:
zkServer.sh start

  注意:分别在ZK的节点上启动。

  • 启动Kafka集群
kafka-server-start.sh config/server.properties &

  在其他的Kafka节点输入同样的命令,完成启动。

  • 启动Kafka监控工具
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \ com.quantifind.kafka.offsetapp.OffsetGetterWeb \ --zk dn1:2181,dn2:2181,dn3:2181 \ --port 8089 \ --refresh 10.seconds \ --retain 1.days
  • 启动Flume集群
flume-ng agent -n producer -c conf -f flume-kafka-sink.properties -Dflume.root.logger=ERROR,console

  然后,我在/home/hadoop/dir/logdfs目录下上传log日志,这里我只抽取了一少部分日志进行上传,如下图所示,表示日志上传成功。

5.预览

  下面,我们通过Kafka的监控工具,来预览我们上传的日志记录,有没有在Kafka中产生消息数据,如下所示:

  • 启动Kafka集群,为生产消息截图预览

  • 通过Flume上传日志,在Kafka中产生消息数据

6.总结

  本篇文章给大家讲述了Kafka的消息产生流程,后续会在Kafka实战系列中为大家讲述Kafka的消息消费流程等一整套流程,这里只是为后续的Kafka实战编码打下一个基础,让大家先对Kafka的消息生产有个整体的认识。

转载地址:http://tkljo.baihongyu.com/

你可能感兴趣的文章
[日常] Go语言圣经--JSON习题2
查看>>
[日常] Go语言圣经-错误,函数值习题
查看>>
高并发秒杀系统分析
查看>>
3. 深入研究 UCenter API 之 加密与解密(转载)
查看>>
Asp.net MVC验证哪些事(3)-- Remote验证及其改进(附源码)
查看>>
php文件处理
查看>>
今天写了个从一张表数据插入到另一张表的oracle 语句
查看>>
Odoo Auto Backup Database And Set Linux task schedualer
查看>>
Java线程专栏文章汇总(转)
查看>>
listview中getview异步加载网络图片
查看>>
【AdaBoost算法】积分图代码实现
查看>>
如何让jquery-easyui的combobox像select那样不可编辑
查看>>
Linq之扩展方法
查看>>
【Bug Fix】Error : Can't create table 'moshop_1.#sql-534_185' (errno: 150)
查看>>
Android DownloadManager 的使用
查看>>
Android数据的四种存储方式
查看>>
上海互联网整体沉沦:盛大巨人全没落 8年没出一个马云
查看>>
fabric批量操作远程操作主机的练习
查看>>
css知多少(7)——盒子模型
查看>>
【转】在Ubuntu上下载、编译和安装Android最新源代码
查看>>