Standalone Execution

Application Translation Conversion

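This program replaces the base language of an .omf file. Before running it, edit the four parameters at the top of the script.

For example, test_flow.omf is an application designed in Traditional Chinese, and translations for each language have already been uploaded through OMFLOW's built-in feature. When an English-speaking customer wants to reuse the application, this program can change the "design" language from Traditional Chinese to English.

※ Note: the .omf file must already contain the corresponding translations. This program only swaps translations; it does not translate automatically.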
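
To make the swap concrete, here is a minimal sketch of the re-keying on a tiny in-memory language package. It assumes each language table maps base-language text to its translation, which is how the script reads language_package. The function name swap_base_language and the sample strings are hypothetical, and unlike the script it handles whatever language codes are present rather than a fixed set of four.

```python
def swap_base_language(package, original_language, replace_language):
    """Return a new language package re-keyed by the replace_language text.

    package maps language code -> {base_text: translated_text}.
    """
    replace_dict = dict(package.get(replace_language, {}))
    swapped = {}
    for lang, table in package.items():
        # Re-key each entry that has a non-empty translation in the new base
        # language; entries without one keep their old key.
        swapped[lang] = {(replace_dict.get(key) or key): value
                         for key, value in table.items()}
    # The old base language becomes an ordinary translation of the new keys.
    swapped[original_language] = {new: old
                                  for old, new in replace_dict.items() if new}
    return swapped


# Hypothetical sample data: an app designed in Traditional Chinese.
package = {
    'en': {'請假單': 'Leave form'},
    'ja': {'請假單': '休暇申請'},
}
result = swap_base_language(package, 'zh-hant', 'en')
```

After the call, every table in result is keyed by the English text, and the Traditional Chinese text is stored under 'zh-hant' as a regular translation. No new translations are produced, which is why the .omf file has to contain them beforehand.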

import json


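path = '/some_path/test_flow.omf'    # Adjust to the actual path and file name
trans_to_path = '/some_path/test_flow.omf'    # Output path; if identical to path, the input file is overwritten
original_language = 'zh-hant'        # Current base language; adjust as needed
replace_language = 'ja'              # Language that becomes the new base language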



translator_onlytitle_formitem_type_list = ['box12','box6','areabox','n_user','n_group','u_gps','label']
translator_titleplaceholder_formitem_type_list = ['h_title','inputbox','date','datetime','subquery','inputcaculate','file']
translator_lists_formitem_type_list = ['h_level','h_status','list','checkbox']
translator_special_formitem_type_list = ['subtable']

def loopFlowObject(lan_dict, flowobject):
    try:
        items = flowobject.get('items',[])
        for i in items:
            item_type = i['type']
            if item_type != 'line':
                config = i['config']
                
                item_text = lan_dict.get(i['text'],'')
                if item_text:
                    i['text'] = item_text
                
                if item_type in ['start','python','setform']:
                    loop_list = ['input','output']
                    loopInOutPut(lan_dict, loop_list, config)
                
                elif item_type == 'end':
                    loop_list = ['output']
                    loopInOutPut(lan_dict, loop_list, config)
                
                elif item_type in ['outflow','inflow','subflow']:
                    loop_list = ['subflow_input','subflow_output']
                    loopInOutPut(lan_dict, loop_list, config)
                
                elif item_type == 'form':
                    loop_list = ['input','output','subflow_input','subflow_output','input1','input2']
                    loopInOutPut(lan_dict, loop_list, config)
                    
                    action1_text = lan_dict.get(config['action1_text'], '')
                    if action1_text:
                        config['action1_text'] = action1_text
                    action2_text = lan_dict.get(config['action2_text'], '')
                    if action2_text:
                        config['action2_text'] = action2_text
                    
                    form_object = config.get('form_object',{})
                    if form_object:
                        if form_object.get('items',[]):
                            loopFormObject(lan_dict, form_object)
                
                elif item_type in ['async','switch']:
                    pass
    except:
        pass


def loopFormObject(lan_dict, formobject):
    try:
        items = formobject.get('items',[])
        for i in items:
            item_type = i['type']
            config = i['config']
            if item_type in translator_onlytitle_formitem_type_list:
                title = lan_dict.get(config['title'],'')
                if title:
                    config['title'] = title
                
            elif item_type in translator_titleplaceholder_formitem_type_list:
                title = lan_dict.get(config['title'],'')
                if title:
                    config['title'] = title
                
                placeholder = lan_dict.get(config['placeholder'],'')
                if placeholder:
                    config['placeholder'] = placeholder
                
            elif item_type == 'h_group':
                group_title = lan_dict.get(config['group_title'],'')
                if group_title:
                    config['group_title'] = group_title
                
                user_title = lan_dict.get(config['user_title'],'')
                if user_title:
                    config['user_title'] = user_title
                
            elif item_type in translator_lists_formitem_type_list:
                title = lan_dict.get(config['title'],'')
                if title:
                    config['title'] = title
                
                lists = config['lists']
                for l in lists:
                    text = lan_dict.get(l['text'],'')
                    if text:
                        l['text'] = text
        
            elif item_type in translator_special_formitem_type_list:
                title = lan_dict.get(config['title'],'')
                if title:
                    config['title'] = title
                
                loopFormObject(lan_dict, config['form_object'])
    except:
        pass


def loopInOutPut(lan_dict, loop_list, config):
        try:
            for i in loop_list:
                n_lst = config[i]
                for n in n_lst:
                    des = lan_dict.get(n['des'],'')
                    if des:
                        n['des'] = des
        except:
            pass



try:
    with open(path, "r", encoding="utf-8") as f:
        content = f.read()
    omf_list = json.loads(content)
    for omflow_app_dict in omf_list:
        this_app_language_package = omflow_app_dict.get('language_package',{})
        this_app_language_package = json.loads(this_app_language_package) if isinstance(this_app_language_package, str) else this_app_language_package
        zh_hant = this_app_language_package.get('zh-hant',{})
        en = this_app_language_package.get('en',{})
        ja = this_app_language_package.get('ja',{})
        zh_hans = this_app_language_package.get('zh-hans',{})
        replace_dict = this_app_language_package.get(replace_language,{}).copy()
        this_app_name = replace_dict.get(omflow_app_dict.get('app_name',''),'')
        if this_app_name:
            omflow_app_dict['app_name'] = this_app_name
        this_app_flow_list = omflow_app_dict.get('flow_list',{})
        for flow in this_app_flow_list:
            this_flow_name = replace_dict.get(flow['flow_name'],'')
            if this_flow_name:
                flow['flow_name'] = this_flow_name
            this_description = replace_dict.get(flow['description'],'')
            if this_description:
                flow['description'] = this_description
            
            this_flowobject = flow.get('flowobject',{})
            loopFlowObject(replace_dict, this_flowobject)
            this_formobject = flow.get('formobject',{})
            if not this_formobject:
                this_formobject = this_flowobject.get('form_object',{})
            loopFormObject(replace_dict, this_formobject)
            
            this_subflow_list = this_flowobject.get('subflow',[])
            for subflow in this_subflow_list:
                this_subflow_name = replace_dict.get(subflow['name'],'')
                if this_subflow_name:
                    subflow['name'] = this_subflow_name
                this_subflow_description = replace_dict.get(subflow['description'],'')
                if this_subflow_description:
                    subflow['description'] = this_subflow_description
                loopFlowObject(replace_dict, subflow)
        
        new_dict = {}
        new_lan = ''
        for origin_text in replace_dict:
            replace_text = replace_dict[origin_text]
            if replace_text:
                if origin_text in zh_hant:
                    trans_text = zh_hant.pop(origin_text)
                    zh_hant[replace_text] = trans_text
                    if replace_language == 'zh-hant':
                        new_dict[replace_text] = origin_text
                if origin_text in en:
                    trans_text = en.pop(origin_text)
                    en[replace_text] = trans_text
                    if replace_language == 'en':
                        new_dict[replace_text] = origin_text
                if origin_text in ja:
                    trans_text = ja.pop(origin_text)
                    ja[replace_text] = trans_text
                    if replace_language == 'ja':
                        new_dict[replace_text] = origin_text
                if origin_text in zh_hans:
                    trans_text = zh_hans.pop(origin_text)
                    zh_hans[replace_text] = trans_text
                    if replace_language == 'zh-hans':
                        new_dict[replace_text] = origin_text
        
        this_app_language_package[original_language] = new_dict
        omflow_app_dict['language_package'] = json.dumps(this_app_language_package)

    result = json.dumps(omf_list)
    
    with open(trans_to_path, "w", encoding="utf-8") as f:
        f.write(result)
except Exception as e:
    print(e.__str__())

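Finding Users with a Specific Role/Responsibility on the Organization Chart

Compile the users who hold a specific role/responsibility on the organization chart into a CSV file.

The script merges the organization chart file exported from the system with the user data exported from a report to list the users who hold the given responsibility/role.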

import json

# Load the exported organization chart; the with block closes the file afterwards
with open("mainOrg.omf", "r", encoding='UTF-8') as f:
    orgobject = json.load(f)


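# Find the items connected beneath role items whose responsibility contains the given name
# ('主管', i.e. "supervisor", is the responsibility name matched here; change it as needed)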
sirList = []
for org_item in orgobject['items']:
    if org_item['type'] == 'role' and '主管' in org_item['config']['responsibility']:
        for line_item in orgobject['items']:
            if line_item['type'] == 'line' and line_item['config']['source_item'] == org_item['id']:
                sirList.append(line_item['config']['target_item'])
# Collect the user IDs of the items found above
peopleList = []
for org_item in orgobject['items']:
    if org_item['type'] == 'people' and org_item['id'] in sirList:
        if not org_item['config']['noid'] in peopleList:
            peopleList.append(org_item['config']['noid'])
import csv

# Open the user CSV produced by the report and build a new CSV by matching user IDs
readyList = []
with open('syscomUser.csv', newline='', encoding='UTF-8') as syscomUserfile:
    spamreader = csv.reader(syscomUserfile, delimiter=',', quotechar='|')
  
    for row in spamreader:
        if row[0] in peopleList:
            readyList.append(row[1:])
                
                
with open('syscomUserWithSir.csv', 'w', newline='', encoding='UTF-8') as syscomUserWithSirfile:
    spamwriter = csv.writer(syscomUserWithSirfile, delimiter=',',
                        quotechar='|', quoting=csv.QUOTE_MINIMAL)
    for row in readyList:
        spamwriter.writerow(row)
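
The lookup in the script can be tried without any exported files. The sketch below runs the same role -> line -> people traversal on a small in-memory chart. The sample items are invented for illustration and only contain the fields the script reads (type, id, config.responsibility, config.source_item, config.target_item, config.noid); a real mainOrg.omf export may carry more. The function name find_user_ids is hypothetical.

```python
def find_user_ids(org, keyword):
    """Return the user IDs ('noid') of people items linked from matching role items."""
    items = org['items']
    # Role items whose responsibility contains the keyword.
    role_ids = {item['id'] for item in items
                if item['type'] == 'role'
                and keyword in item['config']['responsibility']}
    # Items that a line connects from one of those roles.
    target_ids = {item['config']['target_item'] for item in items
                  if item['type'] == 'line'
                  and item['config']['source_item'] in role_ids}
    # People items among the targets, de-duplicated in chart order.
    user_ids = []
    for item in items:
        if item['type'] == 'people' and item['id'] in target_ids:
            noid = item['config']['noid']
            if noid not in user_ids:
                user_ids.append(noid)
    return user_ids


# Hypothetical sample chart.
sample_org = {'items': [
    {'id': 'r1', 'type': 'role', 'config': {'responsibility': '主管'}},
    {'id': 'r2', 'type': 'role', 'config': {'responsibility': '職員'}},
    {'id': 'p1', 'type': 'people', 'config': {'noid': 'A001'}},
    {'id': 'p2', 'type': 'people', 'config': {'noid': 'A002'}},
    {'id': 'l1', 'type': 'line', 'config': {'source_item': 'r1', 'target_item': 'p1'}},
    {'id': 'l2', 'type': 'line', 'config': {'source_item': 'r2', 'target_item': 'p2'}},
]}
supervisors = find_user_ids(sample_org, '主管')
```

Using sets for the intermediate IDs avoids rescanning the whole item list for every role, which may matter on large charts.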

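Form Data Export

This section provides Python code showing how to export OMFLOW forms together with their attachments as PDF files.

Installing Packages

Before running the script, the following requirements must be met:

Python packages: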

pip install requests
pip install pdfkit

PDF software: download the installer for your environment from the official website ( https://wkhtmltopdf.org ) and install it.
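
Before running the export, it can help to confirm that the wkhtmltopdf executable is reachable. The following is a small sketch using only the standard library; the helper name find_wkhtmltopdf is hypothetical and not part of pdfkit.

```python
import shutil


def find_wkhtmltopdf(explicit_path=None):
    """Return a usable wkhtmltopdf path, or None if it cannot be located."""
    if explicit_path:
        # shutil.which also accepts a full path and checks that it is executable.
        return shutil.which(explicit_path)
    # Otherwise search the directories listed in the PATH environment variable.
    return shutil.which('wkhtmltopdf')


located = find_wkhtmltopdf()
print(located or 'wkhtmltopdf not found on PATH; supply its full path when prompted.')
```

If this prints a path, the wkhtmltopdf path prompt in the export script can likely be skipped; otherwise enter the full path to the executable there.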

Running the Script


import requests, os, pdfkit, shutil, zipfile, threading, time
from datetime import datetime


stop_thread = False

def zip_folder(folder_path, zip_filename='data.zip'):
    with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for foldername, subfolders, filenames in os.walk(folder_path):
            for filename in filenames:
                file_path = os.path.join(foldername, filename)
                arcname = os.path.relpath(file_path, folder_path)
                zipf.write(file_path, arcname=arcname)

def your_while_loop(token_url, user, token):
    while not stop_thread:
        time.sleep(10)
        response = requests.request("GET", token_url, auth=( user, token ))

ann = '''

#######################################################
#                 OMFLOW EXPORT TOOL                  #
#                                                     #
#   Please make sure the current environment already  #
#            has the following packages.              #
#                                                     #
# python package: requests, pdfkit.                   #
# pdf software: wkhtmltopdf.(https://wkhtmltopdf.org) #
#                                                     #
#######################################################

'''
try:
    print(ann)
    #user input param
    server = input("Enter the OMFLOW server domain (ex. https://10.1.15.3:443):")
    if server:
        if server[-1] == '/':
            server = server[:-1]
    user = input("Enter the OMFLOW administrator username:")
    token = input("Enter the password:")
    api_path = input("Enter the API_PATH:")
    print('')
    print('Enter the wkhtmltopdf path or press enter to skip.')
    print('ex. C:\\\\Program Files\\\\wkhtmltopdf\\\\bin\\\\wkhtmltopdf.exe')
    path_wkhtmltopdf = input("wkhtmltopdf path:")
    print('')
    if path_wkhtmltopdf:
        pdfkit_config = pdfkit.configuration(wkhtmltopdf=path_wkhtmltopdf)
    else:
        pdfkit_config = None
    script_path = os.path.abspath(__file__)
    script_directory = os.path.dirname(script_path)
    download_directory_path = os.path.join(script_directory, 'data')
    if not os.path.exists(download_directory_path):
        os.makedirs(download_directory_path)

    token_url = f'{server}/accounts/api/security/get/'
    server_version_url = f'{server}/rest/app/server-version/get/'
    list_omdata_url = f'{server}/rest/flowmanage/api/omdata/list/{api_path}'
    html_url = f'{server}/rest/flowmanage/api/print-form/'
    list_file_url = f'{server}/rest/api/files/list/'
    get_mapping_url = f'{server}/rest/api/mapping-path/get/'
    download_url = f'{server}/'
    #param
    total_size = 0
    script_start_time = datetime.now()
    #get token
    response = requests.request("GET", token_url, auth=( user, token ))
    response_dict = response.json()
    this_status = response_dict.get('status', 404)
    if this_status != 200:
        raise Exception(f"OMFLOW return status code {this_status} when get security. error message: {response_dict.get('message','')}")
    security = response_dict.get("result", {}).get("security")
    #another thread to get token
    extra_thread = threading.Thread(target=your_while_loop, args=(token_url, user, token))
    extra_thread.start()
    #check Server version at least 1.2.0.0
    values={
        "security" : security,
        "omflow_restapi" : 1
    }
    response = requests.post(server_version_url, json=values)
    response_dict = response.json()
    this_status = response_dict.get('status', 404)
    if this_status != 200:
        raise Exception(f"OMFLOW return status code {this_status} when get server version. error message: {response_dict.get('message','')}")
    #default to 0 so a missing version fails the check instead of raising a TypeError
    server_version = response_dict.get('result',{}).get('version', 0)
    if server_version < 1002000000:
        raise Exception("OMFLOW server version must be at least 1.2.0.0")
    #get flow_uuid
    search_conditions = []
    values={
        "security" : security,
        "omflow_restapi" : 1,
        "search_conditions" : search_conditions,
        "search_columns" : ['flow_uuid'],
        "exclude_conditions" : [],
        "order_columns" : [],
        "limit" : 1,
        "start" : 0
    }
    response = requests.post(list_omdata_url, json=values)
    response_dict = response.json()
    this_status = response_dict.get('status', 404)
    if this_status != 200:
        raise Exception(f"OMFLOW return status code {this_status} when list omdata. error message: {response_dict.get('message','')}")
    result = response_dict.get('result')
    if result:
        flow_uuid = result[0]['flow_uuid'].replace('-','')
        #get list
        search_conditions = []
        search_conditions.append({'column':'history','condition':'=','value':False})
        search_conditions.append({'column':'running','condition':'=','value':False})
        search_conditions.append({'column':'error','condition':'=','value':False})
        search_conditions.append({'column':'closed','condition':'=','value':True})
        values={
            "security" : security,
            "omflow_restapi" : 1,
            "search_conditions" : search_conditions,
            "search_columns" : ['id','data_no'],
            "exclude_conditions" : [],
            "order_columns" : ['-data_no','id'],
            "limit" : 0,
            "start" : 0
        }
        response = requests.post(list_omdata_url, json=values)
        result = response.json().get('result')
        total_count = len(result)
        print(f'The total number of records for the target flow is {total_count}.')
        print('Start to export omdata ...')
        loop_count = 1
        for omdata in result:
            data_no = omdata['data_no']
            data_id = omdata['id']
            #check folder exists
            data_no_directory_path = os.path.join(download_directory_path, data_no)
            if not os.path.exists(data_no_directory_path):
                os.makedirs(data_no_directory_path)
            #check pdf exists
            pdf_file_path = os.path.join(data_no_directory_path, f'{data_no}.pdf')
            if not os.path.exists(pdf_file_path):
                #get html
                this_html_url = html_url + f'{flow_uuid}/{data_no}/{data_id}/'
                values={
                    "security" : security,
                    "omflow_restapi" : 1,
                    }
                response = requests.post(this_html_url, json=values)
                if response.status_code != 200:
                    raise Exception(f'OMFLOW return status code {response.status_code} when get omdata html.')
                this_html_content = response.content.decode('utf-8')
                this_html_content = this_html_content.replace('window.print();window.close();','')
                this_html_content = this_html_content.replace('<link rel="stylesheet" href="/static/plugins/fontawesome-free-5.13.0-web/css/all.css">','')
                #html to pdf
                if pdfkit_config:
                    pdfkit.from_string(this_html_content, pdf_file_path, configuration=pdfkit_config)
                else:
                    pdfkit.from_string(this_html_content, pdf_file_path)
            #amount size
            total_size += os.path.getsize(pdf_file_path)
            #get file path
            this_file_search_conditions = []
            this_file_search_conditions.append({'column':'flow_uuid','condition':'=','value':flow_uuid})
            this_file_search_conditions.append({'column':'data_no','condition':'=','value':data_no})
            this_file_search_conditions.append({'column':'delete','condition':'=','value':False})
            values={
                "security" : security,
                "omflow_restapi" : 1,
                "app_name" : 'omformflow',
                "search_conditions" : this_file_search_conditions,
                "search_columns" : ['file_name','data_id','formitm_id'],
                "limit" : 0
                }
            response = requests.post(list_file_url, json=values)
            response_dict = response.json()
            this_status = response_dict.get('status', 404)
            if this_status != 200:
                raise Exception(f"OMFLOW return status code {this_status} when list files. error message: {response_dict.get('message','')}")
            this_file_list = response_dict.get('result')
            for this_file in this_file_list:
                #check file exists
                this_file_path = os.path.join(data_no_directory_path, this_file['file_name'])
                if not os.path.exists(this_file_path):
                    #get mapping path
                    if this_file['formitm_id']:
                        this_rel_path = os.path.join('omformflow', flow_uuid, data_no, str(this_file['data_id']), this_file['formitm_id'], this_file['file_name'])
                    else:
                        this_rel_path = os.path.join('omformflow', flow_uuid, data_no, str(this_file['data_id']), this_file['file_name'])
                    values={
                        "security" : security,
                        "omflow_restapi" : 1,
                        "path" : this_rel_path,
                        "path_type" : 'download_files'
                        }
                    response = requests.post(get_mapping_url, json=values)
                    response_dict = response.json()
                    this_status = response_dict.get('status', 404)
                    if this_status != 200:
                        raise Exception(f"OMFLOW return status code {this_status} when get mapping url. error message: {response_dict.get('message','')}")
                    mapping_path = response_dict.get('result')
                    #download file
                    this_download_url = download_url + mapping_path
                    response = requests.get(this_download_url)
                    if response.status_code != 200:
                        raise Exception(f'OMFLOW return status code {response.status_code} when download files.')
                    #write file
                    this_file_content = response.content
                    with open(this_file_path, 'wb') as f:
                        f.write(this_file_content)
                #amount size
                total_size += os.path.getsize(this_file_path)
            #print info every 10 records
            if loop_count % 10 == 0:
                now_time = datetime.now()
                print(f'Currently, {loop_count} records have been exported, consuming {total_size / (1024 ** 2):.2f} MB of space.')
                t, u, f = shutil.disk_usage(script_directory)
                print(f'There is still {f / (1024 ** 3):.2f} GB of space available on the disk')
                x = (total_count - loop_count) / loop_count
                print(f'Anticipated additional requirement of {x * total_size / (1024 ** 3):.2f} GB.')
                time_difference = now_time - script_start_time
                minutes_difference = float(time_difference.total_seconds() / 60)
                print(f'Anticipated additional time required: {x * minutes_difference:.2f} minutes.')
            loop_count += 1
        #zip folder
        print('Export OmData finished. Start to zip folder.')
        zip_file_path = os.path.join(script_directory, 'data.zip')
        if os.path.exists(zip_file_path):
            os.remove(zip_file_path)
        #print info
        now_time = datetime.now()
        time_difference = now_time - script_start_time
        minutes_difference = float(time_difference.total_seconds() / 60)
        print(f'Anticipated additional time required: {4 * minutes_difference:.2f} minutes.')
        zip_folder(download_directory_path, zip_file_path)
        #remove folder
        shutil.rmtree(download_directory_path)
    else:
        print(f'Unable to retrieve any data from {api_path}.')
    #stop thread
    stop_thread = True
    extra_thread.join()
    print('OMFLOW data export finish!')
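except Exception as e:
    #stop thread (it exists only if the error occurred after the thread was created)
    stop_thread = True
    if 'extra_thread' in globals():
        extra_thread.join()
    print('OMFLOW data export error: ',e.__str__())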

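The steps for running the script are as follows:

Enter the following information as shown in the figure:

  1. The OMFLOW server domain or IP:port (for example, https://10.1.15.3:443)

  2. An account with administrator privileges (OMFLOW administrator)

  3. The user's password

  4. The API path of the flow to export

  5. The path to wkhtmltopdf.exe. If it has already been added to the PATH environment variable, you can press Enter to skip this item.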
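
As a rough illustration of how the first and fourth inputs are used, the sketch below assembles the request URLs from the server address and API path. The endpoint paths are copied from the script; the helper name build_endpoints and the sample API path my_flow are hypothetical. Note that it strips every trailing slash, whereas the script removes only one.

```python
def build_endpoints(server, api_path):
    """Build the OMFLOW URLs used by the export script."""
    server = server.rstrip('/')  # tolerate a trailing slash in the input
    return {
        'token': f'{server}/accounts/api/security/get/',
        'server_version': f'{server}/rest/app/server-version/get/',
        'list_omdata': f'{server}/rest/flowmanage/api/omdata/list/{api_path}',
        'print_form': f'{server}/rest/flowmanage/api/print-form/',
        'list_files': f'{server}/rest/api/files/list/',
        'mapping_path': f'{server}/rest/api/mapping-path/get/',
    }


# 'my_flow' is a placeholder API path.
endpoints = build_endpoints('https://10.1.15.3:443/', 'my_flow')
```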

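During execution, the screen shows the total number of closed records for the flow while the download proceeds. After every 10 records, the following information is displayed:

  1. Space used so far

  2. Free space available in the current environment

  3. Estimated space still required for the remaining downloads

  4. Estimated time still required for the remaining downloads

※ The estimates in items 3 and 4 are based on the resources consumed by the tasks completed so far and may not be fully accurate.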
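
The estimates in items 3 and 4 are a simple linear extrapolation: the resources consumed so far are scaled by the ratio of remaining records to completed records. A minimal sketch of that arithmetic follows; the function name estimate_remaining and the sample figures are made up for illustration.

```python
def estimate_remaining(total_count, done_count, consumed):
    """Extrapolate the remaining cost from the cost of the records done so far."""
    if done_count <= 0:
        raise ValueError('done_count must be positive')
    return (total_count - done_count) / done_count * consumed


# 10 of 200 records exported so far, using 0.5 GB and 2 minutes.
remaining_gb = estimate_remaining(200, 10, 0.5)
remaining_minutes = estimate_remaining(200, 10, 2.0)
```

Because the extrapolation assumes every record costs about the same, a few records with large attachments early or late in the run can skew the figures considerably.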

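Modifying the Download Conditions

This script is built on the OMFLOW API, so you can modify it to suit your needs.

  1. To also download data that has not been closed, delete the following line at line 117 of the original script.

search_conditions.append({'column':'closed','condition':'=','value':True})

  2. To retrieve only the records whose 'formitm_1' field equals 'SYSCOM', add the following line.

search_conditions.append({'column':'formitm_1','condition':'=','value':'SYSCOM'})

For a detailed guide to query conditions, see the [Query Forms] chapter.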
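
Both modifications can also be expressed as a small helper that assembles the condition list. This is only a sketch: the helper name build_search_conditions and its parameters are hypothetical, while the condition dictionaries follow the format used in the script.

```python
def build_search_conditions(closed_only=True, extra_filters=None):
    """Assemble the search_conditions list sent to the omdata list API."""
    conditions = [
        {'column': 'history', 'condition': '=', 'value': False},
        {'column': 'running', 'condition': '=', 'value': False},
        {'column': 'error', 'condition': '=', 'value': False},
    ]
    if closed_only:
        # Same effect as keeping the 'closed' line in the script.
        conditions.append({'column': 'closed', 'condition': '=', 'value': True})
    # Optional equality filters, e.g. {'formitm_1': 'SYSCOM'}.
    for column, value in (extra_filters or {}).items():
        conditions.append({'column': column, 'condition': '=', 'value': value})
    return conditions


# Include unclosed data and keep only records whose formitm_1 equals SYSCOM.
search_conditions = build_search_conditions(
    closed_only=False, extra_filters={'formitm_1': 'SYSCOM'})
```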
