+#!/usr/bin/python3
+
+import asyncio
+import datetime
+import logging
+import panoramisk
+
+from . import misc
+from .decorators import *
+
+loop = asyncio.get_event_loop()
+
+class Asterisk(misc.Object):
+ def init(self):
+ self.__manager = None
+
+ # Connect as soon as the event loop starts
+ loop.create_task(self.connect())
+
+ @property
+ def manager(self):
+ if not self.__manager:
+ raise RuntimeError("Asterisk is not connected")
+
+ return self.__manager
+
+ async def connect(self):
+ """
+ Connects to Asterisk
+ """
+ manager = panoramisk.Manager(
+ host = self.settings.get("asterisk-ami-host"),
+ username = self.settings.get("asterisk-ami-username"),
+ secret = self.settings.get("asterisk-ami-secret"),
+
+ on_connect=self._on_connect,
+ )
+
+ # Connect
+ await manager.connect()
+
+ return manager
+
+ def _on_connect(self, manager):
+ logging.debug("Connection to Asterisk established")
+
+ # Close any existing connections
+ if self.__manager:
+ self.__manager.close()
+
+ self.__manager = manager
+
+ async def ping(self):
+ manager = await self.manager()
+
+ result = manager.ping()
+ print(result)
+
+ async def get_sip_channels(self, filter=None):
+ channels = []
+
+ for data in await self.manager.send_action({"Action" : "CoreShowChannels"}):
+ # Skip header and trailer
+ if data.eventlist:
+ continue
+
+ # Parse channel
+ channel = Channel(self.backend, data)
+
+ # Apply filter
+ if filter and not channel.matches(filter):
+ continue
+
+ channels.append(channel)
+
+ return channels
+
+
+class Channel(misc.Object):
+ def init(self, data):
+ self.data = data
+
+ def __str__(self):
+ return self.connected_line
+
+ @property
+ def account_code(self):
+ return self.data.AccountCode
+
+ @property
+ def connected_line(self):
+ return self.data.ConnectedLineName or self.data.ConnectedLineNum
+
+ def matches(self, filter):
+ return filter in (
+ self.data.CallerIDNum,
+ )
+
+ @property
+ def duration(self):
+ h, m, s = self.data.Duration.split(":")
+
+ try:
+ h, m, s = int(h), int(m), int(s)
+ except TypeError:
+ return 0
+
+ return datetime.timedelta(hours=h, minutes=m, seconds=s)
+
+ def is_connected(self):
+ return self.data.ChannelStateDesc == "Up"
+
+ def is_ringing(self):
+ return self.data.ChannelStateDesc == "Ringing"