I have written a Logstash config file that reads a CSV file and indexes it into Elasticsearch, but I am running into a few problems.
1) I am unable to convert the date column in my input file to a timestamp. The value of the date column in the CSV file looks like:
- 1/22/2016 20:38 [M/d/yyyy H:mm]
Can anyone tell me how to get the date column converted to a timestamp in Kibana?
2) Also, can anyone tell me how to turn the latitude and longitude columns into geoip data? The problem is that I only have latitude and longitude; I do not have a source IP, and source is a required field in the geoip filter.
For example: complaint_latitude: 12.91518877, complaint_longitude: 77.48066853
So I am not sure what to put in the source field:
geoip {
  source => "??"
}
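Since the geoip filter is for resolving IP addresses, one alternative when you already have coordinates is to build a geo_point-style field directly with mutate. This is only a sketch: the location field name is my own choice, and the index (or an index template) would have to map it as geo_point for Kibana maps to work.

```
filter {
  mutate {
    # geo_point accepts an object with lat/lon keys;
    # "location" is a hypothetical field name
    add_field => {
      "[location][lat]" => "%{complaint_latitude}"
      "[location][lon]" => "%{complaint_longitude}"
    }
  }
  mutate {
    # sprintf references produce strings, so convert back to float
    convert => {
      "[location][lat]" => "float"
      "[location][lon]" => "float"
    }
  }
}
```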
3) While processing some rows of my file, Logstash gives the following error: Error parsing csv file. Not every row gives me this error, but quite a few do, so I am losing a large amount of data because I cannot index it.
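One way to see which rows are failing: events the csv filter cannot parse are tagged _csvparsefailure (common causes are unescaped commas, quotes, or embedded newlines inside a field), so they can be routed to a file for inspection. A sketch, with a hypothetical output path:

```
output {
  if "_csvparsefailure" in [tags] {
    # dump unparsable rows for inspection; path is an assumption
    file {
      path => "D:/Project/csv_failures.log"
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "icmc"
    }
  }
}
```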
Here is my logstash config file:
input {
  file {
    path => ["D:\Project\Logstash Config\Icmc\complaints.csv"]
    start_position => "beginning"
    #sincedb_path => "/dev/null"
    sincedb_path => "/tmp/since.db"
  }
}
filter {
  csv {
    separator => ","
    columns => ["category_name", "complaint_sub_category_iid",
                "parent_cat_name", "category_parent_iid",
                "civic_agency_name", "complaint_title",
                "complaint_user_iid", "user_iid", "user_full_name",
                "complaint_mobile_number", "complaint_ward_iid",
                "ward_name", "complaint_location",
                "complaint_address_1", "complaint_latitude",
                "complaint_longitude",
                "complaint_created", "latest_comp_satus_id",
                "latest_comp_status_name", "complaint_description"]
    remove_field => ["message"]
  }
  mutate {
    convert => { "complaint_latitude" => "float" }
  }
  mutate {
    convert => { "complaint_longitude" => "float" }
  }
}
filter {
  geoip {
    source => "clientip"
  }
}
filter {
  date {
    match => ["complaint_created", "M/d/yyyy H:mm"]
    target => "@timestamp"
  }
  mutate {
    add_field => ["[geoip][lnglat]", "%{[complaint_longitude]}",
                  "tmplat", "%{[complaint_latitude]}"]
  }
  mutate {
    merge => ["[geoip][lnglat]", "tmplat"]
  }
  mutate {
    convert => ["[geoip][lnglat]", "float"]
    remove_field => ["tmplat"]
    remove_field => ["complaint_created"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "icmc"
    #document_type => "complaints_filed"
    user => "elastic"
    password => "elastic"
  }
  stdout { }
}
1 Answer
For #1, since Kibana uses the @timestamp field, you can either have the csv filter write your date column (complaint_created?) into @timestamp directly, or use a date filter that targets @timestamp.
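The second option can be sketched like this; the pattern matches values such as 1/22/2016 20:38, and the timezone value is my assumption (set it to wherever the timestamps were recorded, otherwise Logstash falls back to the host's timezone):

```
filter {
  date {
    match    => ["complaint_created", "M/d/yyyy H:mm"]
    target   => "@timestamp"
    # assumption: adjust to the timezone the CSV timestamps use
    timezone => "Asia/Kolkata"
  }
}
```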