Execute independently
Execute independently
Apply translation transformation
The goal of the program is to replace the base language in an omf file. The first four lines of parameters must be modified before use.
For example, test_flow.omf is an application designed in Traditional Chinese. Translations in various languages have been uploaded through OMFLOW's built-in function. When English-speaking customers need this application for reference, they can use this program to transfer the "design" language from Traditional Chinese changed to English.
※ Please note that the omf file must contain the corresponding translation. This program only helps you perform translation swaps, not automatic translation.
import json
path = '/some_path/test_flow.omf' #Please modify according to the actual path and name
trans_to_path = '/some_path/test_flow.omf'
original_language = 'zh-hant' #Please modify according to actual needs
replace_language = 'ja'
translator_onlytitle_formitem_type_list = ['box12','box6','areabox','n_user','n_group','u_gps','label']
translator_titleplaceholder_formitem_type_list = ['h_title','inputbox','date','datetime','subquery','inputcaculate','file']
translator_lists_formitem_type_list = ['h_level','h_status','list','checkbox']
translator_special_formitem_type_list = ['subtable']
def loopFlowObject(lan_dict, flowobject):
try:
items = flowobject.get('items',[])
for i in items:
item_type = i['type']
if item_type != 'line':
config = i['config']
item_text = lan_dict.get(i['text'],'')
if item_text:
i['text'] = item_text
if item_type in ['start','python','setform']:
loop_list = ['input','output']
loopInOutPut(lan_dict, loop_list, config)
elif item_type == 'end':
loop_list = ['output']
loopInOutPut(lan_dict, loop_list, config)
elif item_type in ['outflow','inflow','subflow']:
loop_list = ['subflow_input','subflow_output']
loopInOutPut(lan_dict, loop_list, config)
elif item_type == 'form':
loop_list = ['input','output','subflow_input','subflow_output','input1','input2']
loopInOutPut(lan_dict, loop_list, config)
action1_text = lan_dict.get(config['action1_text'], '')
if action1_text:
config['action1_text'] = action1_text
action2_text = lan_dict.get(config['action2_text'], '')
if action2_text:
config['action2_text'] = action2_text
form_object = config.get('form_object',{})
if form_object:
if form_object.get('items',[]):
loopFormObject(lan_dict, form_object)
elif item_type in ['async','switch']:
pass
except:
pass
def loopFormObject(lan_dict, formobject):
try:
items = formobject.get('items',[])
for i in items:
item_type = i['type']
config = i['config']
if item_type in translator_onlytitle_formitem_type_list:
title = lan_dict.get(config['title'],'')
if title:
config['title'] = title
elif item_type in translator_titleplaceholder_formitem_type_list:
title = lan_dict.get(config['title'],'')
if title:
config['title'] = title
placeholder = lan_dict.get(config['placeholder'],'')
if placeholder:
config['placeholder'] = placeholder
elif item_type == 'h_group':
group_title = lan_dict.get(config['group_title'],'')
if group_title:
config['group_title'] = group_title
user_title = lan_dict.get(config['user_title'],'')
if user_title:
config['user_title'] = user_title
elif item_type in translator_lists_formitem_type_list:
title = lan_dict.get(config['title'],'')
if title:
config['title'] = title
lists = config['lists']
for l in lists:
text = lan_dict.get(l['text'],'')
if text:
l['text'] = text
elif item_type in translator_special_formitem_type_list:
title = lan_dict.get(config['title'],'')
if title:
config['title'] = title
loopFormObject(lan_dict, config['form_object'])
except:
pass
def loopInOutPut(lan_dict, loop_list, config):
try:
for i in loop_list:
n_lst = config[i]
for n in n_lst:
des = lan_dict.get(n['des'],'')
if des:
n['des'] = des
except:
pass
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
f.close()
omf_list = json.loads(content)
for omflow_app_dict in omf_list:
this_app_language_package = omflow_app_dict.get('language_package',{})
this_app_language_package = json.loads(this_app_language_package) if isinstance(this_app_language_package, str) else this_app_language_package
zh_hant = this_app_language_package.get('zh-hant',{})
en = this_app_language_package.get('en',{})
ja = this_app_language_package.get('ja',{})
zh_hans = this_app_language_package.get('zh-hans',{})
replace_dict = this_app_language_package.get(replace_language,{}).copy()
this_app_name = replace_dict.get(omflow_app_dict.get('app_name',''),'')
if this_app_name:
omflow_app_dict['app_name'] = this_app_name
this_app_flow_list = omflow_app_dict.get('flow_list',{})
for flow in this_app_flow_list:
this_flow_name = replace_dict.get(flow['flow_name'],'')
if this_flow_name:
flow['flow_name'] = this_flow_name
this_description = replace_dict.get(flow['description'],'')
if this_description:
flow['description'] = this_description
this_flowobject = flow.get('flowobject',{})
loopFlowObject(replace_dict, this_flowobject)
this_formobject = flow.get('formobject',{})
if not this_formobject:
this_formobject = this_flowobject.get('form_object',{})
loopFormObject(replace_dict, this_formobject)
this_subflow_list = this_flowobject.get('subflow',[])
for subflow in this_subflow_list:
this_subflow_name = replace_dict.get(subflow['name'],'')
if this_subflow_name:
subflow['name'] = this_subflow_name
this_subflow_description = replace_dict.get(subflow['description'],'')
if this_subflow_description:
subflow['description'] = this_subflow_description
loopFlowObject(replace_dict, subflow)
new_dict = {}
new_lan = ''
for origin_text in replace_dict:
replace_text = replace_dict[origin_text]
if replace_text:
if origin_text in zh_hant:
trans_text = zh_hant.pop(origin_text)
zh_hant[replace_text] = trans_text
if replace_language == 'zh-hant':
new_dict[replace_text] = origin_text
if origin_text in en:
trans_text = en.pop(origin_text)
en[replace_text] = trans_text
if replace_language == 'en':
new_dict[replace_text] = origin_text
if origin_text in ja:
trans_text = ja.pop(origin_text)
ja[replace_text] = trans_text
if replace_language == 'ja':
new_dict[replace_text] = origin_text
if origin_text in zh_hans:
trans_text = zh_hans.pop(origin_text)
zh_hans[replace_text] = trans_text
if replace_language == 'zh-hans':
new_dict[replace_text] = origin_text
this_app_language_package[original_language] = new_dict
omflow_app_dict['language_package'] = json.dumps(this_app_language_package)
result = json.dumps(omf_list)
with open(trans_to_path, "w", encoding="utf-8") as f:
f.write(result)
f.close()
except Exception as e:
print(e.__str__())
Find the users with specific positions/responsibilities on the organization chart
Organize specific job/authority users on the organization chart into csv files
Export the organization chart file from the system and the user information exported from the report, and merge and sort out the user's rights and responsibilities/positions
import json
f = open("mainOrg.omf", "r", encoding='UTF-8')
orgobject = f.read()
orgobject = json.loads(orgobject)
#Find the components connected under the authority and responsibility of the specified name on the job component
sirList = []
for org_item in orgobject['items']:
if org_item['type'] == 'role' and 'supervisor' in org_item['config']['responsibility']:
for line_item in orgobject['items']:
if line_item['type'] == 'line' and line_item['config']['source_item'] == org_item['id']:
sirList.append(line_item['config']['target_item'])
#Find the user id of the previous element
peopleList = []
for org_item in orgobject['items']:
if org_item['type'] == 'people' and org_item['id'] in sirList:
if not org_item['config']['noid'] in peopleList:
peopleList.append(org_item['config']['noid'])
import csv
#Open the user csv generated by the report and compare the user id to generate a new csv
readyList = []
with open('syscomUser.csv', newline='', encoding='UTF-8') as syscomUserfile:
spamreader = csv.reader(syscomUserfile, delimiter=',', quotechar='|')
for row in spamreader:
if row[0] in peopleList:
readyList.append(row[1:])
with open('syscomUserWithSir.csv', 'w', newline='', encoding='UTF-8') as syscomUserWithSirfile:
spamwriter = csv.writer(syscomUserWithSirfile, delimiter=',',
quotechar='|', quoting=csv.QUOTE_MINIMAL)
for row in readyList:
spamwriter.writerow(row)
Export form data
This section will provide python code to teach how to export the OMFLOW form and its attachments into a PDF file.
Installation Kit
The following conditions must be met before executing the Script:
Python suite:
pip install requests
pip install pdfkit
PDF software: Please download the installation file suitable for your current environment from the official website (https://wkhtmltopdf.org), and then install it.
Execute script
import requests, os, pdfkit, shutil, zipfile, threading, time
from datetime import datetime
stop_thread = False
def zip_folder(folder_path, zip_filename='data.zip'):
with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
for foldername, subfolders, filenames in os.walk(folder_path):
for filename in filenames:
file_path = os.path.join(foldername, filename)
arcname = os.path.relpath(file_path, folder_path)
zipf.write(file_path, arcname=arcname)
def your_while_loop(token_url, user, token):
while not stop_thread:
time.sleep(10)
response = requests.request("GET", token_url, auth=( user, token ))
ann = '''
################################################ #####
#OMFLOW EXPORT TOOL #
# #
# Please make sure the current environment already #
# has the following packages. #
# #
# python package: requests, pdfkit. #
# pdf software: wkhtmltopdf.(https://wkhtmltopdf.org) #
# #
################################################ #####
'''
try:
print(ann)
#user input param
server = input("Enter the OMFLOW server domain (ex. https://10.1.15.3:443):")
if server:
if server[-1] == '/':
server = server[:-1]
user = input("Enter the OMFLOW administrator username:")
token = input("Enter the password:")
api_path = input("Enter the API_PATH:")
print('')
print('Enter the wkhtmltopdf path or press enter to skip.')
print('ex. C:\\\\Program Files\\\\wkhtmltopdf\\\\bin\\\wkhtmltopdf.exe')
path_wkhtmltopdf = input("wkhtmltopdf path:")
print('')
if path_wkhtmltopdf:
pdfkit_config = pdfkit.configuration(wkhtmltopdf=path_wkhtmltopdf)
else:
pdfkit_config = None
script_path = os.path.abspath(__file__)
script_directory = os.path.dirname(script_path)
download_directory_path = os.path.join(script_directory, 'data')
if not os.path.exists(download_directory_path):
os.makedirs(download_directory_path)
token_url = f'{server}/accounts/api/security/get/'
server_version_url = f'{server}/rest/app/server-version/get/'
list_omdata_url = f'{server}/rest/flowmanage/api/omdata/list/{api_path}'
html_url = f'{server}/rest/flowmanage/api/print-form/'
list_file_url = f'{server}/rest/api/files/list/'
get_mapping_url = f'{server}/rest/api/mapping-path/get/'
download_url = f'{server}/'
#param
total_size = 0
script_start_time = datetime.now()
#get token
response = requests.request("GET", token_url, auth=( user, token ))
response_dict = response.json()
this_status = response_dict.get('status', 404)
if this_status != 200:
raise Exception(f"OMFLOW return status code {this_status} when get security. error message: {response_dict.get('message','')}")
security = response_dict.get("result", {}).get("security")
#anothrt thread to get token
extra_thread = threading.Thread(target=your_while_loop, args=(token_url, user, token))
extra_thread.start()
#check Server version at least 1.2.0.0
values={
"security" : security,
"omflow_restapi" : 1
}
response = requests.post(server_version_url, json=values)
response_dict = response.json()
this_status = response_dict.get('status', 404)
if this_status != 200:
raise Exception(f"OMFLOW return status code {this_status} when get server version. error message: {response_dict.get('message','')}")
server_version = response_dict.get('result',{}).get('version','')
if server_version < 1002000000:
raise Exception(f"OMFLOW server version at least 1.2.0.0")
#get flow_uuid
search_conditions = []
values={
"security" : security,
"omflow_restapi" : 1,
"search_conditions" : search_conditions,
"search_columns" : ['flow_uuid'],
"exclude_conditions" : [],
"order_columns" : [],
"limit" : 1,
"start" : 0
}
response = requests.post(list_omdata_url, json=values)
response_dict = response.json()
this_status = response_dict.get('status', 404)
if this_status != 200:
raise Exception(f"OMFLOW return status code {this_status} when list omdata. error message: {response_dict.get('message','')}")
result = response_dict.get('result')
if result:
flow_uuid = result[0]['flow_uuid'].replace('-','')
#get list
search_conditions = []
search_conditions.append({'column':'history','condition':'=','value':False})
search_conditions.append({'column':'running','condition':'=','value':False})
search_conditions.append({'column':'error','condition':'=','value':False})
search_conditions.append({'column':'closed','condition':'=','value':True})
values={
"security" : security,
"omflow_restapi" : 1,
"search_conditions" : search_conditions,
"search_columns" : ['id','data_no'],
"exclude_conditions" : [],
"order_columns" : ['-data_no','id'],
"limit" : 0,
"start" : 0
}
response = requests.post(list_omdata_url, json=values)
result = response.json().get('result')
total_count = len(result)
print(f'The total number of records for the target flow is {total_count}.')
print('Start to export omdata ...')
loop_count = 1
for omdata in result:
data_no = omdata['data_no']
data_id = omdata['id']
#check folder exists
data_no_directory_path = os.path.join(download_directory_path, data_no)
if not os.path.exists(data_no_directory_path):
os.makedirs(data_no_directory_path)
#check pdf exists
pdf_file_path = os.path.join(data_no_directory_path, f'{data_no}.pdf')
if not os.path.exists(pdf_file_path):
#gethtml
this_html_url = html_url + f'{flow_uuid}/{data_no}/{data_id}/'
values={
"security" : security,
"omflow_restapi" : 1,
}
response = requests.post(this_html_url, json=values)
if response.status_code != 200:
raise Exception(f'OMFLOW return status code {response.status_code} when get omdata html.')
this_html_content = response.content.decode('utf-8')
this_html_content = this_html_content.replace('window.print();window.close();','')
this_html_content = this_html_content.replace('<link rel="stylesheet" href="/static/plugins/fontawesome-free-5.13.0-web/css/all.css">','')
#html to pdf
if pdfkit_config:
pdfkit.from_string(this_html_content, pdf_file_path, configuration=pdfkit_config)
else:
pdfkit.from_string(this_html_content, pdf_file_path)
#amount size
total_size += os.path.getsize(pdf_file_path)
#get file path
this_file_search_conditions = []
this_file_search_conditions.append({'column':'flow_uuid','condition':'=','value':flow_uuid})
this_file_search_conditions.append({'column':'data_no','condition':'=','value':data_no})
this_file_search_conditions.append({'column':'delete','condition':'=','value':False})
values={
"security" : security,
"omflow_restapi" : 1,
"app_name" : 'omformflow',
"search_conditions" : this_file_search_conditions,
"search_columns" : ['file_name','data_id','formitm_id'],
"limit" : 0
}
response = requests.post(list_file_url, json=values)
response_dict = response.json()
this_status = response_dict.get('status', 404)
if this_status != 200:
raise Exception(f"OMFLOW return status code {this_status} when get mapping url. error message: {response_dict.get('message','')}")
this_file_list = response_dict.get('result')
for this_file in this_file_list:
#check file exists
this_file_path = os.path.join(data_no_directory_path, this_file['file_name'])
if not os.path.exists(this_file_path):
#get mapping path
if this_file['formitm_id']:
this_rel_path = os.path.join('omformflow', flow_uuid, data_no, str(this_file['data_id']), this_file['formitm_id'], this_file['file_name'])
else:
this_rel_path = os.path.join('omformflow', flow_uuid, data_no, str(this_file['data_id']), this_file['file_name'])
values={
"security" : security,
"omflow_restapi" : 1,
"path" : this_rel_path,
"path_type" : 'download_files'
}
response = requests.post(get_mapping_url, json=values)
response_dict = response.json()
this_status = response_dict.get('status', 404)
if this_status != 200:
raise Exception(f"OMFLOW return status code {this_status} when get mapping url. error message: {response_dict.get('message','')}")
mapping_path = response_dict.get('result')
#download file
this_download_url = download_url + mapping_path
response = requests.get(this_download_url)
if response.status_code != 200:
raise Exception(f'OMFLOW return status code {response.status_code} when download files.')
#write file
this_file_content = response.content
with open(this_file_path, 'wb') as f:
f.write(this_file_content)
f.close()
#amount size
total_size += os.path.getsize(this_file_path)
#print info every 10 records
if loop_count % 10 == 0:
now_time = datetime.now()
print(f'Currently, {loop_count} records have been exported, consuming {total_size / (1024 ** 2):.2f} MB of space.')
t, u, f = shutil.disk_usage(script_directory)
print(f'There is still {f / (1024 ** 3):.2f} GB of space available on the disk')
x = (total_count - loop_count) / loop_count
print(f'Anticipated additional requirement of {x * total_size / (1024 ** 3):.2f} GB.')
time_difference = now_time - script_start_time
minutes_difference = float(time_difference.total_seconds() / 60)
print(f'Anticipated additional time required: {x * minutes_difference:.2f} minutes.')
loop_count += 1
#zip folder
print('Export OmData finished. Start to zip folder.')
zip_file_path = os.path.join(script_directory, 'data.zip')
if os.path.exists(zip_file_path):
os.remove(zip_file_path)
#print info
now_time = datetime.now()
time_difference = now_time - script_start_time
minutes_difference = float(time_difference.total_seconds() / 60)
print(f'Anticipated additional time required: {4 * minutes_difference:.2f} minutes.')
zip_folder(download_directory_path, zip_file_path)
#remove folder
shutil.rmtree(download_directory_path)
else:
print(f'Unable to retrieve any data from {api_path}.')
#stop thread
stop_thread = True
extra_thread.join()
print('OMFLOW data export finish!')
except Exception as e:
#stop thread
stop_thread = True
extra_thread.join()
print('OMFLOW data export error: ',e.__str__())
The steps to execute Script are as follows:
Please enter the following information as shown:
OMFLOW server domain/IP:port (for example https://10.1.15.3:443)
Account with administrator rights (OMFLOW administrator)
User password
API path of the process to be exported
If the wkhtmltopdf.exe path has been added to the environment variable (PATH), you can press Enter to skip this item.
During the execution process, the screen will display the total number of closed records of the process, and the download task will be performed at the same time. After every 10 records have been executed, the following message will be provided:
Current space usage
Available space in the current environment
Estimate the space required for upcoming downloads
Estimate the time required for the upcoming download
※ The estimates provided in items 3 and 4 above are based on the resource consumption of completed tasks and may not be completely accurate.
Modify download conditions
This script is completed using the OMFLOW API, allowing the user to modify the content as needed.
If you wish to download data that has not yet been closed, you can delete the following section on line 117 of the original script.
search_conditions.append({'column':'closed','condition':'=','value':True})
If you want to retrieve only records where the 'formitm_1' field is equal to 'SYSCOM', you can add the following content.
search_conditions.append({'column':' formitm_1','condition':'=','value': ' SYSCOM'})
For detailed query condition instructions, please refer to the [Query Form] chapter.
Last updated