Commit 3f839045 by 蒋勇

d

parent 7e4cb088
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/2/11 19:43
# @Author : fanhui
# @File : GetFIinfoFromJson.py
# @Software: PyCharm
#变化率添加符号
def AddSign(factor):
factor=factor
if "-" in factor and"%"in factor:
factor=factor.replace("-", "") + "↓"
elif "%"in factor and "-"not in factor:
factor = factor+"↑"
# print(factor)
return factor
def GetFIinfoFromJson(GetAllinfo):
# 获取传入的参数
GetAllinfo = GetAllinfo
Else_info={
"Detection_zone": GetAllinfo.get("Detection_zone",None),
"CreatTime": GetAllinfo.get("CreatTime",None),
"company_name": GetAllinfo.get("company_name",None),
"Task_Num": GetAllinfo.get("Task_Num",None)
}
FIinfo=GetAllinfo.get("fi",None)#如果key不存在,会返回None,因此需进行判断
Fi_dict={}
if FIinfo:
Total_trade_income=FIinfo[0].get("Total_trade_income",None)
Bq_time=''
Sq_time=''
if Total_trade_income:
print(eval(Total_trade_income),type(eval(Total_trade_income)))
keys_list = []
Total_trade_income=eval(Total_trade_income)#str转dict,用eval
for t_k,t_v in Total_trade_income.items():
keys_list.append(t_k)
Bq_time=keys_list[0]
Sq_time=keys_list[1]
Total_profit = eval(FIinfo[1].get("Total_profit"))
Net_profit = eval(FIinfo[2].get("Net_profit"))
Total_Liabilities = eval(FIinfo[3].get("Total_Liabilities"))
Owner_rights = eval(FIinfo[4].get("Owner_rights"))
Total_sales = eval(FIinfo[5].get("Total_sales"))
Total_assets = eval(FIinfo[6].get("Total_assets"))
Total_liabilities_and_Owner_rights = eval(FIinfo[7].get("Total_liabilities_and_Owner_rights"))
sales_expense=eval(FIinfo[8].get("sales_expense"))
Net_interest_rate = eval(FIinfo[9].get("Net_interest_rate"))
Equity_ratio = eval(FIinfo[10].get("Equity_ratio"))
Gearing_ratio = eval(FIinfo[11].get("Gearing_ratio"))
sales_growth_rate = eval(FIinfo[12].get("sales_growth_rate"))
Net_profit_growth_rate = eval(FIinfo[13].get("Net_profit_growth_rate"))
print(type(FIinfo[14].get("total_assets_growth_rate")),FIinfo[14].get("total_assets_growth_rate"))
total_assets_growth_rate = eval(FIinfo[14].get("total_assets_growth_rate"))
Fi_dict["Bq_time"] = Bq_time
Fi_dict["Sq_time"] = Sq_time
Fi_dict["Bq_Total_trade_income"]=Total_trade_income[Bq_time]
Fi_dict["Sq_Total_trade_income"] = Total_trade_income[Sq_time]
Fi_dict["change_rate_1"] = AddSign(Total_trade_income["change_rate"])
Fi_dict["Bq_Total_profit"]=Total_profit[Bq_time]
Fi_dict["Sq_Total_profit"]=Total_profit[Sq_time]
Fi_dict["change_rate_2"]=AddSign(Total_profit["change_rate"])
Fi_dict["Bq_Net_profit"]=Net_profit[Bq_time]
Fi_dict["Sq_Net_profit"]=Net_profit[Sq_time]
Fi_dict["change_rate_3"]=AddSign(Net_profit["change_rate"])
Fi_dict["Bq_Total_Liabilities"]=Total_Liabilities[Bq_time]
Fi_dict["Sq_Total_Liabilities"]=Total_Liabilities[Sq_time]
Fi_dict["change_rate_4"]=AddSign(Total_Liabilities["change_rate"])
Fi_dict["Bq_Owner_rights"]=Owner_rights[Bq_time]
Fi_dict["Sq_Owner_rights"]=Owner_rights[Sq_time]
Fi_dict["change_rate_5"]=AddSign(Owner_rights["change_rate"])
Fi_dict["Bq_Total_sales"]=Total_sales[Bq_time]
Fi_dict["Sq_Total_sales"]=Total_sales[Sq_time]
Fi_dict["change_rate_6"]=AddSign(Total_sales["change_rate"])
Fi_dict["Bq_Total_assets"]=Total_assets[Bq_time]
Fi_dict["Sq_Total_assets"]=Total_assets[Sq_time]
Fi_dict["change_rate_7"]=AddSign(Total_assets["change_rate"])
Fi_dict["Bq_Total_liabilities_and_Owner_rights"]=Total_liabilities_and_Owner_rights[Bq_time]
Fi_dict["Sq_Total_liabilities_and_Owner_rights"]=Total_liabilities_and_Owner_rights[Sq_time]
Fi_dict["Bq_sales_expense"] = sales_expense[Bq_time]
Fi_dict["Sq_sales_expense"] = sales_expense[Sq_time]
Fi_dict["Bq_Net_interest_rate"] = Net_interest_rate[Bq_time]
Fi_dict["Sq_Net_interest_rate"] = Net_interest_rate[Sq_time]
Fi_dict["Bq_Equity_ratio"] = Equity_ratio[Bq_time]
Fi_dict["Sq_Equity_ratio"] = Equity_ratio[Sq_time]
Fi_dict["Bq_Gearing_ratio"] = Gearing_ratio[Bq_time]
Fi_dict["Sq_Gearing_ratio"] = Gearing_ratio[Sq_time]
Fi_dict["Bq_sales_growth_rate"] = sales_growth_rate[Bq_time]
Fi_dict["Sq_sales_growth_rate"] = sales_growth_rate[Sq_time]
Fi_dict["Bq_Net_profit_growth_rate"] = Net_profit_growth_rate[Bq_time]
Fi_dict["Sq_Net_profit_growth_rate"] = Net_profit_growth_rate[Sq_time]
Fi_dict["Bq_total_assets_growth_rate"] = total_assets_growth_rate[Bq_time]
Fi_dict["Sq_total_assets_growth_rate"] = total_assets_growth_rate[Sq_time]
return Else_info,Fi_dict
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/12/26 17:33
# @Author : fanhui
# @File : sql_tax_test.py
# @Software: PyCharm
import os, sys
from flask import request, Flask,jsonify
from flask_cors import *
import json
server = Flask(__name__)
interface_path = os.path.dirname(__file__)
sys.path.insert(0, interface_path) # 将当前文件的父目录加入临时系统变量
server.config['JSON_AS_ASCII'] = False
CORS(server, supports_credentials=True) # 跨域请求
# 获取传入的参数,建立大字典
def CreatBdictFromJson(GetAllinfo):
# 读取指标文件,获取权重及描述信息
file = open('tax_index.txt', 'r', encoding='utf-8')
lines = file.readlines()
dict_all = {}
for line in lines:
linesplit = line.strip().split('\t')
tax_index = {
"factors_code": linesplit[3].replace('"', ""),
"factors_name": linesplit[4].replace('"', ""),
"s_score": int(linesplit[5].replace('"', "")),
"factors_describe": linesplit[6].replace('"', ""),
"status": 0
}
dict_all[linesplit[4].replace('"', "")] = tax_index
# print(dict_all)
# 获取传入的参数
GetAllinfo = GetAllinfo
GetAllinfo_new = eval(GetAllinfo.get("ri", None)) # 如果key不存在,会返回None,因此需进行判断
for factors_name, value in dict_all.items():
try:
dict_all[factors_name]["status"] = GetAllinfo_new[factors_name]
except:
pass
# print(dict_all)
# 计算总体风险异常比例、以及被监控风险比例和被稽查风险比例
qualified = [] # 合格
disqualified = [] # 不合格
for k, v in dict_all.items():
s_status = v['status']
s_score = v['s_score']
if s_status == 1:
qualified.append(s_score)
if s_status == 2:
disqualified.append(s_score)
A_count = sum(qualified) + sum(disqualified) # 计算所有合格与不合格对应指标权重之和(分母)
B_count = sum(disqualified) # 计算所有不合格对应指标权重之和(分子)
r1 = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
print(r1, type(r1))
t_r1 = str(int(r1 * 100)) + "%"
t_r2 = str(int((r1 - 0.29) * 100)) + "%"
t_r3 = str(int((r1 - 0.49) * 100)) + "%"
standard1 = 0.3
standard2 = 0.5
# 建立第一层规则
rules = {r1 <= standard1: {"t_r1": t_r1, "t_r2": "1%", "t_r3": "1%"},
r1 > standard1 and r1 <= standard2: {"t_r1": t_r1, "t_r2": t_r2, "t_r3": "1%"},
r1 > standard2: {"t_r1": t_r1, "t_r2": t_r2, "t_r3": t_r3}}
ALLRisk = {}
for k1, v1 in rules.items():
if k1:
ALLRisk = v1
print(ALLRisk,"+++++++++++++")
return dict_all, ALLRisk
# 输入计算出的风险异常比例,返回提示结果
def RiskAbnormal(t_r1):
RiskA = {}
t_r1_s = str(int(t_r1.replace("%", "")) / 100)
rules_1 = {"t_r1_s<='0.3'": "企业风险异常同比其他企业比例较低",
"t_r1_s>'0.3'": "企业风险异常同比其他企业比例明显偏高"
}
for k1, v1 in rules_1.items():
if eval(str(k1)):
RiskA = {"score":t_r1,"describe":v1}
return RiskA
# 输入计算出的被检测风险值,判断被监测风险结果
def MonitoredRisk(t_r2):
MRisk = {}
t_r2_s = str(int(t_r2.replace("%", "")) / 100)
rules_2 = {"t_r2_s<='0.1'": "企业被监控的风险同比其他企业比例较低",
"t_r2_s>'0.1' and t_r2_s<='0.5'": "经检测,企业有一定可能性被选为税务监控对象",
"t_r2_s>'0.5'": "经检测,企业有很大可能性被选为税务监控对象"
}
for k2, v2 in rules_2.items():
if eval(str(k2)):
MRisk = {"score": t_r2, "describe": v2}
return MRisk
# 输入计算出的被稽查风险值,判断被稽查风险结果
def AuditedRisk(t_r3):
ARisk = {}
t_r3_s = str(int(t_r3.replace("%", "")) / 100)
rules_3 = {"t_r3_s<='0.1'": "企业被税务稽查的可能性较低",
"t_r3_s>'0.1' and t_r3_s<='0.3'": "企业有一定可能性被税务稽查",
"t_r3_s>'0.3'": "企业有很大可能性被税务稽查"
}
for k3, v3 in rules_3.items():
if eval(str(k3)):
ARisk = {"score":t_r3,"describe":v3}
return ARisk
# 输入总体计算的风险异常比例,得到提示语(也可与RiskAbnormal函数合并)
def information(t_r1):
info = {}
t_r1_s = str(int(t_r1.replace("%", "")) / 100)
rules_0 = {"t_r1_s<='0.3'": "贵企业的风险异常情况同比其他企业较少,如需更多更详细的诊断服务,请联系我们>>",
"t_r1_s>'0.3' and t_r1_s<='0.6'": "企业的风险异常情况同比其他企业明显偏多,有较大可能性成为税务系统监控的对象,请企业的财税人员参考以下详细检测情况进行自查,如需进行更详细的诊断服务请联系我们>>",
"t_r1_s>'0.6' and t_r1_s<='0.8'": "如果税务局选择企业所在行业进行行业稽查,该企业有一定可能性成为被选案例,请财税人员尽快参照以下比例进行自查,如需进行更详细的诊断服务请联系我们>>",
"t_r1_s>'0.8'": "如果税务局选择企业所在行业进行行业稽查,该企业有很大可能性成为被选案例,请财税人员尽快参照以下比例进行自查,如需进行更详细的诊断服务请联系我们>>"
}
for k0, v0 in rules_0.items():
if eval(str(k0)):
info = {"score":t_r1,"describe":v0}
return info
# 建立不同风险监测项与指标的对应关系
tax_desc = {
"IT_YCSR": ["所得税行业预警税负率",
"销售(营业)收入变动率",
"收入利润率",
"销售成本大于销售收入",
"当期新增其他应收款/销售收入>80%",
"期末存货大于实收资本差异幅度异常",
"成本费用利润率",
"利润总额变动率",
"投资利润率",
"预付账款为负数",
"应收账款为负数",
"应付账款为负数",
"总资产周转率",
"预收账款过大异常",
"资产利润率",
"纳税人营业外支出变动率与营业外收入变动率弹性系数异常",
"纳税人流动资产变动率与营业收入变动率弹性系数异常",
"纳税人期末存货与当期累计收入差异幅度异常",
"账面存货率过高将会导致系统预警",
"收入负担变动率",
"应税所得率变动率",
"应纳税所得额变动率",
"行业预警利润率",
"主营业务收入变动率",
"主营业务成本变动率",
"其他业务利润变动率",
"主营业务利润变动率",
"纳税人主营业务收入变动率与主营业务利润变动率弹性系数异常",
"纳税人主营业务收入变动率与主营业务成本变动率弹性系数异常"],
"IT_XZCB": ["所得税行业预警税负率",
"销售成本大于销售收入",
"成本费用利润率",
"期末存货大于实收资本差异幅度异常",
"收入成本率",
"产品销售(营业)成本变动率",
"纳税人期末存货与当期累计收入差异幅度异常",
"账面存货率过高将会导致系统预警",
"总资产周转率",
"资产负债率",
"主营业务成本变动率",
"纳税人固定资产综合折旧率变动异常将会导致系统预警",
"纳税人无形资产综合摊销率变动异常",
"高比例出口的亏损企业",
"收入负担变动率",
"应税所得率变动率",
"应纳税所得额变动率",
"行业预警利润率",
"其他业务利润变动率",
"主营业务利润变动率",
"纳税人主营业务收入变动率与主营业务利润变动率弹性系数异常"],
"IT_XZFY": ["所得税行业预警税负率",
"销售成本大于销售收入",
"成本费用利润率",
"期末存货大于实收资本差异幅度异常",
"收入费用率",
"期间费用变动率",
"纳税人期末存货与当期累计收入差异幅度异常",
"账面存货率过高将会导致系统预警",
"总资产周转率",
"资产负债率",
"成本费用率",
"费用率变动率",
"营业(管理、财务)费用变动率",
"主营业务成本变动率",
"高比例出口的亏损企业",
"收入负担变动率",
"应税所得率变动率",
"应纳税所得额变动率",
"行业预警利润率",
"其他业务利润变动率",
"主营业务利润变动率",
"纳税人主营业务收入变动率与主营业务利润变动率弹性系数异常",
"纳税人期间费用变动率与主营业务收入变动率弹性系数异常",
"广告费和业务宣传费异常",
"业务招待费异常"],
"VAT_XKFP": ["销售额变动率",
"销售毛利(益)率",
"税收负担变动率",
"税收负担率",
"增值税专用发票用量变动异常",
"增值税一股纳税人税负变动异常",
"增值税普通发票用量变动异常",
"进项税额变动率∶应纳税额变动率"],
"VAT_YCXX": ["销售额变动率",
"税收负担变动率",
"税收负担率",
"存货变动率∶销售成本变动率",
"存货变动率∶销售收入变动率",
"存货周转率∶资金周转率",
"增值税一股纳税人税负变动异常",
"纳税人销售额变动率与应纳税额变动率弹性系数异常",
"进项税额变动率∶应纳税额变动率",
"应收账款变动率∶销项税额变动率",
"预收账款大于零且存货大于预收账款又无留抵",
"进项税额变动率高于销售税额变动率",
"在建工程增加额",
"长期投资增加额分析",
"企业期末预收账款变动率与销售收入变动率弹性系数异常",
"纳税人销售毛利率变动率与税负率变动率弹性系统异常"],
"VAT_XZJX": ["销售毛利(益)率",
"税收负担变动率",
"税收负担率",
"增值税一股纳税人税负变动异常",
"纳税人销售额变动率与应纳税额变动率弹性系数异常",
"应付账款变动率∶进项税额变动率",
"进项税额变动率高于销售税额变动率",
"在建工程增加额",
"长期投资增加额分析",
"企业期末预收账款变动率与销售收入变动率弹性系数异常",
"应税销售额变动率",
"纳税人销售毛利率变动率与税负率变动率弹性系统异常"],
"CT_FX": ["其他应付款变动额",
"应付帐款变动额",
"营业外支出变动额",
"长期投资变动额"],
"ALLT_FX": ["消费税申报收入与增值税申报收入差异",
"所得税申报收入与增值税申报收入差异",
"长期零申报"]
}
# 建立细分税种风险类别(根据传入的不合格指标之和进行比较)
def RiskCompareRule(wx_part):
risk_describe = ''
if wx_part and wx_part != None:
risk_relus = {
wx_part >= 5 : "高危风险",
# wx_part >= 5 and wx_part <= 10: "高危风险",
wx_part >= 3 and wx_part <= 4: "重度风险",
wx_part == 2: "中度风险",
wx_part <= 1: "轻微风险"
}
for k, v in risk_relus.items():
if k:
risk_describe = v
else:
risk_describe = "数据缺失"
return risk_describe
#风险描述去重
def getUniqueItems(iterable):
seen = set()
result = []
for item in iterable:
if item not in seen:
seen.add(item)
result.append(item)
return result
# 计算所得税隐藏收入风险异常(对所属指标不合格降序排列,展示前三的风险描述信息,计算不合格总和及风险比例)
def IT_YCSR_Risk(dict_all):
IT_YCSR_list = tax_desc.get("IT_YCSR") #获取所得税隐藏收入指标名称
IT_YCSR_dict = {} #建立所得税隐藏收入字典
for f1 in IT_YCSR_list:
IT_YCSR_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in IT_YCSR_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=IT_YCSR_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in IT_YCSR_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_IT_YCSR = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_IT_YCSR_score = str(int(risk_IT_YCSR * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe) #风险描述去重
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_IT_YCSR_score="0%"
p1_describe="贵公司提供的检测数据不能支撑此风险项检测,您可以尝试按季度或年度检测"
risk_describe="数据缺失"
Risk_IT_YCSR={
"score":Risk_IT_YCSR_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_IT_YCSR
# 计算所得税虚增费用风险异常
def IT_XZFY_Risk(dict_all):
IT_XZFY_list = tax_desc.get("IT_XZFY") #获取所得税虚增费用指标名称
IT_XZFY_dict = {} #建立所得税虚增费用字典
for f2 in IT_XZFY_list:
IT_XZFY_dict[f2] = dict_all[f2]
#判断所有指标是否为缺失
Risk_not_null={}
for k2, v2 in IT_XZFY_dict.items():
s_status2 = v2["status"]
if s_status2 >0:
Risk_not_null[k2]=IT_XZFY_dict[k2]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s2_str_dict = {}
for k_part2, v_part2 in IT_XZFY_dict.items():
s_s2 = v_part2["s_score"]
s_str2 = v_part2["factors_describe"]
s_status2 = v_part2["status"]
if s_status2 == 1:
satisfied.append(s_s2)
if s_status2 == 2:
part_s2_str_dict[s_s2] = s_str2
dissatisfied.append(s_s2)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_IT_XZFY = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_IT_XZFY_score = str(int(risk_IT_XZFY * 100)) + "%" # 格式转化
part_s2_str_dict = sorted(part_s2_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p2_describe = [] # 获取风险权重前三提示
for pt in part_s2_str_dict:
p2_describe.append(pt[1])
p2_describe = getUniqueItems(p2_describe)
if len(p2_describe) > 3:
p2_describe = p2_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_IT_XZFY_score="0%"
p2_describe="贵公司提供的检测数据不能支撑此风险项检测,您可以尝试按季度或年度检测"
risk_describe="数据缺失"
Risk_IT_XZFY={
"score":Risk_IT_XZFY_score,
"describe":p2_describe,
"risk":risk_describe
}
return Risk_IT_XZFY
# 计算所得税虚增成本风险异常
def IT_XZCB_Risk(dict_all):
IT_XZCB_list = tax_desc.get("IT_XZCB") #获取所得税虚增成本指标名称
IT_XZCB_dict = {} #建立所得税虚增成本字典
for f1 in IT_XZCB_list:
IT_XZCB_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in IT_XZCB_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=IT_XZCB_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in IT_XZCB_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_IT_XZCB = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_IT_XZCB_score = str(int(risk_IT_XZCB * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_IT_XZCB_score="0%"
p1_describe="贵公司提供的检测数据不能支撑此风险项检测,您可以尝试按季度或年度检测"
risk_describe="数据缺失"
Risk_IT_XZCB={
"score":Risk_IT_XZCB_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_IT_XZCB
# 计算增值税虚开发票风险异常
def VAT_XKFP_Risk(dict_all):
VAT_XKFP_list = tax_desc.get("VAT_XKFP") #获取增值税虚开发票指标名称
VAT_XKFP_dict = {} #建立增值税虚开发票字典
for f1 in VAT_XKFP_list:
VAT_XKFP_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in VAT_XKFP_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=VAT_XKFP_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in VAT_XKFP_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_VAT_XKFP = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_VAT_XKFP_score = str(int(risk_VAT_XKFP * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_VAT_XKFP_score="0%"
p1_describe="贵公司提供的检测数据不能支撑此风险项检测,您可以尝试按季度或年度检测"
risk_describe="数据缺失"
Risk_VAT_XKFP={
"score":Risk_VAT_XKFP_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_VAT_XKFP
# 计算增值税虚增进项风险异常
def VAT_XZJX_Risk(dict_all):
VAT_XZJX_list = tax_desc.get("VAT_XZJX") #获取增值税虚增进项指标名称
VAT_XZJX_dict = {} #建立增值税虚增进项字典
for f1 in VAT_XZJX_list:
VAT_XZJX_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in VAT_XZJX_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=VAT_XZJX_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in VAT_XZJX_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_VAT_XZJX = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_VAT_XZJX_score = str(int(risk_VAT_XZJX * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_VAT_XZJX_score="0%"
p1_describe="贵公司提供的检测数据不能支撑此风险项检测,您可以尝试按季度或年度检测"
risk_describe="数据缺失"
Risk_VAT_XZJX={
"score":Risk_VAT_XZJX_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_VAT_XZJX
# 计算增值税隐藏销项风险异常
def VAT_YCXX_Risk(dict_all):
VAT_YCXX_list = tax_desc.get("VAT_YCXX") #获取增值税隐藏销项指标名称
VAT_YCXX_dict = {} #建立增值税隐藏销项字典
for f1 in VAT_YCXX_list:
VAT_YCXX_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in VAT_YCXX_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=VAT_YCXX_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in VAT_YCXX_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_VAT_YCXX = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_VAT_YCXX_score = str(int(risk_VAT_YCXX * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_VAT_YCXX_score="0%"
p1_describe="贵公司提供的检测数据不能支撑此风险项检测,您可以尝试按季度或年度检测"
risk_describe="数据缺失"
Risk_VAT_YCXX={
"score":Risk_VAT_YCXX_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_VAT_YCXX
# 计算消费税风险异常
def CT_FX_Risk(dict_all):
CT_FX_list = tax_desc.get("CT_FX") #获取消费税指标名称
CT_FX_dict = {} #建立消费税字典
for f1 in CT_FX_list:
CT_FX_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in CT_FX_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=CT_FX_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in CT_FX_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_CT_FX = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_CT_FX_score = str(int(risk_CT_FX * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_CT_FX_score="0%"
p1_describe="贵公司提供的检测数据不能支撑此风险项检测,您可以尝试按季度或年度检测"
risk_describe="数据缺失"
Risk_CT_FX={
"score":Risk_CT_FX_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_CT_FX
# 综合
def ALLT_FX_Risk(dict_all):
ALLT_FX_list = tax_desc.get("ALLT_FX") #获取综合风险指标名称
ALLT_FX_dict = {} #建立综合风险字典
for f1 in ALLT_FX_list:
ALLT_FX_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in ALLT_FX_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=ALLT_FX_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in ALLT_FX_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_ALLT_FX = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_ALLT_FX_score = str(int(risk_ALLT_FX * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_ALLT_FX_score="0%"
p1_describe="贵公司提供的检测数据不能支撑此风险项检测,您可以尝试按季度或年度检测"
risk_describe="数据缺失"
Risk_ALLT_FX={
"score":Risk_ALLT_FX_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_ALLT_FX
#整合调用前面构建函数,并结合传入数据建立风险监测结果函数
def Riskinfo(GetAllinfo):
dict_all, ALLRisk = CreatBdictFromJson(GetAllinfo)
RiskA = RiskAbnormal(ALLRisk["t_r1"]) # 风险异常比例
MRisk = MonitoredRisk(ALLRisk["t_r2"]) # 被监控风险
ARisk = AuditedRisk(ALLRisk["t_r3"]) # 被稽查风险
info = information(ALLRisk["t_r1"]) # 提示语
Risk_IT_YCSR = IT_YCSR_Risk(dict_all) # 企业所得税隐藏收入风险信息
Risk_IT_XZFY = IT_XZFY_Risk(dict_all) # 企业所得税虚增费用风险信息
Risk_IT_XZCB = IT_XZCB_Risk(dict_all) # 企业所得税虚增成本风险信息
Risk_VAT_XKFP = VAT_XKFP_Risk(dict_all) # 企业增值税虚开发票风险信息
Risk_VAT_XZJX = VAT_XZJX_Risk(dict_all) # 企业增值税虚增进项风险信息
Risk_VAT_YCXX = VAT_YCXX_Risk(dict_all) # 企业增值税隐藏销项风险信息
Risk_CT_FX = CT_FX_Risk(dict_all) # 消费税风险信息
Risk_ALLT_FX = ALLT_FX_Risk(dict_all) # 综合风险信息
RI_result = {
"RiskA": RiskA,
"MRisk": MRisk,
"ARisk": ARisk,
"info": info,
"Risk_IT_YCSR": Risk_IT_YCSR,
"Risk_IT_XZFY": Risk_IT_XZFY,
"Risk_IT_XZCB": Risk_IT_XZCB,
"Risk_VAT_XKFP": Risk_VAT_XKFP,
"Risk_VAT_XZJX": Risk_VAT_XZJX,
"Risk_VAT_YCXX": Risk_VAT_YCXX,
"Risk_CT_FX": Risk_CT_FX,
"Risk_ALLT_FX": Risk_ALLT_FX
}
print(RI_result, "------------------------------")
return RI_result
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2020/2/11 19:25
# @Author : fanhui
# @File : getcompanyinfofromES.py
# @Software: PyCharm
import os, sys
from flask_cors import *
import json
from flask import Flask
import oss2,time,datetime
from elasticsearch import Elasticsearch
server = Flask(__name__)
interface_path = os.path.dirname(__file__)
sys.path.insert(0, interface_path) # 将当前文件的父目录加入临时系统变量
server.config['JSON_AS_ASCII'] = False
CORS(server, supports_credentials=True) # 跨域请求
# 工商数据接口
def GetComanyinfoFromES(companyname):
esip = "43.247.184.94"
esport = 9200
esuser = "admines"
espassword = "adminGSBes"
es = Elasticsearch([esip], http_auth=(esuser, espassword), port=esport, timeout=15000)
companyinfo = es.search(index="bigdata_ic_gsb_company_op",
body={"query": {"bool": {"must": {"term": {"company_name.raw": companyname}}}}})
companylist = {}
if companyinfo["hits"]["hits"]:
for row in companyinfo["hits"]["hits"]:
companyname = row['_source']["company_name"] # 公司名称
legal_person = row['_source']["legal_person"] # 法定代表人
if legal_person is None:
legal_person = "_"
cred_code = row['_source']["credit_code"] # 统一社会信用代码
if cred_code is None:
cred_code = "_"
gsb_province = row['_source']["base"] # 所属省份
if gsb_province is None:
gsb_province = "_"
gsb_city = row['_source']["gsb_city"] #城市
if gsb_city is None:
gsb_city="_"
reg_location = row['_source']["reg_location"] # 企业地址
if reg_location is None:
reg_location = "_"
establish_time = time.strftime("%Y-%m-%d", time.localtime(row["_source"]["estiblish_time"])) # 成立日期
if establish_time is None:
establish_time = "_"
company_cate_1 = row['_source']["gsb_company_cate_1"] # 所属行业
if company_cate_1 is None:
company_cate_1 = "_"
company_org_type = row['_source']["company_org_type"] # 企业类型
if company_org_type is None:
company_org_type = "_"
reg_institute = row['_source']["reg_institute"] # 登记机关
if reg_institute is None:
reg_institute = "_"
reg_status = row['_source']["reg_status"] # 营业状态
if reg_status is None:
reg_status = "_"
reg_capital = row['_source']["reg_capital"] # 注册资本
reg_unit=row['_source']["reg_unit"] # 注册资本单位
if reg_capital is None:
reg_capital = "_"
else:
if reg_unit=="万人民币":
reg_capital = str(reg_capital) + "万元"
else:
reg_capital = str(reg_capital)
from_time = time.strftime("%Y-%m-%d", time.localtime(row["_source"]["from_time"])) # 营业期限开始
if from_time is None:
from_time = "_"
to_time = time.strftime("%Y-%m-%d", time.localtime(row["_source"]["to_time"])) # 营业期限结束
if to_time is None:
to_time = "_"
approved_time = time.strftime("%Y-%m-%d", time.localtime(row["_source"]["approved_time"])) # 核准日期
if approved_time is None:
approved_time = "_"
business_scope = row['_source']["business_scope"] # 经营范围
if business_scope is None:
business_scope = "_"
companylist["companyname"] = companyname
companylist["legal_person"] = legal_person
companylist["cred_code"] = cred_code
companylist["province"] = gsb_province
companylist["city"] = gsb_city
companylist["reg_location"] = reg_location
companylist["establish_time"] = establish_time
companylist["company_cate_1"] = company_cate_1
companylist["company_org_type"] = company_org_type
companylist["reg_institute"] = reg_institute
companylist["reg_capital"] = reg_capital
companylist["reg_status"] = reg_status
companylist["from_time"] = from_time
companylist["to_time"] = to_time
companylist["approved_time"] = approved_time
companylist["business_scope"] = business_scope
return companylist
# a=GetComanyinfoFromES("用友网络科技股份有限公司")
# print(a)
\ No newline at end of file
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2019/12/26 17:33
# @Time : 2020/2/11 10:02
# @Author : fanhui
# @File : sql_tax_test.py
# @File : risk_to_word.py
# @Software: PyCharm
import os, sys
from flask import request, Flask,jsonify
from getcompanyinfofromES import GetComanyinfoFromES
from GetFIinfoFromJson import GetFIinfoFromJson
from GetRiskinfoFromJson import Riskinfo
import time,requests,oss2,sys,os,json
from flask import Flask ,request
from docxtpl import DocxTemplate,InlineImage
from docx.shared import Mm
from pyecharts.charts import Gauge
from pyecharts.render import make_snapshot
from snapshot_selenium import snapshot
from pyecharts import options as opts
from flask_cors import *
import json
server = Flask(__name__)
interface_path = os.path.dirname(__file__)
......@@ -16,704 +24,198 @@ sys.path.insert(0, interface_path) # 将当前文件的父目录加入临时系
server.config['JSON_AS_ASCII'] = False
CORS(server, supports_credentials=True) # 跨域请求
# 获取传入的参数,建立大字典
def CreatBdictFromJson(new_status):
# 读取指标文件,获取权重及描述信息
file = open('tax_index.txt', 'r', encoding='utf-8')
lines = file.readlines()
dict_all = {}
for line in lines:
linesplit = line.strip().split('\t')
tax_index = {
"factors_code": linesplit[3].replace('"', ""),
"factors_name": linesplit[4].replace('"', ""),
"s_score": int(linesplit[5].replace('"', "")),
"factors_describe": linesplit[6].replace('"', ""),
"status": 0
}
dict_all[linesplit[4].replace('"', "")] = tax_index
# print(dict_all)
# 获取传入的参数
new_status = new_status
for factors_name, value in dict_all.items():
try:
dict_all[factors_name]["status"] = new_status[factors_name]
except:
pass
# print(dict_all)
# 计算总体风险异常比例、以及被监控风险比例和被稽查风险比例
qualified = [] # 合格
disqualified = [] # 不合格
for k, v in dict_all.items():
s_status = v['status']
s_score = v['s_score']
if s_status == 1:
qualified.append(s_score)
if s_status == 2:
disqualified.append(s_score)
A_count = sum(qualified) + sum(disqualified) # 计算所有合格与不合格对应指标权重之和(分母)
B_count = sum(disqualified) # 计算所有不合格对应指标权重之和(分子)
r1 = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
print(r1, type(r1))
t_r1 = str(int(r1 * 100)) + "%"
t_r2 = str(int((r1 - 0.29) * 100)) + "%"
t_r3 = str(int((r1 - 0.49) * 100)) + "%"
standard1 = 0.3
standard2 = 0.5
# 建立第一层规则
rules = {r1 <= standard1: {"t_r1": t_r1, "t_r2": "1%", "t_r3": "1%"},
r1 > standard1 and r1 <= standard2: {"t_r1": t_r1, "t_r2": t_r2, "t_r3": "1%"},
r1 > standard2: {"t_r1": t_r1, "t_r2": t_r2, "t_r3": t_r3}}
ALLRisk = {}
for k1, v1 in rules.items():
if k1:
ALLRisk = v1
print(ALLRisk,"+++++++++++++")
return dict_all, ALLRisk
# 输入计算出的风险异常比例,返回提示结果
def RiskAbnormal(t_r1):
RiskA = {}
t_r1_s = str(int(t_r1.replace("%", "")) / 100)
rules_1 = {"t_r1_s<='0.3'": "企业风险异常同比其他企业比例较低",
"t_r1_s>'0.3'": "企业风险异常同比其他企业比例明显偏高"
}
for k1, v1 in rules_1.items():
if eval(str(k1)):
RiskA = {"score":t_r1,"describe":v1}
return RiskA
# 输入计算出的被检测风险值,判断被监测风险结果
def MonitoredRisk(t_r2):
MRisk = {}
t_r2_s = str(int(t_r2.replace("%", "")) / 100)
rules_2 = {"t_r2_s<='0.1'": "企业被监控的风险同比其他企业比例较低",
"t_r2_s>'0.1' and t_r2_s<='0.5'": "经检测,企业有一定可能性被选为税务监控对象",
"t_r2_s>'0.5'": "经检测,企业有很大可能性被选为税务监控对象"
}
for k2, v2 in rules_2.items():
if eval(str(k2)):
MRisk = {"score": t_r2, "describe": v2}
return MRisk
# 输入计算出的被稽查风险值,判断被稽查风险结果
def AuditedRisk(t_r3):
ARisk = {}
t_r3_s = str(int(t_r3.replace("%", "")) / 100)
rules_3 = {"t_r3_s<='0.1'": "企业被税务稽查的可能性较低",
"t_r3_s>'0.1' and t_r3_s<='0.3'": "企业有一定可能性被税务稽查",
"t_r3_s>'0.3'": "企业有很大可能性被税务稽查"
}
for k3, v3 in rules_3.items():
if eval(str(k3)):
ARisk = {"score":t_r3,"describe":v3}
return ARisk
# 输入总体计算的风险异常比例,得到提示语(也可与RiskAbnormal函数合并)
def information(t_r1):
info = {}
t_r1_s = str(int(t_r1.replace("%", "")) / 100)
rules_0 = {"t_r1_s<='0.3'": "贵企业的风险异常情况同比其他企业较少,如需更多更详细的诊断服务,请联系我们>>",
"t_r1_s>'0.3' and t_r1_s<='0.6'": "企业的风险异常情况同比其他企业明显偏多,有较大可能性成为税务系统监控的对象,请企业的财税人员参考以下详细检测情况进行自查,如需进行更详细的诊断服务请联系我们>>",
"t_r1_s>'0.6' and t_r1_s<='0.8'": "如果税务局选择企业所在行业进行行业稽查,该企业有一定可能性成为被选案例,请财税人员尽快参照以下比例进行自查,如需进行更详细的诊断服务请联系我们>>",
"t_r1_s>'0.8'": "如果税务局选择企业所在行业进行行业稽查,该企业有很大可能性成为被选案例,请财税人员尽快参照以下比例进行自查,如需进行更详细的诊断服务请联系我们>>"
}
for k0, v0 in rules_0.items():
if eval(str(k0)):
info = {"score":t_r1,"describe":v0}
return info
# 建立不同风险监测项与指标的对应关系
tax_desc = {
"IT_YCSR": ["所得税行业预警税负率",
"销售(营业)收入变动率",
"收入利润率",
"销售成本大于销售收入",
"当期新增其他应收款/销售收入>80%",
"期末存货大于实收资本差异幅度异常",
"成本费用利润率",
"利润总额变动率",
"投资利润率",
"预付账款为负数",
"应收账款为负数",
"应付账款为负数",
"总资产周转率",
"预收账款过大异常",
"资产利润率",
"纳税人营业外支出变动率与营业外收入变动率弹性系数异常",
"纳税人流动资产变动率与营业收入变动率弹性系数异常",
"纳税人期末存货与当期累计收入差异幅度异常",
"账面存货率过高将会导致系统预警",
"收入负担变动率",
"应税所得率变动率",
"应纳税所得额变动率",
"行业预警利润率",
"主营业务收入变动率",
"主营业务成本变动率",
"其他业务利润变动率",
"主营业务利润变动率",
"纳税人主营业务收入变动率与主营业务利润变动率弹性系数异常",
"纳税人主营业务收入变动率与主营业务成本变动率弹性系数异常"],
"IT_XZCB": ["所得税行业预警税负率",
"销售成本大于销售收入",
"成本费用利润率",
"期末存货大于实收资本差异幅度异常",
"收入成本率",
"产品销售(营业)成本变动率",
"纳税人期末存货与当期累计收入差异幅度异常",
"账面存货率过高将会导致系统预警",
"总资产周转率",
"资产负债率",
"主营业务成本变动率",
"纳税人固定资产综合折旧率变动异常将会导致系统预警",
"纳税人无形资产综合摊销率变动异常",
"高比例出口的亏损企业",
"收入负担变动率",
"应税所得率变动率",
"应纳税所得额变动率",
"行业预警利润率",
"其他业务利润变动率",
"主营业务利润变动率",
"纳税人主营业务收入变动率与主营业务利润变动率弹性系数异常"],
"IT_XZFY": ["所得税行业预警税负率",
"销售成本大于销售收入",
"成本费用利润率",
"期末存货大于实收资本差异幅度异常",
"收入费用率",
"期间费用变动率",
"纳税人期末存货与当期累计收入差异幅度异常",
"账面存货率过高将会导致系统预警",
"总资产周转率",
"资产负债率",
"成本费用率",
"费用率变动率",
"营业(管理、财务)费用变动率",
"主营业务成本变动率",
"高比例出口的亏损企业",
"收入负担变动率",
"应税所得率变动率",
"应纳税所得额变动率",
"行业预警利润率",
"其他业务利润变动率",
"主营业务利润变动率",
"纳税人主营业务收入变动率与主营业务利润变动率弹性系数异常",
"纳税人期间费用变动率与主营业务收入变动率弹性系数异常",
"广告费和业务宣传费异常",
"业务招待费异常"],
"VAT_XKFP": ["销售额变动率",
"销售毛利(益)率",
"税收负担变动率",
"税收负担率",
"增值税专用发票用量变动异常",
"增值税一股纳税人税负变动异常",
"增值税普通发票用量变动异常",
"进项税额变动率∶应纳税额变动率"],
"VAT_YCXX": ["销售额变动率",
"税收负担变动率",
"税收负担率",
"存货变动率∶销售成本变动率",
"存货变动率∶销售收入变动率",
"存货周转率∶资金周转率",
"增值税一股纳税人税负变动异常",
"纳税人销售额变动率与应纳税额变动率弹性系数异常",
"进项税额变动率∶应纳税额变动率",
"应收账款变动率∶销项税额变动率",
"预收账款大于零且存货大于预收账款又无留抵",
"进项税额变动率高于销售税额变动率",
"在建工程增加额",
"长期投资增加额分析",
"企业期末预收账款变动率与销售收入变动率弹性系数异常",
"纳税人销售毛利率变动率与税负率变动率弹性系统异常"],
"VAT_XZJX": ["销售毛利(益)率",
"税收负担变动率",
"税收负担率",
"增值税一股纳税人税负变动异常",
"纳税人销售额变动率与应纳税额变动率弹性系数异常",
"应付账款变动率∶进项税额变动率",
"进项税额变动率高于销售税额变动率",
"在建工程增加额",
"长期投资增加额分析",
"企业期末预收账款变动率与销售收入变动率弹性系数异常",
"应税销售额变动率",
"纳税人销售毛利率变动率与税负率变动率弹性系统异常"],
"CT_FX": ["其他应付款变动额",
"应付帐款变动额",
"营业外支出变动额",
"长期投资变动额"],
"ALLT_FX": ["消费税申报收入与增值税申报收入差异",
"所得税申报收入与增值税申报收入差异",
"长期零申报"]
}
# 建立细分税种风险类别(根据传入的不合格指标之和进行比较)
def RiskCompareRule(wx_part):
risk_describe = ''
if wx_part and wx_part != None:
risk_relus = {
wx_part >= 5 : "高危风险",
# wx_part >= 5 and wx_part <= 10: "高危风险",
wx_part >= 3 and wx_part <= 4: "重度风险",
wx_part == 2: "中度风险",
wx_part <= 1: "轻微风险"
}
for k, v in risk_relus.items():
if k:
risk_describe = v
else:
risk_describe = "数据缺失"
return risk_describe
#风险描述去重
def getUniqueItems(iterable):
seen = set()
result = []
for item in iterable:
if item not in seen:
seen.add(item)
result.append(item)
return result
# 计算所得税隐藏收入风险异常(对所属指标不合格降序排列,展示前三的风险描述信息,计算不合格总和及风险比例)
def IT_YCSR_Risk(dict_all):
IT_YCSR_list = tax_desc.get("IT_YCSR") #获取所得税隐藏收入指标名称
IT_YCSR_dict = {} #建立所得税隐藏收入字典
for f1 in IT_YCSR_list:
IT_YCSR_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in IT_YCSR_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=IT_YCSR_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in IT_YCSR_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_IT_YCSR = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_IT_YCSR_score = str(int(risk_IT_YCSR * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe) #风险描述去重
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_IT_YCSR_score="0%"
p1_describe=""
risk_describe="数据缺失"
Risk_IT_YCSR={
"score":Risk_IT_YCSR_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_IT_YCSR
# 计算所得税虚增费用风险异常
def IT_XZFY_Risk(dict_all):
IT_XZFY_list = tax_desc.get("IT_XZFY") #获取所得税虚增费用指标名称
IT_XZFY_dict = {} #建立所得税虚增费用字典
for f2 in IT_XZFY_list:
IT_XZFY_dict[f2] = dict_all[f2]
#判断所有指标是否为缺失
Risk_not_null={}
for k2, v2 in IT_XZFY_dict.items():
s_status2 = v2["status"]
if s_status2 >0:
Risk_not_null[k2]=IT_XZFY_dict[k2]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s2_str_dict = {}
for k_part2, v_part2 in IT_XZFY_dict.items():
s_s2 = v_part2["s_score"]
s_str2 = v_part2["factors_describe"]
s_status2 = v_part2["status"]
if s_status2 == 1:
satisfied.append(s_s2)
if s_status2 == 2:
part_s2_str_dict[s_s2] = s_str2
dissatisfied.append(s_s2)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_IT_XZFY = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_IT_XZFY_score = str(int(risk_IT_XZFY * 100)) + "%" # 格式转化
part_s2_str_dict = sorted(part_s2_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p2_describe = [] # 获取风险权重前三提示
for pt in part_s2_str_dict:
p2_describe.append(pt[1])
p2_describe = getUniqueItems(p2_describe)
if len(p2_describe) > 3:
p2_describe = p2_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_IT_XZFY_score="0%"
p2_describe=""
risk_describe="数据缺失"
Risk_IT_XZFY={
"score":Risk_IT_XZFY_score,
"describe":p2_describe,
"risk":risk_describe
}
return Risk_IT_XZFY
# 计算所得税虚增成本风险异常
def IT_XZCB_Risk(dict_all):
IT_XZCB_list = tax_desc.get("IT_XZCB") #获取所得税虚增成本指标名称
IT_XZCB_dict = {} #建立所得税虚增成本字典
for f1 in IT_XZCB_list:
IT_XZCB_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in IT_XZCB_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=IT_XZCB_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in IT_XZCB_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_IT_XZCB = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_IT_XZCB_score = str(int(risk_IT_XZCB * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
# 生成图片
def charts(datalist, charttype, pngpath):
try:
img = None
if charttype == 1: # 仪表盘
img = Gauge(init_opts=opts.InitOpts(width="450px", height="450px"))
img.add("", [(datalist[0]["key"], datalist[0]["value"])],
axisline_opts=opts.AxisLineOpts(linestyle_opts=opts.LineStyleOpts(color=[(0.3, "#67e0e3"), (0.7, "#37a2da"), (1, "#fd666d")], width=35)),
label_opts=opts.LabelOpts(font_size=55,formatter="{value}%"))
if img:
make_snapshot(snapshot, img.render(), pngpath)
return True
else:
return False
except Exception as e:
print(e)
return False
#生成报告(es工商、财务信息、风险监测(仪表盘图))
def WriteReport(Else_info,Companyinfo,Fi_dict,Risk_info):
tpl = DocxTemplate('tax_word_model.docx')
context={}
#其他传入参数
context['CreatTime']=Else_info['CreatTime']
context['Detection_zone'] = Else_info['Detection_zone']
#01-02工商
context['template'] =Companyinfo["companyname"]
context['legal_person'] =Companyinfo["legal_person"]
context['cred_code'] =Companyinfo["cred_code"]
context['province'] =Companyinfo["province"]
context['city'] =Companyinfo["city"]
context['reg_location'] =Companyinfo["reg_location"]
context['establish_time'] =Companyinfo["establish_time"]
context['company_cate_1'] =Companyinfo["company_cate_1"]
context['company_org_type'] =Companyinfo["company_org_type"]
context['reg_institute'] =Companyinfo["reg_institute"]
context['reg_capital'] =Companyinfo["reg_capital"]
context['reg_status'] =Companyinfo["reg_status"]
context['from_time'] =Companyinfo["from_time"]
context['to_time'] =Companyinfo["to_time"]
context['approved_time'] =Companyinfo["approved_time"]
context['business_scope'] =Companyinfo["business_scope"]
#03-05财务信息
context["Bq_time"] = Fi_dict["Bq_time"]
context["Sq_time"] = Fi_dict["Sq_time"]
context["Bq_Total_trade_income"] = Fi_dict["Bq_Total_trade_income"]
context["Sq_Total_trade_income"] = Fi_dict["Sq_Total_trade_income"]
context["change_rate_1"] = Fi_dict["change_rate_1"]
context["Bq_Total_profit"] = Fi_dict["Bq_Total_profit"]
context["Sq_Total_profit"] = Fi_dict["Sq_Total_profit"]
context["change_rate_2"] = Fi_dict["change_rate_2"]
context["Bq_Net_profit"] = Fi_dict["Bq_Net_profit"]
context["Sq_Net_profit"] = Fi_dict["Sq_Net_profit"]
context["change_rate_3"] = Fi_dict["change_rate_3"]
context["Bq_Total_Liabilities"] = Fi_dict["Bq_Total_Liabilities"]
context["Sq_Total_Liabilities"] = Fi_dict["Sq_Total_Liabilities"]
context["change_rate_4"] = Fi_dict["change_rate_4"]
context["Bq_Owner_rights"] = Fi_dict["Bq_Owner_rights"]
context["Sq_Owner_rights"] = Fi_dict["Sq_Owner_rights"]
context["change_rate_5"] = Fi_dict["change_rate_5"]
context["Bq_Total_sales"] = Fi_dict["Bq_Total_sales"]
context["Sq_Total_sales"] = Fi_dict["Sq_Total_sales"]
context["change_rate_6"] = Fi_dict["change_rate_6"]
context["Bq_Total_assets"] = Fi_dict["Bq_Total_assets"]
context["Sq_Total_assets"] = Fi_dict["Sq_Total_assets"]
context["change_rate_7"] = Fi_dict["change_rate_7"]
context["Bq_Total_liabilities_and_Owner_rights"] = Fi_dict["Bq_Total_liabilities_and_Owner_rights"]
context["Sq_Total_liabilities_and_Owner_rights"] = Fi_dict["Sq_Total_liabilities_and_Owner_rights"]
context["Bq_sales_expense"] = Fi_dict["Bq_sales_expense"]
context["Sq_sales_expense"] = Fi_dict["Sq_sales_expense"]
context["Bq_Net_interest_rate"] = Fi_dict["Bq_Net_interest_rate"]
context["Sq_Net_interest_rate"] = Fi_dict["Sq_Net_interest_rate"]
context["Bq_Equity_ratio"] = Fi_dict["Bq_Equity_ratio"]
context["Sq_Equity_ratio"] = Fi_dict["Sq_Equity_ratio"]
context["Bq_Gearing_ratio"] = Fi_dict["Bq_Gearing_ratio"]
context["Sq_Gearing_ratio"] = Fi_dict["Sq_Gearing_ratio"]
context["Bq_sales_growth_rate"] = Fi_dict["Bq_sales_growth_rate"]
context["Sq_sales_growth_rate"] = Fi_dict["Sq_sales_growth_rate"]
context["Bq_Net_profit_growth_rate"] = Fi_dict["Bq_Net_profit_growth_rate"]
context["Sq_Net_profit_growth_rate"] = Fi_dict["Sq_Net_profit_growth_rate"]
context["Bq_total_assets_growth_rate"] = Fi_dict["Bq_total_assets_growth_rate"]
context["Sq_total_assets_growth_rate"] = Fi_dict["Sq_total_assets_growth_rate"]
# 06仪表盘图(需获取评分grade)
grades1 = Risk_info["RiskA"]["score"].replace('%', "")
grades2 = Risk_info["MRisk"]["score"].replace('%', "")
grades3 = Risk_info["ARisk"]["score"].replace('%', "")
guage1 = ""
guage2 = ""
guage3 = ""
if charts([{"key": "风险异常比例", "value": grades1}], 1, "yibiaopan1.png"):
guage1 = "yibiaopan1.png"
if charts([{"key": "被监测风险", "value": grades2}], 1, "yibiaopan2.png"):
guage2 = "yibiaopan2.png"
if charts([{"key": "被稽查风险", "value": grades3}], 1, "yibiaopan3.png"):
guage3 = "yibiaopan3.png"
context['guage1'] = InlineImage(tpl, guage1, width=Mm(60))
context['guage2'] = InlineImage(tpl, guage2, width=Mm(60))
context['guage3'] = InlineImage(tpl, guage3, width=Mm(60))
context['RiskA_desc'] ="企业风险异常同比其他企业比例较低"
context['MRisk_desc'] = "企业风险异常同比其他企业比例较低"
context['ARisk_desc'] = "企业风险异常同比其他企业比例较低"
# 风险信息
context["RiskA_desc"] = Risk_info["RiskA"]["describe"]
context["MRisk_desc"] = Risk_info["MRisk"]["describe"]
context["ARisk_desc"] = Risk_info["ARisk"]["describe"]
context["describe_info"] = Risk_info["info"]["describe"]
#细分风险项
context["IT_YCSR_Risk_desc"] = str(Risk_info["Risk_IT_YCSR"]["describe"]).replace("['","").replace("']","").replace("'","")
context["IT_XZFY_Risk_desc"] = str(Risk_info["Risk_IT_XZFY"]["describe"]).replace("['","").replace("']","").replace("'","")
context["IT_XZCB_Risk_desc"] = str(Risk_info["Risk_IT_XZCB"]["describe"]).replace("['","").replace("']","").replace("'","")
context["VAT_XKFP_Risk_desc"] = str(Risk_info["Risk_VAT_XKFP"]["describe"]).replace("['","").replace("']","").replace("'","")
context["VAT_YCXX_Risk_desc"] = str(Risk_info["Risk_VAT_YCXX"]["describe"]).replace("['","").replace("']","").replace("'","")
context["VAT_XZJX_Risk_desc"] = str(Risk_info["Risk_VAT_XZJX"]["describe"]).replace("['","").replace("']","").replace("'","")
context["CT_FX_Risk_desc"] = str(Risk_info["Risk_CT_FX"]["describe"]).replace("['","").replace("']","").replace("'","")
context["ALLT_FX_Risk_desc"] = str(Risk_info["Risk_ALLT_FX"]["describe"]).replace("['","").replace("']","").replace("'","")
tpl.render(context)
addr = "/tmp/TaxRiskReport.docx"
# addr = "G://TaxRiskReport26.docx"
tpl.save(addr)
print(addr,"+++++++++")
return addr
def put2oss(report):
# 阿里云接口地址
endpoint = 'http://oss-cn-beijing.aliyuncs.com'
auth = oss2.Auth('LTAIyAUK8AD04P5S', 'DHmRtFlw2Zr3KaRwUFeiu7FWATnmla')
bucket = oss2.Bucket(auth, endpoint, 'gsb-zc')
current_file_path = "TaxRiskReport.docx"
current_fold = time.strftime('%Y%m%d%H%M%S', time.localtime())
p = str(current_fold) + "_" + current_file_path
bucket.put_object_from_file(p, report)
addr="https://gsb-zc.oss-cn-beijing.aliyuncs.com/"+p
return addr
#全半角转化函数
def strQ2B(ustring):
rstring = ""
for uchar in ustring:
inside_code = ord(uchar)
if inside_code == 12288: # 全角空格直接转换
inside_code = 32
elif (inside_code >= 65281 and inside_code <= 65374): # 全角字符(除空格)根据关系转化
inside_code -= 65248
rstring += chr(inside_code)
return rstring
# 调用更新报告接口
def UpdateTaskRptUrl(batchid, rptUrl):
url = 'https://fkctlapi.gongsibao.com/api/rpt/rptApi/updateTaskRptUrl' # 获取更新报告接口
report_addr = {"batchid": batchid, "rptUrl": rptUrl} # 根据任务号进行接口访问参数
UpdateInfo = requests.post(url, report_addr) #传参并访问接口
companyInfo_New = json.loads(UpdateInfo.text)# 访问接口返回的信息
print(companyInfo_New)
if companyInfo_New["msg"]=="操作成功":
return batchid+"报告已更新完成"
else:
Risk_IT_XZCB_score="0%"
p1_describe=""
risk_describe="数据缺失"
Risk_IT_XZCB={
"score":Risk_IT_XZCB_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_IT_XZCB
return batchid+"报告更新失败"
# 计算增值税虚开发票风险异常
def VAT_XKFP_Risk(dict_all):
VAT_XKFP_list = tax_desc.get("VAT_XKFP") #获取增值税虚开发票指标名称
VAT_XKFP_dict = {} #建立增值税虚开发票字典
for f1 in VAT_XKFP_list:
VAT_XKFP_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in VAT_XKFP_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=VAT_XKFP_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in VAT_XKFP_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_VAT_XKFP = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_VAT_XKFP_score = str(int(risk_VAT_XKFP * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_VAT_XKFP_score="0%"
p1_describe=""
risk_describe="数据缺失"
Risk_VAT_XKFP={
"score":Risk_VAT_XKFP_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_VAT_XKFP
# 计算增值税虚增进项风险异常
def VAT_XZJX_Risk(dict_all):
VAT_XZJX_list = tax_desc.get("VAT_XZJX") #获取增值税虚增进项指标名称
VAT_XZJX_dict = {} #建立增值税虚增进项字典
for f1 in VAT_XZJX_list:
VAT_XZJX_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in VAT_XZJX_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=VAT_XZJX_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in VAT_XZJX_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_VAT_XZJX = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_VAT_XZJX_score = str(int(risk_VAT_XZJX * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_VAT_XZJX_score="0%"
p1_describe=""
risk_describe="数据缺失"
Risk_VAT_XZJX={
"score":Risk_VAT_XZJX_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_VAT_XZJX
# 计算增值税隐藏销项风险异常
def VAT_YCXX_Risk(dict_all):
VAT_YCXX_list = tax_desc.get("VAT_YCXX") #获取增值税隐藏销项指标名称
VAT_YCXX_dict = {} #建立增值税隐藏销项字典
for f1 in VAT_YCXX_list:
VAT_YCXX_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in VAT_YCXX_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=VAT_YCXX_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in VAT_YCXX_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_VAT_YCXX = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_VAT_YCXX_score = str(int(risk_VAT_YCXX * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_VAT_YCXX_score="0%"
p1_describe=""
risk_describe="数据缺失"
Risk_VAT_YCXX={
"score":Risk_VAT_YCXX_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_VAT_YCXX
# 计算消费税风险异常
def CT_FX_Risk(dict_all):
CT_FX_list = tax_desc.get("CT_FX") #获取消费税指标名称
CT_FX_dict = {} #建立消费税字典
for f1 in CT_FX_list:
CT_FX_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in CT_FX_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=CT_FX_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in CT_FX_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_CT_FX = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_CT_FX_score = str(int(risk_CT_FX * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_CT_FX_score="0%"
p1_describe=""
risk_describe="数据缺失"
Risk_CT_FX={
"score":Risk_CT_FX_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_CT_FX
# 综合
def ALLT_FX_Risk(dict_all):
ALLT_FX_list = tax_desc.get("ALLT_FX") #获取综合风险指标名称
ALLT_FX_dict = {} #建立综合风险字典
for f1 in ALLT_FX_list:
ALLT_FX_dict[f1] = dict_all[f1]
#判断所有指标是否为缺失
Risk_not_null={}
for k1, v1 in ALLT_FX_dict.items():
s_status1 = v1["status"]
if s_status1 >0:
Risk_not_null[k1]=ALLT_FX_dict[k1]
if Risk_not_null:
satisfied = []
dissatisfied = []
part_s1_str_dict = {}
for k_part1, v_part1 in ALLT_FX_dict.items():
s_s1 = v_part1["s_score"]
s_str = v_part1["factors_describe"]
s_status1 = v_part1["status"]
if s_status1 == 1:
satisfied.append(s_s1)
if s_status1 == 2:
part_s1_str_dict[s_s1] = s_str
dissatisfied.append(s_s1)
A_count = sum(satisfied) + sum(dissatisfied) # 计算合格与不合格对应指标权重之和(分母)
B_count = sum(dissatisfied) # 计算不合格对应指标权重之和(分子)
risk_ALLT_FX = float("%.2f" % (B_count / A_count)) # 计算风险异常比例
Risk_ALLT_FX_score = str(int(risk_ALLT_FX * 100)) + "%" # 格式转化
part_s1_str_dict = sorted(part_s1_str_dict.items(), key=lambda x: x[0], reverse=True) # 按照权重排序
p1_describe = [] # 获取风险权重前三提示
for pt in part_s1_str_dict:
p1_describe.append(pt[1])
p1_describe = getUniqueItems(p1_describe)
if len(p1_describe) > 3:
p1_describe = p1_describe[:3]
# 风险类别获取
risk_describe = RiskCompareRule(B_count)
else:
Risk_ALLT_FX_score="0%"
p1_describe=""
risk_describe="数据缺失"
Risk_ALLT_FX={
"score":Risk_ALLT_FX_score,
"describe":p1_describe,
"risk":risk_describe
}
return Risk_ALLT_FX
# 风险监测信息接口
@server.route('/gsb/api/report', methods=['POST'])
# 风险监测自动报告接口
@server.route('/gsb/api/report2', methods=['POST'])
def report():
if request.method =='POST':
print("=======调用风险检测自动报告接口")
data = request.json
json_str = json.dumps(data)
new_status = json.loads(json_str)
dict_all, ALLRisk = CreatBdictFromJson(new_status)
RiskA=RiskAbnormal(ALLRisk["t_r1"]) #风险异常比例
MRisk=MonitoredRisk(ALLRisk["t_r2"]) #被监控风险
ARisk=AuditedRisk(ALLRisk["t_r3"]) #被稽查风险
info=information(ALLRisk["t_r1"]) #提示语
Risk_IT_YCSR=IT_YCSR_Risk(dict_all) #企业所得税隐藏收入风险信息
Risk_IT_XZFY = IT_XZFY_Risk(dict_all) #企业所得税虚增费用风险信息
Risk_IT_XZCB = IT_XZCB_Risk(dict_all) #企业所得税虚增成本风险信息
Risk_VAT_XKFP=VAT_XKFP_Risk(dict_all) #企业增值税虚开发票风险信息
Risk_VAT_XZJX = VAT_XZJX_Risk(dict_all) #企业增值税虚增进项风险信息
Risk_VAT_YCXX = VAT_YCXX_Risk(dict_all) #企业增值税隐藏销项风险信息
Risk_CT_FX=CT_FX_Risk(dict_all) #消费税风险信息
Risk_ALLT_FX=ALLT_FX_Risk(dict_all) #综合风险信息
result={
"RiskA":RiskA,
"MRisk":MRisk,
"ARisk":ARisk,
"info":info,
"Risk_IT_YCSR":Risk_IT_YCSR,
"Risk_IT_XZFY":Risk_IT_XZFY,
"Risk_IT_XZCB":Risk_IT_XZCB,
"Risk_VAT_XKFP": Risk_VAT_XKFP,
"Risk_VAT_XZJX": Risk_VAT_XZJX,
"Risk_VAT_YCXX": Risk_VAT_YCXX,
"Risk_CT_FX": Risk_CT_FX,
"Risk_ALLT_FX": Risk_ALLT_FX
}
print(result,"------------------------------")
return jsonify(result)
GetAllinfo = json.loads(json_str)
companyname=GetAllinfo["company_name"] #获取监测企业名称
batchid=GetAllinfo["Task_Num"] #获取任务号
companyname = strQ2B(companyname)
if companyname:
Companyinfo=GetComanyinfoFromES(companyname) #根据输入公司名获取工商信息
Else_info, Fi_dict=GetFIinfoFromJson(GetAllinfo)#获取输入其他信息和财务信息
Risk_info=Riskinfo(GetAllinfo)#计算风险项
report = WriteReport(Else_info,Companyinfo,Fi_dict,Risk_info) #将工商、财务、风险三部分内容写入word中
rptUrl = put2oss(report) # 上传文件到oss上
print("报告已生成",rptUrl)
result=UpdateTaskRptUrl(batchid, rptUrl)# 完成后调用接口,通知服务已完成
return result
else:
return "请输入检测公司名称"
server.run(host='0.0.0.0', port=80, debug=True)
server.run(host='0.0.0.0', port=80, debug=True)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment