首页 文章

如何将日期字段转换为logstash配置文件中的时间戳?

提问于
浏览
0

我编写了Logstash配置文件,它读取一个csv文件并在elasticsearch中对其进行索引 . 但我这样做会遇到一些问题 .

1)在我的输入文件中,我无法将日期列转换为时间戳 . csv文件中date列的值为:

  • 2016年1月22日20:38 [m / d / yyyy H:mm]

所以任何人都可以告诉我如何将日期列转换为kibana中的时间戳?

2)此外,任何人都可以告诉我如何将纬度和经度列转换为geoip . 问题是我只有纬度和经度,我没有源是geoip过滤器中的必填字段 .

例如:complaint_latitude:12.91518877 complaint_longitude:77.48066853

所以我不确定在源字段中放什么

geoip {
    source => "??"
    }

3)在我的文件中处理一些行时,logstash给出了以下错误:解析csv文件时出错 . 并非所有的行都显示我这个错误,但非常行,所以我丢失了大量的数据,因为我无法索引它 .

Here is my logstash config file:

input{

    file{
         path =>["D:\Project\Logstash Config\Icmc\complaints.csv"]
         start_position => "beginning"
         #sincedb_path => "/dev/null"
         sincedb_path => "/tmp/since.db"
        }

   }

  filter{

      csv{
            separator => ","
            columns =>["category_name", "complaint_sub_category_iid", 
                       "parent_cat_name", "category_parent_iid", 
                       "civic_agency_name", "complaint_title", 
                       "complaint_user_iid", "user_iid", "user_full_name", 
                       "complaint_mobile_number", "complaint_ward_iid", 
                       "ward_name", "complaint_location",  
                       "complaint_address_1", "complaint_latitude", 
                       "complaint_longitude", 
                       "complaint_created","latest_comp_satus_id", 
                       "latest_comp_status_name", "complaint_description"]

            remove_field => ["message"]
        }

     mutate{
         convert => { "complaint_latitude" => "float"}
         }
    mutate{
         convert => { "complaint_longitude" => "float"}
          }
     }

filter{
     geoip {
         source => "clientip"
          }
     }

filter{

        date{
            match => ["complaint_created", "M/d/yyyy H:mm"]
            target => "@timestamp"
           }

        mutate{
            add_field => ["[geoip][lnglat]" , "%{[complaint_longitude]}", 
                     "tmplat", "%{[complaint_latitude]}"]
             }

        mutate{
            merge => ["[geoip][lnglat]", "tmplat"]
             }

        mutate{
            convert => ["[geoip][lnglat]", "float"]
            remove_field => ["tmplat"]
            remove_field => ["complaint_created"]
             }
   }


  output{
        elasticsearch{
        hosts =>["localhost:9200"]
        index => "icmc"
        #document_type => "complaints_filed"
        user => "elastic"
        password => "elastic"
        }

        stdout {  }
      }

1 回答

  • 0

    对于#1,由于Kibana使用 @timestamp 字段,您可以在csv过滤器中使用 @timestamp 代替日期列( complaint_created ?)或使用 date 过滤器来定位@timestamp .

相关问题