您的位置:首頁 >聚焦 >

即時看!造個Python輪子,實現(xiàn)根據(jù)Excel生成Model和數(shù)據(jù)導(dǎo)入腳本

2022-11-19 07:10:56    來源:程序員客棧
1前言

最近遇到一個需求,有幾十個Excel,每個的字段都不一樣,然后都差不多是第一行是表頭,后面幾千上萬的數(shù)據(jù),需要把這些Excel中的數(shù)據(jù)全都加入某個已經(jīng)上線的Django項目


(資料圖片僅供參考)

這就需要每個Excel建個表,然后一個個導(dǎo)入了

這樣的效率太低,不能忍

所以我造了個自動生成 Model 和導(dǎo)入腳本的輪子

2思路

首先拿出 pandas,它的 DataFrame 用來處理數(shù)據(jù)很方便

pandas 加載 Excel 之后,提取表頭,我們要通過表頭來生成數(shù)據(jù)表的字段。有些 Excel 的表頭是中文的,需要先做個轉(zhuǎn)換。

一開始我是想用翻譯API,全都翻譯成英文,不過發(fā)現(xiàn)免費的很慢有限額,微軟、DeepL都要申請,很麻煩。索性用個拼音轉(zhuǎn)換庫,全都轉(zhuǎn)換成拼音得了~

然后字段的長度也要確定,或者全部用不限制長度的 TextField

權(quán)衡一下,我還是做一下字段長度判定的邏輯,遍歷整個表,找出各個字段最長的數(shù)據(jù),然后再加一個偏移量,作為最大長度。

接著生成 Model 類,這里我用 jinja2 模板語言,先把大概的模板寫好,然后根據(jù)提取出來的字段名啥的生成。

最后生成 admin 配置和導(dǎo)入腳本,同理,也是用 jinja2 模板。

3實現(xiàn)

簡單介紹下思路,現(xiàn)在開始上代碼。

就幾行而已,Python很省代碼~

模型

首先定義倆模型

字段模型

classField(object):def__init__(self,name:str,verbose_name:str,max_length:int=128):self.name=nameself.verbose_name=verbose_nameself.max_length=max_lengthdef__str__(self):returnf"{self.name}:{self.verbose_name}"def__repr__(self):returnself.__str__()

Model模型

為了符合Python關(guān)于變量的命名規(guī)范,snake_name屬性是用正則表達式實現(xiàn)駝峰命名轉(zhuǎn)蛇形命名

classModel(object):def__init__(self,name:str,verbose_name:str,id_field:Field,fields:List[Field]):self.name=nameself.verbose_name=verbose_nameself.id_field=id_fieldself.fields:List[Field]=fields@propertydefsnake_name(self):importrepattern=re.compile(r"(?{self.name}:{self.verbose_name}"def__repr__(self):returnself.__str__()

代碼模板

使用 jinja2 實現(xiàn)。

本身 jinja2 是 Flask、Django 之類的框架用來渲染網(wǎng)頁的。

不過單獨使用的效果也不錯,我的 DjangoStarter 框架也是用這個 jinja2 來自動生成 CRUD 代碼~

Model模板

# -*- coding:utf-8 -*-from django.db import modelsclass {{ model.name }}(models.Model):    """{{ model.verbose_name }}"""    {% for field in model.fields -%}    {{ field.name }} = models.CharField("{{ field.verbose_name }}", default="", null=True, blank=True, max_length={{ field.max_length }})    {% endfor %}    class Meta:        db_table = "{{ model.snake_name }}"        verbose_name = "{{ model.verbose_name }}"        verbose_name_plural = verbose_name

Admin配置模板

@admin.register({{ model.name }})class {{ model.name }}Admin(admin.ModelAdmin):    list_display = [{% for field in model.fields %}"{{ field.name }}", {% endfor %}]    list_display_links = None    def has_add_permission(self, request):        return False    def has_delete_permission(self, request, obj=None):        return False    def has_view_permission(self, request, obj=None):        return False

數(shù)據(jù)導(dǎo)入腳本

這里做了幾件事:

使用 pandas 處理空值,填充空字符串已有數(shù)據(jù)進行批量更新新數(shù)據(jù)批量插入

更新邏輯麻煩一點,因為數(shù)據(jù)庫一般都有每次最大更新數(shù)量的限制,所以我做了分批處理,通過 update_data_once_max_lines控制每次最多同時更新多少條數(shù)據(jù)。

def import_{{ model.snake_name }}():    file_path = path_proc(r"{{ excel_filepath }}")    logger.info(f"讀取文件: {file_path}")    xlsx = pd.ExcelFile(file_path)    df = pd.read_excel(xlsx, 0, header={{ excel_header }})    df.fillna("", inplace=True)    logger.info("開始處理數(shù)據(jù)")    id_field_list = {{ model.name }}.objects.values_list("{{ model.id_field.name }}", flat=True)    item_list = list({{ model.name }}.objects.all())    def get_item(id_value):        for i in item_list:            if i.shen_qing_ren_zheng_jian_hao_ma == id_value:                return i        return None    insert_data = []    update_data_once_max_lines = 100    update_data_sub_set_index = 0    update_data = [[]]    update_fields = set()    for index, row in df.iterrows():        if "{{ model.id_field.verbose_name }}" not in row:            logger.error("id_field {} is not existed".format("{{ model.id_field.verbose_name }}"))            continue        if row["{{ model.id_field.verbose_name }}"] in id_field_list:            item = get_item(row["{{ model.id_field.verbose_name }}"])            {% for field in model.fields -%}            if "{{ field.verbose_name }}" in row:                if item.{{ field.name }} != row["{{ field.verbose_name }}"]:                    item.{{ field.name }} = row["{{ field.verbose_name }}"]                    update_fields.add("{{ field.name }}")            {% endfor %}            if len(update_data[update_data_sub_set_index]) >= update_data_once_max_lines:                update_data_sub_set_index += 1                update_data.append([])            update_data[update_data_sub_set_index].append(item)        else:            # {% for field in model.fields -%}{{ field.verbose_name }},{%- endfor %}            model_obj = {{ model.name }}()            {% for field in model.fields -%}            if "{{ field.verbose_name }}" in row:                model_obj.{{ field.name }} = row["{{ field.verbose_name }}"]            {% endfor %}            insert_data.append(model_obj)    logger.info("開始批量導(dǎo)入")    {{ model.name }}.objects.bulk_create(insert_data)    logger.info("導(dǎo)入完成")    if len(update_data[update_data_sub_set_index]) > 0:        logger.info("開始批量更新")        for index, update_sub in enumerate(update_data):            logger.info(f"正在更新 {index * update_data_once_max_lines}-{(index + 1) * update_data_once_max_lines} 條數(shù)據(jù)")            {{ model.name }}.objects.bulk_update(update_sub, list(update_fields))        logger.info("更新完成")

主體代碼

剩下的全是核心代碼了

引用依賴

先把用到的庫導(dǎo)入

importosimportrefromtypingimportList,Optionalfrompypinyinimportpinyin,lazy_pinyin,Stylefromjinja2importEnvironment,PackageLoader,FileSystemLoader

或者后面直接去我的完整代碼里面拿也行~

老規(guī)矩,我封裝了一個類。

構(gòu)造方法需要指定 Excel 文件地址,還有表頭的行索引。

classExcelToModel(object):def__init__(self,filepath,header_index=0):self.filepath=filepathself.header_index=header_indexself.columns=[]self.fields:List[Field]=[]self.base_dir=os.path.dirname(os.path.abspath(__file__))self.template_path=os.path.join(self.base_dir,"templates")self.jinja2_env=Environment(loader=FileSystemLoader(self.template_path))self.load_file()

這里面有個 self.load_file()后面再貼。

字段名中文轉(zhuǎn)拼音

用了 pypinyin這個庫,感覺還不錯。

轉(zhuǎn)換后用正則表達式,去除符號,只保留英文和數(shù)字。

代碼如下,也是放在 ExcelToModel類里邊。

@staticmethoddefto_pinyin(text:str)->str:pattern=r"~`!#$%^&*()_+-=|\";"":/.,?><~·!@#¥%……&*()——+-=“:’;、。,?》{《}】【\n\]\["text=re.sub(r"[%s]+"%pattern,"",text)return"_".join(lazy_pinyin(text,style=Style.NORMAL))

加載文件

拿出萬能的 pandas,按照前面說的思路,提取表頭轉(zhuǎn)換成字段,并且遍歷數(shù)據(jù)確定每個字段的最大長度,我這里偏移值是32,即在當(dāng)前數(shù)據(jù)最大長度基礎(chǔ)上加上32個字符。

defload_file(self):importpandasaspdxlsx=pd.ExcelFile(self.filepath)df=pd.read_excel(xlsx,0,header=self.header_index)df.fillna("",inplace=True)self.columns=list(df.columns)forcolinself.columns:field=Field(self.to_pinyin(col),col)self.fields.append(field)forindex,rowindf.iterrows():item_len=len(str(row[col]))ifitem_len>field.max_length:field.max_length=item_len+32print(field.verbose_name,field.name,field.max_length)

如果覺得這樣生成表太慢,可以把確定最大長度的這塊代碼去掉,就下面這塊代碼

forindex,rowindf.iterrows():item_len=len(str(row[col]))ifitem_len>field.max_length:field.max_length=item_len+32

手動指定最大長度或者換成不限制長度的 TextField就行。

生成文件

先構(gòu)造個 context 然后直接用 jinja2 的 render功能生成代碼。

為了在導(dǎo)入時判斷數(shù)據(jù)存不存在,生成代碼時要指定 id_field_verbose_name,即Excel文件中類似“證件號碼”、“編號”之類的列名,注意是Excel中的表頭列名。

deffind_field_by_verbose_name(self,verbose_name)->Optional[Field]:forfieldinself.fields:iffield.verbose_name==verbose_name:returnfieldreturnNonedefgenerate_file(self,model_name:str,verbose_name:str,id_field_verbose_name:str,output_filepath:str):template=self.jinja2_env.get_template("output.jinja2")context={"model":Model(model_name,verbose_name,self.find_field_by_verbose_name(id_field_verbose_name),self.fields),"excel_filepath":self.filepath,"excel_header":self.header_index,}withopen(output_filepath,"w+",encoding="utf-8")asf:render_result=template.render(context)f.write(render_result)

使用

看代碼。

tool=ExcelToModel("file.xlsx")tool.generate_file("CitizenFertility","房價與居民生育率","證件號碼","output/citizen_fertility.py")

生成出來的代碼都在一個文件里,請根據(jù)實際情況放到項目的各個位置。

4完整代碼

發(fā)布到Github了

地址: https://github.com/Deali-Axy/excel_to_model

5小結(jié)

目前看來完美契合需求,極大節(jié)省工作量~

實際跑起來,不得不吐槽 Python 羸弱的性能,占內(nèi)存還大… 湊合著用吧。也許后面有時間會優(yōu)化一下~

關(guān)鍵詞: 最大長度 證件號碼 正則表達式

相關(guān)閱讀