1. 程式人生 > >hive資料直接寫入到es索引中

hive資料直接寫入到es索引中

1、建立索引

    put:   http://es.dm.csdn.net/item_for_related

    post: http://es.dm.csdn.net/item_for_related/item/_mapping

  {
    "blog": {
        "_all": {
            "enabled": false
        },
        "properties": {            
            "id": {
                "type": "long"
            },
             "source_type": {
                "type": "keyword"
            },
             "title": {
                "type": "text"
            },
             "body": {
                "type": "text"
            },
            "user_name": {
                "type": "keyword"
            },
            "created_at": {
                "type": "keyword"
            },
            "quality_score": {
                "type": "float"
            },
            "tags": {
                "type": "text"
            },
            "system_tag": {
                "type": "text"
            }            
            
        }
    }
}

2、建立hive表結構和es的對應

CREATE EXTERNAL TABLE `item_for_related_txt` (
  id string,
  title string,
  body string,
  source_type string,
  user_name string,
  created_at string,
  tags string,
  quality_score string,
  system_tag string
)STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' 
TBLPROPERTIES ('es.nodes' = '192.168.100.212,192.168.100.213,192.168.100.214,192.168.100.215,192.168.100.216',
'es.index.auto.create' = 'false',
'es.resource' = 'item_for_related/item',
'es.write.operation' = 'upsert',
'es.mapping.id' = 'id',
'es.batch.size.entries'='1000',
'es.batch.write.refresh'='true',
'es.batch.write.retry.wait'='30s');

3、寫資料到hive表中

set mapred.job.name=import_item_for_related_txt;
set mapred.job.queue.name=hadoop;
add jar /data/1/usr/local/hive/lib/elasticsearch-hadoop-5.1.1.jar;
insert overwrite table item_for_related_txt 
select a.itemid as id,a.title,b.content,'blog' as source_type,a.username as user_name,a.posttime as created_at,c.tags as tags, d.quality_score, '' as system_tag  from item_txt a 
left join itemcontent_txt b on a.articleid = b.articleid
    left join itemtags_txt c on a.itemid = c.itemid
left join blog_extend_attr_txt d on a.itemid = d.id
    where a.posttime > '2011-01-01 00:00:00' and d.quality_score > 1.0;

4、去es中查資料

    get: http://es.dm.csdn.net/item_for_related/blog/_search

 post:http://es.dm.csdn.net/item_for_related/blog/_search

{
    "query": {
        "bool": {
            "must": [               
                {
                    "range": {
                        "created_at": {
                            "gte": "2017-11-11"
                        }
                    }
                }
            ]
        }
    },
    "from": 0,
    "size": 10,
    "_source": [
        "id",
        "title",
        "created_at",
        "user_name",
        "quality_score"
    ]
}

{
    "query": {
        "bool": {
            "must": [
                {
                    "multi_match": {
                        "query": "python",
                        "type": "best_fields",
                        "fields": [
                            "title",
                            "tags"
                        ]
                    }
                },
                {
                    "range": {
                        "created_at": {
                            "gte": "2017-10-21"
                        }
                    }
                }
            ]
        }
    },
    "from": 0,
    "size": 10,
    "_source": [
        "id",
        "title",
        "created_at",
        "user_name",
        "quality_score"
    ]
}