Commit 78862a3a by 张云飞

zyf

parent 68bbd47b
This source diff could not be displayed because it is too large. You can view the blob instead.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-02 21:46
# @Author : zhangyunfei
# @File : create-es-index.py
# @Software: PyCharm
from elasticsearch import Elasticsearch
def create_index():
'''
创建索引,创建索引名称为tx_ic_bigdata_business_index_01,es7没有类型
'''
# 创建映射
mappings = {
"settings": {
"number_of_shards": 5,
"number_of_replicas": 1
},
"mappings": {
# "_doc": {
"properties": {
"id": {
"type": "integer"
},
"company_name": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
},
"analyzer": "ik_max_word"
},
"company_name_standard": {
"type": "text",
"fields": {
"raw": {
"type": "keyword"
}
},
"analyzer": "standard"
},
"credit_code": {
"type": "keyword"
},
"legal_person": {
"type": "keyword"
},
"reg_number": {
"type": "keyword"
},
"reg_location": {
"type": "keyword"
},
"reg_capital":
{
"type": "double"
},
"reg_unit":
{
"type": "keyword"
},
"from_time": {
"type": "long"
},
"to_time": {
"type": "long"
},
"estiblish_time": {
"type": "long"
},
"business_scope": {
"type": "keyword"
}
}
}
# }
}
# es = Elasticsearch(['43.247.184.94'], http_auth=("admines", "adminGSBes."), port=7200)
# es = Elasticsearch(['192.168.1.131'])
es = Elasticsearch(['9.223.8.84'], http_auth=("elastic", "Brg2020!"), port=9200)
print("创建索引:")
print("索引是否存在:",es.indices.exists(index="tx_ic_bigdata_business_index_01"))
if es.indices.exists(index="tx_ic_bigdata_business_index_01") is not True:
res = es.indices.create(index='tx_ic_bigdata_business_index_01', body=mappings)
print(res)
create_index()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-01 22:10
# @Time : 2020-05-27 9:53
# @Author : zhangyunfei
# @File : mysqltest.py
# @File : main.py
# @Software: PyCharm
import pymysql
if __name__ == '__main__':
conn = pymysql.connect(host="9.139.8.35", port=13057, user="brg", password="Brg2020!", db="brg-tx-shop")
sql = 'SELECT * FROM x_product_type'
# 获取游标
cursor = conn.cursor()
# 执行sql语句
conn.ping(reconnect=True)
cursor.execute(sql)
conn.commit()
result = cursor.fetchall()
print("---------------------start-------------")
print(result)
print("---------------------end-------------")
cursor.close()
conn.close()
import ast
import json
from hanshujisuan import heming
from flask import Flask,request,jsonify
server = Flask(__name__)
server.config['JSON_AS_ASCII'] =False
@server.route('/gsb/heming', methods=['POST'])
def my_handler():
# get request_body
if request.method == 'POST':
input_data = request.json
print(input_data)
if len(input_data) != 0:
if input_data.get("cityname") != None and input_data.get("keyword") != None and input_data.get("btname") != None:
try:
if input_data.get("cityname") == "":
jsondata = {
"code": "500",
"message": "缺少cityname值",
"data": {}
}
# jsondata = json.dumps(jsondata, ensure_ascii=False)
# infostr = jsondata
return jsonify(jsondata)
elif input_data.get("keyword") == "":
jsondata = {
"code": "500",
"message": "缺少keyname值",
"data": {}
}
# jsondata = json.dumps(jsondata, ensure_ascii=False)
# infostr = jsondata
return jsonify(jsondata)
elif input_data.get("btname") == "":
jsondata = {
"code": "500",
"message": "缺少btname值",
"data": {}
}
# jsondata = json.dumps(jsondata, ensure_ascii=False)
# infostr = jsondata
return jsonify(jsondata)
elif input_data.get("orgname") == None and input_data.get("sitcity") == None:
input_data.update({"orgname": "有限公司"})
input_data.update({"sitcity": "1"})
info = heming(input_data)
# return value must be iterable
return jsonify(info)
elif input_data.get("orgname") == None:
input_data.update({"orgname": "有限公司"})
if input_data.get("sitcity") == "":
input_data.update({"sitcity": "1"})
info = heming(input_data)
# return value must be iterable
return jsonify(info)
else:
info = heming(input_data)
# return value must be iterable
return jsonify(info)
elif input_data.get("sitcity") == None:
input_data.update({"sitcity": "1"})
if input_data.get("orgname") == "":
input_data.update({"orgname": "有限公司"})
info = heming(input_data)
# return value must be iterable
return jsonify(info)
else:
info = heming(input_data)
# return value must be iterable
return jsonify(info)
else:
info = heming(input_data)
# return value must be iterable
return jsonify(info)
except Exception as e:
jsondata = {
"code": "500",
"message": "异常:"+str(e),
"data": {}
}
return jsonify(jsondata)
else:
jsondata = {
"code": "500",
"message": "缺少参数",
"data": {}
}
# jsondata = json.dumps(jsondata, ensure_ascii=False)
# infostr = jsondata
return jsonify(jsondata)
else:
jsondata = {
"code": "500",
"message": "数据结构传输错误",
"data": {}
}
# jsondata = json.dumps(jsondata, ensure_ascii=False)
# infostr = jsondata
return jsonify(jsondata)
# server.run(host='192.168.1.131', port=35502, debug=True)
server.run(host='0.0.0.0', port=35502, debug=True)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-07-02 17:38
# @Author : zhangyunfei
# @File : tx-ex-insert.py
# @Software: PyCharm
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020-06-26 17:16
# @Author : zhangyunfei
# @File : tx_es_data.py
# @Software: PyCharm
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch.helpers import bulk
import json
'''
读取txt里面的全部数据,插入到新的索引中
'''
def DetData(datainfo):#根据ES5中返回的结果,组织导入ES6数据结构
isTrue = True
action = {}
try:
doc = {}
action["_index"] = "tx_ic_bigdata_business_index_01"
# action["_type"] = "_doc" #elasticsearch7.0+版本需要注释
if "id" in datainfo.keys():
id = datainfo["id"]
doc["id"] = id
action["_id"] = id
if "company_name" in datainfo.keys():
company_name = datainfo["company_name"]
if company_name:
doc["company_name"] = company_name
doc["company_name_standard"] = company_name
else:
doc["company_name"] = None
doc["company_name_standard"] = None
else:
doc["company_name"] = None
if "credit_code" in datainfo.keys():
credit_code = datainfo["credit_code"]
if credit_code:
doc["credit_code"] = credit_code
else:
doc["credit_code"] = None
else:
doc["credit_code"] = None
if "legal_person" in datainfo.keys():
legal_person = datainfo["legal_person"]
if legal_person:
doc["legal_person"] = legal_person
else:
doc["legal_person"] = None
else:
doc["legal_person"] = None
if "company_org_type" in datainfo.keys():
company_org_type = datainfo["company_org_type"]
if company_org_type:
doc["company_org_type"] = company_org_type
else:
doc["company_org_type"] = None
else:
doc["company_org_type"] = None
if "reg_location" in datainfo.keys():
reg_location = datainfo["reg_location"]
if reg_location:
doc["reg_location"] = reg_location
else:
doc["reg_location"] = None
else:
doc["reg_location"] = None
if "estiblish_time" in datainfo.keys():
estiblish_time = datainfo["estiblish_time"]
if estiblish_time:
doc["estiblish_time"] = estiblish_time
else:
doc["estiblish_time"] = None
else:
doc["estiblish_time"] = None
if "from_time" in datainfo.keys():
from_time = datainfo["from_time"]
if from_time:
doc["from_time"] = from_time
else:
doc["from_time"] = None
else:
doc["from_time"] = None
if "to_time" in datainfo.keys():
to_time = datainfo["to_time"]
if to_time:
doc["to_time"] = to_time
else:
doc["to_time"] = None
else:
doc["to_time"] = None
if "reg_capital" in datainfo.keys():
reg_capital = datainfo["reg_capital"]
if reg_capital:
doc["reg_capital"] = reg_capital
else:
doc["reg_capital"] = None
else:
doc["reg_capital"] = None
if "reg_unit" in datainfo.keys():
reg_unit = datainfo["reg_unit"]
if reg_unit:
doc["reg_unit"] = reg_unit
else:
doc["reg_unit"] = None
else:
doc["reg_unit"] = None
if "business_scope" in datainfo.keys():
business_scope = datainfo["business_scope"]
if business_scope:
doc["business_scope"] = business_scope
else:
doc["business_scope"] = None
else:
doc["business_scope"] = None
action["_source"] = doc
except Exception as e:
isTrue = False
return isTrue, action
def main():
error = "ES_error.txt"
f1 = open(error, "a")
index_name = "tx_ic_bigdata_business_index_01"
es = Elasticsearch(['9.223.8.84'], http_auth=("elastic", "Brg2020!"), port=9200)
# es = Elasticsearch(['43.247.184.94'], http_auth=("admines", "adminGSBes."), port=7200)
# es = Elasticsearch(['192.168.1.131'])
readfile = open('2.txt','r',encoding='utf-8')
lines = readfile.readlines() # 获取全部
bulkList = []
for line in lines:
line = line.strip()
ll = json.loads(line)
isTrue, action = DetData(ll)
if isTrue:
bulkList.append(action)
else:
f1.write("获取" + str(ll) + "的数据出错!\r\n")
if len(bulkList) == 500:
print('插入500...............')
try:
bulk(es, bulkList, index=index_name, raise_on_error=True) # 批量插入
except Exception as e:
# for er in e.errors:
try:
for er in e.args[1]: # 和上面的for是一样的,都能取到错误信息
f1.write("插入" + str(er["index"]["data"]) + "的数据出错!\r\n")
except:
f1.write("插入" + str(bulkList) + "的数据出错!\r\n")
bulkList = []
if len(bulkList) > 0:
try:
bulk(es, bulkList, index=index_name, raise_on_error=True) # 批量插入
except Exception as e:
# for er in e.errors:
try:
for er in e.args[1]: # 和上面的for是一样的,都能取到错误信息
f1.write("插入" + str(er["index"]["data"]) + "的数据出错!\r\n")
except:
f1.write("插入" + str(bulkList) + "的数据出错!\r\n")
bulkList = []
f1.flush()
f1.close()
print("over")
main()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment