To set up syslog ingestion from network devices, configure Filebeat as below:
filebeat.yml
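filebeat.inputs:
  - type: syslog
    protocol.tcp:
      # Binds every interface. The syslog input has no authentication, so any
      # host that can reach this port can inject events: bind to the collector's
      # management address or firewall the port to the known device ranges.
      host: "0.0.0.0:5014"
    # Marks these events as relayed from another device. Without this tag the
    # `when.not.contains.tags: forwarded` condition on add_host_metadata below
    # never triggers and the collector's own host fields are stamped onto
    # events that actually describe the network device.
    tags: ["forwarded"]
    fields:
      type: syslog
    # Places `type` at the event root; the Logstash pipeline keys on [type] == "syslog".
    fields_under_root: true

setup.template.settings:
  index.number_of_shards: 1

# NOTE(review): left empty on the page; harmless, but it configures nothing.
setup.kibana:

output.logstash:
  # NOTE(review): the Logstash beats input shown below listens on 5044; 10083
  # only works if a proxy or load balancer in between forwards it. No ssl.*
  # settings are given here, so the Beats -> Logstash link is in clear text.
  hosts: ["x.x.x.x:10083"]
  enabled: true

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~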
For the Logstash pipeline, the configuration should look like this:
input {
  # Beats listener that Filebeat's output.logstash must point at.
  # NOTE(review): the Filebeat config above targets port 10083, not 5044 —
  # confirm a forwarder sits between them or align the two ports.
  # No ssl / ssl_certificate / ssl_key settings are given, so the link is
  # unencrypted and unauthenticated: any host that can reach this port can
  # submit events into the pipeline.
  beats {
    port => 5044
  }
}
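filter {
  # NOTE(review): @timestamp is already a Timestamp set by Beats; matching it
  # against "yyyy-MM-dd HH:mm:ss Z" most likely fails and tags every event with
  # _dateparsefailure. Kept as on the page — check a sample event before
  # relying on it.
  date {
    match => [ "@timestamp", "yyyy-MM-dd HH:mm:ss Z" ]
  }

  # Drop fields that should not be indexed. _index/_source are not event
  # fields, so listing them is harmless.
  mutate {
    remove_field => ["@version", "_index", "_source", "ecs"]
  }

  # [type] is set at the root by Filebeat (fields.type + fields_under_root).
  if [type] == "syslog" {
    grok {
      # One "message" key with an array of patterns is the documented form:
      # patterns are tried in order and the first match wins. Repeating
      # `match => { "message" => ... }` for the same key, as the page did,
      # risks the later entries replacing the earlier ones instead of being
      # tried in turn.
      match => {
        "message" => [
          # RFC 5424. The structured-data capture's group name was lost on the
          # page (it read `(?(...))`, which is not a valid group); `syslog_sd`
          # is a placeholder name — rename to whatever downstream consumers expect.
          "<%{NONNEGINT:syslog_pri}>%{NONNEGINT:version}%{SPACE}(?:-|%{TIMESTAMP_ISO8601:syslog_timestamp})%{SPACE}(?:-|%{IPORHOST:hostname})%{SPACE}(?:%{SYSLOG5424PRINTASCII:program}|-)%{SPACE}(?:-|%{SYSLOG5424PRINTASCII:process_id})%{SPACE}(?:-|%{SYSLOG5424PRINTASCII:message_id})%{SPACE}(?:-|(?<syslog_sd>(\[.*?[^\\]\])+))(?:%{SPACE}%{GREEDYDATA:syslog_message}|)",
          # RFC 3164 with optional <PRI> and a BSD timestamp.
          "(<%{NUMBER:syslog_event_id}>)?%{SYSLOGTIMESTAMP:syslog_timestamp} (%{SYSLOGHOST:syslog_hostname} )?%{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?:%{GREEDYDATA:syslog_message}",
          # Same shape with an ISO 8601 timestamp.
          "(<%{NUMBER:syslog_event_id}>)?%{TIMESTAMP_ISO8601:syslog_timestamp} (%{SYSLOGHOST:syslog_hostname} )?%{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?:%{GREEDYDATA:syslog_message}",
          # Plain BSD line without <PRI>.
          "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"
        ]
      }
      add_tag => [ "syslog" ]
    }
    mutate {
      add_field => { "[@metadata][target_index]" => "syslog-%{+YYYY.MM.dd}" }
    }
    date {
      # Two of the patterns above capture an ISO 8601 value into
      # syslog_timestamp; without "ISO8601" in this list those events would
      # get _dateparsefailure and keep the receive time as @timestamp.
      match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
    }
  }

  if [event][module] == "nginx" {
    grok {
      # NOTE(review): the field names COMMONAPACHELOG emits depend on the grok
      # filter's ecs_compatibility setting (legacy: clientip/response/bytes;
      # ECS: [source][address]/[http][response][status_code]/...). The convert
      # and geoip field names below must match whichever mode is active;
      # "responsetime" and a top-level "address" are not produced by
      # COMMONAPACHELOG in either mode — confirm with a sample event.
      # The trailing `+` after %{COMMONAPACHELOG} quantifies the pattern's last
      # group; probably unintended, left as on the page.
      match => [ "message" , "%{COMMONAPACHELOG}+%{GREEDYDATA:extra_fields}"]
      overwrite => [ "message" ]
    }
    mutate {
      convert => ["response", "integer"]
      convert => ["bytes", "integer"]
      convert => ["responsetime", "float"]
      add_field => { "[@metadata][target_index]" => "nginx-%{+YYYY.MM.dd}" }
    }
    geoip {
      source => "address"
      target => "clientgeo"
      add_tag => ["nginx-geoip"]
    }
  }

  if [event][module] == "auditd" {
    grok {
      # NOTE(review): grok only casts to :int / :float, so ":timestamp" on
      # audit_epoch is presumably ignored and the value stays a string, which
      # the date filter below parses.
      match => { "message" => "type=%{WORD:audit_type} msg=audit\(%{NUMBER:audit_epoch:timestamp}:%{NUMBER:audit_counter}\): pid=%{NUMBER:audit_pid} uid=%{NUMBER:audit_uid} auid=%{NUMBER:audit_audid} ses=%{NUMBER:ses} msg=\'op=%{WORD:operation}:%{WORD:detail_operation} grantors=%{WORD:pam_login},%{WORD:pam_key},%{WORD:pam_limit},%{WORD:pam_system} acct=\"%{WORD:acct_user}\" exe=\"%{GREEDYDATA:exec}\" hostname=%{GREEDYDATA:hostname} addr=%{GREEDYDATA:ipaddr} terminal=%{WORD:terminal} res=%{WORD:result}" }
    }
    date {
      # audit(<epoch>:<serial>) carries epoch *seconds* with a fractional
      # part (e.g. 1364481363.243). The UNIX format accepts that form;
      # UNIX_MS expects integer milliseconds and mis-parses it.
      match => [ "audit_epoch", "UNIX" ]
    }
    mutate {
      # NOTE(review): this assumes [host] is a dotted string
      # ("<hostname>.<pod>.<rest>"). With add_host_metadata in Filebeat [host]
      # is an object (host.name, host.ip, ...) and split would not apply —
      # confirm on a sample event. The grok above also captures a `hostname`
      # field; add_field onto an existing field turns it into an array.
      split => ["host", "."]
      # add_field runs before remove_field inside one mutate, so the split
      # parts can be read and [host] dropped in the same block. The page's
      # "ignore" field (added only to be removed again) is not needed.
      add_field => {
        "hostname" => "%{[host][0]}"
        "podName" => "%{[host][1]}"
        "[@metadata][target_index]" => "audit-%{+YYYY.MM.dd}"
      }
      remove_field => ["host"]
    }
  }

  if [container][id] =~ /service/ {
    mutate {
      # NOTE(review): the index name is taken straight from an event field, so
      # whatever produces container ids decides which indices get created;
      # acceptable only if that value is trusted. If an earlier branch already
      # set target_index, add_field appends rather than replaces.
      add_field => { "[@metadata][target_index]" => "%{[container][id]}-%{+YYYY.MM.dd}" }
    }
  }

  # Fallback so events matching none of the branches above do not land in an
  # index literally named "%{[@metadata][target_index]}". "logstash-" is a
  # constructed default — replace with the project's own naming.
  if ![@metadata][target_index] {
    mutate {
      add_field => { "[@metadata][target_index]" => "logstash-%{+YYYY.MM.dd}" }
    }
  }
}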
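output {
  elasticsearch {
    hosts => ["https://x.x.x.x:9200"]
    user => "elastic"
    # Read the secret from the Logstash keystore or environment (${VAR}
    # expansion) instead of embedding it in the pipeline file.
    # ES_PASSWORD is a placeholder variable name.
    password => "${ES_PASSWORD}"
    #data_stream => "true"
    #data_stream_dataset => "xxxxxx"
    # Every event must carry [@metadata][target_index]; the filter stage is
    # responsible for setting a fallback.
    index => "%{[@metadata][target_index]}"
    ssl_enabled => true
    # "none" skips certificate validation entirely, which makes the CA file
    # and the trusted fingerprint below irrelevant and leaves the connection
    # open to interception. "full" checks the chain against the listed CA and
    # the hostname; if the server certificate's SAN does not cover the address
    # in `hosts`, use "certificate" (chain only) rather than falling back to
    # "none".
    ssl_verification_mode => "full"
    ssl_certificate_authorities => "/usr/share/logstash/config/ca.pem"
    # Fingerprint pinning is an alternative to the CA file; both are kept as
    # on the page. The value is a placeholder — replace with the real CA
    # certificate fingerprint.
    ca_trusted_fingerprint => "xxxxxxxxxxxxxxxxxxxxxxxxxx"
  }
}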