backend_PYTHON = \
src/backend/__init__.py \
src/backend/accounts.py \
+ src/backend/asterisk.py \
src/backend/base.py \
src/backend/blog.py \
src/backend/bugzilla.py \
return "postfixMailUser" in self.classes
def has_sip(self):
- return False # XXX
return "sipUser" in self.classes or "sipRoutingObject" in self.classes
def is_lwl(self):
return sip_registrations
- @lazy_property
- def sip_channels(self):
- return self.backend.talk.freeswitch.get_sip_channels(self)
+ async def get_sip_channels(self):
+ if not self.has_sip():
+ return []
+
+ return await self.backend.asterisk.get_sip_channels(self.sip_id)
def get_cdr(self, date=None, limit=None):
return self.backend.talk.freeswitch.get_cdr_by_account(self, date=date, limit=limit)
--- /dev/null
+#!/usr/bin/python3
+
+import asyncio
+import datetime
+import logging
+import panoramisk
+
+from . import misc
+from .decorators import *
+
+loop = asyncio.get_event_loop()
+
+class Asterisk(misc.Object):
+ def init(self):
+ self.__manager = None
+
+ # Connect as soon as the event loop starts
+ loop.create_task(self.connect())
+
+ @property
+ def manager(self):
+ if not self.__manager:
+ raise RuntimeError("Asterisk is not connected")
+
+ return self.__manager
+
+ async def connect(self):
+ """
+ Connects to Asterisk
+ """
+ manager = panoramisk.Manager(
+ host = self.settings.get("asterisk-ami-host"),
+ username = self.settings.get("asterisk-ami-username"),
+ secret = self.settings.get("asterisk-ami-secret"),
+
+ on_connect=self._on_connect,
+ )
+
+ # Connect
+ await manager.connect()
+
+ return manager
+
+ def _on_connect(self, manager):
+ logging.debug("Connection to Asterisk established")
+
+ # Close any existing connections
+ if self.__manager:
+ self.__manager.close()
+
+ self.__manager = manager
+
+ async def ping(self):
+ manager = await self.manager()
+
+ result = manager.ping()
+ print(result)
+
+ async def get_sip_channels(self, filter=None):
+ channels = []
+
+ for data in await self.manager.send_action({"Action" : "CoreShowChannels"}):
+ # Skip header and trailer
+ if data.eventlist:
+ continue
+
+ # Parse channel
+ channel = Channel(self.backend, data)
+
+ # Apply filter
+ if filter and not channel.matches(filter):
+ continue
+
+ channels.append(channel)
+
+ return channels
+
+
+class Channel(misc.Object):
+ def init(self, data):
+ self.data = data
+
+ def __str__(self):
+ return self.connected_line
+
+ @property
+ def account_code(self):
+ return self.data.AccountCode
+
+ @property
+ def connected_line(self):
+ return self.data.ConnectedLineName or self.data.ConnectedLineNum
+
+ def matches(self, filter):
+ return filter in (
+ self.data.CallerIDNum,
+ )
+
+ @property
+ def duration(self):
+ h, m, s = self.data.Duration.split(":")
+
+ try:
+ h, m, s = int(h), int(m), int(s)
+ except TypeError:
+ return 0
+
+ return datetime.timedelta(hours=h, minutes=m, seconds=s)
+
+ def is_connected(self):
+ return self.data.ChannelStateDesc == "Up"
+
+ def is_ringing(self):
+ return self.data.ChannelStateDesc == "Ringing"
import tornado.httpclient
from . import accounts
+from . import asterisk
from . import blog
from . import bugzilla
from . import campaigns
# Initialize backend modules.
self.accounts = accounts.Accounts(self)
+ self.asterisk = asterisk.Asterisk(self)
self.bugzilla = bugzilla.Bugzilla(self)
self.fireinfo = fireinfo.Fireinfo(self)
self.iuse = iuse.IUse(self)
import PIL.Image
import PIL.ImageFilter
import PIL.ImageOps
+import datetime
import io
import ipaddress
import location
#_ = handler.locale.translate
_ = lambda x: x
+ if isinstance(s, datetime.timedelta):
+ s = s.total_seconds()
+
hrs, s = divmod(s, 3600)
min, s = divmod(s, 60)
</section>
{% if current_user == account or current_user.is_staff() %}
+
+ {# SIP Channels #}
+
+ {% if sip_channels %}
+ <section class="section">
+ <div class="container">
+ {% for channel in sip_channels %}
+ <div class="notification
+ {% if channel.is_ringing() %}is-warning
+ {% elif channel.is_connected() %}is-success
+ {% end %}">
+ <span class="icon-text">
+ <span class="icon">
+ <i class="fa-solid fa-phone-volume"></i>
+ </span>
+ <span>{{ channel }}</span>
+ </span>
+
+ {% if channel.duration %}
+ <span class="is-pulled-right">
+ {{ format_time(channel.duration) }}
+ </span>
+ {% end %}
+ </div>
+ {% end %}
+ </div>
+ </section>
+ {% end %}
+
<section class="section">
<div class="container">
<div class="columns">
<ul>
{% if account.has_sip() %}
<li>
- {% if account.sip_channels %}
+ {% if sip_channels %}
<i class="fas fa-phone-volume has-text-warning fa-fw"></i>
{% elif account.sip_registrations %}
<i class="fas fa-phone-square has-text-success fa-fw"></i>
class ShowHandler(base.BaseHandler):
@tornado.web.authenticated
- def get(self, uid):
+ async def get(self, uid):
account = self.backend.accounts.get_by_uid(uid)
if not account:
raise tornado.web.HTTPError(404, "Could not find account %s" % uid)
- self.render("users/show.html", account=account)
+ # Fetch SIP channels
+ sip_channels = await account.get_sip_channels()
+
+ self.render("users/show.html", account=account, sip_channels=sip_channels)
class AvatarHandler(base.BaseHandler):