i recently took up a project on openvpn when i got the source code of an openvpn application written in python by ivanzoid
http://code.google.com/p/pyopenvpnman/ but after laying my hands on many things and reshaping the application i couldnt go any further because of some errors detected
1.writing to a config file from the settings button
2.error in connection state
the code is below ...thanks
# -*- coding: cp1252 -*-
import wx
import os
import sys
import subprocess
import socket, asyncore, asynchat
from datetime import datetime
from logdlg import *
id_CONNECT = 1
id_DISCONNECT= 2
id_VIEWLOG= 3
id_ABOUT = 4
id_SETTINGS = 5
id_EXIT = 6
id_refreshserver = 7
def escapePassword(password):
result = password.replace('\\', '\\\\').replace('"', '\\"').replace(' ', '\\ ')
return result
class MyDialog(wx.Dialog):
def __init__(self, parent):
wx.Dialog.__init__(self, parent,size=(150, 150))
panel = wx.Panel(self)
protocol_text = wx.StaticText(panel, label='Protocol', pos=(10,2))
rb1 = wx.RadioButton(panel, label='TCP', pos=(10, 20), style=wx.RB_GROUP)
rb2 = wx.RadioButton(panel, label='UDP', pos=(10, 40))
savebtn = wx.Button(panel, label='Save', pos=(10, 110))
savebtn.Bind(wx.EVT_BUTTON, self.editconfig)
self.port_text = wx.StaticText(panel, label='Port', pos=(10,60))
self.port_settings= wx.TextCtrl(panel, -1, "", size=(60,-1), pos=(10,78))
rb1.Bind(wx.EVT_RADIOBUTTON, self.value1)
rb2.Bind(wx.EVT_RADIOBUTTON, self.value2)
self.SetSize((150,180))
self.SetTitle('Settings')
self.Centre()
self.Show(True)
def value1(self, event):
self.protocol='tcp'
def value2(self, event):
self.protocol='udp'
def editconfig(self, e):
self.port=self.port_settings.GetValue()
config_file=open('C:/Program Files/TechbitszVPN/tvpn/test.ovpn','w+')
config_file.write('''###############################################################################
# vpn beta
dev tun
proto %s
remote x.x.x.x %s
''' % (self.protocol,self.port))
config_file.close
self.Destroy()
class ManagementInterfaceHandler(asynchat.async_chat):
def __init__(self, mainwnd, addr, port):
asynchat.async_chat.__init__(self)
#print 'ManagementInterfaceHandler construct'
self.mainwnd = mainwnd
self.port = port
self.buf = ''
self.set_terminator('\n')
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.connect((addr, port))
def handle_connect(self):
#print 'handle_connect ({0})'.format(self.port)
asynchat.async_chat.handle_connect(self)
def handle_close(self):
#print 'handle_close'
self.mainwnd.Disconnected(self.port)
asynchat.async_chat.handle_close(self)
def collect_incoming_data(self, data):
#print 'collect_incoming_data ({0}) data: "{1}"'.format(self.port, data)
self.buf += data
def found_terminator(self):
#print 'found_terminator ({0}) buf: "{1}"'.format(self.port, self.buf)
login_file=open('login.txt','r')
us=login_file.readline()
pa=login_file.readline()
if self.buf.startswith(">PASSWORD:Need 'Auth'"):
username = us
password = pa
self.send('username "Auth" {0}\n'.format(username))
self.send('password "Auth" "{0}"\n'.format(escapePassword(password)))
elif self.buf.startswith('>HOLD:Waiting for hold release'):
self.send('log on all\n') # enable logging and dump current log contents
self.send('state on all\n') # ask openvpn to automatically report its state and show current
self.send('hold release\n') # tell openvpn to continue its start procedure
elif self.buf.startswith('>LOG:'):
self.mainwnd.GotLogLine(self.port, self.buf[5:])
elif self.buf.startswith('>STATE:'):
self.mainwnd.GotStateLine(self.port, self.buf[7:])
self.buf = ''
# 'enum' of connection states
(disconnected, failed, connecting, disconnecting, connected) = range(5)
class Connection(object):
def __init__(self, name):
self.name = name
self.state = disconnected # do not set this field directly, use MainWindow.setConnState()
self.sock = None # ManagementInterfaceHandler
self.port = 0
self.logbuf = []
self.logdlg = None # LogDlg
def stateString(self):
if self.state == disconnected:
return 'Disconnected'
elif self.state == failed:
return 'Error'
elif self.state == connecting:
return 'Connecting'
elif self.state == disconnecting:
return 'Disconnecting'
elif self.state == connected:
return 'Connected'
elif self.state == auth_failed:
return 'auth'
else:
return 'Error'
def getBasePath():
if hasattr(sys, "frozen") and sys.frozen == "windows_exe":
return os.path.dirname(os.path.abspath(sys.executable))
else:
return os.path.dirname(os.path.abspath(__file__))
class MainWindow(wx.Frame):
def __init__(self, parent, id, title):
wx.Frame.__init__(self, parent, id, title, size=(359,270),style=wx.MINIMIZE_BOX
| wx.SYSTEM_MENU | wx.CAPTION | wx.CLOSE_BOX)
# check cmdline options
self.ovpnpath = 'C:\\Program Files\\TechbitszVPN'
if len(sys.argv) == 3:
if sys.argv[1] == '--openvpn':
self.ovpnpath = sys.argv[2]
self.path = getBasePath() + '/'
self.ovpnconfigpath = self.ovpnpath + '\\tvpn'
self.ovpnexe = self.ovpnpath + '\\bin\\TechbitszVPN.exe'
self.traymsg = 'TechbitszVPN Client'
self.connections = {}
# set app icon
self.SetIcon(wx.Icon(self.path + 'images/app.ico', wx.BITMAP_TYPE_ICO))
# init tray icon
self.notconnectedIcon = wx.Icon(self.path + 'images/fail16.ico', wx.BITMAP_TYPE_ICO)
self.waitingIcon = wx.Icon(self.path + 'images/waiting16.ico', wx.BITMAP_TYPE_ICO)
self.connectedIcon = wx.Icon(self.path + 'images/ack16.ico', wx.BITMAP_TYPE_ICO)
self.trayicon = wx.TaskBarIcon()
self.trayicon.SetIcon(self.notconnectedIcon, self.traymsg)
self.wndshown = True
self.Bind(wx.EVT_ICONIZE, self.OnIconize)
self.trayicon.Bind(wx.EVT_TASKBAR_LEFT_DOWN, self.OnTrayIconClick)
#Combobox
panel = wx.Panel(self)
panel.SetBackgroundColour('SEA GREEN')
self.list = wx.ComboBox(panel,
style=wx.CB_READONLY,size=(90, 20), pos=(75, 140))
self.list.Bind(wx.EVT_COMBOBOX, self.updateList())
self.updateList()
#Static Text
accountlabel = wx.StaticText(panel, label='Account Info', pos=(75,40))
userlabel = wx.StaticText(panel, label='Username:', pos=(10,63))
passlabel = wx.StaticText(panel, label='Password:', pos=(10,93))
self.username1 = wx.TextCtrl(panel, -1, "", size=(90, 20), pos=(75,60))
self.password1 = wx.TextCtrl(panel, -1, "", size=(90,20), pos=(75,90), style=wx.TE_PASSWORD)
server_text=wx.StaticText(panel,label='Country', pos=(75,120))
status_text=wx.StaticText(panel,label='Status:', pos=(10, 170))
font = wx.Font(7.4, wx.DEFAULT, wx.NORMAL, wx.BOLD)
server_text.SetFont(font)
accountlabel.SetFont(font)
userlabel.SetForegroundColour('white')
passlabel.SetForegroundColour('white')
status_text.SetForegroundColour('white')
#################################################
self.connectBtn= wx.Button(panel, id_CONNECT, 'Connect', size=(90, 25), pos=(230, 25))
self.disconnectBtn = wx.Button(panel, id_DISCONNECT, 'Disconnect', size=(90, 25), pos=(230, 50))
self.viewlogBtn= wx.Button(panel, id_VIEWLOG, 'View log', size=(90, 25), pos=(230, 75))
self.aboutBtn = wx.Button(panel, id_ABOUT, 'About', size=(90, 25), pos=(230, 100))
self.settingsBtn = wx.Button(panel, id_SETTINGS, 'Settings', size=(90, 25), pos=(230, 125))
self.refreshserverBtn = wx.Button(panel, id_refreshserver, 'Refresh Settings', size=(90, 25), pos=(230, 150))
self.exitBtn = wx.Button(panel, id_EXIT, 'Exit', size=(90, 25), pos=(230, 175))
self.Bind(wx.EVT_BUTTON, self.OnCmdConnect,id=id_CONNECT)
self.Bind(wx.EVT_BUTTON, self.OnCmdDisconnect, id=id_DISCONNECT)
self.Bind(wx.EVT_BUTTON, self.OnCmdViewLog, id=id_VIEWLOG)
self.Bind(wx.EVT_BUTTON, self.OnCmdAbout, id=id_ABOUT)
self.Bind(wx.EVT_BUTTON, self.OnCmdexit, id=id_EXIT)
self.Bind(wx.EVT_BUTTON, self.OnCmdsettings, id=id_SETTINGS)
self.Bind(wx.EVT_BUTTON, self.Onrefreshserver, id=id_refreshserver)
self.statusbar = self.CreateStatusBar()
self.statusbar.SetStatusText(' Copyright © Techbitsz Inc. 2013 ')
self.Centre()
self.Show(True)
self.list.GetSelection()
#########################################################################
# create timer which will poll incoming data from sockets to
# our ManagementInterfaceHandler
self.timer = wx.Timer(self, wx.NewId())
self.Bind(wx.EVT_TIMER, self.OnTimer)
self.timer.Start(20, wx.TIMER_CONTINUOUS)
def Onrefreshserver(self, event):
self.updateList()
def getConnList(self, path):
"""Returns list of connections in the OpenVPN's config directory."""
if not os.path.exists(path):
return []
files = os.listdir(path)
ovpnfiles = filter(lambda s: s.endswith('.ovpn'), files)
ovpnnames = map(lambda s: s[:-5], ovpnfiles)
return ovpnnames
def updateList(self):
"""Updates list of connections in the main window."""
# save all previous (current) connections to a dict of pairs name:connection
prevconns = {}
for c in self.connections.itervalues():
prevconns[c.name] = c
# get list of new connections
newlist = self.getConnList(self.ovpnconfigpath)
# delete listctrl contents
self.list.Clear()
self.connections.clear()
for i, s in enumerate(newlist):
if s in prevconns: # check if this connection existed previously
self.connections[i] = prevconns[s]
else:
self.connections[i] = Connection(s)
self.list.Insert(s,0)
def updateConnection(self, index):
"""Updates connection's list item given by its index."""
if index != -1:
pnl = wx.Panel(self)
connection_state1=wx.StaticText(self, label=self.connections[index].stateString(), pos=(75, 170))
connection_state1.SetForegroundColour('white')
connection_state1.SetBackgroundColour('sea green')
def connectClick(self,event):
self.connectBtn.Disable()
self.settingsBtn.Disable()
self.aboutBtn.Disable()
self.exitBtn.Disable()
def disconnectClick(self,event):
self.connectBtn.Enable()
self.settingsBtn.Enable()
self.aboutBtn.Enable()
self.exitBtn.Enable()
def trayIconByState(self, state):
"""Return corresponding wx.Icon for tray depending on the given state of connection."""
if state == connecting or state == disconnecting:
return self.waitingIcon
elif state == connected:
return self.connectedIcon
else:
return self.notconnectedIcon
def updateTrayIcon(self):
"""Updates tray icon. If there are at least one 'conected' connection, shows 'connected' icon, otherwise shows 'disconnected' icon."""
maxstate = disconnected
for c in self.connections.itervalues():
if c.state > maxstate:
maxstate = c.state
self.trayicon.SetIcon(self.trayIconByState(maxstate), self.traymsg)
def OnIconize(self, event):
"""Called when user clicks minimize button."""
self.Hide()
self.wndshown = False
def OnTrayIconClick(self, event):
if self.wndshown:
self.Hide()
self.wndshown = False
else:
self.Iconize(False)
self.Show(True)
self.Raise()
self.wndshown = True
def setConnState(self, index, state):
"""Sets the state of connection given by index (which is its Id from listctrl) and updates trayicon if necessary."""
self.connections[index].state = state
if state == disconnected:
self.connections[index].port = 0
if state == connected:
self.trayicon.SetIcon(self.trayIconByState(state), self.traymsg)
else:
self.updateTrayIcon()
def OnTimer(self, event):
"""Used for detecting if there is incoming data in the sockets."""
asyncore.poll(timeout=0)
def getNextAvailablePort(self):
"""Returns next minimal unused port starting from 10598."""
minport = 10598
found = False
while not found:
found = True
for c in self.connections.itervalues():
if c.port != 0:
if c.port == minport:
found = False
minport += 1
break
return minport
def indexFromPort(self, port):
"""Returns index (id) of connected connection based on its port."""
for i, c in self.connections.iteritems():
if c.port == port:
return i
return -1
def OnCmdConnect(self, event):
#print 'OnCmdConnect'
index = self.list.GetCurrentSelection()
u=self.username1.GetValue()
p=self.password1.GetValue()
login_file=open('login.txt','w')
login_file.write(u)
login_file.write('\n')
login_file.write(p)
if index == -1:
return
port = self.getNextAvailablePort()
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
subprocess.Popen([self.ovpnexe,
'--config', self.ovpnconfigpath + '\\' + self.connections[index].name + '.ovpn',
'--management', '127.0.0.1', '{0}'.format(port),
'--management-query-passwords',
'--management-log-cache', '200',
'--management-hold'],
cwd=self.ovpnconfigpath,
startupinfo=startupinfo)
self.connections[index].sock = ManagementInterfaceHandler(self, '127.0.0.1', port)
self.connections[index].port = port
self.setConnState(index, connecting)
self.updateConnection(index)
def OnCmdsettings(self, event):
dia = MyDialog(self)
dia.ShowModal()
dia.Destroy()
def ParseLogLine(self, line):
"""Parses and returns log line received from OpenVPN Management Interface."""
tokens = line.split(',', 2)
unixtime = tokens[0]
flags = tokens[1]
msg = tokens[2]
time = datetime.fromtimestamp(float(unixtime))
str_time = time.ctime()
flags_map = {'I':'INFO', 'F':'FATAL', 'N':'ERROR', 'W':'WARNING', 'D':'DEBUG'}
str_flags = ''
if flags in flags_map:
str_flags = ' ' + flags_map[flags] + ':'
#return str_time + str_flags + ' ' + msg
return str_time + ' ' + msg
def GotLogLine(self, port, line):
"""Called from ManagementInterfaceHandler when new line is received."""
#print 'got log line: "{0}"'.format(line)
index = self.indexFromPort(port)
parsedline = self.ParseLogLine(line)
self.connections[index].logbuf.append(parsedline)
if self.connections[index].logdlg != None:
self.connections[index].logdlg.AppendText(parsedline)
def GotStateLine(self, port, line):
"""Called from ManagementInterfaceHandler when new line describing current OpenVPN's state is received."""
#print 'got state line: "{0}"'.format(line)
list = line.split(',', 2)
state = list[1]
if state == 'CONNECTED':
index = self.indexFromPort(port)
self.setConnState(index, connected)
self.updateConnection(index)
def OnCmdDisconnect(self, event):
#print 'OnCmdDisconnect'
index = self.list.GetSelection()
if index == -1:
return
self.setConnState(index, disconnecting)
self.connections[index].sock.send('signal SIGTERM\n')
self.disconnectClick(event)
# from ManagementInterfaceHandler
def Disconnected(self, port):
"""Called from ManagementInterfaceHandler when socket to OpenVPN Management Interface is closed."""
index = self.indexFromPort(port)
self.setConnState(index, disconnected)
self.updateConnection(index)
def auth(self, port):
"""Called from ManagementInterfaceHandler when socket to OpenVPN Management Interface is closed."""
index = self.indexFromPort(port)
self.setConnState(index, auth_failed)
self.updateConnection(index)
def OnCmdViewLog(self, event):
#print 'OnCmdViewLog'
index = self.list.GetSelection();
if self.connections[index].logdlg != None: # ?
return
logdlg = LogDlg(self, self.connections[index].port, self.connections[index].name)
self.connections[index].logdlg = logdlg
logdlg.Bind(wx.EVT_CLOSE, self.OnLogDlgClose, logdlg)
for l in self.connections[index].logbuf:
logdlg.AppendText(l)
logdlg.Show(True)
def OnLogDlgClose(self, event):
"""Called when user closes Log window."""
#print 'OnLogDlgClose'
dlg = event.GetEventObject()
port = dlg.port
index = self.indexFromPort(port)
dlg.Destroy()
self.connections[index].logdlg = None
def OnCmdexit(self,event):
dlg=wx.MessageDialog(self, "Do you really wanna exit this application", "Confirm Exit", wx.OK|wx.CANCEL|wx.ICON_QUESTION)
result=dlg.ShowModal()
if result == wx.ID_OK:
sys.exit(0)
def OnCmdAbout(self, event):
description = """
Enjoy Secure Internet Access
"""
aboutinfo = wx.AboutDialogInfo()
aboutinfo.SetName('TechbitszVPN')
aboutinfo.SetVersion('1.0')
aboutinfo.SetDescription(description)
wx.AboutBox(aboutinfo)
class App(wx.App):
def OnInit(self):
wnd = MainWindow(None, -1, "TechbitszVPN :: v1.0")
wnd.Show(True)
return True
if __name__ == '__main__':
app = App(0)
app.MainLoop()