做网站 贴吧,页面设计包括哪些方面,前端是什么工作,黑龙江建筑信息网官网文章目录一、安装docker-compose插件1. 下载docker-compose插件2. 赋予权限二、搭建ELKKAFKA环境2.1. 编写docker-compose2.2. 启动docker-compose2.3. 验证效果2.4. 安装logstash三、微信项目投递消息kafka3.1. 微信集成kafka3.2. 配置kafka3.3. aop拦截3.4. 消息投递3.5. 测试…
文章目录一、安装docker-compose插件1. 下载docker-compose插件2. 赋予权限二、搭建ELKKAFKA环境2.1. 编写docker-compose2.2. 启动docker-compose2.3. 验证效果2.4. 安装logstash三、微信项目投递消息kafka3.1. 微信集成kafka3.2. 配置kafka3.3. aop拦截3.4. 消息投递3.5. 测试接口3.6. apipost 发送请求3.7. kibana 查看日志一、安装docker-compose插件
1. 下载docker-compose插件
curl -L https://github.com/docker/compose/releases/download/1.23.2/docker-compose-$(uname -s)-$(uname -m) -o /usr/local/bin/docker-compose
2. 赋予权限
chmod +x /usr/local/bin/docker-compose
二、搭建ELK+KAFKA环境 (内存建议4g及以上) 2.1. 编写docker-compose
cd /app/
mkdir mayiktelkkafka
上传docker-compose.yml
# docker-compose for the ELK + Kafka demo environment.
# Reconstructed from the flattened tutorial text: line structure, quoting and
# the `elasticsearch_url=` assignment were lost during extraction.
version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - 2181:2181
  kafka:
    image: wurstmeister/kafka
    volumes:
      - /etc/localtime:/etc/localtime   # keep container clock in sync with host
    ports:
      - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.122.128
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: 120
      KAFKA_MESSAGE_MAX_BYTES: 10000000
      KAFKA_REPLICA_FETCH_MAX_BYTES: 10000000
      KAFKA_GROUP_MAX_SESSION_TIMEOUT_MS: 60000
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_DELETE_RETENTION_MS: 1000
  kafka-manager:
    image: sheepkiller/kafka-manager
    environment:
      ZK_HOSTS: 192.168.122.128
    ports:
      - 9001:9001
  elasticsearch:
    image: daocloud.io/library/elasticsearch:6.5.4
    restart: always
    container_name: elasticsearch
    ports:
      - 9200:9200
  kibana:
    image: daocloud.io/library/kibana:6.5.4
    restart: always
    container_name: kibana
    ports:
      - 5601:5601
    environment:
      # NOTE(review): extraction dropped the '='; for kibana 6.x the variable is
      # usually ELASTICSEARCH_URL — confirm against the original compose file.
      - elasticsearch_url=http://192.168.122.128:9200
    depends_on:
      - elasticsearch
2.2. 启动docker-compose
docker-compose up这个错误需要你检查一下命令后面是否有多余的空格删除重新运行即可 启动成功后的效果图 成功启动后有5个容器如果容器个数不够根据容器ID查看日志我使用的是虚拟机启动后es容器启动失败查查看日志 异常信息解决方案-跳转max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
2.3. 验证效果
访问zk: http://192.168.122.128:2181 访问es: http://192.168.122.128:9200 访问kibana: http://192.168.122.128:5601/app/kibana#/home?_g=()
2.4. 安装logstash 提前安装jdk环境logstash需要https://blog.csdn.net/weixin_40816738/article/details/108532702 上传或者下载logstash-6.4.3.tar.gz到服务器中
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.4.3.tar.gz解压
tar -zxvf logstash-6.4.3.tar.gz安装插件
cd logstash-6.4.3
bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch编写配置文件
cd config
vim elk-kafka.conf内容如下
input {
  kafka {
    bootstrap_servers => "192.168.122.128:9092"
    topics => ["mayikt-log"]
  }
}
filter {
  # Only matched data are send to output.
}
output {
  elasticsearch {
    action => "index"                   # The operation on ES
    hosts => ["192.168.122.128:9200"]   # Elasticsearch host, can be array.
    index => "mayikt_logs"              # The index to write data to.
  }
}
启动logstash
cd bin
./logstash -f ../config/elk-kafka.conf三、微信项目投递消息kafka
3.1. 微信集成kafka

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

3.2. 配置kafka
bootstrap.yml
spring:
  kafka:
    # kafka broker address; for a cluster, separate multiple addresses with commas
    bootstrap-servers: 192.168.122.128:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group   # consumer group id
      enable-auto-commit: true
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.3. aop拦截
package com.mayikt.api.impl.elk.log;

import com.alibaba.fastjson.JSONObject;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;

/**
 * ELK + Kafka log-collection aspect.
 *
 * <p>Intercepts every service-layer call under {@code com.mayikt.api.impl},
 * serializes request / response / error metadata to JSON and hands it to
 * {@link LogContainer}, which delivers it to Kafka asynchronously.
 *
 * <p>Reconstructed from extraction-mangled source: the {@code @} of every
 * annotation, all {@code =} assignments and all string-literal quotes were
 * restored; the duplicated {@code request_time} put in {@code methodBefore}
 * was removed.
 */
@Aspect
@Component
public class AopLogAspect {

    /** Local server port, appended to the client IP in each log entry. */
    @Value("${server.port}")
    private String serverPort;

    @Autowired
    private LogContainer logContainer;

    // Pointcut: every method of every class under com.mayikt.api.impl
    @Pointcut("execution(* com.mayikt.api.impl.*.*.*(..))")
    private void serviceAspect() {
    }

    /**
     * Before advice: capture request metadata and queue it for Kafka delivery.
     */
    @Before(value = "serviceAspect()")
    public void methodBefore(JoinPoint joinPoint) {
        ServletRequestAttributes requestAttributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject jsonObject = new JSONObject();
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", joinPoint.getSignature());
        jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
        // Client IP + local server port (key name "ip_addres" kept as-is: it is
        // the field name already indexed in ES).
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        jsonObject.put("log_type", "info");
        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        // Hand off to the async queue; LogContainer forwards it to Kafka.
        String log = requestJsonObject.toJSONString();
        logContainer.putLog(log);
    }

    /**
     * After-returning advice: capture the response payload and queue it.
     *
     * @param o the value returned by the intercepted method
     */
    @AfterReturning(returning = "o", pointcut = "serviceAspect()")
    public void methodAfterReturing(Object o) {
        ServletRequestAttributes requestAttributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject respJSONObject = new JSONObject();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        jsonObject.put("response_time", df.format(new Date()));
        jsonObject.put("response_content", JSONObject.toJSONString(o));
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        jsonObject.put("log_type", "info");
        respJSONObject.put("response", jsonObject);
        // kafkaTemplate.send("mayikt-log", respJSONObject.toJSONString());
        logContainer.putLog(respJSONObject.toJSONString());
    }

    /**
     * After-throwing advice: capture request metadata plus the exception text
     * and queue it with {@code log_type=error}.
     *
     * @param point the intercepted join point
     * @param e     the exception thrown by the service method
     */
    @AfterThrowing(pointcut = "serviceAspect()", throwing = "e")
    public void serviceAspect(JoinPoint point, Exception e) {
        ServletRequestAttributes requestAttributes =
                (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = requestAttributes.getRequest();
        JSONObject jsonObject = new JSONObject();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        jsonObject.put("request_time", df.format(new Date()));
        jsonObject.put("request_url", request.getRequestURL().toString());
        jsonObject.put("request_method", request.getMethod());
        jsonObject.put("signature", point.getSignature());
        jsonObject.put("request_args", Arrays.toString(point.getArgs()));
        jsonObject.put("error", e.toString());
        jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);
        jsonObject.put("log_type", "error");
        JSONObject requestJsonObject = new JSONObject();
        requestJsonObject.put("request", jsonObject);
        String log = requestJsonObject.toJSONString();
        logContainer.putLog(log);
    }

    /**
     * Resolve the originating client IP, checking the common proxy headers
     * (X-Forwarded-For, Proxy-Client-IP, WL-Proxy-Client-IP) before falling
     * back to the socket address; for loopback, the local host address is used.
     *
     * @param request current HTTP request
     * @return best-effort client IP address
     */
    public static String getIpAddr(HttpServletRequest request) {
        // X-Forwarded-For (XFF) carries the original client IP when the request
        // passed through an HTTP proxy or load balancer.
        String ipAddress = request.getHeader("x-forwarded-for");
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getHeader("WL-Proxy-Client-IP");
        }
        if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) {
            ipAddress = request.getRemoteAddr();
            if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) {
                // Loopback: use the IP configured on the local network interface.
                InetAddress inet = null;
                try {
                    inet = InetAddress.getLocalHost();
                } catch (UnknownHostException e) {
                    e.printStackTrace();
                }
                ipAddress = inet.getHostAddress();
            }
        }
        // Multiple proxies: the first comma-separated entry is the real client IP.
        if (ipAddress != null && ipAddress.length() > 15) { // "***.***.***.***".length() == 15
            if (ipAddress.indexOf(",") > 0) {
                ipAddress = ipAddress.substring(0, ipAddress.indexOf(","));
            }
        }
        return ipAddress;
    }
}
3.4. 消息投递
package com.mayikt.api.impl.elk.log;

import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.LinkedBlockingDeque;

/**
 * Buffers log messages in an in-memory queue and ships them to the
 * "mayikt-log" Kafka topic from a single background thread, so request
 * threads never block on Kafka I/O.
 *
 * <p>Fix over the original: the consumer thread used a non-blocking
 * {@code poll()} inside {@code while (true)}, busy-spinning a full CPU core
 * when the queue was empty; it now blocks on {@code take()}.
 */
@Component
public class LogContainer {

    // Shared queue between producers (web threads) and the single consumer thread.
    private static final LinkedBlockingDeque<String> logs = new LinkedBlockingDeque<>();

    private final LogThread logThread;

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public LogContainer() {
        // NOTE(review): the consumer thread starts before Spring injects
        // kafkaTemplate; a message taken in that window would hit a null
        // template. Confirm startup ordering, or move start() to a
        // @PostConstruct method.
        logThread = new LogThread();
        logThread.start();
    }

    /**
     * Enqueue one log message for asynchronous delivery to Kafka.
     *
     * @param log serialized JSON log entry
     */
    public void putLog(String log) {
        logs.offer(log);
    }

    /**
     * Background consumer: blocks on the queue and forwards each message to
     * Kafka. (Batching the sends is a possible future optimization.)
     */
    class LogThread extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    // take() blocks until a message arrives — no busy-spin.
                    String log = logs.take();
                    if (!StringUtils.isEmpty(log)) {
                        kafkaTemplate.send("mayikt-log", log);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and exit the consumer loop.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
3.5. 测试接口
package com.mayikt.api.weixin;import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;public interface WeChatService {/*** feign rpc远程调用 405* param a* return*/GetMapping(/getWeChat)String getWeChat( RequestParam(a)Integer a);
}
3.6. apipost 发送请求
http://localhost:9000/getWeChat?a=123456888
3.7. kibana 查看日志