""" 通用方法类 """ from django.http import HttpResponse import json from datetime import date, datetime from django.db import connection from django.conf import settings import hashlib import decimal class CJsonEncoder(json.JSONEncoder): def default(self, obj): if isinstance(obj, datetime): return obj.strftime('%Y-%m-%d %H:%M:%S') elif isinstance(obj, date): return obj.strftime('%Y-%m-%d') elif isinstance(obj, decimal.Decimal): # python无法json_dumps decimal类型,转成float型 return float(obj) else: return json.JSONEncoder.default(self, obj) def sql_array(result, select): """ sql数据查询转换为数组 :param result:查询结果 :param select:查询的字段 :return: """ return_list = [] for i in result: val_list = {} for j in select: val_list[j] = i.__getattribute__(j) # 获取字段的值 return_list.append(val_list) return return_list def re_su(msg='', data='', url='', code=0): """ 返回成功信息 :param msg: :param data: :param url: :param code: :return: """ if data == '': data = {} result = { 'flag': True, 'msg': msg, 'data': data, 'url': url, 'code': code } return result def re_err(msg, data='', url='', code=1): """ 返回报错信息到上一层 :param msg: :param data: :param url: :param code: :return: """ if data == '': data = {} result = { 'flag': False, 'msg': msg, 'data': data, 'url': url, 'code': code } return result def re_json_su(msg='', data='', url='', code=0): """ 返回 失败错误信息 :param msg: :param data: :param url: :param code: :return: """ if data == '': data = {} result = { 'flag': True, 'msg': msg, 'data': data, 'url': url, 'code': code } return HttpResponse(json.dumps(result, cls=CJsonEncoder)) def re_json_err(msg, data='', url='', code=1): """ 返回json报错信息到前端 :param msg: :param data: :param url: :param code: :return: """ if data == '': data = {} result = { 'flag': False, 'msg': msg, 'data': data, 'url': url, 'code': code } return HttpResponse(json.dumps(result, cls=CJsonEncoder)) def re_json(data): """ 返回字符串 :param data: :return: """ return HttpResponse(json.dumps(data, cls=CJsonEncoder)) def select_all(sql, param): """ select查询 :param sql: :return: """ cursor = connection.cursor() cursor.execute(sql, param) items = cursor.fetchall() # 读取所有 desc = cursor.description li = [] if items: for item in items: d = {} i = 0 for de in desc: d[de[0]] = str(item[i]) i = i + 1 li.append(d) return li else: return li def select_row(sql, param): """ select查询 :param sql: :return: """ cursor = connection.cursor() cursor.execute(sql, param) items = cursor.fetchall() # 读取所有 desc = cursor.description li = [] if items: for item in items: d = {} i = 0 for de in desc: d[de[0]] = item[i] i = i + 1 li.append(d) return li[0] else: return li def auth_code_for_cs(request_time): """ cs接口调用加密函数 :param request_time: :return: """ request_time = str(request_time) user = settings.ADM_USER user_key = settings.ADM_USER_KEY # 加密规则为 request_time(linux时间戳)前三位作为种子+ user + user_key+后三位,进行MD5加密后,换成大写字母 auth_code = ( request_time[0:3] + "|" + user + "|" + user_key + "|" + request_time[(len(request_time) - 3):len(request_time)]) m2 = hashlib.md5() # 进行md5加密 m2.update(auth_code.encode("utf-8")) return m2.hexdigest().upper() def get_client_ip(request): """ 获取用户ip :param request: :return: """ try: real_ip = request.META['HTTP_X_FORWARDED_FOR'] regip = real_ip.split(",")[0] except: try: regip = request.META['REMOTE_ADDR'] except: regip = "" return regip