獨立執行
應用翻譯轉換
程式目標是將某個omf檔案中的基底語言進行置換,使用前須先修改前四行參數。
例如 test_flow.omf 是一個以繁體中文設計的應用,已經透過OMFLOW內建的功能上傳過各語言的翻譯,當英語客戶需要此應用借鑒時,可以利用該程式將“設計”的語言從繁體中文改成英文。
※ 請注意,omf檔案中需含有對應翻譯,此程式只是幫您進行翻譯對調,而非自動翻譯。
import json
path = '/some_path/test_flow.omf' #請依照實際路徑及名稱修改
trans_to_path = '/some_path/test_flow.omf'
original_language = 'zh-hant' #請依照實際需求修改
replace_language = 'ja'
translator_onlytitle_formitem_type_list = ['box12','box6','areabox','n_user','n_group','u_gps','label']
translator_titleplaceholder_formitem_type_list = ['h_title','inputbox','date','datetime','subquery','inputcaculate','file']
translator_lists_formitem_type_list = ['h_level','h_status','list','checkbox']
translator_special_formitem_type_list = ['subtable']
def loopFlowObject(lan_dict, flowobject):
try:
items = flowobject.get('items',[])
for i in items:
item_type = i['type']
if item_type != 'line':
config = i['config']
item_text = lan_dict.get(i['text'],'')
if item_text:
i['text'] = item_text
if item_type in ['start','python','setform']:
loop_list = ['input','output']
loopInOutPut(lan_dict, loop_list, config)
elif item_type == 'end':
loop_list = ['output']
loopInOutPut(lan_dict, loop_list, config)
elif item_type in ['outflow','inflow','subflow']:
loop_list = ['subflow_input','subflow_output']
loopInOutPut(lan_dict, loop_list, config)
elif item_type == 'form':
loop_list = ['input','output','subflow_input','subflow_output','input1','input2']
loopInOutPut(lan_dict, loop_list, config)
action1_text = lan_dict.get(config['action1_text'], '')
if action1_text:
config['action1_text'] = action1_text
action2_text = lan_dict.get(config['action2_text'], '')
if action2_text:
config['action2_text'] = action2_text
form_object = config.get('form_object',{})
if form_object:
if form_object.get('items',[]):
loopFormObject(lan_dict, form_object)
elif item_type in ['async','switch']:
pass
except:
pass
def loopFormObject(lan_dict, formobject):
try:
items = formobject.get('items',[])
for i in items:
item_type = i['type']
config = i['config']
if item_type in translator_onlytitle_formitem_type_list:
title = lan_dict.get(config['title'],'')
if title:
config['title'] = title
elif item_type in translator_titleplaceholder_formitem_type_list:
title = lan_dict.get(config['title'],'')
if title:
config['title'] = title
placeholder = lan_dict.get(config['placeholder'],'')
if placeholder:
config['placeholder'] = placeholder
elif item_type == 'h_group':
group_title = lan_dict.get(config['group_title'],'')
if group_title:
config['group_title'] = group_title
user_title = lan_dict.get(config['user_title'],'')
if user_title:
config['user_title'] = user_title
elif item_type in translator_lists_formitem_type_list:
title = lan_dict.get(config['title'],'')
if title:
config['title'] = title
lists = config['lists']
for l in lists:
text = lan_dict.get(l['text'],'')
if text:
l['text'] = text
elif item_type in translator_special_formitem_type_list:
title = lan_dict.get(config['title'],'')
if title:
config['title'] = title
loopFormObject(lan_dict, config['form_object'])
except:
pass
def loopInOutPut(lan_dict, loop_list, config):
try:
for i in loop_list:
n_lst = config[i]
for n in n_lst:
des = lan_dict.get(n['des'],'')
if des:
n['des'] = des
except:
pass
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
f.close()
omf_list = json.loads(content)
for omflow_app_dict in omf_list:
this_app_language_package = omflow_app_dict.get('language_package',{})
this_app_language_package = json.loads(this_app_language_package) if isinstance(this_app_language_package, str) else this_app_language_package
zh_hant = this_app_language_package.get('zh-hant',{})
en = this_app_language_package.get('en',{})
ja = this_app_language_package.get('ja',{})
zh_hans = this_app_language_package.get('zh-hans',{})
replace_dict = this_app_language_package.get(replace_language,{}).copy()
this_app_name = replace_dict.get(omflow_app_dict.get('app_name',''),'')
if this_app_name:
omflow_app_dict['app_name'] = this_app_name
this_app_flow_list = omflow_app_dict.get('flow_list',{})
for flow in this_app_flow_list:
this_flow_name = replace_dict.get(flow['flow_name'],'')
if this_flow_name:
flow['flow_name'] = this_flow_name
this_description = replace_dict.get(flow['description'],'')
if this_description:
flow['description'] = this_description
this_flowobject = flow.get('flowobject',{})
loopFlowObject(replace_dict, this_flowobject)
this_formobject = flow.get('formobject',{})
if not this_formobject:
this_formobject = this_flowobject.get('form_object',{})
loopFormObject(replace_dict, this_formobject)
this_subflow_list = this_flowobject.get('subflow',[])
for subflow in this_subflow_list:
this_subflow_name = replace_dict.get(subflow['name'],'')
if this_subflow_name:
subflow['name'] = this_subflow_name
this_subflow_description = replace_dict.get(subflow['description'],'')
if this_subflow_description:
subflow['description'] = this_subflow_description
loopFlowObject(replace_dict, subflow)
new_dict = {}
new_lan = ''
for origin_text in replace_dict:
replace_text = replace_dict[origin_text]
if replace_text:
if origin_text in zh_hant:
trans_text = zh_hant.pop(origin_text)
zh_hant[replace_text] = trans_text
if replace_language == 'zh-hant':
new_dict[replace_text] = origin_text
if origin_text in en:
trans_text = en.pop(origin_text)
en[replace_text] = trans_text
if replace_language == 'en':
new_dict[replace_text] = origin_text
if origin_text in ja:
trans_text = ja.pop(origin_text)
ja[replace_text] = trans_text
if replace_language == 'ja':
new_dict[replace_text] = origin_text
if origin_text in zh_hans:
trans_text = zh_hans.pop(origin_text)
zh_hans[replace_text] = trans_text
if replace_language == 'zh-hans':
new_dict[replace_text] = origin_text
this_app_language_package[original_language] = new_dict
omflow_app_dict['language_package'] = json.dumps(this_app_language_package)
result = json.dumps(omf_list)
with open(trans_to_path, "w", encoding="utf-8") as f:
f.write(result)
f.close()
except Exception as e:
print(e.__str__())
找出組織圖上特定職務/權責的使用者
將組織圖上特定的職務/權責使用者整理出csv檔案
從系統中匯出組織圖的檔案,以及從報表匯出的使用者的資料,合併整理出使用者的權責/職務
import json
f = open("mainOrg.omf", "r", encoding='UTF-8')
orgobject = f.read()
orgobject = json.loads(orgobject)
#找出職務元件上指定名稱的權責底下連接的元件
sirList = []
for org_item in orgobject['items']:
if org_item['type'] == 'role' and '主管' in org_item['config']['responsibility']:
for line_item in orgobject['items']:
if line_item['type'] == 'line' and line_item['config']['source_item'] == org_item['id']:
sirList.append(line_item['config']['target_item'])
#找出前面元件的使用者id
peopleList = []
for org_item in orgobject['items']:
if org_item['type'] == 'people' and org_item['id'] in sirList:
if not org_item['config']['noid'] in peopleList:
peopleList.append(org_item['config']['noid'])
import csv
#開啟報表產出的使用者csv,比對使用者id生成新的csv
readyList = []
with open('syscomUser.csv', newline='', encoding='UTF-8') as syscomUserfile:
spamreader = csv.reader(syscomUserfile, delimiter=',', quotechar='|')
for row in spamreader:
if row[0] in peopleList:
readyList.append(row[1:])
with open('syscomUserWithSir.csv', 'w', newline='', encoding='UTF-8') as syscomUserWithSirfile:
spamwriter = csv.writer(syscomUserWithSirfile, delimiter=',',
quotechar='|', quoting=csv.QUOTE_MINIMAL)
for row in readyList:
spamwriter.writerow(row)
表單資料匯出
本節將提供python代碼教導如何將OMFLOW表單及其附件一同匯出成PDF檔案。
安裝套件
執行Script之前須先滿足以下條件:
Python套件:
pip install requests
pip install pdfkit
PDF 軟體: 請從官方網站( https://wkhtmltopdf.org )下載適用於您當前環境的安裝檔,然後進行安裝。
執行腳本
import requests, os, pdfkit, shutil, zipfile, threading, time
from datetime import datetime
stop_thread = False
def zip_folder(folder_path, zip_filename='data.zip'):
with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
for foldername, subfolders, filenames in os.walk(folder_path):
for filename in filenames:
file_path = os.path.join(foldername, filename)
arcname = os.path.relpath(file_path, folder_path)
zipf.write(file_path, arcname=arcname)
def your_while_loop(token_url, user, token):
while not stop_thread:
time.sleep(10)
response = requests.request("GET", token_url, auth=( user, token ))
ann = '''
#######################################################
# OMFLOW EXPORT TOOL #
# #
# Please make sure the current environment already #
# has the following packages. #
# #
# python package: requests, pdfkit. #
# pdf software: wkhtmltopdf.(https://wkhtmltopdf.org) #
# #
#######################################################
'''
try:
print(ann)
#user input param
server = input("Enter the OMFLOW server domain (ex. https://10.1.15.3:443):")
if server:
if server[-1] == '/':
server = server[:-1]
user = input("Enter the OMFLOW administrator username:")
token = input("Enter the password:")
api_path = input("Enter the API_PATH:")
print('')
print('Enter the wkhtmltopdf path or press enter to skip.')
print('ex. C:\\\\Program Files\\\\wkhtmltopdf\\\\bin\\\\wkhtmltopdf.exe')
path_wkhtmltopdf = input("wkhtmltopdf path:")
print('')
if path_wkhtmltopdf:
pdfkit_config = pdfkit.configuration(wkhtmltopdf=path_wkhtmltopdf)
else:
pdfkit_config = None
script_path = os.path.abspath(__file__)
script_directory = os.path.dirname(script_path)
download_directory_path = os.path.join(script_directory, 'data')
if not os.path.exists(download_directory_path):
os.makedirs(download_directory_path)
token_url = f'{server}/accounts/api/security/get/'
server_version_url = f'{server}/rest/app/server-version/get/'
list_omdata_url = f'{server}/rest/flowmanage/api/omdata/list/{api_path}'
html_url = f'{server}/rest/flowmanage/api/print-form/'
list_file_url = f'{server}/rest/api/files/list/'
get_mapping_url = f'{server}/rest/api/mapping-path/get/'
download_url = f'{server}/'
#param
total_size = 0
script_start_time = datetime.now()
#get token
response = requests.request("GET", token_url, auth=( user, token ))
response_dict = response.json()
this_status = response_dict.get('status', 404)
if this_status != 200:
raise Exception(f"OMFLOW return status code {this_status} when get security. error message: {response_dict.get('message','')}")
security = response_dict.get("result", {}).get("security")
#anothrt thread to get token
extra_thread = threading.Thread(target=your_while_loop, args=(token_url, user, token))
extra_thread.start()
#check Server version at least 1.2.0.0
values={
"security" : security,
"omflow_restapi" : 1
}
response = requests.post(server_version_url, json=values)
response_dict = response.json()
this_status = response_dict.get('status', 404)
if this_status != 200:
raise Exception(f"OMFLOW return status code {this_status} when get server version. error message: {response_dict.get('message','')}")
server_version = response_dict.get('result',{}).get('version','')
if server_version < 1002000000:
raise Exception(f"OMFLOW sever version at least 1.2.0.0")
#get flow_uuid
search_conditions = []
values={
"security" : security,
"omflow_restapi" : 1,
"search_conditions" : search_conditions,
"search_columns" : ['flow_uuid'],
"exclude_conditions" : [],
"order_columns" : [],
"limit" : 1,
"start" : 0
}
response = requests.post(list_omdata_url, json=values)
response_dict = response.json()
this_status = response_dict.get('status', 404)
if this_status != 200:
raise Exception(f"OMFLOW return status code {this_status} when list omdata. error message: {response_dict.get('message','')}")
result = response_dict.get('result')
if result:
flow_uuid = result[0]['flow_uuid'].replace('-','')
#get list
search_conditions = []
search_conditions.append({'column':'history','condition':'=','value':False})
search_conditions.append({'column':'running','condition':'=','value':False})
search_conditions.append({'column':'error','condition':'=','value':False})
search_conditions.append({'column':'closed','condition':'=','value':True})
values={
"security" : security,
"omflow_restapi" : 1,
"search_conditions" : search_conditions,
"search_columns" : ['id','data_no'],
"exclude_conditions" : [],
"order_columns" : ['-data_no','id'],
"limit" : 0,
"start" : 0
}
response = requests.post(list_omdata_url, json=values)
result = response.json().get('result')
total_count = len(result)
print(f'The total number of records for the target flow is {total_count}.')
print('Start to export omdata ...')
loop_count = 1
for omdata in result:
data_no = omdata['data_no']
data_id = omdata['id']
#check folder exists
data_no_directory_path = os.path.join(download_directory_path, data_no)
if not os.path.exists(data_no_directory_path):
os.makedirs(data_no_directory_path)
#check pdf exists
pdf_file_path = os.path.join(data_no_directory_path, f'{data_no}.pdf')
if not os.path.exists(pdf_file_path):
#get html
this_html_url = html_url + f'{flow_uuid}/{data_no}/{data_id}/'
values={
"security" : security,
"omflow_restapi" : 1,
}
response = requests.post(this_html_url, json=values)
if response.status_code != 200:
raise Exception(f'OMFLOW return status code {response.status_code} when get omdata html.')
this_html_content = response.content.decode('utf-8')
this_html_content = this_html_content.replace('window.print();window.close();','')
this_html_content = this_html_content.replace('<link rel="stylesheet" href="/static/plugins/fontawesome-free-5.13.0-web/css/all.css">','')
#html to pdf
if pdfkit_config:
pdfkit.from_string(this_html_content, pdf_file_path, configuration=pdfkit_config)
else:
pdfkit.from_string(this_html_content, pdf_file_path)
#amount size
total_size += os.path.getsize(pdf_file_path)
#get file path
this_file_search_conditions = []
this_file_search_conditions.append({'column':'flow_uuid','condition':'=','value':flow_uuid})
this_file_search_conditions.append({'column':'data_no','condition':'=','value':data_no})
this_file_search_conditions.append({'column':'delete','condition':'=','value':False})
values={
"security" : security,
"omflow_restapi" : 1,
"app_name" : 'omformflow',
"search_conditions" : this_file_search_conditions,
"search_columns" : ['file_name','data_id','formitm_id'],
"limit" : 0
}
response = requests.post(list_file_url, json=values)
response_dict = response.json()
this_status = response_dict.get('status', 404)
if this_status != 200:
raise Exception(f"OMFLOW return status code {this_status} when get mapping url. error message: {response_dict.get('message','')}")
this_file_list = response_dict.get('result')
for this_file in this_file_list:
#check file exists
this_file_path = os.path.join(data_no_directory_path, this_file['file_name'])
if not os.path.exists(this_file_path):
#get mapping path
if this_file['formitm_id']:
this_rel_path = os.path.join('omformflow', flow_uuid, data_no, str(this_file['data_id']), this_file['formitm_id'], this_file['file_name'])
else:
this_rel_path = os.path.join('omformflow', flow_uuid, data_no, str(this_file['data_id']), this_file['file_name'])
values={
"security" : security,
"omflow_restapi" : 1,
"path" : this_rel_path,
"path_type" : 'download_files'
}
response = requests.post(get_mapping_url, json=values)
response_dict = response.json()
this_status = response_dict.get('status', 404)
if this_status != 200:
raise Exception(f"OMFLOW return status code {this_status} when get mapping url. error message: {response_dict.get('message','')}")
mapping_path = response_dict.get('result')
#download file
this_download_url = download_url + mapping_path
response = requests.get(this_download_url)
if response.status_code != 200:
raise Exception(f'OMFLOW return status code {response.status_code} when download files.')
#write file
this_file_content = response.content
with open(this_file_path, 'wb') as f:
f.write(this_file_content)
f.close()
#amount size
total_size += os.path.getsize(this_file_path)
#print info every 10 records
if loop_count % 10 == 0:
now_time = datetime.now()
print(f'Currently, {loop_count} records have been exported, consuming {total_size / (1024 ** 2):.2f} MB of space.')
t, u, f = shutil.disk_usage(script_directory)
print(f'There is still {f / (1024 ** 3):.2f} GB of space available on the disk')
x = (total_count - loop_count) / loop_count
print(f'Anticipated additional requirement of {x * total_size / (1024 ** 3):.2f} GB.')
time_difference = now_time - script_start_time
minutes_difference = float(time_difference.total_seconds() / 60)
print(f'Anticipated additional time required: {x * minutes_difference:.2f} minutes.')
loop_count += 1
#zip folder
print('Export OmData finished. Start to zip folder.')
zip_file_path = os.path.join(script_directory, 'data.zip')
if os.path.exists(zip_file_path):
os.remove(zip_file_path)
#print info
now_time = datetime.now()
time_difference = now_time - script_start_time
minutes_difference = float(time_difference.total_seconds() / 60)
print(f'Anticipated additional time required: {4 * minutes_difference:.2f} minutes.')
zip_folder(download_directory_path, zip_file_path)
#remove folder
shutil.rmtree(download_directory_path)
else:
print(f'Unable to retrieve any data from {api_path}.')
#stop thread
stop_thread = True
extra_thread.join()
print('OMFLOW data export finish!')
except Exception as e:
#stop thread
stop_thread = True
extra_thread.join()
print('OMFLOW data export error: ',e.__str__())
執行Script步驟如下:
請依照圖示輸入下列資訊:
OMFLOW 伺服器網域/IP:port(例如 https://10.1.15.3:443)
具有管理員權限的帳號(OMFLOW 管理員)
使用者密碼
欲匯出流程的 API 路徑
若已將 wkhtmltopdf.exe 路徑新增至環境變數(PATH),您可按 Enter 跳過此項。
執行過程中,畫面將顯示流程的總關閉記錄數,並同時進行下載任務。在每完成 10 條記錄的執行後,將提供以下訊息:
目前的空間使用量
目前環境中的可用空間
預估即將下載所需的空間
預估即將下載所需的時間
※ 上述項目 3 和 4 中提供的估算是基於已完成任務的資源消耗,可能不完全準確。
修改下載條件
此腳本使用 OMFLOW API 完成,允許使用者根據需要修改內容。
如果您希望下載尚未關閉的數據,您可以在原始腳本的第 117 行刪除以下部分。
search_conditions.append({'column':'closed','condition':'=','value':True})
如果您希望只檢索 'formitm_1' 欄位等於 'SYSCOM' 的記錄,您可以添加以下內容。
search_conditions.append({'column':' formitm_1','condition':'=','value': ' SYSCOM'})
詳細查詢條件教學可參考【查詢表單】章節
最后更新于