Python实现Elasticsearch批量修改查询及发送MQ操作示例
来源:脚本之家    时间:2022-04-19 15:55:51
目录
update_by_query批量修改 / 索引添加字段 / 查询es发送MQ

update_by_query批量修改

POST post-v1_1-2021.02,post-v1_1-2021.03,post-v1_1-2021.04/_update_by_query
{
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "join_field": {
              "value": "post"
            }
          }
        },
        {
          "term": {
            "platform": {
              "value": "toutiao"
            }
          }
        },
        {
          "exists": {
            "field": "liked_count"
          }
        }
      ]
    }
  },
  "script":{
    "source":"ctx._source.liked_count=0",
    "lang":"painless"
  }
}

索引添加字段

# Add the post_signature object field, with two keyword sub-fields, to the user_tiktok mapping.
# Field definitions must sit under "properties", both at the root and inside the object field.
PUT user_tiktok/_doc/_mapping?include_type_name=true
{
  "properties": {
    "post_signature": {
      "properties": {
        "StuClass": {
          "type": "keyword"
        },
        "post_token": {
          "type": "keyword"
        }
      }
    }
  }
}
PUT user_toutiao/_mapping
{
  "properties": {
    "user_token": {
      "type": "text"
    }
  }
}

查询es发送MQ

from celery import Celery
from elasticsearch import Elasticsearch
import logging
import arrow
import pytz
from elasticsearch.helpers import scan, streaming_bulk
import redis
# Redis connection pool and client (db 8).
# NOTE(review): host and password are hard-coded here; consider loading them from
# configuration or the environment instead of committing them to source.
pool_16_8 = redis.ConnectionPool(host="10.0.3.100", port=6379, db=8, password="EfcHGSzKqg6cfzWq")
rds_16_8 = redis.StrictRedis(connection_pool=pool_16_8)
# Enable INFO-level output on the "elasticsearch" logger.
logger = logging.getLogger("elasticsearch")
logger.disabled = False
logger.setLevel(logging.INFO)
# Shared Elasticsearch client; the URL embeds placeholder credentials and host.
es_zoo_connection = Elasticsearch("http://eswriter:e s密码@e sip:4000", dead_timeout=10,
                                  retry_on_timeout=True)
# Rebind `logger` to this module's own logger; from here on the name no longer
# refers to the "elasticsearch" logger configured above.
logger = logging.getLogger(__name__)
class ES(object):
    """Class-level helper around an Elasticsearch client.

    Subclasses configure the target by overriding the class attributes below
    (at minimum ``index`` and ``es``); every operation is a classmethod, so no
    instance is ever created.
    """

    # Base index name (or alias prefix when ``aliase_field`` is set).
    index = None
    # Mapping type forwarded to the client calls; may stay None.
    doc_type = None
    # Key in a document that carries an explicit id; it is popped from the doc when used.
    id_field = "_id"
    # Optional version segment inserted between the index name and the alias suffix.
    version = ""
    # Fallback key in a document to read the id from when ``id_field`` is absent/empty.
    source_id_field = ""
    # Key in a document whose value selects the concrete index (see ``index_name``).
    aliase_field = ""
    # Separator placed between the index-name segments.
    separator = "-"
    # Callable turning the ``aliase_field`` value into the index suffix.
    aliase_func = None
    # Elasticsearch client instance used by every method.
    es = None
    # Time zone applied to integer timestamps found in ``aliase_field``.
    tz = pytz.timezone("Asia/Shanghai")
    logger = logger

    @classmethod
    def mget(cls, ids=None, index=None, **kwargs):
        """Fetch several documents by id in one multi-get request.

        :param ids: list of document ids.
        :param index: index to read from; defaults to ``cls.index``.
        :param kwargs: forwarded to the client's ``mget``.
        :return: the raw client response.
        """
        return cls.es.mget(body={"ids": ids}, doc_type=cls.doc_type, index=index or cls.index, **kwargs)

    @classmethod
    def count(cls, query=None, index=None, **kwargs):
        """Return the number of documents matching ``query`` (0 if the response has no count).

        :param query: request body for the count API.
        :param index: index to count in; defaults to ``cls.index``.
        :param kwargs: forwarded to the client's ``count``.
        """
        response = cls.es.count(doc_type=cls.doc_type, body=query, index=index or cls.index, **kwargs)
        return response.get("count", 0)

    @classmethod
    def upsert(cls, doc, doc_id=None, index=None, doc_as_upsert=True, **kwargs):
        """Partially update one document, creating it when ``doc_as_upsert`` is true.

        :param doc: partial document to merge.
        :param doc_id: explicit id; when omitted it is derived via ``id_name(doc)``.
        :param index: explicit index; when omitted it is derived via ``index_name(doc)``.
        :param doc_as_upsert: add ``doc_as_upsert`` to the update body.
        :param kwargs: forwarded to the client's ``update``.
        """
        body = {"doc": doc}
        if doc_as_upsert:
            body["doc_as_upsert"] = True
        target_id = doc_id or cls.id_name(doc)
        target_index = index or cls.index_name(doc)
        # Keyword arguments on purpose: the positional order of ``update`` differs
        # between client major versions, so positional passing mixes up id/doc_type/body.
        cls.es.update(index=target_index, id=target_id, doc_type=cls.doc_type, body=body, **kwargs)

    @classmethod
    def search(cls, index=None, query=None, **kwargs):
        """Run a search request and return the raw client response.

        :param index: index to search; defaults to ``cls.index``.
        :param query: search request body.
        :param kwargs: forwarded to the client's ``search``.
        """
        return cls.es.search(index=index or cls.index, body=query, **kwargs)

    @classmethod
    def scan(cls, query, index=None, **kwargs):
        """Iterate over all hits of ``query`` using the ``scan`` helper.

        :param query: search request body.
        :param index: index to scan; defaults to ``cls.index``.
        :param kwargs: forwarded to ``elasticsearch.helpers.scan``.
        :return: an iterator of hits.
        """
        return scan(cls.es, query=query, index=index or cls.index, **kwargs)

    @classmethod
    def index_name(cls, doc):
        """Return the index a document belongs to.

        Without ``aliase_field`` (or when the doc lacks that key) this is simply
        ``cls.index``. Otherwise the field value is normalised (strings and
        integers are parsed with ``arrow``; integers are additionally converted
        to ``cls.tz``) and passed to ``cls.aliase_func`` to build the suffix:
        ``index[-version]-suffix``.
        """
        if not (cls.aliase_field and cls.aliase_field in doc):
            return cls.index

        aliase_part = doc[cls.aliase_field]
        if isinstance(aliase_part, str):
            aliase_part = arrow.get(aliase_part)
        elif isinstance(aliase_part, int):
            aliase_part = arrow.get(aliase_part).astimezone(cls.tz)
        suffix = cls.aliase_func(aliase_part)

        if cls.version:
            return "{}{}{}{}{}".format(cls.index, cls.separator, cls.version, cls.separator, suffix)
        return "{}{}{}".format(cls.index, cls.separator, suffix)

    @classmethod
    def id_name(cls, doc):
        """Return the id for ``doc``.

        A truthy ``doc[id_field]`` wins and is removed from the document (so it is
        not written back as a regular field); otherwise ``doc[source_id_field]``
        is used and left in place.

        :raises AssertionError: when neither key yields a truthy id.
        """
        if doc.get(cls.id_field):
            doc_id = doc.pop(cls.id_field)
        else:
            doc_id = doc.get(cls.source_id_field)
        if not doc_id:
            cls.logger.error("document has no usable id: %s", doc)
            # Raised explicitly (not via ``assert``) so the check survives ``python -O``.
            raise AssertionError("doc _id must not be None")
        return doc_id

    @classmethod
    def bulk_upsert(cls, docs, **kwargs):
        """Bulk-write documents; only the ``index`` and ``update`` op types are supported.

        Recognised keyword arguments:

        * ``op_type``: ``"update"`` (default) or ``"index"``.
        * ``chunk_size``: actions per bulk request; defaults to 500, the
          ``streaming_bulk`` default, when missing or falsy.
        * ``upsert``: for ``update`` only, whether to send ``doc_as_upsert``
          (default, and also used when ``None`` is passed: True).

        :return: the lazy ``streaming_bulk`` iterator of ``(ok, item)`` results.
            Nothing is sent until the caller consumes it.
        """
        op_type = kwargs.get("op_type") or "update"
        # Passing None through would switch off count-based chunking entirely.
        chunk_size = kwargs.get("chunk_size") or 500
        if op_type == "update":
            upsert = kwargs.get("upsert", True)
            if upsert is None:
                upsert = True
        else:
            upsert = False
        actions = cls._gen_bulk_actions(docs, cls.index_name, cls.doc_type, cls.id_name, op_type, upsert=upsert)
        return streaming_bulk(cls.es, actions, chunk_size=chunk_size, raise_on_error=False,
                              raise_on_exception=False, max_retries=5, request_timeout=25)

    @classmethod
    def _gen_bulk_actions(cls, docs, index_name, doc_type, id_name, op_type, upsert=True, **kwargs):
        """Yield one bulk action dict per document.

        :param docs: iterable of document dicts.
        :param index_name: index name, or a callable ``doc -> index name``.
        :param doc_type: value for the action's ``_type``.
        :param id_name: document id, or a callable ``doc -> id``.
        :param op_type: ``"index"`` or ``"update"``; documents are skipped for any other value.
        :param upsert: for ``update``, wrap the doc with ``doc_as_upsert``.
        :raises AssertionError: when ``upsert`` is requested with a non-update op type.
        """
        if upsert and op_type != "update":
            raise AssertionError('upsert should use "update" as op_type')
        for doc in docs:
            # index_name may be a factory callable that derives the index per document.
            index = index_name(doc) if callable(index_name) else index_name
            if op_type == "index":
                _source = doc
            elif op_type == "update":
                _source = {"doc": doc, "doc_as_upsert": True} if upsert else {"doc": doc}
            else:
                continue
            # Resolved after _source is built: id_name may pop the id key from the
            # very dict that _source references, keeping it out of the written body.
            doc_id = id_name(doc) if callable(id_name) else id_name
            # Build the bulk action.
            yield {
                "_op_type": op_type,
                "_index": index,
                "_type": doc_type,
                "_id": doc_id,
                "_source": _source,
            }
class tiktokEsUser(ES):
    """ES helper bound to the ``user_tiktok`` index via the shared ``es_zoo_connection`` client."""
    index = "user_tiktok"
    doc_type = "_doc"
    id_field = "_id"
    # Documents without an explicit "_id" fall back to their "user_id" value.
    source_id_field = "user_id"
    es = es_zoo_connection
from kombu import Exchange, Queue, binding
def data_es_route_task_spider(name, args, kwargs, options, task=None, **kw):
    """Route a task to the "tiktok" topic exchange, keyed by its own name.

    Only ``name`` is used; the remaining parameters are accepted and ignored.

    :return: dict with ``exchange``, ``exchange_type`` and ``routing_key``.
    """
    route = dict(exchange="tiktok", exchange_type="topic")
    route["routing_key"] = name
    return route
class DataEsConfig_download(object):
    """Celery configuration object, loaded via ``config_from_object``."""
    # Broker URL; the value is a placeholder (user:password@ip:port).
    broker_url = "amqp://用户:密码@ip:端口/"
    task_ignore_result = True
    # Tasks are serialized as JSON and only JSON content is accepted.
    task_serializer = "json"
    accept_content = ["json"]
    task_default_queue = "default"
    task_default_exchange = "default"
    task_default_routing_key = "default"
    # Topic exchange shared by every queue binding below.
    exchange = Exchange("tiktok", type="topic")
    # One queue per routing key; each queue name equals its routing key, and
    # every queue is declared with the "x-queue-mode": "lazy" argument.
    task_queues = [
        Queue(
            "tiktok.user_avatar.download",
            [binding(exchange, routing_key="tiktok.user_avatar.download")],
            queue_arguments={"x-queue-mode": "lazy"}
        ),
        Queue(
            "tiktok.post_avatar.download",
            [binding(exchange, routing_key="tiktok.post_avatar.download")],
            queue_arguments={"x-queue-mode": "lazy"}
        ),
        Queue(
            "tiktok.post.spider",
            [binding(exchange, routing_key="tiktok.post.spider")],
            queue_arguments={"x-queue-mode": "lazy"}
        ),
        Queue(
            "tiktok.post.save",
            [binding(exchange, routing_key="tiktok.post.save")],
            queue_arguments={"x-queue-mode": "lazy"}
        ),
        Queue(
            "tiktok.user.save",
            [binding(exchange, routing_key="tiktok.user.save")],
            queue_arguments={"x-queue-mode": "lazy"}
        ),
        Queue(
            "tiktok.post_avatar.invalid",
            [binding(exchange, routing_key="tiktok.post_avatar.invalid")],
            queue_arguments={"x-queue-mode": "lazy"}
        ),
        Queue(
            "tiktok.user_avatar.invalid",
            [binding(exchange, routing_key="tiktok.user_avatar.invalid")],
            queue_arguments={"x-queue-mode": "lazy"}
        ),
        Queue(
            "tiktok.comment.save",
            [binding(exchange, routing_key="tiktok.comment.save")],
            queue_arguments={"x-queue-mode": "lazy"}
        ),
    ]
    # Router that sends every task to the "tiktok" exchange using the task name as routing key.
    task_routes = (data_es_route_task_spider,)
    enable_utc = True
    timezone = "Asia/Shanghai"
# Download app: the Celery application named "tiktok", configured from DataEsConfig_download.
tiktok_app = Celery(
    "tiktok",
    include=[
        "task.tasks",
    ]
)
tiktok_app.config_from_object(DataEsConfig_download)
# Task producer: refreshes the historical info of public-opinion users.
def send_post():
    """Scan ``user_tiktok`` and publish one "tiktok.post.spider" task per matching user.

    A user matches when it has a ``post_signature`` field and ``following_num``
    is at least 1000. Each task receives the hit's ``_source`` (restricted to
    ``region``, ``sec_uid`` and ``post_signature``) as its single argument.
    """
    must_clauses = [
        {"exists": {"field": "post_signature"}},
        {"range": {"following_num": {"gte": 1000}}},
    ]
    # Alternative filter used previously: swap the following_num range clause
    # for {"match": {"region": "MY"}} to target a single region.
    query = {
        "query": {"bool": {"must": must_clauses}},
        "_source": ["region", "sec_uid", "post_signature"],
    }
    hits = tiktokEsUser.scan(query=query, scroll="30m", request_timeout=100)
    for hit in hits:
        tiktok_app.send_task("tiktok.post.spider", args=(hit["_source"],))
def send_sign_token():
    """Scan ``user_tiktok`` and publish one "tiktok.user.sign_token" task per matching user.

    A user matches when it has a ``post_signature`` field, ``following_num`` is
    at least 1000, and ``create_time`` lies within the hard-coded one-hour
    window on 2021-01-06. Each task receives the hit's ``_source`` (restricted
    to ``user_id`` and ``sec_uid``) as its single argument.
    """
    # NOTE(review): no queue bound to "tiktok.user.sign_token" is declared in this
    # file's Celery config; confirm a consumer binds that routing key elsewhere.
    created_window = {
        "gte": "2021-01-06T00:00:00",
        "lte": "2021-01-06T01:00:00",
    }
    must_clauses = [
        {"exists": {"field": "post_signature"}},
        {"range": {"following_num": {"gte": 1000}}},
        {"range": {"create_time": created_window}},
    ]
    query = {
        "query": {"bool": {"must": must_clauses}},
        "_source": ["user_id", "sec_uid"],
    }
    hits = tiktokEsUser.scan(query=query, scroll="30m", request_timeout=100)
    for hit in hits:
        tiktok_app.send_task("tiktok.user.sign_token", args=(hit["_source"],))
# Script entry point: publishes the post-spider tasks.
if __name__ == "__main__":
    send_post()
    # Alternative producer, currently disabled:
    # send_sign_token()

以上就是Python实现Elasticsearch批量修改查询及发送MQ操作示例的详细内容,更多关于Python实现Elasticsearch修改查询发送MQ的资料请关注脚本之家其它相关文章!

关键词: 相关文章 历史信息 批量操作 作为一个

X 关闭

X 关闭