doc-exports/docs/dli/dev/dli_09_0065.html
Su, Xiaomeng 89b6bedc33 dli_dev_0104_version
Reviewed-by: Rechenburg, Matthias <matthias.rechenburg@t-systems.com>
Co-authored-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
Co-committed-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
2024-01-08 15:25:35 +00:00


<a name="dli_09_0065"></a><a name="dli_09_0065"></a>
<h1 class="topictitle1">Scala Example Code</h1>
<div id="body8662426"><div class="section" id="dli_09_0065__section1487551213016"><h4 class="sectiontitle">Development Description</h4><p id="dli_09_0065__en-us_topic_0190597601_p492312464537">Both CloudTable OpenTSDB and MRS OpenTSDB can be connected to DLI as data sources.</p>
<ul id="dli_09_0065__ul1979617383303"><li id="dli_09_0065__li117961438183017">Prerequisites<p id="dli_09_0065__en-us_topic_0190597601_p1944354710257"><a name="dli_09_0065__li117961438183017"></a><a name="li117961438183017"></a>A datasource connection has been created on the DLI management console. </p>
<div class="note" id="dli_09_0065__note1358715714155"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="dli_09_0065__p692572617287">Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.</p>
</div></div>
</li><li id="dli_09_0065__li26155217311">Constructing dependency information and creating a Spark session<ol id="dli_09_0065__en-us_topic_0190597601_ol433013261137"><li id="dli_09_0065__en-us_topic_0190597601_li1825623917170">Import dependencies.<div class="p" id="dli_09_0065__en-us_topic_0190597601_p0101458105614"><a name="dli_09_0065__en-us_topic_0190597601_li1825623917170"></a><a name="en-us_topic_0190597601_li1825623917170"></a>Maven dependency<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597601_screen5760163172012"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="o">&lt;</span><span class="n">dependency</span><span class="o">&gt;</span>
<span class="o">&lt;</span><span class="n">groupId</span><span class="o">&gt;</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">&lt;/</span><span class="n">groupId</span><span class="o">&gt;</span>
<span class="o">&lt;</span><span class="n">artifactId</span><span class="o">&gt;</span><span class="n">spark</span><span class="o">-</span><span class="n">sql_2</span><span class="o">.</span><span class="mi">11</span><span class="o">&lt;/</span><span class="n">artifactId</span><span class="o">&gt;</span>
<span class="o">&lt;</span><span class="n">version</span><span class="o">&gt;</span><span class="mf">2.3</span><span class="o">.</span><span class="mi">2</span><span class="o">&lt;/</span><span class="n">version</span><span class="o">&gt;</span>
<span class="o">&lt;/</span><span class="n">dependency</span><span class="o">&gt;</span>
</pre></div>
</td></tr></table></div>
</div>
<div class="p" id="dli_09_0065__en-us_topic_0190597601_p9835524175614">Import dependency packages.<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597601_screen539112516557"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">scala.collection.mutable</span>
<span class="k">import</span> <span class="nn">org.apache.spark.sql.</span><span class="o">{</span><span class="nc">Row</span><span class="o">,</span> <span class="nc">SparkSession</span><span class="o">}</span>
<span class="k">import</span> <span class="nn">org.apache.spark.rdd.RDD</span>
<span class="k">import</span> <span class="nn">org.apache.spark.sql.types._</span>
</pre></div>
</td></tr></table></div>
</div>
</li><li id="dli_09_0065__en-us_topic_0190597601_li133002613132">Create a session.<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597601_screen12232591413"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="k">val</span> <span class="n">sparkSession</span> <span class="k">=</span> <span class="nc">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">().</span><span class="n">getOrCreate</span><span class="o">()</span>
</pre></div>
</td></tr></table></div>
</li><li id="dli_09_0065__en-us_topic_0190597601_li973422614238">Create a table to connect to an OpenTSDB data source.<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597601_screen15140130103119"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="n">sparkSession</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;&quot;&quot;create table opentsdb_test using opentsdb options(</span>
<span class="s">  'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',</span>
<span class="s">  'metric'='ctopentsdb',</span>
<span class="s">  'tags'='city,location')&quot;&quot;&quot;</span><span class="o">)</span>
</pre></div>
</td></tr></table></div>
<div class="tablenoborder"><a name="dli_09_0065__en-us_topic_0190597601_table463015581831"></a><a name="en-us_topic_0190597601_table463015581831"></a><table cellpadding="4" cellspacing="0" summary="" id="dli_09_0065__en-us_topic_0190597601_table463015581831" frame="border" border="1" rules="all"><caption><b>Table 1 </b>Parameters for creating a table</caption><thead align="left"><tr id="dli_09_0065__en-us_topic_0190597601_row66299581732"><th align="left" class="cellrowborder" valign="top" width="20.61%" id="mcps1.3.1.3.2.1.3.2.2.3.1.1"><p id="dli_09_0065__en-us_topic_0190597601_p4629205820312">Parameter</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="79.39%" id="mcps1.3.1.3.2.1.3.2.2.3.1.2"><p id="dli_09_0065__en-us_topic_0190597601_p3629105812311">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_09_0065__en-us_topic_0190597601_row46303582037"><td class="cellrowborder" valign="top" width="20.61%" headers="mcps1.3.1.3.2.1.3.2.2.3.1.1 "><p id="dli_09_0065__en-us_topic_0190597601_p156291581539">host</p>
</td>
<td class="cellrowborder" valign="top" width="79.39%" headers="mcps1.3.1.3.2.1.3.2.2.3.1.2 "><p id="dli_09_0065__en-us_topic_0190597601_p362912583317">OpenTSDB connection address.</p>
<ul id="dli_09_0065__en-us_topic_0190597601_ul4418253112819"><li id="dli_09_0065__en-us_topic_0190597601_li941895310283">To access CloudTable OpenTSDB, specify the OpenTSDB connection address. To obtain it, log in to the CloudTable console, choose <strong id="dli_09_0065__b19454488474">Cluster Mode</strong>, click the target cluster name, and copy the OpenTSDB connection address from the cluster information.</li><li id="dli_09_0065__en-us_topic_0190597601_li172141657102819">To access MRS OpenTSDB, if you have created an enhanced datasource connection, enter the IP address and port number of the node where OpenTSDB is located, in the format <strong id="dli_09_0065__b1350932265111">IP:PORT</strong>. If OpenTSDB runs on multiple nodes, separate the node addresses with semicolons (;).</li></ul>
</td>
</tr>
<tr id="dli_09_0065__en-us_topic_0190597601_row46301958432"><td class="cellrowborder" valign="top" width="20.61%" headers="mcps1.3.1.3.2.1.3.2.2.3.1.1 "><p id="dli_09_0065__en-us_topic_0190597601_p10630158730">metric</p>
</td>
<td class="cellrowborder" valign="top" width="79.39%" headers="mcps1.3.1.3.2.1.3.2.2.3.1.2 "><p id="dli_09_0065__en-us_topic_0190597601_p4630258134">Name of the metric in OpenTSDB corresponding to the DLI table to be created.</p>
</td>
</tr>
<tr id="dli_09_0065__en-us_topic_0190597601_row1663011587314"><td class="cellrowborder" valign="top" width="20.61%" headers="mcps1.3.1.3.2.1.3.2.2.3.1.1 "><p id="dli_09_0065__en-us_topic_0190597601_p263025820311">tags</p>
</td>
<td class="cellrowborder" valign="top" width="79.39%" headers="mcps1.3.1.3.2.1.3.2.2.3.1.2 "><p id="dli_09_0065__en-us_topic_0190597601_p763011582314">Tags corresponding to the metric, used for operations such as classification, filtering, and quick search. You can specify a maximum of 8 tags, counting all <strong id="dli_09_0065__en-us_topic_0190597601_b109141852521">tagk</strong> values under the metric. Separate the tags with commas (,).</p>
</td>
</tr>
</tbody>
</table>
</div>
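<p>The parameter rules in Table 1 can be sketched as a small helper that assembles the <strong>create table</strong> statement. This is a minimal sketch with no Spark dependency: the object name <strong>OpenTsdbTableDdl</strong> and the method <strong>createTableSql</strong> are hypothetical, and the checks only mirror the rules stated in the table (semicolon-separated hosts, comma-separated tags, at most 8 tags).</p>
<div class="codecoloring" codetype="Scala"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre>object OpenTsdbTableDdl {
  // Limit taken from Table 1: at most 8 tags per table.
  val MaxTags = 8

  // Hypothetical helper: builds the CREATE TABLE statement from its parts.
  def createTableSql(table: String, hosts: Seq[String], metric: String, tags: Seq[String]): String = {
    require(hosts.nonEmpty, "at least one host is required")
    require(tags.size &lt;= MaxTags, s"at most $MaxTags tags are allowed, got ${tags.size}")
    val hostList = hosts.mkString(";") // multiple nodes are separated by semicolons
    val tagList = tags.mkString(",")   // tags are separated by commas
    s"""create table $table using opentsdb options(
       |  'Host'='$hostList',
       |  'metric'='$metric',
       |  'tags'='$tagList')""".stripMargin
  }

  def main(args: Array[String]): Unit = {
    val sql = createTableSql(
      "opentsdb_test",
      Seq("opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"),
      "ctopentsdb",
      Seq("city", "location"))
    println(sql)
  }
}
</pre></div>
</td></tr></table></div>
<p>The resulting string can be passed to <strong>sparkSession.sql</strong> in the same way as the literal statement above.</p>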
</li></ol>
</li><li id="dli_09_0065__li385700326">Connecting to data sources through SQL APIs<ol id="dli_09_0065__en-us_topic_0190597601_ol1775910275118"><li id="dli_09_0065__en-us_topic_0190597601_li18759142205116">Insert data.<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597601_screen66292184333"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="n">sparkSession</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;insert into opentsdb_test values('futian', 'abc', '1970-01-02 18:17:36', 30.0)&quot;</span><span class="o">)</span>
</pre></div>
</td></tr></table></div>
</li><li id="dli_09_0065__en-us_topic_0190597601_li10243120145111">Query data.<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597601_screen8335528349"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="n">sparkSession</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;select * from opentsdb_test&quot;</span><span class="o">).</span><span class="n">show</span><span class="o">()</span>
</pre></div>
</td></tr></table></div>
<p id="dli_09_0065__en-us_topic_0190597601_p980612423518">Response</p>
<p id="dli_09_0065__en-us_topic_0190597601_p1111911912352"><span><img id="dli_09_0065__en-us_topic_0190597601_image155658561726" src="en-us_image_0223997429.png"></span></p>
</li></ol>
</li><li id="dli_09_0065__li1085417208326">Connecting to data sources through DataFrame APIs<ol id="dli_09_0065__en-us_topic_0190597601_ol19161120205217"><li id="dli_09_0065__en-us_topic_0190597601_li159162206521">Construct a schema.<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597601_screen1757244585212"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="k">val</span> <span class="n">attrTag1Location</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;location&quot;</span><span class="o">,</span> <span class="nc">StringType</span><span class="o">)</span>
<span class="k">val</span> <span class="n">attrTag2Name</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;name&quot;</span><span class="o">,</span> <span class="nc">StringType</span><span class="o">)</span>
<span class="k">val</span> <span class="n">attrTimestamp</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;timestamp&quot;</span><span class="o">,</span> <span class="nc">LongType</span><span class="o">)</span>
<span class="k">val</span> <span class="n">attrValue</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;value&quot;</span><span class="o">,</span> <span class="nc">DoubleType</span><span class="o">)</span>
<span class="k">val</span> <span class="n">attrs</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span><span class="n">attrTag1Location</span><span class="o">,</span> <span class="n">attrTag2Name</span><span class="o">,</span> <span class="n">attrTimestamp</span><span class="o">,</span> <span class="n">attrValue</span><span class="o">)</span>
</pre></div>
</td></tr></table></div>
</li><li id="dli_09_0065__en-us_topic_0190597601_li979715115537">Construct data based on the schema type.<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597601_screen764613543101"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="k">val</span> <span class="n">mutableRow</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[</span><span class="kt">Any</span><span class="o">]</span> <span class="k">=</span> <span class="nc">Seq</span><span class="o">(</span><span class="s">&quot;aaa&quot;</span><span class="o">,</span> <span class="s">&quot;abc&quot;</span><span class="o">,</span> <span class="mi">123456L</span><span class="o">,</span> <span class="mf">30.0</span><span class="o">)</span>
<span class="k">val</span> <span class="n">rddData</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">Row</span><span class="o">]</span> <span class="k">=</span> <span class="n">sparkSession</span><span class="o">.</span><span class="n">sparkContext</span><span class="o">.</span><span class="n">parallelize</span><span class="o">(</span><span class="nc">Array</span><span class="o">(</span><span class="nc">Row</span><span class="o">.</span><span class="n">fromSeq</span><span class="o">(</span><span class="n">mutableRow</span><span class="o">)),</span> <span class="mi">1</span><span class="o">)</span>
</pre></div>
</td></tr></table></div>
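<p>The row above stores the timestamp as a <strong>Long</strong> (123456L), whereas the SQL example writes a timestamp string. Assuming the <strong>Long</strong> value is a count of seconds since the epoch (the source does not state the unit), the two forms can be related with <strong>java.time</strong>. The following is a minimal sketch; the object name <strong>EpochToTimestamp</strong> and the choice of UTC+8 are assumptions made for illustration.</p>
<div class="codecoloring" codetype="Scala"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre>import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object EpochToTimestamp {
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")

  // Render epoch seconds as a timestamp string at a fixed UTC offset.
  def format(epochSeconds: Long, offset: ZoneOffset): String =
    LocalDateTime.ofEpochSecond(epochSeconds, 0, offset).format(fmt)

  // Parse a timestamp string at a fixed UTC offset back to epoch seconds.
  def parse(text: String, offset: ZoneOffset): Long =
    LocalDateTime.parse(text, fmt).toEpochSecond(offset)

  def main(args: Array[String]): Unit = {
    println(format(123456L, ZoneOffset.UTC))        // 1970-01-02 10:17:36
    println(format(123456L, ZoneOffset.ofHours(8))) // 1970-01-02 18:17:36
  }
}
</pre></div>
</td></tr></table></div>
<p>Under these assumptions, 123456 seconds at UTC+8 corresponds to the string used in the SQL insert example, 1970-01-02 18:17:36.</p>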
</li><li id="dli_09_0065__en-us_topic_0190597601_li6847933185312">Import data to OpenTSDB.<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597601_screen3727448564"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="n">sparkSession</span><span class="o">.</span><span class="n">createDataFrame</span><span class="o">(</span><span class="n">rddData</span><span class="o">,</span> <span class="k">new</span> <span class="nc">StructType</span><span class="o">(</span><span class="n">attrs</span><span class="o">)).</span><span class="n">write</span><span class="o">.</span><span class="n">insertInto</span><span class="o">(</span><span class="s">&quot;opentsdb_test&quot;</span><span class="o">)</span>
</pre></div>
</td></tr></table></div>
</li><li id="dli_09_0065__en-us_topic_0190597601_li135821518544">Read data from OpenTSDB.<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597601_screen5984155015578"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="k">val</span> <span class="n">map</span> <span class="k">=</span> <span class="k">new</span> <span class="n">mutable</span><span class="o">.</span><span class="nc">HashMap</span><span class="o">[</span><span class="kt">String</span><span class="p">,</span> <span class="kt">String</span><span class="o">]()</span>
<span class="n">map</span><span class="o">(</span><span class="s">&quot;metric&quot;</span><span class="o">)</span> <span class="k">=</span> <span class="s">&quot;ctopentsdb&quot;</span>
<span class="n">map</span><span class="o">(</span><span class="s">&quot;tags&quot;</span><span class="o">)</span> <span class="k">=</span> <span class="s">&quot;city,location&quot;</span>
<span class="n">map</span><span class="o">(</span><span class="s">&quot;Host&quot;</span><span class="o">)</span> <span class="k">=</span> <span class="s">&quot;opentsdb-3xcl8dir15m58z3.cloudtable.com:4242&quot;</span>
<span class="n">sparkSession</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">format</span><span class="o">(</span><span class="s">&quot;opentsdb&quot;</span><span class="o">).</span><span class="n">options</span><span class="o">(</span><span class="n">map</span><span class="o">.</span><span class="n">toMap</span><span class="o">).</span><span class="n">load</span><span class="o">().</span><span class="n">show</span><span class="o">()</span>
</pre></div>
</td></tr></table></div>
<p id="dli_09_0065__en-us_topic_0190597601_p175631523185414">Response</p>
<p id="dli_09_0065__en-us_topic_0190597601_p12563223125419"><span><img id="dli_09_0065__en-us_topic_0190597601_image66313435445" src="en-us_image_0223997490.png"></span></p>
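<p>The read options can also be written as an immutable <strong>Map</strong> literal instead of a mutable <strong>HashMap</strong>. The following is a minimal sketch with no Spark dependency; the object name <strong>OpenTsdbReadOptions</strong> is hypothetical, and the values repeat those used above.</p>
<div class="codecoloring" codetype="Scala"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre>import scala.collection.mutable

object OpenTsdbReadOptions {
  // Options built step by step, as in the read example above.
  def viaMutableMap: Map[String, String] = {
    val map = new mutable.HashMap[String, String]()
    map("metric") = "ctopentsdb"
    map("tags") = "city,location"
    map("Host") = "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
    map.toMap
  }

  // The same options as an immutable Map literal.
  val viaLiteral: Map[String, String] = Map(
    "metric" -&gt; "ctopentsdb",
    "tags" -&gt; "city,location",
    "Host" -&gt; "opentsdb-3xcl8dir15m58z3.cloudtable.com:4242"
  )

  def main(args: Array[String]): Unit = {
    // Both forms hold the same key-value pairs, so this prints true.
    println(viaMutableMap == viaLiteral)
  }
}
</pre></div>
</td></tr></table></div>
<p>Either map can be passed to <strong>options(...)</strong> in the read call; the literal form avoids the intermediate mutable state and the <strong>toMap</strong> conversion.</p>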
</li></ol>
</li><li id="dli_09_0065__li555164943210">Submitting a Spark job<ol id="dli_09_0065__ol1192401417332"><li id="dli_09_0065__li1692416144334">Generate a JAR package based on the code and upload the package to DLI.<p id="dli_09_0065__dli_09_0063_p1749619513385"><a name="dli_09_0065__li1692416144334"></a><a name="li1692416144334"></a></p>
<p id="dli_09_0065__dli_09_0063_p114961151385"></p>
</li><li id="dli_09_0065__li88617207166">In the Spark job editor, select the corresponding dependency module and execute the Spark job.<p id="dli_09_0065__p9497103318162"><a name="dli_09_0065__li88617207166"></a><a name="li88617207166"></a></p>
<div class="p" id="dli_09_0065__p25861721101614"><div class="note" id="dli_09_0065__note1580312115349"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><ul id="dli_09_0065__ul4803121183410"><li id="dli_09_0065__en-us_topic_0197738142_li58215295819">If the Spark version is 2.3.2 (soon to be brought offline) or 2.4.5, set <strong id="dli_09_0065__b87256462576">Module</strong> to <strong id="dli_09_0065__b472534615717">sys.datasource.opentsdb</strong> when you submit the job.</li><li id="dli_09_0065__li6624653171317">If the Spark version is 3.1.1, you do not need to select a module. Instead, configure the following <strong id="dli_09_0065__b19884720589">Spark parameters (--conf)</strong>:<p id="dli_09_0065__p1723617371259">spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/opentsdb/*</p>
<p id="dli_09_0065__p6236153714259">spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/opentsdb/*</p>
</li></ul>
</div></div>
</div>
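<p>The version rule in the note above can be summarized as a lookup from Spark version to the required job setting. This is a minimal sketch only: the object <strong>OpenTsdbJobSettings</strong> and its types are hypothetical and are not part of any DLI API; the module name and class path are copied from the note.</p>
<div class="codecoloring" codetype="Scala"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre>object OpenTsdbJobSettings {
  sealed trait Setting
  // Select a dependency module in the Spark job editor.
  final case class Module(name: String) extends Setting
  // Pass Spark parameters (--conf) instead of selecting a module.
  final case class ConfEntries(entries: Seq[(String, String)]) extends Setting

  private val ClassPath = "/usr/share/extension/dli/spark-jar/datasource/opentsdb/*"

  // Returns None for versions that the note does not cover.
  def forSparkVersion(version: String): Option[Setting] = version match {
    case "2.3.2" | "2.4.5" =&gt; Some(Module("sys.datasource.opentsdb"))
    case "3.1.1" =&gt; Some(ConfEntries(Seq(
      "spark.driver.extraClassPath" -&gt; ClassPath,
      "spark.executor.extraClassPath" -&gt; ClassPath)))
    case _ =&gt; None
  }

  def main(args: Array[String]): Unit = {
    Seq("2.4.5", "3.1.1").foreach(v =&gt; println(s"$v: ${forSparkVersion(v)}"))
  }
}
</pre></div>
</td></tr></table></div>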
</li></ol>
</li></ul>
</div>
<div class="section" id="dli_09_0065__section18620114012348"><h4 class="sectiontitle">Complete Example Code</h4><ul id="dli_09_0065__ul1994601919475"><li id="dli_09_0065__li69463192478">Maven dependency<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597500_screen131251919174915"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="o">&lt;</span><span class="n">dependency</span><span class="o">&gt;</span>
<span class="o">&lt;</span><span class="n">groupId</span><span class="o">&gt;</span><span class="n">org</span><span class="o">.</span><span class="n">apache</span><span class="o">.</span><span class="n">spark</span><span class="o">&lt;/</span><span class="n">groupId</span><span class="o">&gt;</span>
<span class="o">&lt;</span><span class="n">artifactId</span><span class="o">&gt;</span><span class="n">spark</span><span class="o">-</span><span class="n">sql_2</span><span class="o">.</span><span class="mi">11</span><span class="o">&lt;/</span><span class="n">artifactId</span><span class="o">&gt;</span>
<span class="o">&lt;</span><span class="n">version</span><span class="o">&gt;</span><span class="mf">2.3</span><span class="o">.</span><span class="mi">2</span><span class="o">&lt;/</span><span class="n">version</span><span class="o">&gt;</span>
<span class="o">&lt;/</span><span class="n">dependency</span><span class="o">&gt;</span>
</pre></div>
</td></tr></table></div>
</li><li id="dli_09_0065__li01647468475">Connecting to data sources through SQL APIs<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597500_screen144461426184015"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">org.apache.spark.sql.SparkSession</span>

<span class="k">object</span> <span class="nc">Test_OpenTSDB_CT</span> <span class="o">{</span>
  <span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
    <span class="c1">// Create a SparkSession.</span>
    <span class="k">val</span> <span class="n">sparkSession</span> <span class="k">=</span> <span class="nc">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">().</span><span class="n">getOrCreate</span><span class="o">()</span>

    <span class="c1">// Create a DLI table associated with OpenTSDB.</span>
    <span class="n">sparkSession</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;&quot;&quot;create table opentsdb_test using opentsdb options(</span>
<span class="s">      'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',</span>
<span class="s">      'metric'='ctopentsdb',</span>
<span class="s">      'tags'='city,location')&quot;&quot;&quot;</span><span class="o">)</span>

    <span class="c1">//*****************************SQL API***********************************</span>
    <span class="n">sparkSession</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;insert into opentsdb_test values('futian', 'abc', '1970-01-02 18:17:36', 30.0)&quot;</span><span class="o">)</span>
    <span class="n">sparkSession</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;select * from opentsdb_test&quot;</span><span class="o">).</span><span class="n">show</span><span class="o">()</span>

    <span class="n">sparkSession</span><span class="o">.</span><span class="n">close</span><span class="o">()</span>
  <span class="o">}</span>
<span class="o">}</span>
</pre></div>
</td></tr></table></div>
</li><li id="dli_09_0065__li109641210144812">Connecting to data sources through DataFrame APIs<div class="codecoloring" codetype="Scala" id="dli_09_0065__en-us_topic_0190597500_screen18873163294917"><table class="highlighttable"><tr><td class="code"><div class="highlight"><pre><span></span><span class="k">import</span> <span class="nn">scala.collection.mutable</span>
<span class="k">import</span> <span class="nn">org.apache.spark.sql.</span><span class="o">{</span><span class="nc">Row</span><span class="o">,</span> <span class="nc">SparkSession</span><span class="o">}</span>
<span class="k">import</span> <span class="nn">org.apache.spark.rdd.RDD</span>
<span class="k">import</span> <span class="nn">org.apache.spark.sql.types._</span>

<span class="k">object</span> <span class="nc">Test_OpenTSDB_CT</span> <span class="o">{</span>
  <span class="k">def</span> <span class="n">main</span><span class="o">(</span><span class="n">args</span><span class="k">:</span> <span class="kt">Array</span><span class="o">[</span><span class="kt">String</span><span class="o">])</span><span class="k">:</span> <span class="kt">Unit</span> <span class="o">=</span> <span class="o">{</span>
    <span class="c1">// Create a SparkSession.</span>
    <span class="k">val</span> <span class="n">sparkSession</span> <span class="k">=</span> <span class="nc">SparkSession</span><span class="o">.</span><span class="n">builder</span><span class="o">().</span><span class="n">getOrCreate</span><span class="o">()</span>

    <span class="c1">// Create a DLI table associated with OpenTSDB.</span>
    <span class="n">sparkSession</span><span class="o">.</span><span class="n">sql</span><span class="o">(</span><span class="s">&quot;&quot;&quot;create table opentsdb_test using opentsdb options(</span>
<span class="s">      'Host'='opentsdb-3xcl8dir15m58z3.cloudtable.com:4242',</span>
<span class="s">      'metric'='ctopentsdb',</span>
<span class="s">      'tags'='city,location')&quot;&quot;&quot;</span><span class="o">)</span>

    <span class="c1">//*****************************DataFrame API***********************************</span>
    <span class="c1">// Set the schema.</span>
    <span class="k">val</span> <span class="n">attrTag1Location</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;location&quot;</span><span class="o">,</span> <span class="nc">StringType</span><span class="o">)</span>
    <span class="k">val</span> <span class="n">attrTag2Name</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;name&quot;</span><span class="o">,</span> <span class="nc">StringType</span><span class="o">)</span>
    <span class="k">val</span> <span class="n">attrTimestamp</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;timestamp&quot;</span><span class="o">,</span> <span class="nc">LongType</span><span class="o">)</span>
    <span class="k">val</span> <span class="n">attrValue</span> <span class="k">=</span> <span class="k">new</span> <span class="nc">StructField</span><span class="o">(</span><span class="s">&quot;value&quot;</span><span class="o">,</span> <span class="nc">DoubleType</span><span class="o">)</span>
    <span class="k">val</span> <span class="n">attrs</span> <span class="k">=</span> <span class="nc">Array</span><span class="o">(</span><span class="n">attrTag1Location</span><span class="o">,</span> <span class="n">attrTag2Name</span><span class="o">,</span> <span class="n">attrTimestamp</span><span class="o">,</span> <span class="n">attrValue</span><span class="o">)</span>

    <span class="c1">// Construct data based on the schema types.</span>
    <span class="k">val</span> <span class="n">mutableRow</span><span class="k">:</span> <span class="kt">Seq</span><span class="o">[</span><span class="kt">Any</span><span class="o">]</span> <span class="k">=</span> <span class="nc">Seq</span><span class="o">(</span><span class="s">&quot;aaa&quot;</span><span class="o">,</span> <span class="s">&quot;abc&quot;</span><span class="o">,</span> <span class="mi">123456L</span><span class="o">,</span> <span class="mf">30.0</span><span class="o">)</span>
    <span class="k">val</span> <span class="n">rddData</span><span class="k">:</span> <span class="kt">RDD</span><span class="o">[</span><span class="kt">Row</span><span class="o">]</span> <span class="k">=</span> <span class="n">sparkSession</span><span class="o">.</span><span class="n">sparkContext</span><span class="o">.</span><span class="n">parallelize</span><span class="o">(</span><span class="nc">Array</span><span class="o">(</span><span class="nc">Row</span><span class="o">.</span><span class="n">fromSeq</span><span class="o">(</span><span class="n">mutableRow</span><span class="o">)),</span> <span class="mi">1</span><span class="o">)</span>

    <span class="c1">// Import the constructed data into OpenTSDB.</span>
    <span class="n">sparkSession</span><span class="o">.</span><span class="n">createDataFrame</span><span class="o">(</span><span class="n">rddData</span><span class="o">,</span> <span class="k">new</span> <span class="nc">StructType</span><span class="o">(</span><span class="n">attrs</span><span class="o">)).</span><span class="n">write</span><span class="o">.</span><span class="n">insertInto</span><span class="o">(</span><span class="s">&quot;opentsdb_test&quot;</span><span class="o">)</span>

    <span class="c1">// Read data from OpenTSDB.</span>
    <span class="k">val</span> <span class="n">map</span> <span class="k">=</span> <span class="k">new</span> <span class="n">mutable</span><span class="o">.</span><span class="nc">HashMap</span><span class="o">[</span><span class="kt">String</span><span class="p">,</span> <span class="kt">String</span><span class="o">]()</span>
    <span class="n">map</span><span class="o">(</span><span class="s">&quot;metric&quot;</span><span class="o">)</span> <span class="k">=</span> <span class="s">&quot;ctopentsdb&quot;</span>
    <span class="n">map</span><span class="o">(</span><span class="s">&quot;tags&quot;</span><span class="o">)</span> <span class="k">=</span> <span class="s">&quot;city,location&quot;</span>
    <span class="n">map</span><span class="o">(</span><span class="s">&quot;Host&quot;</span><span class="o">)</span> <span class="k">=</span> <span class="s">&quot;opentsdb-3xcl8dir15m58z3.cloudtable.com:4242&quot;</span>
    <span class="n">sparkSession</span><span class="o">.</span><span class="n">read</span><span class="o">.</span><span class="n">format</span><span class="o">(</span><span class="s">&quot;opentsdb&quot;</span><span class="o">).</span><span class="n">options</span><span class="o">(</span><span class="n">map</span><span class="o">.</span><span class="n">toMap</span><span class="o">).</span><span class="n">load</span><span class="o">().</span><span class="n">show</span><span class="o">()</span>

    <span class="n">sparkSession</span><span class="o">.</span><span class="n">close</span><span class="o">()</span>
  <span class="o">}</span>
<span class="o">}</span>
</pre></div>
</td></tr></table></div>
</li></ul>
</div>
</div>
<div>
<div class="familylinks">
<div class="parentlink"><strong>Parent topic:</strong> <a href="dli_09_0080.html">Connecting to OpenTSDB</a></div>
</div>
</div>