Skip to content

Instantly share code, notes, and snippets.

@kashifpk
Created July 7, 2023 19:02
Show Gist options
  • Save kashifpk/3aa0c2f93e63b7a79222e9f619899d96 to your computer and use it in GitHub Desktop.
Save kashifpk/3aa0c2f93e63b7a79222e9f619899d96 to your computer and use it in GitHub Desktop.
Sample Chirpstack GRPC communication
import grpc
from grpc import Channel
from uuid import UUID
from chirpstack_api import api, common
from chirpstack_pydantic.models.common.common_p2p import Region
from chirpstack_pydantic.models.api.deviceProfile_p2p import ListDeviceProfilesResponse
from chirpstack_pydantic import protobuf_to_pydantic
CHIRPSTACK_API_TOKEN = 'fill-api-access-token'
CHIRPSTACK_API_BASE_URL = 'http://localhost:8090/api/'
CHIRPSTACK_USE_SECURE_CHANNEL = False
class ServiceBase:
"Base class for service classes"
def __init__(
self, channel: Channel, headers: dict, api_base_url: str
) -> None:
self.channel = channel
self._headers = headers
self._api_base_url = api_base_url
class DeviceProfileService(ServiceBase):
async def list(self, tenant_id: UUID, limit=100, offset=0) -> ListDeviceProfilesResponse:
service = api.DeviceProfileServiceStub(self.channel)
request = api.ListDeviceProfilesRequest(
tenant_id=str(tenant_id), limit=limit, offset=offset
)
response = await service.List(request, metadata=self._headers)
return protobuf_to_pydantic(response)
async def create(
self, tenant_id: UUID, name: str, region: Region,
supports_class_b=False, supports_class_c=False) -> api.CreateDeviceProfileResponse:
service = api.DeviceProfileServiceStub(self.channel)
device_profile = api.DeviceProfile(
tenant_id=str(tenant_id),
region=region,
name=name,
mac_version=common.MacVersion.LORAWAN_1_0_3,
reg_params_revision=common.RegParamsRevision.A,
supports_class_b=supports_class_b,
supports_class_c=supports_class_c,
supports_otaa=True)
request = api.CreateDeviceProfileRequest(
device_profile=device_profile
)
response = await service.Create(request, metadata=self._headers)
return response
async def get(self, profile_id: UUID) -> api.GetDeviceProfileResponse:
service = api.DeviceProfileServiceStub(self.channel)
request = api.GetDeviceProfileRequest(
id=str(profile_id)
)
response = await service.Get(request, metadata=self._headers)
return response
class ChirpstackClient:
def __init__(self):
self._headers = [("authorization", "Bearer %s" % CHIRPSTACK_API_TOKEN)]
self._api_base_url = CHIRPSTACK_API_BASE_URL
self.open_channel()
# Load services
self.device_profiles = DeviceProfileService(
self._channel, self._headers, self._api_base_url
)
def open_channel(self, settings) -> grpc.Channel:
server = "localhost:8080"
if CHIRPSTACK_USE_SECURE_CHANNEL:
self._channel = grpc.aio.secure_channel(server, grpc.ssl_channel_credentials())
else:
self._channel = grpc.aio.insecure_channel(server)
async def close(self):
if self._channel:
await self._channel.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment