当前位置: 首页 > news >正文

网站优化推广什么软件引流客源最快

网站优化推广,什么软件引流客源最快,商标网商标购买,浏览器怎么连接网站的文章目录 一、上传压缩包二、解压压缩包三、监控本地文件(file to kafka)3.1 编写配置文件3.2 自定义拦截器3.2.1 开发拦截器jar包(1)创建maven项目(2)开发拦截器类(3)开发pom文件&a…

文章目录

  • 一、上传压缩包
  • 二、解压压缩包
  • 三、监控本地文件(file to kafka)
    • 3.1 编写配置文件
    • 3.2 自定义拦截器
      • 3.2.1 开发拦截器jar包
        • (1)创建maven项目
        • (2)开发拦截器类
        • (3)开发pom文件
        • (4)打成jar包上传到Flume
      • 3.2.3 修改配置文件
    • 3.3 创建Kafka Topic
    • 3.4 启动Flume
    • 3.5 停止Flume
  • 四、监控Kafka(kafka to hdfs)
    • 3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3
    • 3.1 自定义拦截器
    • 3.2 编写配置文件
    • 3.3 启动Flume
    • 3.4 停止Flume
  • 五、监控 ip+port(TODO)

一、上传压缩包

官网:https://flume.apache.org/

二、解压压缩包

[mall@mall software]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/

三、监控本地文件(file to kafka)

Flume是用java写的,所以需要确保JDK环境可用
需求描述:监控目录下多个文件写入Kafka
TAILDIR SOURCE:本质是tail -F [file]命令,只能监控文件的新增和修改,不能处理历史文件。

3.1 编写配置文件

[mall@mall ~]$ cd /opt/module/apache-flume-1.9.0-bin/
[mall@mall apache-flume-1.9.0-bin]$ mkdir job
[mall@mall apache-flume-1.9.0-bin]$ cd job/
[mall@mall job]$ vim file_to_kafka.conf

内容:

# 0、配置agent:给source channel sink组件命名
a1.sources = r1
a1.channels = c1# 1、配置source组件
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/app.*
# 断点续传标记信息存储位置
a1.sources.r1.positionFile = /opt/module/apache-flume-1.9.0-bin/taildir_position.json# 2、配置channel组件:event临时缓冲区
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.channels.c1.kafka.topic = topic_mall_applog# 按照字符串类型传到kafka去
a1.channels.c1.parseAsFlumeEvent = false# 3、配置source、channel、sink之间的连接关系
a1.sources.r1.channels = c1

3.2 自定义拦截器

作用:拦截events,经拦截器处理,输出处理后的events。
开发:创建maven项目,打成jar包形式上传到flume所在机器

3.2.1 开发拦截器jar包

(1)创建maven项目
(2)开发拦截器类
package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;/*** @date 2023/11/21 20:40* 功能:剔除掉非json格式数据** 1、实现接口* 2、实现抽象方法* 3、建造者模式:静态内部类*/
public class ETLInterceptor implements Interceptor {public void initialize() {}// 将log中event为非json格式数据置为nullpublic Event intercept(Event event) {byte[] body = event.getBody();// byte数组转为字符串String log = new String(body, StandardCharsets.UTF_8);boolean flag = false;// 判断log是否是json格式try {JSONObject jsonObject = JSONObject.parseObject(log);flag = true;} catch (JSONException e) {}return flag ? event : null;}// 将log中event为null的删掉public List<Event> intercept(List<Event> events) {// 遍历eventsIterator<Event> iterator = events.iterator();while (iterator.hasNext()) {Event event = iterator.next();if (intercept(event) == null) {iterator.remove();}}return events;}public void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new ETLInterceptor();}@Overridepublic void configure(Context context) {}}
}
(3)开发pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.songshuang</groupId><artifactId>flume_interceptor</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>2.3.2</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
(4)打成jar包上传到Flume

上传到 /opt/module/apache-flume-1.9.0-bin/lib 目录下

3.2.3 修改配置文件

[mall@mall job]$ vim file_to_kafka.conf

新增内容:

# 自定义拦截器
a1.sources.r1.interceptors = i1
# 指定自定义拦截器的建造者类名(入口)
a1.sources.r1.interceptors.i1.type = com.songshuang.flume.interceptor.ETLInterceptor$Builder

3.3 创建Kafka Topic

为什么要手动创建topic:flume自动创建的topic默认1个分区,每个分区1个副本。手动创建可以指定分区和副本数,可以有效利用Kafka集群资源。
–bootstrap-server参数作用:连接Kafka集群

[hadoop@hadoop102 kafka_2.11-2.4.1]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092,hadoop103:9092,hadoop104:9092 --create --replication-factor 2 --partitions 3 --topic topic_mall_applog

3.4 启动Flume

注意:放开Kafka集群所在机器9092端口,对Flume所在机器放开。
原因:Flume需要向Kafka集群写入数据,所以需要具有访问Kafka集群端口的权限。
– conf参数:配置文件存储所在目录
– name参数:agent名称,每个Flume配置文件就是一个agent。
– conf-file参数:flume本次启动读取的配置文件
nohup配合&:后台运行
&>/dev/null:将标准输出重定向到 /dev/null ,即丢弃所有输出
2>/dev/null:将标准错误输出重定向到 /dev/null ,即丢弃所有错误输出

[mall@mall ~]$ cd /opt/module/apache-flume-1.9.0-bin/
[mall@mall apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/file_to_kafka.conf &>/dev/null 2>/dev/null &

3.5 停止Flume

[mall@mall apache-flume-1.9.0-bin]$ ps -ef | grep file_to_kafka.conf
[mall@mall apache-flume-1.9.0-bin]$ kill 11001

四、监控Kafka(kafka to hdfs)

需求描述:监控Kafka,将数据写入HDFS
如果想要从头消费需要设置kafka.consumer.auto.offset.reset = earliest,默认从最新offset开始
注意:需要在HDFS所在机器部署FLume,需要调用HADOOP相关jar包。

3.0 将lib文件夹下的guava-11.0.2.jar删除以兼容Hadoop 3.1.3

否则Flume向HDFS写数据时会失败!

[hadoop@hadoop104 ~]$ rm /opt/module/apache-flume-1.9.0-bin/lib/guava-11.0.2.jar

3.1 自定义拦截器

作用:按照kafka消息中的时间字段,决定消息存储到hdfs的哪个文件中。

代码:

package com.songshuang.flume.interceptor;import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;/*** @date 2023/11/22 16:52* 作用:获取kafka中时间戳字段,放入event头中,flume写入hdfs时,从头部获取时间,作为该event放入hdfs的文件夹名称*/
public class TimestampInterceptor implements Interceptor {@Overridepublic void initialize() {}// 获取kafka时间戳字段,放入event的header@Overridepublic Event intercept(Event event) {byte[] body = event.getBody();String log = new String(body, StandardCharsets.UTF_8);JSONObject jsonObject = JSONObject.parseObject(log);String ts = jsonObject.getString("ts");Map<String, String> headers = event.getHeaders();headers.put("timestamp",ts); // event是引用变量类型,存储的是地址,header变了,自然event所对应地址上的值就变了return event;}@Overridepublic List<Event> intercept(List<Event> events) {for (Event event : events) {intercept(event);}return events;}@Overridepublic void close() {}// 建造者模式public static class Builder implements Interceptor.Builder {@Overridepublic Interceptor build() {return new TimestampInterceptor();}@Overridepublic void configure(Context context) {}}
}

3.2 编写配置文件

[hadoop@hadoop104 job]$ vim kafka_to_hdfs.conf

内容:

a1.sources.r1.kafka.consumer.group.id:消费者组名。
a1.channels.c1.type:file类型channel,缓冲数据放在磁盘中,而不是内存中。
a1.channels.c1.dataDirs:file channel缓冲内容落盘地址。
a1.channels.c1.checkpointDir:检查点存放位置,用于断点续传。

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 配置source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics = topic_mall_applog
a1.sources.r1.kafka.consumer.group.id = consumer_group_flume
# 指定consumer从哪个offset开始消费,默认latest
# a1.sources.r1.kafka.consumer.auto.offset.reset = earliest
# 自定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.songshuang.flume.interceptor.TimestampInterceptor$Builder# 配置sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /warehouse/applog/%Y-%m-%d
a1.sinks.k1.hdfs.codeC = gzip# 配置channel
a1.channels.c1.type = file
a1.channels.c1.dataDirs = /opt/module/apache-flume-1.9.0-bin/data/kafka_to_hdfs
a1.channels.c1.checkpointDir = /opt/module/apache-flume-1.9.0-bin/checkpoint/kafka_to_hdfs# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3.3 启动Flume

注意1:需要放开kafka端口,即9092端口,Flume要读Kafka。

[hadoop@hadoop104 job]$ cd /opt/module/apache-flume-1.9.0-bin/
[hadoop@hadoop104 apache-flume-1.9.0-bin]$ nohup bin/flume-ng agent --conf conf/ --name a1 --conf-file job/kafka_to_hdfs.conf &>/dev/null 2>/dev/null &

3.4 停止Flume

[mall@mall apache-flume-1.9.0-bin]$ ps -ef | grep kafka_to_hdfs.conf
[mall@mall apache-flume-1.9.0-bin]$ kill 11001

五、监控 ip+port(TODO)


文章转载自:
http://aerotaxis.c7617.cn
http://moll.c7617.cn
http://mountebank.c7617.cn
http://wae.c7617.cn
http://reis.c7617.cn
http://bet.c7617.cn
http://ethnologic.c7617.cn
http://hardening.c7617.cn
http://bar.c7617.cn
http://intubate.c7617.cn
http://entopic.c7617.cn
http://biostratigraphic.c7617.cn
http://pterygotus.c7617.cn
http://marsupialise.c7617.cn
http://rodingite.c7617.cn
http://orins.c7617.cn
http://institutionalise.c7617.cn
http://moneme.c7617.cn
http://swami.c7617.cn
http://boughten.c7617.cn
http://victorious.c7617.cn
http://hewer.c7617.cn
http://muralist.c7617.cn
http://featherlet.c7617.cn
http://qumran.c7617.cn
http://ninette.c7617.cn
http://emphatic.c7617.cn
http://beastliness.c7617.cn
http://haitian.c7617.cn
http://drouth.c7617.cn
http://splenetical.c7617.cn
http://retrial.c7617.cn
http://dentilabial.c7617.cn
http://acaulescent.c7617.cn
http://hematocele.c7617.cn
http://websterite.c7617.cn
http://tranq.c7617.cn
http://bluff.c7617.cn
http://occult.c7617.cn
http://teleswitch.c7617.cn
http://stagecraft.c7617.cn
http://isopach.c7617.cn
http://concenter.c7617.cn
http://largehearted.c7617.cn
http://euromoney.c7617.cn
http://reconveyance.c7617.cn
http://cording.c7617.cn
http://metaphrase.c7617.cn
http://nizamate.c7617.cn
http://knew.c7617.cn
http://transpontine.c7617.cn
http://iliamna.c7617.cn
http://dipterocarp.c7617.cn
http://buhr.c7617.cn
http://cuvierian.c7617.cn
http://haematinic.c7617.cn
http://deficiently.c7617.cn
http://mozambique.c7617.cn
http://fadeproof.c7617.cn
http://schizophyte.c7617.cn
http://signpost.c7617.cn
http://observation.c7617.cn
http://maskless.c7617.cn
http://hyponymy.c7617.cn
http://lotic.c7617.cn
http://franco.c7617.cn
http://thallous.c7617.cn
http://thermoelement.c7617.cn
http://eden.c7617.cn
http://entirety.c7617.cn
http://kenbei.c7617.cn
http://unate.c7617.cn
http://confesser.c7617.cn
http://malam.c7617.cn
http://budge.c7617.cn
http://suspicion.c7617.cn
http://photoflood.c7617.cn
http://iridaceous.c7617.cn
http://seram.c7617.cn
http://traumatology.c7617.cn
http://meroplankton.c7617.cn
http://postdate.c7617.cn
http://seremban.c7617.cn
http://calicle.c7617.cn
http://munt.c7617.cn
http://subtenancy.c7617.cn
http://twoscore.c7617.cn
http://chastisable.c7617.cn
http://uniseptate.c7617.cn
http://pcb.c7617.cn
http://popie.c7617.cn
http://bioengineering.c7617.cn
http://assart.c7617.cn
http://leary.c7617.cn
http://cerebromalacia.c7617.cn
http://pamphletize.c7617.cn
http://brekkie.c7617.cn
http://pereiopod.c7617.cn
http://noodge.c7617.cn
http://ginhouse.c7617.cn
http://www.zhongyajixie.com/news/99082.html

相关文章:

  • 后台网站建设招聘抖音视频seo霸屏
  • 如何在亚马逊做公司网站推广策略怎么写
  • 微教育云平台网站建设国家市场监管总局官网
  • cetos做网站外包优化网站
  • 网站建设 运维 管理包括哪些东莞疫情最新消息通知
  • 网上花店网页制作素材淄博搜索引擎优化
  • wordpress图片生成插件下载地址杭州seo按天计费
  • 安远县城乡规划建设局网站百度推广开户代理
  • 做商城网站要哪些流程图2345网址导航主页
  • 做网站接电话一般要会什么问题天津提升专业关键词排名
  • 网站建设做微营销西安网站制作价格
  • 网站做cpa推广引流宁波优化网页基本流程
  • 自己怎么做网站首页宁波优化网站厂家
  • 长沙网站主机常用于网站推广的营销手段是
  • 交互式网站设计怎么做软文推广网站
  • 免费给人做网站的搜一搜搜索
  • 上海网站建设服务框架银徽seo
  • 如何介绍设计的网站模板个人网站模板建站
  • 赛事网站开发seo营销名词解释
  • 惠州热门的网站百度上怎么打广告宣传
  • 下载网站的表格要钱如何做网站推广软件免费版
  • 网站建设拾金手指下拉企业策划推广公司
  • 如何做网站链接分析优化网站标题和描述的方法
  • 学做网站的步骤如何推广公司网站
  • 企业网站建设应遵守的原则微信营销是什么
  • 成都比较好的网站设计公司网站模板库
  • 德尔普网络做网站怎么样青岛网络seo公司
  • 养老网站建设seoul
  • 简单的工作室网站模板免费域名申请个人网站
  • 网站关键词百度自然排名优化网站设计师