From ca2124b303889211e9068cd0a3aa20173f47f991 Mon Sep 17 00:00:00 2001 From: Ryan Winter Date: Wed, 28 Feb 2024 13:57:23 -0800 Subject: [PATCH] add sample Signed-off-by: Ryan Winter --- .../.static/img/overview.png | Bin 0 -> 22294 bytes samples/dapr-pubsub-dotnet/build.ps1 | 16 ++ samples/dapr-pubsub-dotnet/deploy.ps1 | 13 ++ .../deploy/telemetryprocessor.yaml | 202 ++++++++++++++++++ samples/dapr-pubsub-dotnet/readme.md | 120 +++++++++++ .../Controllers/DeviceTelemetryController.cs | 46 ++++ .../TelemetryPersister/Dockerfile | 17 ++ .../DaprRawPayloadInputFormatter.cs | 41 ++++ .../TelemetryPersister/Models/CommandInfo.cs | 10 + .../Models/DeviceTelemetry.cs | 16 ++ .../TelemetryPersister/Program.cs | 50 +++++ .../Properties/launchSettings.json | 31 +++ .../TelemetryPersister.csproj | 16 ++ .../TelemetryPersister/appsettings.json | 9 + .../TelemetryProcessor/TelemetryProcessor.sln | 31 +++ .../TelemetryTransformer/Dockerfile | 17 ++ .../Models/DeviceTelemetry.cs | 10 + .../TelemetryTransformer/Program.cs | 69 ++++++ .../Services/DeviceTelemetryReceiver.cs | 161 ++++++++++++++ .../TelemetryTransformer.csproj | 21 ++ 20 files changed, 896 insertions(+) create mode 100644 samples/dapr-pubsub-dotnet/.static/img/overview.png create mode 100644 samples/dapr-pubsub-dotnet/build.ps1 create mode 100644 samples/dapr-pubsub-dotnet/deploy.ps1 create mode 100644 samples/dapr-pubsub-dotnet/deploy/telemetryprocessor.yaml create mode 100644 samples/dapr-pubsub-dotnet/readme.md create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Controllers/DeviceTelemetryController.cs create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Dockerfile create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Infrastructure/DaprRawPayloadInputFormatter.cs create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/CommandInfo.cs create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/DeviceTelemetry.cs create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Program.cs create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Properties/launchSettings.json create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/TelemetryPersister.csproj create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/appsettings.json create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryProcessor.sln create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Dockerfile create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Models/DeviceTelemetry.cs create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Program.cs create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Services/DeviceTelemetryReceiver.cs create mode 100644 samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/TelemetryTransformer.csproj diff --git a/samples/dapr-pubsub-dotnet/.static/img/overview.png b/samples/dapr-pubsub-dotnet/.static/img/overview.png new file mode 100644 index 0000000000000000000000000000000000000000..3eaf4131b1d14f3f54cdd9b35a84d8ba32b2f3c3 GIT binary patch literal 22294 zcmZ6z1z42L_dmRVC`yS+m!O1F($WnI5{iJdbT3j%he`^Fh=4SLlr%^#EDF-yODrfI z3rjcbJK*y?-{1fBy1W$b`<^*-=HzG2nJ^7CCE{x|*B}rG@#9DG&ma)IuMh}MIsq>D z4d0+d1^5TY<(ZN!q^O5(8T@d`N=8)%0x69mJTbusKVNlkL!~dSKr<;F?-J1cJkh9hLar|y{2{%^&T6P zkncQ`=dOq(saw)>iO1H%*Ke8Ku&2Wh`TemP@ly7QLo&_0Y3KN`h0fRu#MQ@@bh;l| zCZaRgIc_N^_xM|$@o%S%jU8K@@gI9O_ywknC1AUH3b4nh{&jzSqstJk?%v*gQle+^ z+`S!la3SoBCX@m1wRLoE*5e6JB~+ii&*pZzbVI_%Nnp+i2l6Sbgiu<^Y_Z32Y-~TT z*ImW10%} zV-Q`VT@`+seEc4I4D=9b5dt`5=VP~}SkzY^n<%dX4O=Kwjo}VNy8ZmYKvGA6oytK5 z1IzozWjsy7!;o}?Ktl*kLTb(TRp|zv#i!bC=;gG}$sd)jK{RL@xgmyBO_94_M=PCm zg{yx>i+rSeVPq87N(j-wJ%30P^}fOpv&$l`%oWUp7vM|=!?Aw-r$xD-=T4rz5JwaZ zB%KPp$7Oz5y?FZ9E3@X+7O|VlT~|GaAdu$kFq}X~KR-WgCXukV?V7~r#FUiSRuYKD zbrVYI11e}iLG41^x;zHb^Z`fu9w1*XQY1Z@Ak-6XPCZ2RS5YO|;EuEY;ON#`W zBDD;N|7|@+SFS>s>7OSpOl2l zV+Ku#^!vEO!^7FusBkQEZBMd@%bECer{brAq9WE~K<4F&O2O!*$NWUpkU6tJlxB`v z`#t%XN40^5@}0ltp0_dt)8?xJ2GTRo$0$)iek&j&@fx`^EId7HIHD0pmfWi&*(3H# zl9MS$dU_vQDIw{%(JeS8l>Lpb%mj;flRhT=9NapJX3lYmG`Ss8QGLr6dgEh(VB5m~dX58FH3qSl(QofzR4 zch+Wbf;@!wo}=EGv_Yf+6W^9KLXXqkCLa0itr;jZ zLK%oj;cw;CJXF^;W{Q)nhaZ=EC(w#31(ZzOCK4X`J?{NLP)v*?N)jR<3^?%o7A*3X zHLH%`wn$YZEqrY?Q|*UaW09|1{c}$3or2hyuaSKth^Fe%siN{CvV~7~9VS_HC~LGI z7d_KJ#(oM%2=ynDQ*_g&$SoA<#fYdt>881O$n{BEtJlPRe%4!}jgWivfsn-;qiCv zSFYQ+gQ1ZfIF22h<`-9{hjn&_jo~wdB62Kvh42O zw9%6XdrG>UHHD?i2`X@so&2W}w$i*no?j5aoiwzOZbpP_@>!OWi(V(0}slP_q>xY)V&-y;@xNs?Sx$2S)O%v zc5Xd^NS8AM0n#-QW)u6C&|6d3!+9OXrB+91-|>go!S-dvc3K6~>R{D^zY8tzVN;Pz z3nBn6e`f@iLOaqpD-sD$;vcYjiR^YC9(ge;N-)(P5MqO}_~b%FSZQz2@soRb>HE#Er4c?4d*t|`>`odfx8UZOfc`g+@DLKN z`(qDpIdfT54bRd17`qZKQgdg~goMPCG$l`?vx9`7V|in8k4+J#gt`BitaT(s`I3;pM|{T0l9$B@L|X4Inj}Vb>b7_W)NqN-8N#e#J@L5# z<#{n(zn6EDsGyu(wd;HG#w+(cB_Zke&PkXn3Ev1quP!)ZtZtYp?rz)tQ;>hhp z2!~9B=lG^5IB`sLPzeNjMn>z(UyBh5e_C9NftO@lRp#O5?%=-yaZ&=}d=zLXQ&X02 z6z=92Ua}e}wcq%a_UxT;;Z6aIS?*T9mhi22oa|P$-WHB+@K~SxMft}pP9rA5fj=mo zARaTht9?(Vp{cKUi^)>RJl`tKbIX3Gu}GNnow;SJ*zx_m8>5pYtBOtH;)HD@UDeBo z2gfeuA8*eG*Xg%6)P8o|A1U8IXxd;HTYtff~sl^xg1k3v^UXk}kZY=#)RSuyAO zzuY(yJAlZ2vNhCa9*h5zWD7~eF%c%AA%j`H^NoLJ2H$s;uIYzE z>*$Z=Ow-z1h1_9RSl(ps8jIv}!lGOAYAxO-(@oRp3DCQL;dsW7{n&9^#!V5CR4 zlBiz`LSJC3ZWlEY!PBabp|jXG14W+LU3c2B_!50O6G~zsL5b}j2tQ-Xj0}AcQbeX? zlHy!f{}YTNJI2o2C&jvvCdPx{KGA*5*`>_6YO%*>yM7HH->w(;wWH$%V8-OKTI$ANO-`OsQZBvTkx~ojkn}(@2;d69Q={!1?3nvwECDB+G8B+*tiI*l;>%CG-dH_F zWYcx{a1`C|JQyvl#9kF9@vbzQIvdg=YAPZ(Uz=uz@vJ6V7{Av7E2vr1?T`0WNERF= zJ_|Bxgqqxp(WLhV9_VZsQSa{I`Xtp25-KUtSmgCuD=ccN#EwhTwsxmz_1VJWS1qg& z^&pPwqxm&;!)9Kzp1HH2Wc~*iCLwiX1u|Ng4V~x|SE66}uXnKv)+_&V{L0uu=^9c6W0V9E-2n6Y?ke<`PD5u%)7Q zk5T^6z)HL8aa&Xh730HF$8fC(bsI4iy?w-7noR||J$~)<aSX$r_i4j`b-rfkB6{ z@i@-u-%R)_t0i!dXxtSksjK5mBZgd84Ma88F}4r$F3|vrJFdaCV;MZy zOI-)+KPOA_vyF0ob=jLfSUslNSK%LXVlk$9>~4AD;yln*0IQ_j@5iQ4kRNCxZ^;>TUo(|`R^2`=I7^&Hiq>=9ra(= z9F%(O@GxC}S9G{Ru2?!k$O;BT)td0NmOZho|=Si!I>sxon>m~;Uyh2=u={_8HD`><$)c^hKeOu; z=pfx-FCG`I_;?E!locz|5{CQO^EiIT1X{8P=dTIZI{Ql>gt74)r5fI1S;1BpVl|}j zgk_b0S8St%;V7-QQWbm_JDwzT4x`I(L1L}hXCKT38qVWEl&=Xit&{NIKH|@3XS=Oe z7=}%x)**(ktR;)Y1{<=qc`Ysm8uI^6^tKQeblVTlZ*+B6$!cXf=WpV?j$Ipzs`;MKR*ORGOU5;y%ZsW~ zE^9AF-)1RMC1RisonN)$-Zl`SoGiA!=+Ci7qc1UuY zB5?rp@rA3gD7=JlF6JCN3gjcACYSvOl38S6CkEu$x*`SfvZC1v&26@Pe*H+soc6sI zt`UtEFebe07MuLruBRlo98Z6b^X0RVy(xzx7E2`Ir>*uduuB;@m(@wCpv3c6hI4~Q zcHQp+G-6)&in1ays8aSSJnbNmoKV$L$o1`)3hDk5ySBkMGKS6YS}W!6Sp|#V?R(2D zALbuf9jptnMTRV(Vij+L5oKwDBWzuHC#mks{T&5iz0#Y;5`;S8q8g%36?2Ju{(_6r z8u@u^a&1hv#>|tZm;;0gX6VU7z!IOa(07^@4yz!M;OG_BAOYr*Tn)KObwkw>buFvf%Z9EJ9)0(~28W5A9gzRN3AjW{Y>URLicCwv}wKE^E=s?WMx zgFj=@D__Lq6<>Djz1W|f&iZYPutOkXaczP0~e~Y zonhf^li-I#iNvTVw=7CZ%K0(;`?BYodrb8<#?*In;3bziUt^}oGUwbAa$6>2Lf1?vBZYZIhD&p%DU)*9DVid`uma7CqcGNxiv=^)fdJdw z*wT=e(mm7APXDG-3?2su$5tI~?D-B{SQF-3fGcnRP5dHowQm3SgcAhJtHLb4hhguO z3++^PviY_QaEE~%`sYhdfY1WsN9{Lq*ze!iZJW&t3c~l~ehpx;FNKOBhC_HJluV{n zWufPERsLrY2>LrZI{N2+jB6wED6&8-0_TVZ=~|o>SuV|g25H{wPqW_Hea@y23PtN) zC604lBA#afdjAoZVK5_@SIMJN55UjxCxs5GcfAJk^DBIs@XiJEBK>{Jy_tAdR-M>5 zL0-KK`xLyV_z#$wDw3}FA35ytQdjpi_uHs+_JsMIAD36@@J4Y3Og16(Fbk2+0P zTVpD1B{*z;~|x8=MDo^1$_hYO%l|7Lk_eaRl97j`do zwp(MgLTdRd<7O@;h4Ck)FQQGEW@{7PDRK8u-`wn+H|9#j^`%uW4AMCoX=i$rkOidn zdvd7QW}>jo!}U{S!m@kS0K(Ijrwof?B0~<>jMhp!e;&NiU2~bK`{0& z0f0qN7z~Dy0it6G>>*9?|JWXg@>*h~syid$UJuq>9lueZ7a@LAI^}=N1p|`;VKSG4#WRZbQj1FIM?{=B&&dGxtWs4#R0Hdva7H9|3uMv z>1Y4X8q>ns^MJKONGlE(Q$dL$mHyj`tBugp3&fmnbZVj3{JDhfMfO0^YQW%|&h-Bb z!V%15_d=qi*xxp>>$T#Yf7Nc}l1bGR-`ku~$C)JCU9MnSIRH=o<806QTV*fgz4Xef z`?9}ertOO;@l?GRFFq*IS>OJ@T%>*JF=szf$zTlL(E>@aNi}a^(EnxSJ$+t3_`fnR@rX!HyXjx}&HVL;|M%fV3Q8SZ zAoPz0M&rw#TTeX-3W|Z{?``nVq9?5UA|m(HZTOkcisOHQO=DHGi|Px-#^M1(t>0WQl*}X$MfKvv3y(P`@c1hb2fYguYA8m-1OK4^s6 zM z%#8%_O(Aj7k?^Uh*&M|=G8}=wb`6RYR$$LS)Z(guIE!Hk&1w}XVcogSZA&7rX< zvPA!*xXI`v%bw62@?Y9)=y8mM&$TpqHp-EC&QQbq2cfo|o?k0`z3}>QFNhb4y8&}p zA#7kHMz42y;Ke1@b~Ve}IS;&(QDm^4HYuunycGd z!v#flK6Fx&f70tFQyRf@`uXQ`UB#<#k-~fabn#)W(}*V8^2vkwdivnt5W~f1)`)V7 zlkCP?hv^6EF+Nqd&kv;7@bBkSS#>|W(*dWMJ?ZeCUna00tTH@&F3{n{dZ{ktsj$El z!pWz~pn2S3wsQLc{wAbwSVH}7&KSL1hs<`3f!{15WY*tRCPcxrEu%|`zOEP2@z*|o zDKYwtGPpsNl>Fj$Q?Acuw$T^6z{m}dw%i7Gm2@g-$j>aQtinh`R^Xl+R1<^Gkl zV8y!X#NzcCx6#Lt9tXEdCZfceI+HU zbsQy(1zwL*CU9DHH$n`tWF+?YwVzfOBuXkDS%=Yw`g}La}U7@Bk~u zWUQ$O^YS1LVX)z0Qkr+;41rNqG8BH#RwmGzVWL#c&nuVViY$9Cc^mg%&2kTqX za5CEzgQNysh{a#8c-h4miqg>~BbX!Hs6PGZShsg9d1_Plzn7NF;c_xExZ~+VH9z z(xHhh(N7O4kap1LFnlo)>ueO32=tAN!U)N=%k}Q}4(A9l|8{%THOp_BVhGukbc23^ zR{$QMHc=CLIF!nUUa%ki9c;+-TOy*CsY*37Z&G|^O#Vj~ttby>>NBQ47FEq>BThH{ zwc;JNgsOieb=I@kwA-l)KOJ{C&~VY%?kAmgZU|>eDVP7WK90YXLD^QN9mXN-9VT{Z z$Cd`hAx1*ZP|p_?99|MmVD|{A@Fdh|U~NXBtT?yz(g*cFa z$&Q1HuN-vJqG!ik6C=m*J%7Pyp7Z4^8lu{&HuewAl}jQ+uSr`!x$NMQ7*3VnaXjvI zCIiRqAi;wOJUwltN`8`g$zZoKx_*wYXCFs?>FLMhnsb}cfB*Ixeg!KO`9}-=k*w}y zO)TQ&BDBrsr@OxMen(gMLetgBxa3)(^%k*dxlTh=I$CQ37xwn&er>CK*C&wykB6Qx zWIDqY%oh@`he8;?%XFgiXpg8=kwI^EWq|1e{{W1*mcLAQgfevGDiOTr37cbBf+?3b zg0_V2>}%E~R)0Io8-k`)T6?C-$J~zPQ%^Yc%HJlom+LsgJu59L^L{CNJD!qqS-bs2 z<%1KT%iEKy3^&bi4nxh86skVEne2a7X`erGGtjKJe>_G6qC_$mRXY{V(%lTJFVL5)}DA#m`rzD_yDU8h%Jp z0nTnzp2ZPKy5d+VeNe?uo@c)Bu)uL;%#?84h@dA{xa?f^KFJaN$)@X5>EqLhD)XZJ zMS`6Uy8SV&jy1e9*47XuMtOL7UiiZdFK+$ddr^W+ zM}5}PD&?g0sg8-2Et&iY{%(pbt`9Ct$=)b-)5%pr_#v&p0+oIj@Cc->Ukb=Qmc*@E zMN&-*PsEG~OmuFp^B2gsD=Q_rJXBA_owPs?>38T6XSyS~j*~~?w5_g5qlf3rh(f8c zY@*6qvy>W*MXWT85x*#6_vM&7FUwn5P#%}8#w!G^VayiM%Mp)rz z3hOId7y@VN<1w=koqG?^_i;vzP~Wc2s&qGVqT}wIf#RO;e_eIxP>T$H zBtLXwMhC^FT+)!d2vCTlJg&MM8_cUzBt7UVGd^gpU+Ap+Xfdx+ofD0lAO4t)fz9))qX)0BR_H*ea40Ke;L^a2ir{O5|qNt~Fh$LyaopgEzwa^hsmBh+o z9VEx(n>{lOLmkgB1=+)2gq5_$$?>n$?|AA6C-Cff;=bPehDn;` zHfx~~Cbo0M0m8Q`;X2|OeNz_U@{LZy+f&T&UMb&3@--PQ4_Wiytmbw<_d9%J_G{Vn z&ea3dJG7c^7m@qyOs0^b?Hw@wJ^dQF9HLdt#HGE^w|h5;v1>AW#2A-{C1Pmr*pBWsEZdHK}+=l0Ok!5n`G){1GL(*l(I;2r4|vM z6^C%XR&n%#h@hn?uZjuX+>NOp1UOtCa^`)239>)xp=<5r?sF++8L%MF)xH~7=pqREn^ z92v7`(EcP-7l!aAxc2XUepq(n&!)AJWRu0%pm{deHQ1ZRqUf({L2r9l0|tfxM{5%m zAB|Yo$>!Y5e}vZvC1@4m3`ALKcEpJBVIedWu&1A}H|Z^`rPCO?j_!sf_o0g^r00II zqDQWo=l~c3I1nAm7M6i`$Mb!SMf)AlN5_y~3SfU8ne8=btQN}!ot0S13)_jyV;O^n z_Di9Jiamq|E!Xl^l1$8ak9iboI^oVV@m!bCmE)LuRVgg7KRM`Y^m#nlB2_)G0dX$& zS?1JEzX6F~wb zKigEaKW?jr?5=13Obc^~=Uvn{>AaTRV~&{5`q)%dGXc$Zb)+PQ%joksu)P)1aY7PH z5|}Coe!d^WJTAIz(N(eT=HhT>iErE)3rhMx>IE_>V#${DuBUi$O;y^z`7AfeZhS5c z5Y07Boo6wgRTEqB*{>w~T~Gp@6VKPRcRObe4dp%f&GaG?d z$`*T_C!@wm%pKV(+kFiY*QxlgYu~*J*G6;f6586j#&FQg_w09fRYlM@M?_>96%!jw zMNd@Qv%RWXOXz6}do%b()FbOo?}*A+`;QVkp1cBmplIKVpU^N8?y2hKQ&H>mkkw3{ zP`xdWVB67mnXAN2`bK0j+KMgC5)IPh@9}kG12u%wPDMpU6RGqDsP_c^09CW^4&fbXbf&u4NjJ8$Y>5vb<== z_44os`LMfqi{V07a*GxsP61Y0wR)`b&NJTU_DtTwVP}wrxods8uLeVA={v`P1~!WQ zUh0f4U-mEaF98|O?>MB9_!@W@8R}|g-F;e&_#3E-C5=mC!bf_A&*d?tM>H72sjKWY z9Pa|T7LoZ^gx5E!&}E6vG|AMC)46x;vG)~)D%J(JbL(w5uHvu1#T1hw9>|99oX4eCzaxqbvxx@jXKdf5B#wtgj4S*Z(Hs zp{d_-y3?Wd_vIGl_@uxHiaE3R$i=ZMrSCqwZoPBlb63Z0gxYXjvFj+|h)PnXlbfv$ zGh3aUpQ7l@XKSln6EbDMCeJ2g=tGljq~>uc_|=nl=Xkfi50~xVTRDx#zx@{|8CCMs z>bVM^a$WK~6}ln!W3{R_Vt;pvg#9&#-cLQ&&vlVvbzHRfT6?%*6s0i9nx@Z=XF9~v zB<|T|3S%BWeIIN`PNYjve`njPhZb?=STtA?!J#v*?K#Rl{3GIx>o#tV-ZattkvC1! zC>JaFTa5;Mvd66WtxMCp57SOe5ih26iKGE8e12XPwe`*3ydr(#*;(lFv-r*WsRIE%#ea4 z=~$85B=Jtp!|=(8mOP#J<(}_iSx^c0&0eQd-Cv;eq+O5<|1@!yliTf)dPRa9N4^Q| z!gKOxngz8!UPLQoF?Zt}JF{kA9Y{~!FbXS><_|`RV~FRT68m^t_Hz0zIWxI)WM?=w z;Wic3f3C?WQ~u)42VrK~nBk6h`$7&*%sQ(nmRv(|sPj3ysX-RNSVQSvQ$Ps7X)<2s zkKkNt>SV1IahPy^X0hlWdoatsm?p0Y~;CH$jXTpW;pPi9F<3Hb9|466>j))-S z8a(UHx|B4`Xl5_xZRqY}o!e+!X^Z4=xWPzJFo@3Y3Svg?E?)!oG&jP{DZaC|50t|{ zwU|fgu^7HSA7B;f9{2aRWUQvp!jDC_6K|wcBtsydGK1@RHT1!k5XdhI5O{+Y0tJvk zK&0(K1_Oay|NlQy_|(-rhbutX1Ox37DB_4lgUV4<)4JZX-TFmSg4v^@N+V ztrk_I*YDi9Gdnx$ybXaQEq#ch7h^r{2&ST;Va81%w%^i)lFBu_JEVR3Q(5=5Zt6?Trrh_Gy|0K6v$-ZQ9v^=COJ zE$$|-HHfqeEhA%Ea`LB}#8o9FuV#K-6~Xxgjs;L$JUonIFnbFN1+@?pdwY8? zFFo*f`3-t!)|jjApA%dB{qdfHpljWgAW`%Z6he?fBBL4dBhnhcvvx;5fww24oy430S zV5#BG>JRK>CX}KBhn)I$E7v4k=F%95MkLty{!uV#hYgp4 zDH|DgBlpgY*zoKVNwENAc!z>!OOQP9?bh;jGEsdG|C|@pw8-nweMx)FNJEpDloWF_ zfYOALCTv9diVrWYbcBLkI$lahg!B@aTEQWt#Fv?O3Q^;~UCD+yJNgkXv@f%zI=-u7x2>Ui4Nh`HYc|0#$@dI!x zG$GM{dbM(IZWUK-PT2P_>TJi``HS95D!xlVCwxGQ0GnEJ?%E?G>_+oC$8`BTs=;qs z%+F%NZn~oRUVoL}9*-S0ENI*a7boEQ@GPmeaW((p(GQZt?xdo#a=l=~*G^2)0BzYA zo~`$|(*$*pn!F5Q`qrvLMner-{71kbK=GOK2bYAf)uoHOn5nJQ>1s?0_UA+uVzU#w zc7*8+vJIKGx^f-QjmCT-+n&T4 zFDANwJErBz)nLWc!=AH0{{HspaqQWtFv4+lLUL$xfpl_v#ocE(FHYF;s8Ai_WsTiP z#gL>t^c}PP>gDDY1PM^2p{B0!so8q(v>X)6Nav^L+NQ29&c>?L)1^nhl{V$E5s5h- z*J7+4on~sV7F}ed>*XHH)G02{MfK*Nf|q79-jww#=|6RmMwY zGzF|$;2>E*()(!RbjGT@~*3`q=BaKx~krR_7haLO&-OC@U9@1|e`Fk|l zer0WGZDnI)o4=IEU_xmk>b5d;a#EL^+cvbb?ReVktzR@*?Hk{(lqM!M?RhYH6E&-T zS3U(JzO_S|`f&eklgRF`8jO1?J-zn`7)$%m7Y2E%eX0Y`=CLe3+8EV0s&blm`_|Oi zXQrm-P8O`~j+NcvvDNNDB7Fwf^1dN+*Q5lMu`1uCsW{=b6567~s8KvfB(H>ogpd#` z+kLm?gyww7t=+iM&JtSgmY~45CE>3^t5B`)>GeS)%%DXg;hv(;qr4G0K;VWmG#3mjQB=J)%5ANwBv6}R_=cLJM&f)QV04I`bHCHiD}bYD%j1}qjh#F zo%%u*2Af&>>yMrodYoDLdnPTtQSkk3HZ4#)D(q1xEv*qaXOrUy`t*CZ2 zR?21c*xxQ{@>$k5I%8Z2jScXkcN;rI=U)Fz*!Q)MUs9PqI4fj4L|yS4Sottrwyf{1 z)+2eGNqXAT5K3}*tz6Cz1&@5{2Ew_9!?gEm#$9J8#i&uh^b9#aY%Ah$YFa>SZWMKx z#dVsY8XXl$a%i=F6qZ(V?8`Xe;E-!-)stM~v6+#4-lD>MY53&LVK5KfVSC!~k-nry z?`y_nUrBhK8D@F3uJfR~9pmS3zUUsJH}!paSn7D#8*%uldi9ZW<3V7P-2~RjtT$+R zF*4xzDAIpd%70_Bkr0@ieYpSkoO!`DM(j>cSY8lzH>!b4bd?mlS9NrmyGv644;u%a z)_WAttmoasAr08Y%woUAO5u8JDn^3uz84BbN?Jcc*W=TRVDz)XmV}lRdmW>aFy7B5 zW0#VAT8r&()~{ptn}2m>p|42psREHX#2)M)E*f1=*}$B(8##?S`fM5Zs&n~ll~0q= zZ}Iq;dhRu6DT#~xt$sd)18D>O>-Q{+id|cjpVqb1D)J&Tgvtx;3|e| zUYZ0pOMjmxY>%zMIuMnxIDNW0ecg9;|LiovO=oot#eBtoV|GHF$nfm6!n@fttzkFR zO-Jw`pE{r|@4&@^xe&2s)o_;Iu%9kiQ+qVv?KexC!ASAgecPAXY4#iGv>ovq?vpdC zy~?_+aBZaTv=XL@qI~_RF{M3jbn0*_YD&suWq{FO`@_-)Y-@LvzPpwg6biMrex#Nj z25^hS4ut5d)%cIaUagfJ)7uqmaJA$#y@QT=JgMFH#g%n7gK}4b86_VMY{jWc9rC-M z4rUwm*RP*e{i!->l;A3MDMlk^h|e7>c5mnK@U_`s!=AU8p3&f$7}jk+C;{zXcQnCO zjF`n>YxJR+*L)aprRNH?k0Sq6s@8+;@-6o6jdK+DFmP>S{!N3ImZSqCJ^791@benv zD6U-{;bD;vIYD8f>UQP!E9*uOWyhP2h?$N^{}ZceTJNK=uFdd`h zw#!}5$gMSa;#>XEZHA$+n`}!Tn1Clr?C|?}va`Y8u#>j;j$vC%>R4+w>AK@t+2+s5 z(eM6t`|XS6os8INSlEQrUL|~L?~k|f=14JB8lLws{dovr!;eO6t+g!8J9r8Ten*>shGuWROhh}QRhqqA$vB~7S?37;I}5_ujAS#b+`xb zCh?5fRSi3a(HR@~Y*t}kRrND9>~iS%9vJm=>Cziz){-)C*DVQ04i zaaf8y8N-}xVI()|l@yx|nXeU}Zu@(0Us)%sUo<{#kg7iFMeIkb+c#i-&H^WcIomz< zpD8px>@r@*{_-F5Z59*u|7|vbsdbkwC_ ziXp#gxCt_qILOSvKaRqG@c zufH{w!E<5KGgHn_5-z^F_CcwqC+voBTo;M%&Xg2gitl3a)aX{?p5wYWQ>}47lRBKf ztUoK9#Iv-PB;l!3Ixvamk?o;)t=W-MrG}eXh0_9lrS%K<)z%>CQ7z`b%-DEn^0!gz4x{rC!9*BI5qhNYcdn4&4e+a|toc#JK}= zV~})j789y6=sy##ph6k2zz}7;L)RTaw(w<&u~hkMcCnAKQYSfWgO9qF9QT~*IklRY z@JwHIqjL}FaF`eA?o-i{#*rzf2#S$hpN<-j`7Y1Lld|<<#`c@;H~d-E?8Y$Q1yrsa zNnGk@Vq8l=o}#{>O}WI(`^j}q;H{y;;4`vjzjyr(kY58cGc#cSGl!YdL|hg?BI}H+ z0pkAXcm8P-x5n)EeudpMP=H)l>G$F{#`HEa4S*CSl;Azt5gA`FE#7sr{-e($z#@Qx zp}9+-89B%(gU^;Ia{>}tzr4I$TFNW(rL;4iCn_SM?dolyTH|@>{V`e_H@Dq`%~??R z=u87KRmOfH&)j8`2?UisYfIRE4#g^*zPmfv{^V^|5u8gPk5n=F{i^h3)vtWw#>p>6ox7oawGa zotiWSGr&v19RCnbui3AD^jnRMjezpGOCK0bs;I+AvbGOD3cM8@~t&RG|FI=fyBqGn#3bY|*_P^(SiWv(Jb?L3FN&sCiDu%7;uo zP8wHfq=P7PDX9M2a^qFiKd%}6h?f@oVaqS}JTg4;AEXM<_JrltpA?Im|)1~t@#v4^l z#ypV{VTaFdFNOpMgSxR;&}BH?mf+$pfm~db&-HOz3Og&e>ech*pyvn}r(vMCo1R)- zh2{DvghYTU%$7;M%!yCBtK%JKAl7Hcr+2OabzNbIA>7mpi(JuTi7gLX z6B8K*&2&0YV5etb;8VcSy(BbD32soxeI}G7;rf}R<}w%u)GnBX_B+_V*q0iT;%$Dk z0J$$8V$hT!r)UAnWBG-J_cxdntMYU~5iy-ANkKtD+bki3B`GBZ<^6%A#u8Mq<`S6* zYs*u9K%2QH(dbN6p7%R#UB{EQDIhrnWod@jv_QO=i5rM|@wdHqNl35=NWewszpm#e zob*esZM*FKOj0AUK+r!aQL}@dOi6I#Mbv{@Ro{0{9U-3-T-yx8KvDQ0(sS8B<<-=PfLcrdM)5|U_b_Y-hKi0_p#_&vgwbN=Fc!Oo|CHbZo5SFnms12!A*n$C77d(NF84do{8ROK zp5o&7F$rgnRI8irJWZytoBK~0<(DfEo=my*$J+{jT5dIc z_#z*OY6D!ltONQ$z(x=VI4?|sPUk~W;%|OnqG#7(ncZ|K*5=|O;j$vQM1Q=Oaw-H; z($6G}fY$Lx_auK{gS5RZp>=r67NTFpSF4(N-djD_L6nNH;h&Wfss+T{G7)a1La`Ri zfJE;AppD{N59yr=+`73?z+#G;-;zc~K2WT>TKaF@I4eY;e(E`>f~R+W2zHYl)G2BW z^2fvIe|y_}83q;jhGDldry$o^v$-d+*Xfltx0ppST3z=yi!Utw4gI@sVHAa}WTgnnsX_ z+y|Qk03)~7C87TN;BFJsC$4ROujs%wwamvrbW*eN_D92qQ_R}Ix z9=m6TI*$*mSF2RrPyqO>V6;mC7$0-~1xrA=;lKC$joQKU7a*T_;OuLdzJN;``uU?b8a}E z`JP^DdI~6s-Opy{mxOH2BXX-pyJ!H*TpKYM$(ytqA5;Uic$$Xs;XEkE-jD>vm`VL` z=lSGO<5%L#T^FbkA%oYAM&op?WvlUem&iV! zY!SJ;)|GANusD}ZoYd){rr66x3dPXy;D-s^b*wG0sy9MaaDqf6xZC_rIEVB<@OnpJ z_#I@p{I`Uv1_!cmr-&dEPxv*Ydw=x(wIGj6+L|HkJqY z(X_Gb?;pwL(5!@-2oAoT)mvnB^GvUOJM(E5)Egm=@4`M~1#!b@mR}3x5lvwgzphWN z{+)_Tms`HmY5(@%0HyR()GC(Y&D(OHHG1O|rgt+E4iW`&sv8_$Gb}Qf4~fXXi#yRG zOs|c<(pSkU`DqxsaNgU=L;o(dBkCPUw|QG2I~NXA{R$?Vlb0E)c)Q zVCTKMft?OvldlC$*K_Qy7YKFp=#jcg$NX=@q0@TarR+UA4+B4p&8ZKLycMmuqquwN z``%EmkNLi!K@}2#n3EJA8`9TWvZ&d%*Ob0KEt?Q}*jxJcdDqOkryz{Zq zq$=WzE%FSBAv=-{2c|w~ja0Qg>|%Bf(7ooqw5_m=*9pS(e5ZdKCqK!1Tb;^iEs(qv z+2;69BHGKu+nN!$#cD0(ZO)&OnsK7~O*eelGBGJ1G@o}1F$x2_^?bX>WX6?<2AcK_LtcQ5XQUDGc%LV4LVp1ml!cD z1_F~S|1`RBZ*E{}d`T54IwEW1hBvf}1ioFLKyWR3e-w=JDi0(NlReE?T$m+whsTthchePgn!!@5*&jwTy z+8s8r)2CZ0(jk$(X@rH1_63B7*zs!Ca8}mVg*zzm>#<^mPEWhfFMgzkj&;0 zZ?)_#zgu$Kw#ydT4q`?@=ww;(Mc9sPQIk8_0pf&9wub`u{_{in{pEl1a$Sxe{o!8I z_U>whu@{KYWR(&tDKi#OKu?Km;FFPsKd3N&-CXA`BK zkDb(}Py?4bFKcW6INwTbo62BE9vvJUR5ETUdQfL{7k>;6L!_`!x9j7Azu(D{D1{3q zK07?(_a}#@7HeCipF840JRC#|iWJqa1rs4}q~BA1A{G69)cW3*Lu#|HDwiG&%Pkn8 zs8I$YtL4(B=2}N$VzX7l2e!|_>+uVPHmjDy;@wb0)BPNNi}xbNL_#eB$8yG}e)jbR za(oiyuHnnmp7$p{D@jd^7+nW;JP_hd6)4L>LFzMLGLd)Wc#O^E`x~X&p~-_7&mT@t4(7e_H-?(A<7Amed0V&XLQdqHnEv z`Han0Fbr~q4g`(_wx+wS7mz6$t`z0Pgot1)hwVi4R+&Sl0I9+!<+_TR#WZ(ZtZ3Om zaNz&QsJP+CJ}b>FAk)BR3@7ygvWJZ+$`j@>AQ>eW`3%`NtEi0IiNc*TkPckmy+%)} zif$>tw8*yY1BV*vC8tjur>~Zi(Td<W!RRqdn!ZkwtmQ2t8yYjvR+)btN0=>`}MU zj#NcBqrf`UQAWI8?9OoT<)-FM%5{Y`%+$;`Q7I`>!PlBL2r|3Q41V@ZJPjKm?gAFN zgwKOy)1X;SDK}WNSuzgYFbZ~AjM}};nrGbz&n(j~3Aj_kTw=@nFDE=zd z5G`G9LOwAW6?$z0mtK&c4|&D+*KxTr0570lJx`icHR$)S>}YMNX^C0f2#!{;9imW@ zXbz;VaRM+y$W>8jiiR9kLQ&rC@cVn45dqHr-Dl<+nz4LeUn+(0x!2IhV4KzUbTPa= zZhCrpapOROMDk8ph+8bap5{cu9G!W%K=XqxLIes>8y(>sYQz-%&Ar>BmY~1Y)J_^S zG%Br~DB1r+76i*eXk(gvW|sLJjP!yYa9}E5wErkfZ7xHeLqth-zMyn)(gKf!W-u%d z`56sA+m-rYMel96wOtGgp|t>)Bh`{2C<)>|snc|~Rx@z_3yBvs-IG;ef$0K@{#yv| zK2!n|a6lhJRAF1LeEE?c0jJQoMONncGOrb$q81eX)z@=Ee;m1y0rPn&0W&a|&rKA@ zCKf5k!7oH)s4btBna%C)vM=~Cs>3IP#J< zFe@MPFcsp;4kYZ?bpMISj|I%+{<;Z+>tS*R?){tDmd%_rPIh+NF9aatWI4^sBwRp! zJv2@@IH>79oaZ~}>xq|!!Q!w~5=7-`az+|fLP*CvbYw5Xc32rS2~P&!!?i5+_XlbR?{8d$_=<-4!=fa(g_;I z7`T284#}P?>i2a}bYYg8$Sh^DGFmqwQMQg|c|mwj5}jUTuhoDf6pzE#%j0O6aqQ`S zA=K0+wq-AmB^|MB`bbp6af%mhj+=%9E$s`yZ^vM>sjA-22Mqc{>=);Hzc!(R8>22m z7L4XZw3V{fb=;m?3$F5!G~g!og3!ePBwWGQk2vz;ASG{LA%jt2F^M@*W*x$>RUsTSD1pZd4;1B2WG@X-mY8&Jv$YgPH|dx{%>@5 zZgT=0P0L#j0Cj-m91M_$JfJ&nEg6K7wrl;59#{vf3Q!FK>RTlhwH}*yP_)DEzbHo3 zNJnok@k~3gBX=G-NFEqpFC7t0O3_~p)CFp_bYuSI9KI0Bvg08>(3vxg-wolVFs%hH z##)KMj6bWBr0AsuS5@8!z$zO7Du{tk%RH+6MFAkp>m~;b)=+}9MzDT}QdrV3yNeXvLF=cN3V#CrD8<$ zeyvKoL2K;~DOstzR)6GY`^5iFM+lG%nh7v?7jDWIRK>B=VM zJdn}=mX*mtMcUVqkwf}Q!vymAm%B{I!~-*$06ncH$CG;RTsB;rcqP^Jflk~Bc1gjC$pvcPoGL(ep|BMnfIhB|0*)SPa1^78)ys!u_| literal 0 HcmV?d00001 diff --git a/samples/dapr-pubsub-dotnet/build.ps1 b/samples/dapr-pubsub-dotnet/build.ps1 new file mode 100644 index 0000000..cd85d01 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/build.ps1 @@ -0,0 +1,16 @@ +param ( + [Parameter(Mandatory=$True)] + [string]$Version + + [Parameter(Mandatory=$True)] + [string]$ContainerRegistry +) + +docker build .\src\TelemetryProcessor\TelemetryTransformer\. -f .\src\TelemetryProcessor\TelemetryTransformer\Dockerfile -t telemetrytransformer:$Version +docker build .\src\TelemetryProcessor\TelemetryPersister\. -f .\src\TelemetryProcessor\TelemetryPersister\Dockerfile -t telemetrypersister:$Version + +docker tag telemetrypersister:$Version $ContainerRegistry/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrypersister:$Version +docker tag telemetrytransformer:$Version $ContainerRegistry/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrytransformer:$Version + +docker push $ContainerRegistry/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrypersister:$Version +docker push $ContainerRegistry/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrytransformer:$Version \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/deploy.ps1 b/samples/dapr-pubsub-dotnet/deploy.ps1 new file mode 100644 index 0000000..4ca961c --- /dev/null +++ b/samples/dapr-pubsub-dotnet/deploy.ps1 @@ -0,0 +1,13 @@ +param ( + [Parameter(Mandatory=$True)] + [string]$ContainerRegistry, + + [Parameter(Mandatory=$True)] + [string]$Version +) + +$contents = (Get-Content .\deploy\telemetryprocessor.yaml) -Replace '#{container_registry}#', $ContainerRegistry + +$contents = $contents -replace '#{image_version}#', $Version + +$contents | kubectl apply -n azure-iot-operations -f - \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/deploy/telemetryprocessor.yaml b/samples/dapr-pubsub-dotnet/deploy/telemetryprocessor.yaml new file mode 100644 index 0000000..d53fa62 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/deploy/telemetryprocessor.yaml @@ -0,0 +1,202 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: aio-mq-pubsub + namespace: azure-iot-operations +spec: + type: pubsub.aio-mq-pubsub-pluggable + version: v1 + metadata: + - name: url + value: "aio-mq-dmqtt-frontend:8883" + - name: satTokenPath + value: "/var/run/secrets/tokens/mqtt-client-token" + - name: tlsEnabled + value: true + - name: caCertPath + value: "/var/run/certs/aio-mq-ca-cert/ca.crt" + - name: logLevel + value: "Info" +--- +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: aio-mq-statestore + namespace: azure-iot-operations +spec: + type: state.aio-mq-statestore-pluggable + version: v1 + metadata: + - name: url + value: "aio-mq-dmqtt-frontend:8883" + - name: satTokenPath + value: "/var/run/secrets/tokens/mqtt-client-token" + - name: tlsEnabled + value: true + - name: caCertPath + value: "/var/run/certs/aio-mq-ca-cert/ca.crt" + - name: logLevel + value: "Info" +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: dapr-client + namespace: azure-iot-operations + annotations: + aio-mq-broker-auth/group: dapr-workload +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: telemetrytransformer + namespace: azure-iot-operations + labels: + app: telemetrytransformer +spec: + replicas: 1 + selector: + matchLabels: + app: telemetrytransformer + template: + metadata: + labels: + app: telemetrytransformer + annotations: + dapr.io/enabled: "true" + dapr.io/unix-domain-socket-path: "/tmp/dapr-components-sockets" + dapr.io/app-id: "telemetrytransformer" + dapr.io/enable-api-logging: "true" + dapr.io/app-port: "5050" + dapr.io/app-protocol: "grpc" + dapr.io/log-level: "debug" + dapr.io/sidecar-liveness-probe-delay-seconds: "30" + dapr.io/sidecar-liveness-probe-timeout-seconds: "10" + spec: + serviceAccountName: dapr-client + volumes: + - name: dapr-unix-domain-socket + emptyDir: {} + - name: mqtt-client-token + projected: + sources: + - serviceAccountToken: + path: mqtt-client-token + audience: aio-mq + expirationSeconds: 86400 + - name: aio-ca-trust-bundle + configMap: + name: aio-ca-trust-bundle-test-only + imagePullSecrets: + - name: aio-pullsecret + containers: + - name: telemetrytransformer + image: #{container_registry}#/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrytransformer:#{image_version}# + imagePullPolicy: Always + env: + - name: Logging__LogLevel__Default + value: Debug + # Container for the pluggable component + - name: aio-mq-components + image: ghcr.io/azure/iot-mq-dapr-components:latest + volumeMounts: + - name: dapr-unix-domain-socket + mountPath: /tmp/dapr-components-sockets + - name: mqtt-client-token + mountPath: /var/run/secrets/tokens + - name: aio-ca-trust-bundle + mountPath: /var/run/certs/aio-mq-ca-cert/ +--- +kind: Service +apiVersion: v1 +metadata: + name: telemetrytransformer-workload-svc + namespace: azure-iot-operations + labels: + app: telemetrytransformer +spec: + selector: + app: telemetrytransformer + ports: + - protocol: TCP + port: 5050 #6001 + targetPort: 5050 # 6001 + type: ClusterIP +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: telemetrypersister + namespace: azure-iot-operations + labels: + app: telemetrypersister +spec: + replicas: 1 + selector: + matchLabels: + app: telemetrypersister + template: + metadata: + labels: + app: telemetrypersister + annotations: + dapr.io/enabled: "true" + dapr.io/unix-domain-socket-path: "/tmp/dapr-components-sockets" + dapr.io/app-id: "telemetrypersister" + dapr.io/enable-api-logging: "true" + dapr.io/app-port: "80" #"6001" + dapr.io/app-protocol: "http" + dapr.io/log-level: "debug" + dapr.io/sidecar-liveness-probe-delay-seconds: "30" + dapr.io/sidecar-liveness-probe-timeout-seconds: "10" + spec: + serviceAccountName: dapr-client + volumes: + - name: dapr-unix-domain-socket + emptyDir: {} + - name: mqtt-client-token + projected: + sources: + - serviceAccountToken: + path: mqtt-client-token + audience: aio-mq + expirationSeconds: 86400 + - name: aio-ca-trust-bundle + configMap: + name: aio-ca-trust-bundle-test-only + imagePullSecrets: + - name: aio-pullsecret + containers: + - name: telemetrypersister + image: #{container_registry}#/explore-iot-operations/samples/dapr-pubsub-dotnet/telemetrypersister:#{image_version}# + imagePullPolicy: Always + env: + - name: Logging__LogLevel__Default + value: Debug + # Container for the pluggable component + - name: aio-mq-components + image: ghcr.io/azure/iot-mq-dapr-components:latest + volumeMounts: + - name: dapr-unix-domain-socket + mountPath: /tmp/dapr-components-sockets + - name: mqtt-client-token + mountPath: /var/run/secrets/tokens + - name: aio-ca-trust-bundle + mountPath: /var/run/certs/aio-mq-ca-cert/ +--- +kind: Service +apiVersion: v1 +metadata: + name: telemetrypersister-workload-svc + namespace: azure-iot-operations + labels: + app: telemetrypersister +spec: + selector: + app: telemetrypersister + ports: + - protocol: TCP + port: 80 #6001 + targetPort: 80 # 6001 + type: ClusterIP + \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/readme.md b/samples/dapr-pubsub-dotnet/readme.md new file mode 100644 index 0000000..ecdd1ce --- /dev/null +++ b/samples/dapr-pubsub-dotnet/readme.md @@ -0,0 +1,120 @@ +## Introduction + +This sample shows how you can use Dapr in .NET projects to receive messages from an MQTT topic and publish messages to another MQTT topic. + +## Prerequisites + +- [Docker Desktop](https://www.docker.com/products/docker-desktop/) or [Rancher Desktop](https://rancherdesktop.io/) to be able to build the container images on your computer +- [Powershell](https://learn.microsoft.com/en-us/powershell/scripting/install/installing-powershell?view=powershell-7.3) installed on your computer to be able to use the build and deploy powershell scripts +- A Kubernetes cluster where Dapr and Azure IoT Operations is installed +- A Container Registry where docker images can be pushed to +- A pullsecret for the Container Registry must exist in the Kubernetes cluster. The pullsecret must have the name `aio-pullsecret`. Information on how to create a pull secret for an Azure Container Registry can be found [here](https://learn.microsoft.com/en-us/azure/container-registry/container-registry-auth-kubernetes#create-an-image-pull-secret). More generic information on how to create a pull secret can be found [here](https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/). + +## Overview + +This sample consists of two .NET workloads that are communicating with each other via the IoT MQ broker. + +![Solution overview](.static/img/overview.png) + +- Raw JSON messages are published to the IoT MQ MQTT broker +- The `telemetrytransformer` component is implemented as a Console Application that uses Dapr pub/sub to receive (non cloud-event) raw messages from the IoT MQ broker +- The `telemetrytransformer` publishes cloud-event messages to the IoT MQ broker +- The `telemetrypersister` component is implemented as an ASP.NET Web API project that uses Dapr pub/sub to receive cloud-event messages from the IOT MQ broker +- The `telemetrypersister` has an additional subscription where Dapr is used to receive non cloud-event messages from an MQTT topic. + +## How to build and deploy + +- Build the .NET components using the `build.ps1` powershell script file. + This script takes 2 parameters: + - Version: this parameter is used to assign a tag to the container images that are being build for the `telemetrytransformer` and `telemetrypersister` component. + - ContainerRegistry: the URL of the container registry to where the images must be pushed. + +- Deploy the sample to your Kubernetes cluster by using the `deploy.ps1` powershell script. + This script takes 3 parameters: + - ContainerRegistry: the URL of the container registry where the images reside. + - Version: the version of the `telemetrytransformer` and `telemetrypersister` components that you want to deploy + +## Test the sample + +You can test the sample by publishing JSON messages to the `devices/+/#` MQTT topic on the IOT MQ broker. + +To do this, you first need to find out the external IP address that you can use to send messages to IoT MQ. You need the IP address of the IoT MQ aio-mq-dmqtt-frontend service. + +To do this, execute the `kubectl get svc -n azure-iot-operations` command. +This command will show all the services that exist in the namespace. One of those services is the `aio-mq-dmqtt-frontend` service. Use the external IP address of that service in the following command: + +> [!NOTE] +> The mosquitto pub needs to switch to a different auth, or we need to enable username/password. Maybe switch this to a mqtt client deployed onto the cluster similar to other tutorials + +``` +mosquitto_pub -h -t "devices/device1/sensor1" -m '{"Tag":"fan_speed","Value":35,"Timestamp":"2023-07-03T18:20:00Z"}' -q 1 -u client1 -P password +``` + +After you've published a message via the command above, you should see some information in the logs of the `telemetrytransformer` pod. For instance: + +To get the name of the pod for which to retrieve the logs, execute this command: + +``` +kubectl get pods -n azure-iot-operations +``` + +``` +kubectl logs -n azure-iot-operations telemetrytransformer-75cf9f8978-47fqm +``` +shows: +``` +<6>2023-07-04 07:54:17.711 +00:00 TelemetryTransformer.Services.DeviceTelemetryReceiver[0] OnTopicEvent called on topic devices/device1/sensor2 +<6>2023-07-04 07:54:17.711 +00:00 TelemetryTransformer.Services.DeviceTelemetryReceiver[0] payload = {"Tag":"fan_speed","Value":37,"Timestamp":"2023-07-04T18:20:00Z"} +Received message: {"Tag":"fan_speed","Value":37,"Timestamp":"2023-07-04T18:20:00Z"} +Message deserialized: +Timestamp = 07/04/2023 18:20:00 +00:00 +Tag = fan_speed +Value = 37 +<6>2023-07-04 07:54:17.712 +00:00 TelemetryTransformer.Services.DeviceTelemetryReceiver[0] Publishing message ... +<6>2023-07-04 07:54:17.760 +00:00 TelemetryTransformer.Services.DeviceTelemetryReceiver[0] Message published. +``` + +Since the `telemetrytransformer` worker has published a new message to the IoT MQ broker, and the `telemetrypersister` worker is listening to the specific topic where the message has been sent to, you should see that the `telemetrypersister` has done some work as well. + +Displaying the logs of the `telemetrypersister` should show this: + +Again, first get the exact name of the pod by executing this command: + +``` +kubectl get pods -n azure-iot-operations +``` + +Once you have the name of the pod, execute this command: + +``` +kubectl logs -n azure-iot-operations telemetrypersister-7fcf59885d-fd9qb +``` + +``` +<6>2023-07-04 07:54:17.784 +00:00 TelemetryPersister.Controllers.DeviceTelemetryController[0] DeviceTelemetry message received +<6>2023-07-04 07:54:17.784 +00:00 TelemetryPersister.Controllers.DeviceTelemetryController[0] Persisting telemetry for device: {"deviceId":"device1","timestamp":"2023-07-04T18:20:00+00:00","tag":"fan_speed","value":37} +``` + +The `telemetrypersister` also contains a subscription on a topic where raw payloads (no cloudevents) are expected. +This subscription is listening on the `commands/#` topic. + +To test this, publish a JSON message to the `commands/#` topic in the IoT MQ broker: + +> [!NOTE] +> The mosquitto pub needs to switch to a different auth, or we need to enable username/password. Maybe switch this to a mqtt client deployed onto the cluster similar to other tutorials + +``` +mosquitto_pub -h -t "commands/exec" -m '{"cmd":"reboot"}' -q 1 -u client1 -P password +``` + +The logs of the `telemetrypersister` should show this: + +``` +<6>2023-07-04 14:14:56.860 +00:00 TelemetryPersister.Controllers.DeviceTelemetryController[0] Command received: reboot +``` + +> Be aware that in this sample, all loglevels are set to 'Debug'. This means a lot of logs are generated, which is obviously not something you want to do by default in production scenarios! + +## Acknowledgements + +Big thanks to [Frederik Gheysels](https://github.com/fgheysels) for contributing this sample! diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Controllers/DeviceTelemetryController.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Controllers/DeviceTelemetryController.cs new file mode 100644 index 0000000..9a165ee --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Controllers/DeviceTelemetryController.cs @@ -0,0 +1,46 @@ +using Dapr; +using Microsoft.AspNetCore.Mvc; +using System.Text.Json; +using Dapr.Client; +using TelemetryPersister.Models; + +namespace TelemetryPersister.Controllers +{ + [Route("api/[controller]")] + [ApiController] + public class DeviceTelemetryController : ControllerBase + { + private readonly ILogger _logger; + private readonly DaprClient _daprClient; + + private static readonly JsonSerializerOptions SerializerOptions = new JsonSerializerOptions + { + WriteIndented = false + }; + + public DeviceTelemetryController(DaprClient daprClient, ILogger logger) + { + _daprClient = daprClient; + _logger = logger; + } + + [Topic(pubsubName: "telemetrypubsub", name: "devicetelemetry")] + [HttpPost("/devicetelemetery")] + public ActionResult ReceiveVesselTelemetry([FromBody] DeviceTelemetry message) + { + _logger.LogInformation("DeviceTelemetry message received"); + _logger.LogInformation($"Persisting telemetry for device: {JsonSerializer.Serialize(message, SerializerOptions)}"); + + return Ok(); + } + + [Topic(pubsubName: "telemetrypubsub", name: "commands/#", enableRawPayload: true)] + [HttpPost("/devicecommands")] + public ActionResult ReceiveVesselCommand([FromBody] CommandInfo cmd ) + { + _logger.LogInformation("CommandInfo received: " + cmd.Command); + + return Ok(); + } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Dockerfile b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Dockerfile new file mode 100644 index 0000000..8ecd6aa --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Dockerfile @@ -0,0 +1,17 @@ +FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base +WORKDIR /app +EXPOSE 80 + +FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build +WORKDIR /src +COPY ["TelemetryPersister.csproj", "TelemetryPersister/"] +RUN dotnet restore "TelemetryPersister/TelemetryPersister.csproj" +COPY . . + +FROM build AS publish +RUN dotnet publish "TelemetryPersister.csproj" -c Release -o /app/publish /p:UseAppHost=false + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "TelemetryPersister.dll"] \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Infrastructure/DaprRawPayloadInputFormatter.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Infrastructure/DaprRawPayloadInputFormatter.cs new file mode 100644 index 0000000..f77fd91 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Infrastructure/DaprRawPayloadInputFormatter.cs @@ -0,0 +1,41 @@ +using Microsoft.AspNetCore.Mvc.Formatters; + +namespace TelemetryPersister.Infrastructure +{ + /// + /// An InputFormatter that is responsible for formatting the raw byte-stream payloads that are transferred + /// by Dapr to a typed model that the ASP.NET controller expects. + /// + /// This InputFormatter is required to overcome the HTTP 415 'media type not supported' error. + /// See also this issue on Github: https://github.com/dapr/dotnet-sdk/issues/989 + /// + public class DaprRawPayloadInputFormatter : InputFormatter + { + public DaprRawPayloadInputFormatter() + { + SupportedMediaTypes.Add("application/octet-stream"); + } + + public override async Task ReadRequestBodyAsync(InputFormatterContext context) + { + using (MemoryStream str = new MemoryStream()) + { + try + { + await context.HttpContext.Request.Body.CopyToAsync(str); + + var jsonString = System.Text.Encoding.UTF8.GetString(str.ToArray()); + + var deserializedModel = System.Text.Json.JsonSerializer.Deserialize(jsonString, context.ModelType); + + return InputFormatterResult.Success(deserializedModel); + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); + return InputFormatterResult.Failure(); + } + } + } + } +} \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/CommandInfo.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/CommandInfo.cs new file mode 100644 index 0000000..ddf427e --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/CommandInfo.cs @@ -0,0 +1,10 @@ +using System.Text.Json.Serialization; + +namespace TelemetryPersister.Models +{ + public class CommandInfo + { + [JsonPropertyName("cmd")] + public string? Command { get; set; } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/DeviceTelemetry.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/DeviceTelemetry.cs new file mode 100644 index 0000000..d9fe5bd --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Models/DeviceTelemetry.cs @@ -0,0 +1,16 @@ +using System.Text.Json.Serialization; + +namespace TelemetryPersister.Models +{ + public class DeviceTelemetry + { + [JsonPropertyName("deviceId")] + public string? DeviceId{ get; set; } + [JsonPropertyName("timestamp")] + public DateTimeOffset Timestamp { get; set; } + [JsonPropertyName("tag")] + public string? Tag { get; set; } + [JsonPropertyName("value")] + public object? Value { get; set; } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Program.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Program.cs new file mode 100644 index 0000000..a4ff25b --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Program.cs @@ -0,0 +1,50 @@ +using TelemetryPersister.Infrastructure; + +namespace TelemetryPersister +{ + public class Program + { + public static void Main(string[] args) + { + var builder = WebApplication.CreateBuilder(args); + + // Add services to the container. + + builder.Services.AddControllers(options => options.InputFormatters.Add(new DaprRawPayloadInputFormatter())) + .AddDapr(); + builder.Services.AddDaprClient(); + // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle + builder.Services.AddEndpointsApiExplorer(); + builder.Services.AddSwaggerGen(); + + builder.Services.AddLogging(logBuilder => + { + logBuilder.SetMinimumLevel(LogLevel.Information); + logBuilder.AddSystemdConsole(options => + { + options.UseUtcTimestamp = true; + options.TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff zzz "; + }); + }); + + var app = builder.Build(); + + // Configure the HTTP request pipeline. + if (app.Environment.IsDevelopment()) + { + app.UseSwagger(); + app.UseSwaggerUI(); + } + + app.UseAuthorization(); + + app.UseCloudEvents(); + + app.MapControllers(); + + app.MapSubscribeHandler(); + + app.Run(); + } + } +} \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Properties/launchSettings.json b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Properties/launchSettings.json new file mode 100644 index 0000000..c6454e2 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/Properties/launchSettings.json @@ -0,0 +1,31 @@ +{ + "$schema": "https://json.schemastore.org/launchsettings.json", + "iisSettings": { + "windowsAuthentication": false, + "anonymousAuthentication": true, + "iisExpress": { + "applicationUrl": "http://localhost:64492", + "sslPort": 44394 + } + }, + "profiles": { + "TelemetryPersister": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": true, + "launchUrl": "swagger", + "applicationUrl": "https://localhost:7065;http://localhost:5030", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + }, + "IIS Express": { + "commandName": "IISExpress", + "launchBrowser": true, + "launchUrl": "swagger", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/TelemetryPersister.csproj b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/TelemetryPersister.csproj new file mode 100644 index 0000000..1f9db4e --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/TelemetryPersister.csproj @@ -0,0 +1,16 @@ + + + + net6.0 + enable + enable + Linux + + + + + + + + + diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/appsettings.json b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/appsettings.json new file mode 100644 index 0000000..10f68b8 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryPersister/appsettings.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft.AspNetCore": "Warning" + } + }, + "AllowedHosts": "*" +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryProcessor.sln b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryProcessor.sln new file mode 100644 index 0000000..d29eca3 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryProcessor.sln @@ -0,0 +1,31 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.5.33530.505 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TelemetryTransformer", "TelemetryTransformer\TelemetryTransformer.csproj", "{B0B42D0B-909B-4546-B7ED-D991682693DA}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TelemetryPersister", "TelemetryPersister\TelemetryPersister.csproj", "{E3FCD9F6-D872-49D0-B9C8-3603F3339268}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {B0B42D0B-909B-4546-B7ED-D991682693DA}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B0B42D0B-909B-4546-B7ED-D991682693DA}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B0B42D0B-909B-4546-B7ED-D991682693DA}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B0B42D0B-909B-4546-B7ED-D991682693DA}.Release|Any CPU.Build.0 = Release|Any CPU + {E3FCD9F6-D872-49D0-B9C8-3603F3339268}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E3FCD9F6-D872-49D0-B9C8-3603F3339268}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E3FCD9F6-D872-49D0-B9C8-3603F3339268}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E3FCD9F6-D872-49D0-B9C8-3603F3339268}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {BEE37979-4585-4BAF-A2EB-077035EB065C} + EndGlobalSection +EndGlobal diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Dockerfile b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Dockerfile new file mode 100644 index 0000000..6cda5ff --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Dockerfile @@ -0,0 +1,17 @@ +FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base +WORKDIR /app +EXPOSE 5050 + +FROM mcr.microsoft.com/dotnet/sdk:6.0-bullseye-slim-amd64 AS build +WORKDIR /src +COPY ["TelemetryTransformer.csproj", "TelemetryTransformer/"] +RUN dotnet restore "TelemetryTransformer/TelemetryTransformer.csproj" +COPY . . + +FROM build AS publish +RUN dotnet publish "TelemetryTransformer.csproj" -c Release -o /app/publish /p:UseAppHost=false + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "TelemetryTransformer.dll"] \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Models/DeviceTelemetry.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Models/DeviceTelemetry.cs new file mode 100644 index 0000000..dd0e2fd --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Models/DeviceTelemetry.cs @@ -0,0 +1,10 @@ +namespace TelemetryTransformer.Models +{ + public class DeviceTelemetry + { + public string DeviceId { get; set; } + public DateTimeOffset Timestamp { get; set; } + public string Tag { get; set; } + public object Value { get; set; } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Program.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Program.cs new file mode 100644 index 0000000..2fa9af0 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Program.cs @@ -0,0 +1,69 @@ +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Server.Kestrel.Core; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using TelemetryTransformer.Services; + +namespace TelemetryTransformer +{ + internal class Program + { + static void Main(string[] args) + { + CreateHostBuilder(args).Build().Run(); + } + + public static IHostBuilder CreateHostBuilder(string[] args) + { + return Host + .CreateDefaultBuilder(args) + .ConfigureServices(s => + { + s.AddDaprClient(); + + }) + .ConfigureWebHostDefaults(webBuilder => + { + webBuilder.ConfigureServices(services => + { + services.AddGrpc(); + }); + + webBuilder.ConfigureKestrel(options => + { + // Setup a HTTP/2 endpoint without TLS. + options.ListenLocalhost(5050, o => o.Protocols = HttpProtocols.Http2); + }); + + webBuilder.Configure((ctx, app) => + { + app.UseRouting(); + + app.UseEndpoints(endpoints => + { + endpoints.MapGrpcService(); + + endpoints.MapGet("/", async context => + { + await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909"); + }); + }); + }); + + }) + .ConfigureLogging((hostingContext, logging) => + { + logging.ClearProviders(); + logging.SetMinimumLevel(LogLevel.Information); + logging.AddSystemdConsole(consoleLogging => + { + consoleLogging.UseUtcTimestamp = true; + consoleLogging.TimestampFormat = "yyyy-MM-dd HH:mm:ss.fff zzz "; + }); + }); + } + } +} \ No newline at end of file diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Services/DeviceTelemetryReceiver.cs b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Services/DeviceTelemetryReceiver.cs new file mode 100644 index 0000000..812448b --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/Services/DeviceTelemetryReceiver.cs @@ -0,0 +1,161 @@ +using Dapr.AppCallback.Autogen.Grpc.v1; +using Dapr.Client; +using Dapr.Client.Autogen.Grpc.v1; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using Microsoft.Extensions.Logging; +using System.Text; +using System.Text.Json; +using TelemetryTransformer.Models; + +namespace TelemetryTransformer.Services +{ + internal class DeviceTelemetryReceiver : AppCallback.AppCallbackBase + { + private readonly ILogger _logger; + private readonly DaprClient _daprClient; + + public DeviceTelemetryReceiver(DaprClient daprClient, ILogger logger) + { + _logger = logger; + _daprClient = daprClient; + } + + public override Task OnInvoke(InvokeRequest request, ServerCallContext context) + { + _logger.LogTrace("Request.Method: " + request.Method); + + _logger.LogTrace("Context.Method:" + context.Method); + _logger.LogTrace("Context.Host:" + context.Host); + _logger.LogTrace("Context.Peer:" + context.Peer); + + foreach (var h in context.RequestHeaders) + { + _logger.LogTrace(h.Key + "=" + h.Value); + } + + return Task.FromResult(new InvokeResponse()); + } + + public override Task ListTopicSubscriptions(Empty request, ServerCallContext context) + { + var result = new ListTopicSubscriptionsResponse(); + + var vesselTelemetrySubscription = new TopicSubscription + { + PubsubName = "telemetrypubsub", + Topic = "devices/+/#", + }; + vesselTelemetrySubscription.Metadata.Add("rawPayload", "true"); + + result.Subscriptions.Add(vesselTelemetrySubscription); + + return Task.FromResult(result); + } + + public override async Task OnTopicEvent(TopicEventRequest request, ServerCallContext context) + { + _logger.LogInformation("OnTopicEvent called on topic {0}", request.Topic); + _logger.LogInformation("payload = " + request.Data.ToStringUtf8()); + + await ProcessTelemetryAsync(request, _daprClient); + + + return new TopicEventResponse() { Status = TopicEventResponse.Types.TopicEventResponseStatus.Success }; + } + + private async Task ProcessTelemetryAsync(TopicEventRequest message, DaprClient daprClient) + { + string deviceId; + + try + { + deviceId = RetrieveDeviceIdFromTopic(message.Topic); + } + catch (InvalidOperationException ex) + { + _logger.LogError(ex, "Unable to process message"); + return new TopicEventResponse() { Status = TopicEventResponse.Types.TopicEventResponseStatus.Drop }; + } + + var deserializedMessage = DeserializeReceivedMessage(message.Data.ToByteArray()); + + if (deserializedMessage == null) + { + _logger.LogError("Unable to process message with body " + message.Data.ToStringUtf8()); + return new TopicEventResponse() { Status = TopicEventResponse.Types.TopicEventResponseStatus.Drop }; + } + + var vesselTelemetryMessage = new DeviceTelemetry() + { + DeviceId = deviceId, + Tag = deserializedMessage.Tag, + Value = deserializedMessage.Value, + Timestamp = deserializedMessage.Timestamp + }; + + try + { + _logger.LogInformation("Publishing message ..."); + + await daprClient.PublishEventAsync( + "telemetrypubsub", + "devicetelemetry", + data: vesselTelemetryMessage); + + var toCloudMetaData = new Dictionary() + { + ["rawPayload"] = "true" + }; + + await daprClient.PublishEventAsync( + "telemetrypubsub", + "telemetry_tocloud", + data: vesselTelemetryMessage, + metadata: toCloudMetaData); + + _logger.LogInformation("Message published."); + + return new TopicEventResponse() { Status = TopicEventResponse.Types.TopicEventResponseStatus.Success }; + } + catch (Exception ex) + { + _logger.LogError(ex, "Unexpected error occurred"); + return new TopicEventResponse() { Status = TopicEventResponse.Types.TopicEventResponseStatus.Retry }; + } + } + + private static string RetrieveDeviceIdFromTopic(string topic) + { + var parts = topic.Split('/'); + + if (parts.Length != 3) + { + throw new InvalidOperationException("Message received on invalid topic"); + } + + return parts[1]; + } + + private static ReceivedMessage? DeserializeReceivedMessage(byte[] message) + { + Console.WriteLine("Received message: " + Encoding.UTF8.GetString(message)); + + var deserializedMessage = JsonSerializer.Deserialize(message); + + Console.WriteLine("Message deserialized: "); + Console.WriteLine($"Timestamp = {deserializedMessage.Timestamp}"); + Console.WriteLine($"Tag = {deserializedMessage.Tag}"); + Console.WriteLine($"Value = {deserializedMessage.Value}"); + + return deserializedMessage; + } + + private class ReceivedMessage + { + public DateTimeOffset Timestamp { get; set; } + public string Tag { get; set; } + public object Value { get; set; } + } + } +} diff --git a/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/TelemetryTransformer.csproj b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/TelemetryTransformer.csproj new file mode 100644 index 0000000..279bbe8 --- /dev/null +++ b/samples/dapr-pubsub-dotnet/src/TelemetryProcessor/TelemetryTransformer/TelemetryTransformer.csproj @@ -0,0 +1,21 @@ + + + + Exe + net6.0 + enable + enable + + + + + + + + + + + + + +