12. elk+kafka+filebeat

2020-04-17  本文已影响0人  _zt_d58b

一、elasticsearch

二、kibana

三、kafka

persistent-volume.yaml

kind: PersistentVolume

apiVersion: v1

metadata:

  name: k8s-pv-zk1

  namespace: viomi-kafka

  annotations:

    volume.beta.kubernetes.io/storage-class: "anything"

  labels:

    type: local

spec:

  capacity:

    storage: 3Gi

  accessModes:

    - ReadWriteOnce

  hostPath:

    path: "/data/zookeeper"

  persistentVolumeReclaimPolicy: Recycle

---

kind: PersistentVolume

apiVersion: v1

metadata:

  name: k8s-pv-zk2

  namespace: viomi-kafka

  annotations:

    volume.beta.kubernetes.io/storage-class: "anything"

  labels:

    type: local

spec:

  capacity:

    storage: 3Gi

  accessModes:

    - ReadWriteOnce

  hostPath:

    path: "/data/zookeeper"

  persistentVolumeReclaimPolicy: Recycle

---

kind: PersistentVolume

apiVersion: v1

metadata:

  name: k8s-pv-zk3

  namespace: viomi-kafka

  annotations:

    volume.beta.kubernetes.io/storage-class: "anything"

  labels:

    type: local

spec:

  capacity:

    storage: 3Gi

  accessModes:

    - ReadWriteOnce

  hostPath:

    path: "/data/zookeeper"

  persistentVolumeReclaimPolicy: Recycle

zookeeper.yaml

apiVersion: v1

kind: Service

metadata:

  name: zk-hs

  namespace: viomi-kafka

  labels:

    app: zk

spec:

  ports:

  - port: 2888

    name: server

  - port: 3888

    name: leader-election

  clusterIP: None

  selector:

    app: zk

---

apiVersion: v1

kind: Service

metadata:

  name: zk-cs

  namespace: viomi-kafka

  labels:

    app: zk

spec:

  ports:

  - port: 2181

    name: client

  selector:

    app: zk

---

apiVersion: policy/v1beta1

kind: PodDisruptionBudget

metadata:

  name: zk-pdb

  namespace: viomi-kafka

spec:

  selector:

    matchLabels:

      app: zk

  maxUnavailable: 1

---

apiVersion: apps/v1

kind: StatefulSet

metadata:

  name: zk

  namespace: viomi-kafka

spec:

  selector:

    matchLabels:

      app: zk

  serviceName: zk-hs

  replicas: 3

  updateStrategy:

    type: RollingUpdate

  podManagementPolicy: Parallel

  template:

    metadata:

      labels:

        app: zk

    spec:

      affinity:

        podAntiAffinity:

          requiredDuringSchedulingIgnoredDuringExecution:

            - labelSelector:

                matchExpressions:

                  - key: "app"

                    operator: In

                    values:

                    - zk

              topologyKey: "kubernetes.io/hostname"

      containers:

      - name: kubernetes-zookeeper

        imagePullPolicy: IfNotPresent

        image: "hub.kce.ksyun.com/yunmi-infra/viomi/zookeeper:3.4.10"

        resources:

          requests:

            memory: "100Mi"

            cpu: "0.1"

        ports:

        - containerPort: 2181

          name: client

        - containerPort: 2888

          name: server

        - containerPort: 3888

          name: leader-election

        command:

        - sh

        - -c

        - "start-zookeeper \

          --servers=3 \

          --data_dir=/var/lib/zookeeper/data \

          --data_log_dir=/var/lib/zookeeper/data/log \

          --conf_dir=/opt/zookeeper/conf \

          --client_port=2181 \

          --election_port=3888 \

          --server_port=2888 \

          --tick_time=2000 \

          --init_limit=10 \

          --sync_limit=5 \

          --heap=512M \

          --max_client_cnxns=60 \

          --snap_retain_count=3 \

          --purge_interval=12 \

          --max_session_timeout=40000 \

          --min_session_timeout=4000 \

          --log_level=INFO"

        readinessProbe:

          exec:

            command:

            - sh

            - -c

            - "zookeeper-ready 2181"

          initialDelaySeconds: 10

          timeoutSeconds: 5

        livenessProbe:

          exec:

            command:

            - sh

            - -c

            - "zookeeper-ready 2181"

          initialDelaySeconds: 10

          timeoutSeconds: 5

        volumeMounts:

        - name: datadir

          mountPath: /data/zookeeper

      securityContext:

        runAsUser: 1000

        fsGroup: 1000

  volumeClaimTemplates:

  - metadata:

      name: datadir

      annotations:

        volume.beta.kubernetes.io/storage-class: "anything"

    spec:

      accessModes: [ "ReadWriteOnce" ]

      resources:

        requests:

          storage: 3Gi

apiVersion: apps/v1

kind: StatefulSet

metadata:

  name: kafka

  namespace: viomi-kafka

spec:

  selector:

    matchLabels:

        app: kafka

  serviceName: kafka-svc

  replicas: 3

  template:

    metadata:

      labels:

        app: kafka

    spec:

      nodeSelector:

          travis.io/schedule-only: "kafka"

      tolerations:

      - key: "travis.io/schedule-only"

        operator: "Equal"

        value: "kafka"

        effect: "NoSchedule"

      - key: "travis.io/schedule-only"

        operator: "Equal"

        value: "kafka"

        effect: "NoExecute"

        tolerationSeconds: 3600

      - key: "travis.io/schedule-only"

        operator: "Equal"

        value: "kafka"

        effect: "PreferNoSchedule"

      affinity:

        podAntiAffinity:

          requiredDuringSchedulingIgnoredDuringExecution:

            - labelSelector:

                matchExpressions:

                  - key: "app"

                    operator: In

                    values:

                    - kafka

              topologyKey: "kubernetes.io/hostname"

        podAffinity:

          preferredDuringSchedulingIgnoredDuringExecution:

            - weight: 1

              podAffinityTerm:

                labelSelector:

                    matchExpressions:

                      - key: "app"

                        operator: In

                        values:

                        - zk

                topologyKey: "kubernetes.io/hostname"

      terminationGracePeriodSeconds: 300

      containers:

      - name: k8s-kafka

        imagePullPolicy: Always

        #        image: hub.kce.ksyun.com/yunmi-infra/viomi/kafka:latest

        image: hub.kce.ksyun.com/yunmi-infra/viomi/viomi-kafka:2.11-1.1.1

        resources:

          requests:

            memory: "600Mi"

            cpu: 500m

        ports:

        - containerPort: 9092

          name: server

        command:

        - sh

        - -c

        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \

          --override listeners=PLAINTEXT://:9092 \

          --override zookeeper.connect=zk-0.zk-hs.viomi-kafka.svc.cluster.local:2181,zk-1.zk-hs.viomi-kafka.svc.cluster.local:2181,zk-2.zk-hs.viomi-kafka.svc.cluster.local:2181 \

          --override log.dir=/var/lib/kafka \

          --override auto.create.topics.enable=true \

          --override auto.leader.rebalance.enable=true \

          --override background.threads=10 \

          --override compression.type=producer \

          --override delete.topic.enable=false \

          --override leader.imbalance.check.interval.seconds=300 \

          --override leader.imbalance.per.broker.percentage=10 \

          --override log.flush.interval.messages=9223372036854775807 \

          --override log.flush.offset.checkpoint.interval.ms=60000 \

          --override log.flush.scheduler.interval.ms=9223372036854775807 \

          --override log.retention.bytes=-1 \

          --override log.retention.hours=12 \

          --override log.roll.hours=12 \

          --override log.roll.jitter.hours=0 \

          --override log.segment.bytes=1073741824 \

          --override log.segment.delete.delay.ms=60000 \

          --override message.max.bytes=1000012 \

          --override min.insync.replicas=1 \

          --override num.io.threads=8 \

          --override num.network.threads=3 \

          --override num.recovery.threads.per.data.dir=1 \

          --override num.replica.fetchers=1 \

          --override offset.metadata.max.bytes=4096 \

          --override offsets.commit.required.acks=-1 \

          --override offsets.commit.timeout.ms=5000 \

          --override offsets.load.buffer.size=5242880 \

          --override offsets.retention.check.interval.ms=600000 \

          --override offsets.retention.minutes=1440 \

          --override offsets.topic.compression.codec=0 \

          --override offsets.topic.num.partitions=50 \

          --override offsets.topic.replication.factor=3 \

          --override offsets.topic.segment.bytes=104857600 \

          --override queued.max.requests=500 \

          --override quota.consumer.default=9223372036854775807 \

          --override quota.producer.default=9223372036854775807 \

          --override replica.fetch.min.bytes=1 \

          --override replica.fetch.wait.max.ms=500 \

          --override replica.high.watermark.checkpoint.interval.ms=5000 \

          --override replica.lag.time.max.ms=10000 \

          --override replica.socket.receive.buffer.bytes=65536 \

          --override replica.socket.timeout.ms=30000 \

          --override request.timeout.ms=30000 \

          --override socket.receive.buffer.bytes=102400 \

          --override socket.request.max.bytes=104857600 \

          --override socket.send.buffer.bytes=102400 \

          --override unclean.leader.election.enable=true \

          --override zookeeper.session.timeout.ms=6000 \

          --override zookeeper.set.acl=false \

          --override broker.id.generation.enable=true \

          --override connections.max.idle.ms=600000 \

          --override controlled.shutdown.enable=true \

          --override controlled.shutdown.max.retries=3 \

          --override controlled.shutdown.retry.backoff.ms=5000 \

          --override controller.socket.timeout.ms=30000 \

          --override default.replication.factor=1 \

          --override fetch.purgatory.purge.interval.requests=1000 \

          --override group.max.session.timeout.ms=300000 \

          --override group.min.session.timeout.ms=6000 \

          --override inter.broker.protocol.version=0.11.0.3 \

          --override log.cleaner.backoff.ms=15000 \

          --override log.cleaner.dedupe.buffer.size=134217728 \

          --override log.cleaner.delete.retention.ms=86400000 \

          --override log.cleaner.enable=true \

          --override log.cleaner.io.buffer.load.factor=0.9 \

          --override log.cleaner.io.buffer.size=524288 \

          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \

          --override log.cleaner.min.cleanable.ratio=0.5 \

          --override log.cleaner.min.compaction.lag.ms=0 \

          --override log.cleaner.threads=1 \

          --override log.cleanup.policy=delete \

          --override log.index.interval.bytes=4096 \

          --override log.index.size.max.bytes=10485760 \

          --override log.message.timestamp.difference.max.ms=9223372036854775807 \

          --override log.message.timestamp.type=CreateTime \

          --override log.preallocate=false \

          --override log.retention.check.interval.ms=300000 \

          --override max.connections.per.ip=2147483647 \

          --override num.partitions=4 \

          --override producer.purgatory.purge.interval.requests=1000 \

          --override replica.fetch.backoff.ms=1000 \

          --override replica.fetch.max.bytes=1048576 \

          --override replica.fetch.response.max.bytes=10485760 \

          --override reserved.broker.max.id=1000 "

        env:

        - name: KAFKA_HEAP_OPTS

          value : "-Xmx512M -Xms512M"

        - name: KAFKA_OPTS

          value: "-Dlogging.level=INFO"

        readinessProbe:

          tcpSocket:

            port: 9092

          timeoutSeconds: 1

          initialDelaySeconds: 5

      securityContext:

        runAsUser: 0

        fsGroup: 1000

apiVersion: v1

kind: Service

metadata:

  labels:

    app: zookeeper-cluster-service-1

  name: zookeeper-cluster1

  namespace: viomi-kafka

spec:

  ports:

  - name: client

    port: 2181

    protocol: TCP

  - name: follower

    port: 2888

    protocol: TCP

  - name: leader

    port: 3888

    protocol: TCP

  selector:

    app: zookeeper-cluster-service-1

---

apiVersion: v1

kind: Service

metadata:

  labels:

    app: zookeeper-cluster-service-2

  name: zookeeper-cluster2

  namespace: viomi-kafka

spec:

  ports:

  - name: client

    port: 2181

    protocol: TCP

  - name: follower

    port: 2888

    protocol: TCP

  - name: leader

    port: 3888

    protocol: TCP

  selector:

    app: zookeeper-cluster-service-2

---

apiVersion: v1

kind: Service

metadata:

  labels:

    app: zookeeper-cluster-service-3

  name: zookeeper-cluster3

  namespace: viomi-kafka

spec:

  ports:

  - name: client

    port: 2181

    protocol: TCP

  - name: follower

    port: 2888

    protocol: TCP

  - name: leader

    port: 3888

    protocol: TCP

  selector:

    app: zookeeper-cluster-service-3

kafka_manager.yaml

apiVersion: apps/v1

kind: Deployment

metadata:

  name: kafka-manager

  namespace: viomi-kafka

spec:

  replicas: 1

  selector:

    matchLabels:

      app: kafka-manager

  template:

    metadata:

      labels:

        app: kafka-manager

    spec:

      containers:

        - image: zenko/kafka-manager

          #basicAuth:

          #  enabled: false

          name: kafka-manager

          ports:

          - name: kafka-manager

            containerPort: 9000

            protocol: TCP

          env:

          - name: ZK_HOSTS

            value: "zoo1:2181,zoo2:2181,zoo3:2181"

apiVersion: v1

kind: Service

metadata:

        #  annotations:

        #service.beta.kubernetes.io/ksc-loadbalancer-id: 63405a34-2875-4b4b-b169-ae37b285100e

  labels:

    app: kafka-manager

  name: kafka-manager-server

  namespace: viomi-kafka

spec:

  ports:

  - name: "9000"

  #  nodePort: 32662

    port: 9000

    protocol: TCP

    targetPort: 9000

  selector:

    app: kafka-manager

  type: ClusterIP

apiVersion: extensions/v1beta1

kind: Ingress

metadata:

  annotations:

    kubernetes.io/ingress.class: traefik

  name: kafka-manager-ingress

  namespace: viomi-kafka

spec:

  rules:

  - host: kafka-manager.viomi.com.cn

    http:

      paths:

      - backend:

          serviceName: kafka-manager-server

          servicePort: 9000

四、filebeat

apiVersion: v1

kind: ConfigMap

metadata:

  name: filebeat-config

  namespace: kube-system

  labels:

    k8s-app: filebeat

data:

  filebeat.yml: |-

    processors:

    - add_cloud_metadata: ~

    - add_docker_metadata: ~

    logging.level: error

    filebeat.inputs:

    - type: container

      scan_frequency: 1s

      backoff_factor: 2

      backoff: 1s

      tail_files: true

      max_backoff: 30s

      spool_size: 2048

      paths:

        - /data/docker/containers/*/*-json.log

        - /data/docker/containers/*/*-json.log-*

      include_lines: ['INFO','ERROR','WARN','DEBUG']

      #multiline.pattern: '^\[[0-9]{4}-[0-9]{2}|^\['

      multiline.pattern: '^\[[0-9]{4}-[0-9]{2}-[0-9]{2}|^\[0-9]{4}\/|^[[:space:]]+|^Caused by:'

      multiline.negate: false

      multiline.match: after

      multiline.max_lines: 150

      processors:

        - add_kubernetes_metadata:

            host: ${NODE_NAME}

            matchers:

            - logs_path:

                logs_path: "/data/docker/containers/"

    output.kafka:

      version: "1.1.1"

      enabled: true

      hosts: ["10.62.1.135:9092"]

      topic: "elk_kafka"

      topics:

        - topic: "yunmi-infra"

          when.contains:

            kubernetes.namespace: "yunmi-infra"

        - topic: "yunmi-trade"

          when.contains:

            kubernetes.namespace: "yunmi-trade"

        - topic: "yunmi-business"

          when.contains:

            kubernetes.namespace: "yunmi-business"

        - topic: "yunmi-front"

          when.contains:

            kubernetes.namespace: "yunmi-front"

        - topic: "yunmi-vwater"

          when.contains:

            kubernetes.namespace: "yunmi-vwater"

        - topic: "yunmi-bigdata"

          when.contains:

            kubernetes.namespace: "yunmi-bigdata"

        - topic: "kube-system"

          when.contains:

            kubernetes.namespace: "kube-system"

      partition.round_robin:

        reachable_only: true

      required_acks: 0

      compression: gzip

      compression_level: 1

      max_message_bytes: 10000000

apiVersion: apps/v1

kind: DaemonSet

metadata:

  labels:

    app: filebeat

    id: filebeat

  name: filebeat

  namespace: kube-system

spec:

  revisionHistoryLimit: 10

  selector:

    matchLabels:

      app: filebeat

      id: filebeat

  template:

    metadata:

      annotations:

        cattle.io/timestamp: "2019-11-28T07:38:18Z"

      creationTimestamp: null

      labels:

        app: filebeat

        id: filebeat

      name: filebeat

    spec:

      hostAliases:

      - ip: "10.62.1.135"

        hostnames:

        - "kafka-0.kafka-svc.viomi-kafka.svc.cluster.local"

        - "kafka-1.kafka-svc.viomi-kafka.svc.cluster.local"

        - "kafka-2.kafka-svc.viomi-kafka.svc.cluster.local"

        - "kafka-2.kafka-svc.viomi-kafka.svc.cluster.local"

      serviceAccountName: filebeat

      containers:

      - image: hub.kce.ksyun.com/yunmi-infra/viomi/viomi-filebeat:latest

        imagePullPolicy: Always

        name: filebeat

        args: [

          "-c", "/etc/filebeat.yml",

          "-e",

        ]

        env:

        - name: NODE_NAME

          valueFrom:

            fieldRef:

              fieldPath: spec.nodeName

        resources:

          limits:

            cpu: "3"

            memory: 3000Mi

          requests:

            cpu: 300m

            memory: 300Mi

        securityContext:

          privileged: true

          procMount: Default

          runAsUser: 0

        terminationMessagePath: /dev/termination-log

        terminationMessagePolicy: File

        volumeMounts:

        - mountPath: /data/docker/containers

          name: containers

        - name: config

          mountPath: /etc/filebeat.yml

          readOnly: true

          subPath: filebeat.yml

      dnsPolicy: ClusterFirst

      restartPolicy: Always

      schedulerName: default-scheduler

      securityContext: {}

      terminationGracePeriodSeconds: 30

      volumes:

      - hostPath:

          path: /data/docker/containers

          type: ""

        name: containers

      - name: config

        configMap:

          defaultMode: 0600

          name: filebeat-config

  updateStrategy:

    type: OnDelete

五、logstash

logstash.yaml        

apiVersion: apps/v1

kind: StatefulSet

metadata:

  labels:

    app: viomi-logstash

  name: viomi-logstash

  namespace: kube-system

spec:

  podManagementPolicy: OrderedReady

  replicas: 9

  selector:

    matchLabels:

      app: viomi-logstash

      id: viomi-logstash

  serviceName: elasticsearch-logging

  template:

    metadata:

      creationTimestamp: null

      labels:

        app: viomi-logstash

        id: viomi-logstash

    spec:

      containers:

      - args:

        - /usr/share/logstash/bin/logstash -f /etc/logstash/conf/logstash.conf

        command:

        - /bin/sh

        - -c

        image: hub.kce.ksyun.com/yunmi-infra/viomi/viomi_logstash_latest:5.6.4

        imagePullPolicy: IfNotPresent

        name: viomi-logstash

        resources:

          limits:

            memory: 2000Mi

          requests:

            memory: 512Mi

        volumeMounts:

        - mountPath: /etc/logstash/conf/

          name: conf

      dnsPolicy: ClusterFirst

      initContainers:

      - command:

        - bash

        - -c

        - |

          hostname=`echo $HOSTNAME | awk -F '-' '{print $NF}'`

          if [[ $hostname -eq 0 ]]; then

            cp /mnt/conf.d/logstash_infra.conf /etc/logstash/conf/logstash.conf && cat /etc/logstash/conf/logstash.conf

          fi

          if [[ $hostname -eq 1 ]]; then

            cp /mnt/conf.d/logstash_trade.conf /etc/logstash/conf/logstash.conf

          fi

          if [[ $hostname -eq 2 ]]; then

            cp /mnt/conf.d/logstash_business.conf /etc/logstash/conf/logstash.conf

          fi

          if [[ $hostname -eq 3 ]]; then

            cp /mnt/conf.d/logstash_other.conf /etc/logstash/conf/logstash.conf

          fi

          if [[ $hostname -eq 4 ]]; then

            cp /mnt/conf.d/logstash_test.conf /etc/logstash/conf/logstash.conf

          fi

          if [[ $hostname -eq 5 ]]; then

            cp /mnt/conf.d/logstash_infra.conf /etc/logstash/conf/logstash.conf

          fi

          if [[ $hostname -eq 6 ]]; then

            cp /mnt/conf.d/logstash_trade.conf /etc/logstash/conf/logstash.conf

          fi

          if [[ $hostname -eq 7 ]]; then

            cp /mnt/conf.d/logstash_business.conf /etc/logstash/conf/logstash.conf

          fi

          if [[ $hostname -eq 8 ]]; then

            cp /mnt/conf.d/logstash_test.conf /etc/logstash/conf/logstash.conf

          fi

        image: hub.kce.ksyun.com/yunmi-infra/viomi/rocketmq:broker-4.5.0

        imagePullPolicy: IfNotPresent

        name: init-broker

        resources:

          limits:

            memory: 1000Mi

        terminationMessagePath: /dev/termination-log

        terminationMessagePolicy: File

        volumeMounts:

        - mountPath: /mnt/conf.d/

          name: viomi-logstash-config

        - mountPath: /etc/logstash/conf/

          name: conf

      restartPolicy: Always

      schedulerName: default-scheduler

      securityContext:

        fsGroup: 1000

        runAsUser: 0

      terminationGracePeriodSeconds: 30

      volumes:

      - configMap:

          defaultMode: 420

          name: viomi-logstash-config

        name: viomi-logstash-config

      - emptyDir: {}

        name: conf

logstash_conf.yaml

apiVersion: v1

kind: ConfigMap

metadata:

  name: viomi-logstash-config

  namespace: kube-system

  labels:

    k8s-app: logstash-config

data:

  logstash_infra.conf: |-

    input {

      kafka {

        bootstrap_servers => "10.62.1.135:9092"

        auto_offset_reset => "latest"

        #consumer_threads => 6

        decorate_events => true

        partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"

        group_id  => "logstash-infra"

        topics => ["yunmi-infra"]

        codec => json {

          charset => "UTF-8"

        }

      }

    }

    filter {

      json {

        source => "message"

      }

      mutate {

        remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]

      }

    }

    output {

      elasticsearch {

        hosts => ["10.62.1.130:9200"]

        index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"

      }

    }

  logstash_business.conf: |-

    input {

      kafka {

        bootstrap_servers => "10.62.1.135:9092"

        auto_offset_reset => "latest"

        #consumer_threads => 6

        decorate_events => true

        partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"

        group_id  => "logstash-business"

        topics => ["yunmi-business"]

        codec => json {

          charset => "UTF-8"

        }

      }

    }

    filter {

      json {

        source => "message"

      }

      mutate {

        remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]

      }

    }

    output {

      elasticsearch {

        hosts => ["10.62.1.130:9200"]

        index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"

      }

    }

  logstash_trade.conf: |-

    input {

      kafka {

        bootstrap_servers => "10.62.1.135:9092"

        auto_offset_reset => "latest"

        #consumer_threads => 6

        decorate_events => true

        partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"

        group_id  => "logstash-trade"

        topics => ["yunmi-trade"]

        codec => json {

          charset => "UTF-8"

        }

      }

    }

    filter {

      json {

        source => "message"

      }

      mutate {

        remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]

      }

    }

    output {

      elasticsearch {

        hosts => ["10.62.1.130:9200"]

        index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"

      }

    }

  logstash_other.conf: |-

    input {

      kafka {

        bootstrap_servers => "10.62.1.135:9092"

        auto_offset_reset => "latest"

        #consumer_threads => 6

        decorate_events => true

        partition_assignment_strategy => "org.apache.kafka.clients.consumer.RoundRobinAssignor"

        group_id  => "logstash-other"

        topics => ["prod_elk_kafka","kafka_topic","kube-system","elk_kafka","yunmi-front","yunmi-vwater","yunmi-bigdata"]

        codec => json {

          charset => "UTF-8"

        }

      }

    }

    filter {

      json {

        source => "message"

      }

      mutate {

        remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]

      }

    }

    output {

      elasticsearch {

        hosts => ["10.62.1.130:9200"]

        index => "logstash-%{[kubernetes][namespace]}-%{[kubernetes][container][name]}-%{+YYYY.MM.dd}"

      }

    }

  logstash_test.conf: |-

    input {

      kafka {

        bootstrap_servers => "10.62.1.135:9092"

        auto_offset_reset => "latest"

        #consumer_threads => 6

        decorate_events => true 

        group_id  => "logstash-test-grp"

        topics => ["test_elk_kafka"]

        codec => json {

          charset => "UTF-8"

        }

      }

    }

    filter {

      json {

        source => "message"

      }

      mutate {

        remove_field => ["kafka","stream","tags","log","ecs","@version","input","tag","fields","agent", "[kubernetes][node]","[kubernetes][pod][uid]","host","[kubernetes][labels]","[kubernetes][container][image]","[kubernetes][agent]","[kubernetes][replicaset]"]

      }

    }

    output {

      elasticsearch {

        hosts => ["10.62.1.130:9200"]

      }

    }

上一篇 下一篇

猜你喜欢

热点阅读